Spark RDD 分區
Spark RDD分區是並行計算的一個計算單元,RDD在邏輯上被分為多個分區,分區的格式決定了並行計算的粒度,任務的個數是由最后一個RDD的分區數決定的。
Spark自帶兩種分區:HashPartitioner 和 RangePartitioner。一般而言初始數據是沒有分區的,數據分區只作用於key value這樣的RDD上,
當一個Job包含Shuffle操作類型的算子時,如groupByKey,reduceByKey等,就會使用數據分區的方式進行分區,即確定key放在哪一個分區。
shuffle與Partition關系
摘自[Spark分區方式詳解](https://blog.csdn.net/dmy1115143060/article/details/82620715)
在Spark Shuffle階段中,共分為Shuffle Write階段和Shuffle Read階段,其中在Shuffle Write階段中,Shuffle Map Task對數據進行處理產生中間數據,然后再根據數據分區方式對中間數據進行分區。最終Shuffle Read階段中的Shuffle Read Task會拉取Shuffle Write階段中產生的並已經分好區的中間數據。圖2中描述了Shuffle階段與Partition關系。下面則分別介紹Spark中存在的兩種數據分區方式。
HashPartitioner
HashPartitioner采用哈希方式對key進行分區,分區規則為 partitionId = Key.hashCode % numPartitions,其中partitionId代表該Key對應的鍵值對數據應當分配到的Partition標識,Key.hashCode表示該Key的哈希值,numPartitions表示包含的Partition個數。
RDD 分區例子
package com.learn.hadoop.spark.doc.analysis.chpater.rdd;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
 * RDD partitioning example.
 *
 * <p>Demonstrates, on a word-count pipeline, that a freshly parallelized RDD
 * carries no partitioner, that {@code reduceByKey} installs a
 * {@code HashPartitioner} by default, that {@code sortByKey} installs a
 * {@code RangePartitioner}, and how to set a {@code HashPartitioner}
 * explicitly via {@code partitionBy}.
 */
public class RddTest05 {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("RddTest05");
        // JavaSparkContext is Closeable; try-with-resources guarantees the
        // context is stopped even if the job throws (the original leaked it).
        try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) {
            JavaRDD<String> rdd = sc.parallelize(
                    Arrays.asList("hello spark world ", "hello java world", "hello python world"));

            // With master local[*] the default partition count follows the CPU core count.
            System.out.println("local partitions:");
            System.out.println("rdd partitions num " + rdd.getNumPartitions());
            // A parallelized RDD has no partitioner yet (prints Optional.empty).
            System.out.println("rdd partitioner :" + rdd.partitioner().toString());

            // Split each line into words (lambda form; the file already uses Java 8 lambdas).
            JavaRDD<String> words = rdd.flatMap(s -> Arrays.asList(s.split(" ")).iterator());

            // Print every word.
            System.out.println("console all word");
            words.foreach(s -> System.out.println(s));

            // Pair each word with an initial count of 1.
            JavaPairRDD<String, Integer> wordPairs = words.mapToPair(s -> new Tuple2<>(s, 1));

            // Print all (word, 1) pairs.
            System.out.println("console all pair");
            wordPairs.foreach(pair -> System.out.println(pair));
            // mapToPair does not install a partitioner either.
            System.out.println("wordPairs partitioner :" + wordPairs.partitioner().toString());

            // Sum the counts per word; reduceByKey uses HashPartitioner by default.
            JavaPairRDD<String, Integer> wordCounts = wordPairs.reduceByKey((a, b) -> a + b);
            System.out.println("wordredues partitioner num: " + wordCounts.getNumPartitions());
            System.out.println("wordredues partitioner :" + wordCounts.partitioner().toString());

            // Print the per-word totals.
            System.out.println("console all");
            wordCounts.foreach(pair -> System.out.println(pair));

            // sortByKey sorts ascending by default; passing false sorts descending.
            System.out.println("test sort:");
            wordCounts = wordCounts.sortByKey(true);
            wordCounts.foreach(pair -> System.out.println(pair));
            // sortByKey installs a RangePartitioner.
            System.out.println("after sort partitioner num: " + wordCounts.getNumPartitions());
            System.out.println("after sort partitioner . partitioner : "
                    + wordCounts.partitioner().toString());

            // Explicitly repartition with a HashPartitioner of the same size.
            wordCounts = wordCounts.partitionBy(new HashPartitioner(wordCounts.getNumPartitions()));
            // BUG FIX: the original printed partitioner().toString() on the
            // "partitioner num" line; print the partition count instead.
            System.out.println("after set hash partitioner . partitioner num :"
                    + wordCounts.getNumPartitions());
            System.out.println("after set hash partitioner . partitioner :"
                    + wordCounts.partitioner().toString());
        }
    }
}
運行結果
local partitions:
rdd partitions num 8
rdd partitioner :Optional.empty
console all word
hello
java
world
hello
python
world
hello
spark
world
console all pair
(hello,1)
(java,1)
(world,1)
(hello,1)
(python,1)
(world,1)
(hello,1)
(spark,1)
(world,1)
wordPairs partitioner :Optional.empty
wordredues partitioner num: 8
wordredues partitioner :Optional[org.apache.spark.HashPartitioner@8]
console all
(python,1)
(spark,1)
(hello,3)
(java,1)
(world,3)
test sort:
(python,1)
(spark,1)
(java,1)
(hello,3)
(world,3)
after sort partitioner num: 5
after sort partitioner . partitioner : Optional[org.apache.spark.RangePartitioner@e6319a7d]
after set hash partitioner . partitioner num :5
after set hash partitioner . partitioner :Optional[org.apache.spark.HashPartitioner@5]