MapReduce計算框架
一、MapReduce實現原理
圖展示了MapReduce實現中的全部流程,處理步驟如下:
1、用戶程序中的MapReduce函數庫首先把輸入文件分成M塊(每塊大小默認64M),在集群上執行處理程序,見序號1
2、主控程序master分配Map任務和Reduce任務給工作執行機器worker。見序號2
3、一個分配了Map任務的worker讀取並處理輸入數據塊。從數據片段中解析出key/value鍵值對,然后把其傳遞給Map函數,由Map函數生成並輸出中間key/value鍵值對集合,暫緩內存中。見序號3
4、緩存中key/value鍵值對通過分區函數分成R個區域,之后周期性地寫到本地磁盤上。同時將本地磁盤的存儲位置傳給master,由master負責把這些存儲位置再傳遞給Reduce worker,見序號4
5、當Reduce worker收到master的存儲位置信息后,使用RPC從Map worker所在的磁盤上讀取數據。最后通過對key進行排序使得具有key值的數據聚合到一起。見序號5
6、Reduce worker程序遍歷排序后的中間數據。Reduce worker程序將這個key值和相關的中間value值的集合傳遞給Rdeduce函數,最后,Reduce的輸出被追加到所屬分區的輸入文件。見序號6
二、計算流程與機制
2.1、作業提交和初始化
MapReduce的JobTracker接收到客戶端提交的作業后首先要把作業初始化為Map任務和Reduce任務,然后等待調度執行。
圖中展示了MapReduce客戶端提交作業以及初始化的流程。
作業提交過程:
1、命令行提交。調用JobClient.runJob()方法開始提交,最終通過Job對象內部JobClient對象的submitJobInternal方法來提交作業到JobTracker。
2、作業上傳。在提交到JobTracker之前還需要完成相關的初始化工作(獲取用戶作業的JobId,創建HDFS目錄,上傳作業、相關依賴庫,需要分發的文件等到HDFS上,獲取用戶輸入數據的所有分片信息)。
3、產生切片文件。在作業提交后,JobClient調用InputFormat中的getSplits()方法產生用戶數據的split分片信息。
4、提交作業到JobTracker。JobClient通過RPC將作業提交到JobTracker作業調度器中,首先為作業創建JobInProgress對象,用於跟蹤正在運行的作業的狀態和進度。其次檢查用戶是否具有指定隊列的作業提交權限。接着檢查作業配置的內存使用量是否合理。最后通過TaskScheduler初始化作業,JobTracker收到提交的作業后,會交給TaskScheduler調度器,然后按照一定的策略對作業執行初始化操作。
作業的初始化:主要是構造Map Task和Reduce Task並對他們進行初始化操作,主要是調度器調用JobTracker.initJob()方法來進行的。具體分為四個類型的任務:
Setup Task--->Map Task--->Reduce Task--->Cleanup Task
2.2、Mapper
在作業提交完成之后,就開始執行Map Task任務了,Mapper的任務就是執行用戶的Map()函數將輸入鍵值對(key/value pair)映像到一組中間格式的鍵值對集合。
圖中是Mapper的處理流程圖。Mapper的輸入文件在HDFS上;InputFormat接口描述文件的格式信息,通過這個接口可以獲得InputSplit的實現,然后對輸入的數據進行切分;每一個Split分塊對應一個Mapper任務,通過RecordReader對象從輸入分塊中讀取並生成<k,v>鍵值對;Map函數接收這些鍵值對根據用戶的Map函數進行處理后輸出<k1,v1>鍵值對,Map函數通過context.collect方法將結果寫到context對象中;當Mapper的輸出鍵值對被收集后,他們會被Partitioner類中的partition()函數以指定的分區寫到輸出緩沖區,同時調用sort函數對輸出進行排序,如果用戶為Mapper指定了Combiner,則在Mapper輸出它的鍵值對<k1,v1>時,不會馬上寫到輸出中,會被收集在list對象中,當寫入一定數量的鍵值對,這部分緩沖會被Combiner中的combine函數合並,然后輸出被寫入本地文件系統之后會進入Reduce階段。
2.3、Reducer
Reducer有三個主要階段:Shuffer、Sort和Reduce,處理流程圖為:
Reducer的整個處理流程為:
1、Shuffle階段,此時Reducer的輸入就是Mapper已經排序好的輸出。可以理解為混洗階段,相當於數據復制階段。
2、Sort階段,按照key值對Reducer的輸入進行分組,Shuffle和Sort是同時進行的。
3、Reduce階段,通過前兩個階段得到的<key,(list of values)>會送到Reducer中的reduce()函數中處理。輸出的結果通過OutputFormat輸出到DFS中。
2.4、Reporter和OutputCollector
Reporter是用於MapReduce應用程序報告進度,設定應用級別的消息,更新Counters計數器的機制。
OutputCollector是一個由Map/Reduce框架提供的,用於收集Mapper和Reducer輸出數據的通用機制,老版本用collect函數,新版本用write函數。
三、MapReduce的I/O格式
3.1、輸入格式
Hadoop中的MapReduce框架依賴InputFormat提供數據輸入,也就是InputFormat為MapReduce作業描述輸入的細節規范。InputFormat有三個作用:
1、檢查作業輸入的有效性。
2、把輸入文件切分成多個邏輯InputSplit實例,並把每一個實例分別分發給一個Mapper,也就是一個Mapper的輸入只對應一個邏輯InputSplit,只處理一個Split文件的數據塊。
3、提供ReduceReader的實現。從邏輯InputSplit中獲取輸入記錄,這些記錄將由Mapper處理,Mapper利用該實現從InputSplit中讀取輸入的<K,V>鍵值對。
1、TextInputFormat
用於讀取純文本文件,是Hadoop默認的InputFormat的派生類。LineRecordReader將Inputsplit解析成<key,value>對,key是每一行的位置(偏移量,為LongWritable類型),value是每一行的內容(為Text類型)。類圖如下。
通過類圖看出,TextInputFormat類繼承了FileInputFormat基類,實現了JobConfigure接口,實現了InputFormat中的getRecordReader這一方法,返回一個RecordReader用於划分中讀取<key,value>鍵值對。
2、KeyValueTextInputFormat
類似TextInputFormat一樣。
3、NLineInputFormat
這個類型可以將文件以行為單位進行split切分,比如每一行對應一個Map,得到key是每一行的位置,value是每一行的內容。這里的N指的是每一個Mapper接收的對應一個mapper來處理。
4、SequenceFileInputFormat
用於讀取sequencefile。sequence是hadoop用於存儲數自定義格式的二進制binary文件。其主要有兩個子類。SequenceFileAsBinaryInputFormat用於處理二進制數據。SequenceFileAsTextInputFormat,主要是為了適應Hadoop streaming接口而設計的,在讀取鍵值對后會調用toString()方法將key和value類型轉化為Text類型對象。
3.2、輸出格式
hadoop中的OutputFormat用來描述MapReduce作業的輸出格式。OutputFormat主要有三個作用:
1、檢驗作業的輸出
2、驗證輸出結果類型是否如在Config中所配置的。
3、提供一個RecordWriter的實現,用來輸出作業結果。RecordWriter生成<key,value>鍵值對到輸出文件。
1、TextOutputFormat
是默認的輸出格式,就是輸出純文本文件,格式為key+"\t"+value,key與value之間默認以\t分割。
2、SequeceFileOutputFormat
和輸入類似
3、MapFileOutputFormat
這個輸出類型可以將數據輸出為hadoop中的MpaFile文件格式。MapFile與SequenceFile類似。
4、MultipleOutputFormat
是hadoop類中的多路輸出