一.在使用中出現的問題
1 package test 2 3 import org.apache.log4j.{Level, Logger} 4 import org.apache.spark.sql.SparkSession 5 6 /** 7 * Created by Administrator on 2019/12/17. 8 */ 9 object TestZip { 10 /** 11 * 設置日志級別 12 */ 13 Logger.getLogger("org").setLevel(Level.WARN) 14 def main(args: Array[String]) { 15 val spark = SparkSession.builder().master("local[2]").appName(s"${this.getClass.getSimpleName}").getOrCreate() 16 val sc = spark.sparkContext 17 val array_left = 1 until 4 //生成1到count的數組 18 val array_right = Array("工單", "電力", "展示") 19 20 val result = array_left.zip(array_right) 21 val rdd = sc.parallelize(result).sortBy(_._1) 22 val rdd2 = sc.parallelize(result).sortByKey(true) 23 24 rdd.foreach(println) 25 println("----------------") 26 rdd2.foreach(println) 27 } 28 }
二.執行結果
從結果中可以看出,sortBy和sortByKey都沒有實現排序的功能【雖然它們順序已經改變】。這是怎么回事?
具體原因下面我們從源碼中進行分析!
三.源碼分析
在Spark的源碼中,從RDD.scala代碼中可以看出,sortBy底層調用的是sortByKey算子,在無升序降序的參數下【ascending】,默認為升序【true】,因此,我們只需要前往sortByKey中分析為什么沒有實現排序功能即可。
1.在OrderedRDDFunctions.scala中,可以看見sortByKey實現的具體細節,從注釋中可以看出,排序基於key,在每個分區內部實現一個排序序列,注意,是每個分區內,下面我們去重置分區為1來檢驗一下是否有效!
2.另外,注釋也提到調用collect或者save也可以獲取一個有序的序列,下面也一塊去驗證!
四.設置分區為1
執行結果:
由此可見,當設置分區數為1時【即合並所有分區為一個分區,顯然之前的分區數不為1,查看存在多少分區數可以參考我的博客:https://www.cnblogs.com/yszd/p/10156231.html】,可以實現升序排序,當然降序也是可以的!
五.以collect為例進行驗證
備注:由此可見,調用collect也可以實現排序,但調用collect之后會返回一個Array,不再是RDD!
六.原因分析
不管是repartition還是collect,其本質都是把各個executor中的數據匯總到master主節點中,是調用action算子,會存在repartition和shuffle操作。源碼就很好的驗證了這一點:
在sortByKey中,會調用分區算子進行重新分區,與顯式調用repartition有異曲同工之妙!
而其中的ShuffledRDD會在shuffle執行過程中現在每個分區內進行排序,之后再進行整體的排序,以便提升排序性能,類似與歸並排序算法!