一、mapreduce的思想
MapReduce擅長處理大數據,它為什么具有這種能力呢?這可由MapReduce的設計思想發覺。MapReduce的思想就是“分而治之”。
(1)Mapper負責“分”,即把復雜的任務分解為若干個“簡單的任務”來處理。“簡單的任務”包含三層含義:一是數據或計算的規模相對原任務要大大縮小;二是就近計算原則,即任務會分配到存放着所需數據的節點上進行計算;三是這些小任務可以並行計算,彼此間幾乎沒有依賴關系。
(2)Reducer負責對map階段的結果進行匯總。至於需要多少個Reducer,用戶可以根據具體問題,通過在mapred-site.xml配置文件里設置參數mapred.reduce.tasks的值,缺省值為1。
二、mapreduce編寫規范
1、用戶編寫的程序分成三個部分:Mapper,Reducer,Driver(提交運行 MR 程序的客戶端)
2、Mapper 的輸入數據是 KV 對的形式(KV 的類型可自定義)
3、Mapper 的輸出數據是 KV 對的形式(KV 的類型可自定義)
4、Mapper 中的業務邏輯寫在 map()方法中
5、map()方法(maptask 進程)對每一個<k,v>調用一次
6、Reducer 的輸入數據類型對應 Mapper 的輸出數據類型,也是 KV 對的形式
7、Reducer 的業務邏輯寫在 reduce()方法中
8、Reducetask 進程對每一組相同 k 的<k,v>組調用一次 reduce()方法
9、用戶自定義的 Mapper 和 Reducer 都要繼承各自的父類
10、整個程序需要一個 Drvier 來進行提交,提交的是一個描述了各種必要信息的 job 對象
三、mapreduce程序在分布式運行時的實例進程
1、MRAppMaster:負責整個程序的過程調度及狀態協調
2、Yarnchild:負責 map 階段的整個數據處理流程
3、Yarnchild:負責 reduce 階段的整個數據處理流程 以上兩個階段 MapTask 和 ReduceTask 的進程都是 YarnChild,並不是說這 MapTask 和 ReduceTask 就跑在同一個 YarnChild 進行里
四、mapreduce運行的過程
1、一個 mr 程序啟動的時候,最先啟動的是 MRAppMaster,MRAppMaster 啟動后根據本次 job 的描述信息,計算出需要的 maptask 實例數量,然后向集群申請機器啟動相應數量的 maptask 進程
2、 maptask 進程啟動之后,根據給定的數據切片(哪個文件的哪個偏移量范圍)范圍進行數 據處理,主體流程為:
A、利用客戶指定的 InputFormat 來獲取 RecordReader 讀取數據,形成輸入 KV 對
B、將輸入 KV 對傳遞給客戶定義的 map()方法,做邏輯運算,並將 map()方法輸出的 KV 對收 集到緩存
C、將緩存中的 KV 對按照 K 分區排序后不斷溢寫到磁盤文件
3、 MRAppMaster 監控到所有 maptask 進程任務完成之后(真實情況是,某些 maptask 進 程處理完成后,就會開始啟動 reducetask 去已完成的 maptask 處 fetch 數據),會根據客戶指 定的參數啟動相應數量的 reducetask 進程,並告知 reducetask 進程要處理的數據范圍(數據 分區)
4、Reducetask 進程啟動之后,根據 MRAppMaster 告知的待處理數據所在位置,從若干台 maptask 運行所在機器上獲取到若干個 maptask 輸出結果文件,並在本地進行重新歸並排序, 然后按照相同 key 的 KV 為一個組,調用客戶定義的 reduce()方法進行邏輯運算,並收集運 算輸出的結果 KV,然后調用客戶指定的 OutputFormat 將結果數據輸出到外部存儲
五、maptask的並行度
Hadoop中MapTask的並行度的決定機制。在MapReduce程序的運行中,並不是MapTask越多就越好。需要考慮數據量的多少及機器的配置。如果數據量很少,可能任務啟動的時間都遠遠超過數據的處理時間。同樣可不是越少越好。
那么應該如何切分呢?
假如我們有一個300M的文件,它會在HDFS中被切成3塊。0-128M,128-256M,256-300M。並被放置到不同的節點上去了。在MapReduce任務中,這3個Block會被分給3個MapTask。
MapTask在任務切片時實際上也是分配一個范圍,只是這個范圍是邏輯上的概念,與block的物理划分沒有什么關系。但在實踐過程中如果MapTask讀取的數據不在運行的本機,則必須通過網絡進行數據傳輸,對性能的影響非常大。所以常常采取的策略是就按照塊的存儲切分MapTask,使得每個MapTask盡可能讀取本機的數據。
如果一個Block非常小,也可以把多個小Block交給一個MapTask。
所以MapTask的切分要看情況處理。默認的實現是按照Block大小進行切分。MapTask的切分工作由客戶端(我們寫的main方法)負責。一個切片就對應一個MapTask實例。
六、maptask並行度決定機制
1個job的map階段並行度由客戶端在提交job時決定。
而客戶端對map階段並行度的規划的基本邏輯為:
將待處理數據執行邏輯切片(即按照一個特定切片大小,將待處理數據划分成邏輯上的多個split),然后每一個split分配一個mapTask並行實例處理
切片機制:
FileInputFormat 中默認的切片機制
1、簡單地按照文件的內容長度進行切片
2、切片大小,默認等於 block 大小
3、切片時不考慮數據集整體,而是逐個針對每一個文件單獨切片 比如待處理數據有兩個文件:
File1.txt 200M
File2.txt 100M
經過 getSplits()方法處理之后,形成的切片信息是:
File1.txt-split1 0-128M
File1.txt-split2 129M-200M
File2.txt-split1 0-100M
通過分析源碼,在 FileInputFormat 中,計算切片大小的邏輯: long splitSize = computeSplitSize(blockSize, minSize, maxSize),翻譯一下就是求這三個值的中 間值
切片主要由這幾個值來運算決定:
blocksize:默認是 128M,可通過 dfs.blocksize 修改
minSize:默認是 1,可通過 mapreduce.input.fileinputformat.split.minsize 修改
maxsize:默認是 Long.MaxValue,可通過 mapreduce.input.fileinputformat.split.maxsize 修改
因此,如果 maxsize 調的比 blocksize 小,則切片會小於 blocksize 如果 minsize 調的比 blocksize 大,則切片會大於 blocksize 但是,不論怎么調參數,都不能讓多個小文件“划入”一個 split
七、reducetask並行度
reducetask 的並行度同樣影響整個 job 的執行並發度和執行效率,但與 maptask 的並發數由 切片數決定不同,Reducetask 數量的決定是可以直接手動設置: job.setNumReduceTasks(4);
默認值是 1,
手動設置為 4,表示運行 4 個 reduceTask,
設置為 0,表示不運行 reduceTask 任務,也就是沒有 reducer 階段,只有 mapper 階段
如果數據分布不均勻,就有可能在 reduce 階段產生數據傾斜
注意:reducetask 數量並不是任意設置,還要考慮業務邏輯需求,有些情況下,需要計算全 局匯總結果,就只能有 1 個 reducetask
盡量不要運行太多的 reducetask。對大多數 job 來說,最好 rduce 的個數最多和集群中的 reduce 持平,或者比集群的 reduce slots 小。這個對於小集群而言,尤其重要。
八、reducetask並行度決定機制
1、job.setNumReduceTasks(number);
2、job.setReducerClass(MyReducer.class);
3、job.setPartitioonerClass(MyPTN.class);
分以下幾種情況討論:
1、如果number為1,並且2已經設置為自定義Reducer, reduceTask的個數就是1
不管用戶編寫的MR程序有沒有設置Partitioner,那么該分區組件都不會起作用
2、如果number沒有設置,並且2已經設置為自定義Reducer, reduceTask的個數就是1
在默認的分區組件的影響下,不管用戶設置的number,不管是幾,只要大於1,都是可以正常執行的。
如果在設置自定義的分區組件時,那么就需要注意:
你設置的reduceTasks的個數,必須要 ==== 分區編號中的最大值 + 1
最好的情況下:分區編號都是連續的。
那么reduceTasks = 分區編號的總個數 = 分區編號中的最大值 + 1
3、如果number為 >= 2 並且2已經設置為自定義Reducer reduceTask的個數就是number
底層會有默認的數據分區組件在起作用
4、如果你設置了number的個數,但是沒有設置自定義的reducer,那么該mapreduce程序不代表沒有reducer階段
真正的reducer中的邏輯,就是調用父類Reducer中的默認實現邏輯:原樣輸出
reduceTask的個數 就是 number
5、如果一個MR程序中,不想有reducer階段。那么只需要做一下操作即可:
job.setNumberReudceTasks(0);
整個MR程序只有mapper階段。沒有reducer階段。
那么就沒有shuffle階段
九、partitioner的作用
在進行MapReduce計算時,有時候需要把最終的輸出數據分到不同的文件中,比如按照省份划分的話,需要把同一省份的數據放到一個文件中;按照性別划分的話,需要把同一性別的數據放到一個文件中。我們知道最終的輸出數據是來自於Reducer任務。那么,如果要得到多個文件,意味着有同樣數量的Reducer任務在運行。Reducer任務的數據來自於Mapper任務,也就說Mapper任務要划分數據,對於不同的數據分配給不同的Reducer任務運行。Mapper任務划分數據的過程就稱作Partition。負責實現划分數據的類稱作Partitioner。
十、combinar的作用
combiner其實屬於優化方案,由於帶寬限制,應該盡量map和reduce之間的數據傳輸數量。它在Map端把同一個key的鍵值對合並在一起並計算,計算規則與reduce一致,所以combiner也可以看作特殊的Reducer。
執行combiner操作要求開發者必須在程序中設置了combiner(程序中通過job.setCombinerClass(myCombine.class)自定義combiner操作)。
Combiner組件是用來做局部匯總的,就在mapTask中進行匯總;Reducer組件是用來做全局匯總的,最終的,最后一次匯總。
十一、mapreduce的shuffle詳解
1、MapReduce 中,mapper 階段處理的數據如何傳遞給 reducer 階段,是 MapReduce 框架中 最關鍵的一個流程,這個流程就叫 Shuffle
2、Shuffle: 數據混洗 ——(核心機制:數據分區,排序,局部聚合,緩存,拉取,再合並 排序)
3、具體來說:就是將 MapTask 輸出的處理結果數據,按照 Partitioner 組件制定的規則分發 給 ReduceTask,並在分發的過程中,對數據按 key 進行了分區和排序
MapReduce的Shuffle過程介紹:
Shuffle的本義是洗牌、混洗,把一組有一定規則的數據盡量轉換成一組無規則的數據,越隨機越好。MapReduce中的Shuffle更像是洗牌的逆過程,把一組無規則的數據盡量轉換成一組具有一定規則的數據。
為什么MapReduce計算模型需要Shuffle過程?我們都知道MapReduce計算模型一般包括兩個重要的階段:Map是映射,負責數據的過濾分發;Reduce是規約,負責數據的計算歸並。Reduce的數據來源於Map,Map的輸出即是Reduce的輸入,Reduce需要通過Shuffle來獲取數據。
從Map輸出到Reduce輸入的整個過程可以廣義地稱為Shuffle。Shuffle橫跨Map端和Reduce端,在Map端包括Spill過程,在Reduce端包括copy和sort過程,如圖所示:
Spill過程:
Spill過程包括輸出、排序、溢寫、合並等步驟,如圖所示:
Collect
每個Map任務不斷地以對的形式把數據輸出到在內存中構造的一個環形數據結構中。使用環形數據結構是為了更有效地使用內存空間,在內存中放置盡可能多的數據。
這個數據結構其實就是個字節數組,叫Kvbuffer,名如其義,但是這里面不光放置了數據,還放置了一些索引數據,給放置索引數據的區域起了一個Kvmeta的別名,在Kvbuffer的一塊區域上穿了一個IntBuffer(字節序采用的是平台自身的字節序)的馬甲。數據區域和索引數據區域在Kvbuffer中是相鄰不重疊的兩個區域,用一個分界點來划分兩者,分界點不是亘古不變的,而是每次Spill之后都會更新一次。初始的分界點是0,數據的存儲方向是向上增長,索引數據的存儲方向是向下增長,如圖所示:
Kvbuffer的存放指針bufindex是一直悶着頭地向上增長,比如bufindex初始值為0,一個Int型的key寫完之后,bufindex增長為4,一個Int型的value寫完之后,bufindex增長為8。
索引是對在kvbuffer中的索引,是個四元組,包括:value的起始位置、key的起始位置、partition值、value的長度,占用四個Int長度,Kvmeta的存放指針Kvindex每次都是向下跳四個“格子”,然后再向上一個格子一個格子地填充四元組的數據。比如Kvindex初始位置是-4,當第一個寫完之后,(Kvindex+0)的位置存放value的起始位置、(Kvindex+1)的位置存放key的起始位置、(Kvindex+2)的位置存放partition的值、(Kvindex+3)的位置存放value的長度,然后Kvindex跳到-8位置,等第二個和索引寫完之后,Kvindex跳到-32位置。
Kvbuffer的大小雖然可以通過參數設置,但是總共就那么大,和索引不斷地增加,加着加着,Kvbuffer總有不夠用的那天,那怎么辦?把數據從內存刷到磁盤上再接着往內存寫數據,把Kvbuffer中的數據刷到磁盤上的過程就叫Spill,多么明了的叫法,內存中的數據滿了就自動地spill到具有更大空間的磁盤。
關於Spill觸發的條件,也就是Kvbuffer用到什么程度開始Spill,還是要講究一下的。如果把Kvbuffer用得死死得,一點縫都不剩的時候再開始Spill,那Map任務就需要等Spill完成騰出空間之后才能繼續寫數據;如果Kvbuffer只是滿到一定程度,比如80%的時候就開始Spill,那在Spill的同時,Map任務還能繼續寫數據,如果Spill夠快,Map可能都不需要為空閑空間而發愁。兩利相衡取其大,一般選擇后者。
Spill這個重要的過程是由Spill線程承擔,Spill線程從Map任務接到“命令”之后就開始正式干活,干的活叫SortAndSpill,原來不僅僅是Spill,在Spill之前還有個頗具爭議性的Sort。
Sort:
先把Kvbuffer中的數據按照partition值和key兩個關鍵字升序排序,移動的只是索引數據,排序結果是Kvmeta中數據按照partition為單位聚集在一起,同一partition內的按照key有序。
Spill:
Spill線程為這次Spill過程創建一個磁盤文件:從所有的本地目錄中輪訓查找能存儲這么大空間的目錄,找到之后在其中創建一個類似於“spill12.out”的文件。Spill線程根據排過序的Kvmeta挨個partition的把數據吐到這個文件中,一個partition對應的數據吐完之后順序地吐下個partition,直到把所有的partition遍歷完。一個partition在文件中對應的數據也叫段(segment)。
所有的partition對應的數據都放在這個文件里,雖然是順序存放的,但是怎么直接知道某個partition在這個文件中存放的起始位置呢?強大的索引又出場了。有一個三元組記錄某個partition對應的數據在這個文件中的索引:起始位置、原始數據長度、壓縮之后的數據長度,一個partition對應一個三元組。然后把這些索引信息存放在內存中,如果內存中放不下了,后續的索引信息就需要寫到磁盤文件中了:從所有的本地目錄中輪訓查找能存儲這么大空間的目錄,找到之后在其中創建一個類似於“spill12.out.index”的文件,文件中不光存儲了索引數據,還存儲了crc32的校驗數據。(spill12.out.index不一定在磁盤上創建,如果內存(默認1M空間)中能放得下就放在內存中,即使在磁盤上創建了,和spill12.out文件也不一定在同一個目錄下。)
每一次Spill過程就會最少生成一個out文件,有時還會生成index文件,Spill的次數也烙印在文件名中。索引文件和數據文件的對應關系如下圖所示:
在Spill線程如火如荼的進行SortAndSpill工作的同時,Map任務不會因此而停歇,而是一無既往地進行着數據輸出。Map還是把數據寫到kvbuffer中,那問題就來了:只顧着悶頭按照bufindex指針向上增長,kvmeta只顧着按照Kvindex向下增長,是保持指針起始位置不變繼續跑呢,還是另謀它路?如果保持指針起始位置不變,很快bufindex和Kvindex就碰頭了,碰頭之后再重新開始或者移動內存都比較麻煩,不可取。Map取kvbuffer中剩余空間的中間位置,用這個位置設置為新的分界點,bufindex指針移動到這個分界點,Kvindex移動到這個分界點的-16位置,然后兩者就可以和諧地按照自己既定的軌跡放置數據了,當Spill完成,空間騰出之后,不需要做任何改動繼續前進。分界點的轉換如下圖所示:
Map任務總要把輸出的數據寫到磁盤上,即使輸出數據量很小在內存中全部能裝得下,在最后也會把數據刷到磁盤上。
Merge
Map任務如果輸出數據量很大,可能會進行好幾次Spill,out文件和Index文件會產生很多,分布在不同的磁盤上。最后把這些文件進行合並的merge過程閃亮登場。
Merge過程怎么知道產生的Spill文件都在哪了呢?從所有的本地目錄上掃描得到產生的Spill文件,然后把路徑存儲在一個數組里。Merge過程又怎么知道Spill的索引信息呢?沒錯,也是從所有的本地目錄上掃描得到Index文件,然后把索引信息存儲在一個列表里。到這里,又遇到了一個值得納悶的地方。在之前Spill過程中的時候為什么不直接把這些信息存儲在內存中呢,何必又多了這步掃描的操作?特別是Spill的索引數據,之前當內存超限之后就把數據寫到磁盤,現在又要從磁盤把這些數據讀出來,還是需要裝到更多的內存中。之所以多此一舉,是因為這時kvbuffer這個內存大戶已經不再使用可以回收,有內存空間來裝這些數據了。(對於內存空間較大的土豪來說,用內存來省卻這兩個io步驟還是值得考慮的。)
然后為merge過程創建一個叫file.out的文件和一個叫file.out.Index的文件用來存儲最終的輸出和索引。
一個partition一個partition的進行合並輸出。對於某個partition來說,從索引列表中查詢這個partition對應的所有索引信息,每個對應一個段插入到段列表中。也就是這個partition對應一個段列表,記錄所有的Spill文件中對應的這個partition那段數據的文件名、起始位置、長度等等。
然后對這個partition對應的所有的segment進行合並,目標是合並成一個segment。當這個partition對應很多個segment時,會分批地進行合並:先從segment列表中把第一批取出來,以key為關鍵字放置成最小堆,然后從最小堆中每次取出最小的輸出到一個臨時文件中,這樣就把這一批段合並成一個臨時的段,把它加回到segment列表中;再從segment列表中把第二批取出來合並輸出到一個臨時segment,把其加入到列表中;這樣往復執行,直到剩下的段是一批,輸出到最終的文件中。
最終的索引數據仍然輸出到Index文件中。
Map端的Shuffle過程到此結束。
Copy:
Reduce任務通過HTTP向各個Map任務拖取它所需要的數據。每個節點都會啟動一個常駐的HTTP server,其中一項服務就是響應Reduce拖取Map數據。當有MapOutput的HTTP請求過來的時候,HTTP server就讀取相應的Map輸出文件中對應這個Reduce部分的數據通過網絡流輸出給Reduce。
Reduce任務拖取某個Map對應的數據,如果在內存中能放得下這次數據的話就直接把數據寫到內存中。Reduce要向每個Map去拖取數據,在內存中每個Map對應一塊數據,當內存中存儲的Map數據占用空間達到一定程度的時候,開始啟動內存中merge,把內存中的數據merge輸出到磁盤上一個文件中。
如果在內存中不能放得下這個Map的數據的話,直接把Map數據寫到磁盤上,在本地目錄創建一個文件,從HTTP流中讀取數據然后寫到磁盤,使用的緩存區大小是64K。拖一個Map數據過來就會創建一個文件,當文件數量達到一定閾值時,開始啟動磁盤文件merge,把這些文件合並輸出到一個文件。
有些Map的數據較小是可以放在內存中的,有些Map的數據較大需要放在磁盤上,這樣最后Reduce任務拖過來的數據有些放在內存中了有些放在磁盤上,最后會對這些來一個全局合並。
Merge Sort:
這里使用的Merge和Map端使用的Merge過程一樣。Map的輸出數據已經是有序的,Merge進行一次合並排序,所謂Reduce端的sort過程就是這個合並的過程。一般Reduce是一邊copy一邊sort,即copy和sort兩個階段是重疊而不是完全分開的。
Reduce端的Shuffle過程至此結束。