spark: 二次排序-2


在上一篇文章,使用了###錯誤###的方法實現二次排序,導致排序按key字典排序,並非我們想要的結果

現在,使用自定義排序方法來實現二次排序

1, 思路

輸入數據
aa 12
bb 32
aa 3,
cc 43
dd 23
cc 5
cc 8
bb 23
bb 12

  1. 自定義排序類:對二元組(key:String, value:Int)進行排序

  2. 把每一行數據map成二元組(key:String, value:Int)

  3. 排序 -- scala自動使用步驟1定義的排序類進行排序

     /**
      *  1. 自定義排序類:對二元組(key:String, value:Int)進行排序
      */
     class KVOrdering extends Ordering[Tuple2[String, Int]] with Serializable {
       override def compare(x: Tuple2[String, Int], y: Tuple2[String, Int]):Int = {
     	 val comp = x._1.compareTo(y._1)
     	 if (comp == 0) {    // key相等, 則比較value
     		 x._2.compareTo(y._2)
     	 } else {
     		comp 
     	 }       
       }
     }
    

由於scala的隱式轉換機制,這個類會對閉包內的所有Tuple2[String, Int]產生作用

    val inputFile = sc.textFile("/home/hadoop/second_sort")

// 2. 把每一行數據map成二元組(key:String, value:Int)
val splitRdd = inputFile.map{x=>val y = x.split(' '); (y(0), Integer.valueOf(y(1)))}

// 3. 排序
val sortedRdd = splitRdd.sortBy(x=>x)

3 后記

  1. 自定義排序類需要繼承 com.google.common.collect.Ordering, 這個類在guava.jar, 測試的時候需要導入這個包。如果是在shell測試,使用下面的命令

    bin/spark-shell --jars  "PATH_TO_GUAVA"
    
  2. 這個方法忽略了一個事實:9park中, RDD是可分區的。現在我們只針對一個RDD的一個分區來做排序。RDD的多個分區,在多個worker node上並行排序,產生多個結果集,如何對這些結果集最后做一次歸並排序?后面的文章再繼續分析9這個問題


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM