4.2 排序(SORT)
在MapReduce中,排序的目的有兩個:
- MapReduce可以通過排序將Map輸出的鍵分組。然后每組鍵調用一次reduce。
- 在某些需要排序的特定場景中,用戶可以將作業(job)的全部輸出進行總體排序。
例如:需要了解前N個最受歡迎的用戶或網頁的數據分析工作。
在這一節中,有兩個場景需要對MapReduce的排序行為進行優化。
- 次排序(Secondary sort)
- 總排序(Total order sorting)
次排序可以根據reduce的鍵對它的值進行排序。如果要求一些數據先於另外一些數據到達reduce,次排序就很有用。(這一章在講解優化過的重分區連接中也提到了這樣的場景。)另一個場景中,需要將作業的輸出根據兩個鍵進行排序,一個鍵的優先級高於另外一個鍵(secondary key)。這個場景也可以用到次排序。例如:將股票數據先根據股票標志進行主排序(primary sort),然后根據股票配額進行次排序。本書很多技術中將會運用次排序,如重分區連接的優化,朋友圖算法等。
這一節第二部分中,將探討對reduce的輸出的全部數據進行總體排序。這在分析數據集中的前N個元素或后N個元素時會比較有用。
4.2.1 次排序(Secondary sort)
在前一節(MapReduce連接)中,次排序用於使一部分數據先於另外一部分到達reduce。作為基礎知識,學習次排序前需要了解MapReduce中的數據整理和數據流。圖4.12說明了三個影響數據整理和數據流(分區,排序,分組)的元素,並且說明了這些元素如何整合到MapReduce中。
在map輸出收集(output collection)階段,由分區器(Partitioner)選擇哪個reduce應該接收map的輸出。map輸出的各個分區的數據,由RawComparator進行排序。Reduce端也用RawComparator進行排序。然后,由RawComparator對排序好的數據進行分組。
技術21 實現次排序
對於某個map的鍵的所有值,如果需要其中一部分值先於另外一部分值到達reduce,就可以用到次排序。次排序還用在了本書的第7章中的朋友圖算法,和經過優化的重分區排序中。
問題
在發送給某個reduce的數據中,需要對某個自然鍵(natural key)的值進行排序。
方案
這個技術中將應用到自定義分區類,排序比較類(sort comparator),分組比較類(grouping comparator)。這些是實現次排序的基礎。
討論
在這個技術中,使用次排序來對人的名字進行排序。具體步驟是:先用主排序對人的姓排序,再用次排序對人的名字排序。
次排序需要在map函數中生成組合鍵(composite key)作為輸出鍵。
組合輸出鍵包括兩個部分:
- 自然鍵,用於連接。
- 次鍵(secondary key),用於對隸屬於自然鍵的值進行排序。排序后的結果將被發送給reduce。
圖4.13說明了組合鍵的構成。它還包括了一個用於reduce端的組合值(composite value)。組合值讓reduce可以訪問次鍵。
在介紹了組合鍵類之后,接下來具體說明分區,排序和分組階段以及他們的實現。
組合鍵(COMPOSITE KEY)
組合鍵包括姓氏和名字。它擴展了WritableComparable。WritableComparable被推薦用於map函數輸出鍵的Writable類。
1 public class Person implements WritableComparable<Person> { 2 3 private String firstName; 4 private String lastName; 5 6 @Override 7 public void readFields(DataInput in) throws IOException { 8 this.firstName = in.readUTF(); 9 this.lastName = in.readUTF(); 10 } 11 12 @Override 13 public void write(DataOutput out) throws IOException { 14 out.writeUTF(firstName); 15 out.writeUTF(lastName); 16 } 17 ...
圖4.14說明了分區,排序和分組的類的名字和方法的設置。同時還有各個類如何使用組合鍵。
接下來是對其它類的實現代碼的介紹。
分區器(PARTITIONER)
分區器用來決定map的輸出值應該分配到哪個reduce。MapReduce的默認分區器(HashPartitioner)調用輸出鍵的hashCode方法,然后用hashCode方法的結果對reduce的數量進行一個模數(modulo)運算,最后得到那個目標reduce。默認的分區器使用整個鍵。這就不適於組合鍵了。因為它可能把有同樣自然鍵的組合鍵發送給不同的reduce。因此,就需要自定義分區器,基於自然鍵進行分區。
以下代碼實現了分區器的接口。getPartition方法的輸入參數有key,value和分區的數量:
1 public interface Partitioner<K2, V2> extends JobConfigurable { 2 int getPartition(K2 key, V2 value, int numPartitions); 3 }
自定義的分區器將基於Person類中的姓計算哈希值,然后將這個哈希值對分區的數量進行模運算。在這里,分區的數量就是reduce的數量:
1 public class PersonNamePartitioner extends Partitioner<Person, Text> { 2 3 @Override 4 public int getPartition(Person key, Text value, int numPartitions) { 5 return Math.abs(key.getLastName().hashCode() * 127) % numPartitions; 6 } 7 8 }
排序(SORTING)
Map端和reduce端都要進行排序。Map端排序的目的是讓reduce端的排序更加高效。這里將讓MapReduce使用組合鍵的所有值進行排序,也就是基於姓氏和名字。
在下列例子中實現了WritableComparator。WritableComparator比較用戶的姓氏和名字。
1 public class PersonComparator extends WritableComparator { 2 3 protected PersonComparator() { 4 super(Person.class, true); 5 } 6 7 @Override 8 public int compare(WritableComparable w1, WritableComparable w2) { 9 10 Person p1 = (Person) w1; 11 Person p2 = (Person) w2; 12 13 int cmp = p1.getLastName().compareTo(p2.getLastName()); 14 15 if (cmp != 0) { 16 return cmp; 17 } 18 19 return p1.getFirstName().compareTo(p2.getFirstName()); 20 } 21 }
分組(GROUPING)
當reduce階段將在本地磁盤上的map輸出的記錄進行流化處理(streaming)的時候,需要要進行分組。在分組中,記錄將被按一定方式排成一個有邏輯順序的流,並被傳輸給reduce。
在分組階段,所有的記錄已經經過了次排序。分組比較器需要將有相同姓氏的記錄分在同一個組。下面是分組比較器的實現:
1 public class PersonNameComparator extends WritableComparator { 2 3 protected PersonNameComparator() { 4 super(Person.class, true); 5 } 6 7 @Override 8 public int compare(WritableComparable o1, WritableComparable o2) { 9 Person p1 = (Person) o1; 10 Person p2 = (Person) o2; 11 return p1.getLastName().compareTo(p2.getLastName()); 12 } 13 }
MAPREDUCE
最后一步是告訴MapReduce使用自定義的分區器類,排序比較器類和分組比較器類:
1 job.setPartitionerClass(PersonNamePartitioner.class); 2 job.setSortComparatorClass(PersonComparator.class); 3 job.setGroupingComparatorClass(PersonNameComparator.class);
然后需要實現map和reduce代碼。Map類創建具有姓和名的組合鍵,然后將它作為輸出鍵。將名字作為輸出值。
Reduce類的輸出和輸入一樣:
1 public static class Map extends Mapper<Text, Text, Person, Text> { 2 3 private Person outputKey = new Person(); 4 5 @Override 6 protected void map(Text lastName, Text firstName, Context context) 7 throws IOException, InterruptedException { 8 9 outputKey.set(lastName.toString(), firstName.toString()); 10 context.write(outputKey, firstName); 11 12 } 13 } 14 15 public static class Reduce extends Reducer<Person, Text, Text, Text> { 16 17 Text lastName = new Text(); 18 19 @Override 20 public void reduce(Person key, Iterable<Text> values, Context context) 21 throws IOException, InterruptedException { 22 23 lastName.set(key.getLastName()); 24 25 for (Text firstName : values) { 26 context.write(lastName, firstName); 27 } 28 } 29 }
上傳一個包含了亂序的名字的小文件,並測試次排序是否能夠生成已經根據名字排序好的結果:
$ hadoop fs -put test-data/ch4/usernames.txt . $ hadoop fs -cat usernames.txt Smith John Smith Anne Smith Ken $ bin/run.sh com.manning.hip.ch4.sort.secondary.SortMapReduce usernames.txt output $ hadoop fs -cat output/part* Smith Anne Smith John Smith Ken
上面的結果和期望一致。
小結
這一節展示了MapReduce中如何使用次排序。下一部分介紹如何將多個reduce的結果做總體排序。