本文系原創,若有轉載需要,請注明出處。https://www.cnblogs.com/bigdata-stone/
1.mapReduce簡介
MapReduce是面向大數據並行處理的計算模型、框架和平台。
- 映射(Mapping) :對集合里的每個目標應用同一個操作。即,如果你想把表單里每個單元格乘以二,那么把這個函數單獨地應用在每個單元格上的操作就屬於mapping(這里體現了移動計算而不是移動數據)。
- 化簡(Reducing):遍歷集合中的元素來返回一個綜合的結果。即,輸出表單里一列數字的和這個任務屬於reducing。
- MapReduce將復雜的、運行於大規模集群上的並行計算過程高度地抽象到了兩個函數:Map和Reduce。簡單來講,map就是分,而reduce 就是合。
- 數據從map端進入(以key,value的形式),同樣以(key,value)的形式傳出mapper,進過shuffle過程,所有相同的key都會分配進入同一個reduce。而對於整個作業的map數目取決於split切片的個數,經過inputformat進行切割,切了多少塊就有幾個map,切完之后再在每個節點上並發執行,並行計算,所以可以有多個map,而reduce的個數是可以進行設置的,所以reduce的個數和map的個數是多對多的關系。
- 對於map端而言,產生的數據需要在map端進行分區,而分區partitions的數量取決於reduce的數量,而默認的分區函數就是hash(),所以在map端的分區相當於將數據進行預先的分組,經過shuffle之后,相應的分區就會進入相應的reduce里面去
2.圖解計算框架:(畫圖不易,請勿挑剔)
2.1. inputformat工作機制
-
在 MapReduce 程序的開發過程中,往往需要用到 FileInputFormat與 TextInputFormat,我們會發現 TextInputFormat 這個類繼承自FileInputFormat , FileInputFormat 這 個 類 繼 承 自 InputFormat ,InputFormat 這個類會將文件 file 按照邏輯進行划分,划分成的每一個split 切片將會被分配給一個 Mapper 任務,文件先被切分成 split 塊,而后每一個 split 切片對應一個 Mapper 任務 。
- TextInputFormat這個類繼承自FileInputFormat這個抽象類,文本文件被切成行,使用回車換行符作為行結束的標志,key是這一行在文件中的位置,value是這一行文本。
- 關於切片split。hdfs切片的計算法則是:Math.max(minSize, Math.min(maxSize, blockSize));三值中取中間值。 簡單地按照文件的內容長度進行切片切片大小,默認等於 block 大小。切片時不考慮數據集整體,而是逐個針對每一個文件單獨切片默認情況下, split size =block size,在 hadoop 2.x 中為 128M
2.2 MapTask 端的工作機制
input File 通過 split 被邏輯切分為多個 split 文件,通過 Record按行讀取內容給 map(用戶自己實現的)進行處理,數據被 map 處理結束之后交給 OutputCollector 收集器,對其結果 key 進行分區(默認使用 hash 分區),然后寫入 buffer,每個 map task 都有一個內存緩沖區,存儲着 map 的輸出結果,當緩沖區快滿的時候需要將緩沖區的數據以一個臨時文件的方式存放到磁盤,當整個 map task 結束后再對磁盤中這個 maptask 產生的所有臨時文件做合並,生成最終的正式輸出文件,然后等待 reduce task 來拉數據。 Map 端的輸入的(k,v)分別是該行的起始偏移量,以及每一行的數據內容,map 端的輸出(k,v)可以根據需求進行自定義,但是如果輸出的是 javabean 對象,需要對javabean 繼承 writable 。
1)partitioner
分區函數partitioner 的作用是將 mapper輸出的 key/value通過給定的分區函數來拆分為分片(shard),每個 reducer 對應一個分片 默認情況下, partitioner 先計算 key 的散列值(通常為 md5值)。然后通reducer 個數執行取模運算: key.hashCode%(reducer 個數)。這種方式不僅能夠隨機地將整個key空間平均分發給每個reducer,同時也能確保不同mapper產生的相同key能被分發到同一個reducer。也可以自定義分區去繼承 partition<key,value>把不同的結果寫入不同的文件中分區 Partitioner 主要作用在於以下兩點 (1)根據業務需要,產生多個輸出文件;(2)多個 reduce 任務並發運行,提高整體 job 的運行效率 map 端的 combine 組件。
2)Combiner
每一個 map 都可能會產生大量的本地輸出, Combiner 的作用就是對 map 端的輸出先做一次合並,以減少在 map 和 reduce 節點之間的數據傳輸量,以提高網絡 IO 性能,是 MapReduce 的一種優化手段之一combiner 是 MR 程序中 Mapper 和 Reducer 之外的一種組件combiner 組件的父類就是 Reducercombiner 和 reducer 的區別在於運行的位置:combiner 是在每一個 maptask 所在的節點運行reducer 是接收全局所有 Mapper 的輸出結果;combiner 的意義就是對每一個 maptask 的輸出進行局部匯總,以減小網絡傳輸量具體實現步驟:
1)自定義一個 combiner 繼承 Reducer,重寫 reduce 方法
2)中設置: job.setCombinerClass(CustomCombiner.class)combiner 能夠應用的前提是不能影響最終的業務邏輯,而且,combine輸出 kv 應該跟 reducer 的輸入 kv 類型要對應起來
Combiner 使用需要注意的是:
1.有很多人認為這個 combiner 和 map 輸出的數據合並是一個過程,其實不然, map 輸出的數據合並只會產生在有數據 spill 出的時候,即進行 merge 操作。
2.與 mapper 與 reducer 不同的是, combiner 沒有默認的實現,需要顯式的設置在 conf 中才有作用。
3.並不是所有的 job 都適用 combiner,只有操作滿足結合律的才可設置 combiner。 combine 操作類似於: opt(opt(1, 2, 3), opt(4, 5,6))。如果 opt 為求和、求最大值的話,可以使用,但是如果是求中值的話,不適用。
4.一般來說, combiner 和 reducer 它們倆進行同樣的操作。
2.3shuffle 的過程
shuffle 的過程是:Map 產生輸出開始到 Reduc 取得數據作為輸入之前的過程稱作 shuffle.1).Collect 階段:將 MapTask 的結果輸出到默認大小為100M 的環形緩沖區,保存的是 key/value, Partition 分區信息等。2).Spill 階段:當內存中的數據量達到一定的閥值的時候,就會將數據寫入本地磁盤,在將數據寫入磁盤之前需要對數據進行一次排序的操作,如果配置了 combiner,還會將有相同分區號和 key 的數據進行排序。3).Merge 段把所有溢出的臨時文件進行一次合並操作,以確保一個MapTask 最終只產生一個中間數據文件。4).Copy 階段: ReduceTask 啟動 Fetcher 線程到已經完成MapTask 的節點上復制一份屬於自己的數據,這些數據默認會存在內存的緩沖區中,當內存的緩沖區達到一定的閥值的時候,就會將數據寫到磁盤之上。5).Merge 階段:在 ReduceTask 遠程復制數據的同時,會在后台開啟兩個線程對內存到本地的數據文件進行合並操作。6).Sort 階段:在對數據進行合並的同時,會進行排序操作,由於 MapTask 階段已經對數據進行了局部的排序,ReduceTask 只需保證 Copy 的數據的最終整體有效性即可。Shuffle 中的緩沖區大小會影響到 mapreduce 程序的執行效率,原則上說,緩沖區越大,磁盤 io 的次數越少,執行速度就越快緩沖區的大小可以通過參數調整, 參數:io.sort.mb 默認 100M
2.4reduceTask
reducer 將已經分好組的數據作為輸入,並依次為每個鍵對應分組執行 reduce 函數。 reduce 函數的輸入是鍵以及包含與該鍵對應的所有值的迭代器。reduce 端的輸入是 map 端的輸出,它的輸出的(k,v)根據需求進行自定義reducetask 並行度同樣影響整個 job 的執行並發度和執行效率,與maptask的並發數由切片數決定不同, Reducetask 數量的決定是可以直接手動設置:job.setNumReduceTasks(4);如果數據分布不均勻,就有可能在 reduce 階段產生數據傾斜。默認的 reduceTask 的是 1 。
2.5 outputformat
OutputFormat 主要用於描述輸出數據的格式,它能夠將用戶提供的 key/value對寫入特定格式的文件中。 Hadoop 自帶了很多 OutputFormat 的實現,它們與InputFormat 實現相對應,足夠滿足我們業務的需要。
3.wordcount實例講解
Map端代碼
public class WCMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
Text keyout = new Text();
IntWritable valueout = new IntWritable();
String[] arr = value.toString().split(" ");
for(String s:arr){
keyout.set(s);
valueout.set(1);
context.write(keyout,valueout);
}
}
}
reduce端代碼:
public class WCReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0 ;
for(IntWritable iw:values){
count=iw.get()+count;
}
context.write(key,new IntWritable(count));
}
}
app端代碼:
public class WCApp {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf= new Configuration();
// conf.set("fs.defaultFS","file:///");
Job job = Job.getInstance(conf);
job.setJobName("WCApp");
job.setJarByClass(WCApp.class);
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);
job.setNumReduceTasks(1);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.waitForCompletion(true);
}
}
3.1各個角色實體
-
程序運行時過程設計到的一個角色實體
1.1. Client:編寫mapreduce程序,配置作業,提交作業的客戶端 ;
1.2. ResourceManager:集群中的資源分配管理 ;
1.3. NodeManager:啟動和監管各自節點上的計算資源 ;
1.4. ApplicationMaster:每個程序對應一個AM,負責程序的任務調度,本身也是運行在NM的Container中 ;
1.5. HDFS:分布式文件系統,保存作業的數據、配置信息等等。 -
客戶端提交Job
2.1. 客戶端編寫好Job后,調用Job實例的Submit()或者waitForCompletion()方法提交作業;
2.2. 客戶端向ResourceManager請求分配一個Application ID,客戶端會對程序的輸出、輸入路徑進行檢查,如果沒有問題,進行作業輸入分片的計算。 -
Job提交到ResourceManager
3.1. 將作業運行所需要的資源拷貝到HDFS中(jar包、配置文件和計算出來的輸入分片信息等);
3.2. 調用ResourceManager的submitApplication方法將作業提交到ResourceManager。 -
給作業分配ApplicationMaster
4.1. ResourceManager收到submitApplication方法的調用之后會命令一個NodeManager啟動一個Container ;
4.2. 在該NodeManager的Container上啟動管理該作業的ApplicationMaster進程。 -
ApplicationMaster初始化作業
5.1. ApplicationMaster對作業進行初始化操作;
5.2. ApplicationMaster從HDFS中獲得輸入分片信息(map、reduce任務數) -
任務分配
6.1. ApplicationMaster為其每個map和reduce任務向RM請求計算資源;
6.2. map任務優先於reduce任,map數據優先考慮本地化的數據。任務執行,在 Container 上啟動任務(通過YarnChild進程來運行),執行map/reduce任務。
3.2時間先后順序
-
輸入分片(input split)
每個輸入分片會讓一個map任務來處理,默認情況下,以HDFS的一個塊的大小(默認為128M,可以設置)為一個分片。map輸出的結果會暫且放在一個環形內存緩沖區中(mapreduce.task.io.sort.mb=100M
),當該緩沖區快要溢出時(默認mapreduce.map.sort.spill.percent=0.8
),會在本地文件系統中創建一個溢出文件,將該緩沖區中的數據寫入這個文件; -
map階段:由我們自己編寫,最后調用 context.write(…);
-
partition分區階段
3.1. 在map中調用 context.write(k2,v2)方法輸出,該方法會立刻調用 Partitioner類對數據進行分區,一個分區對應一個 reduce task。
3.2. 默認的分區實現類是 HashPartitioner ,根據k2的哈希值 % numReduceTasks
,可能出現“數據傾斜”現象。
3.3. 可以自定義 partition ,調用 job.setPartitioner(…)自己定義分區函數。 -
combiner合並階段:將屬於同一個reduce處理的輸出結果進行合並操作
4.1. 是可選的;
4.2. 目的有三個:1.減少Key-Value對;2.減少網絡傳輸;3.減少Reduce的處理。 -
shuffle階段:即Map和Reduce中間的這個過程
5.1. 首先 map 在做輸出時候會在內存里開啟一個環形內存緩沖區,專門用來做輸出,同時map還會啟動一個守護線程;
5.2. 如緩沖區的內存達到了閾值的80%,守護線程就會把內容寫到磁盤上,這個過程叫spill,另外的20%內存可以繼續寫入要寫進磁盤的數據;
5.3. 寫入磁盤和寫入內存操作是互不干擾的,如果緩存區被撐滿了,那么map就會阻塞寫入內存的操作,讓寫入磁盤操作完成后再繼續執行寫入內存操作;
5.4. 寫入磁盤時會有個排序操作,如果定義了combiner函數,那么排序前還會執行combiner操作;
5.5. 每次spill操作也就是寫入磁盤操作時候就會寫一個溢出文件,也就是說在做map輸出有幾次spill就會產生多少個溢出文件,等map輸出全部做完后,map會合並這些輸出文件,這個過程里還會有一個Partitioner操作(如上)
5.6. 最后 reduce 就是合並map輸出文件,Partitioner會找到對應的map輸出文件,然后進行復制操作,復制操作時reduce會開啟幾個復制線程,這些線程默認個數是5個(可修改),這個復制過程和map寫入磁盤過程類似,也有閾值和內存大小,閾值一樣可以在配置文件里配置,而內存大小是直接使用reduce的tasktracker的內存大小,復制時候reduce還會進行排序操作和合並文件操作,這些操作完了就會進行reduce計算了。 -
reduce階段:由我們自己編寫,最終結果存儲在hdfs上的。