在上一節我們分析了Child子進程啟動,處理Map、Reduce任務的主要過程,但對於一些細節沒有分析,這一節主要對MapOutputBuffer這個關鍵類進行分析。
MapOutputBuffer顧名思義就是Map輸出結果的一個Buffer,用戶在編寫map方法的時候有一個參數OutputCollector:
void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter) throws IOException;
這個OutputCollector是一個接口,典型實現是OldOutputCollector,這個類的構造方法如下:
OldOutputCollector(MapOutputCollector<K,V> collector, JobConf conf) { numPartitions = conf.getNumReduceTasks(); if (numPartitions > 0) { partitioner = (Partitioner<K,V>) ReflectionUtils.newInstance(conf.getPartitionerClass(), conf); } else { partitioner = new Partitioner<K,V>() { @Override public void configure(JobConf job) { } @Override public int getPartition(K key, V value, int numPartitions) { return -1; } }; } this.collector = collector; }
可以看出,其核心是MapOutputCollector的對象,另外,在構造方法里還創建了Partitioner<K,V> partitioner對象,如果用戶寫了分區的自定義方法,那么通過反射即可實例化自定義類(),否則使用系統自帶的類。即默認為HashPartitioner,這在前面我們已經分析過:
public Class<? extends Partitioner> getPartitionerClass() { return getClass("mapred.partitioner.class", HashPartitioner.class, Partitioner.class); }
這樣的話,當用戶調用OutputCollector的collect()方法的時候,獲取Key對應的分區號(getPartition())后,實際上調用的就是MapOutputCollector的collect()方法:
public void collect(K key, V value) throws IOException { try { collector.collect(key, value, partitioner.getPartition(key, value, numPartitions)); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new IOException("interrupt exception", ie); } }
MapOutputCollector實際上是一個接口,實現該接口的類有MapOutputBuffer和DirectMapOutputCollector,后者用於一個作業在沒有Reduce階段時使用,讓Map處理的數據直接寫入HDFS,前面已經看過這段代碼:
MapOutputCollector collector = null; if (numReduceTasks > 0) { collector = new MapOutputBuffer(umbilical, job, reporter); } else { collector = new DirectMapOutputCollector(umbilical, job, reporter); }
典型的場合下使用的就是MapOutputBuffer的collect方法。因為用戶在編寫Map方法的時候,對於映射后的KV都是調用collect方法執行,因此關於KV的分區、合並、壓縮、緩存、寫盤等等功能都是在MapOutputBuffer的統一指揮下進行的。
明白了MapOutputBuffer的作用,我們下面分析一下MapOutputBuffer的細節。
MapOutputBuffer類里面包含的變量比較多,我們對其關鍵變量進行分析:
1、int partitions,分區數量,表示Map任務的輸出需要分為多份,partitions的值等於job.getNumReduceTasks(),也就是等於Reduce的數量;
2、TaskReporter reporter,是一個Child子進程向父進程TaskTracker匯報狀態的線程類,匯報接口使用umbilical RPC接口,這在前面各節已經多次分析過,不再贅述。
3、Class<K> keyClass和Class<K> valClass代表Map處理的Key和Value的類信息,代表用戶上傳的配置文件中指定的"mapred.mapoutput.key.class"和"mapred.mapoutput.value.class"
keyClass = (Class<K>)job.getMapOutputKeyClass(); valClass = (Class<V>)job.getMapOutputValueClass(); public Class<?> getMapOutputKeyClass() { Class<?> retv = getClass("mapred.mapoutput.key.class", null, Object.class); if (retv == null) { retv = getOutputKeyClass(); } return retv; } public Class<?> getMapOutputValueClass() { Class<?> retv = getClass("mapred.mapoutput.value.class", null, Object.class); if (retv == null) { retv = getOutputValueClass(); } return retv; }
4,RawComparator<K> comparator,表示用來對Key-Value記錄進行排序的自定義比較器:
public RawComparator getOutputKeyComparator() { Class<? extends RawComparator> theClass = getClass("mapred.output.key.comparator.class", null, RawComparator.class); if (theClass != null) return ReflectionUtils.newInstance(theClass, this); return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class)); }
Map處理的輸入並不排序,會對處理完畢后的結果進行排序,此時就會用到該比較器。
5,SerializationFactory serializationFactory,序列化工廠類,其功能是從配置文件中讀取序列化類的集合。Map處理的輸出是Key,Value集合,需要進行序列化才能寫到緩存以及文件中。
6,Serializer<K> keySerializer和Serializer<V> valSerializer分別用於對Map后的Key和Value進行序列化。其創建來自序列化工廠類:
keySerializer = serializationFactory.getSerializer(keyClass); keySerializer.open(bb); valSerializer = serializationFactory.getSerializer(valClass); valSerializer.open(bb);
這里又涉及一個變量bb,其定義是:BlockingBuffer bb = new BlockingBuffer()
BlockingBuffer是MapOutputBuffer的一個內部類,繼承於java.io.DataOutputStream,keySerializer和valSerializer使用BlockingBuffer的意義在於將序列化后的Key或Value送入BlockingBuffer。在其serialize序列化方法中,將可序列化的對象(實現Writable接口的對象)序列化后寫入流,此處這個流也就是BlockingBuffer:
public void serialize(Writable w) throws IOException { w.write(dataOut); }
Writable是個接口,w.write方法又有什么實現呢?取決於KV類型。Hadoop中需要序列化的對象(包括輸入輸出Key,Value都必須是可序列化的)繼承於Writable接口,該接口提供兩個方法:讀和寫:
public interface Writable { void write(DataOutput out) throws IOException; void readFields(DataInput in) throws IOException; }
Hadoop內也有一些典型的實現,比較典型的比如IntWritable,其實現就是調用java.io.DataInput中的方法:
public void readFields(DataInput in) throws IOException { value = in.readInt(); } public void write(DataOutput out) throws IOException { out.writeInt(value); }
因此,當對Key、Value進行序列化的時候,實際上是調用如IntWritable(假如Key,Value類型是整型)的write方法,該方法又是反過來,調用DataOutput的writeInt方法。
在此處,BlockingBuffer內部又引入一個類:Buffer,也是MapOutputBuffer的一個內部類,繼承於java.io.OutputStream。為什么要引入兩個類呢?BlockingBuffer和Buffer有什么區別?初步來看,Buffer是一個基本的緩沖區,提供了write、flush、close等方法,BlockingBuffer提供了markRecord、reset方法,處理Buffer的邊界等一些特殊情況,是Buffer的進一步封裝,可以理解為是增強了Buffer的功能。Buffer實際上最終也封裝了一個字節緩沖區,即后面我們要分析的非常關鍵的byte[] kvbuffer,基本上,Map之后的結果暫時都會存入kvbuffer這個緩存區,等到要慢的時候再刷寫到磁盤,Buffer這個類的作用就是對kvbuffer進行封裝,比如在其write方法中存在以下代碼:
public synchronized void write(byte b[], int off, int len) { spillLock.lock(); try { do { 。。。。。。。。 } while (buffull && !wrap); } finally { spillLock.unlock(); } // here, we know that we have sufficient space to write if (buffull) { final int gaplen = bufvoid - bufindex; System.arraycopy(b, off, kvbuffer, bufindex, gaplen); len -= gaplen; off += gaplen; bufindex = 0; } System.arraycopy(b, off, kvbuffer, bufindex, len); bufindex += len; } }
上面的System.arraycopy就是將要寫入的b(序列化后的數據)寫入到kvbuffer中。關於kvbuffer,我們后面會詳細分析,這里需要知道的是序列化后的結果會調用該方法進一步寫入到kvbuffer也就是Map后結果的緩存中,后面可以看見,kvbuffer寫到一定程度的時候(80%),需要將已經寫了的結果刷寫到磁盤,這個工作是由Buffer的write判斷的。在kvbuffer這樣的字節數組中,會被封裝為一個環形緩沖區,這樣,一個Key可能會切分為兩部分,一部分在尾部,一部分在字節數組的開始位置,雖然這樣讀寫沒問題,但在對KeyValue進行排序時,需要對Key進行比較,這時候需要Key保持字節連續,因此,當出現這種情況下,需要對Buffer進行重啟(reset)操作,這個功能是在BlockingBuffer中完成的,因此,Buffer相當於封裝了kvbuffer,實現環形緩沖區等功能,BlockingBuffer則繼續對此進行封裝,使其支持內部Key的比較功能。本質上,這個緩沖區需要是一個Key-Value記錄的緩沖區,而byte[] kvbuffer只是一個字節緩沖區,因此需要進行更高層次的封裝。比如:1,到達一定程度需要刷寫磁盤;2,Key需要保持字節連續等等。
那么,上面write這個方法又是什么時候調用的呢?實際上就是MapOutputBuffer的collect方法中,會對KeyValue進行序列化,在序列化方法中,會進行寫入:
public void serialize(Writable w) throws IOException { w.write(dataOut); }
此處的dataout就是前面keySerializer.open(bb)這一方法中傳進來的,也就是BlockingBuffer(又封裝了Buffer):
public void open(OutputStream out) { if (out instanceof DataOutputStream) { dataOut = (DataOutputStream) out; } else { dataOut = new DataOutputStream(out); } }
因此,當執行序列化方法serialize的時候,會調用Buffer的write方法,最終將數據寫入byte[] kvbuffer。
7,CombinerRunner<K,V> combinerRunner,用於對Map處理的輸出結果進行合並處理,減少Shuffle網絡開銷。CombinerRunner是一個抽象類,根據新舊API的不同,有兩種實現:OldCombinerRunner、NewCombinerRunner。這兩個類里面都有一個combine方法,實現KeyValue的合並。以OldCombinerRunner為例,其combine方法如下:
protected void combine(RawKeyValueIterator kvIter, OutputCollector<K,V> combineCollector ) throws IOException { Reducer<K,V,K,V> combiner = ReflectionUtils.newInstance(combinerClass, job); try { CombineValuesIterator<K,V> values = new CombineValuesIterator<K,V>(kvIter, comparator, keyClass, valueClass, job, Reporter.NULL, inputCounter); while (values.more()) { combiner.reduce(values.getKey(), values, combineCollector, Reporter.NULL); values.nextKey(); } } finally { combiner.close(); } }
從其代碼可以看出,首先根據combinerClass利用反射機制創建了一個combiner對象,實際上這個對象就是一個遵從Reducer接口的對象。之后利用CombineValuesIterator對KV進行逐一提取,執行其reduce方法,CombineValuesIterator在上一節看過,是ValuesIterator的子類,可以看出,combiner實現的就是本Map任務內的、局部的reduce。
8,CombineOutputCollector<K, V> combineCollector,即Combine之后的輸出對象。其創建代碼為:
if (combinerRunner != null) { combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, conf); } else { combineCollector = null; }
其定義里面有一個Writer對象:
protected static class CombineOutputCollector<K extends Object, V extends Object> implements OutputCollector<K, V> { private Writer<K, V> writer; 。。。
當啟用了Combine功能后,會調用上面的combine方法進行(reduce)操作后再寫入文件(reduce里會使用CombineOutputCollector對象進行collect,見下面Reducer接口的reduce定義代碼),這里的Writer就是寫入文件的作用。如果沒有啟用Combine功能呢,則直接利用Writer寫文件。
void reduce(K2 key, Iterator<V2> values, OutputCollector<K3, V3> output, Reporter reporter) throws IOException;
9,CompressionCodec codec,用於對Map的輸出進行壓縮。其創建代碼為:
// compression if (job.getCompressMapOutput()) { Class<? extends CompressionCodec> codecClass = job.getMapOutputCompressorClass(DefaultCodec.class); codec = ReflectionUtils.newInstance(codecClass, job); }
是否對Map的輸出進行壓縮決定於變量"mapred.compress.map.output",默認不壓縮。
10,int[] kvoffsets,int[] kvindices,byte[] kvbuffer。三者是為了記錄KV緩存的數據結構,kvBuffer按照Key-Value(序列化后)的順序記錄,前面說的BlockingBuffer和Buffer封裝的底層數據結構就是kvbuffer(它們都是內部類,可以處理MapOutputBuffer中的變量);kvindices記錄了一堆kvindex,每個kvindex包含三個信息:分區號、Key和Value在kvbuffer中的位置;為了對kvindices中的kvindex進行定位,於是有了第三個結構kvoffsets,只記錄每個kvindex的位置(一個整數即可),另外一個作用是當超過了一定數量后,則會觸發Spill操作,Spill的中文指溢出,大致的含義是當緩存放慢了,就溢出寫到磁盤上去。三者關系的示意圖如下:
上面的結構有什么好處呢?我們知道,Map輸出的結果是一堆KV對,可以不斷地存入kvbuffer中,但怎么按照分區號提取相應的KV對呢?kvindices就是干這個的,通過解析這個數組,就可以得到某個分區的所有KV的位置。之所以需要按照分區號提取,是因為Map的輸出結果需要分為多份,分別送到不同的Reduce任務,否則還需要對key進行計算才得到分區號,除了提高速度之外,更關鍵的作用是排序,Map處理后的結果有多份,每一份默認是按照分區號對KV記錄進行排序的,但是在kvbuffer中源源不斷過來的KeyValue序列並沒有什么順序,為此,當對kvbuffer中的某一個分區的KeyValue序列進行排序時,排序結果只需要將kvoffsets中對應的索引項進行交換即可(后面會看到這一詳細過程),保證kvoffsets中索引的順序其實就想記錄的KeyValue的真實順序。換句話說,我們要對一堆對象進行排序,實際上只要記錄他們索引的順序(類似於指針數組,每個指針指向一個對象)即可,原始記錄保持不動(因為空間很大),而kvoffsets就是一堆整數的序列,交換起來快得多。
從上面的圖可以看出,對於任意一個KeyValue記錄,都會額外產生16個字節的索引開銷,其中12個字節是kvindices中用於記錄分區號、Key位置和Value位置(都是整型),另外4個字節是kvoffsets中的整數值。MapOutputBuffer類里也定義了幾個變量用於說明上述四個變量的位置和所占字節數:
private static final int PARTITION = 0; // partition offset in acct private static final int KEYSTART = 1; // key offset in acct private static final int VALSTART = 2; // val offset in acct private static final int ACCTSIZE = 3; // total #fields in acct private static final int RECSIZE = (ACCTSIZE + 1) * 4; // acct bytes per record
ACCT表示kvindices中的一個kvindex,ACCTSIZE也就是3個字節,這里的命名稍微有些不規范,RECSIZE稱為記錄大小,這里的記錄指的就是對每個KV索引的大小,即3+1=4個字節。
kvbuffer、kvindices、kvoffsets三個數組的大小之和由參數"io.sort.mb"指定,默認是sortmb=100,於是maxMemUsage = sortmb << 20,即100MB(1MB=2^20B),maxMemUsage是MapOutputBuffer所占內存的主要部分。這100MB中,有一部分拿出來存儲kvindices和kvoffsets,占比為"io.sort.record.percent",默認是recper=0.05,即5MB左右用來(需要是16的整數倍)存儲kvindices和kvoffsets。另外95MB左右用以存儲kvbuffer。
在kvbuffer中,如果達到了一定容量,需要Spill到磁盤上,這個門限由參數"io.sort.spill.percent"指定,默認是spillper=0.8。softBufferLimit這個門限就是用於記錄Spill門限容量:
softBufferLimit = (int)(kvbuffer.length * spillper);
此外,除了kvbuffer增加會引起Spill之外,kvoffsets的膨脹也會引起Spill,比例也是spillper=0.8,這個門限由softRecordLimit參數記錄:
softRecordLimit = (int)(kvoffsets.length * spillper);
即無論哪個到達了80%,都觸發Spill。為什么到達80%就需要刷寫磁盤呢?如果寫滿了才刷寫磁盤,那么在刷寫磁盤的過程中不能寫入,寫就被阻塞了,但是如果到了一定程度就刷寫磁盤,那么緩沖區就一直有剩余空間可以寫,這樣就可以設計成讀寫不沖突,提高吞吐量。KV緩存中的最頂級索引是kvoffsets,因此當出現Spill時,需要將kvoffsets中已經記錄的索引對應的KV提取出來進行寫磁盤,當spill后,kvoffsets又成為空數組。我們粗略想一下,kvoffsets不斷地往后增加記錄,到達一定程度后,觸發Spill,於是從頭(即下標0)到當前位置(比如稱為kvindex)的所有索引對應的KV都寫到磁盤上,Spill結束(此時假定KV緩存寫入暫停)后,又從下標0開始增加記錄,這種形式會有什么問題?
一個比較大的問題是Spill的時候,意味着有個用戶在讀取kvoffsets從0-kvindex的數據,這個時候這部分數據就不能寫,因為下一次寫要從下標0開始,這樣就需要對kvoffsets加鎖才行,否則會引起讀錯誤,這樣的話,還是難以實現讀寫並行。為了解決這種加鎖引發的性能問題,典型方法就是采用環形緩沖區。kvoffsets看做一個環形數組,Spill的時候,只要kvbuffer和kvoffsets還沒有滿(能容納新的KeyValue記錄和索引),kvoffsets仍然可以繼續往后面寫;同理,kvbuffer也是一個環形緩沖區,這樣的話,如果我們把spill到磁盤這一過程用另外一個線程實現(Hadoop里面確實也是這么做的),那么讀寫可以不沖突,提高了性能。
實現環形緩沖區的典型方法也是Hadoop中采用的方法。以kvoffsets為例,一共有三個變量:kvstart、kvindex和kvend。kvstart表示當前已寫的數據的開始位置,kvindex表示寫一個下一個可寫的位置,因此,從kvstart到(kvindex-1)這部分數據就是已經寫的數據,另外一個線程來Spill的時候,讀取的數據就是這一部分。而寫線程仍然從kvindex位置開始,並不沖突(如果寫得太快而讀得太慢,追了一圈后可以通過變量值判斷,也無需加鎖,只是等待)。
舉例來說,下面的第一個圖表示按順時針往kvoffsets里面加入索引,此時kvend=kvstart,但kvindex遞增;當觸發Spill的時候,kvend=kvindex,Spill的值涵蓋從kvstart到kvend-1區間的數據,kvindex不影響,繼續按照進入的數據遞增;當進行完Spill的時候,kvindex增加,kvstart移動到kvend處,在Spill這段時間,kvindex可能已經往前移動了,但並不影響數據的讀取,因此,kvend實際上一般情況下不變,只有在要讀取環形緩沖區中的數據時發生一次改變(即設置kvend=kvindex):
在源代碼的解釋中,kvstart是記錄spill的起始位置,kvend是記錄collectable的起始位置,kvindex是記錄collected的結束位置,collect即前面說過的map方法產生的KV對需要寫入緩沖區,從生產者-消費者角度來看,collect就是這個環形緩沖區的生產者,或者叫寫線程;spill是這個環形緩沖區的消費者,或者叫讀線程。這樣看來,spill每次消費多少數據實際上可以與上面的圖有所差別,比如目前只Spill從1-8這個區間的數據,那么之后kvstart設置為9所在的位置即可,下一次Spill即從9開始。
上圖反映了環形緩沖區的利用,對於kvbuffer的使用原理也一樣,同樣存在三個變量:bufstart、bufmark(為什么不叫bufindex呢?下面分析)、bufend。對於kvoffsets來說,有三個變量就可以實現環形緩沖區,但對於kvbuffer來說,三個變量還不夠,這是為什么呢?因為kvoffsets里面都是以整數為基本單位,每個整數占用4個字節,kvoffsets的類型是int[],不會出現什么問題,使用起來也很方便。但是kvbuffer就不一樣,其定義為byte[],但是一個Key-Value的長度是不固定的,雖然形式上環形緩沖區不存在頭部和尾部的概念,但其物理上緩沖區還是存在頭和尾,並不是物理連續的,按理來說,對於Key-Value的操作,只要把接口封裝好,讓上層應用看起來是連續的即可。但Hadoop里面對Key的比較設計成逐字節比較,其定義為:
public interface RawComparator<T> extends Comparator<T> { public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2); }
為什么不設計成compare(Key1 key, Key2 key)這種形式呢?這不是更直觀嗎,個人理解,排序是對Map后的Key-Value緩沖區操作的,如果將Key、Value都看做JAVA對象,設計Object的排序,排序的速度要比byte這個層次更差,因為封裝的層次更高了,所以,將所有key全部序列化后存入緩沖區,然后對其進行排序操作會更快,這樣的話,就需要Key在物理上(實際上是JAVA字節數組這個層次,當然不是指硬盤的磁道等等更底層的層次)保持連續,畢竟,按Key排序作為MapReduce中一個很核心的東西,這樣做還是值得的。為此,在緩存里面就需要保證Key的連續性,自然,當往緩沖區里面寫入一個會超越邊界的key的時候,就需要進行特殊處理,這種處理由BlockingBuffer實現,稱為reset,當檢測到這種情況的時候,就調用一下reset,代碼如下:
// serialize key bytes into buffer int keystart = bufindex; keySerializer.serialize(key); if (bufindex < keystart) { // wrapped the key; reset required bb.reset(); keystart = 0; }
所謂reset,其實就是把跨越邊界(如何判斷:寫入一個Key之后的bufindex還比寫之前的bufindex位置還小)的Key的尾部拷貝一下到頭部,使其連續。bufindex的含義和前面kvindex類似,代表了下一個可以寫的位置。如下圖所示,紅色表示已經寫入的KeyValue記錄,藍色表示要寫入的下一個Key,在調用Buffer的write方法后,如果發現跨越了邊界(bufindex變小了),則將尾部的那塊藍色區域拷貝到頭部,頭部那塊藍色區域往后挪,形成一個整體的Key,尾部藍色那塊區域空出來的就無效了,在讀的時候就需要跳過。這樣就需要多個變量來記錄位置信息,除了bufindex,bufvoid就表示這個緩沖區在讀取的時候需要停止的位置,因為這個位置不一定是緩沖區的最大長度,但肯定只會在緩沖區的尾巴處出現,所以需要1個變量來記錄;這里還新增了一個bufmark,其含義是一個KeyValue的結尾處,因為kvoffsets里面不存在這個問題,每個整數值就是一個基本單元,但一個KeyValue長度不一,需要用bufmark記錄下來。每當序列化寫入一個Key-Value對,就更新這個數值。記下來的目的之一比如說下面這種情況,需要將尾部的藍色區域拷貝到頭部的時候,就需要知道尾部這一段有多少個字節,bufvoid-bufmark就等於尾部這段藍色區域的長度。
理解了上面的變量,reset的代碼就比較簡單了,如下,其中headbytelen就是尾部藍色區域的長度,另外,在下面的代碼中,如果拷貝后的key長度超過了bufstart,也就是空間不夠了,那么就會直接把key直接輸出,此時bufindex置為0:
protected synchronized void reset() throws IOException { int headbytelen = bufvoid - bufmark; bufvoid = bufmark; if (bufindex + headbytelen < bufstart) { System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex); System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen); bufindex += headbytelen; } else { byte[] keytmp = new byte[bufindex]; System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex); bufindex = 0; out.write(kvbuffer, bufmark, headbytelen); out.write(keytmp); } }
12,SpillThread spillThread,這是一個線程對象,繼承於Thread:
private final SpillThread spillThread = new SpillThread();
其作用就是當kvbuffer或kvoffsets超過80%以上,將觸發該線程將kvbuffer中的數據寫入到磁盤。讀寫分別是兩個線程。觸發Spill的代碼為:
private synchronized void startSpill() { LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark + "; bufvoid = " + bufvoid); LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex + "; length = " + kvoffsets.length); kvend = kvindex; bufend = bufmark; spillReady.signal(); }
從這里可以看出,觸發Spill時,正是我們前面分析過的,需要執行這個動作:kvend = kvindex、bufend=bufmark。注意,寫入磁盤的數據實際上只是kvbuffer里面的記錄,kvoffsets索引只是同步更新,后面我們會看到,跟kvbuffer同時寫入的實際上是有一個索引數據的,但不是上面提到的這幾個。spillReady是一個java.util.concurrent.locks.Condition對象,環形緩沖區的讀寫線程之間的同步使用JAVA中提供的方法實現,涉及到以下變量:
private final ReentrantLock spillLock = new ReentrantLock(); private final Condition spillDone = spillLock.newCondition(); private final Condition spillReady = spillLock.newCondition();
我們前面分析過,環形緩沖區在一個生產者和一個消費者條件下,雙方對讀寫數據是不需要加鎖的,因為讀寫數據不會位於同一個位置,大家處理的是整個環上不同的部分,那么這里引入鎖的目的是什么呢?一種情況自然是當緩沖區滿了的時候,此時可以使用鎖,理論上也可以判斷變量,看看是否寫滿等等,但無論如何此時寫線程需要阻塞,如果寫線程每過一段時間來掃描一下是否可以寫,這種方式造成的延時跟另一個線程直接發信號通知比起來更慢;另外,讀寫雙方涉及到三個變量kvstart、kvend、kvindex的修改。也就是說,當寫完畢,或者讀完畢時需要修改變量的時候,加鎖保證了變量的一致性。這里不使用synchronized這種傳統的同步方法,主要原因是synchronized不夠靈活,擴展性不好,JAVA中提供了另外一種機制,即ReentrantLock等典型鎖類。這種方式靈活性更好,因為鎖只是一個對象(synchronized是一個關鍵字,用語法來支持同步)。ReentrantLock的含義是可重入鎖,因為Re-entrant就是可以重新進入的意思,什么叫可重入呢?比如一個函數被遞歸調用,在執行這個函數代碼的過程中,還沒執行完畢又被再次調用,就是不斷重入的意思。ReentrantLock也如此,一個線程A獲得鎖以后,這個線程A還可以繼續再次獲取這把鎖,其他線程B要想獲得這把鎖,要么等着A把鎖釋放了,如果A不顯式釋放,但是通過發信號等待的方式,也可以間接地使得鎖釋放(這一點很關鍵)。此時,線程B就可以獲得這把鎖。讓一個線程多次獲取一把鎖有什么意義呢?比如有兩段代碼分布在不同的地方,都加了同樣的一個鎖對象,某個線程則可以連續執行這兩段代碼,因為是一把鎖。否則,執行完了第一段后,第二段就無法執行了,這樣就很容易出現死鎖。另外,通過發信號(而且可以有很多不同的信號)的方式釋放鎖,為線程在不同特定條件下釋放鎖提供了極大靈活性。
在線程A拿到鎖之后,可以通過發送信號控制其它線程B的執行。比如A拿到了鎖,但是需要等待一個條件C才能往下執行,但我又不想釋放這把鎖,於是可以調用一個稱為C.await的方法,讓線程B可以獲得這把鎖,去執行它的代碼,當在線程B里滿足了條件C,它調用C.signal這個方法,則可以讓線程A的條件滿足不再等待,接着往下執行(但需要線程B釋放鎖,或者也調用另外一個條件D的await方法)。這種同步模式比較靈活,所以一般來說,典型應用場景是兩個線程共有一把ReentrantLock鎖,並且有兩個以上共有的條件變量Condition1、Condition2。一個線程負責執行Condition1.signal和Condition2.await;另一個線程負責執行Condition2.signal和Condition1.await。
比如上面的例子就創建了兩種信號:spillDone和spillReady,它們的邏輯如下:
1)對於寫線程來說,如果寫滿了,就調用spillDone.await等待spillDone信號;否則不斷往緩沖區里面寫,到了一定程度,就發送spillReady.signal這個信號給讀線程,發完這個信號后如果緩沖區沒滿,就釋放鎖繼續寫(這段代碼無需鎖),如果滿了,就等待spillDone信號;
2)對於讀線程來說,在平時調用spillReady.await等待spillReady這個信號,當讀取之后(此時寫線程要么釋放鎖了,要么調用spillDone.await在等待了,讀線程肯定可以獲得鎖),則把鎖釋放掉,開始Spill(這段代碼無需鎖),完了讀線程再次獲取鎖,修改相應參數,發送信號spillDone給寫線程,表明Spill完畢。
上面的線程同步模式+環形緩沖區的使用是經典案例,值得仔細學習。
作為SpillThread這個消費者、讀線程而言,主要代碼是在其run方法內:
public void run() { spillLock.lock(); spillThreadRunning = true; try { while (true) { spillDone.signal(); while (kvstart == kvend) { spillReady.await(); } try { spillLock.unlock(); sortAndSpill(); } catch (Exception e) { 。。。 } finally { spillLock.lock(); if (bufend < bufindex && bufindex < bufstart) { bufvoid = kvbuffer.length; } kvstart = kvend; bufstart = bufend; } } } catch (InterruptedException e) { ........ } finally { spillLock.unlock(); spillThreadRunning = false; } }
MapOutputBuffer的collect方法是生產者、寫線程,主要代碼即在該方法內,其中startSpill前面已經看過,主要是改變kvend值以及發送spillReady信號:kvnext是kvindex加1,用於判斷是否寫滿,如果kvnext==kvstart,表示寫滿,布爾變量kvfull則為true。
kvsoftlimit是是否超過Spill門限的標志。
public synchronized void collect(K key, V value, int partition ) throws IOException { final int kvnext = (kvindex + 1) % kvoffsets.length; spillLock.lock(); try { boolean kvfull; do { // sufficient acct space kvfull = kvnext == kvstart; final boolean kvsoftlimit = ((kvnext > kvend) ? kvnext - kvend > softRecordLimit : kvend - kvnext <= kvoffsets.length - softRecordLimit); if (kvstart == kvend && kvsoftlimit) { startSpill(); } if (kvfull) { try { while (kvstart != kvend) { reporter.progress(); spillDone.await(); } } catch (InterruptedException e) { 。。。。。。。。 } } } while (kvfull); } finally { spillLock.unlock(); } 。。。。 寫數據 。。。
13,ArrayList<SpillRecord> indexCacheList,這個是SpillRecord的數組,SpillRecord里面緩存的是一個一個的記錄,所以並不是一整塊無結構字節流,而是以IndexRecord為基本單位組織起來 的,IndexRecord非常簡單,描述了一個記錄在緩存中的起始偏移、原始長度、實際長度(可能壓縮)等信息。SpillRecord里面放了一堆 IndexRecord,並有方法可以插入記錄、獲取記錄等。IndexRecord的定義很簡單如下:
class IndexRecord { long startOffset; long rawLength; long partLength; public IndexRecord() { } public IndexRecord(long startOffset, long rawLength, long partLength) { this.startOffset = startOffset; this.rawLength = rawLength; this.partLength = partLength; } }
SpillRecord的意義在什么地方呢?當kvbuffer觸發Spill的時候,會將kvbuffer的記錄寫入到磁盤(實際上還會包括記錄的長度等信息)。Spill結束后,會生成一個spill文件,這個文件內部包含很多分區的數據,但是是排序過的KeyValue數據(關於排序后面會討論),分為兩層,首先是對分區號進行排序,其次是在一個分區號內,按照Key的大小進行排序,因此Spill文件是一個分區的數據接着一個分區的數據,且每個分區里面的Key-Value都已經按照Key的順序進行排列;SpillRecord就記錄了每個分區數據在文件中的起始位置、長度、以及壓縮長度,這些內容表示成IndexRecord。一個IndexRecord記錄的是一個分區的位置信息,因為一個Spill文件包含N個分區,於是就會有N個IndexRecord,這N個IndexRecord記錄在一個SpillRecord對象中。SpillRecord里面有兩個變量:ByteBuffer buf,以及LongBuffer entries。ByteBuffer和LongBuffer都是java.nio里面提供的類,ByteBuffer是IndexRecord存儲的真正區域,LongBuffer就是對ByteBuffer進行了一點接口封裝,把它當做一個存儲Long型的Buffer。這種概念類似於數據庫里面的視圖跟表的關系一樣。因為IndexRecor里面包含三個Long型變量,每個8字節,因此一個 IndexRecord記錄占用24字節,這就是MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH這個變量指定的。分區數量是numPartitions,因此一個文件需要numPartitions*24來記錄,這也就是一個SpillRecord的大小,每個文件都有一個SpillRecord,因為Spill會有很多次,每次都寫成一個文件,所以會有很多個Spill文件,對應於很多個SpillRecord,這很多個SpillRecord即為ArrayList<SpillRecord> indexCacheList。
為什么要把各個分區數據的位置記錄下來呢?因為MapReduce對Map后的結果會按照分區號對Key-Value進行排序,假定最終生成了10個Spill文件,需要按照分區,將每個分區對應的數據全部拿出來進行歸並排序(Merger),這種排序在Map這一端就有兩個階段,首先是一個Spill文件內部要按照分區對KV排好序(kvoffsets排好序按照順序寫進Spill文件),之后還得把10個Spill文件內部的KV拿過來歸並排序。另外,實際上在Reduce端還會進行歸並排序,因為我們目前討論的都只是在單個Map任務內的排序,Reduce之前還會把各個Map任務排好序的結果進行再次歸並排序,可見,有三種歸並排序,MapReduce中的排序就是不斷地進行歸並排序的過程。
另外,除了將kvbuffer的數據寫進文件,SpillRecord的信息也會寫到文件里,作為后面多個Spill文件歸並的索引。如果不寫入這個信息,怎么知道Spill文件里面的KeyValue是屬於哪個分區呢?如果沒有這個信息,也就無法實現后面的歸並。
14,IndexedSorter sorter,理解了上面的過程,這個變量就容易了,如何對map后的KeyValue進行排序就取決於該對象。IndexedSorter是一個接口,用戶可以實現自定義的方法,其創建代碼如下:
sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);
可以看出,如果用戶沒有配置,默認就使用Hadoop自帶的QuickSort類,即快速排序。另外,排序的規則是對Key進行比較,這里采用的比較對象就是RawComparator<K> comparator。
排序的對象是一個IndexedSortable接口對象,MapOutputBuffer實現了這個接口中的compare和swap方法,compare方法即比較兩個Key的大小,返回整數:
public int compare(int i, int j) { final int ii = kvoffsets[i % kvoffsets.length]; final int ij = kvoffsets[j % kvoffsets.length]; // sort by partition if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) { return kvindices[ii + PARTITION] - kvindices[ij + PARTITION]; } // sort by key return comparator.compare(kvbuffer, kvindices[ii + KEYSTART], kvindices[ii + VALSTART] - kvindices[ii + KEYSTART], kvbuffer, kvindices[ij + KEYSTART], kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]); }
可見,這個比較分為兩個層次,首先是分區號的排序,在分區號相同條件下,再進行Key的比較。怎么進行Key的比較呢?每個kvoffsets里面就一個字節,記錄了對應的kvindex,kvindex又有3字節,分別是分區號、key在kvbuffer的位置、value在kvbuffer的位置,所以其比較就是首先獲得i、j對應的兩個kvindex,最終調用RawComparator<K> comparator的compare方法,比較兩個Key值的大小,key在kvbuffer中的位置是在kvindices[ii + 1]開始到kvindices[ii + 2]之間,另一個key的位置是在kvbuffer的kvindices[ij + 1]到kvindices[ij + 2]之間。前面已經對kvbuffer、kvindices、kvoffsets進行了詳細分析,這里也就比較簡單了。
在排序的過程中會進行交換,kvbuffer和kvindices都保持不變,只有kvoffsets進行了交換:
public void swap(int i, int j) { i %= kvoffsets.length; j %= kvoffsets.length; int tmp = kvoffsets[i]; kvoffsets[i] = kvoffsets[j]; kvoffsets[j] = tmp; }
因為按照排序原則,如果不是同一個分區的KV,那就不用排序;如果是同一個分區的KV,那就進行排序,所以最終的排序只在kvoffsets中進行交換,當交換完畢后,排序也就結束。要寫入文件時,只要按照kvoffsets的順序將對應的kvbuffer中的數據寫入文件即可。
15,上面對MapOutputBuffer涉及的變量進行了分析,其原理也基本涵蓋在上面的各個分析之中,下面我們來看一看collect方法的過程。
該方法的聲明為:
public synchronized void collect(K key, V value, int partition ) throws IOException
其作用就是對map之后的KeyValue進行處理。
首先獲得kvoffsets中的kvindex的下一個位置,用於判斷kvoffsets是否寫滿:
final int kvnext = (kvindex + 1) % kvoffsets.length;
因為kvindex代表了下一個可寫的位置,如果再下一個已經等於kvstart,那么說明已經寫滿了,需要等待SpillThread處理。
於是設置了一個變量kvfull = kvnext == kvstart;即二者相等時即為true。
要判斷是否Spill,加鎖:
spillLock.lock();
之后判斷是否應該Spill:
final boolean kvsoftlimit = ((kvnext > kvend) ? kvnext - kvend > softRecordLimit : kvend - kvnext <= kvoffsets.length - softRecordLimit);
之所以會有兩種情況,是因為這是一個環形緩沖區,可能kvnext大於kvend(沒有Spill時等於kvstart)很多,也可能kvnext已經繞回到了0那個位置,不管怎樣,如果兩者的差距(絕對值)大於softRecordLimit(80%的kvoffsets),則kvsoftlimit=true。
如果kvstart==kvend,表示此時沒有處於Spill(前面分析過,Spill時會將kvend設置為kvindex),並且如果滿足了kvsoftlimit,則進行Spill,向SpillThread發信號:
if (kvstart == kvend && kvsoftlimit) { LOG.info("Spilling map output: record full = " + kvsoftlimit); startSpill(); }
發完信號后不一定可以寫了,因為此時緩沖區說不定滿了,所以如果滿了,就等待SpillDone信號,這個信號是SpillThread發過來的:
if (kvfull) { try { while (kvstart != kvend) { reporter.progress(); spillDone.await(); } } catch (InterruptedException e) { throw (IOException)new IOException( "Collector interrupted while waiting for the writer" ).initCause(e); } }
好了,如果跳出來了,說明此時緩沖區可寫了,於是把鎖釋放,准備往緩沖區里面寫數據(再重復一遍,讀寫數據不用加鎖):
finally { spillLock.unlock(); }
要寫入key,首先要將其序列化:
int keystart = bufindex; keySerializer.serialize(key);
之后,因為有可能key序列化后超出了kvbuffer的邊界,進行一些邊界條件處理,這一邊界問題在前面已經分析過:
if (bufindex < keystart) { // wrapped the key; reset required bb.reset(); keystart = 0; }
緊接着是對value進行序列化:
// serialize value bytes into buffer final int valstart = bufindex; valSerializer.serialize(value); int valend = bb.markRecord();
之后,更新kvindices,kvoffsets中的索引信息:
// update accounting info int ind = kvindex * ACCTSIZE; kvoffsets[kvindex] = ind; kvindices[ind + PARTITION] = partition; kvindices[ind + KEYSTART] = keystart; kvindices[ind + VALSTART] = valstart; kvindex = kvnext;
此處的ind就是新的kvindex的位置,乘以3字節就等於其在kvindices中的位置。同時更新kvindices,kvindex向前移動一個字節。
於是,collect方法就結束了,KV已經被序列化進入kvbuffer了,下面看一看SpillThread涉及到的方法。
16,SpillThread在構造方法中被啟動:
spillThread.setDaemon(true); spillThread.setName("SpillThread"); spillLock.lock(); try { spillThread.start(); while (!spillThreadRunning) { spillDone.await(); } } catch (InterruptedException e) { throw (IOException)new IOException("Spill thread failed to initialize" ).initCause(sortSpillException); } finally { spillLock.unlock(); }
進入SpillThread的run方法,該方法的處理邏輯在前面已經分析過,主要涉及的方法是sortAndSpill。
首先獲得要寫入的Spill文件的大小:
//approximate the length of the output file to be the length of the //buffer + header lengths for the partitions long size = (bufend >= bufstart ? bufend - bufstart : (bufvoid - bufend) + bufstart) + partitions * APPROX_HEADER_LENGTH;
每個分區都會有一些頭開銷,此處為150個字節,這個與Spill文件的文件格式有關,在每個分區之前都會加入一些記錄信息,這里可以看出,Spill文件里面實際上是所有分區的數據混合在一起(但是是一個分區的數據跟着另一個分區的數據)。
然后獲取要寫入的本地文件的文件名,注意不是HDFS文件,而是本地Linux文件:
// create spill file final SpillRecord spillRec = new SpillRecord(partitions); final Path filename = mapOutputFile.getSpillFileForWrite(numSpills, size); out = rfs.create(filename); public Path getSpillFileForWrite(int spillNumber, long size) throws IOException { return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill" + spillNumber + ".out", size, conf); }
在這時會創建一個與Spill文件對應的SpillRecord對象(輸入參數為分區總數),其文件名為:
TaskTracker.OUTPUT + "/spill" + spillNumber + ".out"
TaskTracker.OUTPUT其實就是一個字符串String OUTPUT = "output",所以Spill的文件名為output/spill2.out等,表示這個文件是第2個Spill文件(最終會有多個Spill文件,前面分析過)。
然后調用上面分析過的排序對象進行排序,實際上就是通過交換kvoffsets里面的字節達到目的:
final int endPosition = (kvend > kvstart) ? kvend : kvoffsets.length + kvend; sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
之后是一個大循環,對每個分區依次進行以下操作。
創建一個寫文件的對象:
writer = new Writer<K, V>(job, out, keyClass, valClass, codec, spilledRecordsCounter);
此時有兩種情況,排序后的Key-Value不一定直接寫入文件,如果需要在Map端進行合並(Combiner)的話,則先進行合並再寫入:
我們先來看不需要合並的代碼。就是一個循環:
DataInputBuffer key = new DataInputBuffer(); while (spindex < endPosition && kvindices[kvoffsets[spindex % kvoffsets.length] + PARTITION] == i) { final int kvoff = kvoffsets[spindex % kvoffsets.length]; getVBytesForOffset(kvoff, value); key.reset(kvbuffer, kvindices[kvoff + KEYSTART], (kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART])); writer.append(key, value); ++spindex; }
注意while條件中只挑選那些分區號滿足大循環中當前分區號的數據,獲得KeyValue在kvbuffer中的位置(kvoff),然后key的值就從kvindices[kvoff + KEYSTART]到kvindices[kvoff + VALSTART]之間。KEYSTART和VALSTART是固定值1、2,我們再回顧一下,kvindices[kvoff]記錄的是分區號、kvindices[kvoff + 1]記錄的Key在kvbuffer中的起始位置,kvindices[kvoff + 2]記錄的是Value在kvbuffer中的起始位置,於是就得到了key。
Value的獲取是利用getVBytesForOffset實現的。原理也一樣:
private void getVBytesForOffset(int kvoff, InMemValBytes vbytes) { final int nextindex = (kvoff / ACCTSIZE == (kvend - 1 + kvoffsets.length) % kvoffsets.length) ? bufend : kvindices[(kvoff + ACCTSIZE + KEYSTART) % kvindices.length]; int vallen = (nextindex >= kvindices[kvoff + VALSTART]) ? nextindex - kvindices[kvoff + VALSTART] : (bufvoid - kvindices[kvoff + VALSTART]) + nextindex; vbytes.reset(kvbuffer, kvindices[kvoff + VALSTART], vallen); }
即nextindex要么是bufend,要么是繞一圈之后的對應值。
之后調用writer.append(key, value)寫入KV即可。
如果是需要對KeyValue進行合並的,則執行combine方法:
if (spstart != spindex) { combineCollector.setWriter(writer); RawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex); combinerRunner.combine(kvIter, combineCollector); }
combine方法我們前面分析過,其實就是調用了用戶寫的reduce方法:
protected void combine(RawKeyValueIterator kvIter, OutputCollector<K,V> combineCollector ) throws IOException { Reducer<K,V,K,V> combiner = ReflectionUtils.newInstance(combinerClass, job); try { CombineValuesIterator<K,V> values = new CombineValuesIterator<K,V>(kvIter, comparator, keyClass, valueClass, job, Reporter.NULL, inputCounter); while (values.more()) { combiner.reduce(values.getKey(), values, combineCollector, Reporter.NULL); values.nextKey(); } } finally { combiner.close(); } } }
當寫入Spill文件后,還需要對SpillRecord進行記錄:
// record offsets rec.startOffset = segmentStart; rec.rawLength = writer.getRawLength(); rec.partLength = writer.getCompressedLength(); spillRec.putIndex(rec, i);
即當前這個分區中數據的起始位置、原始長度、壓縮后長度。在Writer類中,其append方法會將Key長度和Value長度都寫進去:
WritableUtils.writeVInt(out, keyLength);
WritableUtils.writeVInt(out, valueLength);
out.write(key.getData(), key.getPosition(), keyLength);
out.write(value.getData(), value.getPosition(), valueLength);
使用的VInt即變長整數編碼,這種編碼方式類似於ProtoBuf(但是否完全一樣還沒分析),見我寫的另外一篇介紹ProtocolBuffer的博客。可以看出,KeyValue的記錄加上了Key的長度、Value的長度兩個信息,如果不加無法區分Key、Value的邊界。
注意到,如果設置了壓縮,則在Writer構造方法里將寫入流對象換成另外一個:
if (codec != null) { this.compressor = CodecPool.getCompressor(codec); this.compressor.reset(); this.compressedOut = codec.createOutputStream(checksumOut, compressor); this.out = new FSDataOutputStream(this.compressedOut, null); this.compressOutput = true; } else { this.out = new FSDataOutputStream(checksumOut,null); }
按照上面的過程,對每個分區進行循環即可不斷地寫入到Spill文件,可見,一個Spill文件,比如output/spill2.out這個文件,其內容是一個分區跟着一個分區,每個分區里面的數據都經過了排序。每次觸發Spill的時候就會生成一個文件。如:
output/spill1.out、output/spill2.out、output/spill3.out、....
寫完了Spill文件后,還會把SpillRecord的內容寫入成一個Spill索引文件,不過這個寫不是一個Spill文件就對應於一個索引文件,而是超過了一個界限(1MB)再寫入:
if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) { // create spill index file Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH); spillRec.writeToFile(indexFilename, job); } else { indexCacheList.add(spillRec); totalIndexCacheMemory += spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; }
從getSpillIndexFileForWrite方法來看,其命名是output/spill2.out.index等等:
public Path getSpillIndexFileForWrite(int spillNumber, long size) throws IOException { return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill" + spillNumber + ".out.index", size, conf); }
所以,實際上並不是一個Spill文件就對應於一個spill index文件。但一個Spill文件確實是對應於一個SpillRecord的,一個SpillRecord的大小等於分區數量*24字節。
17,到此為止,MapOutputBuffer的基本處理過程就明白了,那么,什么時候結束呢,自然是當Map輸入數據處理完畢之后,由下面的代碼進行結束的:
try { runner.run(in, new OldOutputCollector(collector, conf), reporter); collector.flush(); in.close(); in = null; collector.close(); collector = null; } finally { closeQuietly(in); closeQuietly(collector); }
此時就調用了collector的flush方法。在map內只是調用其collect方法。因此我們再來看看其flush方法。
flush方法的邏輯還是比較清楚的,首先對kvbuffer內剩余還沒有Spill的數據進行Spill:
spillLock.lock(); try { while (kvstart != kvend) { reporter.progress(); spillDone.await(); } if (sortSpillException != null) { throw (IOException)new IOException("Spill failed" ).initCause(sortSpillException); } if (kvend != kvindex) { kvend = kvindex; bufend = bufmark; sortAndSpill(); } } catch (InterruptedException e) { throw (IOException)new IOException( "Buffer interrupted while waiting for the writer" ).initCause(e); } finally { spillLock.unlock(); }
可以看出,此時是這個線程調用了sortAndSpill方法(之前是SpillThread那個線程調用)。
全部刷寫到磁盤后,給SpillThread線程發送暫停信號,等待SpillThread關閉(join方法):
try { spillThread.interrupt(); spillThread.join(); } catch (InterruptedException e) { throw (IOException)new IOException("Spill failed" ).initCause(e); }
之后,我們得到了N個Spill文件以及多個索引文件,於是需要按照分區歸並成分區數量個文件,調用mergeParts方法。mergeParts方法的目的是將多個Spill文件合並為一個,注意,雖然最后要把結果送到多個Reduce任務去,但仍然只是寫到一個文件里,不同Reduce任務需要的數據在文件的不同區域。按照SpillRecord索引信息可以取出來。
18,在mergeParts里,首先獲得這些Spill文件的文件名:
for(int i = 0; i < numSpills; i++) { filename[i] = mapOutputFile.getSpillFile(i); finalOutFileSize += rfs.getFileStatus(filename[i]).getLen(); }
如果numSpills=1,那么Spill文件相當於就是要Map輸出的文件,因為在Spill內部已經進行了排序。而且因為沒有多余的Spill文件需要歸並,所以重命名文件名即可:
if (numSpills == 1) { //the spill is the final output rfs.rename(filename[0], new Path(filename[0].getParent(), "file.out")); if (indexCacheList.size() == 0) { rfs.rename(mapOutputFile.getSpillIndexFile(0), new Path(filename[0].getParent(),"file.out.index")); } else { indexCacheList.get(0).writeToFile(new Path(filename[0].getParent(),"file.out.index"), job); } return; }
此時,Map輸出文件名為output/file.out和output/file.out.index。
如果多於一個Spill文件,則需要進行歸並處理。
首先將全部索引數據從文件中讀出來,加入到indexCacheList數組里,這里似乎有一個問題,如果索引文件太大怎么辦,會不會導致Out of Memory?不過粗略算一下應該不太可能,假定Reduce個數是100個,一個SpillRecord的大小則是2400字節。假定Map任務輸出100個Spill文件,則indexCacheList大小為240000字節,240KB。這個數量級已經是MapReduce中比較大的了,所以可以忽略這個問題。
// read in paged indices for (int i = indexCacheList.size(); i < numSpills; ++i) { Path indexFileName = mapOutputFile.getSpillIndexFile(i); indexCacheList.add(new SpillRecord(indexFileName, job, null)); }
獲得這個indexCacheList的目的是為了方便地找到某個分區在各個Spill文件中的位置,以便進行歸並處理:
之后,獲得最終要輸出的文件名:
//make correction in the length to include the sequence file header //lengths for each partition finalOutFileSize += partitions * APPROX_HEADER_LENGTH; finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH; Path finalOutputFile = mapOutputFile.getOutputFileForWrite(finalOutFileSize); Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
從下面可以看出Map輸出的文件名,分別是file.out和file.out.index,最終輸出也就是這兩個文件:
public Path getOutputFileForWrite(long size) throws IOException { return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR + "file.out", size, conf); } public Path getOutputIndexFileForWrite(long size) throws IOException { return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR + "file.out.index", size, conf); }
創建文件,rfs是本地文件系統:
//The output stream for the final single output file FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
如果一個分區文件都沒有,也需要創建記錄文件(CRC等信息,這樣更不會出錯,否則會不會文件被刪了?):
if (numSpills == 0) { //create dummy files IndexRecord rec = new IndexRecord(); SpillRecord sr = new SpillRecord(partitions); try { for (int i = 0; i < partitions; i++) { long segmentStart = finalOut.getPos(); Writer<K, V> writer = new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null); writer.close(); rec.startOffset = segmentStart; rec.rawLength = writer.getRawLength(); rec.partLength = writer.getCompressedLength(); sr.putIndex(rec, i); } sr.writeToFile(finalIndexFile, job); } finally { finalOut.close(); } return; }
否則,對於每個分區進行一個大循環,內部對每個Spill文件進行一個小循環:
for (int parts = 0; parts < partitions; parts++) { List<Segment<K,V>> segmentList = new ArrayList<Segment<K, V>>(numSpills); for(int i = 0; i < numSpills; i++) { IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts); Segment<K,V> s = new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset, indexRecord.partLength, codec, true); segmentList.add(i, s); 。。。。。。。
segmentList是關於一個分區的信息,這個分區信息在每一個Spill文件中都存在,根據IndexRecord可以生成出來,除了位置信息,還包括是否采用了壓縮等等信息。
之后,調用Merger中的merge方法進行歸並處理:
//merge @SuppressWarnings("unchecked") RawKeyValueIterator kvIter = Merger.merge(job, rfs, keyClass, valClass, codec, segmentList, job.getInt("io.sort.factor", 100), new Path(mapId.toString()), job.getOutputKeyComparator(), reporter, null, spilledRecordsCounter);
這個方法也比較復雜,主要實現的是歸並排序,在后面各節再進行詳細分析。這里可以看出,在一個Map任務內,對於某個分區的那些記錄,默認用快速排序(QuickSort)實現,之后更大范圍的排序使用歸並排序。
歸並完畢后,將其寫入文件,這里又見到了Combine,我們在前面已經分析過Combine,那里是對每個刷寫Spill文件之前某個分區的KV進行合並,這里是對歸並排序后每個分區的結果進行歸並,是不是冗余了?實際上不是,前面那個Combine還是局部的Combine,后面這個Combine是在前面的那個合並的基礎上進行的再次合並。比如要對64MB的文本計算hello這個單詞出現的次數,前面那個Combine比如是對每1MB內的文本累積次數,一共有64個數,最后這個Combine是對64個數加起來,得到64MB中hello的次數,這就是Map的輸出結果;Reduce那邊則是對整個大文件(比如6400MB)的hello次數根據不同Map任務(即100個數)輸出的結果進行再次累和,Combine基本上可以理解為就是Map端的Reduce。因此,從Combine、Sort等過程來看,MapReduce就是一個將小數據的結果進行處理,得到局部(合並、排序)結果后,然后不斷匯總處理的過程。基本上有三個階段,一個是在單個Spill內,一個是單個Map內,一個是全局處理。個人理解這算是MapReduce的核心思想。
Writer<K, V> writer = new Writer<K, V>(job, finalOut, keyClass, valClass, codec, spilledRecordsCounter); if (combinerRunner == null || numSpills < minSpillsForCombine) { Merger.writeFile(kvIter, writer, reporter, job); } else { combineCollector.setWriter(writer); combinerRunner.combine(kvIter, combineCollector); }
同樣,對每個分區都記錄索引信息:
// record offsets rec.startOffset = segmentStart; rec.rawLength = writer.getRawLength(); rec.partLength = writer.getCompressedLength(); spillRec.putIndex(rec, parts);
等到每個分區都完成了上面的步驟后,將索引信息寫入到一個文件:
spillRec.writeToFile(finalIndexFile, job);
然后刪除以前寫入的各個Spill文件:
for(int i = 0; i < numSpills; i++) { rfs.delete(filename[i],true); }
於是整個Map輸出過程即結束。
后記:本節將Map處理后的結果(Key-Value記錄序列)如何處理的過程分析了一遍,其核心思想是要按照分區來處理,以便送到不同的Reduce任務,先緩存、到達一定程度后刷寫磁盤,刷寫之前進行Spill這個層面的Combine和Sort,得到N個Spill文件,最后,對N個Spill文件的結果進行歸並排序和二次Combine。最終得到一個結果文件寫入到本地,等待Reduce來取,至於Reduce怎么來取,以及Map端又怎么配合,在后續博文中再進行分析。
另外,從本節可以看出,一個好的框架不僅僅是思想,更重要的是為了實現這些想法,采用哪些算法和數據結構,比如緩存怎么設計,排序如何實現,使得流程既高效,又通用,這可能就是軟件框架設計的核心吧。慢慢學習。