1. Basic sorting algorithm
sc.setLogLevel("WARN")   // optional: reduce spark-shell log noise

sc.textFile("/data/putfile.txt")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _, 1)
  .map(pair => (pair._2, pair._1))   // swap key and value so the count becomes the key
  .sortByKey(false)                  // sort by count, descending
  .map(pair => (pair._2, pair._1))   // swap back to (word, count)
  .collect
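The two key/value swaps are only there because sortByKey always sorts on the key. A minimal alternative sketch (assuming the same /data/putfile.txt input) uses sortBy, which sorts on any field directly:

sc.textFile("/data/putfile.txt")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _, 1)
  .sortBy(_._2, ascending = false)   // sort on the count field directly, no swapping needed
  .collect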
2. Secondary sort algorithm
Secondary sort means taking two dimensions into account when ordering records (the same approach extends to sorting on even more fields).
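As an illustration with hypothetical data, suppose each input line holds two integers; sorting on the first field and breaking ties with the second behaves like this (plain Scala, no Spark needed):

// hypothetical input lines, each "first second"
val data = Seq("2 3", "1 5", "2 1", "3 2")
val sorted = data.sortBy { l => val s = l.split(" "); (s(0).toInt, s(1).toInt) }
// sorted == Seq("1 5", "2 1", "2 3", "3 2"): ties on the first field fall back to the second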
Java version
package com.dt.java.spark;

import scala.math.Ordered;
import java.io.Serializable;

// Custom secondary-sort key: implements Scala's Ordered trait plus Serializable
public class SecondarySort implements Ordered<SecondarySort>, Serializable {
    // the two fields the sort is based on
    private int first;
    private int second;

    public int getFirst() { return first; }
    public void setFirst(int first) { this.first = first; }
    public int getSecond() { return second; }
    public void setSecond(int second) { this.second = second; }

    public SecondarySort(int first, int second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public int compare(SecondarySort that) {
        if (this.first - that.getFirst() != 0) {
            return this.first - that.getFirst();
        } else {
            return this.second - that.getSecond();
        }
    }

    @Override
    public boolean $less(SecondarySort that) {
        if (this.first < that.getFirst()) {
            return true;
        } else if (this.first == that.getFirst() && this.second < that.getSecond()) {
            return true;
        }
        return false;
    }

    @Override
    public boolean $greater(SecondarySort that) {
        if (this.first > that.getFirst()) {
            return true;
        } else if (this.first == that.getFirst() && this.second > that.getSecond()) {
            return true;
        }
        return false;
    }

    @Override
    public boolean $less$eq(SecondarySort that) {
        if (this.$less(that)) {
            return true;
        } else if (this.first == that.getFirst() && this.second == that.getSecond()) {
            return true;
        }
        return false;
    }

    @Override
    public boolean $greater$eq(SecondarySort that) {
        if (this.$greater(that)) {
            return true;
        } else if (this.first == that.getFirst() && this.second == that.getSecond()) {
            return true;
        }
        return false;
    }

    @Override
    public int compareTo(SecondarySort that) {
        if (this.first - that.getFirst() != 0) {
            return this.first - that.getFirst();
        } else {
            return this.second - that.getSecond();
        }
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        SecondarySort that = (SecondarySort) o;
        if (first != that.first) return false;
        return second == that.second;
    }

    @Override
    public int hashCode() {
        int result = first;
        result = 31 * result + second;
        return result;
    }
}
package com.dt.java.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

// Secondary sort, step by step:
// Step 1: implement the custom sort key with the Ordered and Serializable interfaces
// Step 2: load the file to be sorted and build an RDD of <key, value> pairs
// Step 3: call sortByKey, which sorts on the custom key
// Step 4: strip the sort key and keep only the sorted values
public class SecondarySortApp {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile("D:\\JavaWorkspaces\\sparkproject\\sparktest.txt");

        JavaPairRDD<SecondarySort, String> pairs = lines.mapToPair(
            new PairFunction<String, SecondarySort, String>() {
                @Override
                public Tuple2<SecondarySort, String> call(String line) throws Exception {
                    String[] splited = line.split(" ");
                    SecondarySort key = new SecondarySort(Integer.valueOf(splited[0]), Integer.valueOf(splited[1]));
                    return new Tuple2<SecondarySort, String>(key, line);
                }
            }
        );

        JavaPairRDD<SecondarySort, String> sorted = pairs.sortByKey(); // the secondary sort happens here

        // Strip the custom sort key, keeping only the sorted lines
        JavaRDD<String> secondarysorted = sorted.map(
            new Function<Tuple2<SecondarySort, String>, String>() {
                @Override
                public String call(Tuple2<SecondarySort, String> sortedContent) throws Exception {
                    return sortedContent._2();
                }
            }
        );

        secondarysorted.foreach(new VoidFunction<String>() {
            @Override
            public void call(String sorted) throws Exception {
                System.out.println(sorted);
            }
        });
    }
}
Scala version
package com.dt.scala.spark

// Custom secondary-sort key: compare on the first field, fall back to the second on ties
class SecondarySort(val first: Int, val second: Int) extends Ordered[SecondarySort] with Serializable {
  override def compare(that: SecondarySort): Int = {
    if (this.first - that.first != 0) {
      this.first - that.first
    } else {
      this.second - that.second
    }
  }
}
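A quick sanity check of the key with hypothetical values, relying on the comparison operators that Ordered derives from compare:

val a = new SecondarySort(3, 5)
val b = new SecondarySort(3, 9)
println(a < b)   // true: the first fields tie, so the second field decides
println(a > b)   // false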
package com.dt.scala.spark

import org.apache.spark.{SparkContext, SparkConf}

object SecondarySortApp {
  def main(args: Array[String]) {
    // Step 1: create the Spark configuration object
    val conf = new SparkConf()
    conf.setAppName("SecondarySortApp") // set the application name
    conf.setMaster("local")             // run locally

    // Create the SparkContext, the sole entry point of the program
    val sc = new SparkContext(conf)

    val lines = sc.textFile("D:\\JavaWorkspaces\\sparkproject\\sparktest.txt")

    // Wrap each line with the custom key built from its first two fields
    val pairWithSortkey = lines.map(line => (
      new SecondarySort(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line
    ))

    val sorted = pairWithSortkey.sortByKey(false)               // descending on the custom key
    val sortedResult = sorted.map(sortedline => sortedline._2)  // drop the key, keep the line
    sortedResult.collect.foreach(println)
  }
}
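In Scala the custom key class is not strictly required: a (Int, Int) tuple already has an implicit Ordering that compares the first element and then the second, so a minimal sketch of the same job (assuming the same input path) can sort on a tuple key directly:

val sortedByTuple = sc.textFile("D:\\JavaWorkspaces\\sparkproject\\sparktest.txt")
  .map { line =>
    val fields = line.split(" ")
    ((fields(0).toInt, fields(1).toInt), line)  // tuple key: (first, second)
  }
  .sortByKey(false)                             // Ordering[(Int, Int)] is supplied implicitly
  .map(_._2)                                    // drop the key, keep the original line

sortedByTuple.collect.foreach(println)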