在代碼中又確認了一下,Combiner在spill的時候會執行,同時在merge的時候只有spill的文件數大於min.num.spill.for.combine才會執行,具體見代碼:
private void sortAndSpill() throws IOException, ClassNotFoundException, InterruptedException { //approximate the length of the output file to be the length of the //buffer + header lengths for the partitions long size = (bufend >= bufstart ? bufend - bufstart : (bufvoid - bufend) + bufstart) + partitions * APPROX_HEADER_LENGTH; FSDataOutputStream out = null; try { // create spill file final SpillRecord spillRec = new SpillRecord(partitions); final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(), numSpills, size); out = rfs.create(filename); final int endPosition = (kvend > kvstart) ? kvend : kvoffsets.length + kvend; sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter); int spindex = kvstart; IndexRecord rec = new IndexRecord(); InMemValBytes value = new InMemValBytes(); for (int i = 0; i < partitions; ++i) { IFile.Writer<K, V> writer = null; try { long segmentStart = out.getPos(); writer = new Writer<K, V>(job, out, keyClass, valClass, codec, spilledRecordsCounter); if (combinerRunner == null) { // spill directly DataInputBuffer key = new DataInputBuffer(); while (spindex < endPosition && kvindices[kvoffsets[spindex % kvoffsets.length] + PARTITION] == i) { final int kvoff = kvoffsets[spindex % kvoffsets.length]; getVBytesForOffset(kvoff, value); key.reset(kvbuffer, kvindices[kvoff + KEYSTART], (kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART])); writer.append(key, value); ++spindex; } } else { int spstart = spindex; while (spindex < endPosition && kvindices[kvoffsets[spindex % kvoffsets.length] + PARTITION] == i) { ++spindex; } // Note: we would like to avoid the combiner if we've fewer // than some threshold of records for a partition if (spstart != spindex) { combineCollector.setWriter(writer); RawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex); combinerRunner.combine(kvIter, combineCollector); } } // close the writer writer.close(); // record offsets rec.startOffset = segmentStart; rec.rawLength = writer.getRawLength(); rec.partLength = writer.getCompressedLength(); spillRec.putIndex(rec, i); writer = null; } finally { if (null != writer) writer.close(); } } if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) { // create spill index file Path indexFilename = mapOutputFile.getSpillIndexFileForWrite( getTaskID(), numSpills, partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH); spillRec.writeToFile(indexFilename, job); } else { indexCacheList.add(spillRec); totalIndexCacheMemory += spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; } LOG.info("Finished spill " + numSpills); ++numSpills; } finally { if (out != null) out.close(); } }
private void mergeParts() throws IOException, InterruptedException, ClassNotFoundException { // get the approximate size of the final output/index files long finalOutFileSize = 0; long finalIndexFileSize = 0; final Path[] filename = new Path[numSpills]; final TaskAttemptID mapId = getTaskID(); for(int i = 0; i < numSpills; i++) { filename[i] = mapOutputFile.getSpillFile(mapId, i); finalOutFileSize += rfs.getFileStatus(filename[i]).getLen(); } if (numSpills == 1) { //the spill is the final output rfs.rename(filename[0], new Path(filename[0].getParent(), "file.out")); if (indexCacheList.size() == 0) { rfs.rename(mapOutputFile.getSpillIndexFile(mapId, 0), new Path(filename[0].getParent(),"file.out.index")); } else { indexCacheList.get(0).writeToFile( new Path(filename[0].getParent(),"file.out.index"), job); } return; } // read in paged indices for (int i = indexCacheList.size(); i < numSpills; ++i) { Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, i); indexCacheList.add(new SpillRecord(indexFileName, job)); } //make correction in the length to include the sequence file header //lengths for each partition finalOutFileSize += partitions * APPROX_HEADER_LENGTH; finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH; Path finalOutputFile = mapOutputFile.getOutputFileForWrite(mapId, finalOutFileSize); Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite( mapId, finalIndexFileSize); //The output stream for the final single output file FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096); if (numSpills == 0) { //create dummy files IndexRecord rec = new IndexRecord(); SpillRecord sr = new SpillRecord(partitions); try { for (int i = 0; i < partitions; i++) { long segmentStart = finalOut.getPos(); Writer<K, V> writer = new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null); writer.close(); rec.startOffset = segmentStart; rec.rawLength = writer.getRawLength(); rec.partLength = writer.getCompressedLength(); sr.putIndex(rec, i); } sr.writeToFile(finalIndexFile, job); } finally { finalOut.close(); } return; } { IndexRecord rec = new IndexRecord(); final SpillRecord spillRec = new SpillRecord(partitions); for (int parts = 0; parts < partitions; parts++) { //create the segments to be merged List<Segment<K,V>> segmentList = new ArrayList<Segment<K, V>>(numSpills); for(int i = 0; i < numSpills; i++) { IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts); Segment<K,V> s = new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset, indexRecord.partLength, codec, true); segmentList.add(i, s); if (LOG.isDebugEnabled()) { LOG.debug("MapId=" + mapId + " Reducer=" + parts + "Spill =" + i + "(" + indexRecord.startOffset + "," + indexRecord.rawLength + ", " + indexRecord.partLength + ")"); } } //merge @SuppressWarnings("unchecked") RawKeyValueIterator kvIter = Merger.merge(job, rfs, keyClass, valClass, codec, segmentList, job.getInt("io.sort.factor", 100), new Path(mapId.toString()), job.getOutputKeyComparator(), reporter, null, spilledRecordsCounter); //write merged output to disk long segmentStart = finalOut.getPos(); Writer<K, V> writer = new Writer<K, V>(job, finalOut, keyClass, valClass, codec, spilledRecordsCounter); if (combinerRunner == null || numSpills < minSpillsForCombine) { Merger.writeFile(kvIter, writer, reporter, job); } else { combineCollector.setWriter(writer); combinerRunner.combine(kvIter, combineCollector); } //close writer.close(); // record offsets rec.startOffset = segmentStart; rec.rawLength = writer.getRawLength(); rec.partLength = writer.getCompressedLength(); spillRec.putIndex(rec, parts); } spillRec.writeToFile(finalIndexFile, job); finalOut.close(); for(int i = 0; i < numSpills; i++) { rfs.delete(filename[i],true); } } } } // MapOutputBuffer
Shuffle過程是MapReduce的核心,也被稱為奇跡發生的地方。要想理解MapReduce, Shuffle是必須要了解的。我看過很多相關的資料,但每次看完都雲里霧里的繞着,很難理清大致的邏輯,反而越攪越混。前段時間在做MapReduce job 性能調優的工作,需要深入代碼研究MapReduce的運行機制,這才對Shuffle探了個究竟。考慮到之前我在看相關資料而看不懂時很惱火,所以在這 里我盡最大的可能試着把Shuffle說清楚,讓每一位想了解它原理的朋友都能有所收獲。如果你對這篇文章有任何疑問或建議請留言到后面,謝謝!
Shuffle的正常意思是洗牌或弄亂,可能大家更熟悉的是Java API里的Collections.shuffle(List)方法,它會隨機地打亂參數list里的元素順序。如果你不知道MapReduce里 Shuffle是什么,那么請看這張圖:
這張是官方對Shuffle過程的描述。但我可以肯定的 是,單從這張圖你基本不可能明白Shuffle的過程,因為它與事實相差挺多,細節也是錯亂的。后面我會具體描述Shuffle的事實情況,所以這里你只 要清楚Shuffle的大致范圍就成-怎樣把map task的輸出結果有效地傳送到reduce端。也可以這樣理解, Shuffle描述着數據從map task輸出到reduce task輸入的這段過程。
在Hadoop這樣的集群環境中,大部分map task與reduce task的執行是在不同的節點上。當然很多情況下Reduce執行時需要跨節點去拉取其它節點上的map task結果。如果集群正在運行的job有很多,那么task的正常執行對集群內部的網絡資源消耗會很嚴重。這種網絡消耗是正常的,我們不能限制,能做的 就是最大化地減少不必要的消耗。還有在節點內,相比於內存,磁盤IO對job完成時間的影響也是可觀的。從最基本的要求來說,我們對Shuffle過程的 期望可以有:
- 完整地從map task端拉取數據到reduce 端。
- 在跨節點拉取數據時,盡可能地減少對帶寬的不必要消耗。
- 減少磁盤IO對task執行的影響。
OK,看到這里時,大家可以先停下來想想,如果是自己來設計這段Shuffle過程,那么你的設計目標是什么。我想能優化的地方主要在於減少拉取數據的量及盡量使用內存而不是磁盤。
我的分析是基於Hadoop0.21.0的源碼,如果與你所認識的Shuffle過程有差別,不吝指出。我會以WordCount為例,並假設它有8個 map task和3個reduce task。從上圖看出,Shuffle過程橫跨map與reduce兩端,所以下面我也會分兩部分來展開。
先看看map端的情況,如下圖:
上圖可能是某個map task的運行情況。拿它與官方圖的左半邊比較,會發現很多不一致。官方圖沒有清楚地說明partition, sort與combiner到底作用在哪個階段。我畫了這張圖,希望讓大家清晰地了解從map數據輸入到map端所有數據准備好的全過程。
整個流程我分了四步。簡單些可以這樣說,每個map task都有一個內存緩沖區,存儲着map的輸出結果,當緩沖區快滿的時候需要將緩沖區的數據以一個臨時文件的方式存放到磁盤,當整個map task結束后再對磁盤中這個map task產生的所有臨時文件做合並,生成最終的正式輸出文件,然后等待reduce task來拉數據。
當然這里的每一步都可能包含着多個步驟與細節,下面我對細節來一一說明:
1. 在map task執行時,它的輸入數據來源於HDFS的block,當然在MapReduce概念中,map task只讀取split。Split與block的對應關系可能是多對一,默認是一對一。在WordCount例子里,假設map的輸入數據都是像 “aaa”這樣的字符串。
2. 在經過mapper的運行后,我們得知mapper的輸出是這樣一個key/value對: key是“aaa”, value是數值1。因為當前map端只做加1的操作,在reduce task里才去合並結果集。前面我們知道這個job有3個reduce task,到底當前的“aaa”應該交由哪個reduce去做呢,是需要現在決定的。
MapReduce提供Partitioner接口,它的作用就是根據key或value及reduce的數量來決定當前的這對輸出數據最終應該交由哪個 reduce task處理。默認對key hash后再以reduce task數量取模。默認的取模方式只是為了平均reduce的處理能力,如果用戶自己對Partitioner有需求,可以訂制並設置到job上。
在我們的例子中,“aaa”經過Partitioner后返回0,也就是這對值應當交由第一個reducer來處理。接下來,需要將數據寫入內存緩沖區 中,緩沖區的作用是批量收集map結果,減少磁盤IO的影響。我們的key/value對以及Partition的結果都會被寫入緩沖區。當然寫入之 前,key與value值都會被序列化成字節數組。
整個內存緩沖區就是一個字節數組,它的字節索引及key/value存儲結構我沒有研究過。如果有朋友對它有研究,那么請大致描述下它的細節吧。
3. 這個內存緩沖區是有大小限制的,默認是100MB。當map task的輸出結果很多時,就可能會撐爆內存,所以需要在一定條件下將緩沖區中的數據臨時寫入磁盤,然后重新利用這塊緩沖區。這個從內存往磁盤寫數據的過 程被稱為Spill,中文可譯為溢寫,字面意思很直觀。這個溢寫是由單獨線程來完成,不影響往緩沖區寫map結果的線程。溢寫線程啟動時不應該阻止map 的結果輸出,所以整個緩沖區有個溢寫的比例spill.percent。這個比例默認是0.8,也就是當緩沖區的數據已經達到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫線程啟動,鎖定這80MB的內存,執行溢寫過程。Map task的輸出結果還可以往剩下的20MB內存中寫,互不影響。
當溢寫線程啟動后,需要對這80MB空間內的key做排序(Sort)。排序是MapReduce模型默認的行為,這里的排序也是對序列化的字節做的排序。
在這里我們可以想想,因為map task的輸出是需要發送到不同的reduce端去,而內存緩沖區沒有對將發送到相同reduce端的數據做合並,那么這種合並應該是體現是磁盤文件中 的。從官方圖上也可以看到寫到磁盤中的溢寫文件是對不同的reduce端的數值做過合並。所以溢寫過程一個很重要的細節在於,如果有很多個 key/value對需要發送到某個reduce端去,那么需要將這些key/value值拼接到一塊,減少與partition相關的索引記錄。
在針對每個reduce端而合並數據時,有些數據可能像這樣:“aaa”/1, “aaa”/1。對於WordCount例子,就是簡單地統計單詞出現的次數,如果在同一個map task的結果中有很多個像“aaa”一樣出現多次的key,我們就應該把它們的值合並到一塊,這個過程叫reduce也叫combine。但 MapReduce的術語中,reduce只指reduce端執行從多個map task取數據做計算的過程。除reduce外,非正式地合並數據只能算做combine了。其實大家知道的,MapReduce中將Combiner等 同於Reducer。
如果client設置過Combiner,那么現在就是使用Combiner的時候了。將有相同key的key/value對的value加起來,減少溢 寫到磁盤的數據量。Combiner會優化MapReduce的中間結果,所以它在整個模型中會多次使用。那哪些場景才能使用Combiner呢?從這里 分析,Combiner的輸出是Reducer的輸入,Combiner絕不能改變最終的計算結果。所以從我的想法來看,Combiner只應該用於那種 Reduce的輸入key/value與輸出key/value類型完全一致,且不影響最終結果的場景。比如累加,最大值等。Combiner的使用一定 得慎重,如果用好,它對job執行效率有幫助,反之會影響reduce的最終結果。
4. 每次溢寫會在磁盤上生成一個溢寫文件,如果map的輸出結果真的很大,有多次這樣的溢寫發生,磁盤上相應的就會有多個溢寫文件存在。當map task真正完成時,內存緩沖區中的數據也全部溢寫到磁盤中形成一個溢寫文件。最終磁盤中會至少有一個這樣的溢寫文件存在(如果map的輸出結果很少,當 map執行完成時,只會產生一個溢寫文件),因為最終的文件只有一個,所以需要將這些溢寫文件歸並到一起,這個過程就叫做Merge。Merge是怎樣 的?如前面的例子,“aaa”從某個map task讀取過來時值是5,從另外一個map 讀取時值是8,因為它們有相同的key,所以得merge成group。什么是group。對於“aaa”就是像這樣的:{“aaa”, [5, 8, 2, …]},數組中的值就是從不同溢寫文件中讀取出來的,然后再把這些值加起來。請注意,因為merge是將多個溢寫文件合並到一個文件,所以可能也有相同的 key存在,在這個過程中如果client設置過Combiner,也會使用Combiner來合並相同的key。
至此,map端的所有工作都已結束,最終生成的這個文件也存放在TaskTracker夠得着的某個本地目錄內。每個reduce task不斷地通過RPC從JobTracker那里獲取map task是否完成的信息,如果reduce task得到通知,獲知某台TaskTracker上的map task執行完成,Shuffle的后半段過程開始啟動。
簡單地說,reduce task在執行之前的工作就是不斷地拉取當前job里每個map task的最終結果,然后對從不同地方拉取過來的數據不斷地做merge,也最終形成一個文件作為reduce task的輸入文件。見下圖:
如map 端的細節圖,Shuffle在reduce端的過程也能用圖上標明的三點來概括。當前reduce copy數據的前提是它要從JobTracker獲得有哪些map task已執行結束,這段過程不表,有興趣的朋友可以關注下。Reducer真正運行之前,所有的時間都是在拉取數據,做merge,且不斷重復地在做。 如前面的方式一樣,下面我也分段地描述reduce 端的Shuffle細節:
1. Copy過程,簡單地拉取數據。Reduce進程啟動一些數據copy線程(Fetcher),通過HTTP方式請求map task所在的TaskTracker獲取map task的輸出文件。因為map task早已結束,這些文件就歸TaskTracker管理在本地磁盤中。
2. Merge階段。這里的merge如map端的merge動作,只是數組中存放的是不同map端copy來的數值。Copy過來的數據會先放入內存緩沖區 中,這里的緩沖區大小要比map端的更為靈活,它基於JVM的heap size設置,因為Shuffle階段Reducer不運行,所以應該把絕大部分的內存都給Shuffle用。這里需要強調的是,merge有三種形 式:1)內存到內存 2)內存到磁盤 3)磁盤到磁盤。默認情況下第一種形式不啟用,讓人比較困惑,是吧。當內存中的數據量到達一定閾值,就啟動內存到磁盤的merge。與map 端類似,這也是溢寫的過程,這個過程中如果你設置有Combiner,也是會啟用的,然后在磁盤中生成了眾多的溢寫文件。第二種merge方式一直在運 行,直到沒有map端的數據時才結束,然后啟動第三種磁盤到磁盤的merge方式生成最終的那個文件。
3. Reducer的輸入文件。不斷地merge后,最后會生成一個“最終文件”。為什么加引號?因為這個文件可能存在於磁盤上,也可能存在於內存中。對我們 來說,當然希望它存放於內存中,直接作為Reducer的輸入,但默認情況下,這個文件是存放於磁盤中的。至於怎樣才能讓這個文件出現在內存中,之后的性能優化篇我再說。當Reducer的輸入文件已定,整個Shuffle才最終結束。然后就是Reducer執行,把結果放到HDFS上。
上面就是整個Shuffle的過程。細節很多,我很多都略過了,只試着把要點說明白。當然,我可能也有理解或表述上的很多問題,不吝指點。我希望不斷地完 善和修改這篇文章,能讓它通俗、易懂,看完就能知道Shuffle的方方面面。至於具體的實現原理,各位有興趣就自己去探索,如果不方便的話,留言給我, 我再來研究並反饋。