Spark性能調試是使用Spark的用戶在進行大數據處理的時候必須面對的問題,性能調優的方法有很多,這里首先介紹一種常見的調優問題-小分區合並問題。
一:小分區合並問題介紹
在使用Spark進行數據處理的過程中,常常會使用filter方法來對數據進行一些預處理,過濾掉一些不符合條件的數據。
在使用該方法對數據進行頻繁過濾或者是過濾掉的數據量過大的情況下就會造成大量小分區的生成。
在Spark內部會對每一個分區分配一個task執行,如果task過多,那么每個task處理的數據量很小,就會造成線程頻繁的在task之間切換,使得資源開銷較大,且很多任務等待執行,並行度不高,這會造成集群工作效益低下。
為了解決這一個問題,常采用RDD中重分區的函數(coalesce函數或rePartition函數)來進行數據緊縮,減少分區數量,將小分區合並為大分區,從而提高效率
二:回顧寬、窄依賴
https://www.cnblogs.com/ssyfj/p/12556867.html
三:Coalesce()方法和rePartition()方法源碼分析
(一)Coalesce()方法源碼:
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null) : RDD[T] = withScope { if (shuffle) { /** Distributes elements evenly across output partitions, starting from a random partition. */ val distributePartition = (index: Int, items: Iterator[T]) => { var position = (new Random(index)).nextInt(numPartitions) items.map { t => // Note that the hash code of the key will just be the key itself. The HashPartitioner // will mod it with the number of total partitions. position = position + 1 (position, t) } } : Iterator[(Int, T)] // include a shuffle step so that our upstream tasks are still distributed new CoalescedRDD( new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition), new HashPartitioner(numPartitions)), numPartitions).values } else { new CoalescedRDD(this, numPartitions) } }
(二)rePartition()方法源碼:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) }
通過源碼可以看出兩者的區別:coalesce()方法的參數shuffle默認設置為false,repartition()方法就是coalesce()方法shuffle為true的情況。
四:使用場景
假設RDD有N個分區,需要重新划分成M個分區:
(一)N<M
N < M: 一般情況下N個分區有數據分布不均勻的狀況,利用HashPartitioner函數將數據重新分區為M個,這時需要將shuffle設置為true。
因為重分區前后相當於寬依賴,會發生shuffle過程,此時可以使用coalesce(shuffle=true),或者直接使用repartition()。

(二)N > M並且N和M相差不多(假如N是1000,M是100)
那么就可以將N個分區中的若干個分區合並成一個新的分區,最終合並為M個分區,這是前后是窄依賴關系,可以使用coalesce(shuffle=false)。

(三)如果 N> M並且兩者相差懸殊
這時如果將shuffle設置為false,父子RDD是窄依賴關系,他們同處在一個Stage中,就可能造成spark程序的並行度不夠,從而影響性能,如果在M為1的時候,為了使coalesce之前的操作有更好的並行度,可以將shuffle設置為true。
(四)總結
如果傳入的參數大於現有的分區數目,而shuffle為false,RDD的分區數不變,也就是說不經過shuffle,是無法將RDDde分區數變多的。---所以必須經過shuffle
五:補充repartitionAndSortWithinPartitions(RDD 重新分區,排序)
需求:將rdd數據中相同班級的學生分到一個partition中,並根據分數降序排序。
(一)repartitionAndSortWithinPartitions優勢
此實例用到的repartitionAndSortWithinPartitions是Spark官網推薦的一個算子,官方建議,如果需要在repartition重分區之后,還要進行排序,建議直接使用repartitionAndSortWithinPartitions算子。
因為該算子可以一邊進行重分區的shuffle操作,一邊進行排序。shuffle與sort兩個操作同時進行,比先shuffle再sort來說,性能可能是要高的。
六:算子代碼演示
(一)coalesce(默認減少分區)
public static void coalesce(){ List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9); JavaRDD<Integer> listRDD = sc.parallelize(list, 3); listRDD.coalesce(1).foreach(new VoidFunction<Integer>() { @Override public void call(Integer num) throws Exception { System.out.print(num); } }); }
public static void coalesce() { List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9); JavaRDD<Integer> listRDD = sc.parallelize(list, 3); listRDD.coalesce(1).foreach(num -> System.out.println(num)); }
def coalesce(): Unit = { val list = List(1,2,3,4,5,6,7,8,9) sc.parallelize(list,3).coalesce(1).foreach(println(_)) }
(二)replication(增加分區)
public static void replication(){ List<Integer> list = Arrays.asList(1, 2, 3, 4); JavaRDD<Integer> listRDD = sc.parallelize(list, 1); listRDD.repartition(2).foreach(new VoidFunction<Integer>() { @Override public void call(Integer num) throws Exception { System.out.println(num); } }); }
public static void replication(){ List<Integer> list = Arrays.asList(1, 2, 3, 4); JavaRDD<Integer> listRDD = sc.parallelize(list, 1); listRDD.repartition(2).foreach(num -> System.out.println(num)); }
def replication(): Unit ={ val list = List(1,2,3,4) val listRDD = sc.parallelize(list,1) listRDD.repartition(2).foreach(println(_)) }
(三)repartitionAndSortWithinPartitions
repartitionAndSortWithinPartitions函數是repartition函數的變種,與repartition函數不同的是,repartitionAndSortWithinPartitions在給定的partitioner內部進行排序,性能比repartition要高。
public static void repartitionAndSortWithinPartitions(){ List<Integer> list = Arrays.asList(1, 3, 55, 77, 33, 5, 23); JavaRDD<Integer> listRDD = sc.parallelize(list, 1); JavaPairRDD<Integer, Integer> pairRDD = listRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(Integer num) throws Exception { return new Tuple2<>(num, num); } }); JavaPairRDD<Integer, Integer> parationRDD = pairRDD.repartitionAndSortWithinPartitions(new Partitioner() { @Override public int getPartition(Object key) { Integer index = Integer.valueOf(key.toString()); if (index % 2 == 0) { return 0; } else { return 1; } } @Override public int numPartitions() { return 2; } }); parationRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<Tuple2<Integer, Integer>>, Iterator<String>>() { @Override public Iterator<String> call(Integer index, Iterator<Tuple2<Integer, Integer>> iterator) throws Exception { final ArrayList<String> list1 = new ArrayList<>(); while (iterator.hasNext()){ list1.add(index+"_"+iterator.next()); } return list1.iterator(); } },false).foreach(new VoidFunction<String>() { @Override public void call(String s) throws Exception { System.out.println(s); } }); }
public static void repartitionAndSortWithinPartitions(){ List<Integer> list = Arrays.asList(1, 4, 55, 66, 33, 48, 23); JavaRDD<Integer> listRDD = sc.parallelize(list, 1); JavaPairRDD<Integer, Integer> pairRDD = listRDD.mapToPair(num -> new Tuple2<>(num, num)); pairRDD.repartitionAndSortWithinPartitions(new HashPartitioner(2)) .mapPartitionsWithIndex((index,iterator) -> { ArrayList<String> list1 = new ArrayList<>(); while (iterator.hasNext()){ list1.add(index+"_"+iterator.next()); } return list1.iterator(); },false) .foreach(str -> System.out.println(str)); }
def repartitionAndSortWithinPartitions(): Unit ={ val list = List(1, 4, 55, 66, 33, 48, 23) val listRDD = sc.parallelize(list,1) listRDD.map(num => (num,num)) .repartitionAndSortWithinPartitions(new HashPartitioner(2)) .mapPartitionsWithIndex((index,iterator) => { val listBuffer: ListBuffer[String] = new ListBuffer while (iterator.hasNext) { listBuffer.append(index + "_" + iterator.next()) } listBuffer.iterator },false) .foreach(println(_)) }
