1.為什么要使用采樣器
在這個網頁上有一段描述比較靠譜 http://www.philippeadjiman.com/blog/2009/12/20/hadoop-tutorial-series-issue-2-getting-started-with-customized-partitioning/
簡單的來說就是解決"How to automatically find “good” partitioning function",因為很多時候無法直接制訂固定的partitioner策略,所以需要知道實際的數據分布.糟糕的策略導致的結果就是每個reduce節點得到的數據部均勻,對效率影響挺大
2.如何使用采樣器
InputSampler.RandomSampler<IntWritable, NullWritable> sampler =
new InputSampler.RandomSampler<IntWritable, NullWritable>(0.1,10000,10);
Path partitionFile = new Path(input,”_partitions”);
TotalOrderPartitioner.setPartitionFile(conf, partitionFile);////
InputSampler.writePartitionFile(conf, sampler);
URI partitionURI = new URI(partitionFile.toString() + “#_partitions”);
DistributedCache.addCacheFile(partitionURI, conf);
DistributedCache.createSymlink(conf);
3.常用的采樣器介紹
http://blog.csdn.net/andyelvis/article/details/7294811
Hadoop中采樣是由org.apache.hadoop.mapred.lib.InputSampler類來實現的。
InputSampler類實現了三種采樣方法:RandomSampler,SplitSampler和IntervalSampler。//RandomSampler最耗時
RandomSamplerSplitSampler、RandomSampler和IntervalSampler都是InputSampler的靜態內部類,它們都實現了InputSampler的內部接口Sampler接口
K[] getSample(InputFormat<K,V> inf,JobConf job) throws IOException;
}
getSample方法根據job的配置信息以及輸入格式獲得抽樣結果,三個采樣類各自有不同的實現。
RandomSampler隨機地從輸入數據中抽取Key,是一個通用的采樣器。RandomSampler類有三個屬性:freq(一個Key被選中的概率),numSamples(從所有被選中的分區中獲得的總共的樣本數目),maxSplitsSampled(需要檢查掃描的最大分區數目)。
RandomSampler中getSample方法的實現如下:
InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
ArrayList<K> samples = new ArrayList<K>(numSamples);
int splitsToSample = Math.min(maxSplitsSampled, splits.length);
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
LOG.debug("seed: " + seed);
// shuffle splits
for ( int i = 0; i < splits.length; ++i) {
InputSplit tmp = splits[i];
int j = r.nextInt(splits.length);
splits[i] = splits[j];
splits[j] = tmp;
}
// our target rate is in terms of the maximum number of sample splits,
// but we accept the possibility of sampling additional splits to hit
// the target sample keyset
for ( int i = 0; i < splitsToSample ||
(i < splits.length && samples.size() < numSamples); ++i) {
RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
Reporter.NULL);
K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {
if (r.nextDouble() <= freq) {
if (samples.size() < numSamples) {
samples.add(key);
} else {
// When exceeding the maximum number of samples, replace a
// random element with this one, then adjust the frequency
// to reflect the possibility of existing elements being
// pushed out
int ind = r.nextInt(numSamples);
if (ind != numSamples) {
samples.set(ind, key);
}
freq *= (numSamples - 1) / ( double) numSamples;
}
key = reader.createKey();
}
}
reader.close();
}
return (K[])samples.toArray();
}
首先通過InputFormat的getSplits方法得到所有的輸入分區;然后確定需要抽樣掃描的分區數目,取輸入分區總數與用戶輸入的maxSplitsSampled兩者的較小的值得到splitsToSample;然后對輸入分區數組shuffle排序,打亂其原始順序;然后循環逐個掃描每個分區中的記錄進行采樣,循環的條件是當前已經掃描的分區數小於splitsToSample或者當前已經掃描的分區數超過了splitsToSample但是小於輸入分區總數並且當前的采樣數小於最大采樣數numSamples。
每個分區中記錄采樣的具體過程如下:
從指定分區中取出一條記錄,判斷得到的隨機浮點數是否小於等於采樣頻率freq,如果大於則放棄這條記錄,然后判斷當前的采樣數是否小於最大采樣數,如果小於則這條記錄被選中,被放進采樣集合中,否則從【0,numSamples】中選擇一個隨機數,如果這個隨機數不等於最大采樣數numSamples,則用這條記錄替換掉采樣集合隨機數對應位置的記錄,同時采樣頻率freq減小變為freq*(numSamples-1)/numSamples。然后依次遍歷分區中的其它記錄。
SplitSampler從s個分區中采樣前n個記錄,是采樣隨機數據的一種簡便方式。SplitSampler類有兩個屬性:numSamples(最大采樣數),maxSplitsSampled(最大分區數)。其getSample方法實現如下:
InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
ArrayList<K> samples = new ArrayList<K>(numSamples);
int splitsToSample = Math.min(maxSplitsSampled, splits.length);
int splitStep = splits.length / splitsToSample;
int samplesPerSplit = numSamples / splitsToSample;
long records = 0;
for ( int i = 0; i < splitsToSample; ++i) {
RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
job, Reporter.NULL);
K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {
samples.add(key);
key = reader.createKey();
++records;
if ((i+1) * samplesPerSplit <= records) {
break;
}
}
reader.close();
}
return (K[])samples.toArray();
}
首先根據InputFormat得到輸入分區數組;然后確定需要采樣的分區數splitsToSample為最大分區數和輸入分區總數之間的較小值;然后確定對分區采樣時的間隔splitStep為輸入分區總數除splitsToSample的商;然后確定每個分區的采樣數samplesPerSplit為最大采樣數除splitsToSample的商。被采樣的分區下標為i*splitStep,已經采樣的分區數目達到splitsToSample即停止采樣。
對於每一個分區,讀取一條記錄,將這條記錄添加到樣本集合中,如果當前樣本數大於當前的采樣分區所需要的樣本數,則停止對這個分區的采樣。如此循環遍歷完這個分區的所有記錄。
IntervalSampler根據一定的間隔從s個分區中采樣數據,非常適合對排好序的數據采樣。IntervalSampler類有兩個屬性:freq(哪一條記錄被選中的概率),maxSplitsSampled(采樣的最大分區數)。其getSample方法實現如下:
InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
ArrayList<K> samples = new ArrayList<K>();
int splitsToSample = Math.min(maxSplitsSampled, splits.length);
int splitStep = splits.length / splitsToSample;
long records = 0;
long kept = 0;
for ( int i = 0; i < splitsToSample; ++i) {
RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
job, Reporter.NULL);
K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {
++records;
if (( double) kept / records < freq) {
++kept;
samples.add(key);
key = reader.createKey();
}
}
reader.close();
}
return (K[])samples.toArray();
}
首先根據InputFormat得到輸入分區數組;然后確定需要采樣的分區數splitsToSample為最大分區數和輸入分區總數之間的較小值;然后確定對分區采樣時的間隔splitStep為輸入分區總數除splitsToSample的商。被采樣的分區下標為i*splitStep,已經采樣的分區數目達到splitsToSample即停止采樣。
對於每一個分區,讀取一條記錄,如果當前樣本數與已經讀取的記錄數的比值小於freq,則將這條記錄添加到樣本集合,否則讀取下一條記錄。這樣依次循環遍歷完這個分區的所有記錄。
4.采樣器在實際中的使用
常見的例子是terasort
http://blog.csdn.net/scutshuxue/article/details/5915697
排序的基本思想是利用了mapreduce的自動排序功能,在hadoop中,從map到reduce階段,map出來的結構會按照各個key按照hash值分配到各個reduce中,其中,在reduce中所有的key都是有序的了。如果使用一個reduce,那么我們直接將他output出來就行了,但是這不能夠體現分布式的好處,所以,我們還是要用多個reduce來跑。
比方說我們有1000個1-10000的數據,跑10個ruduce任務, 如果我們運行進行partition的時候,能夠將在1-1000中數據的分配到第一個reduce中,1001-2000的數據分配到第二個reduce中,以此類推。即第n個reduce所分配到的數據全部大於第n-1個reduce中的數據。這樣,每個reduce出來之后都是有序的了,我們只要cat所有的輸出文件,變成一個大的文件,就都是有序的了。
基本思路就是這樣,但是現在有一個問題,就是數據的區間如何划分,在數據量大,還有我們並不清楚數據分布的情況下。一個比較簡單的方法就是采樣,假如有一億的數據,我們可以對數據進行采樣,如取10000個數據采樣,然后對采樣數據分區間。在Hadoop中,patition我們可以用TotalOrderPartitioner替換默認的分區。然后將采樣的結果傳給他,就可以實現我們想要的分區。在采樣時,我們可以使用hadoop的幾種采樣工具,RandomSampler,InputSampler,IntervalSampler。
