在Hadoop中,每個MapReduce任務都被初始化為一個job,每個job又可分為兩個階段:map階段和reduce階段。這兩個階段分別用兩個函數來表示。Map函數接收一個<key,value>形式的輸入,然后同樣產生一個<ey,value>形式的中間輸出,Hadoop會負責將所有具有相同中間key值的value集合在一起傳遞給reduce函數,reduce函數接收一個如<key,(list of values)>形式的輸入,然后對這個value集合進行處理,每個reduce產生0或1個輸出,reduce的輸出也是<key,value>形式。
簡易代碼:
public static class Map extends MapReduceBase implments Mapper<LongWritable,Text,Text,IntWritable>{ //設置常量1,用來形成<word,1>形式的輸出 private fianll static IntWritable one = new IntWritable(1) private Text word = new Text(); public void map(LongWritable key,Text value,OutputCollector<Text,output,Reporter reporter) throws IOException{ //hadoop執行map函數時為是一行一行的讀取數據處理,有多少行,就會執行多少次map函數 String line = value.toString(); //進行單詞的分割,可以多傳入進行分割的參數 StringTokenizer tokenizer = new StringTokenizer(line); //遍歷單詞 while(tokenizer.hasMoreTokens()){ //往Text中寫入<word,1> word.set(tokenizer.nextToken()); output.collect(word,one); } } } //需要注意的是,reduce將相同key值(這里是word)的value值收集起來,形成<word,list of 1>的形式,再將這些1累加 public static class Reduce extends MapReduceBase implements Reducer<Text IntWritable,Text,IntWritable>{ public void reduce(Text key,Iterator<IntWritable> values,OutputCollector<Text,IntWritable> output,Reporter reporter) throws IOException{ //初始word個數設置 int sum = 0; while(values,hasNext()){ //單詞個數相加 sum += value.next().get(); } output.collect(key,new IntWritbale(sum)); } }
執行概念總結:
job.setInputFormatClass(TextInputFormat.class);
1.InputFormat()和inputSplit
inputSplit是Hadoop定義的用來傳送給每個單獨的map的數據,InputSplit存儲的並非數據本身,而是一個分片長度和一個記錄數據位置的數組,生成InputSplit的方法可以通過InputFormat(I)來設置。當數據傳送給map時,map會將輸入分片傳送到inputFormat上,InputFormat則調用getREcordReduer()方法生成RecordReader,RecordReader再通過createKey()、createValue()方法創建可供map處理的<key,value>對,即<k1,v1>,簡而言之InputFormat方法是用來生成可供map處理的<key,value>對的。
在這里如果不設置的話,TextInputFormat會是Hadoop默認的輸入方法,在TextInputFormat中,每個人間(或其一部分)都會單獨地作為map的輸入,繼承自FileInputFormat,之后,每行數據都會生成一條記錄,每條記錄則表示成<key,value>形式:
其中,key值是每個數據的記錄在數據分片中的字節偏移量,數據類型是LongWritable.
value值是每行的內容,數據類型是Text。
job.setOutputValueClass(TextInputFormat.class);
2、OutputFormat
每一種輸入格式都有一種輸出格式與其對應。同樣,默認的輸出格式是TextOutputFormat,這種輸出方式與輸入類似,會將每條記錄以一行的形式存入文本文件。不過它的鍵和值都可以以任意形式的,因為程序內部會調用toString()方法將鍵和值轉換為String類型再輸出。
3、map和reduce
map函數接收經過inputFormat處理產生的<k1,v1>,然后輸出<k2,v2>,map函數老的版本寫法是繼承MapReduceBase然后實現Mapper接口,但是現在可以直接繼承Mapper接口,此接口是一個泛型類型,有4種形式的參數,分別用來指定map的輸入key值類型(LongWritable key),輸入value值類型(Text value)、輸出key值類型和(Text)輸出value值類型(IntWritable,本例是reporter)。
reduce函數以map的輸出作為輸入,因此reduce的輸入類型是<Text,IntWritable>.而reduce的輸出是單詞和它的數目,因此,它的輸出類型是<Text,IntWritable>
4、任務調度
計算方面:Hadoop總會有限將任務分配給空閑的機器,使所有的任務能公平地分享系統資源,I/O方面:Hadoop會盡量將map任務分配給InputSplit所在機器,以減少網絡I/O的消耗。
5、數據預處理與InputSplit的大小。
Hadoop會在處理每個block后將其作為一個InputSplit,因此合理地甚至block塊大小是很重要的。也可通過合理地設置map任務的數量來調節map任務的數據輸入。
6、map和reduce任務的數量
設置map任務槽和reduce任務槽,map/reduce任務槽是這個集群能夠同時運行的map/reduce任務的最大數量。可以通過hadoop的配置文件設置每台機器最多可以同時運行map任務和reduce任務的個數,比如有10台機器,設置每台最多可以同時運行10個map任務和5個reduce任務,那么這個集群的map任務槽就是1000,reduce任務槽就是500.一般來說,設置的reduce任務數量應該是reduce任務槽的0.95或是1.75倍
7、combine函數
combine函數是用於在本地合並數據的函數,從wordcount程序中,詞頻是一個接近於zipf分布的,每個map任務可能會產生成千上萬個<the,i>記錄,若將這些記錄一一傳給reduce任務是很耗時的,所以可以設置一個combine函數,用於本地合並,大大減少網絡I/O操作的消耗。
job.setCombinerClass(combine.class); //指定reduce函數為combine函數 job.setReducerClass(Reduce.class);
8、Hadoop流的工作原理
當一個可執行文件作為Mapper時,每個map任務會以一個獨立的進程啟動這個可執行文件,然后在map任務運行時,會把輸入切分成行提供給可執行文件,並作為它的標准輸入(stdin)內容。當可執行文件運行處結果時,map從標准輸出(stdout)中手機數據,並將其轉化為<key,value>對,作為map的輸出。
參考:<Hadoop實戰>