I. Overview

Shuffle is a core part of MapReduce, so rather than being covered in the earlier walkthrough of MapReduce job submission, it is described in detail on its own. The official flow diagram is as follows:
This post tries to show, through the code, how the map side stores its output and waits for the reduce side to fetch it. When a map task runs, whatever logic the map method applies, the output is ultimately written to disk. If the job has no reduce phase, the output is written directly to HDFS; if it does have reducers, each map method's output is first buffered in memory before being written to disk. Every map task owns a circular in-memory buffer (100 MB by default) holding its output; whenever the buffer approaches full, a separate thread spills the buffered data to disk as a spill file. When the whole map task finishes, all of the spill files it produced are merged into a single partitioned, sorted output file, which then waits for the reduce tasks to pull it.
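The buffer size and spill threshold are all configurable. As a usage sketch (not from the Hadoop source; the keys are the pre-2.x names used throughout this post), tuning them in a driver might look like:

import org.apache.hadoop.mapred.JobConf;

public class ShuffleTuning {
  public static JobConf configure() {
    JobConf conf = new JobConf();
    conf.setInt("io.sort.mb", 200);                 // sort buffer size in MB (default 100)
    conf.setFloat("io.sort.spill.percent", 0.8f);   // spill when the buffer is 80% full (the default)
    conf.setFloat("io.sort.record.percent", 0.05f); // fraction reserved for record accounting
    return conf;
  }
}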
II. Flow description
- When the child process reaches runNewMapper, it sets output to a NewOutputCollector, which is responsible for the map output.
- At the end of the map method, whatever logic the map applied, the code generally ends up calling TaskInputOutputContext's write method, which in turn calls the write method of the configured output, i.e. NewOutputCollector.
- NewOutputCollector is really just a wrapper around MapOutputBuffer; its write method delegates to MapOutputBuffer's collect method.
- MapOutputBuffer's collect method serializes the key and value and stores them in a circular buffer. When the buffer fills up, it calls startSpill, which signals a condition variable so that a separate thread, SpillThread, can process the buffered data.
- SpillThread's run method calls sortAndSpill, which sorts the buffered data and writes it to a spill file.
- When the map output is complete, output's close method is called.
- The close method calls flush to process whatever remains in the buffer, and finally calls mergeParts to merge the spill files produced along the way into a single file.
Figure: code flow of the map output stage of the MapReduce shuffle.
III. Code details
1. MapTask's runNewMapper method. Note the following code: when the job has maps but no reduces, the org.apache.hadoop.mapreduce.RecordWriter object output is the writer defined by the OutputFormat, i.e. it writes straight to the job output. If there are reducers, output is a NewOutputCollector.
if (job.getNumReduceTasks() == 0) {
  output = outputFormat.getRecordWriter(taskContext);
} else {
  output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
    input, output, committer, reporter, split);
input.initialize(split, mapperContext);
mapper.run(mapperContext);
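As a side note, which branch is taken is controlled entirely by the job configuration. A minimal driver fragment (the job class name here is hypothetical):

JobConf conf = new JobConf(WordCount.class);
conf.setNumReduceTasks(0); // map-only job: runNewMapper takes the outputFormat.getRecordWriter(...) path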
Like every other RecordWriter, NewOutputCollector extends the RecordWriter abstract class. Apart from a close method that releases resources, the main method the abstract class defines is void write(K key, V value), which writes a key/value pair.
2. Mapper's run method, which calls map for every input key/value pair.
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  while (context.nextKeyValue()) {
    map(context.getCurrentKey(), context.getCurrentValue(), context);
  }
  cleanup(context);
}
3. Mapper's map method. The default implementation simply writes the key and value through unchanged.
protected void map(KEYIN key, VALUEIN value,
                   Context context) throws IOException, InterruptedException {
  context.write((KEYOUT) key, (VALUEOUT) value);
}
In practice the map method does whatever processing we need. In the well-known wordcount, for example, each line is split into words and every word is counted as one (each value is set to one = new IntWritable(1)); but in the end the result is always written out by calling context.write(key, value):
public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
  StringTokenizer itr = new StringTokenizer(value.toString());
  while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);
  }
}
4. TaskInputOutputContext's write method. It calls the write method of the RecordWriter held by the context, i.e. NewOutputCollector's write method.
public void write(KEYOUT key, VALUEOUT value
                  ) throws IOException, InterruptedException {
  output.write(key, value);
}
5. NewOutputCollector's write method.
public void write(K key, V value) throws IOException, InterruptedException {
  collector.collect(key, value,
                    partitioner.getPartition(key, value, partitions));
}
As the member name suggests, the object that actually accepts the data is the MapOutputCollector<K,V> collector; NewOutputCollector's constructor shows how it is initialized:
collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
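The partition index passed into collect comes from the job's Partitioner. For reference, the default HashPartitioner amounts to the following (a sketch of the stock implementation from memory):

import org.apache.hadoop.mapreduce.Partitioner;

public class HashPartitioner<K, V> extends Partitioner<K, V> {
  public int getPartition(K key, V value, int numReduceTasks) {
    // Mask off the sign bit so the result is non-negative,
    // then bucket by the number of reducers.
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}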
6. MapOutputBuffer's constructor. Before looking at MapOutputBuffer's collect method, let's look at its constructor and the initialization it performs.
public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
                       TaskReporter reporter
                       ) throws IOException, ClassNotFoundException {
  this.job = job;
  this.reporter = reporter;
  localFs = FileSystem.getLocal(job);
  // 1) The number of map partitions equals the number of reduces configured for the job.
  partitions = job.getNumReduceTasks();

  rfs = ((LocalFileSystem)localFs).getRaw();

  indexCacheList = new ArrayList<SpillRecord>();

  // 2) The key parameters.
  final float spillper = job.getFloat("io.sort.spill.percent", (float)0.8);
  final float recper = job.getFloat("io.sort.record.percent", (float)0.05);
  final int sortmb = job.getInt("io.sort.mb", 100);
  if (spillper > (float)1.0 || spillper < (float)0.0) {
    throw new IOException("Invalid \"io.sort.spill.percent\": " + spillper);
  }
  if (recper > (float)1.0 || recper < (float)0.01) {
    throw new IOException("Invalid \"io.sort.record.percent\": " + recper);
  }
  if ((sortmb & 0x7FF) != sortmb) {
    throw new IOException("Invalid \"io.sort.mb\": " + sortmb);
  }
  // 3) The sorter used to sort the map output within each partition, in memory.
  sorter = ReflectionUtils.newInstance(
      job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);
  LOG.info("io.sort.mb = " + sortmb);
  // buffers and accounting
  // Shift sortmb (in MB) left by 20 bits to convert it to bytes.
  int maxMemUsage = sortmb << 20;
  int recordCapacity = (int)(maxMemUsage * recper);
  recordCapacity -= recordCapacity % RECSIZE;
  // The output buffer.
  kvbuffer = new byte[maxMemUsage - recordCapacity];
  bufvoid = kvbuffer.length;
  recordCapacity /= RECSIZE;
  kvoffsets = new int[recordCapacity];
  kvindices = new int[recordCapacity * ACCTSIZE];
  softBufferLimit = (int)(kvbuffer.length * spillper);
  softRecordLimit = (int)(kvoffsets.length * spillper);
  LOG.info("data buffer = " + softBufferLimit + "/" + kvbuffer.length);
  LOG.info("record buffer = " + softRecordLimit + "/" + kvoffsets.length);
  // k/v serialization
  comparator = job.getOutputKeyComparator();
  keyClass = (Class<K>)job.getMapOutputKeyClass();
  valClass = (Class<V>)job.getMapOutputValueClass();
  serializationFactory = new SerializationFactory(job);
  keySerializer = serializationFactory.getSerializer(keyClass);
  keySerializer.open(bb);
  valSerializer = serializationFactory.getSerializer(valClass);
  valSerializer.open(bb);
  // counters
  mapOutputByteCounter = reporter.getCounter(MAP_OUTPUT_BYTES);
  mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
  Counters.Counter combineInputCounter =
    reporter.getCounter(COMBINE_INPUT_RECORDS);
  combineOutputCounter = reporter.getCounter(COMBINE_OUTPUT_RECORDS);
  // 4) compression
  if (job.getCompressMapOutput()) {
    Class<? extends CompressionCodec> codecClass =
      job.getMapOutputCompressorClass(DefaultCodec.class);
    codec = ReflectionUtils.newInstance(codecClass, job);
  }
  // 5) combiner is a NewCombinerRunner; it runs the job's reducer on the map side
  //    to combine the map output before it is written out.
  combinerRunner = CombinerRunner.create(job, getTaskID(),
                                         combineInputCounter,
                                         reporter, null);
  if (combinerRunner != null) {
    combineCollector = new CombineOutputCollector<K,V>(combineOutputCounter);
  } else {
    combineCollector = null;
  }
  // 6) Start the SpillThread and wait until it is running.
  minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);
  spillThread.setDaemon(true);
  spillThread.setName("SpillThread");
  spillLock.lock();
  try {
    spillThread.start();
    while (!spillThreadRunning) {
      spillDone.await();
    }
  } catch (InterruptedException e) {
    throw (IOException)new IOException("Spill thread failed to initialize"
        ).initCause(sortSpillException);
  } finally {
    spillLock.unlock();
  }
  if (sortSpillException != null) {
    throw (IOException)new IOException("Spill thread failed to initialize"
        ).initCause(sortSpillException);
  }
}
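To make the sizing arithmetic concrete, here is a standalone calculation with the default parameters, assuming RECSIZE = 16 (one int in kvoffsets plus ACCTSIZE = 3 ints in kvindices, i.e. 16 bytes of accounting per record, as in this version of the code):

public class BufferSizing {
  public static void main(String[] args) {
    final int RECSIZE = 16;  // bytes of accounting per record (assumed from this code version)
    int sortmb = 100;
    float recper = 0.05f, spillper = 0.8f;

    int maxMemUsage = sortmb << 20;                     // 104857600 bytes
    int recordCapacity = (int) (maxMemUsage * recper);  // 5242880 bytes for accounting
    recordCapacity -= recordCapacity % RECSIZE;         // align to the record size
    int kvbufferLen = maxMemUsage - recordCapacity;     // 99614720 bytes of raw key/value space
    recordCapacity /= RECSIZE;                          // 327680 records can be tracked

    System.out.println("data buffer = " + (int)(kvbufferLen * spillper) + "/" + kvbufferLen);
    System.out.println("record buffer = " + (int)(recordCapacity * spillper) + "/" + recordCapacity);
    // Prints the familiar default log lines:
    //   data buffer = 79691776/99614720
    //   record buffer = 262144/327680
  }
}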
7. MapOutputBuffer's collect method.
The partition parameter is the partition index the partitioner computed from the key for this key/value pair. collect writes the key and value into the buffer; when the buffer reaches the spill condition it calls startSpill, which sets the bookkeeping variables and calls spillReady.signal() to notify the SpillThread, then waits for the spill to finish (via spillDone.await()). The buffer's purpose is to collect map results in batches and reduce the impact of disk I/O. The key/value pair and its partition are all written into the buffer, the key and value being serialized to byte arrays first. kvindices holds each record's partition, the offset where its key starts in the buffer, and the offset where its value starts; through kvindices the corresponding record can be located in the buffer.
In the output buffer, bufstart, bufend and bufmark correspond to kvstart, kvend and kvindex. Two further variables are involved: bufvoid, which marks the end of the buffer space actually in use, and bufmark, which marks the end of the last complete record. bufmark is needed because key and value outputs are variable-length.
[caption id="attachment_539" align="alignnone" width="739"] Key Value序列化后緩存[/caption]
public synchronized void collect(K key, V value, int partition
                                 ) throws IOException {
  reporter.progress();
  if (key.getClass() != keyClass) {
    throw new IOException("Type mismatch in key from map: expected "
                          + keyClass.getName() + ", recieved "
                          + key.getClass().getName());
  }
  if (value.getClass() != valClass) {
    throw new IOException("Type mismatch in value from map: expected "
                          + valClass.getName() + ", recieved "
                          + value.getClass().getName());
  }
  // The next index is taken modulo kvoffsets.length -- a hint that this is a circular buffer.
  final int kvnext = (kvindex + 1) % kvoffsets.length;
  // Enter the critical section.
  spillLock.lock();
  try {
    boolean kvfull;
    do {
      if (sortSpillException != null) {
        throw (IOException)new IOException("Spill failed"
            ).initCause(sortSpillException);
      }
      // sufficient acct space
      kvfull = kvnext == kvstart;
      final boolean kvsoftlimit = ((kvnext > kvend)
          ? kvnext - kvend > softRecordLimit
          : kvend - kvnext <= kvoffsets.length - softRecordLimit);
      if (kvstart == kvend && kvsoftlimit) {
        LOG.info("Spilling map output: record full = " + kvsoftlimit);
        // startSpill sets the bookkeeping variables and calls spillReady.signal()
        // to notify the SpillThread; the spill then proceeds concurrently.
        startSpill();
      }
      if (kvfull) {
        try {
          while (kvstart != kvend) {
            // kvstart != kvend means a spill is in progress: wait for spillDone.
            reporter.progress();
            spillDone.await();
          }
        } catch (InterruptedException e) {
          throw (IOException)new IOException(
              "Collector interrupted while waiting for the writer"
              ).initCause(e);
        }
      }
    } while (kvfull);
  } finally {
    spillLock.unlock();
  }
  try {
    // Serialize the key first, then the value. The temporaries keystart, valstart
    // and valend record where the key starts, where the value starts and where the
    // value ends. Serialization ultimately writes into the buffer via Buffer.write.
    // serialize key bytes into buffer
    int keystart = bufindex;
    keySerializer.serialize(key);
    if (bufindex < keystart) {
      // The key wrapped around the end of the buffer. Spilling must sort the
      // <key,value> records, and RawComparator needs a contiguous byte range,
      // so BlockingBuffer.reset() moves the key so its serialized bytes sit
      // contiguously at the start of the buffer.
      bb.reset();
      keystart = 0;
    }
    // serialize value bytes into buffer
    final int valstart = bufindex;
    valSerializer.serialize(value);
    int valend = bb.markRecord();
    if (partition < 0 || partition >= partitions) {
      throw new IOException("Illegal partition for " + key + " (" +
                            partition + ")");
    }
    mapOutputRecordCounter.increment(1);
    mapOutputByteCounter.increment(valend >= keystart
        ? valend - keystart
        : (bufvoid - keystart) + valend);
    // update accounting info
    int ind = kvindex * ACCTSIZE;
    kvoffsets[kvindex] = ind;
    kvindices[ind + PARTITION] = partition;
    kvindices[ind + KEYSTART] = keystart;
    kvindices[ind + VALSTART] = valstart;
    kvindex = kvnext;
  } catch (MapBufferTooSmallException e) {
    LOG.info("Record too large for in-memory buffer: " + e.getMessage());
    // The serialized record is too large to fit in the buffer at all:
    // spill this single record straight to disk.
    spillSingleRecord(key, value, partition);
    mapOutputRecordCounter.increment(1);
    return;
  }
}
8. MapOutputBuffer.BlockingBuffer's reset() method.
If bufindex < keystart after the key is serialized, BlockingBuffer's reset method is called. The reason: during a spill the <key,value> records must be sorted, and the RawComparator has to be handed a contiguous byte range. So when reset finds that the key's serialized bytes are not contiguous, it sets bufvoid to bufmark, shifts the bytes at the head of the buffer backwards, and copies the bytes that sat between bufmark and bufvoid to the start of the buffer, leaving the serialized key stored contiguously at the very beginning.

If the region before bufstart cannot hold the whole serialized key, reset instead sets bufindex to 0 and writes through BlockingBuffer's inner out stream, which ends up in Buffer.write; that may start a spill and eventually succeeds in writing the serialized key. BlockingBuffer's reset method:
protected synchronized void reset() throws IOException {

  int headbytelen = bufvoid - bufmark;
  bufvoid = bufmark;
  // The key's serialized bytes are not contiguous: set bufvoid to bufmark, shift
  // the head of the buffer backwards, and copy the bytes between bufmark and
  // bufvoid to the front, so the serialized key becomes contiguous at the start.
  if (bufindex + headbytelen < bufstart) {
    System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
    System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
    bufindex += headbytelen;
  } else {
    // The region before bufstart cannot hold the whole serialized key: set
    // bufindex to 0 and write through BlockingBuffer's inner out stream instead.
    byte[] keytmp = new byte[bufindex];
    System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
    bufindex = 0;
    out.write(kvbuffer, bufmark, headbytelen);
    out.write(keytmp);
  }
}
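A toy demonstration of the first branch (there is room before bufstart): the same two arraycopy calls make a key that wrapped around the end of the buffer contiguous at the front. The buffer contents here are made up purely for illustration:

public class ResetDemo {
  public static void main(String[] args) {
    // The tail (bufmark..bufvoid) holds "ABC" and the head (0..bufindex) holds "DE":
    // a key that wrapped around the end of the circular buffer.
    byte[] buf = "DE.......ABC".getBytes();
    int bufmark = 9, bufvoid = 12, bufindex = 2;
    int headbytelen = bufvoid - bufmark;                  // 3 key bytes at the tail

    // Same two copies as reset(): shift the head fragment right,
    // then move the tail fragment to position 0.
    System.arraycopy(buf, 0, buf, headbytelen, bufindex); // "DE" -> positions 3..4
    System.arraycopy(buf, bufvoid, buf, 0, headbytelen);  // "ABC" -> positions 0..2
    bufindex += headbytelen;

    System.out.println(new String(buf, 0, bufindex));     // prints ABCDE, now contiguous
  }
}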
9. MapOutputBuffer.Buffer's write method. It is called while the key and value are serialized, to write their bytes into the buffer. If the spill thread is busy writing buffered data to a spill file, it blocks.
public synchronized void write(byte b[], int off, int len)
    throws IOException {
  boolean buffull = false;
  boolean wrap = false;
  spillLock.lock();
  try {
    do { // Loop until there is enough room to write the data.
      if (sortSpillException != null) {
        throw (IOException)new IOException("Spill failed"
            ).initCause(sortSpillException);
      }

      // sufficient buffer space?
      if (bufstart <= bufend && bufend <= bufindex) {
        buffull = bufindex + len > bufvoid;
        wrap = (bufvoid - bufindex) + bufstart > len;
      } else {
        // bufindex <= bufstart <= bufend
        // bufend <= bufindex <= bufstart
        wrap = false;
        buffull = bufindex + len > bufstart;
      }

      if (kvstart == kvend) {
        // spill thread not running
        if (kvend != kvindex) {
          // There are records in the accounting array (kvend != kvindex): start a
          // spill if needed (soft limit reached, or the space is insufficient).
          final boolean bufsoftlimit = (bufindex > bufend)
              ? bufindex - bufend > softBufferLimit
              : bufend - bufindex < bufvoid - softBufferLimit;
          if (bufsoftlimit || (buffull && !wrap)) {
            LOG.info("Spilling map output: buffer full= " + bufsoftlimit);
            startSpill();
          }
        } else if (buffull && !wrap) {
          // Space is insufficient (buffull && !wrap) but the buffer holds no
          // records: this single record is too large for the in-memory buffer.
          // Throw MapBufferTooSmallException so it is written straight to a
          // file, bypassing the buffer.
          final int size = ((bufend <= bufindex)
              ? bufindex - bufend
              : (bufvoid - bufend) + bufindex) + len;
          bufstart = bufend = bufindex = bufmark = 0;
          kvstart = kvend = kvindex = 0;
          bufvoid = kvbuffer.length;
          throw new MapBufferTooSmallException(size + " bytes");
        }
      }

      if (buffull && !wrap) {
        try {
          // Not enough space, but a spill is running: wait for spillDone.
          while (kvstart != kvend) {
            reporter.progress();
            spillDone.await();
          }
        } catch (InterruptedException e) {
          throw (IOException)new IOException(
              "Buffer interrupted while waiting for the writer"
              ).initCause(e);
        }
      }
    } while (buffull && !wrap);
  } finally {
    spillLock.unlock();
  }
  // This is where the data actually reaches the buffer. If buffull, the write is
  // not contiguous: fill the remaining tail of the buffer, reset bufindex to 0,
  // and continue from there. Otherwise just write starting at bufindex.
  if (buffull) {
    // Bytes left until the end of the buffer.
    final int gaplen = bufvoid - bufindex;
    // Fill the remaining tail.
    System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
    // Remaining length and offset.
    len -= gaplen;
    off += gaplen;
    // Move the write pointer back to the start.
    bufindex = 0;
  }
  // Write from the chosen starting point.
  System.arraycopy(b, off, kvbuffer, bufindex, len);
  bufindex += len;
}
Notes on the buffull and wrap conditions
The figure illustrates the buffull and wrap conditions. In the first two cases (cases 1 and 2):
buffull = bufindex + len > bufvoid;
wrap = (bufvoid - bufindex) + bufstart > len;
buffull tests whether the space between the next write position bufindex and the end of the buffer bufvoid can hold the data being written; wrap tests whether the free space (the white regions in the figure, front and back combined) is larger than the input, in which case wrap is true. In cases 3 and 4:
wrap = false; buffull = bufindex + len > bufstart;
buffull tests whether the space from bufindex up to bufstart is sufficient, and wrap is necessarily false. When the condition (buffull && !wrap) holds, the current free space cannot take the write in one contiguous piece.
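The two cases can be condensed into a small helper (a sketch, not from the Hadoop source) that evaluates both flags for given pointer positions, mirroring the logic in Buffer.write:

static boolean[] check(int bufstart, int bufend, int bufindex, int bufvoid, int len) {
  boolean buffull, wrap;
  if (bufstart <= bufend && bufend <= bufindex) {
    // Free space is split: [bufindex, bufvoid) at the tail plus [0, bufstart) at the head.
    buffull = bufindex + len > bufvoid;
    wrap = (bufvoid - bufindex) + bufstart > len;
  } else {
    // Free space is the single gap [bufindex, bufstart).
    wrap = false;
    buffull = bufindex + len > bufstart;
  }
  return new boolean[] { buffull, wrap };
}

For example, check(0, 0, 90, 100, 20) yields buffull = true and wrap = false: the 10 free bytes at the tail plus 0 at the head cannot take 20 bytes, so the caller must wait for a spill to free space.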
10. MapOutputBuffer's spillSingleRecord method. If collect fails to put the record into the buffer, the record is written directly to a spill file. For such a single record the combiner is not applied even if one is configured. This is the path for a record so large that the in-memory buffer cannot hold it: MapBufferTooSmallException is thrown and the record goes straight to a file, bypassing the buffer.
private void spillSingleRecord(final K key, final V value,
                               int partition) throws IOException {
  long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
  FSDataOutputStream out = null;
  try {
    // Create the spill file.
    final SpillRecord spillRec = new SpillRecord(partitions);
    final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
        numSpills, size);
    out = rfs.create(filename);
    IndexRecord rec = new IndexRecord();
    for (int i = 0; i < partitions; ++i) {
      IFile.Writer<K, V> writer = null;
      try {
        long segmentStart = out.getPos();
        writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec,
                                       spilledRecordsCounter);
        if (i == partition) {
          final long recordStart = out.getPos();
          writer.append(key, value);
          mapOutputByteCounter.increment(out.getPos() - recordStart);
        }
        writer.close();
        // Record the offsets in the index.
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength();
        rec.partLength = writer.getCompressedLength();
        spillRec.putIndex(rec, i);
        writer = null;
      } catch (IOException e) {
        if (null != writer) writer.close();
        throw e;
      }
    }
    // If the index cache is full, write the index out to an index file;
    // otherwise add this index record to indexCacheList and update
    // totalIndexCacheMemory.
    if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
      // create spill index file
      Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
          getTaskID(), numSpills,
          partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
        spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
}
11. MapOutputBuffer's startSpill method. It wakes up the thread waiting on spillReady.
private synchronized void startSpill() {
  LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
           "; bufvoid = " + bufvoid);
  LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
           "; length = " + kvoffsets.length);
  kvend = kvindex;
  bufend = bufmark;
  spillReady.signal();
}
12. SpillThread's run method.
This thread watches the in-memory output buffer and spills its contents to disk when the conditions are met. It is a textbook producer-consumer setup: MapTask's collect method is the producer and the SpillThread is the consumer, synchronized through spillLock (a ReentrantLock) and two condition variables on it (spillDone and spillReady). When kvstart == kvend there are no records to spill.
public void run() {
  // Critical section.
  spillLock.lock();
  spillThreadRunning = true;
  try {
    while (true) {
      spillDone.signal();
      // kvstart == kvend means there are no records to spill.
      while (kvstart == kvend) {
        spillReady.await();
      }
      try {
        spillLock.unlock();
        // Do the actual work.
        sortAndSpill();
      } catch (Exception e) {
        sortSpillException = e;
      } catch (Throwable t) {
        sortSpillException = t;
        String logMsg = "Task " + getTaskID() + " failed : "
                        + StringUtils.stringifyException(t);
        reportFatalError(getTaskID(), t, logMsg);
      } finally {
        spillLock.lock();
        if (bufend < bufindex && bufindex < bufstart) {
          bufvoid = kvbuffer.length;
        }
        kvstart = kvend;
        bufstart = bufend;
      }
    }
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  } finally {
    spillLock.unlock();
    spillThreadRunning = false;
  }
}
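Stripped of the Hadoop details, the handshake reduces to the following self-contained sketch (the names mirror the source; the spill body and the record-producing side are elided):

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Minimal sketch of the collect()/SpillThread handshake: one lock, two conditions.
// Not Hadoop code; kvstart/kvend/kvindex stand in for the real indices.
public class SpillHandshake {
  private final ReentrantLock spillLock = new ReentrantLock();
  private final Condition spillReady = spillLock.newCondition();
  private final Condition spillDone = spillLock.newCondition();
  private int kvstart = 0, kvend = 0, kvindex = 0; // kvindex would advance as records arrive

  void startSpill() { // producer side: mark the region to spill and wake the spiller
    spillLock.lock();
    try {
      kvend = kvindex;
      spillReady.signal();
    } finally {
      spillLock.unlock();
    }
  }

  void spillLoop() throws InterruptedException { // consumer side, runs forever like run()
    spillLock.lock();
    try {
      while (true) {
        spillDone.signal();                           // tell waiting producers a spill finished
        while (kvstart == kvend) spillReady.await();  // nothing to spill yet
        spillLock.unlock();
        try {
          /* sortAndSpill() would run here, outside the lock */
        } finally {
          spillLock.lock();
          kvstart = kvend;                            // mark the spilled region as free
        }
      }
    } finally {
      spillLock.unlock();
    }
  }
}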
13. MapOutputBuffer's sortAndSpill() method. SpillThread's run method calls sortAndSpill to write the buffered output to a spill file named "/spill{spill number}.out"; the index built from kvindices is kept in spill{spill number}.out.index, the data in spill{spill number}.out. The method creates the SpillRecord, the output file and an IndexRecord, then sorts kvoffsets; after the sort, walking kvoffsets in order visits the records in partition order. It then loops over the partitions of the sorted array: with no combiner the records are written out directly; otherwise the combine is applied first and the combined result written out. At the end of each iteration the IndexRecord is recorded into the SpillRecord.
private void sortAndSpill() throws IOException, ClassNotFoundException,
                                   InterruptedException {
  //approximate the length of the output file to be the length of the
  //buffer + header lengths for the partitions
  long size = (bufend >= bufstart
      ? bufend - bufstart
      : (bufvoid - bufend) + bufstart) +
      partitions * APPROX_HEADER_LENGTH;
  FSDataOutputStream out = null;
  try {
    // Create the spill file.
    final SpillRecord spillRec = new SpillRecord(partitions);
    final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
        numSpills, size);
    out = rfs.create(filename);

    final int endPosition = (kvend > kvstart)
        ? kvend
        : kvoffsets.length + kvend;
    // Sort in memory with the sorter. As MapOutputBuffer's compare method shows,
    // the comparison works on the serialized bytes; the sort itself permutes
    // kvoffsets, as MapOutputBuffer's swap method shows.
    sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
    int spindex = kvstart;
    IndexRecord rec = new IndexRecord();
    InMemValBytes value = new InMemValBytes();
    for (int i = 0; i < partitions; ++i) {
      IFile.Writer<K, V> writer = null;
      try {
        long segmentStart = out.getPos();
        writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
                                  spilledRecordsCounter);
        if (combinerRunner == null) {
          // No combiner: write the key/value pairs out directly.
          DataInputBuffer key = new DataInputBuffer();
          while (spindex < endPosition &&
                 kvindices[kvoffsets[spindex % kvoffsets.length]
                           + PARTITION] == i) {
            final int kvoff = kvoffsets[spindex % kvoffsets.length];
            getVBytesForOffset(kvoff, value);
            key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
                      (kvindices[kvoff + VALSTART] -
                       kvindices[kvoff + KEYSTART]));
            // Write the key/value pair to the spill file.
            writer.append(key, value);
            ++spindex;
          }
        } else {
          int spstart = spindex;
          while (spindex < endPosition &&
                 kvindices[kvoffsets[spindex % kvoffsets.length]
                           + PARTITION] == i) {
            ++spindex;
          }
          // With a combiner configured, the combined result is written to the
          // IFile through the same writer, reducing the data spilled to disk.
          if (spstart != spindex) {
            combineCollector.setWriter(writer);
            RawKeyValueIterator kvIter =
              new MRResultIterator(spstart, spindex);
            combinerRunner.combine(kvIter, combineCollector);
          }
        }

        // close the writer
        writer.close();

        // record offsets
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength();
        rec.partLength = writer.getCompressedLength();
        spillRec.putIndex(rec, i);

        writer = null;
      } finally {
        if (null != writer) writer.close();
      }
    }

    if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
      // Write the spill index file, named like "/spill{spill number}.out.index".
      Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
          getTaskID(), numSpills,
          partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
        spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    LOG.info("Finished spill " + numSpills);
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
}
14. MapOutputBuffer's compare and swap methods. MapOutputBuffer implements the IndexedSortable interface, and as the name suggests, the sort does not move the data but the indices into it. What is actually being ordered is kvindices, by rearranging its entries' indices held in kvoffsets. The figure shows the effect of the sort before the disk write. kvindices holds each record's (reduce) partition, the offset where its key starts in the buffer, and the offset where its value starts; through kvindices we can find the corresponding record in the buffer. kvoffsets is what gets sorted by partition when the buffer fills, and the sorted result is written out to local disk, with the index (kvindices) kept in spill{spill number}.out.index and the data in spill{spill number}.out. As MapOutputBuffer's compare method shows, records are ordered first by partition and then by key.

Figure: kvindices sorted via kvoffsets.
public int compare(int i, int j) {
  final int ii = kvoffsets[i % kvoffsets.length];
  final int ij = kvoffsets[j % kvoffsets.length];
  // Sort on partition first...
  if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) {
    return kvindices[ii + PARTITION] - kvindices[ij + PARTITION];
  }
  // ...then on key.
  return comparator.compare(kvbuffer,
      kvindices[ii + KEYSTART],
      kvindices[ii + VALSTART] - kvindices[ii + KEYSTART],
      kvbuffer,
      kvindices[ij + KEYSTART],
      kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);
}

public void swap(int i, int j) {
  i %= kvoffsets.length;
  j %= kvoffsets.length;
  // Swapping the entries of kvoffsets is what realizes the sort.
  int tmp = kvoffsets[i];
  kvoffsets[i] = kvoffsets[j];
  kvoffsets[j] = tmp;
}
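The idea of sorting indices instead of records can be shown with a toy example (plain java.util sorting here, not Hadoop's QuickSort over IndexedSortable):

import java.util.Arrays;
import java.util.Comparator;

// Records stay put; only the index array (the analogue of kvoffsets) is
// reordered by (partition, key).
public class IndexedSortDemo {
  public static void main(String[] args) {
    final int[] partition = { 1, 0, 1, 0 };
    final String[] key    = { "b", "z", "a", "c" };
    Integer[] offsets = { 0, 1, 2, 3 };          // indices into the records

    Arrays.sort(offsets, new Comparator<Integer>() {
      public int compare(Integer i, Integer j) {
        if (partition[i] != partition[j]) return partition[i] - partition[j];
        return key[i].compareTo(key[j]);         // within a partition, order by key
      }
    });
    // Prints [3, 1, 0, 2] -> (0,"c"), (0,"z"), (1,"a"), (1,"b")
    System.out.println(Arrays.toString(offsets));
  }
}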
15. MapOutputBuffer's flush() method.
Once the mapper's results have all been collected, the buffer needs some final cleanup: flush is called to merge the spill{n} files into the final output. It first waits for any in-flight spill to finish, then checks whether the buffer still holds data; if so it calls sortAndSpill one last time, and finally stops the spill thread.
public synchronized void flush() throws IOException, ClassNotFoundException,
                                        InterruptedException {
  LOG.info("Starting flush of map output");
  spillLock.lock();
  try {
    while (kvstart != kvend) {
      reporter.progress();
      spillDone.await();
    }
    if (sortSpillException != null) {
      throw (IOException)new IOException("Spill failed"
          ).initCause(sortSpillException);
    }
    if (kvend != kvindex) {
      kvend = kvindex;
      bufend = bufmark;
      sortAndSpill();
    }
  } catch (InterruptedException e) {
    throw (IOException)new IOException(
        "Buffer interrupted while waiting for the writer"
        ).initCause(e);
  } finally {
    spillLock.unlock();
  }
  assert !spillLock.isHeldByCurrentThread();

  try {
    spillThread.interrupt();
    spillThread.join();
  } catch (InterruptedException e) {
    throw (IOException)new IOException("Spill failed"
        ).initCause(e);
  }
  // release sort buffer before the merge
  kvbuffer = null;
  mergeParts();
}
16. MapTask.MapOutputBuffer's mergeParts() method.
Because the merge combines several spill files into one, the same key may appear in more than one of them; values for such a key are read from the different spill files and brought together, and if a Combiner was configured it is applied again here to merge records with the same key. MapReduce has every map produce exactly one output file, plus an index file that records the offset of each reduce's portion of the data.
private void mergeParts() throws IOException, InterruptedException,
                                 ClassNotFoundException {
  // get the approximate size of the final output/index files
  long finalOutFileSize = 0;
  long finalIndexFileSize = 0;
  final Path[] filename = new Path[numSpills];
  final TaskAttemptID mapId = getTaskID();

  for (int i = 0; i < numSpills; i++) {
    filename[i] = mapOutputFile.getSpillFile(mapId, i);
    finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
  }
  if (numSpills == 1) {
    // Only one spill file: just rename it to the final output file.
    rfs.rename(filename[0],
               new Path(filename[0].getParent(), "file.out"));
    if (indexCacheList.size() == 0) {
      rfs.rename(mapOutputFile.getSpillIndexFile(mapId, 0),
                 new Path(filename[0].getParent(), "file.out.index"));
    } else {
      indexCacheList.get(0).writeToFile(
          new Path(filename[0].getParent(), "file.out.index"), job);
    }
    return;
  }

  // read in paged indices
  for (int i = indexCacheList.size(); i < numSpills; ++i) {
    Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, i);
    indexCacheList.add(new SpillRecord(indexFileName, job));
  }

  //make correction in the length to include the sequence file header
  //lengths for each partition
  finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
  finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
  Path finalOutputFile = mapOutputFile.getOutputFileForWrite(mapId,
      finalOutFileSize);
  Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(
      mapId, finalIndexFileSize);

  //The output stream for the final single output file
  FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);

  if (numSpills == 0) {
    // No spill files at all: create dummy files.
    IndexRecord rec = new IndexRecord();
    SpillRecord sr = new SpillRecord(partitions);
    try {
      for (int i = 0; i < partitions; i++) {
        long segmentStart = finalOut.getPos();
        Writer<K, V> writer =
          new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
        writer.close();
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength();
        rec.partLength = writer.getCompressedLength();
        sr.putIndex(rec, i);
      }
      sr.writeToFile(finalIndexFile, job);
    } finally {
      finalOut.close();
    }
    return;
  }
  {
    IndexRecord rec = new IndexRecord();
    final SpillRecord spillRec = new SpillRecord(partitions);
    for (int parts = 0; parts < partitions; parts++) {
      // For each partition, build the list of segments and merge them.
      List<Segment<K,V>> segmentList =
        new ArrayList<Segment<K, V>>(numSpills);
      for (int i = 0; i < numSpills; i++) {
        IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);

        Segment<K,V> s =
          new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,
                           indexRecord.partLength, codec, true);
        segmentList.add(i, s);

        if (LOG.isDebugEnabled()) {
          LOG.debug("MapId=" + mapId + " Reducer=" + parts +
                    "Spill =" + i + "(" + indexRecord.startOffset + "," +
                    indexRecord.rawLength + ", " + indexRecord.partLength + ")");
        }
      }

      //merge
      @SuppressWarnings("unchecked")
      RawKeyValueIterator kvIter = Merger.merge(job, rfs,
                     keyClass, valClass, codec,
                     segmentList, job.getInt("io.sort.factor", 100),
                     new Path(mapId.toString()),
                     job.getOutputKeyComparator(), reporter,
                     null, spilledRecordsCounter);

      //write merged output to disk
      // Run the merge and write its result into the final "file.out".
      long segmentStart = finalOut.getPos();
      Writer<K, V> writer =
          new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
                           spilledRecordsCounter);
      if (combinerRunner == null || numSpills < minSpillsForCombine) {
        Merger.writeFile(kvIter, writer, reporter, job);
      } else {
        combineCollector.setWriter(writer);
        combinerRunner.combine(kvIter, combineCollector);
      }

      //close
      writer.close();

      // record offsets
      // The index goes into the final "file.out.index".
      rec.startOffset = segmentStart;
      rec.rawLength = writer.getRawLength();
      rec.partLength = writer.getCompressedLength();
      spillRec.putIndex(rec, parts);
    }
    spillRec.writeToFile(finalIndexFile, job);
    finalOut.close();
    for (int i = 0; i < numSpills; i++) {
      rfs.delete(filename[i], true);
    }
  }
}
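Conceptually, what Merger.merge does for each partition is a k-way merge of already-sorted segments. Here is a minimal sketch with strings standing in for serialized keys (not the real Merger, which streams IFile segments through a priority heap under the io.sort.factor limit):

import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMergeDemo {
  public static void main(String[] args) {
    List<List<String>> segments = Arrays.asList(
        Arrays.asList("a", "d", "g"),   // spill0's segment for this partition
        Arrays.asList("b", "e"),        // spill1's segment
        Arrays.asList("c", "f"));       // spill2's segment

    // Heap entries are [segment id, position], ordered by the current key.
    PriorityQueue<int[]> heap = new PriorityQueue<>((x, y) ->
        segments.get(x[0]).get(x[1]).compareTo(segments.get(y[0]).get(y[1])));
    for (int s = 0; s < segments.size(); s++) heap.add(new int[] { s, 0 });

    while (!heap.isEmpty()) {
      int[] top = heap.poll();
      System.out.print(segments.get(top[0]).get(top[1]) + " "); // a b c d e f g
      if (top[1] + 1 < segments.get(top[0]).size())
        heap.add(new int[] { top[0], top[1] + 1 });             // advance that segment
    }
  }
}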
Figure: structure of the index and spill files before and after the merge.
The merge ultimately produces a single file.out and file.out.index.
As the analysis above shows, all of the partitions live in this one output file, but sorted by partition: the mapper output is segmented by partition. Each partition corresponds to one reducer, so a reducer only needs to fetch its own segment.
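To illustrate how the index file makes that fetch cheap, here is a hypothetical reader: it assumes file.out.index is a flat sequence of per-partition (startOffset, rawLength, partLength) long triples, matching the IndexRecord fields above. The real serving side lives in the TaskTracker's shuffle handler; this is only a sketch of the lookup.

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;

public class SegmentLocator {
  public static void main(String[] args) throws IOException {
    int reduceId = Integer.parseInt(args[0]);
    try (DataInputStream idx =
             new DataInputStream(new FileInputStream("file.out.index"))) {
      idx.skipBytes(reduceId * 3 * 8);     // 3 longs per partition (assumed layout)
      long startOffset = idx.readLong();
      long rawLength = idx.readLong();     // uncompressed length
      long partLength = idx.readLong();    // on-disk (possibly compressed) length

      try (RandomAccessFile data = new RandomAccessFile("file.out", "r")) {
        data.seek(startOffset);            // this reducer's segment starts here
        byte[] segment = new byte[(int) partLength];
        data.readFully(segment);
        System.out.println("read " + segment.length + " of " + rawLength + " raw bytes");
      }
    }
  }
}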
The end.
References:
This post draws on, and supplements, the analysis of the in-memory index structures in http://caibinbupt.iteye.com/blog/401374. Thanks.