On Custom Combine Classes in MapReduce (Part 1)


MRJobConfig
     public static final String COMBINE_CLASS_ATTR
     the constant COMBINE_CLASS_ATTR = "mapreduce.job.combine.class"
     ————sub-interface JobContext
           method getCombinerClass
             ————implementing class JobContextImpl
                 implements the getCombinerClass method:
                 public Class<? extends Reducer<?,?,?,?>> getCombinerClass()
                          throws ClassNotFoundException {
                      return (Class<? extends Reducer<?,?,?,?>>)
                        conf.getClass(COMBINE_CLASS_ATTR, null);
                 }
                  Because JobContextImpl implements the JobContext interface, which extends MRJobConfig,
                  it has access to the COMBINE_CLASS_ATTR constant defined in MRJobConfig.
                  ————subclass Job
                     public void setCombinerClass(Class<? extends Reducer> cls
                               ) throws IllegalStateException {
                     ensureState(JobState.DEFINE);
                     conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
                     }
                 Because JobContextImpl implements MRJobConfig (through JobContext),
                 and Job is a subclass of JobContextImpl,
                 Job also has access to COMBINE_CLASS_ATTR,
                 and setCombinerClass stores the given class in the configuration under that key.
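To see this end to end from the user side, here is a minimal, self-contained driver sketch (the class names CombinerConfigDemo and DemoCombiner are made up for illustration) confirming that Job.setCombinerClass simply records the class under "mapreduce.job.combine.class":

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Reducer;

    public class CombinerConfigDemo {

        // A stand-in combiner; any Reducer subclass would do.
        public static class DemoCombiner
                extends Reducer<Text, IntWritable, Text, IntWritable> {
            private final IntWritable valueOut = new IntWritable();

            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int total = 0;
                for (IntWritable value : values) {
                    total += value.get();
                }
                valueOut.set(total);
                context.write(key, valueOut);
            }
        }

        public static void main(String[] args) throws IOException {
            Job job = Job.getInstance(new Configuration(), "combiner-config-demo");
            // The map output types must match the combiner's input and output types,
            // because the combiner's output is written back into the map-side spill.
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setCombinerClass(DemoCombiner.class);
            // setCombinerClass stored the class under COMBINE_CLASS_ATTR:
            System.out.println(job.getConfiguration().get("mapreduce.job.combine.class"));
        }
    }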
 
 
MRJobConfig
    ————sub-interface JobContext
        method getCombinerClass
        ————implementing class JobContextImpl
            ————subclass Job
        ————sub-interface TaskAttemptContext
            inherits the method getCombinerClass
Task   
    $CombinerRunner (an inner class of Task)
            This inner class has a create method:
            public static <K,V> CombinerRunner<K,V> create(JobConf job,
                               TaskAttemptID taskId,
                               Counters.Counter inputCounter,
                               TaskReporter reporter,
                               org.apache.hadoop.mapreduce.OutputCommitter committer
                              ) throws ClassNotFoundException
            {
                  Class<? extends Reducer<K,V,K,V>> cls =
                    (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
                  if (cls != null) {
                    return new OldCombinerRunner(cls, job, inputCounter, reporter);
                  }
                  // make a task context so we can get the classes
                  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
                    new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId,
                        reporter);
                  Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls =
                    (Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)
                       taskContext.getCombinerClass();
                  if (newcls != null) {
                    return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext,
                                                      inputCounter, reporter, committer);
                  }
                  return null;
            }
                  Of this, the following piece should be the old-API path:
                  Class<? extends Reducer<K,V,K,V>> cls =
                          (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
                  if (cls != null) {
                          return new OldCombinerRunner(cls, job, inputCounter, reporter);
                  }
                  while this piece is the new-API path:
                  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
                    new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId,
                        reporter);
                  Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls =
                    (Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)
                       taskContext.getCombinerClass();
                  if (newcls != null) {
                    return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext,
                                                      inputCounter, reporter, committer);
                  }
                  return null;
                  (I do not know why the fully qualified names are written out; if the package prefixes, the up/down casts,
                  and all the generics were stripped away, it would read much more clearly. A simplified sketch follows below.)
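                  For readability only, here is a hand-simplified sketch of the same create logic with the package prefixes,
                  generics, and casts stripped. It is not the real source and will not compile as-is; it only shows the control flow:

                  static CombinerRunner create(JobConf job, TaskAttemptID taskId,
                                               Counters.Counter inputCounter, TaskReporter reporter,
                                               OutputCommitter committer) throws ClassNotFoundException {
                    // Old mapred API: combiner set through JobConf.setCombinerClass(...)
                    Class oldCls = job.getCombinerClass();
                    if (oldCls != null) {
                      return new OldCombinerRunner(oldCls, job, inputCounter, reporter);
                    }
                    // New mapreduce API: combiner set through Job.setCombinerClass(...),
                    // read back from the task context, i.e. from COMBINE_CLASS_ATTR
                    TaskAttemptContext taskContext = new TaskAttemptContextImpl(job, taskId, reporter);
                    Class newCls = taskContext.getCombinerClass();
                    if (newCls != null) {
                      return new NewCombinerRunner(newCls, job, taskId, taskContext,
                                                   inputCounter, reporter, committer);
                    }
                    return null;  // no combiner configured at all
                  }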
                   TaskAttemptContext is a sub-interface of JobContext, so it inherits the getCombinerClass method.
                   Moreover, polymorphism is at work here: the call actually dispatches to getCombinerClass of the implementing class TaskAttemptContextImpl
                   (TaskAttemptContextImpl extends JobContextImpl, and JobContextImpl implements that method).
                   So in the end we get the COMBINE_CLASS_ATTR property, that is, the class xxxC we set via job.setCombinerClass.
                     This xxxC is assigned to newcls, and newcls is passed to the reducerClass parameter of the NewCombinerRunner constructor:
                      NewCombinerRunner(Class reducerClass,
                          JobConf job,
                          org.apache.hadoop.mapreduce.TaskAttemptID taskId,
                          org.apache.hadoop.mapreduce.TaskAttemptContext context,
                          Counters.Counter inputCounter,
                          TaskReporter reporter,
                          org.apache.hadoop.mapreduce.OutputCommitter committer)
                      {
                          super(inputCounter, job, reporter);
                          this.reducerClass = reducerClass;
                          this.taskId = taskId;
                          keyClass = (Class<K>) context.getMapOutputKeyClass();
                          valueClass = (Class<V>) context.getMapOutputValueClass();
                          comparator = (RawComparator<K>) context.getCombinerKeyGroupingComparator();
                          this.committer = committer;
                      }
Task           
  MapTask
        $MapOutputBuffer
            private CombinerRunner<K,V> combinerRunner;
            $SpillThread class ($ denotes an inner class; strictly, combinerRunner is assigned in MapOutputBuffer.init,
            and the spill loop below lives in sortAndSpill, which the spill thread triggers)
                combinerRunner = CombinerRunner.create(job, getTaskID(),
                                             combineInputCounter,
                                             reporter, null);
                //at this point we have obtained the combine class that was configured
                if (combinerRunner == null) {
                      // spill directly
                      DataInputBuffer key = new DataInputBuffer();
                      while (spindex < mend &&
                          kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
                        final int kvoff = offsetFor(spindex % maxRec);
                        int keystart = kvmeta.get(kvoff + KEYSTART);
                        int valstart = kvmeta.get(kvoff + VALSTART);
                        key.reset(kvbuffer, keystart, valstart - keystart);
                        getVBytesForOffset(kvoff, value);
                        writer.append(key, value);
                        ++spindex;
                      }
                } else {
                      int spstart = spindex;
                      while (spindex < mend &&
                          kvmeta.get(offsetFor(spindex % maxRec)
                                    + PARTITION) == i) {
                        ++spindex;
                      }
                      // Note: we would like to avoid the combiner if we've fewer
                      // than some threshold of records for a partition
                      if (spstart != spindex) {
                        combineCollector.setWriter(writer);
                        RawKeyValueIterator kvIter =
                          new MRResultIterator(spstart, spindex);
                        combinerRunner.combine(kvIter, combineCollector);
                      }
                }
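            Whether this else-branch (and hence the combiner) actually ran for a job can be confirmed from the driver side
            after the job finishes, via the combine counters. A minimal helper sketch (the class name CombineCounterCheck is made up):

            import java.io.IOException;
            import org.apache.hadoop.mapreduce.Counters;
            import org.apache.hadoop.mapreduce.Job;
            import org.apache.hadoop.mapreduce.TaskCounter;

            public class CombineCounterCheck {
                // Call after job.waitForCompletion(true) has returned.
                public static void print(Job job) throws IOException {
                    Counters counters = job.getCounters();
                    long in  = counters.findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue();
                    long out = counters.findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
                    System.out.println("combine input records = " + in
                            + ", combine output records = " + out);
                }
            }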
            
            Next, look at the combine function,
            defined in Task's inner class NewCombinerRunner:
            public void combine(RawKeyValueIterator iterator,
                                OutputCollector<K,V> collector)
                throws IOException, InterruptedException,ClassNotFoundException
            {
              // make a reducer
              org.apache.hadoop.mapreduce.Reducer<K,V,K,V> reducer =
                (org.apache.hadoop.mapreduce.Reducer<K,V,K,V>)
                  ReflectionUtils.newInstance(reducerClass, job);
              org.apache.hadoop.mapreduce.Reducer.Context
                   reducerContext = createReduceContext(reducer, job, taskId,
                                                        iterator, null, inputCounter,
                                                        new OutputConverter(collector),
                                                        committer,
                                                        reporter, comparator, keyClass,
                                                        valueClass);
              reducer.run(reducerContext);
            }
            The reducerClass above is exactly the xxxC we passed in.
            In the end an instance of xxxC is created through reflection and upcast to a Reducer,
            and run is then called on that upcast object (our xxxC does not define a run method, so the parent class Reducer's run is used).
            In the Reducer class, the run method looks like this:
            /**
           * Advanced application writers can use the
           * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
           * control how the reduce task works.
           */
          public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
              while (context.nextKey()) {
                reduce(context.getCurrentKey(), context.getValues(), context);
                // If a back up store is used, reset it
                Iterator<VALUEIN> iter = context.getValues().iterator();
                if(iter instanceof ReduceContext.ValueIterator) {
                  ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();       
                }
              }
            } finally {
              cleanup(context);
            }
          }
          Due to polymorphism, the reduce invoked here is the reduce method of the subclass xxxC
          (polymorphism: when a subclass overrides a method, it is the subclass's version that actually runs).
          So when we write our own class to use as the combiner, it should extend Reducer and override the reduce method,
          and its input has the following form (taking wordcount as an example):
       reduce(Text key, Iterable<IntWritable> values, Context context)
       where key is the word and values is the list of counts for that word, i.e. value1, value2, ...
       Note that at this point the input is already grouped into a list, i.e. <key, list<value1, value2, value3, ...>>.
       (I reached this conclusion because the class I used as the combiner was WCReduce,
        i.e. the reducer and the combiner were the same class. From reading the code, if the combiner's input were plain <key, value> pairs,
        combining could not work: grouping pairs with the same key and accumulating their counts are two separate steps. Pairs with the same key
        are already grouped on the map side into <key, value list>, and only then does the combiner receive its input in exactly that form,
        process it (in our case, summing), and write the result to the context, from where it goes through the shuffle to the reduce side.
        I then looped over the values in reduce and printed them with System.out,
        and the result was 0, which is really because after one pass over the values the iterator was no longer positioned at the start.)
Well, experiment with the code below yourself, combining and commenting things out over and over, and you will arrive at the same conclusion.
  // reduce
  public static class WCReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
      private final IntWritable ValueOut = new IntWritable();

      @Override
      protected void reduce(Text key, Iterable<IntWritable> values,
              Context context) throws IOException, InterruptedException {
          for (IntWritable value : values) {
              System.out.println(value.get() + "--");
          }

//          int total = 0;
//          for (IntWritable value : values) {
//              total += value.get();
//          }
//          ValueOut.set(total);
//          context.write(key, ValueOut);
      }

  }

  job.setCombinerClass(WCReduce.class);
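
To make the iterator observation concrete, here is a small variant to declare alongside WCReduce above (same imports; the class name TwoPassCombiner is made up for illustration). It sums the values twice inside one reduce call; used as the combiner (or reducer), the second pass normally sees nothing, because the values Iterable is backed by a single-pass iterator that the first loop has already exhausted:

  public static class TwoPassCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

      @Override
      protected void reduce(Text key, Iterable<IntWritable> values,
              Context context) throws IOException, InterruptedException {
          int firstPass = 0;
          for (IntWritable value : values) {
              firstPass += value.get();      // consumes the single-pass iterator
          }
          int secondPass = 0;
          for (IntWritable value : values) {
              secondPass += value.get();     // normally never runs: the iterator is already spent
          }
          System.out.println(key + ": firstPass=" + firstPass + ", secondPass=" + secondPass);
          // still emit the real sum so later stages keep seeing the counts
          context.write(key, new IntWritable(firstPass));
      }
  }

  job.setCombinerClass(TwoPassCombiner.class);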
 
 




