本節所用到的數據下載地址為:http://pan.baidu.com/s/1bnfELmZ
MapReduce的排序分組任務與要求
我們知道排序分組是MapReduce中Mapper端的第四步,其中分組排序都是基於Key的,我們可以通過下面這幾個例子來體現出來。其中的數據和任務如下圖1.1,1.2所示。
#首先按照第一列升序排列,當第一列相同時,第二列升序排列 3 3 3 2 3 1 2 2 2 1 1 1 ------------------- #結果 1 1 2 1 2 2 3 1 3 2 3 3
圖 1.1 排序
#當第一列相同時,求出第二列的最小值 3 3 3 2 3 1 2 2 2 1 1 1 ------------------- #結果 3 1 2 1 1 1
圖 1.2 分組
一、 排序算法
1.1 MapReduce默認排序算法
使用MapReduce默認排序算法代碼如下1.1所示,在代碼中我將第一列作為鍵,第二列作為值。
1 package sort; 2 3 import java.io.IOException; 4 import java.net.URI; 5 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.fs.FileStatus; 8 import org.apache.hadoop.fs.FileSystem; 9 import org.apache.hadoop.fs.Path; 10 import org.apache.hadoop.io.LongWritable; 11 import org.apache.hadoop.io.Text; 12 import org.apache.hadoop.mapreduce.Job; 13 import org.apache.hadoop.mapreduce.Mapper; 14 import org.apache.hadoop.mapreduce.Reducer; 15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 19 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 20 21 public class SortApp { 22 private static final String INPUT_PATH = "hdfs://hadoop:9000/newinput"; 23 private static final String OUT_PATH = "hdfs://hadoop:9000/newoutput"; 24 public static void main(String[] args) throws Exception { 25 Configuration conf=new Configuration(); 26 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); 27 final Path outpath = new Path(OUT_PATH); 28 if(fileSystem.exists(outpath)){ 29 fileSystem.delete(outpath,true); 30 } 31 32 final Job job = new Job(conf,SortApp.class.getSimpleName()); 33 34 //1.1 指定輸入文件路徑 35 FileInputFormat.setInputPaths(job, INPUT_PATH); 36 job.setInputFormatClass(TextInputFormat.class);//指定哪個類用來格式化輸入文件 37 38 //1.2指定自定義的Mapper類 39 job.setMapperClass(MyMapper.class); 40 job.setMapOutputKeyClass(LongWritable.class);//指定輸出<k2,v2>的類型 41 job.setMapOutputValueClass(LongWritable.class); 42 43 //1.3 指定分區類 44 job.setPartitionerClass(HashPartitioner.class); 45 job.setNumReduceTasks(1); 46 47 //1.4 TODO 排序、分區 48 49 //1.5 TODO (可選)合並 50 51 //2.2 指定自定義的reduce類 52 job.setReducerClass(MyReducer.class); 53 job.setOutputKeyClass(LongWritable.class);//指定輸出<k3,v3>的類型 54 job.setOutputValueClass(LongWritable.class); 55 56 //2.3 指定輸出到哪里 57 FileOutputFormat.setOutputPath(job, outpath); 58 job.setOutputFormatClass(TextOutputFormat.class);//設定輸出文件的格式化類 59 job.waitForCompletion(true);//把代碼提交給JobTracker執行 60 } 61 static class MyMapper extends Mapper<LongWritable, Text,LongWritable,LongWritable>{ 62 63 @Override 64 protected void map( 65 LongWritable key, 66 Text value, 67 Mapper<LongWritable, Text, LongWritable, LongWritable>.Context context) 68 throws IOException, InterruptedException { 69 final String[] splited = value.toString().split("\t"); 70 final long k2 = Long.parseLong(splited[0]); 71 final long v2 = Long.parseLong(splited[1]); 72 context.write(new LongWritable(k2),new LongWritable(v2)); 73 } 74 } 75 static class MyReducer extends Reducer<LongWritable,LongWritable,LongWritable,LongWritable>{ 76 77 @Override 78 protected void reduce( 79 LongWritable k2, 80 Iterable<LongWritable> v2s, 81 Reducer<LongWritable, LongWritable, LongWritable, LongWritable>.Context context) 82 throws IOException, InterruptedException { 83 for(LongWritable v2:v2s){ 84 context.write(k2, v2); 85 } 86 } 87 } 88 }
代碼 1.1
運行結果如下圖1.3所示
1 1 2 2 2 1 3 3 3 2 3 1
圖 1.3
從上面圖中運行結果可以看出,MapReduce默認排序算法只對Key進行了排序,並沒有對value進行排序,沒有達到我們的要求,所以要實現我們的要求,還要我們自定義一個排序算法
1.2 自定義排序算法
從上面圖中運行結果可以知道,MapReduce默認排序算法只對Key進行了排序,並沒有對value進行排序,沒有達到我們的要求,所以要實現我們的要求,還要我們自定義一個排序算法。在map和reduce階段進行排序時,比較的是k2。v2是不參與排序比較的。如果要想讓v2也進行排序,需要把k2和v2組裝成新的類作為k 2 ,才能參與比較。所以在這里我們新建一個新的類型NewK2類型來封裝原來的k2和v2。代碼如1.2所示。
1 package sort; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 import java.net.URI; 7 8 import org.apache.hadoop.conf.Configuration; 9 import org.apache.hadoop.fs.FileSystem; 10 import org.apache.hadoop.fs.Path; 11 import org.apache.hadoop.io.LongWritable; 12 import org.apache.hadoop.io.Text; 13 import org.apache.hadoop.io.WritableComparable; 14 import org.apache.hadoop.mapreduce.Job; 15 import org.apache.hadoop.mapreduce.Mapper; 16 import org.apache.hadoop.mapreduce.Reducer; 17 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 18 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 19 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 20 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 21 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 22 23 public class SortApp { 24 static final String INPUT_PATH = "hdfs://hadoop:9000/newinput"; 25 static final String OUT_PATH = "hdfs://hadoop:9000/newoutput"; 26 public static void main(String[] args) throws Exception{ 27 final Configuration configuration = new Configuration(); 28 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration); 29 if(fileSystem.exists(new Path(OUT_PATH))){ 30 fileSystem.delete(new Path(OUT_PATH), true); 31 } 32 final Job job = new Job(configuration, SortApp.class.getSimpleName()); 33 //1.1 指定輸入文件路徑 34 FileInputFormat.setInputPaths(job, INPUT_PATH); 35 job.setInputFormatClass(TextInputFormat.class);//指定哪個類用來格式化輸入文件 36 37 //1.2指定自定義的Mapper類 38 job.setMapperClass(MyMapper.class); 39 job.setMapOutputKeyClass(NewK2.class);//指定輸出<k2,v2>的類型 40 job.setMapOutputValueClass(LongWritable.class); 41 42 //1.3 指定分區類 43 job.setPartitionerClass(HashPartitioner.class); 44 job.setNumReduceTasks(1); 45 46 //2.2 指定自定義的reduce類 47 job.setReducerClass(MyReducer.class); 48 job.setOutputKeyClass(LongWritable.class);//指定輸出<k3,v3>的類型 49 job.setOutputValueClass(LongWritable.class); 50 51 //2.3 指定輸出到哪里 52 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); 53 job.setOutputFormatClass(TextOutputFormat.class);//設定輸出文件的格式化類 54 job.waitForCompletion(true);//把代碼提交給JobTracker執行 55 } 56 57 58 static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable>{ 59 protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,NewK2,LongWritable>.Context context) throws java.io.IOException ,InterruptedException { 60 final String[] splited = value.toString().split("\t"); 61 final NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1])); 62 final LongWritable v2 = new LongWritable(Long.parseLong(splited[1])); 63 context.write(k2, v2); 64 }; 65 } 66 67 static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable>{ 68 protected void reduce(NewK2 k2, java.lang.Iterable<LongWritable> v2s, org.apache.hadoop.mapreduce.Reducer<NewK2,LongWritable,LongWritable,LongWritable>.Context context) throws java.io.IOException ,InterruptedException { 69 context.write(new LongWritable(k2.first), new LongWritable(k2.second)); 70 }; 71 } 72 73 /** 74 * 問:為什么實現該類? 75 * 答:因為原來的v2不能參與排序,把原來的k2和v2封裝到一個類中,作為新的k2 76 * 77 */ 78 static class NewK2 implements WritableComparable<NewK2>{ 79 Long first; 80 Long second; 81 82 public NewK2(){} 83 84 public NewK2(long first, long second){ 85 this.first = first; 86 this.second = second; 87 } 88 89 90 @Override 91 public void readFields(DataInput in) throws IOException { 92 this.first = in.readLong(); 93 this.second = in.readLong(); 94 } 95 96 @Override 97 public void write(DataOutput out) throws IOException { 98 out.writeLong(first); 99 out.writeLong(second); 100 } 101 102 /** 103 * 當k2進行排序時,會調用該方法. 104 * 當第一列不同時,升序;當第一列相同時,第二列升序 105 */ 106 @Override 107 public int compareTo(NewK2 o) { 108 final long minus = this.first - o.first; 109 if(minus !=0){ 110 return (int)minus; 111 } 112 return (int)(this.second - o.second); 113 } 114 115 @Override 116 public int hashCode() { 117 return this.first.hashCode()+this.second.hashCode(); 118 } 119 120 @Override 121 public boolean equals(Object obj) { 122 if(!(obj instanceof NewK2)){ 123 return false; 124 } 125 NewK2 oK2 = (NewK2)obj; 126 return (this.first==oK2.first)&&(this.second==oK2.second); 127 } 128 } 129 130 }
代碼 1.2
從上面的代碼中我們可以發現,我們的新類型NewK2實現了WritableComparable接口,其中該接口中有一個compareTo()方法,當對關鍵字進行比較會調用該方法,而我們就在該方法中實現了我們想要做的事。
運行結果如下圖1.4所示。
1 1 2 1 2 2 3 1 3 2 3 3
圖 1.4
二、分組算法
2.1 MapReduce默認分組
分組是在MapReduce中Mapper端的第四步,分組也是基於Key進行的,將相同key的value放到一個集合中去。還以上面排序代碼為例,業務邏輯如下圖2.1所示。在代碼中以NewK2為關鍵字,每個鍵都不相同,所以會將數據分為六組,這樣就不能實現我們的業務要求,但利用自定義類型NewK2,可以自定義排序算法的同時我們也可以自定義分組算法。
#當第一列相同時,求出第二列的最小值
3 3
3 2
3 1
2 2
2 1
1 1
------------------- #結果 3 1 2 1 1 1
圖 2.1
2.2 自定義分組比較器
由於業務要求分組是按照第一列分組,但是NewK2的比較規則決定了不能按照第一列分,只能自定義分組比較器,代碼如下2.1所示。
1 package group; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 import java.net.URI; 7 8 import org.apache.hadoop.conf.Configuration; 9 import org.apache.hadoop.fs.FileSystem; 10 import org.apache.hadoop.fs.Path; 11 import org.apache.hadoop.io.LongWritable; 12 import org.apache.hadoop.io.RawComparator; 13 import org.apache.hadoop.io.Text; 14 import org.apache.hadoop.io.WritableComparable; 15 import org.apache.hadoop.io.WritableComparator; 16 import org.apache.hadoop.mapreduce.Job; 17 import org.apache.hadoop.mapreduce.Mapper; 18 import org.apache.hadoop.mapreduce.Reducer; 19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 20 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 21 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 22 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 23 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 24 25 public class GroupApp { 26 static final String INPUT_PATH = "hdfs://hadoop:9000/newinput"; 27 static final String OUT_PATH = "hdfs://hadoop:9000/newoutput"; 28 public static void main(String[] args) throws Exception{ 29 final Configuration configuration = new Configuration(); 30 31 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration); 32 if(fileSystem.exists(new Path(OUT_PATH))){ 33 fileSystem.delete(new Path(OUT_PATH), true); 34 } 35 final Job job = new Job(configuration, GroupApp.class.getSimpleName()); 36 37 //1.1 指定輸入文件路徑 38 FileInputFormat.setInputPaths(job, INPUT_PATH); 39 job.setInputFormatClass(TextInputFormat.class);//指定哪個類用來格式化輸入文件 40 41 //1.2指定自定義的Mapper類 42 job.setMapperClass(MyMapper.class); 43 job.setMapOutputKeyClass(NewK2.class);//指定輸出<k2,v2>的類型 44 job.setMapOutputValueClass(LongWritable.class); 45 46 //1.3 指定分區類 47 job.setPartitionerClass(HashPartitioner.class); 48 job.setNumReduceTasks(1); 49 50 //1.4 TODO 排序、分區 51 job.setGroupingComparatorClass(MyGroupingComparator.class); 52 //1.5 TODO (可選)合並 53 54 //2.2 指定自定義的reduce類 55 job.setReducerClass(MyReducer.class); 56 job.setOutputKeyClass(LongWritable.class);//指定輸出<k3,v3>的類型 57 job.setOutputValueClass(LongWritable.class); 58 59 //2.3 指定輸出到哪里 60 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); 61 job.setOutputFormatClass(TextOutputFormat.class);//設定輸出文件的格式化類 62 job.waitForCompletion(true);//把代碼提交給JobTracker執行 63 } 64 65 66 static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable>{ 67 protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,NewK2,LongWritable>.Context context) throws java.io.IOException ,InterruptedException { 68 final String[] splited = value.toString().split("\t"); 69 final NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1])); 70 final LongWritable v2 = new LongWritable(Long.parseLong(splited[1])); 71 context.write(k2, v2); 72 }; 73 } 74 75 static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable>{ 76 protected void reduce(NewK2 k2, java.lang.Iterable<LongWritable> v2s, org.apache.hadoop.mapreduce.Reducer<NewK2,LongWritable,LongWritable,LongWritable>.Context context) throws java.io.IOException ,InterruptedException { 77 long min = Long.MAX_VALUE; 78 for (LongWritable v2 : v2s) { 79 if(v2.get()<min){ 80 min = v2.get(); 81 } 82 } 83 84 context.write(new LongWritable(k2.first), new LongWritable(min)); 85 }; 86 } 87 88 /** 89 * 問:為什么實現該類? 90 * 答:因為原來的v2不能參與排序,把原來的k2和v2封裝到一個類中,作為新的k2 91 * 92 */ 93 static class NewK2 implements WritableComparable<NewK2>{ 94 Long first; 95 Long second; 96 97 public NewK2(){} 98 99 public NewK2(long first, long second){ 100 this.first = first; 101 this.second = second; 102 } 103 104 105 @Override 106 public void readFields(DataInput in) throws IOException { 107 this.first = in.readLong(); 108 this.second = in.readLong(); 109 } 110 111 @Override 112 public void write(DataOutput out) throws IOException { 113 out.writeLong(first); 114 out.writeLong(second); 115 } 116 117 /** 118 * 當k2進行排序時,會調用該方法. 119 * 當第一列不同時,升序;當第一列相同時,第二列升序 120 */ 121 @Override 122 public int compareTo(NewK2 o) { 123 final long minus = this.first - o.first; 124 if(minus !=0){ 125 return (int)minus; 126 } 127 return (int)(this.second - o.second); 128 } 129 130 @Override 131 public int hashCode() { 132 return this.first.hashCode()+this.second.hashCode(); 133 } 134 135 @Override 136 public boolean equals(Object obj) { 137 if(!(obj instanceof NewK2)){ 138 return false; 139 } 140 NewK2 oK2 = (NewK2)obj; 141 return (this.first==oK2.first)&&(this.second==oK2.second); 142 } 143 } 144 145 static class MyGroupingComparator implements RawComparator<NewK2>{ 146 147 @Override 148 public int compare(NewK2 o1, NewK2 o2) { 149 return (int)(o1.first - o2.first); 150 } 151 152 @Override 153 public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, 154 int arg4, int arg5) { 155 return WritableComparator.compareBytes(arg0, arg1, 8, arg3, arg4, 8); 156 } 157 158 } 159 }
代碼2.1
從上面的代碼中我們可以知道,我們自定義了一個分組比較器MyGroupingComparator,該類實現了RawComparator接口,RawComparator又繼承了Comparator接口,這兩個接口的代碼如下:
public interface RawComparator<T> extends Comparator<T> { public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2); }
public interface Comparator<T> { int compare(T o1, T o2); boolean equals(Object obj); }
在類MyGroupingComparator中分別對着兩個接口中的方法進行了實現,RawComparator中的compare()方法是基於字節的比較,Comparator中的compare()方法是基於對象的比較。在該方法一共有六個參數,如下:
* @param arg0 表示第一個參與比較的字節數組
* @param arg1 表示第一個參與比較的字節數組的起始位置
* @param arg2 表示第一個參與比較的字節數組的偏移量
*
* @param arg3 表示第二個參與比較的字節數組
* @param arg4 表示第二個參與比較的字節數組的起始位置
* @param arg5 表示第二個參與比較的字節數組的偏移量
在於NewK2中存儲着兩個long類型,每個long類型為8字節,.compareBytes()方法的參數如下:.compareBytes(arg0, arg1, 8, arg3, arg4, 8);因為比較的是第一列,所以讀取的偏移量為8字節。由於我們要求出每一分組的最小值,所以還重寫Reduce方法,求出每一分租的最小值。最后的運行結果如下圖2.1所示
1 1 2 1 3 1
圖 2.1
三、MapReduce的一些算法
3.1 MapReduce中Shuffle過程
Shuffle是MapReduce過程的核心,了解Shuffle非常有助於理解MapReduce的工作原理。huffle的正常意思是洗牌或弄亂,可能大家更熟悉的是Java API里的Collections.shuffle(List)方法,它會隨機地打亂參數list里的元素順序。如果你不知道MapReduce里Shuffle是什么,那么請看這張圖:
在該圖中分為Map任務和Reduce任務兩個部分,從map端到reduce端的紅色和綠色的線表示數據流的一個過程,也就是從<K1,V1>到<K2,V2>到<K3,V3>的一個過程。
Map端
<1>在map端首先接觸的是InputSplit,在InputSplit中含有DataNode中的數據,每一個InputSplit都會分配一個Mapper任務,Mapper任務結束后產生<K2,V 2>的輸出,這些輸出顯存放在緩存中,每個map有一個環形內存緩沖區,用於存儲任務的輸出。默認大小100MB(io.sort.mb屬性),一旦達到閥值0.8(io.sort.spil l.percent),一個后台線程就把內容寫到(spill)Linux本地磁盤中的指定目錄(mapred.local.dir)下的新建的一個溢出寫文件。
<2>寫磁盤前,要partition,sort。通過分區,將不同類型的數據分開處理,之后對不同分區的數據進行排序,如果有Combiner,還要對排序后的數據進行co mbine。等最后記錄寫完,將全部溢出文件合並為一個分區且排序的文件。
<3>最后將磁盤中的數據送到Reduce中,從圖中可以看出Map輸出有三個分區,有一個分區數據被送到圖示的Reduce任務中,剩下的兩個分區被送到其他Reducer任務中。而圖示的Reducer任務的其他的三個輸入來自其他的Map輸出。
Reduce端
<1>Reducer通過Http方式得到輸出文件的分區。
<2>TaskTracker為分區文件運行Reduce任務。復制階段把Map輸出復制到Reducer的內存或磁盤。一個Map任務完成,Reduce就開始復制輸出。
<3>排序階段合並map輸出。然后走Reduce階段。
3.2 Hadoop壓縮算法
3.2.1 算法介紹
Hadoop的壓縮過程並不是一個必須的過程,但為什么還要使用呢?在哪些階段可以使用,有什么好處呢?
<1>在Map輸出到Reduce時可以使用,因為map端輸出的數據要通過網絡輸出到Reduce端,為了減少傳輸的數據量我們可以采用壓縮的方式來減少延遲。
<2>在整個作業的輸出也可以使用
Codec為是壓縮,解壓縮的算法的實現,在Hadoop中,codec由CompressionCode的實現來表示。下面是一些實現,如下圖3.1所示。
圖 3.1
3.2.2 MapReduce的輸出進行壓縮
輸出的壓縮屬性,和使用方式:如下圖3.2,3.3所示。
圖 3.2
圖3.3
3.3 常見算法
3.3.1 MapReduce常見算介紹
<1>單詞計數(已介紹)
<2>數據去重(去掉重復數據不難理解吧)
<3>排序(在上節已介紹)
<4>Top K(是求最值問題,下面會介紹)
下面算法,跟我們數據庫中的方法比較類似,
<5>選擇---行
數據庫中:該操作的結果應該是一行一行的顯示,相當於where。
MapReduce的實現:以求最值為例,從100萬數據中選出一行最小值。
<6>投影---列
數據庫中:該操作的結果應該是一列一列的顯示,相當於select。
MapReduce的實現:以求處理手機上網日志為例,從其11個字段選出了五個字段來顯示我們的手機上網流量。
<7>分組
數據庫中:相當於group by。
MapReduce的實現:相當於分區,以求處理手機上網日志為例,喊手機號和非手機號分為兩組。
<9>多表連接
MapReduce中:在MapReduce中可以同時進入多個文件進行操作,其中兩個文件有關系就相當於表連接。那么如何知道文件之間的關系呢?我可以通過map函數的context參數來獲得文件路徑代碼如下
final FileSplit inputSplit = (FileSplit) context.getInputSplit(); final String path = inputSplit.getPath().toString();
<10>單表關聯
通過上面的分析我們可以知道,sql中的方法也可以在MapReduce中實現,也就是說當把關系型數據庫中的算法全部在MapReduce中實現時,也就意味着sql的使用范圍擴展到了Hadoop,也就是大數據領域,這樣意義是非常大的。
3.3.2 Top K 最值案例
求最值的方法,在我們的生活中應用非常的廣,比如找出高考中的最高分,也就是狀元,就非常類似分布式計算,要選出全國的最高分就首先選出各省份的,要選出各省份就得選出各市級的等等,而這些數據量非常大,無法直接全部加載到內存中,面對如此大的數據量我就可以考慮使用分布式計算的方式。我們以從100萬的數據中求出其中的最大值為例,介紹該方法。
求最值最簡單的辦法就是對該文件進行一次遍歷得出最值,但是現實中數據比量比較大,這種方法不能實現。在傳統的MapReduce思想中,將文件的數據經過map迭代出來送到reduce中,在Reduce中求出最大值。但這個方法顯然不夠優化,我們可采用“分而治之”的思想,不需要map的所有數據全部送到reduce中,我們可以在map中先求出最大值,將該map任務的最大值送reduce中,這樣就減少了數據的傳輸量。那么什么時候該把這個數據寫出去呢?我們知道,每一個鍵值對都會調用一次map(),由於數據量大調用map()的次數也就多了,顯然在map()函數中將該數據寫出去是不明智的,所以最好的辦法該Mapper任務結束后將該數據寫出去。我們又知道,當Mapper/Reducer任務結束后會調用cleanup函數,所以我們可以在該函數中將該數據寫出去。了解了這些我們可以看一下程序的代碼如3.1所示。
1 package suanfa; 2 3 import java.net.URI; 4 5 import mapreduce.WordCountApp; 6 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.fs.FileSystem; 9 import org.apache.hadoop.fs.Path; 10 import org.apache.hadoop.io.LongWritable; 11 import org.apache.hadoop.io.NullWritable; 12 import org.apache.hadoop.io.Text; 13 import org.apache.hadoop.mapreduce.Job; 14 import org.apache.hadoop.mapreduce.Mapper; 15 import org.apache.hadoop.mapreduce.Reducer; 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 17 import org.apache.hadoop.mapreduce.lib.input.FileSplit; 18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 19 20 public class TopKApp { 21 static final String INPUT_PATH = "hdfs://hadoop:9000/input2"; 22 static final String OUT_PATH = "hdfs://hadoop:9000/out2"; 23 24 public static void main(String[] args) throws Exception { 25 Configuration conf = new Configuration(); 26 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); 27 final Path outPath = new Path(OUT_PATH); 28 if(fileSystem.exists(outPath)){ 29 fileSystem.delete(outPath, true); 30 } 31 32 final Job job = new Job(conf , WordCountApp.class.getSimpleName()); 33 FileInputFormat.setInputPaths(job, INPUT_PATH); 34 job.setMapperClass(MyMapper.class); 35 job.setReducerClass(MyReducer.class); 36 job.setOutputKeyClass(LongWritable.class); 37 job.setOutputValueClass(NullWritable.class); 38 FileOutputFormat.setOutputPath(job, outPath); 39 job.waitForCompletion(true); 40 } 41 static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable>{ 42 long max = Long.MIN_VALUE; 43 protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException { 44 final long temp = Long.parseLong(v1.toString()); 45 if(temp>max){ 46 max = temp; 47 } 48 }; 49 50 protected void cleanup(org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,LongWritable, NullWritable>.Context context) throws java.io.IOException ,InterruptedException { 51 context.write(new LongWritable(max), NullWritable.get()); 52 }; 53 } 54 55 static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable>{ 56 long max = Long.MIN_VALUE; 57 protected void reduce(LongWritable k2, java.lang.Iterable<NullWritable> arg1, org.apache.hadoop.mapreduce.Reducer<LongWritable,NullWritable,LongWritable,NullWritable>.Context arg2) throws java.io.IOException ,InterruptedException { 58 final long temp = k2.get(); 59 if(temp>max){ 60 max = temp; 61 } 62 }; 63 64 protected void cleanup(org.apache.hadoop.mapreduce.Reducer<LongWritable,NullWritable,LongWritable,NullWritable>.Context context) throws java.io.IOException ,InterruptedException { 65 context.write(new LongWritable(max), NullWritable.get()); 66 }; 67 } 68 }
代碼3.1
運行結果為:32767,也就是我們數據中的最大值
