如上次分析,其實map函數中的context.write()調用過程如下所示:
梳理下調用過程,context的write方法其實是調用了TaskInputOutputContext類的write方法,而在這個write方法內部又調用了output字段的write方法,這個output字段是NewOutputCollector類的一個對象,自然就回到了NewOutputCollector(reduce數量不是0)這個類的write方法,而這個方法內部又調用了本類的一個字段collector的collector方法,而collector字段是MapOutputBuffer類型,接下來就主要分析這兩個類。
1)NewOutputCollector
private final MapOutputCollector<K,V> collector; private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner; private final int partitions;
在構造函數初始化這三個字段,collector初始化MapOutputBuffer的一個對象,partitions就是這次運行job的reduce數量,partitioner就是使用的分區器,利用partitioner對象的getPartitioner(K,V,partitions)就可以得到鍵值對對應的分區號,然后將這三個參數傳給collector的collect方法。MapOutputBuffer才是重點,接下來對其分析。
2)MapOutputBuffer
map的結果並不是直接輸入到硬盤的,而是先寫入內存緩沖區,這個內存緩沖區是通過三個環形結構的數組組成的,這三個環形數組分別是kvoffsets,kvindices,kvbuffer。這個三個數組又有對應的指示器變量,首先給出三個的關系圖:
kvoffsets和kvindices都是int[]型數組,而kvbuffer是byte[]類型。kvoffset數組中的一個元素就存儲一個鍵值對對應的分區號在kvindices數組中的索引。而kvindices每次使用三個元素來一個鍵值對的先關信息:在kvbuffer中key的起始存儲位置、在kvbuffer中value的起始存儲位置、以及此鍵值對對應的分區號partition。而kvbuffer就負責存儲鍵值對。需要注意的是,如上圖所示,kvoffsets和kvindices存儲信息的大小都是確定的,因為我們完全可以一個int型的正數存儲索引值和分區號,但是kvbuffer中存儲的key、value大小卻不是確定的。所以我們在kvindices中只是存儲了這些key、value的起始位置。我們不能發現,存儲一個鍵值對會帶來16個字節的額外開銷(一個int型變量是4個字節),分別是kvoffsets中的1個int變量+kvindices中的3個int變量。
其實,這三個數組的大小是由"io.sort.mb"指定,默認io.sort.mb=100,也就是sortmb=100,那么有maxmemUsage=sortmb<<20,也就是100M。默認這100M中kvoffsets和kvindices占5M(由"io.sort.record.percent"指定,默認值是0.05)其中,kvoffsets與kvindices的比例是1:3(kvoffset中一個索引值對應kvindices中的三個元素)。
在kvbuffer中如果容量達到一個的比例就會觸發spill(溢寫)操作,這個比例由"io.sort.spill.percent"指定,默認值是0.8。同樣的kvoffsets容量達到一定程度也會觸發spill操作,也是由"io.sort.spill.percent"指定。那么為什么要達到一定的比例就spill,而不是寫滿在spill?原因很簡單,如果是寫滿緩存才spill,那么在刷寫磁盤的時候就不能寫入(因為我要讀取這個緩存區域,寫之前“這塊區域”就是歸寫線程所有,其他線程不能訪問),那么寫線程就會阻塞,map結果就要等待寫入緩存,如果達到一定比例就spill,刷寫磁盤的時候就只是緩存中一定比例的區域歸寫線程所有,其他的部分就可以通過寫線程寫入map的輸出,提高了吞吐量。這樣就又存在一個問題,如果寫線程比spill的過程塊,寫線程的那塊兒區域已經寫滿了,但是spill還沒有完成,也要等待,雖然spill可能已經進行了一大半,spill區域的前半部分已經讀取到磁盤。所以才把kvoffsets設計為邏輯上環形數組,寫到末尾的時候通過查看下一個可寫的位置(kvindex)來決定是否可以寫入。如下圖所示:
首先給出kvoffsets的相關變量:
kvoffsets:
private volatile int kvstart = 0; //表示當前已寫的數據的開始位置 private volatile int kvend = 0; // 未執行spill是等於kvstart,執行spill是不等於kvstart(等於kvindex) private int kvindex = 0; // 表示下一個可寫的位置 private final int[] kvoffsets; //volatile修改的字段表示此字段可以被不同線程訪問和修改
kvbuffer:
private volatile int bufstart = 0; // marks beginning of spill private volatile int bufend = 0; // marks beginning of collectable private volatile int bufvoid = 0; // marks the point where we should stop // reading at the end of the buffer private int bufindex = 0; // marks end of collected private int bufmark = 0; // marks end of record private byte[] kvbuffer;
bufstart、bufend、bufindex與kvsetoffs中對應的變量意義是相同的,不過又增加了兩個變量:bufvoid和bufmark。bufvoid表示實際使用的緩存的最尾部,由於鍵值對的大小是不確定的,所以使用bufmark來標記一個鍵值對的結束,每當寫入一個鍵值對,就更新這個值。
map輸出的鍵值對存入緩存之前要首先經過序列化,序列化之后此才存儲到緩存中。因為我們是先將序列化的key存入緩存,再將序列化了的value存入緩存,這就存在一個問題:剩下的空間存不下key或者value。
a.如果說存不下value,就會拋出MapBufferTooSmallException異常(這就是觸發spill,開始spill過程了)。
b.如果存不下key,那么就要采取措施了。上張圖:
紅色區域表示已經寫入的鍵值對,第一個圖中bufvoid-bufindex即使當前剩余的存儲空間,而第二章圖中的兩塊藍色部分就是存入序列化的key所需要的空間,很明顯key的存儲跨越了緩存的尾端和首端,但是執行spill的之后我們需要使用RawComparator方法按照key對鍵值對排序,而這個方法只能對連續的二進制內存buffer進行排序,也就需要每個key都是連續存儲的,因此就需要調用BlockingBuffer的reset方法尾端的藍色部分移動到首端,此時更新bufvoid的值。
c.上面是情況是可以存下key,如果key存不下呢,那么就會直接輸出key、value,也就是調用spill線程將鍵值對寫入到文件(這就是觸發了spill機制,開始spil了呀!)。
3)在spill之前需要將kvbuffer中的數據排序,先按照分區排序,在分區內再按照key排序,排序和spill的操作都是在MapOutputBuffer類中的sortAndSpill中進行的:
private void sortAndSpill() throws IOException, ClassNotFoundException, InterruptedException { //approximate the length of the output file to be the length of the //buffer + header lengths for the partitions long size = (bufend >= bufstart ? bufend - bufstart : (bufvoid - bufend) + bufstart) + partitions * APPROX_HEADER_LENGTH; FSDataOutputStream out = null; try { // create spill file final SpillRecord spillRec = new SpillRecord(partitions); final Path filename = mapOutputFile.getSpillFileForWrite(numSpills, size); out = rfs.create(filename); final int endPosition = (kvend > kvstart) ? kvend : kvoffsets.length + kvend; sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter); int spindex = kvstart; IndexRecord rec = new IndexRecord(); InMemValBytes value = new InMemValBytes(); for (int i = 0; i < partitions; ++i) { IFile.Writer<K, V> writer = null; try { long segmentStart = out.getPos(); writer = new Writer<K, V>(job, out, keyClass, valClass, codec, spilledRecordsCounter); if (combinerRunner == null) { // spill directly DataInputBuffer key = new DataInputBuffer(); while (spindex < endPosition && kvindices[kvoffsets[spindex % kvoffsets.length] + PARTITION] == i) { final int kvoff = kvoffsets[spindex % kvoffsets.length]; getVBytesForOffset(kvoff, value); key.reset(kvbuffer, kvindices[kvoff + KEYSTART], (kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART])); writer.append(key, value); ++spindex; } } else { int spstart = spindex; while (spindex < endPosition && kvindices[kvoffsets[spindex % kvoffsets.length] + PARTITION] == i) { ++spindex; } // Note: we would like to avoid the combiner if we've fewer // than some threshold of records for a partition if (spstart != spindex) { combineCollector.setWriter(writer); RawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex); combinerRunner.combine(kvIter, combineCollector); } } // close the writer writer.close(); // record offsets rec.startOffset = segmentStart; rec.rawLength = writer.getRawLength(); rec.partLength = writer.getCompressedLength(); spillRec.putIndex(rec, i); writer = null; } finally { if (null != writer) writer.close(); } } if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) { // create spill index file Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH); spillRec.writeToFile(indexFilename, job); } else { indexCacheList.add(spillRec); totalIndexCacheMemory += spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; } LOG.info("Finished spill " + numSpills); ++numSpills; } finally { if (out != null) out.close(); } }
其實排序事實上只是交換kvoffsets的值,比較對kvoffsets的排序比kvbuffer更快。上圖:
排序默認是升序。繼續看代碼:
1 for (int i = 0; i < partitions; ++i) {//循環訪問各個分區 2 IFile.Writer<K, V> writer = null; 3 try { 4 long segmentStart = out.getPos(); 5 writer = new Writer<K, V>(job, out, keyClass, valClass, codec, 6 spilledRecordsCounter); 7 if (combinerRunner == null) { 8 // spill directly 9 DataInputBuffer key = new DataInputBuffer(); 10 while (spindex < endPosition && 11 kvindices[kvoffsets[spindex % kvoffsets.length] 12 + PARTITION] == i) { 13 final int kvoff = kvoffsets[spindex % kvoffsets.length]; 14 getVBytesForOffset(kvoff, value); 15 key.reset(kvbuffer, kvindices[kvoff + KEYSTART], 16 (kvindices[kvoff + VALSTART] - 17 kvindices[kvoff + KEYSTART])); 18 writer.append(key, value); 19 ++spindex; 20 } 21 } else { 22 int spstart = spindex; 23 while (spindex < endPosition && 24 kvindices[kvoffsets[spindex % kvoffsets.length] 25 + PARTITION] == i) { 26 ++spindex; 27 } 28 // Note: we would like to avoid the combiner if we've fewer 29 // than some threshold of records for a partition 30 if (spstart != spindex) { 31 combineCollector.setWriter(writer); 32 RawKeyValueIterator kvIter = 33 new MRResultIterator(spstart, spindex); 34 combinerRunner.combine(kvIter, combineCollector); 35 } 36 } 37 38 // close the writer 39 writer.close(); 40 41 // record offsets 42 rec.startOffset = segmentStart; 43 rec.rawLength = writer.getRawLength(); 44 rec.partLength = writer.getCompressedLength(); 45 spillRec.putIndex(rec, i); 46 47 writer = null; 48 } finally { 49 if (null != writer) writer.close(); 50 } 51 }
可以看出再排序完成之后,循環訪問內存中的每個分區,如果沒有定義combine的話就直接把這個分區的鍵值對spill寫出到磁盤。spill文件的內容形式如下圖所示:
可以看到,spill的文件就是分區的。先是分區順序存儲,在分區內又是按照key順序存儲。
但是,這又存在一個問題,那就是我怎么才能識別出這個spill文件中的各個分區呢?查看上述的42-45行代碼:
rec.startOffset = segmentStart; 43 rec.rawLength = writer.getRawLength(); 44 rec.partLength = writer.getCompressedLength(); 45 spillRec.putIndex(rec, i);
而rec和spillRec的定義如下:
final SpillRecord spillRec = new SpillRecord(partitions);
IndexRecord rec = new IndexRecord();
IndexRecord有三個字段:
startOffset;//數據的起始位置,也就是一個分區的鍵值對的起始位置 rawLength;//數據原始長度 partLength;//數據壓縮后的長度(采用壓縮技術)
其實一個IndexRecord就對應一個分區內的鍵值對在spill文件的索引信息。
SpilRecord是以分區數量partitions作為初始化參數,很明顯,SpillRecord的就是由一個個IndexRecord組成的,由上面第45行代碼也可以看出。所以除了spill文件,我們還需要一個索引文件,這個索引文件的內容就是SpillRecord中的內容,索引文件格式是spill{$}.out.index,而spill的文件格式是spill{$}.out,l另外,雖然寫完了spill文件之后,就會把SpillRecord的內如寫入一個Spill索引文件,不過這個寫不是寫一個spill文件就對應寫一個spill索引文件,而是超過了一個界限(1MB)再寫index文件:
if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) { // create spill index file Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH); spillRec.writeToFile(indexFilename, job); } else { indexCacheList.add(spillRec); totalIndexCacheMemory += spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; }
而totalIndexCacheMemory和INDEX_CACHE_MEMORY_LIMIT的定義分別如下,均是MapOutputBuffer的字段:
1 private static final int INDEX_CACHE_MEMORY_LIMIT = 1024 * 1024; 2 private int totalIndexCacheMemory;
而MAP_OUTPUT_INDEX_RECORD_LENGTH和indexCacheList的定義如下:
private ArrayList<SpillRecord> indexCacheList; private int MAP_OUTPUT_INDEX_RECORD_LENGTH=24;
不難看出,每次spill產生的索引信息存儲在spillRec中,雖然沒有創建每次spill文件對應的索引文件,但是將這個包含索引信息的對象添加到了indexCacheList中,這個對象是SpillRecord類型的容器,如果本次沒有不創建索引文件,那么totalIndexCacheMemory就會被更新。
totalIndexCacheMemory += spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
最后的最后,當Map輸入數據處理完畢之后,會調用flush方法將緩存中的數據刷寫道磁盤。
spill文件和其索引文件的形式以及對應關系如下所示:
不過,一個map會產生很多的spill文件,當MapTask正常退出之前這些文件會被合並一個大out文件和一個大的index文件,如果想看到中間的spill文件就需要修改一些參數值:
io.sort.mb=10;//用於存儲map輸出鍵值對的緩存,默認值是100(100M) io.sort.spill.percent=0.1;//緩存spill的比例,默認是0.8. //將以上兩個參數修改之后就可以使map更多的spill數據到硬盤上,一個直觀的效果就是spill的文件有很多。我們知道IO是很耗時間的,相當於“拖慢”了map的速度。 //另外,還有一個參數: io.sort.factor=5;//這個參數默認是10,表示執行合並spill文件的時候最多一次可以合並多少個spill文件,調下這個值可以看到合並的過程,也就是看到file.out(所有spill文件合並后的大文件)和spill.out並存
上圖:
以下是spill文件的路徑:
spill是mapreduce的中間結果,存儲在數據節點的本地磁盤上,存儲路徑由以下參數指定:
hadoop.tmp.dir="/home/hadoop/HadoopData/tmp/";//這是hadoop的臨時基礎目錄,很對目錄都依賴它 mapred.local.dir=${hadoop.tmp.dir}/mapred/local;//這是默認值,也就是mapred的中間數據文件的存儲目錄。
最后要把所有的spill文件和index文件合並,合並之后就是這樣:
3)Merge
Merge就是將多個spill文件合並成一個大的file.out文件,首先讀取若干spill文件的內容到內存,然后對其排序,排序方法時多路歸並排序,首先按照分區號排序,在按照key排序(默認是快排),歸並過程簡要描述如下:
以上就是map輸出到磁盤的過程,這些中間文件(fiel.out,file.out.inde)將來是要等着Reducer取走的,不過並不是Reducer取走之后就刪除的,因為Reducer可能會運行失敗,在整個Job完成之后,JobTracker通知Mapper可以刪除了才會將這些中間文件刪掉.向硬盤中寫數據的時機:
(1)當內存緩沖區不能容下一個太大的kv對時。spillSingleRecord方法。
(2)內存緩沖區已滿時。SpillThread線程。
(3)Mapper的結果都已經collect了,需要對緩沖區做最后的清理。Flush方法。
感謝:
http://www.cnblogs.com/esingchan/p/3947156.html
http://zheming.wang/hadoop-mapreduce-zhi-xing-liu-cheng-xiang-jie.html
http://www.cnblogs.com/forfuture1978/archive/2010/11/19/1882279.html