Spark ships with two partitioning strategies, HashPartitioner and RangePartitioner, and they fit most workloads. Sometimes, however, neither of the built-in partitioners matches our requirements, and in that case we can define a custom partitioning strategy.
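For comparison, this is how a built-in partitioner is applied; a custom partitioner will plug into exactly the same partitionBy call later on. A minimal sketch, assuming a JavaSparkContext named jsc as in the examples further down:

```java
// Baseline: hash-partition a pair RDD into 2 partitions with the built-in HashPartitioner.
JavaRDD<String> words = jsc.parallelize(Arrays.asList("jack1", "world1", "world2"));
JavaPairRDD<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairRDD<String, Integer> hashed = pairs.partitionBy(new org.apache.spark.HashPartitioner(2));
System.out.println(hashed.getNumPartitions()); // 2
```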
For this, Spark provides the corresponding interface: we only need to extend the Partitioner abstract class and implement its methods.
The Partitioner class looks like this:
```scala
/**
 * An object that defines how the elements in a key-value pair RDD are partitioned by key.
 * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
 */
abstract class Partitioner extends Serializable {
  // Returns the number of partitions to create.
  def numPartitions: Int

  // Computes the partition for the given key; the result is in the range 0 to numPartitions - 1.
  def getPartition(key: Any): Int
}
```
Spark's default implementation is HashPartitioner; let's look at how it is implemented:
```scala
/**
 * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
 * Java's `Object.hashCode`.
 *
 * Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
 * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
 * produce an unexpected or incorrect result.
 */
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  // A standard Java-style equality check; Spark uses it internally to decide whether
  // two RDDs are partitioned in the same way.
  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}
```
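The warning in the doc comment about array keys is easy to reproduce: Java arrays use identity-based hashCodes, so two arrays with the same contents usually hash differently and may land in different partitions. A quick plain-Java illustration, no Spark required:

```java
int[] a = {1, 2, 3};
int[] b = {1, 2, 3};
// Identity-based hashCode: equal contents, but (almost certainly) different hash codes.
System.out.println(a.hashCode() == b.hashCode());
// Content-based hashing would treat them as the same key.
System.out.println(java.util.Arrays.hashCode(a) == java.util.Arrays.hashCode(b)); // true
```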
The nonNegativeMod method:
```scala
/* Calculates 'x' modulo 'mod', takes to consideration sign of x,
 * i.e. if 'x' is negative, than 'x' % 'mod' is negative too
 * so function return (x % mod) + mod in that case.
 */
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}
```
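The adjustment matters because hashCode can be negative and Java's % operator keeps the sign of the dividend, so a plain key.hashCode() % numPartitions could yield a negative "partition ID". A worked example of nonNegativeMod(-7, 3):

```java
int rawMod = -7 % 3;                           // -1: Java's % takes the sign of the dividend
int partition = rawMod + (rawMod < 0 ? 3 : 0); // 2: always within [0, numPartitions)
System.out.println(partition);
```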
Here is an example:
```java
// We want the jack-* elements and the world-* elements to end up in separate partitions.
JavaRDD<String> javaRDD = jsc.parallelize(Arrays.asList(
        "jack1", "jack2", "jack3", "world1", "world2", "world3"));
```
The custom partitioner:
```java
import org.apache.spark.Partitioner;

/**
 * Custom Partitioner: keys are hashed by everything except their last character,
 * so all "jack*" keys hash together and all "world*" keys hash together.
 */
public class MyPartitioner extends Partitioner {

    private int numPartitions;

    public MyPartitioner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    @Override
    public int numPartitions() {
        return numPartitions;
    }

    @Override
    public int getPartition(Object key) {
        if (key == null) {
            return 0;
        }
        // Drop the trailing character and hash the remaining prefix, e.g. "jack1" -> "jack".
        String str = key.toString();
        int hashCode = str.substring(0, str.length() - 1).hashCode();
        return nonNegativeMod(hashCode, numPartitions);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof MyPartitioner) {
            return ((MyPartitioner) obj).numPartitions == numPartitions;
        }
        return false;
    }

    // Keep hashCode consistent with equals, mirroring Spark's HashPartitioner.
    @Override
    public int hashCode() {
        return numPartitions;
    }

    // Same logic as Utils.nonNegativeMod(key.hashCode, numPartitions) in Spark.
    private int nonNegativeMod(int hashCode, int numPartitions) {
        int rawMod = hashCode % numPartitions;
        if (rawMod < 0) {
            rawMod = rawMod + numPartitions;
        }
        return rawMod;
    }
}
```
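Before wiring the partitioner into an RDD, the key-to-partition mapping can be sanity-checked directly. A small ad-hoc check using the MyPartitioner class above:

```java
MyPartitioner p = new MyPartitioner(2);
// Keys that share a prefix always land in the same partition.
System.out.println(p.getPartition("jack1") == p.getPartition("jack2"));   // true
System.out.println(p.getPartition("world1") == p.getPartition("world3")); // true
```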
Then we pass the custom partitioner to the partitionBy() method. Test code:
```java
// We want the jack-* elements and the world-* elements to end up in separate partitions.
JavaRDD<String> javaRDD = jsc.parallelize(Arrays.asList(
        "jack1", "jack2", "jack3", "world1", "world2", "world3"));

// A custom partitioner has to be applied on a pair RDD, so convert first.
JavaPairRDD<String, Integer> pairRDD = javaRDD.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairRDD<String, Integer> pairRDD1 = pairRDD.partitionBy(new MyPartitioner(2));
System.out.println("Number of partitions after partitionBy: " + pairRDD1.getNumPartitions());

// Print each element prefixed with the index of the partition it ended up in.
pairRDD1.mapPartitionsWithIndex((v1, v2) -> {
    ArrayList<String> result = new ArrayList<>();
    while (v2.hasNext()) {
        result.add(v1 + "_" + v2.next());
    }
    return result.iterator();
}, true).foreach(s -> System.out.println(s));
```
Output:
```
Number of partitions after partitionBy: 2
0_(world1,1)
0_(world2,1)
0_(world3,1)
1_(jack1,1)
1_(jack2,1)
1_(jack3,1)
```
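As an aside (not part of the original example), the equals() method pays off here: a key-based operation that is given a partitioner equal to the RDD's current one can reuse the existing layout instead of shuffling again. A hedged sketch of such a follow-up aggregation:

```java
// Because new MyPartitioner(2) equals pairRDD1's partitioner, Spark can aggregate
// within the existing partitions instead of reshuffling the data.
JavaPairRDD<String, Integer> counts =
        pairRDD1.reduceByKey(new MyPartitioner(2), Integer::sum);
counts.foreach(t -> System.out.println(t));
```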
Reference: https://my.oschina.net/u/939952/blog/1863372
Reference: https://www.iteblog.com/archives/1368.html