[大牛翻譯系列]Hadoop(7)MapReduce:抽樣(Sampling)


4.3 抽樣(Sampling)

用基於MapReduce的程序來處理TB級的數據集,要花費的時間可能是數以小時計。僅僅是優化代碼是很難達到良好的效果。

在開發和調試代碼的時候,沒有必要處理整個數據集。但如果在這種情況下要保證數據集能夠被正確地處理,就需要用到抽樣了。抽樣是統計學中的一個方法。它通過一定的過程從整個數據中抽取出一個子數據集。這個子數據集能夠代表整體數據集的數據分布狀況。在MapReduce中,開發人員可以只針對這個子數據集進行開發調試,極大減小了系統負擔,提高了開發效率。

 

技術23 水塘抽樣(Reservoir sampling)

假設如下場景:在開發一個MapReduce作業的時候,需要反復不斷地去測試一個超大數據集。當然,處理這個數據集很費時間,想要快速開發幾乎不可能。

 

問題

在開發MapReduce作業的時候,如何能夠只用處理超大數據集的一個小小的子集?

 

方案

在讀取數據的那部分,自定義一個InputFormat來封裝默認的InputFormat。在自定義的InputFormat中,將從默認的InputFormat中得到的數據按一定比例進行抽樣。

 

討論
由於水塘抽樣可以從數據流中隨機采樣,它就特別適合於MapReduce。在MapReduce中,數據源的形式就是數據流。圖4.16說明了水塘抽樣的算法。

 

 

這里需要實現ReservoirSamplerRecordReader類來封裝默認的InputFormat類和RecordReader類。InputFormat類的作用是對輸入進行分塊。RecordReader類的作用是讀取記錄。抽樣功能則在ReservoirSamplerRecordReader類中實現。圖4.17說明了ReservoirSamplerRecordReader類的工作機制。

 

 

以下是ReservoirSamplerRecordReader類的實現代碼:

 

 1 public static class ReservoirSamplerRecordReader<K extends Writable, V extends Writable> extends RecordReader {
 2 
 3     private final RecordReader<K, V> rr;
 4     private final int numSamples;
 5     private final int maxRecords;
 6     private final ArrayList<K> keys;
 7     private final ArrayList<V> values;
 8     
 9     @Override
10     public void initialize(InputSplit split,TaskAttemptContext context)
11         throws IOException, InterruptedException {
12         
13         rr.initialize(split, context);
14         Random rand = new Random();
15         
16         for (int i = 0; i < maxRecords; i++) {
17             if (!rr.nextKeyValue()) {
18                 break;
19             }
20             
21             K key = rr.getCurrentKey();
22             V val = rr.getCurrentValue();
23             
24             if (keys.size() < numSamples) {
25                 keys.add(WritableUtils.clone(key, conf));
26                 values.add(WritableUtils.clone(val, conf));
27             } else {
28                 int r = rand.nextInt(i);
29                 if (r < numSamples) {
30                     keys.set(r, WritableUtils.clone(key, conf));
31                     values.set(r, WritableUtils.clone(val, conf));
32                 }
33             }
34         }
35     }
36 ...


在使用ReservoirSamplerInputFormat類的時候,需要設置的參數包括InputFormat等。以下是設置代碼:

 

1 ReservoirSamplerInputFormat.setInputFormat(job,TextInputFormat.class);
2 ReservoirSamplerInputFormat.setNumSamples(job, 10);
3 ReservoirSamplerInputFormat.setMaxRecordsToRead(job, 10000);
4 ReservoirSamplerInputFormat.setUseSamplesNumberPerInputSplit(job, true);

 

然后在batch中執行作業,輸入文件是name.txt,有88799行。經過抽樣后的文件只有10行了。以下是作業執行的過程:

 

$ wc -l test-data/names.txt
88799 test-data/names.txt

$ hadoop fs -put test-data/names.txt names.txt

$ bin/run.sh com.manning.hip.ch4.sampler.SamplerJob \
names.txt output

$ hadoop fs -cat output/part* | wc -l
10

 

前面設置的ReservoirSamplerInputFormat類的參數是抽樣10行,最后的結果就是10行。

 

小結
抽樣可以把數據集的尺寸變小,這對開發是很有幫助的。如果有時需要抽樣,有時不需要抽樣,怎么才能把抽樣功能很好地整合到代碼庫中呢?這里有個方法,在作業的configure中加入一個開關,如下面的代碼所示:

 

1 if(appConfig.isSampling()) {
2     ReservoirSamplerInputFormat.setInputFormat(job,
3     TextInputFormat.class);
4 ...
5 } else {
6     job.setInputFormatClass(TextInputFormat.class);
7 }

 

這樣就可以把抽樣和其他各種代碼整合了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM