map寫數據到本地磁盤過程解析----spill和merge


    如上次分析,其實map函數中的context.write()調用過程如下所示:

    梳理下調用過程,context的write方法其實是調用了TaskInputOutputContext類的write方法,而在這個write方法內部又調用了output字段的write方法,這個output字段是NewOutputCollector類的一個對象,自然就回到了NewOutputCollector(reduce數量不是0)這個類的write方法,而這個方法內部又調用了本類的一個字段collector的collector方法,而collector字段是MapOutputBuffer類型,接下來就主要分析這兩個類。

1)NewOutputCollector

private final MapOutputCollector<K,V> collector;
private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;
private final int partitions;

在構造函數初始化這三個字段,collector初始化MapOutputBuffer的一個對象,partitions就是這次運行job的reduce數量,partitioner就是使用的分區器,利用partitioner對象的getPartitioner(K,V,partitions)就可以得到鍵值對對應的分區號,然后將這三個參數傳給collector的collect方法。MapOutputBuffer才是重點,接下來對其分析。

2)MapOutputBuffer

map的結果並不是直接輸入到硬盤的,而是先寫入內存緩沖區,這個內存緩沖區是通過三個環形結構的數組組成的,這三個環形數組分別是kvoffsets,kvindices,kvbuffer。這個三個數組又有對應的指示器變量,首先給出三個的關系圖:

      kvoffsets和kvindices都是int[]型數組,而kvbuffer是byte[]類型。kvoffset數組中的一個元素就存儲一個鍵值對對應的分區號在kvindices數組中的索引。而kvindices每次使用三個元素來一個鍵值對的先關信息:在kvbuffer中key的起始存儲位置、在kvbuffer中value的起始存儲位置、以及此鍵值對對應的分區號partition。而kvbuffer就負責存儲鍵值對。需要注意的是,如上圖所示,kvoffsets和kvindices存儲信息的大小都是確定的,因為我們完全可以一個int型的正數存儲索引值和分區號,但是kvbuffer中存儲的key、value大小卻不是確定的。所以我們在kvindices中只是存儲了這些key、value的起始位置。我們不能發現,存儲一個鍵值對會帶來16個字節的額外開銷(一個int型變量是4個字節),分別是kvoffsets中的1個int變量+kvindices中的3個int變量。

    其實,這三個數組的大小是由"io.sort.mb"指定,默認io.sort.mb=100,也就是sortmb=100,那么有maxmemUsage=sortmb<<20,也就是100M。默認這100M中kvoffsets和kvindices占5M(由"io.sort.record.percent"指定,默認值是0.05)其中,kvoffsets與kvindices的比例是1:3(kvoffset中一個索引值對應kvindices中的三個元素)。

     在kvbuffer中如果容量達到一個的比例就會觸發spill(溢寫)操作,這個比例由"io.sort.spill.percent"指定,默認值是0.8。同樣的kvoffsets容量達到一定程度也會觸發spill操作,也是由"io.sort.spill.percent"指定。那么為什么要達到一定的比例就spill,而不是寫滿在spill?原因很簡單,如果是寫滿緩存才spill,那么在刷寫磁盤的時候就不能寫入(因為我要讀取這個緩存區域,寫之前“這塊區域”就是歸寫線程所有,其他線程不能訪問),那么寫線程就會阻塞,map結果就要等待寫入緩存,如果達到一定比例就spill,刷寫磁盤的時候就只是緩存中一定比例的區域歸寫線程所有,其他的部分就可以通過寫線程寫入map的輸出,提高了吞吐量。這樣就又存在一個問題,如果寫線程比spill的過程塊,寫線程的那塊兒區域已經寫滿了,但是spill還沒有完成,也要等待,雖然spill可能已經進行了一大半,spill區域的前半部分已經讀取到磁盤。所以才把kvoffsets設計為邏輯上環形數組,寫到末尾的時候通過查看下一個可寫的位置(kvindex)來決定是否可以寫入。如下圖所示:    

首先給出kvoffsets的相關變量:

kvoffsets:

private volatile int kvstart = 0;  //表示當前已寫的數據的開始位置
private volatile int kvend = 0;    // 未執行spill是等於kvstart,執行spill是不等於kvstart(等於kvindex)
private int kvindex = 0;           // 表示下一個可寫的位置
private final int[] kvoffsets;    
//volatile修改的字段表示此字段可以被不同線程訪問和修改

kvbuffer:

private volatile int bufstart = 0; // marks beginning of spill
private volatile int bufend = 0;   // marks beginning of collectable
private volatile int bufvoid = 0;  // marks the point where we should stop
                                  // reading at the end of the buffer
private int bufindex = 0;          // marks end of collected
private int bufmark = 0;           // marks end of record
private byte[] kvbuffer; 

      bufstart、bufend、bufindex與kvsetoffs中對應的變量意義是相同的,不過又增加了兩個變量:bufvoid和bufmark。bufvoid表示實際使用的緩存的最尾部,由於鍵值對的大小是不確定的,所以使用bufmark來標記一個鍵值對的結束,每當寫入一個鍵值對,就更新這個值。

     map輸出的鍵值對存入緩存之前要首先經過序列化,序列化之后此才存儲到緩存中。因為我們是先將序列化的key存入緩存,再將序列化了的value存入緩存,這就存在一個問題:剩下的空間存不下key或者value。

a.如果說存不下value,就會拋出MapBufferTooSmallException異常(這就是觸發spill,開始spill過程了)。

b.如果存不下key,那么就要采取措施了。上張圖:

紅色區域表示已經寫入的鍵值對,第一個圖中bufvoid-bufindex即使當前剩余的存儲空間,而第二章圖中的兩塊藍色部分就是存入序列化的key所需要的空間,很明顯key的存儲跨越了緩存的尾端和首端,但是執行spill的之后我們需要使用RawComparator方法按照key對鍵值對排序,而這個方法只能對連續的二進制內存buffer進行排序,也就需要每個key都是連續存儲的,因此就需要調用BlockingBuffer的reset方法尾端的藍色部分移動到首端,此時更新bufvoid的值。

c.上面是情況是可以存下key,如果key存不下呢,那么就會直接輸出key、value,也就是調用spill線程將鍵值對寫入到文件(這就是觸發了spill機制,開始spil了呀!)。

 3)在spill之前需要將kvbuffer中的數據排序,先按照分區排序,在分區內再按照key排序,排序和spill的操作都是在MapOutputBuffer類中的sortAndSpill中進行的:

  private void sortAndSpill() throws IOException, ClassNotFoundException,
                                       InterruptedException {
      //approximate the length of the output file to be the length of the
      //buffer + header lengths for the partitions
      long size = (bufend >= bufstart
          ? bufend - bufstart
          : (bufvoid - bufend) + bufstart) +
                  partitions * APPROX_HEADER_LENGTH;
      FSDataOutputStream out = null;
      try {
        // create spill file
        final SpillRecord spillRec = new SpillRecord(partitions);
        final Path filename =
            mapOutputFile.getSpillFileForWrite(numSpills, size);
        out = rfs.create(filename);

        final int endPosition = (kvend > kvstart)
          ? kvend
          : kvoffsets.length + kvend;
        sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
        int spindex = kvstart;
        IndexRecord rec = new IndexRecord();
        InMemValBytes value = new InMemValBytes();
        for (int i = 0; i < partitions; ++i) {
          IFile.Writer<K, V> writer = null;
          try {
            long segmentStart = out.getPos();
            writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
                                      spilledRecordsCounter);
            if (combinerRunner == null) {
              // spill directly
              DataInputBuffer key = new DataInputBuffer();
              while (spindex < endPosition &&
                  kvindices[kvoffsets[spindex % kvoffsets.length]
                            + PARTITION] == i) {
                final int kvoff = kvoffsets[spindex % kvoffsets.length];
                getVBytesForOffset(kvoff, value);
                key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
                          (kvindices[kvoff + VALSTART] - 
                           kvindices[kvoff + KEYSTART]));
                writer.append(key, value);
                ++spindex;
              }
            } else {
              int spstart = spindex;
              while (spindex < endPosition &&
                  kvindices[kvoffsets[spindex % kvoffsets.length]
                            + PARTITION] == i) {
                ++spindex;
              }
              // Note: we would like to avoid the combiner if we've fewer
              // than some threshold of records for a partition
              if (spstart != spindex) {
                combineCollector.setWriter(writer);
                RawKeyValueIterator kvIter =
                  new MRResultIterator(spstart, spindex);
                combinerRunner.combine(kvIter, combineCollector);
              }
            }

            // close the writer
            writer.close();

            // record offsets
            rec.startOffset = segmentStart;
            rec.rawLength = writer.getRawLength();
            rec.partLength = writer.getCompressedLength();
            spillRec.putIndex(rec, i);

            writer = null;
          } finally {
            if (null != writer) writer.close();
          }
        }

        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
          // create spill index file
          Path indexFilename =
              mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                  * MAP_OUTPUT_INDEX_RECORD_LENGTH);
          spillRec.writeToFile(indexFilename, job);
        } else {
          indexCacheList.add(spillRec);
          totalIndexCacheMemory +=
            spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
        }
        LOG.info("Finished spill " + numSpills);
        ++numSpills;
      } finally {
        if (out != null) out.close();
      }
    }

其實排序事實上只是交換kvoffsets的值,比較對kvoffsets的排序比kvbuffer更快。上圖:

排序默認是升序。繼續看代碼:

 1 for (int i = 0; i < partitions; ++i) {//循環訪問各個分區
 2           IFile.Writer<K, V> writer = null;
 3           try {
 4             long segmentStart = out.getPos();
 5             writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
 6                                       spilledRecordsCounter);
 7             if (combinerRunner == null) {
 8               // spill directly
 9               DataInputBuffer key = new DataInputBuffer();
10               while (spindex < endPosition &&
11                   kvindices[kvoffsets[spindex % kvoffsets.length]
12                             + PARTITION] == i) {
13                 final int kvoff = kvoffsets[spindex % kvoffsets.length];
14                 getVBytesForOffset(kvoff, value);
15                 key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
16                           (kvindices[kvoff + VALSTART] - 
17                            kvindices[kvoff + KEYSTART]));
18                 writer.append(key, value);
19                 ++spindex;
20               }
21             } else {
22               int spstart = spindex;
23               while (spindex < endPosition &&
24                   kvindices[kvoffsets[spindex % kvoffsets.length]
25                             + PARTITION] == i) {
26                 ++spindex;
27               }
28               // Note: we would like to avoid the combiner if we've fewer
29               // than some threshold of records for a partition
30               if (spstart != spindex) {
31                 combineCollector.setWriter(writer);
32                 RawKeyValueIterator kvIter =
33                   new MRResultIterator(spstart, spindex);
34                 combinerRunner.combine(kvIter, combineCollector);
35               }
36             }
37 
38             // close the writer
39             writer.close();
40 
41             // record offsets
42             rec.startOffset = segmentStart;
43             rec.rawLength = writer.getRawLength();
44             rec.partLength = writer.getCompressedLength();
45             spillRec.putIndex(rec, i);
46 
47             writer = null;
48           } finally {
49             if (null != writer) writer.close();
50           }
51         }

可以看出再排序完成之后,循環訪問內存中的每個分區,如果沒有定義combine的話就直接把這個分區的鍵值對spill寫出到磁盤。spill文件的內容形式如下圖所示:

可以看到,spill的文件就是分區的。先是分區順序存儲,在分區內又是按照key順序存儲。

但是,這又存在一個問題,那就是我怎么才能識別出這個spill文件中的各個分區呢?查看上述的42-45行代碼:

 rec.startOffset = segmentStart;
43             rec.rawLength = writer.getRawLength();
44             rec.partLength = writer.getCompressedLength();
45             spillRec.putIndex(rec, i);

而rec和spillRec的定義如下:

 final SpillRecord spillRec = new SpillRecord(partitions);
 IndexRecord rec = new IndexRecord();

IndexRecord有三個字段:

startOffset;//數據的起始位置,也就是一個分區的鍵值對的起始位置
rawLength;//數據原始長度
partLength;//數據壓縮后的長度(采用壓縮技術)

其實一個IndexRecord就對應一個分區內的鍵值對在spill文件的索引信息。

SpilRecord是以分區數量partitions作為初始化參數,很明顯,SpillRecord的就是由一個個IndexRecord組成的,由上面第45行代碼也可以看出。所以除了spill文件,我們還需要一個索引文件,這個索引文件的內容就是SpillRecord中的內容,索引文件格式是spill{$}.out.index,而spill的文件格式是spill{$}.out,l另外,雖然寫完了spill文件之后,就會把SpillRecord的內如寫入一個Spill索引文件,不過這個寫不是寫一個spill文件就對應寫一個spill索引文件,而是超過了一個界限(1MB)再寫index文件:

 if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
          // create spill index file
          Path indexFilename =
              mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                  * MAP_OUTPUT_INDEX_RECORD_LENGTH);
          spillRec.writeToFile(indexFilename, job);
        } else {
          indexCacheList.add(spillRec);
          totalIndexCacheMemory +=
            spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
        }

而totalIndexCacheMemory和INDEX_CACHE_MEMORY_LIMIT的定義分別如下,均是MapOutputBuffer的字段:

1   private static final int INDEX_CACHE_MEMORY_LIMIT = 1024 * 1024;
2   private int totalIndexCacheMemory;

而MAP_OUTPUT_INDEX_RECORD_LENGTH和indexCacheList的定義如下:

 private ArrayList<SpillRecord> indexCacheList;
private int MAP_OUTPUT_INDEX_RECORD_LENGTH=24;

不難看出,每次spill產生的索引信息存儲在spillRec中,雖然沒有創建每次spill文件對應的索引文件,但是將這個包含索引信息的對象添加到了indexCacheList中,這個對象是SpillRecord類型的容器,如果本次沒有不創建索引文件,那么totalIndexCacheMemory就會被更新。

 totalIndexCacheMemory += spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;

最后的最后,當Map輸入數據處理完畢之后,會調用flush方法將緩存中的數據刷寫道磁盤。

 spill文件和其索引文件的形式以及對應關系如下所示:

 

 

不過,一個map會產生很多的spill文件,當MapTask正常退出之前這些文件會被合並一個大out文件和一個大的index文件,如果想看到中間的spill文件就需要修改一些參數值:

io.sort.mb=10;//用於存儲map輸出鍵值對的緩存,默認值是100(100M)
io.sort.spill.percent=0.1;//緩存spill的比例,默認是0.8.
//將以上兩個參數修改之后就可以使map更多的spill數據到硬盤上,一個直觀的效果就是spill的文件有很多。我們知道IO是很耗時間的,相當於“拖慢”了map的速度。
//另外,還有一個參數:
io.sort.factor=5;//這個參數默認是10,表示執行合並spill文件的時候最多一次可以合並多少個spill文件,調下這個值可以看到合並的過程,也就是看到file.out(所有spill文件合並后的大文件)和spill.out並存

上圖:

以下是spill文件的路徑:

spill是mapreduce的中間結果,存儲在數據節點的本地磁盤上,存儲路徑由以下參數指定:

hadoop.tmp.dir="/home/hadoop/HadoopData/tmp/";//這是hadoop的臨時基礎目錄,很對目錄都依賴它
mapred.local.dir=${hadoop.tmp.dir}/mapred/local;//這是默認值,也就是mapred的中間數據文件的存儲目錄。

最后要把所有的spill文件和index文件合並,合並之后就是這樣:

3)Merge

Merge就是將多個spill文件合並成一個大的file.out文件,首先讀取若干spill文件的內容到內存,然后對其排序,排序方法時多路歸並排序,首先按照分區號排序,在按照key排序(默認是快排),歸並過程簡要描述如下:

 

以上就是map輸出到磁盤的過程,這些中間文件(fiel.out,file.out.inde)將來是要等着Reducer取走的,不過並不是Reducer取走之后就刪除的,因為Reducer可能會運行失敗,在整個Job完成之后,JobTracker通知Mapper可以刪除了才會將這些中間文件刪掉.向硬盤中寫數據的時機:

(1)當內存緩沖區不能容下一個太大的kv對時。spillSingleRecord方法。

(2)內存緩沖區已滿時。SpillThread線程。

(3)Mapper的結果都已經collect了,需要對緩沖區做最后的清理。Flush方法。

感謝:

http://www.cnblogs.com/esingchan/p/3947156.html

http://zheming.wang/hadoop-mapreduce-zhi-xing-liu-cheng-xiang-jie.html

http://www.cnblogs.com/forfuture1978/archive/2010/11/19/1882279.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM