6.4.3 優化洗牌(shuffle)和排序階段
洗牌和排序階段都很耗費資源。洗牌需要在map和reduce任務之間傳輸數據,會導致過大的網絡消耗。排序和合並操作的消耗也是很顯著的。這一節將介紹一系列的技術來緩解洗牌和排序階段的消耗。
技術46 規避使用reduce
Reduce在用於連接數據集的時候將會產生大量的網絡消耗。
問題
需要考慮在MapReduce規避reduce的使用。
方案
通過將MapReduce參數setNumReduceTasks設置為0來創建一個只有map的作業。
討論
洗牌和排序階段一般都是用來連接數據集。但連接操作並不一定需要洗牌和排序,正如第4章中所介紹的。滿足一定條件的連接可以只在map端運行。那么就只需要只有map的作業了。設置只有map的作業的命令如下。
job.setNumReduceTasks(0);
小結
一個只有map的作業的OutputFormat是和普通作業中reduce的OutputFormat一樣。如圖6.39所示。
如果無法規避reduce,那么就要盡量減小它對你的作業執行時間的影響。
技術47 過濾和投影
Map到Reduce之間傳輸數據要通過網絡,這個成本很高。
問題
需要減少被洗牌的數據。
方案
減少map輸出的每條記錄的大小,並盡可能地減少map輸出的數據量。
討論
過濾和投影是關系運算中的概念,用以減少需要處理的數據。這些概念也可以用到MapReduce中減少map任務需要輸出的數據。以下是過濾和投影的簡明定義:
- 過濾是減少map輸出的數據量。
- 投影是減少map輸出的每條記錄的大小。
以下是上述概念的演示代碼:
1 Text outputKey = new Text(); 2 Text outputValue = new Text(); 3 4 @Override 5 public void map(LongWritable key, Text value, 6 OutputCollector<Text, Text> output, 7 Reporter reporter) throws IOException { 8 9 String v = value.toString(); 10 11 if (!v.startsWith("10.")) { 12 String[] parts = StringUtils.split(v, ".", 3); 13 outputKey.set(parts[0]); 14 outputValue.set(parts[1]); 15 output.collect(outputKey, outputValue); 16 } 17 }
小結
過濾和投影是在需要顯著減少MapReduce作業運行時間時最容易的方法中的兩種。
如果已經應用了這兩種方法,但還需要進一步減少運行時間。那么就可以考慮combine。
技術48 使用combine
Combine可以在map階段進行聚合操作來減少需要發送到reduce的數據。它是一個map端的優化工具,以map的輸出作為輸入。
問題
需要在過濾和投影后進一步減少運行時間。
方案
定義一個combine。在作業代碼中使用setCombinerClass來調用它。
討論
在map輸出數據到磁盤的過程中,有兩個子過程:溢灑(spill)子過程,合並子過程。Combine在這兩個子過程中都會被調用,如圖6.40所示。為了讓combine在分組數據中效率最大,可以在兩個子過程調用combine之前進行初步(precursory)的排序。
與設置map類類似,作業使用setCombinClass來設置combine。
job.setCombinerClass(Combine.class);
Combine的實現必須嚴格遵從reduce的規格說明。這里將假定使用技術39種的map。將map的輸出中的記錄按照下述條件合並:第二個八進制數相同。代碼如下。
1 public static class Combine implements Reducer<Text, Text, Text, Text> { 2 3 @Override 4 public void reduce(Text key, Iterator<Text> values, 5 OutputCollector<Text, 6 Text> output, 7 Reporter reporter) throws IOException { 8 9 Text prev = null; 10 while (values.hasNext()) { 11 Text t = values.next(); 12 if (!t.equals(prev)) { 13 output.collect(key, t); 14 } 15 prev = ReflectionUtils.copy(job, t, prev); 16 } 17 } 18 }
Combine函數必須是可分布的(distributive)。如圖6.40(在前面)所示,combine要被調用多次處理多個具有相同輸入鍵的記錄。這些記錄的順序是不可預測的。可分布函數是指,不論輸入數據的順序如何,最終的結果都一樣。
小結
在MapReduce中combine非常有用,它能夠減少map和reduce之間的網絡傳輸數據和網絡負載。下一個減少執行時間的有用工具就是二進制比較器。
技術49 用Comparator進行超快排序
MapReduce默認使用RawComparator對map的輸出鍵進行比較排序。內置的Writable類(例如Text和IntWritable)是字節級實現。這樣不用將字節形式的類解排列(unmarshal)成類對象。如果要通過WritableComparable實現自定義Writable,就有可能延長洗牌和排序階段的時間,因為它需要進行解排列。
問題
存在自定義的Writable。需要減少作業的排序時間。
方案
實現字節級的Comparator來優化排序中的比較過程。
討論
在MapReduce中很多階段,排序是通過比較輸出鍵來進行的。為了加快鍵排序,所有的map輸出鍵必須實現WritableComparable接口。
1 public interface WritableComparable<T> extends Writable, Comparable<T> { 2 3 }
如果對4.2.1中的Person類進行改造,實現代碼如下。
1 public class Person implements WritableComparable<Person> { 2 private String firstName; 3 private String lastName; 4 5 @Override 6 public int compareTo(Person other) { 7 int cmp = this.lastName.compareTo(other.lastName); 8 if (cmp != 0) { 9 return cmp; 10 } 11 return this.firstName.compareTo(other.firstName); 12 } 13 ...
這個Comparator的問題在於,如果要進行比較,就需要將字節形式的map的中間結果數據解排列成Writable形式。解排列要重新創建對象,因此成本很高。
Hadoop中的自帶的各種Writable類不但擴展了WritableComparable接口,也提供了基於WritableComparator類的自定義Comparator。代碼如下。
1 public class WritableComparator implements RawComparator { 2 3 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 4 5 try { 6 buffer.reset(b1, s1, l1); 7 key1.readFields(buffer); 8 9 buffer.reset(b2, s2, l2); 10 key2.readFields(buffer); 11 } catch (IOException e) { 12 throw new RuntimeException(e); 13 } 14 return compare(key1, key2); 15 } 16 17 /** Compare two WritableComparables. 18 * 19 * <p> The default implementation uses the natural ordering, 20 * calling {@link 21 * Comparable#compareTo(Object)}. */ 22 @SuppressWarnings("unchecked") 23 public int compare(WritableComparable a, WritableComparable b) { 24 return a.compareTo(b); 25 } 26 ... 27 }
要實現字節級的Comparator,需要重載compare方法。這里先學習一下IntWritable類如何實現這個方法。
1 public class IntWritable implements WritableComparable { 2 3 public static class Comparator extends WritableComparator { 4 5 public Comparator() { 6 super(IntWritable.class); 7 } 8 9 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 10 int thisValue = readInt(b1, s1); 11 int thatValue = readInt(b2, s2); 12 return (thisValue<thatValue ? -1 : 13 (thisValue==thatValue ? 0 : 1)); 14 } 15 } 16 17 static { 18 WritableComparator.define(IntWritable.class, new Comparator()); 19 }
如果只使用內置的Writable,那就沒有必要實現WritableComparator。它們都自帶。如果需要使用自定義的Writable作為輸出鍵,那么就需要自定義WritableComparator。這里基於前述Person類來說明如何實現。
在Person類中,有兩個字符串類屬性,firstName和lastName。使用writeUTF方法通過DataOutput輸出它們。以下是實現代碼。
1 private String firstName; 2 private String lastName; 3 4 @Override 5 public void write(DataOutput out) throws IOException { 6 out.writeUTF(lastName); 7 out.writeUTF(firstName); 8 }
首先需要理解Person對象是如何用字節形式表示的。writeUTF方法輸出了字節長度(2個字節),字符內容(字符的長度,L1個字節)。如圖6.41描述了字節是如何排列的。
假設需要對lastName和firstName進行字典式地比較(譯注:就是看字典中的先后順序)。顯然不能直接用整個字節數組,因為其中還有字符長度。那么Comparator就需要足夠聰明到能夠跳過字符長度。以下是實現代碼。
1 @Override 2 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 3 4 int lastNameResult = compare(b1, s1, b2, s2); 5 if (lastNameResult != 0) { 6 return lastNameResult; 7 } 8 int b1l1 = readUnsignedShort(b1, s1); 9 int b2l1 = readUnsignedShort(b2, s2); 10 return compare(b1, s1 + b1l1 + 2, b2, s2 + b2l1 + 2); 11 } 12 13 public static int compare(byte[] b1, int s1, byte[] b2, int s2) { 14 int b1l1 = readUnsignedShort(b1, s1); 15 int b2l1 = readUnsignedShort(b2, s2); 16 return compareBytes(b1, s1 + 2, b1l1, b2, s2 + 2, b2l1); 17 } 18 19 public static int readUnsignedShort(byte[] b, int offset) { 20 int ch1 = b[offset]; 21 int ch2 = b[offset + 1]; 22 return (ch1 << 8) + (ch2); 23 }
小結
writeUTF只支持小於65536字符的字符串類。對於人名來說,是足夠了。大點的,可能就不行。這個時候就需要使用Hadoop的Text類來支持更大的字符串。Text類中的Comparator類的二進制字符串比較器的實現機制和剛才介紹的大致相當。(這個修飾真長。)那么針對Text類的lastName和firstName的Comparator的實現方式也會累死。
下一節將介紹如何減小數據傾斜的影響。