mapreduce是一種計算模型,是google的一篇論文向全世界介紹了MapReduce。MapReduce其實可以可以用多種語言編寫Map或Reduce程序,因為hadoop是java寫的,所以通常情況下我們都是選擇java編程語言。其實mr的編寫格式或者說語法要求很簡單,其實復雜的是我們要學會利用這個模型,將問題分解計算。
MapReduce計算模型
MapReduce Job
每個mr任務都被初始化成一個job,后續我們在編寫自己的第一個mr任務的時候也會感受到。每個job分為Map階段和reduce階段,其實絕大部分情況我們還有combiner,這個combiner具體是什么在后續關於wordcount第一個mr任務的時候再介紹,這樣比較好理解。Map函數接收一個<k1,v1>形式輸入,輸出<key,value>然后hadoop會將所有相同的中間key的value值進行合並,格式類似<key,list(values)>作為reduce的輸入,大致過程如下:
INPUT =====> K1,V1 ======>(MAP) K2,V2 ======>(REDUCE) K3,V3 =====>OUTPUT
直接動手第一個程序 WordCount
wordcount任務是計算文件中每個單詞出現的次數,類似:select count(1) as total,word from words group by word.
package com.wangke.hadoop.example; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.util.StringTokenizer; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int sum = 0 ; for (IntWritable value : values) { sum += value.get(); } result.set(sum); context.write(key,result); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf,"wordcount"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); System.exit(job.waitForCompletion(true)?0:1); } }
接下來的步驟,maven打成jar包,拷貝到你部署hadoop的機器,執行以下相關命令(我用的是2.8.1)
-
$ bin/hdfs namenode -format
sbin/start-dfs.sh
-
$ bin/hdfs dfs -mkdir input $ bin/hdfs dfs -put etc/hadoop/*.xml input
- bin/hadoop jar study-1.0-SNAPSHOT.jar com.wangke.hadoop.example.WordCount input output
- bin/hdfs dfs -cat output/* (查看結果)
關於此小程序的一些介紹
- Mapper和Reduce都是直接繼承抽象類 Mapper,,Reducer,這兩個之所以是抽象類,因為我們自己寫的Mapper/Reducer真正實現過程是抽象父類實現的
setup函數:called once at the start of the task
cleanup函數:called once at the end of the task
run函數:run函數的過程其實map或者reduce的執行過程,可以看源代碼就知道,這能夠幫助我們更好的去理解map或者reduce函數是怎么執行和得到結果的
public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { this.setup(context); try { while(context.nextKeyValue()) { this.map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { this.cleanup(context); }
- InputSplite InputFormat OutputFormat
InputSplit是Hadoop中把輸入數據傳送給每個單獨的Map,InputSplite存儲的並非數據本身,而是一個分片長度和以及記錄數據位置的數組。而生成InputSplit的方式時可以通過Inputformat()來設置,所以InputFormat()方法是用來生成可以提供Map處理的<key,value>的。常見的InputFormat子類:
BileyBorweinPlouffe.BbpInputFormat DBInputFormat ComposableInputFormat FileInputFormat(CombineFileInputFormat,KeyValueTextInputFormat,TextInputFormat)等等
OutputFormat是跟InputFormat相對應的,也就是mr結果的輸出格式,我們示例中就是FileOutputFormat.
- Map ,Reduce數據流和控制流以及示例代碼中的 job.setCombinerClass(IntSumReducer.class)中的Combiner是什么

這個是WordCount數據流程圖,FileInputStream被處理形成兩個InputSplit,然后輸入到兩個Map中(一般計算和存儲都是在同一機器上,避免數據傳輸,也就是移動計算,避免移動存儲),Map輸出的結果是直接寫在磁盤上的,而不是HDFS。而Reduce這時候會讀取Map的輸出數據,將結果寫到HDFS。在Reduce過程中時無法避免數據傳輸的,這時候你應該會想到為了避免過多的數據傳輸,我們會將每個Map的結果先局部的做一個WordCount,這個過程就是我們的Combiner了,所以Combiner其實也就是一個reducer過程。
MapReduce的優化與性能調優
MapReduce計算模型的優化主要集中在兩個方面:計算性能方面的優化,I/O操作方面的優化。
優化的幾個方面:
- 任務調度:任務分配給空閑的機器;盡量將Map任務分配給InputSplit所在的機器,移動計算來減少網絡I/O
- 數據預處理與InputSplit的大小:Hadoop擅長處理少量的大數據而不是處理大量的小數據。,在MaxCompute使用時,我們也會發現在執行task前 會有一步合並小文件的步驟。
- Map和Reduce任務的數量:調整相關參數,設置map和reduce的數量
- 上節中提到的Combine函數
- 壓縮:對Map和最終的輸出結果進行壓縮,其中壓縮算法是可以配置的
- 自定義comparator:在hadoop中可以自定義數據類型
- 過濾數據以及數據傾斜:通過數據過濾可以降低數據規模。在MaxCompute中我們知道用mapjoin來解決大表和小表關聯的性能問題,在hadoop中我們用Bloom Filter來解決類似問題,關於BloomFilter我們在下一小節具體介紹
Bloom Filter:
Bloom Filter是由Howard Bloom提出的二進制向量數據結構。在保存所有集合元素特征的同時,他能在保證高效空間效率和一定出錯率的前提下迅速檢測一個元素是不是集合中的成員。怎樣去利用Bloom Filter去解決大表和小表的內連接呢,也就是怎么實現Maxcompute的mapjoin。
先創建BloomFilter對象,將小表中所有連接列上的值都保存在Bloom Filter中(數據量小,直接加載在內存中),然后開始通過mr執行內連接。在map階段,讀小表的數據直接以連接列值為key,以數據未value的<key,value>;讀大表數據時,在輸出前先判斷當前元祖的連接列值是否在BF中,如果不存在就不需要輸出,如果存在 就采用與小表同樣的方式輸出,對於不在集合內的元素一定是判斷正確,這樣就可以過濾掉不需要的數據。最后在reduce階段,針對每個連接列值連接兩個表的元組並輸出結果。
BF實現原理:他有兩個重要接口add() ,membershipTest(),add負責保存集合元素的特征到位數組,membershipTest判斷某個值是否是集合中的元素。BF利用k個相互獨立的Hash函數將集合中的每個元素迎神到(1,2,..,M)個范圍內,這時候就相當於是一個k維特征向量,如果判斷一個元素是否存在(也就是該元素與小表中的某個元素相等),其實看他們的向量是否一樣,因為hash的特性,A != B,但是對應的向量是一樣的,這種情況很少見,所以會有少量錯誤率,但是性能大大提高。我們如果增加k和M時可以降低錯誤率的
Map ReduceJob中全局共享數據
- 讀寫HDFS文件:Map task和Reduce task甚至是不同的Job都可以通過讀寫HDFS中預定好的同一個文件共享全局變量
- 配置Job屬性:在MapReduce執行過程中,task可以讀取Job的屬性,基於這個特性,大家可以在任務啟動之初利用Cofiguration類中的set(name,value)將一些曲劇變量封裝進去,然后通過get方法讀取。
- 使用distributedCache:這個是MapReduce為應用提供緩存文件的只讀工具。在使用時,用戶可以在作業配置時使用本地或者HDFS文件的url來將其設置成共享緩存文件。task啟動之前,MR框架會將緩存文件復制到執行任務節點的本地。優點是每個Job共享文件只會啟動之后復制一次,且適用於大量的共享數據,缺點就是只讀。
鏈接MapReduce Job
在日常數據處理過程中,並不是一個mr就能解決問題的,需要多個mr作業配合完成,其實實際中我們更多的是利用一個mapper多個reducer。
- 線性MapReduce Job流:就是多個Job按照一定順序,第一個job的輸出作為后面一個Job的輸入。缺點:順序結構,且流程不好控制
- 復雜MapReduce Job流:例如 Job3需要Job1 和Job2的結果,其實MR框架提供了ControlledJob和JobControl類控制,具體不介紹了。
本地測試
對於寫好一個mrJob ,不能每次都通過打包放到集群測試,這樣效率太低,Score_Process類繼承與Configure的實現接口Toll,通過run方法可以實現對程序進行測試。
import com.wangke.hadoop.example.WordCount; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class ScoreTest implements Tool{ public int run(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf,"wordcount"); job.setJarByClass(WordCount.class); job.setMapperClass(WordCount.TokenizerMapper.class); job.setCombinerClass(WordCount.IntSumReducer.class); job.setReducerClass(WordCount.IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); boolean success = job.waitForCompletion(true); return success ? 0:1; } public static void main(String[] args) throws Exception { int ret = ToolRunner.run(new ScoreTest(),args); System.exit(ret); } public void setConf(Configuration configuration) { } public Configuration getConf() { return null; } }
