Article From:

First, partition principle

1.Why do they want to be partitioned?(This is one of the words of other people.)

  In order to reduce network transmission, it is necessary to increase CPU computing load.Data partitioning, in distributed cluster, the cost of network communication is very large, reducing network transmission can greatly enhance performance. The performance expenditure of the MapReduce framework is mainly in the IO and network transmission, and it is unavoidable for IO to read and write a large number of files, but the network transmission is avoidable.File compression reduces files, thereby reducing network transmission, but increases the computational load of CPU.

SparkIO is also inevitable, but the network transmission spark has been optimized.sparkPartition RDD (partitioned) and put it on the cluster for parallel computing. The same RDD is divided into 100, 10 nodes, averaging one node and 10 partitions. When sum type is calculated,The sum for each partition firstThen the sum value shuffle is transferred to the main program for global sum.,So sum type calculation is very small for network transmission. But for the calculation of join type, the data itself needs to be shuffle, and the network cost is very large.

2.sparkHow to optimize this problem?

  • mapreducepartition:That is to say, the network transmission of MapReduce is mainly in the shuffle stage.shuffleThe fundamental reason is that the same key exists on different nodes, and key has to be shuffle when it is aggregated.shuffleIt affects the network very much. It wants to mix all the data in the network, and then it can pull the same key data together.To do shuffle is a storage decision
  • sparkpartition:sparkThe partition is based on key, that is, the partition of key hashcode (the same key, the same hashcode). So when it is partitioned, the 100t data is divided into 10 parts, each part of which has 10 t, which ensures that the same key must be in one point.In the area, and it can ensure that the same key can exist on the same node when it is stored. For example, a RDD is divided into 100 parts, and the cluster has 10 nodes, so each node has 10 copies, each of which is called a partition, and spark can guarantee that the same key exists on the same node.In fact, the same key exists in the same partition.
  • keyThe uneven distribution determines the large partitions in some districts. There is no guarantee that the partition data volume is completely equal, but it will guarantee a close range. So for some of the work done by MapReduce, spark does not need shuffle, and spark solves the network transmission problem.The fundamental principle is this.

    The time for join is two tables, and it is impossible to partition the two tables. Usually, the frequent large tables are partitioned in advance, and the small table performs the shuffle process when it is associated with it.

Large tables do not require shuffle.

   RDD The internal data set is logically (and physically) divided into several small sets, so each small set is called a partition. Like the following picture, there are three RDD, each of which has two partitions within the RDD.


    At source level, a Partition list is stored in class RDD.Each Partition object contains a index member. RDD Number + index You can determine the only partition Block number,The persistent RDD can be obtained from the storage medium through this Block number.Partition data。(RDD + index-> Block Numbered -> partition data)

Two, Spark partition principle and method

1.RDDA partition principle for partitions: as far as possible, the number of partitions is equal to the number of cluster cores.

Next we will discuss only the default number of partitions in Spark. Here we will analyze the default partition number in parallelize and textFile respectively.

   Whether it is a local, Standalone, YARN, or Mesos mode, we can configure the number of the default partitions by the spark.default.parallelism. If this value is not set, it is determined according to the different cluster environment.Value.

  • Local mode: the default is the number of CPU on the local machine. If local[N] is set, the default is N.
  • Apache Mesos:The number of the default partitions is 8
  • StandaloneOr YARN: take the sum of all the core numbers in the cluster by default, or 2, and take the larger value of the two. For parallelize, there is no specified number of partitions in the method, and the default is spark.default.parallelism, for textF.For ile, the default number of partitions in the method is min (defaultParallelism, 2), and defaultParallelism corresponds to spark.default.parallelism. asThe result is to read files from HDFS. The number of partitions is the number of files (128MB/).

2.How to create partitions?

There are two situations when creating RDD and getting new RDD through transformation operation. In fact, it is the 2 way to create RDD.

  • For the former, manually specify the number of partitions when calling textFile and parallelize methods. For example, sc.parallelize (Array (1, 2, 3, 5, 6), 2) specifies the RD to be created.The number of D partitions is 2.
  • For the latter, the repartition method can be called directly. In fact, the number of partitions is determined according to the dependencies between transformation operations corresponding to multiple RDD.

           1)Narrow dependency, the sub RDD is determined by the number of parent RDD partitions, such as map operation, the number of parent RDD and sub RDD partition is consistent.

           2)Shuffle Dependency is determined by the partition (Partitioner), for example, the number of new RDD partitions obtained by groupByKey (New HashPartitioner (2)) or direct groupByKey (2) is equal to 2.

3.spark shuffleWhere is the timing of the implementation of the partitioner?

  •  partitionerExecuting on the worker node, the last step of each stage (except the last one) is to partition the data into the disk, and then report the partition information to the master.
  • masterStarting the new stage will bring the partition information of the last stage to the new task, so that the new task will know where to read the data.

  Partitions are the smallest granularity in spark parallelism, that is to say, the data of a partition must be processed by one thread and cannot be split. Several partitions are parallel several task.

  For example, suppose that a RDD’s data comes from 2 HDFS files, then the data set is loaded by default by 2 partitions at first, and 2 files can be processed in parallel. If you have 100 nodes, each node has one core, then only 2 nodes can be used at most. Then I want to improve the parallelism.You can re – partition these data, divide the two files into 100 partitions, and then use hashpartitioner to hash these data into 100 partitions.

What do you do?

  Since the first is 2 partitions, 2 tasks are generated and dispersed to 2 nodes. Each task is partitioned with hashpartitioner, and the data is written to the disk. At this time, the 2 nodes will have 100 partitions in each of the 2 nodes, and the number 0-99.. That is to say, the data of a partition is actually located at 2 nodes. Then they report these partition information to driver, so that driver knows the location of these partitions. The process is shuffle.

  Next, suppose that we need to count the RDD after this partition. At this time, there are 100 partitions, which can take advantage of 100 nodes in the cluster. For each partition, it is actually getting the partition information from the driver and then getting the data from the two nodes through the network to get tired.Add the calculation (aggregate reduce).

 Reference resources:

1.,SparkBasic essays: partition detailed solution

2. – control of data zoning and distribution


Leave a Reply

Your email address will not be published. Required fields are marked *