MapReduce剖析筆記之八: Map輸出數據的處理類MapOutputBuffer分析


在上一節我們分析了Child子進程啟動,處理Map、Reduce任務的主要過程,但對於一些細節沒有分析,這一節主要對MapOutputBuffer這個關鍵類進行分析。

MapOutputBuffer顧名思義就是Map輸出結果的一個Buffer,用戶在編寫map方法的時候有一個參數OutputCollector:

  void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)  throws IOException;

這個OutputCollector是一個接口,典型實現是OldOutputCollector,這個類的構造方法如下:

    OldOutputCollector(MapOutputCollector<K,V> collector, JobConf conf) {
      numPartitions = conf.getNumReduceTasks();
      if (numPartitions > 0) {
        partitioner = (Partitioner<K,V>)
          ReflectionUtils.newInstance(conf.getPartitionerClass(), conf);
      } else {
        partitioner = new Partitioner<K,V>() {
          @Override
          public void configure(JobConf job) { }
          @Override
          public int getPartition(K key, V value, int numPartitions) {
            return -1;
          }
        };
      }
      this.collector = collector;
    }

可以看出,其核心是MapOutputCollector的對象,另外,在構造方法里還創建了Partitioner<K,V> partitioner對象,如果用戶寫了分區的自定義方法,那么通過反射即可實例化自定義類(),否則使用系統自帶的類。即默認為HashPartitioner,這在前面我們已經分析過:

  public Class<? extends Partitioner> getPartitionerClass() {
    return getClass("mapred.partitioner.class",
                    HashPartitioner.class, Partitioner.class);
  }

這樣的話,當用戶調用OutputCollector的collect()方法的時候,獲取Key對應的分區號(getPartition())后,實際上調用的就是MapOutputCollector的collect()方法:

    public void collect(K key, V value) throws IOException {
      try {
        collector.collect(key, value, partitioner.getPartition(key, value, numPartitions));
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        throw new IOException("interrupt exception", ie);
      }
    }

MapOutputCollector實際上是一個接口,實現該接口的類有MapOutputBuffer和DirectMapOutputCollector,后者用於一個作業在沒有Reduce階段時使用,讓Map處理的數據直接寫入HDFS,前面已經看過這段代碼:

    MapOutputCollector collector = null;
    if (numReduceTasks > 0) {
      collector = new MapOutputBuffer(umbilical, job, reporter);
    } else { 
      collector = new DirectMapOutputCollector(umbilical, job, reporter);
    }

典型的場合下使用的就是MapOutputBuffer的collect方法。因為用戶在編寫Map方法的時候,對於映射后的KV都是調用collect方法執行,因此關於KV的分區、合並、壓縮、緩存、寫盤等等功能都是在MapOutputBuffer的統一指揮下進行的。

明白了MapOutputBuffer的作用,我們下面分析一下MapOutputBuffer的細節。

 

MapOutputBuffer類里面包含的變量比較多,我們對其關鍵變量進行分析:

1、int partitions,分區數量,表示Map任務的輸出需要分為多份,partitions的值等於job.getNumReduceTasks(),也就是等於Reduce的數量;

2、TaskReporter reporter,是一個Child子進程向父進程TaskTracker匯報狀態的線程類,匯報接口使用umbilical RPC接口,這在前面各節已經多次分析過,不再贅述。

3、Class<K> keyClass和Class<K> valClass代表Map處理的Key和Value的類信息,代表用戶上傳的配置文件中指定的"mapred.mapoutput.key.class""mapred.mapoutput.value.class"

      keyClass = (Class<K>)job.getMapOutputKeyClass();
      valClass = (Class<V>)job.getMapOutputValueClass();

  public Class<?> getMapOutputKeyClass() {
    Class<?> retv = getClass("mapred.mapoutput.key.class", null, Object.class);
    if (retv == null) {
      retv = getOutputKeyClass();
    }
    return retv;
  }

  public Class<?> getMapOutputValueClass() {
    Class<?> retv = getClass("mapred.mapoutput.value.class", null,
        Object.class);
    if (retv == null) {
      retv = getOutputValueClass();
    }
    return retv;
  }

4,RawComparator<K> comparator,表示用來對Key-Value記錄進行排序的自定義比較器:

  public RawComparator getOutputKeyComparator() {
    Class<? extends RawComparator> theClass = getClass("mapred.output.key.comparator.class",
            null, RawComparator.class);
    if (theClass != null)
      return ReflectionUtils.newInstance(theClass, this);
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
  }

Map處理的輸入並不排序,會對處理完畢后的結果進行排序,此時就會用到該比較器。

5,SerializationFactory serializationFactory,序列化工廠類,其功能是從配置文件中讀取序列化類的集合。Map處理的輸出是Key,Value集合,需要進行序列化才能寫到緩存以及文件中。

6,Serializer<K> keySerializer和Serializer<V> valSerializer分別用於對Map后的Key和Value進行序列化。其創建來自序列化工廠類:

      keySerializer = serializationFactory.getSerializer(keyClass);
      keySerializer.open(bb);
      valSerializer = serializationFactory.getSerializer(valClass);
      valSerializer.open(bb);

這里又涉及一個變量bb,其定義是:BlockingBuffer bb = new BlockingBuffer()

BlockingBuffer是MapOutputBuffer的一個內部類,繼承於java.io.DataOutputStream,keySerializer和valSerializer使用BlockingBuffer的意義在於將序列化后的Key或Value送入BlockingBuffer。在其serialize序列化方法中,將可序列化的對象(實現Writable接口的對象)序列化后寫入流,此處這個流也就是BlockingBuffer:

    public void serialize(Writable w) throws IOException {
      w.write(dataOut);
    }

Writable是個接口,w.write方法又有什么實現呢?取決於KV類型。Hadoop中需要序列化的對象(包括輸入輸出Key,Value都必須是可序列化的)繼承於Writable接口,該接口提供兩個方法:讀和寫:

public interface Writable {

  void write(DataOutput out) throws IOException;

  void readFields(DataInput in) throws IOException;
}

Hadoop內也有一些典型的實現,比較典型的比如IntWritable,其實現就是調用java.io.DataInput中的方法:

  public void readFields(DataInput in) throws IOException {
    value = in.readInt();
  }

  public void write(DataOutput out) throws IOException {
    out.writeInt(value);
  }

因此,當對Key、Value進行序列化的時候,實際上是調用如IntWritable(假如Key,Value類型是整型)的write方法,該方法又是反過來,調用DataOutput的writeInt方法。

在此處,BlockingBuffer內部又引入一個類:Buffer,也是MapOutputBuffer的一個內部類,繼承於java.io.OutputStream。為什么要引入兩個類呢?BlockingBuffer和Buffer有什么區別?初步來看,Buffer是一個基本的緩沖區,提供了write、flush、close等方法,BlockingBuffer提供了markRecord、reset方法,處理Buffer的邊界等一些特殊情況,是Buffer的進一步封裝,可以理解為是增強了Buffer的功能。Buffer實際上最終也封裝了一個字節緩沖區,即后面我們要分析的非常關鍵的byte[] kvbuffer,基本上,Map之后的結果暫時都會存入kvbuffer這個緩存區,等到要慢的時候再刷寫到磁盤,Buffer這個類的作用就是對kvbuffer進行封裝,比如在其write方法中存在以下代碼:

public synchronized void write(byte b[], int off, int len)
{
        spillLock.lock();
        try {
          do {
。。。。。。。。
             } while (buffull && !wrap);
          } finally {
          spillLock.unlock();
          }
        // here, we know that we have sufficient space to write
        if (buffull) {
          final int gaplen = bufvoid - bufindex;
          System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
          len -= gaplen;
          off += gaplen;
          bufindex = 0;
        }
        System.arraycopy(b, off, kvbuffer, bufindex, len);
        bufindex += len;
      }
}

上面的System.arraycopy就是將要寫入的b(序列化后的數據)寫入到kvbuffer中。關於kvbuffer,我們后面會詳細分析,這里需要知道的是序列化后的結果會調用該方法進一步寫入到kvbuffer也就是Map后結果的緩存中,后面可以看見,kvbuffer寫到一定程度的時候(80%),需要將已經寫了的結果刷寫到磁盤,這個工作是由Buffer的write判斷的。在kvbuffer這樣的字節數組中,會被封裝為一個環形緩沖區,這樣,一個Key可能會切分為兩部分,一部分在尾部,一部分在字節數組的開始位置,雖然這樣讀寫沒問題,但在對KeyValue進行排序時,需要對Key進行比較,這時候需要Key保持字節連續,因此,當出現這種情況下,需要對Buffer進行重啟(reset)操作,這個功能是在BlockingBuffer中完成的,因此,Buffer相當於封裝了kvbuffer,實現環形緩沖區等功能,BlockingBuffer則繼續對此進行封裝,使其支持內部Key的比較功能。本質上,這個緩沖區需要是一個Key-Value記錄的緩沖區,而byte[] kvbuffer只是一個字節緩沖區,因此需要進行更高層次的封裝。比如:1,到達一定程度需要刷寫磁盤;2,Key需要保持字節連續等等。

那么,上面write這個方法又是什么時候調用的呢?實際上就是MapOutputBuffer的collect方法中,會對KeyValue進行序列化,在序列化方法中,會進行寫入:

 

    public void serialize(Writable w) throws IOException {
      w.write(dataOut);
    }

 

此處的dataout就是前面keySerializer.open(bb)這一方法中傳進來的,也就是BlockingBuffer(又封裝了Buffer):

    public void open(OutputStream out) {
      if (out instanceof DataOutputStream) {
        dataOut = (DataOutputStream) out;
      } else {
        dataOut = new DataOutputStream(out);
      }
    }

因此,當執行序列化方法serialize的時候,會調用Buffer的write方法,最終將數據寫入byte[] kvbuffer。

 

 

7,CombinerRunner<K,V> combinerRunner,用於對Map處理的輸出結果進行合並處理,減少Shuffle網絡開銷。CombinerRunner是一個抽象類,根據新舊API的不同,有兩種實現:OldCombinerRunner、NewCombinerRunner。這兩個類里面都有一個combine方法,實現KeyValue的合並。以OldCombinerRunner為例,其combine方法如下:

    protected void combine(RawKeyValueIterator kvIter,
                           OutputCollector<K,V> combineCollector
                           ) throws IOException {
      Reducer<K,V,K,V> combiner = 
        ReflectionUtils.newInstance(combinerClass, job);
      try {
        CombineValuesIterator<K,V> values = 
          new CombineValuesIterator<K,V>(kvIter, comparator, keyClass, 
                                         valueClass, job, Reporter.NULL,
                                         inputCounter);
        while (values.more()) {
          combiner.reduce(values.getKey(), values, combineCollector,
              Reporter.NULL);
          values.nextKey();
        }
      } finally {
        combiner.close();
      }
    }

從其代碼可以看出,首先根據combinerClass利用反射機制創建了一個combiner對象,實際上這個對象就是一個遵從Reducer接口的對象。之后利用CombineValuesIterator對KV進行逐一提取,執行其reduce方法,CombineValuesIterator在上一節看過,是ValuesIterator的子類,可以看出,combiner實現的就是本Map任務內的、局部的reduce。

 

8,CombineOutputCollector<K, V> combineCollector,即Combine之后的輸出對象。其創建代碼為:

      if (combinerRunner != null) {
        combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, conf);
      } else {
        combineCollector = null;
      }

其定義里面有一個Writer對象:

  protected static class CombineOutputCollector<K extends Object, V extends Object> 
  implements OutputCollector<K, V> {
    private Writer<K, V> writer;
。。。

當啟用了Combine功能后,會調用上面的combine方法進行(reduce)操作后再寫入文件(reduce里會使用CombineOutputCollector對象進行collect,見下面Reducer接口的reduce定義代碼),這里的Writer就是寫入文件的作用。如果沒有啟用Combine功能呢,則直接利用Writer寫文件。

  void reduce(K2 key, Iterator<V2> values,
              OutputCollector<K3, V3> output, Reporter reporter)
    throws IOException;

 

9,CompressionCodec codec,用於對Map的輸出進行壓縮。其創建代碼為:

      // compression
      if (job.getCompressMapOutput()) {
        Class<? extends CompressionCodec> codecClass =
          job.getMapOutputCompressorClass(DefaultCodec.class);
        codec = ReflectionUtils.newInstance(codecClass, job);
      }

是否對Map的輸出進行壓縮決定於變量"mapred.compress.map.output",默認不壓縮。

 

10,int[] kvoffsets,int[] kvindices,byte[] kvbuffer。三者是為了記錄KV緩存的數據結構,kvBuffer按照Key-Value(序列化后)的順序記錄,前面說的BlockingBuffer和Buffer封裝的底層數據結構就是kvbuffer(它們都是內部類,可以處理MapOutputBuffer中的變量);kvindices記錄了一堆kvindex,每個kvindex包含三個信息:分區號、Key和Value在kvbuffer中的位置;為了對kvindices中的kvindex進行定位,於是有了第三個結構kvoffsets,只記錄每個kvindex的位置(一個整數即可),另外一個作用是當超過了一定數量后,則會觸發Spill操作,Spill的中文指溢出,大致的含義是當緩存放慢了,就溢出寫到磁盤上去。三者關系的示意圖如下:

上面的結構有什么好處呢?我們知道,Map輸出的結果是一堆KV對,可以不斷地存入kvbuffer中,但怎么按照分區號提取相應的KV對呢?kvindices就是干這個的,通過解析這個數組,就可以得到某個分區的所有KV的位置。之所以需要按照分區號提取,是因為Map的輸出結果需要分為多份,分別送到不同的Reduce任務,否則還需要對key進行計算才得到分區號,除了提高速度之外,更關鍵的作用是排序,Map處理后的結果有多份,每一份默認是按照分區號對KV記錄進行排序的,但是在kvbuffer中源源不斷過來的KeyValue序列並沒有什么順序,為此,當對kvbuffer中的某一個分區的KeyValue序列進行排序時,排序結果只需要將kvoffsets中對應的索引項進行交換即可(后面會看到這一詳細過程),保證kvoffsets中索引的順序其實就想記錄的KeyValue的真實順序。換句話說,我們要對一堆對象進行排序,實際上只要記錄他們索引的順序(類似於指針數組,每個指針指向一個對象)即可,原始記錄保持不動(因為空間很大),而kvoffsets就是一堆整數的序列,交換起來快得多。

從上面的圖可以看出,對於任意一個KeyValue記錄,都會額外產生16個字節的索引開銷,其中12個字節是kvindices中用於記錄分區號、Key位置和Value位置(都是整型),另外4個字節是kvoffsets中的整數值。MapOutputBuffer類里也定義了幾個變量用於說明上述四個變量的位置和所占字節數:

    private static final int PARTITION = 0; // partition offset in acct
    private static final int KEYSTART = 1;  // key offset in acct
    private static final int VALSTART = 2;  // val offset in acct
    private static final int ACCTSIZE = 3;  // total #fields in acct
    private static final int RECSIZE =
                       (ACCTSIZE + 1) * 4;  // acct bytes per record

ACCT表示kvindices中的一個kvindex,ACCTSIZE也就是3個字節,這里的命名稍微有些不規范,RECSIZE稱為記錄大小,這里的記錄指的就是對每個KV索引的大小,即3+1=4個字節。

kvbuffer、kvindices、kvoffsets三個數組的大小之和由參數"io.sort.mb"指定,默認是sortmb=100,於是maxMemUsage = sortmb << 20,即100MB(1MB=2^20B),maxMemUsage是MapOutputBuffer所占內存的主要部分。這100MB中,有一部分拿出來存儲kvindices和kvoffsets,占比為"io.sort.record.percent",默認是recper=0.05,即5MB左右用來(需要是16的整數倍)存儲kvindices和kvoffsets。另外95MB左右用以存儲kvbuffer。

在kvbuffer中,如果達到了一定容量,需要Spill到磁盤上,這個門限由參數"io.sort.spill.percent"指定,默認是spillper=0.8。softBufferLimit這個門限就是用於記錄Spill門限容量:

softBufferLimit = (int)(kvbuffer.length * spillper);

此外,除了kvbuffer增加會引起Spill之外,kvoffsets的膨脹也會引起Spill,比例也是spillper=0.8,這個門限由softRecordLimit參數記錄:

softRecordLimit = (int)(kvoffsets.length * spillper);

即無論哪個到達了80%,都觸發Spill。為什么到達80%就需要刷寫磁盤呢?如果寫滿了才刷寫磁盤,那么在刷寫磁盤的過程中不能寫入,寫就被阻塞了,但是如果到了一定程度就刷寫磁盤,那么緩沖區就一直有剩余空間可以寫,這樣就可以設計成讀寫不沖突,提高吞吐量。KV緩存中的最頂級索引是kvoffsets,因此當出現Spill時,需要將kvoffsets中已經記錄的索引對應的KV提取出來進行寫磁盤,當spill后,kvoffsets又成為空數組。我們粗略想一下,kvoffsets不斷地往后增加記錄,到達一定程度后,觸發Spill,於是從頭(即下標0)到當前位置(比如稱為kvindex)的所有索引對應的KV都寫到磁盤上,Spill結束(此時假定KV緩存寫入暫停)后,又從下標0開始增加記錄,這種形式會有什么問題?

一個比較大的問題是Spill的時候,意味着有個用戶在讀取kvoffsets從0-kvindex的數據,這個時候這部分數據就不能寫,因為下一次寫要從下標0開始,這樣就需要對kvoffsets加鎖才行,否則會引起讀錯誤,這樣的話,還是難以實現讀寫並行。為了解決這種加鎖引發的性能問題,典型方法就是采用環形緩沖區。kvoffsets看做一個環形數組,Spill的時候,只要kvbuffer和kvoffsets還沒有滿(能容納新的KeyValue記錄和索引),kvoffsets仍然可以繼續往后面寫;同理,kvbuffer也是一個環形緩沖區,這樣的話,如果我們把spill到磁盤這一過程用另外一個線程實現(Hadoop里面確實也是這么做的),那么讀寫可以不沖突,提高了性能。

實現環形緩沖區的典型方法也是Hadoop中采用的方法。以kvoffsets為例,一共有三個變量:kvstart、kvindex和kvend。kvstart表示當前已寫的數據的開始位置,kvindex表示寫一個下一個可寫的位置,因此,從kvstart到(kvindex-1)這部分數據就是已經寫的數據,另外一個線程來Spill的時候,讀取的數據就是這一部分。而寫線程仍然從kvindex位置開始,並不沖突(如果寫得太快而讀得太慢,追了一圈后可以通過變量值判斷,也無需加鎖,只是等待)。

舉例來說,下面的第一個圖表示按順時針往kvoffsets里面加入索引,此時kvend=kvstart,但kvindex遞增;當觸發Spill的時候,kvend=kvindex,Spill的值涵蓋從kvstart到kvend-1區間的數據,kvindex不影響,繼續按照進入的數據遞增;當進行完Spill的時候,kvindex增加,kvstart移動到kvend處,在Spill這段時間,kvindex可能已經往前移動了,但並不影響數據的讀取,因此,kvend實際上一般情況下不變,只有在要讀取環形緩沖區中的數據時發生一次改變(即設置kvend=kvindex):

在源代碼的解釋中,kvstart是記錄spill的起始位置,kvend是記錄collectable的起始位置,kvindex是記錄collected的結束位置,collect即前面說過的map方法產生的KV對需要寫入緩沖區,從生產者-消費者角度來看,collect就是這個環形緩沖區的生產者,或者叫寫線程;spill是這個環形緩沖區的消費者,或者叫讀線程。這樣看來,spill每次消費多少數據實際上可以與上面的圖有所差別,比如目前只Spill從1-8這個區間的數據,那么之后kvstart設置為9所在的位置即可,下一次Spill即從9開始。

上圖反映了環形緩沖區的利用,對於kvbuffer的使用原理也一樣,同樣存在三個變量:bufstart、bufmark(為什么不叫bufindex呢?下面分析)、bufend。對於kvoffsets來說,有三個變量就可以實現環形緩沖區,但對於kvbuffer來說,三個變量還不夠,這是為什么呢?因為kvoffsets里面都是以整數為基本單位,每個整數占用4個字節,kvoffsets的類型是int[],不會出現什么問題,使用起來也很方便。但是kvbuffer就不一樣,其定義為byte[],但是一個Key-Value的長度是不固定的,雖然形式上環形緩沖區不存在頭部和尾部的概念,但其物理上緩沖區還是存在頭和尾,並不是物理連續的,按理來說,對於Key-Value的操作,只要把接口封裝好,讓上層應用看起來是連續的即可。但Hadoop里面對Key的比較設計成逐字節比較,其定義為:

public interface RawComparator<T> extends Comparator<T> {

  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);

}

為什么不設計成compare(Key1 key, Key2 key)這種形式呢?這不是更直觀嗎,個人理解,排序是對Map后的Key-Value緩沖區操作的,如果將Key、Value都看做JAVA對象,設計Object的排序,排序的速度要比byte這個層次更差,因為封裝的層次更高了,所以,將所有key全部序列化后存入緩沖區,然后對其進行排序操作會更快,這樣的話,就需要Key在物理上(實際上是JAVA字節數組這個層次,當然不是指硬盤的磁道等等更底層的層次)保持連續,畢竟,按Key排序作為MapReduce中一個很核心的東西,這樣做還是值得的。為此,在緩存里面就需要保證Key的連續性,自然,當往緩沖區里面寫入一個會超越邊界的key的時候,就需要進行特殊處理,這種處理由BlockingBuffer實現,稱為reset,當檢測到這種情況的時候,就調用一下reset,代碼如下:

        // serialize key bytes into buffer
        int keystart = bufindex;
        keySerializer.serialize(key);
        if (bufindex < keystart) {
          // wrapped the key; reset required
          bb.reset();
          keystart = 0;
        }

所謂reset,其實就是把跨越邊界(如何判斷:寫入一個Key之后的bufindex還比寫之前的bufindex位置還小)的Key的尾部拷貝一下到頭部,使其連續。bufindex的含義和前面kvindex類似,代表了下一個可以寫的位置。如下圖所示,紅色表示已經寫入的KeyValue記錄,藍色表示要寫入的下一個Key,在調用Buffer的write方法后,如果發現跨越了邊界(bufindex變小了),則將尾部的那塊藍色區域拷貝到頭部,頭部那塊藍色區域往后挪,形成一個整體的Key,尾部藍色那塊區域空出來的就無效了,在讀的時候就需要跳過。這樣就需要多個變量來記錄位置信息,除了bufindex,bufvoid就表示這個緩沖區在讀取的時候需要停止的位置,因為這個位置不一定是緩沖區的最大長度,但肯定只會在緩沖區的尾巴處出現,所以需要1個變量來記錄;這里還新增了一個bufmark,其含義是一個KeyValue的結尾處,因為kvoffsets里面不存在這個問題,每個整數值就是一個基本單元,但一個KeyValue長度不一,需要用bufmark記錄下來。每當序列化寫入一個Key-Value對,就更新這個數值。記下來的目的之一比如說下面這種情況,需要將尾部的藍色區域拷貝到頭部的時候,就需要知道尾部這一段有多少個字節,bufvoid-bufmark就等於尾部這段藍色區域的長度。

 

理解了上面的變量,reset的代碼就比較簡單了,如下,其中headbytelen就是尾部藍色區域的長度,另外,在下面的代碼中,如果拷貝后的key長度超過了bufstart,也就是空間不夠了,那么就會直接把key直接輸出,此時bufindex置為0:

      protected synchronized void reset() throws IOException {
        int headbytelen = bufvoid - bufmark;
        bufvoid = bufmark;
        if (bufindex + headbytelen < bufstart) {
          System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
          System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
          bufindex += headbytelen;
        } else {
          byte[] keytmp = new byte[bufindex];
          System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
          bufindex = 0;
          out.write(kvbuffer, bufmark, headbytelen);
          out.write(keytmp);
        }
      }

 

 12,SpillThread spillThread,這是一個線程對象,繼承於Thread:

    private final SpillThread spillThread = new SpillThread();

其作用就是當kvbuffer或kvoffsets超過80%以上,將觸發該線程將kvbuffer中的數據寫入到磁盤。讀寫分別是兩個線程。觸發Spill的代碼為:

    private synchronized void startSpill() {
      LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark + "; bufvoid = " + bufvoid);
      LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex + "; length = " + kvoffsets.length);
      kvend = kvindex;
      bufend = bufmark;
      spillReady.signal();
    }

從這里可以看出,觸發Spill時,正是我們前面分析過的,需要執行這個動作:kvend = kvindex、bufend=bufmark。注意,寫入磁盤的數據實際上只是kvbuffer里面的記錄,kvoffsets索引只是同步更新,后面我們會看到,跟kvbuffer同時寫入的實際上是有一個索引數據的,但不是上面提到的這幾個。spillReady是一個java.util.concurrent.locks.Condition對象,環形緩沖區的讀寫線程之間的同步使用JAVA中提供的方法實現,涉及到以下變量:

 

    private final ReentrantLock spillLock = new ReentrantLock();
    private final Condition spillDone = spillLock.newCondition();
    private final Condition spillReady = spillLock.newCondition();

 

 我們前面分析過,環形緩沖區在一個生產者和一個消費者條件下,雙方對讀寫數據是不需要加鎖的,因為讀寫數據不會位於同一個位置,大家處理的是整個環上不同的部分,那么這里引入鎖的目的是什么呢?一種情況自然是當緩沖區滿了的時候,此時可以使用鎖,理論上也可以判斷變量,看看是否寫滿等等,但無論如何此時寫線程需要阻塞,如果寫線程每過一段時間來掃描一下是否可以寫,這種方式造成的延時跟另一個線程直接發信號通知比起來更慢;另外,讀寫雙方涉及到三個變量kvstart、kvend、kvindex的修改。也就是說,當寫完畢,或者讀完畢時需要修改變量的時候,加鎖保證了變量的一致性。這里不使用synchronized這種傳統的同步方法,主要原因是synchronized不夠靈活,擴展性不好,JAVA中提供了另外一種機制,即ReentrantLock等典型鎖類。這種方式靈活性更好,因為鎖只是一個對象(synchronized是一個關鍵字,用語法來支持同步)。ReentrantLock的含義是可重入鎖,因為Re-entrant就是可以重新進入的意思,什么叫可重入呢?比如一個函數被遞歸調用,在執行這個函數代碼的過程中,還沒執行完畢又被再次調用,就是不斷重入的意思。ReentrantLock也如此,一個線程A獲得鎖以后,這個線程A還可以繼續再次獲取這把鎖,其他線程B要想獲得這把鎖,要么等着A把鎖釋放了,如果A不顯式釋放,但是通過發信號等待的方式,也可以間接地使得鎖釋放(這一點很關鍵)。此時,線程B就可以獲得這把鎖。讓一個線程多次獲取一把鎖有什么意義呢?比如有兩段代碼分布在不同的地方,都加了同樣的一個鎖對象,某個線程則可以連續執行這兩段代碼,因為是一把鎖。否則,執行完了第一段后,第二段就無法執行了,這樣就很容易出現死鎖。另外,通過發信號(而且可以有很多不同的信號)的方式釋放鎖,為線程在不同特定條件下釋放鎖提供了極大靈活性。

在線程A拿到鎖之后,可以通過發送信號控制其它線程B的執行。比如A拿到了鎖,但是需要等待一個條件C才能往下執行,但我又不想釋放這把鎖,於是可以調用一個稱為C.await的方法,讓線程B可以獲得這把鎖,去執行它的代碼,當在線程B里滿足了條件C,它調用C.signal這個方法,則可以讓線程A的條件滿足不再等待,接着往下執行(但需要線程B釋放鎖,或者也調用另外一個條件D的await方法)。這種同步模式比較靈活,所以一般來說,典型應用場景是兩個線程共有一把ReentrantLock鎖,並且有兩個以上共有的條件變量Condition1、Condition2。一個線程負責執行Condition1.signal和Condition2.await;另一個線程負責執行Condition2.signal和Condition1.await。

比如上面的例子就創建了兩種信號:spillDone和spillReady,它們的邏輯如下:

1)對於寫線程來說,如果寫滿了,就調用spillDone.await等待spillDone信號;否則不斷往緩沖區里面寫,到了一定程度,就發送spillReady.signal這個信號給讀線程,發完這個信號后如果緩沖區沒滿,就釋放鎖繼續寫(這段代碼無需鎖),如果滿了,就等待spillDone信號;

2)對於讀線程來說,在平時調用spillReady.await等待spillReady這個信號,當讀取之后(此時寫線程要么釋放鎖了,要么調用spillDone.await在等待了,讀線程肯定可以獲得鎖),則把鎖釋放掉,開始Spill(這段代碼無需鎖),完了讀線程再次獲取鎖,修改相應參數,發送信號spillDone給寫線程,表明Spill完畢。

上面的線程同步模式+環形緩沖區的使用是經典案例,值得仔細學習。

作為SpillThread這個消費者、讀線程而言,主要代碼是在其run方法內:

 

      public void run() {
        spillLock.lock();
        spillThreadRunning = true;
        try {
          while (true) {
            spillDone.signal();
            while (kvstart == kvend) {
              spillReady.await();
            }
            try {
              spillLock.unlock();
              sortAndSpill();
            } catch (Exception e) {
             。。。
            } finally {
              spillLock.lock();
              if (bufend < bufindex && bufindex < bufstart) {
                bufvoid = kvbuffer.length;
              }
              kvstart = kvend;
              bufstart = bufend;
            }
          }
        } catch (InterruptedException e) {
          ........
        } finally {
          spillLock.unlock();
          spillThreadRunning = false;
        }
      }

MapOutputBuffer的collect方法是生產者、寫線程,主要代碼即在該方法內,其中startSpill前面已經看過,主要是改變kvend值以及發送spillReady信號:kvnext是kvindex加1,用於判斷是否寫滿,如果kvnext==kvstart,表示寫滿,布爾變量kvfull則為true。

kvsoftlimit是是否超過Spill門限的標志。

    public synchronized void collect(K key, V value, int partition
                                     ) throws IOException {
      final int kvnext = (kvindex + 1) % kvoffsets.length;
      spillLock.lock();
      try {
        boolean kvfull;
        do {
          // sufficient acct space
          kvfull = kvnext == kvstart;
          final boolean kvsoftlimit = ((kvnext > kvend)
              ? kvnext - kvend > softRecordLimit
              : kvend - kvnext <= kvoffsets.length - softRecordLimit);
          if (kvstart == kvend && kvsoftlimit) {
            startSpill();
          }
          if (kvfull) {
            try {
              while (kvstart != kvend) {
                reporter.progress();
                spillDone.await();
              }
            } catch (InterruptedException e) {
。。。。。。。。
            }
          }
        } while (kvfull);
      } finally {
        spillLock.unlock();
      }
。。。。
寫數據
。。。

 

 13,ArrayList<SpillRecord> indexCacheList,這個是SpillRecord的數組,SpillRecord里面緩存的是一個一個的記錄,所以並不是一整塊無結構字節流,而是以IndexRecord為基本單位組織起來 的,IndexRecord非常簡單,描述了一個記錄在緩存中的起始偏移、原始長度、實際長度(可能壓縮)等信息。SpillRecord里面放了一堆 IndexRecord,並有方法可以插入記錄、獲取記錄等。IndexRecord的定義很簡單如下:

class IndexRecord {
  long startOffset;
  long rawLength;
  long partLength;

  public IndexRecord() { }

  public IndexRecord(long startOffset, long rawLength, long partLength) {
    this.startOffset = startOffset;
    this.rawLength = rawLength;
    this.partLength = partLength;
  }
}

SpillRecord的意義在什么地方呢?當kvbuffer觸發Spill的時候,會將kvbuffer的記錄寫入到磁盤(實際上還會包括記錄的長度等信息)。Spill結束后,會生成一個spill文件,這個文件內部包含很多分區的數據,但是是排序過的KeyValue數據(關於排序后面會討論),分為兩層,首先是對分區號進行排序,其次是在一個分區號內,按照Key的大小進行排序,因此Spill文件是一個分區的數據接着一個分區的數據,且每個分區里面的Key-Value都已經按照Key的順序進行排列;SpillRecord就記錄了每個分區數據在文件中的起始位置、長度、以及壓縮長度,這些內容表示成IndexRecord。一個IndexRecord記錄的是一個分區的位置信息,因為一個Spill文件包含N個分區,於是就會有N個IndexRecord,這N個IndexRecord記錄在一個SpillRecord對象中。SpillRecord里面有兩個變量:ByteBuffer buf,以及LongBuffer entries。ByteBuffer和LongBuffer都是java.nio里面提供的類,ByteBuffer是IndexRecord存儲的真正區域,LongBuffer就是對ByteBuffer進行了一點接口封裝,把它當做一個存儲Long型的Buffer。這種概念類似於數據庫里面的視圖跟表的關系一樣。因為IndexRecor里面包含三個Long型變量,每個8字節,因此一個 IndexRecord記錄占用24字節,這就是MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH這個變量指定的。分區數量是numPartitions,因此一個文件需要numPartitions*24來記錄,這也就是一個SpillRecord的大小,每個文件都有一個SpillRecord,因為Spill會有很多次,每次都寫成一個文件,所以會有很多個Spill文件,對應於很多個SpillRecord,這很多個SpillRecord即為ArrayList<SpillRecord> indexCacheList。

為什么要把各個分區數據的位置記錄下來呢?因為MapReduce對Map后的結果會按照分區號對Key-Value進行排序,假定最終生成了10個Spill文件,需要按照分區,將每個分區對應的數據全部拿出來進行歸並排序(Merger),這種排序在Map這一端就有兩個階段,首先是一個Spill文件內部要按照分區對KV排好序(kvoffsets排好序按照順序寫進Spill文件),之后還得把10個Spill文件內部的KV拿過來歸並排序。另外,實際上在Reduce端還會進行歸並排序,因為我們目前討論的都只是在單個Map任務內的排序,Reduce之前還會把各個Map任務排好序的結果進行再次歸並排序,可見,有三種歸並排序,MapReduce中的排序就是不斷地進行歸並排序的過程。

另外,除了將kvbuffer的數據寫進文件,SpillRecord的信息也會寫到文件里,作為后面多個Spill文件歸並的索引。如果不寫入這個信息,怎么知道Spill文件里面的KeyValue是屬於哪個分區呢?如果沒有這個信息,也就無法實現后面的歸並。

 

14,IndexedSorter sorter,理解了上面的過程,這個變量就容易了,如何對map后的KeyValue進行排序就取決於該對象。IndexedSorter是一個接口,用戶可以實現自定義的方法,其創建代碼如下:

      sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);

可以看出,如果用戶沒有配置,默認就使用Hadoop自帶的QuickSort類,即快速排序。另外,排序的規則是對Key進行比較,這里采用的比較對象就是RawComparator<K> comparator。

排序的對象是一個IndexedSortable接口對象,MapOutputBuffer實現了這個接口中的compare和swap方法,compare方法即比較兩個Key的大小,返回整數:

    public int compare(int i, int j) {
      final int ii = kvoffsets[i % kvoffsets.length];
      final int ij = kvoffsets[j % kvoffsets.length];
      // sort by partition
      if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) {
        return kvindices[ii + PARTITION] - kvindices[ij + PARTITION];
      }
      // sort by key
      return comparator.compare(kvbuffer,
          kvindices[ii + KEYSTART],
          kvindices[ii + VALSTART] - kvindices[ii + KEYSTART],
          kvbuffer,
          kvindices[ij + KEYSTART],
          kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);
    }

可見,這個比較分為兩個層次,首先是分區號的排序,在分區號相同條件下,再進行Key的比較。怎么進行Key的比較呢?每個kvoffsets里面就一個字節,記錄了對應的kvindex,kvindex又有3字節,分別是分區號、key在kvbuffer的位置、value在kvbuffer的位置,所以其比較就是首先獲得i、j對應的兩個kvindex,最終調用RawComparator<K> comparator的compare方法,比較兩個Key值的大小,key在kvbuffer中的位置是在kvindices[ii + 1]開始到kvindices[ii + 2]之間,另一個key的位置是在kvbuffer的kvindices[ij + 1]kvindices[ij + 2]之間。前面已經對kvbuffer、kvindices、kvoffsets進行了詳細分析,這里也就比較簡單了。

在排序的過程中會進行交換,kvbuffer和kvindices都保持不變,只有kvoffsets進行了交換:

 

    public void swap(int i, int j) {
      i %= kvoffsets.length;
      j %= kvoffsets.length;
      int tmp = kvoffsets[i];
      kvoffsets[i] = kvoffsets[j];
      kvoffsets[j] = tmp;
    }

 

因為按照排序原則,如果不是同一個分區的KV,那就不用排序;如果是同一個分區的KV,那就進行排序,所以最終的排序只在kvoffsets中進行交換,當交換完畢后,排序也就結束。要寫入文件時,只要按照kvoffsets的順序將對應的kvbuffer中的數據寫入文件即可。

 

15,上面對MapOutputBuffer涉及的變量進行了分析,其原理也基本涵蓋在上面的各個分析之中,下面我們來看一看collect方法的過程。

該方法的聲明為:

    public synchronized void collect(K key, V value, int partition
                                     ) throws IOException

其作用就是對map之后的KeyValue進行處理。

首先獲得kvoffsets中的kvindex的下一個位置,用於判斷kvoffsets是否寫滿:

final int kvnext = (kvindex + 1) % kvoffsets.length;

因為kvindex代表了下一個可寫的位置,如果再下一個已經等於kvstart,那么說明已經寫滿了,需要等待SpillThread處理。

於是設置了一個變量kvfull = kvnext == kvstart;即二者相等時即為true。

要判斷是否Spill,加鎖:

spillLock.lock();

之后判斷是否應該Spill:

          final boolean kvsoftlimit = ((kvnext > kvend)
              ? kvnext - kvend > softRecordLimit
              : kvend - kvnext <= kvoffsets.length - softRecordLimit);

之所以會有兩種情況,是因為這是一個環形緩沖區,可能kvnext大於kvend(沒有Spill時等於kvstart)很多,也可能kvnext已經繞回到了0那個位置,不管怎樣,如果兩者的差距(絕對值)大於softRecordLimit(80%的kvoffsets),則kvsoftlimit=true。

如果kvstart==kvend,表示此時沒有處於Spill(前面分析過,Spill時會將kvend設置為kvindex),並且如果滿足了kvsoftlimit,則進行Spill,向SpillThread發信號:

          if (kvstart == kvend && kvsoftlimit) {
            LOG.info("Spilling map output: record full = " + kvsoftlimit);
            startSpill();
          }

發完信號后不一定可以寫了,因為此時緩沖區說不定滿了,所以如果滿了,就等待SpillDone信號,這個信號是SpillThread發過來的:

          if (kvfull) {
            try {
              while (kvstart != kvend) {
                reporter.progress();
                spillDone.await();
              }
            } catch (InterruptedException e) {
              throw (IOException)new IOException(
                  "Collector interrupted while waiting for the writer"
                  ).initCause(e);
            }
          }

好了,如果跳出來了,說明此時緩沖區可寫了,於是把鎖釋放,准備往緩沖區里面寫數據(再重復一遍,讀寫數據不用加鎖):

finally {
        spillLock.unlock();
      }

要寫入key,首先要將其序列化:

        int keystart = bufindex;
        keySerializer.serialize(key);

之后,因為有可能key序列化后超出了kvbuffer的邊界,進行一些邊界條件處理,這一邊界問題在前面已經分析過:

        if (bufindex < keystart) {
          // wrapped the key; reset required
          bb.reset();
          keystart = 0;
        }

緊接着是對value進行序列化:

        // serialize value bytes into buffer
        final int valstart = bufindex;
        valSerializer.serialize(value);
        int valend = bb.markRecord();

之后,更新kvindices,kvoffsets中的索引信息:

        // update accounting info
        int ind = kvindex * ACCTSIZE;
        kvoffsets[kvindex] = ind;
        kvindices[ind + PARTITION] = partition;
        kvindices[ind + KEYSTART] = keystart;
        kvindices[ind + VALSTART] = valstart;
        kvindex = kvnext;

此處的ind就是新的kvindex的位置,乘以3字節就等於其在kvindices中的位置。同時更新kvindices,kvindex向前移動一個字節。

於是,collect方法就結束了,KV已經被序列化進入kvbuffer了,下面看一看SpillThread涉及到的方法。

 

16,SpillThread在構造方法中被啟動:

      spillThread.setDaemon(true);
      spillThread.setName("SpillThread");
      spillLock.lock();
      try {
        spillThread.start();
        while (!spillThreadRunning) {
          spillDone.await();
        }
      } catch (InterruptedException e) {
        throw (IOException)new IOException("Spill thread failed to initialize"
            ).initCause(sortSpillException);
      } finally {
        spillLock.unlock();
      }

進入SpillThread的run方法,該方法的處理邏輯在前面已經分析過,主要涉及的方法是sortAndSpill。

首先獲得要寫入的Spill文件的大小:

      //approximate the length of the output file to be the length of the
      //buffer + header lengths for the partitions
      long size = (bufend >= bufstart
          ? bufend - bufstart
          : (bufvoid - bufend) + bufstart) + partitions * APPROX_HEADER_LENGTH;

每個分區都會有一些頭開銷,此處為150個字節,這個與Spill文件的文件格式有關,在每個分區之前都會加入一些記錄信息,這里可以看出,Spill文件里面實際上是所有分區的數據混合在一起(但是是一個分區的數據跟着另一個分區的數據)。

然后獲取要寫入的本地文件的文件名,注意不是HDFS文件,而是本地Linux文件:

        // create spill file
        final SpillRecord spillRec = new SpillRecord(partitions);
        final Path filename = mapOutputFile.getSpillFileForWrite(numSpills, size);
        out = rfs.create(filename);

  public Path getSpillFileForWrite(int spillNumber, long size)
      throws IOException {
    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
        + spillNumber + ".out", size, conf);
  }

在這時會創建一個與Spill文件對應的SpillRecord對象(輸入參數為分區總數),其文件名為:

TaskTracker.OUTPUT + "/spill" + spillNumber + ".out"

TaskTracker.OUTPUT其實就是一個字符串String OUTPUT = "output",所以Spill的文件名為output/spill2.out等,表示這個文件是第2個Spill文件(最終會有多個Spill文件,前面分析過)。

然后調用上面分析過的排序對象進行排序,實際上就是通過交換kvoffsets里面的字節達到目的:

        final int endPosition = (kvend > kvstart)
          ? kvend
          : kvoffsets.length + kvend;
        sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);

之后是一個大循環,對每個分區依次進行以下操作。

創建一個寫文件的對象:

  writer = new Writer<K, V>(job, out, keyClass, valClass, codec, spilledRecordsCounter);

此時有兩種情況,排序后的Key-Value不一定直接寫入文件,如果需要在Map端進行合並(Combiner)的話,則先進行合並再寫入:

我們先來看不需要合並的代碼。就是一個循環:

            DataInputBuffer key = new DataInputBuffer();
            while (spindex < endPosition && kvindices[kvoffsets[spindex % kvoffsets.length] + PARTITION] == i) {
              final int kvoff = kvoffsets[spindex % kvoffsets.length];
              getVBytesForOffset(kvoff, value);
              key.reset(kvbuffer, kvindices[kvoff + KEYSTART], (kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART]));
              writer.append(key, value);
              ++spindex;
            }

注意while條件中只挑選那些分區號滿足大循環中當前分區號的數據,獲得KeyValue在kvbuffer中的位置(kvoff),然后key的值就從kvindices[kvoff + KEYSTART]kvindices[kvoff + VALSTART]之間。KEYSTART和VALSTART是固定值1、2,我們再回顧一下,kvindices[kvoff]記錄的是分區號、kvindices[kvoff + 1]記錄的Key在kvbuffer中的起始位置,kvindices[kvoff + 2]記錄的是Value在kvbuffer中的起始位置,於是就得到了key。

Value的獲取是利用getVBytesForOffset實現的。原理也一樣:

 

    private void getVBytesForOffset(int kvoff, InMemValBytes vbytes) {
      final int nextindex = (kvoff / ACCTSIZE == (kvend - 1 + kvoffsets.length) % kvoffsets.length)
        ? bufend
        : kvindices[(kvoff + ACCTSIZE + KEYSTART) % kvindices.length];
      int vallen = (nextindex >= kvindices[kvoff + VALSTART])
        ? nextindex - kvindices[kvoff + VALSTART]
        : (bufvoid - kvindices[kvoff + VALSTART]) + nextindex;
      vbytes.reset(kvbuffer, kvindices[kvoff + VALSTART], vallen);
    }

 

即nextindex要么是bufend,要么是繞一圈之后的對應值。

之后調用writer.append(key, value)寫入KV即可。

如果是需要對KeyValue進行合並的,則執行combine方法:

              if (spstart != spindex) {
                combineCollector.setWriter(writer);
                RawKeyValueIterator kvIter =
                  new MRResultIterator(spstart, spindex);
                combinerRunner.combine(kvIter, combineCollector);
              }

combine方法我們前面分析過,其實就是調用了用戶寫的reduce方法:

    protected void combine(RawKeyValueIterator kvIter,
                           OutputCollector<K,V> combineCollector
                           ) throws IOException {
      Reducer<K,V,K,V> combiner = 
        ReflectionUtils.newInstance(combinerClass, job);
      try {
        CombineValuesIterator<K,V> values = new CombineValuesIterator<K,V>(kvIter, comparator, keyClass, 
                                         valueClass, job, Reporter.NULL,
                                         inputCounter);
        while (values.more()) {
          combiner.reduce(values.getKey(), values, combineCollector, Reporter.NULL);
          values.nextKey();
        }
      } finally {
        combiner.close();
      }
    }
  }

當寫入Spill文件后,還需要對SpillRecord進行記錄:

            // record offsets
            rec.startOffset = segmentStart;
            rec.rawLength = writer.getRawLength();
            rec.partLength = writer.getCompressedLength();
            spillRec.putIndex(rec, i);

即當前這個分區中數據的起始位置、原始長度、壓縮后長度。在Writer類中,其append方法會將Key長度和Value長度都寫進去:

      WritableUtils.writeVInt(out, keyLength);
      WritableUtils.writeVInt(out, valueLength);
      out.write(key.getData(), key.getPosition(), keyLength); 
      out.write(value.getData(), value.getPosition(), valueLength);

使用的VInt即變長整數編碼,這種編碼方式類似於ProtoBuf(但是否完全一樣還沒分析),見我寫的另外一篇介紹ProtocolBuffer的博客。可以看出,KeyValue的記錄加上了Key的長度、Value的長度兩個信息,如果不加無法區分Key、Value的邊界。

注意到,如果設置了壓縮,則在Writer構造方法里將寫入流對象換成另外一個:

      if (codec != null) {
        this.compressor = CodecPool.getCompressor(codec);
        this.compressor.reset();
        this.compressedOut = codec.createOutputStream(checksumOut, compressor);
        this.out = new FSDataOutputStream(this.compressedOut,  null);
        this.compressOutput = true;
      } else {
        this.out = new FSDataOutputStream(checksumOut,null);
      }

按照上面的過程,對每個分區進行循環即可不斷地寫入到Spill文件,可見,一個Spill文件,比如output/spill2.out這個文件,其內容是一個分區跟着一個分區,每個分區里面的數據都經過了排序。每次觸發Spill的時候就會生成一個文件。如:

output/spill1.out、output/spill2.out、output/spill3.out、....

寫完了Spill文件后,還會把SpillRecord的內容寫入成一個Spill索引文件,不過這個寫不是一個Spill文件就對應於一個索引文件,而是超過了一個界限(1MB)再寫入:

        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
          // create spill index file
          Path indexFilename =
              mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                  * MAP_OUTPUT_INDEX_RECORD_LENGTH);
          spillRec.writeToFile(indexFilename, job);
        } else {
          indexCacheList.add(spillRec);
          totalIndexCacheMemory +=
            spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
        }

getSpillIndexFileForWrite方法來看,其命名是output/spill2.out.index等等:

  public Path getSpillIndexFileForWrite(int spillNumber, long size)
      throws IOException {
    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
        + spillNumber + ".out.index", size, conf);
  }

所以,實際上並不是一個Spill文件就對應於一個spill index文件。但一個Spill文件確實是對應於一個SpillRecord的,一個SpillRecord的大小等於分區數量*24字節。

 

17,到此為止,MapOutputBuffer的基本處理過程就明白了,那么,什么時候結束呢,自然是當Map輸入數據處理完畢之后,由下面的代碼進行結束的:

    try {
      runner.run(in, new OldOutputCollector(collector, conf), reporter);
      collector.flush();
      
      in.close();
      in = null;
      collector.close();
      collector = null;
    } finally {
      closeQuietly(in);
      closeQuietly(collector);
    }

此時就調用了collector的flush方法。在map內只是調用其collect方法。因此我們再來看看其flush方法。

flush方法的邏輯還是比較清楚的,首先對kvbuffer內剩余還沒有Spill的數據進行Spill:

      spillLock.lock();
      try {
        while (kvstart != kvend) {
          reporter.progress();
          spillDone.await();
        }
        if (sortSpillException != null) {
          throw (IOException)new IOException("Spill failed"
              ).initCause(sortSpillException);
        }
        if (kvend != kvindex) {
          kvend = kvindex;
          bufend = bufmark;
          sortAndSpill();
        }
      } catch (InterruptedException e) {
        throw (IOException)new IOException(
            "Buffer interrupted while waiting for the writer"
            ).initCause(e);
      } finally {
        spillLock.unlock();
      }

可以看出,此時是這個線程調用了sortAndSpill方法(之前是SpillThread那個線程調用)。

全部刷寫到磁盤后,給SpillThread線程發送暫停信號,等待SpillThread關閉(join方法):

      try {
        spillThread.interrupt();
        spillThread.join();
      } catch (InterruptedException e) {
        throw (IOException)new IOException("Spill failed"
            ).initCause(e);
      }

之后,我們得到了N個Spill文件以及多個索引文件,於是需要按照分區歸並成分區數量個文件,調用mergeParts方法。mergeParts方法的目的是將多個Spill文件合並為一個,注意,雖然最后要把結果送到多個Reduce任務去,但仍然只是寫到一個文件里,不同Reduce任務需要的數據在文件的不同區域。按照SpillRecord索引信息可以取出來。

 

18,在mergeParts里,首先獲得這些Spill文件的文件名:

      for(int i = 0; i < numSpills; i++) {
        filename[i] = mapOutputFile.getSpillFile(i);
        finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
      }

如果numSpills=1,那么Spill文件相當於就是要Map輸出的文件,因為在Spill內部已經進行了排序。而且因為沒有多余的Spill文件需要歸並,所以重命名文件名即可:

      if (numSpills == 1) { //the spill is the final output
        rfs.rename(filename[0], new Path(filename[0].getParent(), "file.out"));
        if (indexCacheList.size() == 0) {
          rfs.rename(mapOutputFile.getSpillIndexFile(0),
              new Path(filename[0].getParent(),"file.out.index"));
        } else {
          indexCacheList.get(0).writeToFile(new Path(filename[0].getParent(),"file.out.index"), job);
        }
        return;
      }

此時,Map輸出文件名為output/file.out和output/file.out.index。

如果多於一個Spill文件,則需要進行歸並處理。

首先將全部索引數據從文件中讀出來,加入到indexCacheList數組里,這里似乎有一個問題,如果索引文件太大怎么辦,會不會導致Out of Memory?不過粗略算一下應該不太可能,假定Reduce個數是100個,一個SpillRecord的大小則是2400字節。假定Map任務輸出100個Spill文件,則indexCacheList大小為240000字節,240KB。這個數量級已經是MapReduce中比較大的了,所以可以忽略這個問題。

      // read in paged indices
      for (int i = indexCacheList.size(); i < numSpills; ++i) {
        Path indexFileName = mapOutputFile.getSpillIndexFile(i);
        indexCacheList.add(new SpillRecord(indexFileName, job, null));
      }

獲得這個indexCacheList的目的是為了方便地找到某個分區在各個Spill文件中的位置,以便進行歸並處理:

之后,獲得最終要輸出的文件名:

      //make correction in the length to include the sequence file header
      //lengths for each partition
      finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
      finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
      Path finalOutputFile = mapOutputFile.getOutputFileForWrite(finalOutFileSize);
      Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);

從下面可以看出Map輸出的文件名,分別是file.out和file.out.index,最終輸出也就是這兩個文件:

  public Path getOutputFileForWrite(long size)
      throws IOException {
    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
        + "file.out", size, conf);
  }

  public Path getOutputIndexFileForWrite(long size)
      throws IOException {
    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
        + "file.out.index", size, conf);
  }

創建文件,rfs是本地文件系統:

      //The output stream for the final single output file
      FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);

如果一個分區文件都沒有,也需要創建記錄文件(CRC等信息,這樣更不會出錯,否則會不會文件被刪了?):

      if (numSpills == 0) {
        //create dummy files
        IndexRecord rec = new IndexRecord();
        SpillRecord sr = new SpillRecord(partitions);
        try {
          for (int i = 0; i < partitions; i++) {
            long segmentStart = finalOut.getPos();
            Writer<K, V> writer = new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
            writer.close();
            rec.startOffset = segmentStart;
            rec.rawLength = writer.getRawLength();
            rec.partLength = writer.getCompressedLength();
            sr.putIndex(rec, i);
          }
          sr.writeToFile(finalIndexFile, job);
        } finally {
          finalOut.close();
        }
        return;
      }

否則,對於每個分區進行一個大循環,內部對每個Spill文件進行一個小循環:

        for (int parts = 0; parts < partitions; parts++) {
          List<Segment<K,V>> segmentList =
            new ArrayList<Segment<K, V>>(numSpills);
          for(int i = 0; i < numSpills; i++) {
            IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);

            Segment<K,V> s =
              new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,
                               indexRecord.partLength, codec, true);
            segmentList.add(i, s);
。。。。。。。

segmentList是關於一個分區的信息,這個分區信息在每一個Spill文件中都存在,根據IndexRecord可以生成出來,除了位置信息,還包括是否采用了壓縮等等信息。

之后,調用Merger中的merge方法進行歸並處理:

          //merge
          @SuppressWarnings("unchecked")
          RawKeyValueIterator kvIter = Merger.merge(job, rfs,
                         keyClass, valClass, codec,
                         segmentList, job.getInt("io.sort.factor", 100),
                         new Path(mapId.toString()),
                         job.getOutputKeyComparator(), reporter,
                         null, spilledRecordsCounter);

這個方法也比較復雜,主要實現的是歸並排序,在后面各節再進行詳細分析。這里可以看出,在一個Map任務內,對於某個分區的那些記錄,默認用快速排序(QuickSort)實現,之后更大范圍的排序使用歸並排序。

歸並完畢后,將其寫入文件,這里又見到了Combine,我們在前面已經分析過Combine,那里是對每個刷寫Spill文件之前某個分區的KV進行合並,這里是對歸並排序后每個分區的結果進行歸並,是不是冗余了?實際上不是,前面那個Combine還是局部的Combine,后面這個Combine是在前面的那個合並的基礎上進行的再次合並。比如要對64MB的文本計算hello這個單詞出現的次數,前面那個Combine比如是對每1MB內的文本累積次數,一共有64個數,最后這個Combine是對64個數加起來,得到64MB中hello的次數,這就是Map的輸出結果;Reduce那邊則是對整個大文件(比如6400MB)的hello次數根據不同Map任務(即100個數)輸出的結果進行再次累和,Combine基本上可以理解為就是Map端的Reduce。因此,從Combine、Sort等過程來看,MapReduce就是一個將小數據的結果進行處理,得到局部(合並、排序)結果后,然后不斷匯總處理的過程。基本上有三個階段,一個是在單個Spill內,一個是單個Map內,一個是全局處理。個人理解這算是MapReduce的核心思想。

          Writer<K, V> writer =
              new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
                               spilledRecordsCounter);
          if (combinerRunner == null || numSpills < minSpillsForCombine) {
            Merger.writeFile(kvIter, writer, reporter, job);
          } else {
            combineCollector.setWriter(writer);
            combinerRunner.combine(kvIter, combineCollector);
          }

同樣,對每個分區都記錄索引信息:

          // record offsets
          rec.startOffset = segmentStart;
          rec.rawLength = writer.getRawLength();
          rec.partLength = writer.getCompressedLength();
          spillRec.putIndex(rec, parts);

等到每個分區都完成了上面的步驟后,將索引信息寫入到一個文件:

 

spillRec.writeToFile(finalIndexFile, job);

然后刪除以前寫入的各個Spill文件:

        for(int i = 0; i < numSpills; i++) {
          rfs.delete(filename[i],true);
        }

於是整個Map輸出過程即結束。

 

后記:本節將Map處理后的結果(Key-Value記錄序列)如何處理的過程分析了一遍,其核心思想是要按照分區來處理,以便送到不同的Reduce任務,先緩存、到達一定程度后刷寫磁盤,刷寫之前進行Spill這個層面的Combine和Sort,得到N個Spill文件,最后,對N個Spill文件的結果進行歸並排序和二次Combine。最終得到一個結果文件寫入到本地,等待Reduce來取,至於Reduce怎么來取,以及Map端又怎么配合,在后續博文中再進行分析。

 另外,從本節可以看出,一個好的框架不僅僅是思想,更重要的是為了實現這些想法,采用哪些算法和數據結構,比如緩存怎么設計,排序如何實現,使得流程既高效,又通用,這可能就是軟件框架設計的核心吧。慢慢學習。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM