一、mr介紹
1、MapReduce設計理念是移動計算而不是移動數據,就是把分析計算的程序,分別拷貝一份到不同的機器上,而不是移動數據.
2、計算框架有很多,不是誰替換誰的問題,是誰更適合的問題.mr離線計算框架 適合離線計算;storm流式計算框架 適合實時計算;sprak內存計算框架 適合快速得到結果的計算.
二、mr原理
1、mr第一個部分是把hdfs的數據切成一個個split片段,第二部分是map部分,第三個部分從map執行結束到reduce執行之前都是shullf部分,第四部分就是reduce.最后part就是整個mr輸出部分,保存在hdfs中.
2、mr的輸入來自hdfs,每個split片段由一個map線程執行,split切分算法:max(min.split,min(max.split,block)),block大小也可以設置.
3、例:統計超大文本文件中每個單詞出現的次數
input輸入是一個大文件,不要理解為一行就是一個片段,map的輸入和輸出必須是鍵值;shuff可以把map的輸出按照某種key值重新切分和組合成n分,把key值符合某種范圍的輸出送到reduce那里處理,可以簡化reduce過程,在這里shuff就是對map的輸出進行合並和排序,key相同的合並到一個數據塊中,然后再把key相同的數據塊作為集合傳給reduce;reduce作用是對map階段的輸出結果進行匯總,可以根據map輸出數據量大小決定reduce個數(默認是1,可以在mapred-site.xml中配置).
5、shuff具體過程
1>shuff首先對map的輸出進行partitions(分區),之后sort,再溢寫到磁盤.map輸出的結果首先在內存中,內存設置有一定的域值,當到達這個域值后,就溢寫到磁盤,溢寫之前要做幾件事情,即partitions,sort,也就是說到磁盤中的數據已經分好區排好序,這個過程全是在那個map所在節點的本地,沒有跨網絡移動數據.
2>partitions默認的分區規則是以哈希模運算來進行分區,map輸出這個對象的哈希值模reduce的個數,分區的目的是為了把map的輸出進行負載均衡,或者叫解決reduce端的數據傾斜問題.map端是沒有數據傾斜問題的,split切的非常平均.
3>sort排序默認的排序規則是字典排序.
4>所有輸出的數據都進行了partitions和sort,接下來就溢寫到磁盤,每一次溢寫產生一個文件而不是追加.磁盤文件越來越多這個時候就要進行合並,也有默認的合並規則,就是按照鍵的哈希值進行合並(相同鍵哈希值相同),這個合並的過程叫combiner.這里的目的不是為了統計,目的就是為了減少map的輸出,因為下一步數據給reduce可能要跨網絡拷貝.
5>下來就是磁盤中的這些數據要交給reduce執行,這時可能map和reduce不在一台機器,就要進行跨網絡拷貝.reduce只拷貝那些分區的時候分給自己那些數據,這時reduce端很多的小數據,需要合並,這個不是程序員控制的,默認按照key相同的進行合並,這些key相同的數據可能來自不同的map task,然后分別傳給reduce執行.
ps:partitions、sort、combiner根據需求不需要就不用寫.map和reduce完全程序員控制,shulff階段只能控制一部分如paratitions,combainer.
6、reduce執行過程如下,和map很像也有溢寫過程.
三、mr架構--一主多從架構
1、主JobTracker負責調度分配每一個子任務task運行與TaskTracker上,如果發現有失敗的task就重新分配其任務到其它節點.每一個hadoop集群中只有一個JobTracker,一般運行在master節點上.
2、從TaskTracker主動與JobTracker通信,接收作業,並負責直接執行每一個任務,為了減少網絡帶寬TaskTracker最好運行在hdfs的DataNode上,TaskTracker是map task 或者reduce task.
3、配置JobTacker,修改MapReduce的核心配置文件mapred-site.xml:
<property> <name>mapred.job.tracker</name> <value>node1:9001<value> <property>
TaskTracker默認在DN上就不用配了,JobTracker可以指定任何一台機器,但是經測試放在node3不好使,放在node1上正常運行.ps:2.0之后就沒有JobTracker,TaskTracker了.
4、start-all.sh啟動hdfs和mr,通過http://node1:50030查看mr的管理界面.hdfs eclipse插件略,多練習使用hadoop命令.
四、demo:統計單詞在文件中出現的個數
1、確定hdfs、MapReduce、jobTracker等是否正常啟動,各節點通過jps查看.
2、在Hadoop文件系統根目錄中創建input文件夾.
[root@statsys-ku6 bin]# ./hadoop fs -mkdir /wangwei
3、將bin目錄下的所有文件放到hadoop文件系統的input目錄下.
[root@statsys-ku6 bin]# ./hadoop fs -put *.sh /wangwei
4、執行wordcount命令統計單詞個數.
[root@statsys-ku6 bin]# ./hadoop jar hadoop-test.jar WordCountDemo /wangwei /out
ps:hadoop-test.jar程序代碼(開發時導入hadoop-core-1.2.1.jar):
import java.util.StringTokenizer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCountDemo { public static void main(String[] args) throws Exception { if (args == null || args.length != 2) { System.err.println("Lack input && output args"); System.exit(-1); } Job job = new Job(); job.setJarByClass(WordCountDemo.class); // 為job指定輸入文件 FileInputFormat.addInputPath(job, new Path(args[0])); // 為job指定輸出文件 FileOutputFormat.setOutputPath(job, new Path(args[1])); // mapper job.setMapperClass(WordCounterMapper.class); // reducer job.setReducerClass(WordCounterReducer.class); // 輸出key類型 job.setOutputKeyClass(Text.class); // 輸出value類型 job.setOutputValueClass(IntWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } private static class WordCounterMapper extends Mapper<LongWritable, Text, Text, IntWritable> { // 每次調用map方法會傳入split中一行數據 key:該行數據所在文件中的位置下標 value:這行數據 protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException { String line = value.toString(); StringTokenizer st = new StringTokenizer(line); while (st.hasMoreElements()) { String token = st.nextToken(); if (!isWord(token)) {// 不是單詞 continue; } // 這個單詞出現了一次 context.write(new Text(token), new IntWritable(1)); // map的輸出 } }; } private static class WordCounterReducer extends Reducer<Text, IntWritable, Text, IntWritable> { protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws java.io.IOException,InterruptedException { int sum = 0; for (IntWritable value : values) { //這里是迭代器,如果是集合的話可能撐爆內存 sum += value.get(); } context.write(key, new IntWritable(sum)); }; } //是否為合法的單詞[a-zA-Z]+ static boolean isWord(String input) { return input.matches("[a-zA-Z]+"); } }
ps: hadoop fs -lsr /out 查看reduce輸出,hadoop fs -get /out /root/ 下載查看輸出.