關於Hadoop中的采樣器


1.為什么要使用采樣器

在這個網頁上有一段描述比較靠譜 http://www.philippeadjiman.com/blog/2009/12/20/hadoop-tutorial-series-issue-2-getting-started-with-customized-partitioning/

 簡單的來說就是解決"How to automatically find “good” partitioning function",因為很多時候無法直接制訂固定的partitioner策略,所以需要知道實際的數據分布.糟糕的策略導致的結果就是每個reduce節點得到的數據部均勻,對效率影響挺大

 

2.如何使用采樣器

 

  conf.setPartitionerClass(TotalOrderPartitioner. class);//關於partitioner可以參考這個實現 使用采樣器產生的文件

  InputSampler.RandomSampler<IntWritable, NullWritable> sampler =
    new InputSampler.RandomSampler<IntWritable, NullWritable>(0.1,10000,10);
  
  Path partitionFile =  new Path(input,”_partitions”);
  TotalOrderPartitioner.setPartitionFile(conf, partitionFile);////
  InputSampler.writePartitionFile(conf, sampler);
  
//一般都將該文件做distribute cache處理
  URI partitionURI =  new URI(partitionFile.toString() + “#_partitions”);
  DistributedCache.addCacheFile(partitionURI, conf);
  DistributedCache.createSymlink(conf);
 
//從上面可以看出 采樣器是在map階段之前進行的 在提交job的client端完成的
 

3.常用的采樣器介紹

http://blog.csdn.net/andyelvis/article/details/7294811

Hadoop中采樣是由org.apache.hadoop.mapred.lib.InputSampler類來實現的。


InputSampler類實現了三種采樣方法:RandomSampler,SplitSampler和IntervalSampler。//RandomSampler最耗時

RandomSamplerSplitSampler、RandomSampler和IntervalSampler都是InputSampler的靜態內部類,它們都實現了InputSampler的內部接口Sampler接口

public  interface Sampler<K,V>{
      K[] getSample(InputFormat<K,V> inf,JobConf job)  throws IOException;
}

getSample方法根據job的配置信息以及輸入格式獲得抽樣結果,三個采樣類各自有不同的實現。


RandomSampler隨機地從輸入數據中抽取Key,是一個通用的采樣器。RandomSampler類有三個屬性:freq(一個Key被選中的概率),numSamples(從所有被選中的分區中獲得的總共的樣本數目),maxSplitsSampled(需要檢查掃描的最大分區數目)。
RandomSampler中getSample方法的實現如下: 

     public K[] getSample(InputFormat<K,V> inf, JobConf job)  throws IOException {
      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
      ArrayList<K> samples =  new ArrayList<K>(numSamples);
       int splitsToSample = Math.min(maxSplitsSampled, splits.length);

      Random r =  new Random();
       long seed = r.nextLong();
      r.setSeed(seed);
      LOG.debug("seed: " + seed);
       //  shuffle splits
       for ( int i = 0; i < splits.length; ++i) {
        InputSplit tmp = splits[i];
         int j = r.nextInt(splits.length);
        splits[i] = splits[j];
        splits[j] = tmp;
      }
       //  our target rate is in terms of the maximum number of sample splits,
      
//  but we accept the possibility of sampling additional splits to hit
      
//  the target sample keyset
       for ( int i = 0; i < splitsToSample ||
                     (i < splits.length && samples.size() < numSamples); ++i) {
        RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
            Reporter.NULL);
        K key = reader.createKey();
        V value = reader.createValue();
         while (reader.next(key, value)) {
           if (r.nextDouble() <= freq) {
             if (samples.size() < numSamples) {
              samples.add(key);
            }  else {
               //  When exceeding the maximum number of samples, replace a
              
//  random element with this one, then adjust the frequency
              
//  to reflect the possibility of existing elements being
              
//  pushed out
               int ind = r.nextInt(numSamples);
               if (ind != numSamples) {
                samples.set(ind, key);
              }
              freq *= (numSamples - 1) / ( double) numSamples;
            }
            key = reader.createKey();
          }
        }
        reader.close();
      }
       return (K[])samples.toArray();
    }

 

首先通過InputFormat的getSplits方法得到所有的輸入分區;然后確定需要抽樣掃描的分區數目,取輸入分區總數與用戶輸入的maxSplitsSampled兩者的較小的值得到splitsToSample;然后對輸入分區數組shuffle排序,打亂其原始順序;然后循環逐個掃描每個分區中的記錄進行采樣,循環的條件是當前已經掃描的分區數小於splitsToSample或者當前已經掃描的分區數超過了splitsToSample但是小於輸入分區總數並且當前的采樣數小於最大采樣數numSamples。

每個分區中記錄采樣的具體過程如下:

從指定分區中取出一條記錄,判斷得到的隨機浮點數是否小於等於采樣頻率freq,如果大於則放棄這條記錄,然后判斷當前的采樣數是否小於最大采樣數,如果小於則這條記錄被選中,被放進采樣集合中,否則從【0,numSamples】中選擇一個隨機數,如果這個隨機數不等於最大采樣數numSamples,則用這條記錄替換掉采樣集合隨機數對應位置的記錄,同時采樣頻率freq減小變為freq*(numSamples-1)/numSamples。然后依次遍歷分區中的其它記錄。

 

SplitSampler從s個分區中采樣前n個記錄,是采樣隨機數據的一種簡便方式。SplitSampler類有兩個屬性:numSamples(最大采樣數),maxSplitsSampled(最大分區數)。其getSample方法實現如下:

 

     public K[] getSample(InputFormat<K,V> inf, JobConf job)  throws IOException {
      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
      ArrayList<K> samples =  new ArrayList<K>(numSamples);
       int splitsToSample = Math.min(maxSplitsSampled, splits.length);
       int splitStep = splits.length / splitsToSample;
       int samplesPerSplit = numSamples / splitsToSample;
       long records = 0;
       for ( int i = 0; i < splitsToSample; ++i) {
        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
            job, Reporter.NULL);
        K key = reader.createKey();
        V value = reader.createValue();
         while (reader.next(key, value)) {
          samples.add(key);
          key = reader.createKey();
          ++records;
           if ((i+1) * samplesPerSplit <= records) {
             break;
          }
        }
        reader.close();
      }
       return (K[])samples.toArray();
    }

 

首先根據InputFormat得到輸入分區數組;然后確定需要采樣的分區數splitsToSample為最大分區數和輸入分區總數之間的較小值;然后確定對分區采樣時的間隔splitStep為輸入分區總數除splitsToSample的商;然后確定每個分區的采樣數samplesPerSplit為最大采樣數除splitsToSample的商。被采樣的分區下標為i*splitStep,已經采樣的分區數目達到splitsToSample即停止采樣。

對於每一個分區,讀取一條記錄,將這條記錄添加到樣本集合中,如果當前樣本數大於當前的采樣分區所需要的樣本數,則停止對這個分區的采樣。如此循環遍歷完這個分區的所有記錄。

 

IntervalSampler根據一定的間隔從s個分區中采樣數據,非常適合對排好序的數據采樣。IntervalSampler類有兩個屬性:freq(哪一條記錄被選中的概率),maxSplitsSampled(采樣的最大分區數)。其getSample方法實現如下:

 

public K[] getSample(InputFormat<K,V> inf, JobConf job)  throws IOException {
      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
      ArrayList<K> samples =  new ArrayList<K>();
       int splitsToSample = Math.min(maxSplitsSampled, splits.length);
       int splitStep = splits.length / splitsToSample;
       long records = 0;
       long kept = 0;
       for ( int i = 0; i < splitsToSample; ++i) {
        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
            job, Reporter.NULL);
        K key = reader.createKey();
        V value = reader.createValue();
         while (reader.next(key, value)) {
          ++records;
           if (( double) kept / records < freq) {
            ++kept;
            samples.add(key);
            key = reader.createKey();
          }
        }
        reader.close();
      }
       return (K[])samples.toArray();
    }

 

首先根據InputFormat得到輸入分區數組;然后確定需要采樣的分區數splitsToSample為最大分區數和輸入分區總數之間的較小值;然后確定對分區采樣時的間隔splitStep為輸入分區總數除splitsToSample的商。被采樣的分區下標為i*splitStep,已經采樣的分區數目達到splitsToSample即停止采樣。

對於每一個分區,讀取一條記錄,如果當前樣本數與已經讀取的記錄數的比值小於freq,則將這條記錄添加到樣本集合,否則讀取下一條記錄。這樣依次循環遍歷完這個分區的所有記錄。

 

4.采樣器在實際中的使用

  常見的例子是terasort

 http://blog.csdn.net/scutshuxue/article/details/5915697

排序的基本思想是利用了mapreduce的自動排序功能,在hadoop中,從map到reduce階段,map出來的結構會按照各個key按照hash值分配到各個reduce中,其中,在reduce中所有的key都是有序的了。如果使用一個reduce,那么我們直接將他output出來就行了,但是這不能夠體現分布式的好處,所以,我們還是要用多個reduce來跑。

      比方說我們有1000個1-10000的數據,跑10個ruduce任務, 如果我們運行進行partition的時候,能夠將在1-1000中數據的分配到第一個reduce中,1001-2000的數據分配到第二個reduce中,以此類推。即第n個reduce所分配到的數據全部大於第n-1個reduce中的數據。這樣,每個reduce出來之后都是有序的了,我們只要cat所有的輸出文件,變成一個大的文件,就都是有序的了。

       基本思路就是這樣,但是現在有一個問題,就是數據的區間如何划分,在數據量大,還有我們並不清楚數據分布的情況下。一個比較簡單的方法就是采樣,假如有一億的數據,我們可以對數據進行采樣,如取10000個數據采樣,然后對采樣數據分區間。在Hadoop中,patition我們可以用TotalOrderPartitioner替換默認的分區。然后將采樣的結果傳給他,就可以實現我們想要的分區。在采樣時,我們可以使用hadoop的幾種采樣工具,RandomSampler,InputSampler,IntervalSampler。

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM