數據處理總流程
MapReduce計算框架體現的是一個分治的思想。及將待處理的數據分片在每個數據分片上並行運行相同邏輯的map()函數,然后將每一個數據分片的處理結果匯集到reduce()函數進行規約整理,最后輸出結果。
總體上來說MapReduce的處理流程從邏輯上看並不復雜。對於應用Hadoop進行數據分析的開發人員來說,只需實現map()方法和reduce()方法就能完成大部分的工作。正是因為Hadoop邏輯上和開發上都不復雜使它被廣泛的應用於各行各業。
Map階段
Map階段更為詳細的處理過程如圖所示:
一般情況下用戶需要處理分析的數據都在HDFS上。因此,MapReduce計算框架會是使用InputFormat(org.apache.hadoop.mapreduce)的子類將輸入數據分片(InputSplit)。分片后的數據將作為MapTask的輸入,MapTask會根據map()中的程序邏輯將數據分為K-V鍵值對。
為了更好的理解數據分片的過程和實現的邏輯,本文以InputFormat的一個子類FileInputFormat為例研究數據分片的過程。
FileInputFormat類將數據分片,然而這里所說的分片並不是將數據物理上分成多個數據塊而是邏輯分片。
PS:並不是所有文件都可以分片,比如gzip,snappy壓縮的文件就無法分割 .
數據邏輯分片的核心方法是getSplits():
public List<InputSplit> getSplits(JobContext job) throws IOException {
。。。。。。
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
。。。。。。
return splits;
}
其流程圖如下所示:
getSplits()中的BlockLocation類保存待處理文件的數據塊信息,它包含了數據塊所在DataNode的hostname,帶有緩存副本的數據塊所在的節點的hostname,訪問數據塊所在DataNode的IP:端口號,在拓撲網絡中的絕對路徑名,數據塊在整個數據文件中的偏移量,數據塊長度,是否是壞塊。getSplits()會依據這些信息創建一個FileSplit完成一個邏輯分片,然后將所有的邏輯分片信息保存到List中。List中的InputSplit包含四個內容,文件的路徑,文件開始的位置,文件結束的位置,數據塊所在的host。
除了getSplits()方法另一比較重要的算法是computeSplitSize()方法,它負責確定數據分片的大小,數據分片的大小對程序的性能會有一定的影響,最好將數據分片的大小設置的和HDFS中數據分片的大小一致。確定分片大小的算法是:
Math.max(minSize, Math.min(maxSize, blockSize))
set mapred.max.split.size=256000000;2.x版本默認約是128M,我們集群配置的是256M
set mapred.min.split.size=10000000;2.x版本默認是約10M,我們集群配置的是1
blockSize 在hdfs-site.xml參數dfs.block.size中配置,我們集群設置的是默認的是134217728=128M
set mapred.map.tasks 對map task數量僅僅是參考的作用,我們集群默認的是2
對應的是set mapred.reduce.tasks,我們集群默認的是-1
reducer數量可能起作用的
hive.exec.reducers.bytes.per.reducer=256000000
hive.exec.reducers.max=1009
min( hive.exec.reducers.max ,總輸入數據量/hive.exec.reducers.bytes.per.reducer)
其中,minSize是配置文件中設置的分片最小值,minSize則為最大值,blockSize為HDFS中數據塊的大小。
完成邏輯分片后,FileInputFormat的各個子類向MapTask映射k-v鍵值對(如TextInputFormat)。FileInputFormat的子類是對數據分片中的數據進行處理。
TextInputFormat中createRecorderReader()將InputSplit解析為k-v傳給mapTask,該方法中用到了LineRecordReader它繼承自RecordReader。
MapTask最終是通過調用nextKeyValue()方法來遍歷分片中的數據並且將行數以及每一行的的數據分別作為key和value傳遞給map()方法。map()方法按照開發工程師編寫的邏輯對輸入的key和value進行處理后會組成新的k-v對然后寫出到一個內存緩沖區中。
每個MapTask都有一個內存緩沖區,對緩沖區讀寫是典型的生產者消費者模式。這里內存緩沖區的結構設計對MapTask的IO效率有着直接的影響。Hadoop采用了環形內存緩沖區,當緩沖區數據量達到閾值消費者線程SpillThread開始將數據寫出,於此同時充當生產者的writer()函數依然可以將處理完的數據寫入到緩沖區中。生產者和消費者之間的同步是通過可重入互斥鎖spillLock來完成的。
在寫磁盤之前,線程會對緩沖區內的數據進行分區,以決定各個數據會傳輸到哪個Reduce中。而在每個分區中會按key進行排序(如果此時有個Combiner則它會在排序后的輸出上運行一次,以壓縮傳輸的數據)
mapred-site.xml 文件中
mapreduce.task.io .sort.mb=300M
mapreduce.map.sort.spill.percent 配置的默認只0.8
用戶可以通過繼承Partitiner類並且實現getPartitioner()方法,從而定制自己的分區規則。默認的分區規則是通過key的hashCode來完成分區的。
環形緩沖區在達到溢寫的閾值后,溢寫到磁盤(每次溢寫都會新建一個溢寫文件)最后合並溢寫文件,形成一個分區有序的中間結果。另外可以對中間結果進行壓縮,以減少傳輸的數據量。
Reduce階段
Reduce階段更為詳細的流程如下圖所示:
ReduceTask對數據進行規約的第一步就是從MapTask的輸出磁盤上將數據拉取過來。這個過程重點分析shuffle類和Fetcher類。Shuffle類如下圖所示:
Shuffle類中的init()方法負責初始化Shuffle階段需要的上下文,並且在Shuffle的最后階段調用歸並排序方法。Shuffle類的核心方法為run()方法。
public RawKeyValueIterator run() throws IOException, InterruptedException {
。。。。。。
// Start the map-output fetcher threads
Boolean isLocal = localMapFiles != null;
final int numFetchers = isLocal ? 1 :
jobConf.getint(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
if (isLocal) {
fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
localMapFiles);
fetchers[0].start();
} else {
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,reduceTask.getShuffleSecret());
fetchers[i].start();
}
}
。。。。。。
eventFetcher.shutDown();
for (Fetcher<K,V> fetcher : fetchers) {
fetcher.shutDown();
}
scheduler.close();
copyPhase.complete();
// copy is already complete
taskStatus.setPhase(TaskStatus.Phase.SORT);
reduceTask.statusUpdate(umbilical);
RawKeyValueIterator kvIter = null;
。。。。。。
return kvIter;
}
在run()方法中它是通過啟動fetcher線程來拉取數據的。首先需要判斷將要拉取的數據是否具有本地性,如果數據在本地則直接傳入文件的地址否則創建fetcher線程來從其他節點遠程拉取數據。Fetcher類類圖如下:
Fetcher繼承自Thread類因此它重寫了run()方法並且調用了copyFromHost()方法。copyFromHost()方法首先獲取指定host上運行完成的MapTaskID然后循環的從Map段讀取數據直到所有的數據都讀取完成。
protected void copyFromHost(MapHost host) throws IOException {
。。。。。。
List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
。。。。。。
while (!remaining.isEmpty() && failedTasks == null) {
try {
failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled);
}
catch (IOException e) {
。。。。。
}
}
}
讀取數據是在copyMapOutput()方法中完成的,方法中用到了ShufferHeader類它實現了Writable接口從而可以完成序列化與反序列化的工作,它調用readFields()方法從數據流中讀取數據。
mapreduce.task.io .sort.factor =25
讀取數據過程中需要注意的是,如果中間結果小則復制到內存緩沖區中否則復制到本地磁盤中。當內存緩沖區達到大小閾值或者文件數閾值則溢寫到本地磁盤,與此同時后台線程會不停的合並溢寫文件形成大的有序的文件。
在Shuffle-copy階段進行的同時Shuffle-Sort也在處理數據,這個階段就是針對內存中的數據和磁盤上的數據進行歸並排序。
復制完所有的map輸出做循環歸並排序合並數據。舉個例子更加好理解,若合並因子為10,50個輸出文件,則合並5次,最后剩下5個文件不符合合並條件,則將這5個文件交給Reduce處理。
Reduce階段會接收到已經排完序的k-v對,然后對k-v對進行邏輯處理最后輸出結果k-v對到HDFS中.