Spark源碼解析排序算子sortBy和sortByKey存在未排序的情況


一.在使用中出現的問題

 1 package test
 2 
 3 import org.apache.log4j.{Level, Logger}
 4 import org.apache.spark.sql.SparkSession
 5 
 6 /**
 7   * Created by Administrator on 2019/12/17.
 8   */
 9 object TestZip {
10   /**
11     * 設置日志級別
12     */
13   Logger.getLogger("org").setLevel(Level.WARN)
14   def main(args: Array[String]) {
15     val spark = SparkSession.builder().master("local[2]").appName(s"${this.getClass.getSimpleName}").getOrCreate()
16     val sc = spark.sparkContext
17     val array_left = 1 until 4 //生成1到count的數組
18     val array_right = Array("工單", "電力", "展示")
19 
20     val result = array_left.zip(array_right)
21     val rdd = sc.parallelize(result).sortBy(_._1)
22     val rdd2 = sc.parallelize(result).sortByKey(true)
23 
24     rdd.foreach(println)
25     println("----------------")
26     rdd2.foreach(println)
27   }
28 }

二.執行結果

  

  從結果中可以看出,sortBy和sortByKey都沒有實現排序的功能【雖然它們順序已經改變】。這是怎么回事?

  

  具體原因下面我們從源碼中進行分析!

三.源碼分析

  

  在Spark的源碼中,從RDD.scala代碼中可以看出,sortBy底層調用的是sortByKey算子,在無升序降序的參數下【ascending】,默認為升序【true】,因此,我們只需要前往sortByKey中分析為什么沒有實現排序功能即可。

  

  1.在OrderedRDDFunctions.scala中,可以看見sortByKey實現的具體細節,從注釋中可以看出,排序基於key,在每個分區內部實現一個排序序列,注意,是每個分區內,下面我們去重置分區為1來檢驗一下是否有效!

  2.另外,注釋也提到調用collect或者save也可以獲取一個有序的序列,下面也一塊去驗證!

四.設置分區為1

  

  執行結果:

  

  由此可見,當設置分區數為1時【即合並所有分區為一個分區,顯然之前的分區數不為1,查看存在多少分區數可以參考我的博客:https://www.cnblogs.com/yszd/p/10156231.html】,可以實現升序排序,當然降序也是可以的!

  

  

五.以collect為例進行驗證

  

  

  備注:由此可見,調用collect也可以實現排序,但調用collect之后會返回一個Array,不再是RDD!

六.原因分析

  不管是repartition還是collect,其本質都是把各個executor中的數據匯總到master主節點中,是調用action算子,會存在repartition和shuffle操作。源碼就很好的驗證了這一點:

  

  在sortByKey中,會調用分區算子進行重新分區,與顯式調用repartition有異曲同工之妙!

  

  而其中的ShuffledRDD會在shuffle執行過程中現在每個分區內進行排序,之后再進行整體的排序,以便提升排序性能,類似與歸並排序算法!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM