[大牛翻譯系列]Hadoop(13)MapReduce 性能調優:優化洗牌(shuffle)和排序階段


6.4.3 優化洗牌(shuffle)和排序階段

洗牌和排序階段都很耗費資源。洗牌需要在map和reduce任務之間傳輸數據,會導致過大的網絡消耗。排序和合並操作的消耗也是很顯著的。這一節將介紹一系列的技術來緩解洗牌和排序階段的消耗。

 

技術46 規避使用reduce

Reduce在用於連接數據集的時候將會產生大量的網絡消耗。

 

問題

需要考慮在MapReduce規避reduce的使用。


方案

通過將MapReduce參數setNumReduceTasks設置為0來創建一個只有map的作業。


討論

洗牌和排序階段一般都是用來連接數據集。但連接操作並不一定需要洗牌和排序,正如第4章中所介紹的。滿足一定條件的連接可以只在map端運行。那么就只需要只有map的作業了。設置只有map的作業的命令如下。

 

job.setNumReduceTasks(0);


小結

一個只有map的作業的OutputFormat是和普通作業中reduce的OutputFormat一樣。如圖6.39所示。

 

 

如果無法規避reduce,那么就要盡量減小它對你的作業執行時間的影響。

 

技術47 過濾和投影

Map到Reduce之間傳輸數據要通過網絡,這個成本很高。


問題

需要減少被洗牌的數據。


方案

減少map輸出的每條記錄的大小,並盡可能地減少map輸出的數據量。


討論

過濾和投影是關系運算中的概念,用以減少需要處理的數據。這些概念也可以用到MapReduce中減少map任務需要輸出的數據。以下是過濾和投影的簡明定義:

  • 過濾是減少map輸出的數據量。
  • 投影是減少map輸出的每條記錄的大小。

以下是上述概念的演示代碼:

 

 1 Text outputKey = new Text();
 2 Text outputValue = new Text();
 3 
 4 @Override
 5 public void map(LongWritable key, Text value,
 6                 OutputCollector<Text, Text> output,
 7                 Reporter reporter) throws IOException {
 8                 
 9     String v = value.toString();
10     
11     if (!v.startsWith("10.")) {
12         String[] parts = StringUtils.split(v, ".", 3);
13         outputKey.set(parts[0]);
14         outputValue.set(parts[1]);
15         output.collect(outputKey, outputValue);
16     }
17 }


小結

過濾和投影是在需要顯著減少MapReduce作業運行時間時最容易的方法中的兩種。

如果已經應用了這兩種方法,但還需要進一步減少運行時間。那么就可以考慮combine。

 

技術48 使用combine

Combine可以在map階段進行聚合操作來減少需要發送到reduce的數據。它是一個map端的優化工具,以map的輸出作為輸入。


問題

需要在過濾和投影后進一步減少運行時間。


方案

定義一個combine。在作業代碼中使用setCombinerClass來調用它。


討論

在map輸出數據到磁盤的過程中,有兩個子過程:溢灑(spill)子過程,合並子過程。Combine在這兩個子過程中都會被調用,如圖6.40所示。為了讓combine在分組數據中效率最大,可以在兩個子過程調用combine之前進行初步(precursory)的排序。

 

 

與設置map類類似,作業使用setCombinClass來設置combine。

 

job.setCombinerClass(Combine.class);

 

Combine的實現必須嚴格遵從reduce的規格說明。這里將假定使用技術39種的map。將map的輸出中的記錄按照下述條件合並:第二個八進制數相同。代碼如下。

 

 1 public static class Combine implements Reducer<Text, Text, Text, Text> {
 2     
 3     @Override
 4     public void reduce(Text key, Iterator<Text> values,
 5                         OutputCollector<Text,
 6                         Text> output,
 7                         Reporter reporter) throws IOException {
 8                         
 9         Text prev = null;
10         while (values.hasNext()) {
11             Text t = values.next();
12             if (!t.equals(prev)) {
13                 output.collect(key, t);
14             }
15             prev = ReflectionUtils.copy(job, t, prev);
16         }
17     }
18 }

 

Combine函數必須是可分布的(distributive)。如圖6.40(在前面)所示,combine要被調用多次處理多個具有相同輸入鍵的記錄。這些記錄的順序是不可預測的。可分布函數是指,不論輸入數據的順序如何,最終的結果都一樣。


小結

在MapReduce中combine非常有用,它能夠減少map和reduce之間的網絡傳輸數據和網絡負載。下一個減少執行時間的有用工具就是二進制比較器。

 

技術49 用Comparator進行超快排序

MapReduce默認使用RawComparator對map的輸出鍵進行比較排序。內置的Writable類(例如Text和IntWritable)是字節級實現。這樣不用將字節形式的類解排列(unmarshal)成類對象。如果要通過WritableComparable實現自定義Writable,就有可能延長洗牌和排序階段的時間,因為它需要進行解排列。


問題

存在自定義的Writable。需要減少作業的排序時間。


方案

實現字節級的Comparator來優化排序中的比較過程。

 

討論

在MapReduce中很多階段,排序是通過比較輸出鍵來進行的。為了加快鍵排序,所有的map輸出鍵必須實現WritableComparable接口。

 

1 public interface WritableComparable<T> extends Writable, Comparable<T> {
2 
3 }

 

如果對4.2.1中的Person類進行改造,實現代碼如下。

 

 1 public class Person implements WritableComparable<Person> {
 2     private String firstName;
 3     private String lastName;
 4     
 5     @Override
 6     public int compareTo(Person other) {
 7         int cmp = this.lastName.compareTo(other.lastName);
 8         if (cmp != 0) {
 9             return cmp;
10         }
11         return this.firstName.compareTo(other.firstName);
12     }
13 ...

 

這個Comparator的問題在於,如果要進行比較,就需要將字節形式的map的中間結果數據解排列成Writable形式。解排列要重新創建對象,因此成本很高。

Hadoop中的自帶的各種Writable類不但擴展了WritableComparable接口,也提供了基於WritableComparator類的自定義Comparator。代碼如下。

 

 1 public class WritableComparator implements RawComparator {
 2 
 3     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
 4     
 5         try {
 6             buffer.reset(b1, s1, l1);
 7             key1.readFields(buffer);
 8 
 9             buffer.reset(b2, s2, l2);
10             key2.readFields(buffer);
11         } catch (IOException e) {
12             throw new RuntimeException(e);
13         }
14         return compare(key1, key2);
15     }
16     
17     /** Compare two WritableComparables.
18     *
19     * <p> The default implementation uses the natural ordering,
20     * calling {@link
21     * Comparable#compareTo(Object)}. */
22     @SuppressWarnings("unchecked")
23     public int compare(WritableComparable a, WritableComparable b) {
24         return a.compareTo(b);
25     }
26     ...
27 }

 

要實現字節級的Comparator,需要重載compare方法。這里先學習一下IntWritable類如何實現這個方法。

 

 1 public class IntWritable implements WritableComparable {
 2 
 3     public static class Comparator extends WritableComparator {
 4     
 5         public Comparator() {
 6             super(IntWritable.class);
 7         }
 8         
 9         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
10             int thisValue = readInt(b1, s1);
11             int thatValue = readInt(b2, s2);
12             return (thisValue<thatValue ? -1 :
13             (thisValue==thatValue ? 0 : 1));
14         }
15     }
16     
17     static {
18         WritableComparator.define(IntWritable.class, new Comparator());
19     }

 

如果只使用內置的Writable,那就沒有必要實現WritableComparator。它們都自帶。如果需要使用自定義的Writable作為輸出鍵,那么就需要自定義WritableComparator。這里基於前述Person類來說明如何實現。

在Person類中,有兩個字符串類屬性,firstName和lastName。使用writeUTF方法通過DataOutput輸出它們。以下是實現代碼。

 

1 private String firstName;
2 private String lastName;
3 
4 @Override
5 public void write(DataOutput out) throws IOException {
6     out.writeUTF(lastName);
7     out.writeUTF(firstName);
8 }

 

首先需要理解Person對象是如何用字節形式表示的。writeUTF方法輸出了字節長度(2個字節),字符內容(字符的長度,L1個字節)。如圖6.41描述了字節是如何排列的。

 

 

假設需要對lastName和firstName進行字典式地比較(譯注:就是看字典中的先后順序)。顯然不能直接用整個字節數組,因為其中還有字符長度。那么Comparator就需要足夠聰明到能夠跳過字符長度。以下是實現代碼。

 

 1 @Override
 2 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
 3 
 4     int lastNameResult = compare(b1, s1, b2, s2);
 5     if (lastNameResult != 0) {
 6         return lastNameResult;
 7     }
 8     int b1l1 = readUnsignedShort(b1, s1);
 9     int b2l1 = readUnsignedShort(b2, s2);
10     return compare(b1, s1 + b1l1 + 2, b2, s2 + b2l1 + 2);
11 }
12 
13 public static int compare(byte[] b1, int s1, byte[] b2, int s2) {
14     int b1l1 = readUnsignedShort(b1, s1);
15     int b2l1 = readUnsignedShort(b2, s2);
16     return compareBytes(b1, s1 + 2, b1l1, b2, s2 + 2, b2l1);
17 }
18 
19 public static int readUnsignedShort(byte[] b, int offset) {
20     int ch1 = b[offset];
21     int ch2 = b[offset + 1];
22     return (ch1 << 8) + (ch2);
23 }

 

小結

 writeUTF只支持小於65536字符的字符串類。對於人名來說,是足夠了。大點的,可能就不行。這個時候就需要使用Hadoop的Text類來支持更大的字符串。Text類中的Comparator類的二進制字符串比較器的實現機制和剛才介紹的大致相當。(這個修飾真長。)那么針對Text類的lastName和firstName的Comparator的實現方式也會累死。

下一節將介紹如何減小數據傾斜的影響。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM