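Without further ado, here is the good stuff!
The algorithm in this post comes from the book Data Algorithms: Recipes for Scaling Up with Hadoop and Spark (《數據算法:Hadoop/Spark大數據處理技巧》). The approach in the book's code is correct, but the code is incomplete and covers only Java. Building on it, I added a Scala version, written for Spark.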
1. Input, expected output, and approach
The input is SecondarySort.txt, with the following contents:
2000,12,04,10
2000,11,01,20
2000,12,02,-20
2000,11,07,30
2000,11,24,-40
2012,12,21,30
2012,12,22,-20
2012,12,23,60
2012,12,24,70
2012,12,25,10
2013,01,23,90
2013,01,24,70
2013,01,20,-10
Each line means: year,month,day,temperature
Expected output:
2013-01 90,70,-10
2012-12 70,60,30,10,-20
2000-12 10,-20
2000-11 30,20,-40
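Each line means:
year-month temperature1,temperature2,temperature3,...
The year-month keys are sorted in descending order from top to bottom,
and the temperatures are sorted in descending order from left to right.
Approach:
Discard the day field, which is not needed.
Combine year and month into a single key and sort the keys in descending order.
For each year-month key, sort its temperature values in descending order and join them with commas.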
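Before moving to MapReduce, the three steps above can be sketched in memory with plain Java. This is only an illustration of the target behaviour, not the distributed implementation: the class name SecondarySortSketch and the method secondarySort are hypothetical, and it assumes every record is a well-formed year,month,day,temperature line.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Illustrative in-memory sketch; class and method names are hypothetical.
class SecondarySortSketch {

    // Turns "year,month,day,temperature" records into "year-month t1,t2,..." lines.
    static List<String> secondarySort(List<String> records) {
        // A TreeMap with a reversed comparator keeps the year-month keys in descending order.
        Map<String, List<Integer>> byMonth = new TreeMap<>(Comparator.reverseOrder());
        for (String record : records) {
            String[] tokens = record.split(",");
            String yearMonth = tokens[0] + "-" + tokens[1]; // tokens[2] (the day) is dropped
            int temperature = Integer.parseInt(tokens[3]);
            byMonth.computeIfAbsent(yearMonth, k -> new ArrayList<>()).add(temperature);
        }
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> entry : byMonth.entrySet()) {
            String temperatures = entry.getValue().stream()
                    .sorted(Comparator.reverseOrder()) // temperatures descending
                    .map(String::valueOf)
                    .collect(Collectors.joining(","));
            lines.add(entry.getKey() + " " + temperatures);
        }
        return lines;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList(
                "2000,12,04,10", "2000,11,01,20", "2000,12,02,-20", "2000,11,07,30",
                "2000,11,24,-40", "2012,12,21,30", "2012,12,22,-20", "2012,12,23,60",
                "2012,12,24,70", "2012,12,25,10", "2013,01,23,90", "2013,01,24,70",
                "2013,01,20,-10");
        secondarySort(records).forEach(System.out::println);
    }
}
```

This version holds all values of a month in memory and sorts them there, which is exactly what the secondary sort pattern tries to avoid at scale by letting the framework's shuffle do the sorting.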
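2. Secondary sort as a MapReduce program in Java
The classes to implement are:
Besides the usual SecondarySortingMapper, SecondarySortingReducer, and SecondarySortDriver,
there are two plug-in classes (DateTemperatureGroupingComparator and DateTemperaturePartitioner) and one custom type (DateTemperaturePair).
Here is the code (note that I removed the package declaration from each file, so add your own if you want to use it):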
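To see why these extra classes are needed, it helps to simulate what the shuffle does with them. The sketch below uses no Hadoop classes at all, and every name in it (ShuffleSimulation, Pair, SORT, partition, sortAndGroup) is hypothetical. It assumes the usual secondary sort arrangement: the sort order looks at the whole composite key, while partitioning and grouping look only at the year-month part.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of the sort and grouping steps; all names are hypothetical.
class ShuffleSimulation {

    // Stand-in for the composite key: the natural key (yearMonth) plus the value to sort by.
    static final class Pair {
        final String yearMonth;
        final int temperature;

        Pair(String yearMonth, int temperature) {
            this.yearMonth = yearMonth;
            this.temperature = temperature;
        }
    }

    // Sort order over the full composite key: yearMonth descending, then temperature descending.
    static final Comparator<Pair> SORT = Comparator
            .comparing((Pair p) -> p.yearMonth)
            .thenComparingInt(p -> p.temperature)
            .reversed();

    // Partitioning uses only yearMonth, so one month never spans two partitions.
    static int partition(Pair key, int numPartitions) {
        return Math.abs(key.yearMonth.hashCode() % numPartitions);
    }

    // Sorts by the composite key, then groups pairs that share a yearMonth (the grouping step).
    static Map<String, List<Integer>> sortAndGroup(List<Pair> pairs) {
        List<Pair> sorted = new ArrayList<>(pairs);
        sorted.sort(SORT);
        Map<String, List<Integer>> groups = new LinkedHashMap<>(); // keeps the sorted key order
        for (Pair p : sorted) {
            groups.computeIfAbsent(p.yearMonth, k -> new ArrayList<>()).add(p.temperature);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Pair> pairs = Arrays.asList(
                new Pair("2000-11", 20), new Pair("2012-12", 30), new Pair("2000-11", 30),
                new Pair("2012-12", 70), new Pair("2000-11", -40));
        System.out.println(sortAndGroup(pairs));
    }
}
```

Because the values arrive in each group already ordered, the reduce step only has to concatenate them; it never sorts anything itself.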
SecondarySortDriver.java
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class SecondarySortDriver extends Configured implements Tool {

        @Override
        public int run(String[] args) throws Exception {
            Configuration configuration = getConf();
            Job job = Job.getInstance(configuration, "SecondarySort");
            job.setJarByClass(SecondarySortDriver.class);
            job.setJobName("SecondarySort");

            Path inputPath = new Path(args[0]);
            Path outputPath = new Path(args[1]);
            FileInputFormat.setInputPaths(job, inputPath);
            FileOutputFormat.setOutputPath(job, outputPath);

            // Map output key/value types
            job.setMapOutputKeyClass(DateTemperaturePair.class);
            job.setMapOutputValueClass(IntWritable.class);
            // Reduce output key/value types (the reducer emits the joined temperatures as Text)
            job.setOutputKeyClass(DateTemperaturePair.class);
            job.setOutputValueClass(Text.class);

            job.setMapperClass(SecondarySortingMapper.class);
            job.setReducerClass(SecondarySortingReducer.class);
            // Plug-in classes: partition and group by year-month only
            job.setPartitionerClass(DateTemperaturePartitioner.class);
            job.setGroupingComparatorClass(DateTemperatureGroupingComparator.class);

            boolean status = job.waitForCompletion(true);
            return status ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            if (args.length != 2) {
                throw new IllegalArgumentException(
                        "Usage: SecondarySortDriver <input-path> <output-path>");
            }
            int returnStatus = ToolRunner.run(new SecondarySortDriver(), args);
            System.exit(returnStatus);
        }
    }
DateTemperaturePair.java
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    // Composite key: year-month (natural key) plus temperature (the value to sort by).
    public class DateTemperaturePair implements WritableComparable<DateTemperaturePair> {

        private String yearMonth;
        private String day; // not serialized, so it is lost after the map phase
        protected Integer temperature;

        // Sort order: year-month descending, then temperature descending.
        @Override
        public int compareTo(DateTemperaturePair o) {
            int compareValue = this.yearMonth.compareTo(o.getYearMonth());
            if (compareValue == 0) {
                compareValue = temperature.compareTo(o.getTemperature());
            }
            return -1 * compareValue;
        }

        @Override
        public void write(DataOutput dataOutput) throws IOException {
            Text.writeString(dataOutput, yearMonth);
            dataOutput.writeInt(temperature);
        }

        // Must read the fields in the same order that write() emits them.
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            this.yearMonth = Text.readString(dataInput);
            this.temperature = dataInput.readInt();
        }

        // Only the year-month is printed as the output key.
        @Override
        public String toString() {
            return yearMonth;
        }

        public String getYearMonth() {
            return yearMonth;
        }

        public void setYearMonth(String text) {
            this.yearMonth = text;
        }

        public String getDay() {
            return day;
        }

        public void setDay(String day) {
            this.day = day;
        }

        public Integer getTemperature() {
            return temperature;
        }

        public void setTemperature(Integer temperature) {
            this.temperature = temperature;
        }
    }
SecondarySortingMapper.java
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class SecondarySortingMapper
            extends Mapper<LongWritable, Text, DateTemperaturePair, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split(",");
            // YYYY = tokens[0]
            // MM = tokens[1]
            // DD = tokens[2]
            // temperature = tokens[3]
            String yearMonth = tokens[0] + "-" + tokens[1];
            String day = tokens[2];
            int temperature = Integer.parseInt(tokens[3]);

            DateTemperaturePair reduceKey = new DateTemperaturePair();
            reduceKey.setYearMonth(yearMonth);
            reduceKey.setDay(day);
            reduceKey.setTemperature(temperature);
            // The temperature goes into both the key (for sorting) and the value (for output).
            context.write(reduceKey, new IntWritable(temperature));
        }
    }
DateTemperaturePartitioner.java
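    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Partitioner;

    // The value type must match the map output value type (IntWritable).
    public class DateTemperaturePartitioner extends Partitioner<DateTemperaturePair, IntWritable> {

        // Partition by year-month only, so all records of a month reach the same reducer.
        @Override
        public int getPartition(DateTemperaturePair pair, IntWritable temperature, int numPartitions) {
            return Math.abs(pair.getYearMonth().hashCode() % numPartitions);
        }
    }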
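A small detail in the partitioner is the order of the two operations: the remainder is taken first and the absolute value second. The plain-Java sketch below (hypothetical class and method names, no Hadoop dependency) shows why the order matters when a hash code happens to be Integer.MIN_VALUE, and also shows the sign-bit mask variant, which is the form Hadoop's own HashPartitioner uses.

```java
// Compares three ways of turning a hash code into a partition index; names are hypothetical.
class PartitionIndexDemo {

    // Same formula as the partitioner above: remainder first, then absolute value.
    static int remainderThenAbs(int hash, int numPartitions) {
        return Math.abs(hash % numPartitions);
    }

    // Tempting variant: absolute value first. Math.abs(Integer.MIN_VALUE) is still
    // negative, so this can produce a negative (invalid) partition index.
    static int absThenRemainder(int hash, int numPartitions) {
        return Math.abs(hash) % numPartitions;
    }

    // Alternative: clear the sign bit before taking the remainder.
    static int maskThenRemainder(int hash, int numPartitions) {
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int hash = Integer.MIN_VALUE;
        int numPartitions = 3;
        System.out.println("remainder then abs: " + remainderThenAbs(hash, numPartitions));
        System.out.println("abs then remainder: " + absThenRemainder(hash, numPartitions));
        System.out.println("mask then remainder: " + maskThenRemainder(hash, numPartitions));
    }
}
```

The first and third variants always return an index between 0 and numPartitions - 1, although they may assign the same hash to different partitions.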
DateTemperatureGroupingComparator.java
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    public class DateTemperatureGroupingComparator extends WritableComparator {

        public DateTemperatureGroupingComparator() {
            // true: let the parent class create key instances for deserialization
            super(DateTemperaturePair.class, true);
        }

        // Keys with the same year-month belong to the same reduce() call,
        // whatever their temperature.
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            DateTemperaturePair pair1 = (DateTemperaturePair) a;
            DateTemperaturePair pair2 = (DateTemperaturePair) b;
            return pair1.getYearMonth().compareTo(pair2.getYearMonth());
        }
    }
SecondarySortingReducer.java
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class SecondarySortingReducer
            extends Reducer<DateTemperaturePair, IntWritable, DateTemperaturePair, Text> {

        @Override
        protected void reduce(DateTemperaturePair key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // The values arrive already sorted by temperature (descending),
            // so the reducer only needs to join them.
            StringBuilder sortedTemperatureList = new StringBuilder();
            for (IntWritable temperature : values) {
                sortedTemperatureList.append(temperature);
                sortedTemperatureList.append(",");
            }
            // Drop the trailing comma.
            sortedTemperatureList.deleteCharAt(sortedTemperatureList.length() - 1);
            context.write(key, new Text(sortedTemperatureList.toString()));
        }
    }
3. Secondary sort as a Spark program in Scala
By comparison, this code is much more concise:
SecondarySort.scala
    package spark

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
    import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

    object SecondarySort {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Secondary Sort").setMaster("local")
        val sc = new SparkContext(conf)
        sc.setLogLevel("WARN")
        //val file = sc.textFile("hdfs://localhost:9000/Spark/SecondarySort/Input/SecondarySort2.txt")
        val file = sc.textFile("e:\\SecondarySort.txt")
        val rdd = file.map(line => line.split(","))
          // key = (year, month); parse the temperature so it sorts as a number, not as text
          .map(x => ((x(0), x(1)), x(3).toInt))
          .groupByKey()
          .sortByKey(false) // year-month descending
          .map(x => (x._1._1 + "-" + x._1._2, x._2.toList.sortWith(_ > _)))
        // collect() brings the results to the driver so they print there in sorted order
        rdd.collect().foreach(x => println(x._1 + " " + x._2.mkString(",")))
        sc.stop()
      }
    }
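One detail worth watching in a pipeline like this: the fields produced by split are strings, and sorting strings in descending order is not the same as sorting the numbers they represent. The sketch below is in Java to match the earlier listings; the class and method names are hypothetical and the sample values are chosen only to make the difference visible.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Contrasts text ordering with numeric ordering of temperature fields; names are hypothetical.
class TemperatureOrderDemo {

    // Descending order of the raw text: compares character by character.
    static List<String> sortAsStrings(List<String> temperatures) {
        return temperatures.stream()
                .sorted(Comparator.reverseOrder())
                .collect(Collectors.toList());
    }

    // Descending order after parsing each field as an integer.
    static List<Integer> sortAsInts(List<String> temperatures) {
        return temperatures.stream()
                .map(Integer::valueOf)
                .sorted(Comparator.reverseOrder())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> temperatures = Arrays.asList("5", "100", "-10", "-20");
        System.out.println("as strings: " + sortAsStrings(temperatures));
        System.out.println("as ints:    " + sortAsInts(temperatures));
    }
}
```

With the sample input in this post the two orderings happen to coincide, which is why the problem is easy to miss; values with different digit counts, or several negative values in the same month, expose it.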