我們有這樣一個文件
首先我們的思路是把輸入文件數據轉化成鍵值對的形式進行比較不就好了嘛!
但是你要明白這一點,我們平時所使用的鍵值對是不具有比較意義的,也就說他們沒法拿來直接比較。
我們可以通過sortByKey,sortBy(pair._2)來進行單列的排序,但是沒法進行兩列的同時排序。
那么我們該如何做呢?
我們可以自定義一個鍵值對的比較類來實現比較,
類似於JAVA中自定義類實現可比較性實現comparable接口。
我們需要繼承Ordered和Serializable特質來實現自定義的比較類。
1.讀取數據創建rdd
2.根據要求來定義比較類
任務要求,先根據key進行排序,相同再根據value進行排序。
我們可以把鍵值對當成一個數據有兩個數字,先通過第一個數字比大小,再通過第二個數字比大小。
(1)我們定義兩個Int參數的比較類
(2)繼承Ordered 和 Serializable 接口 實現 compare 方法實現可以比較
class UDFSort (val first:Int,val second:Int) extends Ordered[UDFSort] with Serializable { override def compare(that: UDFSort): Int = { if(this.first - that.first != 0){//第一個值不相等的時候,直接返回大小 this.first - that.first //返回值 } else {//第一個值相等的時候,比較第二個值 this.second - that.second } } }
其實,懂java的人能看出來這個跟實現comparable很類似。
3.處理rdd
我們將原始數據按照每行拆分成一個含有兩個數字的數組,然后傳入我們自定義的比較類中
不是可以通過UDFSort就可以比較出結果了嗎,
但是我們不能把結果給拆分掉,也就是說,我們只能排序,不能改數據。
我們這樣改怎么辦?
我們可以生成鍵值對的形式,key為UDFSort(line(0),line(1)),value為原始數據lines。
這樣,我們通過sortByKey就能完成排序,然后通過取value就可以保持原始數據不變。
4.排序取結果
完整代碼
package SparkDemo import org.apache.spark.{SparkConf, SparkContext} class UDFSort (val first:Int,val second:Int) extends Ordered[UDFSort] with Serializable {//自定義比較類 override def compare(that: UDFSort): Int = { if(this.first - that.first != 0){//第一個值不相等的時候,直接返回大小 this.first - that.first //返回值 } else {//第一個值相等的時候,比較第二個值 this.second - that.second } } } object Sort{ def main(args:Array[String]): Unit ={ //初始化配置:設置主機名和程序主類的名字 val conf = new SparkConf().setAppName("UdfSort"); //通過conf來創建sparkcontext val sc = new SparkContext(conf); val lines = sc.textFile("file:///...") //轉換為( udfsort( line(0),line(1) ),line ) 的形式 val pair = lines.map(line => (new UDFSort(line.split(" ")(0).toInt,line.split(" ")(1).toInt),line)) //對key進行排序,然后取value val result = pair.sortByKey().map( x => x._2) } }