Spark RDD Partitioning: HashPartitioner


Spark RDD Partitioning

An RDD partition is the unit of parallel computation in Spark: an RDD is logically divided into multiple partitions, the partitioning scheme determines the granularity of parallelism, and the number of tasks is determined by the number of partitions of the final RDD.
Spark ships with two partitioners: HashPartitioner and RangePartitioner. Initial data generally has no partitioner, and a partitioner only applies to key-value RDDs. When a job contains a shuffle operator such as groupByKey or reduceByKey, a partitioner is used to decide which partition each key goes to.
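A quick way to see this in code: a shuffle operator such as reduceByKey accepts an explicit Partitioner, which fixes the number of partitions (and therefore the number of reduce-side tasks) of the resulting RDD. Below is a minimal sketch; the class name PartitionCountDemo is made up for illustration.

```java
import java.util.Arrays;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class PartitionCountDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("PartitionCountDemo");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            //a partitioner only applies to key-value RDDs
            JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<>("a", 1), new Tuple2<>("b", 1), new Tuple2<>("a", 1)));

            //reduceByKey shuffles; the HashPartitioner pins the downstream RDD
            //(and hence the number of reduce tasks) to 4 partitions
            JavaPairRDD<String, Integer> counts =
                    pairs.reduceByKey(new HashPartitioner(4), (a, b) -> a + b);

            System.out.println(counts.getNumPartitions()); //prints 4
        }
    }
}
```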

The relationship between shuffle and Partition

Excerpted from [Spark分區方式詳解](https://blog.csdn.net/dmy1115143060/article/details/82620715)

The Spark shuffle consists of a Shuffle Write phase and a Shuffle Read phase. In the Shuffle Write phase, Shuffle Map Tasks process the data to produce intermediate data, which is then split up according to the partitioning scheme. In the Shuffle Read phase, Shuffle Read Tasks pull the intermediate data that was partitioned and written during the Shuffle Write phase. (Figure 2 of the original post illustrates the relationship between the shuffle phases and partitions.) The two partitioning schemes available in Spark are described below.
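One way to make this boundary visible is an RDD's lineage: toDebugString() prints a ShuffledRDD entry, and the RDDs listed under it are computed by the Shuffle Write (map) side while the ShuffledRDD itself is materialized by Shuffle Read tasks. A small sketch, with the class name ShuffleBoundaryDemo made up for illustration:

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class ShuffleBoundaryDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("ShuffleBoundaryDemo");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaPairRDD<String, Integer> counts = sc
                    .parallelize(Arrays.asList("hello spark", "hello world"))
                    .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                    .mapToPair(w -> new Tuple2<>(w, 1))
                    .reduceByKey((a, b) -> a + b); //the shuffle boundary

            //the printed lineage contains a ShuffledRDD: RDDs listed under it
            //belong to the Shuffle Write side, the ShuffledRDD to Shuffle Read
            System.out.println(counts.toDebugString());
        }
    }
}
```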

HashPartitioner

HashPartitioner partitions by hashing the key. The rule is partitionId = Key.hashCode % numPartitions, where partitionId identifies the partition that the key-value pair is assigned to, Key.hashCode is the key's hash value, and numPartitions is the number of partitions. (Since Java's % can return a negative value, Spark adjusts the result to a non-negative modulus.)
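For illustration, the rule is easy to re-implement. The sketch below mirrors what HashPartitioner.getPartition does (a null key goes to partition 0, and a negative hash code is shifted into the non-negative range); the method name partitionFor is made up:

```java
//illustrative re-implementation of the HashPartitioner rule, not Spark's own code
static int partitionFor(Object key, int numPartitions) {
    if (key == null) {
        return 0;                                 //Spark sends null keys to partition 0
    }
    int mod = key.hashCode() % numPartitions;     //Java's % can be negative
    return mod < 0 ? mod + numPartitions : mod;   //shift into [0, numPartitions)
}

//e.g. partitionFor("hello", 8) == 2, since "hello".hashCode() is 99162322
```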

RDD partitioning example

```java
package com.learn.hadoop.spark.doc.analysis.chpater.rdd;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * RDD partitioning with HashPartitioner.
 */
public class RddTest05 {
    public static void main(String[] args) {
        SparkConf sparkConf =new SparkConf().setMaster("local[*]").setAppName("RddTest05");
        JavaSparkContext sc =new JavaSparkContext(sparkConf);
        JavaRDD<String> rdd =sc.parallelize(Arrays.asList("hello spark world ","hello java world","hello python world"));

        //with local[*], the default number of partitions depends on the number of CPU cores
        System.out.println("local partitions:");
        System.out.println("rdd partitions num "+rdd.getNumPartitions());
        System.out.println("rdd partitioner :"+rdd.partitioner().toString());

        JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });


        //print all the words
        System.out.println("console all word");
        words.foreach(s -> System.out.println(s));

        JavaPairRDD<String,Integer> wordPairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s,1);
            }
        });
        //print all the pairs in the pair RDD
        System.out.println("console all pair");
        //wordPairs.foreach(stringIntegerTuple2 -> System.out.println(stringIntegerTuple2));
        wordPairs.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                System.out.println(stringIntegerTuple2);
            }
        });
        System.out.println("wordPairs partitioner :"+wordPairs.partitioner().toString());

        //reduce by key
        JavaPairRDD<String,Integer> wordredues = wordPairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer+integer2;
            }
        });
        //reduceByKey's default partitioner is HashPartitioner
        System.out.println("wordredues partitioner num: "+wordredues.getNumPartitions());
        System.out.println("wordredues partitioner :"+wordredues.partitioner().toString());

        //print the word counts
        System.out.println("console all");
        wordredues.foreach(stringIntegerTuple2 -> System.out.println(stringIntegerTuple2));

        //test the default sort order: ascending (true) by default; pass false to sortByKey for descending
        System.out.println("test sort:");
        wordredues=wordredues.sortByKey(true);
        wordredues.foreach(stringIntegerTuple2 -> System.out.println(stringIntegerTuple2));
        //sortByKey's partitioner is RangePartitioner
        System.out.println("after sort partitioner num: "+wordredues.getNumPartitions());
        System.out.println("after sort partitioner . partitioner : " +
                ""+wordredues.partitioner().toString());

        //set a HashPartitioner explicitly
        wordredues =wordredues.partitionBy(new HashPartitioner(wordredues.getNumPartitions()));
        System.out.println("after set hash partitioner . partitioner num :"+wordredues.partitioner().toString());
        System.out.println("after set hash partitioner . partitioner :"+wordredues.partitioner().toString());

    }
}
```


Run output


```
local partitions:
rdd partitions num 8
rdd partitioner :Optional.empty
console all word

hello
java
world
hello
python
world
hello
spark
world
console all pair
(hello,1)
(java,1)
(world,1)
(hello,1)
(python,1)
(world,1)
(hello,1)
(spark,1)
(world,1)
wordPairs partitioner :Optional.empty
wordredues partitioner num: 8
wordredues partitioner :Optional[org.apache.spark.HashPartitioner@8]
console all
(python,1)
(spark,1)
(hello,3)
(java,1)
(world,3)
test sort:
(python,1)
(spark,1)
(java,1)
(hello,3)
(world,3)
after sort partitioner num: 5
after sort partitioner . partitioner : Optional[org.apache.spark.RangePartitioner@e6319a7d]
after set hash partitioner . partitioner num :5
after set hash partitioner . partitioner :Optional[org.apache.spark.HashPartitioner@5]
```
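Two details of this output are worth noting. The initial RDD reports 8 partitions because with local[*] the default parallelism follows the number of CPU cores (8 on the machine that produced this run). And the partition count drops from 8 to 5 after sortByKey: RangePartitioner samples the keys to compute range bounds, and with only 5 distinct keys it can build at most 5 ranges, so it ends up with fewer partitions than the parent RDD had.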


