4.3 抽樣(Sampling)
用基於MapReduce的程序來處理TB級的數據集,要花費的時間可能是數以小時計。僅僅是優化代碼是很難達到良好的效果。
在開發和調試代碼的時候,沒有必要處理整個數據集。但如果在這種情況下要保證數據集能夠被正確地處理,就需要用到抽樣了。抽樣是統計學中的一個方法。它通過一定的過程從整個數據中抽取出一個子數據集。這個子數據集能夠代表整體數據集的數據分布狀況。在MapReduce中,開發人員可以只針對這個子數據集進行開發調試,極大減小了系統負擔,提高了開發效率。
技術23 水塘抽樣(Reservoir sampling)
假設如下場景:在開發一個MapReduce作業的時候,需要反復不斷地去測試一個超大數據集。當然,處理這個數據集很費時間,想要快速開發幾乎不可能。
問題
在開發MapReduce作業的時候,如何能夠只用處理超大數據集的一個小小的子集?
方案
在讀取數據的那部分,自定義一個InputFormat來封裝默認的InputFormat。在自定義的InputFormat中,將從默認的InputFormat中得到的數據按一定比例進行抽樣。
討論
由於水塘抽樣可以從數據流中隨機采樣,它就特別適合於MapReduce。在MapReduce中,數據源的形式就是數據流。圖4.16說明了水塘抽樣的算法。
這里需要實現ReservoirSamplerRecordReader類來封裝默認的InputFormat類和RecordReader類。InputFormat類的作用是對輸入進行分塊。RecordReader類的作用是讀取記錄。抽樣功能則在ReservoirSamplerRecordReader類中實現。圖4.17說明了ReservoirSamplerRecordReader類的工作機制。
以下是ReservoirSamplerRecordReader類的實現代碼:
1 public static class ReservoirSamplerRecordReader<K extends Writable, V extends Writable> extends RecordReader { 2 3 private final RecordReader<K, V> rr; 4 private final int numSamples; 5 private final int maxRecords; 6 private final ArrayList<K> keys; 7 private final ArrayList<V> values; 8 9 @Override 10 public void initialize(InputSplit split,TaskAttemptContext context) 11 throws IOException, InterruptedException { 12 13 rr.initialize(split, context); 14 Random rand = new Random(); 15 16 for (int i = 0; i < maxRecords; i++) { 17 if (!rr.nextKeyValue()) { 18 break; 19 } 20 21 K key = rr.getCurrentKey(); 22 V val = rr.getCurrentValue(); 23 24 if (keys.size() < numSamples) { 25 keys.add(WritableUtils.clone(key, conf)); 26 values.add(WritableUtils.clone(val, conf)); 27 } else { 28 int r = rand.nextInt(i); 29 if (r < numSamples) { 30 keys.set(r, WritableUtils.clone(key, conf)); 31 values.set(r, WritableUtils.clone(val, conf)); 32 } 33 } 34 } 35 } 36 ...
在使用ReservoirSamplerInputFormat類的時候,需要設置的參數包括InputFormat等。以下是設置代碼:
1 ReservoirSamplerInputFormat.setInputFormat(job,TextInputFormat.class); 2 ReservoirSamplerInputFormat.setNumSamples(job, 10); 3 ReservoirSamplerInputFormat.setMaxRecordsToRead(job, 10000); 4 ReservoirSamplerInputFormat.setUseSamplesNumberPerInputSplit(job, true);
然后在batch中執行作業,輸入文件是name.txt,有88799行。經過抽樣后的文件只有10行了。以下是作業執行的過程:
$ wc -l test-data/names.txt 88799 test-data/names.txt $ hadoop fs -put test-data/names.txt names.txt $ bin/run.sh com.manning.hip.ch4.sampler.SamplerJob \ names.txt output $ hadoop fs -cat output/part* | wc -l 10
前面設置的ReservoirSamplerInputFormat類的參數是抽樣10行,最后的結果就是10行。
小結
抽樣可以把數據集的尺寸變小,這對開發是很有幫助的。如果有時需要抽樣,有時不需要抽樣,怎么才能把抽樣功能很好地整合到代碼庫中呢?這里有個方法,在作業的configure中加入一個開關,如下面的代碼所示:
1 if(appConfig.isSampling()) { 2 ReservoirSamplerInputFormat.setInputFormat(job, 3 TextInputFormat.class); 4 ... 5 } else { 6 job.setInputFormatClass(TextInputFormat.class); 7 }
這樣就可以把抽樣和其他各種代碼整合了。