hadoop1.x和hadoop2.x的區別:
Hadoop1.x版本:
內核主要由Hdfs和Mapreduce兩個系統組成,其中Mapreduce是一個離線分布式計算框架,由一個JobTracker和多個TaskTracker組成。
JobTracker的主要作用:JobTracker是框架的中心,接收任務,計算資源,分配資源,分配任務,與DataNode進行交流等功能。決策程序失敗時 重啟等操作。又當爹又當媽。
TaskTracker同時監視當前機器上的task運行狀況。TaskTracker需要把這些信息通過心跳,發送給jobTracker,jobTracker會收集這些信息以給新提交的job分配運行在那些機器上。
存在問題:
1.JobTracker是mapreduce的集中處理點,存在單點故障;
2.JobTracker完成了太多任務,造成了過多資源的消耗,當mapreduce job非常多的時候,會造成很大的內存消耗,同時 也增加了JobTracker失效的風險,這也是業界普遍總結出老的hadoop的mapreduce只能支持4000節點主機的上限。

Hadoop2.x版本:
第二代的hadoop版本,為克服hadoop1.0中的hdfs和mapreduce存在的各種問題而提出的。針對hadoop1.x中的單NameNode制約HDFS的擴展性問題,提出了HDFS Federation,它讓多個NameNode分管不同的目錄進而實現訪問隔離和橫向擴展,同時它徹底解決了NameNode單點故障問題,針對Hadoop1.0中的Mapreduce的Mapreduce在擴展性和多框架支持等方面不足。
MRv2具有與MRv1相同的編程模型和數據處理引擎,唯一不同的是運行時環境。MRv2是在MRv1基礎上經加工之后,運行於資源管理框架YARN之上的計算框架MapReduce。它的運行時環境不再由JobTracker和TaskTracker等服務組成,而是變為通用資源管理系統YARN和作業控制進程ApplicationMaster,其中,YARN負責資源管理和調度,而ApplicationMaster僅負責一個作業的管理。簡言之,MRv1僅是一個獨立的離線計算框架,而MRv2則是運行於YARN之上的MapReduce。
整體上:分為兩個方面
1.任務調度和資源管理方面:
1)Hadoop1中的JobTracker是一個功能集中的部分,負責資源的分配和任務的分配,所以JobTracker單點出問題就會造成整個集群無法使用了,而且MapReduce模式是集成在Hadoop1中,不易分解,不好添加其他模式;
2)Hadoop2中,ResourceManager(RM)就是負責資源的分配,NodeManager(NM)是從節點上管理資源的,而ApplicationMaster(AM)就是一個負責任務分配的組件,根據不同的模式有不同的AM,因此MapReduce模式有自己獨有的AM;
2.關於文件系統:
文件系統HDFS,1.x版本沒有HA功能,只能有一個NameNode;而2.x添加了HA部分,還可以有多個NameNode同時運行,每個負責集群中的一部分。

Mapreduce流程包括shuffle階段

Mapreduce的過程整體上分為四個階段:InputFormat MapTask ReduceTask OutPutFormat 當然中間還有shuffle階段
InputFormat:
我們通過在runner類中用 job.setInputPaths 或者是addInputPath添加輸入文件或者是目錄(這兩者是有區別的)不同的業務他們的輸入是不同的,我所完成的項目中使用了一個TableMapReduceUtil(hbase和Mapreduce的整合類)來設置的輸入目錄。
默認是FileInputFormat中的TextInputFormat類,獲取輸入分片,使用默認的RecordReader:LineRecordReader將一個輸入分片中的每一行按\n分割成key-value key是偏移量 value是每一行的內容。調用一次map()方法。一個輸入分片對應一個Maptask任務,
MapTask:
每一個key-value經過map()方法業務處理之后開始開始shuffle階段
以WordCount為例:該階段只做+1的操作,(aaa,1),然后開向緩沖區寫入數據
Shuffle:
Map-Shuffle:
寫入之前先進行分區Partition,用戶可以自定義分區(就是繼承Partitioner類),然后定制到job上,如果沒有進行分區,框架會使用 默認的分區(HashPartitioner)對key去hash值之后,然后在對reduceTaskNum進行取模(目的是為了平衡reduce的處理能力),然后決定由那個reduceTask來處理。
將分完區的結果<key,value,partition>開始序列化成字節數組,開始寫入緩沖區。
隨着map端的結果不端的輸入緩沖區,緩沖區里的數據越來越多,緩沖區的默認大小是100M,當緩沖區大小達到閥值時 默認是0.8【spill.percent】(也就是80M),開始啟動溢寫線程,鎖定這80M的內存執行溢寫過程,內存—>磁盤,此時map輸出的結果繼續由另一個線程往剩余的20M里寫,兩個線程相互獨立,彼此互不干擾。
溢寫spill線程啟動后,開始對key進行排序(Sort)默認的是自然排序,也是對序列化的字節數組進行排序(先對分區號排序,然后在對key進行排序)。
如果客戶端自定義了Combiner之后(相當於map階段的reduce),將相同的key的value相加,這樣的好處就是減少溢寫到磁盤的數據量(Combiner使用一定得慎重,適用於輸入key/value和輸出key/value類型完全一致,而且不影響最終的結果)
每次溢寫都會在磁盤上生成一個一個的小文件,因為最終的結果文件只有一個,所以需要將這些溢寫文件歸並到一起,這個過程叫做Merge,最終結果就是一個group({“aaa”,[5,8,3]})
集合里面的值是從不同的溢寫文件中讀取來的。這時候Map-Shuffle就算是完成了。
一個MapTask端生成一個結果文件。
ReduceTask:
Reduce-Shuffle:
接下來開始進行Reduce-Shuffle 階段。當MapTask完成任務數超過總數的5%后,開始調度執行ReduceTask任務,然后ReduceTask默認啟動5個copy線程到完成的MapTask任務節點上分別copy一份屬於自己的數據(使用Http的方式)。
這些拷貝的數據會首先保存到內存緩沖區中,當達到一定的閥值的時候,開始啟動內存到磁盤的Merge,也就是溢寫過程,一致運行直到map端沒有數據生成,最后啟動磁盤到磁盤的Merge方式生成最終的那個文件。在溢寫過程中,然后鎖定80M的數據,然后在延續Sort過程,然后記性group(分組)將相同的key放到一個集合中,然后在進行Merge
然后就開始reduceTask就會將這個文件交給reduced()方法進行處理,執行相應的業務邏輯
OutputFormat:
默認輸出到HDFS上,文件名稱是part-00001
當我們輸出需要指定到不同於HDFS時,需要自定義輸出類繼承OutputFormat類
Mapreduce編程模型
public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {}
}
public static class MyReduce extends Reducer<Text, IntWritable, Text, Text> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {}
}
public class Runner extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new Runner(), args);
}
}
Mapreduce中的技術:
Job依賴:
應用背景:當某個需求使用一個Mapreduce程序無法完成業務計算時,通常需要兩個mapreduce來配個完成 其實就是兩個job
關鍵代碼:

自定義數據類型:二次排序
Class MyWritable Implements WritableComparable<MyWritable>
重寫 write() readFile() compareTo() HashCode() equals()方法
自定義合並:
繼承Reducer 重寫reduce()方法
自定義分區:
繼承 Partitioner重寫getPartition()方法
自定義分組:比較字段是String類型

如果是Int類型就使用以下這種方法

添加緩沖文件:

自定義多文件輸出:
查看MutipleFileOutput.java
