在上一篇文章,使用了###錯誤###的方法實現二次排序,導致排序按key字典排序,並非我們想要的結果
現在,使用自定義排序方法來實現二次排序
1, 思路
輸入數據
aa 12
bb 32
aa 3,
cc 43
dd 23
cc 5
cc 8
bb 23
bb 12
-
自定義排序類:對二元組(key:String, value:Int)進行排序
-
把每一行數據map成二元組(key:String, value:Int)
-
排序 -- scala自動使用步驟1定義的排序類進行排序
/** * 1. 自定義排序類:對二元組(key:String, value:Int)進行排序 */ class KVOrdering extends Ordering[Tuple2[String, Int]] with Serializable { override def compare(x: Tuple2[String, Int], y: Tuple2[String, Int]):Int = { val comp = x._1.compareTo(y._1) if (comp == 0) { // key相等, 則比較value x._2.compareTo(y._2) } else { comp } } }
由於scala的隱式轉換機制,這個類會對閉包內的所有Tuple2[String, Int]產生作用
val inputFile = sc.textFile("/home/hadoop/second_sort")
// 2. 把每一行數據map成二元組(key:String, value:Int)
val splitRdd = inputFile.map{x=>val y = x.split(' '); (y(0), Integer.valueOf(y(1)))}
// 3. 排序
val sortedRdd = splitRdd.sortBy(x=>x)
3 后記
-
自定義排序類需要繼承 com.google.common.collect.Ordering, 這個類在guava.jar, 測試的時候需要導入這個包。如果是在shell測試,使用下面的命令
bin/spark-shell --jars "PATH_TO_GUAVA"
-
這個方法忽略了一個事實:9park中, RDD是可分區的。現在我們只針對一個RDD的一個分區來做排序。RDD的多個分區,在多個worker node上並行排序,產生多個結果集,如何對這些結果集最后做一次歸並排序?后面的文章再繼續分析9這個問題