概述
一個完整的 MapReduce 程序在分布式運行時有兩類實例進程:
1、MRAppMaster:負責整個程序的過程調度及狀態協調
2、Yarnchild:負責 map 階段的整個數據處理流程
3、Yarnchild:負責 reduce 階段的整個數據處理流程 以上兩個階段 MapTask 和 ReduceTask 的進程都是 YarnChild,並不是說這 MapTask 和 ReduceTask 就跑在同一個 YarnChild 進行里
MapReduce 套路圖
MapReduce 程序的運行
1、一個 mr 程序啟動的時候,最先啟動的是 MRAppMaster,MRAppMaster 啟動后根據本次 job 的描述信息,計算出需要的 maptask 實例數量,然后向集群申請機器啟動相應數量的 maptask 進程
2、 maptask 進程啟動之后,根據給定的數據切片(哪個文件的哪個偏移量范圍)范圍進行數 據處理,主體流程為:
A、利用客戶指定的 InputFormat 來獲取 RecordReader 讀取數據,形成輸入 KV 對
B、將輸入 KV 對傳遞給客戶定義的 map()方法,做邏輯運算,並將 map()方法輸出的 KV 對收 集到緩存
C、將緩存中的 KV 對按照 K 分區排序后不斷溢寫到磁盤文件
3、 MRAppMaster 監控到所有 maptask 進程任務完成之后(真實情況是,某些 maptask 進 程處理完成后,就會開始啟動 reducetask 去已完成的 maptask 處 fetch 數據),會根據客戶指 定的參數啟動相應數量的 reducetask 進程,並告知 reducetask 進程要處理的數據范圍(數據 分區)
4、Reducetask 進程啟動之后,根據 MRAppMaster 告知的待處理數據所在位置,從若干台 maptask 運行所在機器上獲取到若干個 maptask 輸出結果文件,並在本地進行重新歸並排序, 然后按照相同 key 的 KV 為一個組,調用客戶定義的 reduce()方法進行邏輯運算,並收集運 算輸出的結果 KV,然后調用客戶指定的 OutputFormat 將結果數據輸出到外部存儲
mapTask的並行度
Hadoop中MapTask的並行度的決定機制。在MapReduce程序的運行中,並不是MapTask越多就越好。需要考慮數據量的多少及機器的配置。如果數據量很少,可能任務啟動的時間都遠遠超過數據的處理時間。同樣可不是越少越好。
那么應該如何切分呢?
假如我們有一個300M的文件,它會在HDFS中被切成3塊。0-128M,128-256M,256-300M。並被放置到不同的節點上去了。在MapReduce任務中,這3個Block會被分給3個MapTask。
MapTask在任務切片時實際上也是分配一個范圍,只是這個范圍是邏輯上的概念,與block的物理划分沒有什么關系。但在實踐過程中如果MapTask讀取的數據不在運行的本機,則必須通過網絡進行數據傳輸,對性能的影響非常大。所以常常采取的策略是就按照塊的存儲切分MapTask,使得每個MapTask盡可能讀取本機的數據。
如果一個Block非常小,也可以把多個小Block交給一個MapTask。
所以MapTask的切分要看情況處理。默認的實現是按照Block大小進行切分。MapTask的切分工作由客戶端(我們寫的main方法)負責。一個切片就對應一個MapTask實例。
MapTask並行度的決定機制
1個job的map階段並行度由客戶端在提交job時決定。
而客戶端對map階段並行度的規划的基本邏輯為:
將待處理數據執行邏輯切片(即按照一個特定切片大小,將待處理數據划分成邏輯上的多個split),然后每一個split分配一個mapTask並行實例處理
這段邏輯及形成的切片規划描述文件,由FileInputFormat實現類的getSplits()方法完成,其過程如下圖:
切片機制
FileInputFormat 中默認的切片機制
1、簡單地按照文件的內容長度進行切片
2、切片大小,默認等於 block 大小
3、切片時不考慮數據集整體,而是逐個針對每一個文件單獨切片 比如待處理數據有兩個文件:
File1.txt 200M
File2.txt 100M
經過 getSplits()方法處理之后,形成的切片信息是:
File1.txt-split1 0-128M
File1.txt-split2 129M-200M
File2.txt-split1 0-100M
通過分析源碼,在 FileInputFormat 中,計算切片大小的邏輯: long splitSize = computeSplitSize(blockSize, minSize, maxSize),翻譯一下就是求這三個值的中 間值
切片主要由這幾個值來運算決定:
blocksize:默認是 128M,可通過 dfs.blocksize 修改
minSize:默認是 1,可通過 mapreduce.input.fileinputformat.split.minsize 修改
maxsize:默認是 Long.MaxValue,可通過 mapreduce.input.fileinputformat.split.maxsize 修改
因此,如果 maxsize 調的比 blocksize 小,則切片會小於 blocksize 如果 minsize 調的比 blocksize 大,則切片會大於 blocksize 但是,不論怎么調參數,都不能讓多個小文件“划入”一個 split
MapTask 並行度經驗之談
如果硬件配置為 2*12core + 64G,恰當的 map 並行度是大約每個節點 20-100 個 map,最好 每個 map 的執行時間至少一分鍾。
1、如果 job 的每個 map 或者 reduce task 的運行時間都只有 30-40 秒鍾,那么就減少該 job 的 map 或者 reduce 數,每一個 task(map|reduce)的 setup 和加入到調度器中進行調度,這個 中間的過程可能都要花費幾秒鍾,所以如果每個 task 都非常快就跑完了,就會在 task 的開 始和結束的時候浪費太多的時間。
配置 task 的 JVM 重用可以改善該問題:
mapred.job.reuse.jvm.num.tasks,默認是 1,表示一個 JVM 上最多可以順序執行的 task 數目(屬於同一個 Job)是 1。也就是說一個 task 啟一個 JVM。這個值可以在 mapred-site.xml 中進行更改,當設置成多個,就意味着這多個 task 運行在同一個 JVM 上,但不是同時執行, 是排隊順序執行
2、如果 input 的文件非常的大,比如 1TB,可以考慮將 hdfs 上的每個 blocksize 設大,比如 設成 256MB 或者 512MB
ReduceTask 並行度
reducetask 的並行度同樣影響整個 job 的執行並發度和執行效率,但與 maptask 的並發數由 切片數決定不同,Reducetask 數量的決定是可以直接手動設置: job.setNumReduceTasks(4);
默認值是 1,
手動設置為 4,表示運行 4 個 reduceTask,
設置為 0,表示不運行 reduceTask 任務,也就是沒有 reducer 階段,只有 mapper 階段
如果數據分布不均勻,就有可能在 reduce 階段產生數據傾斜
注意:reducetask 數量並不是任意設置,還要考慮業務邏輯需求,有些情況下,需要計算全 局匯總結果,就只能有 1 個 reducetask
盡量不要運行太多的 reducetask。對大多數 job 來說,最好 rduce 的個數最多和集群中的 reduce 持平,或者比集群的 reduce slots 小。這個對於小集群而言,尤其重要。
ReduceTask 並行度決定機制
1、job.setNumReduceTasks(number);
2、job.setReducerClass(MyReducer.class);
3、job.setPartitioonerClass(MyPTN.class);
分以下幾種情況討論:
1、如果number為1,並且2已經設置為自定義Reducer, reduceTask的個數就是1
不管用戶編寫的MR程序有沒有設置Partitioner,那么該分區組件都不會起作用
2、如果number沒有設置,並且2已經設置為自定義Reducer, reduceTask的個數就是1
在默認的分區組件的影響下,不管用戶設置的number,不管是幾,只要大於1,都是可以正常執行的。
如果在設置自定義的分區組件時,那么就需要注意:
你設置的reduceTasks的個數,必須要 ==== 分區編號中的最大值 + 1
最好的情況下:分區編號都是連續的。
那么reduceTasks = 分區編號的總個數 = 分區編號中的最大值 + 1
3、如果number為 >= 2 並且2已經設置為自定義Reducer reduceTask的個數就是number
底層會有默認的數據分區組件在起作用
4、如果你設置了number的個數,但是沒有設置自定義的reducer,那么該mapreduce程序不代表沒有reducer階段
真正的reducer中的邏輯,就是調用父類Reducer中的默認實現邏輯:原樣輸出
reduceTask的個數 就是 number
5、如果一個MR程序中,不想有reducer階段。那么只需要做一下操作即可:
job.setNumberReudceTasks(0);
整個MR程序只有mapper階段。沒有reducer階段。
那么就沒有shuffle階段