MapReduce,組合式,迭代式,鏈式


前面介紹一些怎樣用戶類制定自己的類,來達到減少中間數據:http://www.cnblogs.com/liqizhou/archive/2012/05/14/2499498.html

1.迭代式mapreduce

    一些復雜的任務難以用一次mapreduce處理完成,需要多次mapreduce才能完成任務,例如Pagrank,Kmeans算法都需要多次的迭代,關於mapreduce迭代在mahout中運用較多。有興趣的可以參考一下mahout的源碼。

     在map/reduce迭代過程中,思想還是比較簡單,就像類似for循環一樣,前一個mapreduce的輸出結果,作為下一個mapreduce的輸入,任務完成后中間結果都可以刪除。如代碼所以:

Configuration conf1 = new Configuration();
Job job1 = new Job(conf1,"job1");
.....
FileInputFormat.addInputPath(job1,InputPaht1);
FileOutputFromat.setOoutputPath(job1,Outpath1);
job1.waitForCompletion(true);
//sub Mapreduce
Configuration conf2 = new Configuration();
Job job2 = new Job(conf1,"job1");
.....
FileInputFormat.addInputPath(job2,Outpath1);
FileOutputFromat.setOoutputPath(job2,Outpath2);
job2.waitForCompletion(true);
//sub Mapreduce
Configuration conf3 = new Configuration();
Job job3 = new Job(conf1,"job1");
.....
FileInputFormat.addInputPath(job3,Outpath2);
FileOutputFromat.setOoutputPath(job3,Outpath3);
job3.waitForCompletion(true);
.....

下面列舉一個mahout怎樣運用mapreduce迭代的,下面的代碼快就是mahout中kmeans的算法的代碼,在main函數中用一個while循環來做mapreduce的迭代,其中:runIteration()是一次mapreduce的過程。

但個人感覺現在的mapreduce迭代設計不太滿意的地方。

1. 每次迭代,如果所有Job(task)重復創建,代價將非常高。

2.每次迭代,數據都寫入本地和讀取本地,I/O和網絡傳輸的代價比較大。

好像Twister和Haloop的模型能過比較好的解決這些問題,但他們抽象度不夠高,支持的計算有限。

期待着下個版本hadoop更好的支持迭代算法。

//main function
while (!converged && iteration <= maxIterations) {
      log.info("K-Means Iteration {}", iteration);
      // point the output to a new directory per iteration
      Path clustersOut = new Path(output, AbstractCluster.CLUSTERS_DIR + iteration);
      converged = runIteration(conf, input, clustersIn, clustersOut, measure.getClass().getName(), delta);
      // now point the input to the old output directory
      clustersIn = clustersOut;
      iteration++;
}

  private static boolean runIteration(Configuration conf,
                                      Path input,
                                      Path clustersIn,
                                      Path clustersOut,
                                      String measureClass,
                                      String convergenceDelta)
    throws IOException, InterruptedException, ClassNotFoundException {

    conf.set(KMeansConfigKeys.CLUSTER_PATH_KEY, clustersIn.toString());
    conf.set(KMeansConfigKeys.DISTANCE_MEASURE_KEY, measureClass);
    conf.set(KMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, convergenceDelta);

    Job job = new Job(conf, "KMeans Driver running runIteration over clustersIn: " + clustersIn);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(ClusterObservations.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Cluster.class);

    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setMapperClass(KMeansMapper.class);
    job.setCombinerClass(KMeansCombiner.class);
    job.setReducerClass(KMeansReducer.class);

    FileInputFormat.addInputPath(job, input);
    FileOutputFormat.setOutputPath(job, clustersOut);

    job.setJarByClass(KMeansDriver.class);
    HadoopUtil.delete(conf, clustersOut);
    if (!job.waitForCompletion(true)) {
      throw new InterruptedException("K-Means Iteration failed processing " + clustersIn);
    }
    FileSystem fs = FileSystem.get(clustersOut.toUri(), conf);

    return isConverged(clustersOut, conf, fs);
  }

2.依賴關系組合式MapReduce

我們可以設想一下MapReduce有3個子任務job1,job2,job3構成,其中job1和job2相互獨立,job3要在job1和job2完成之后才執行。這種關系就叫復雜數據依賴關系的組合時mapreduce。hadoop為這種組合關系提供了一種執行和控制機制,hadoop通過job和jobControl類提供具體的編程方法。Job除了維護子任務的配置信息,還維護子任務的依賴關系,而jobControl控制整個作業流程,把所有的子任務作業加入到JobControl中,執行JobControl的run()方法即可運行程序。

下面給出偽代碼:

Configuration job1conf = new Configuration();
Job job1 = new Job(job1conf,"Job1");
.........//job1 其他設置
Configuration job2conf = new Configuration();
Job job2 = new Job(job2conf,"Job2");
.........//job2 其他設置
Configuration job3conf = new Configuration();
Job job3 = new Job(job3conf,"Job3");
.........//job3 其他設置
job3.addDepending(job1);//設置job3和job1的依賴關系
job3.addDepending(job2);
JobControl JC = new JobControl("123");
JC.addJob(job1);//把三個job加入到jobcontorl中
JC.addJob(job2);
JC.addJob(job3);
JC.run();

3.鏈式MapReduce

首先看一下例子,來說明為什么要有鏈式MapReduce,假設在統計單詞是,會出現這樣的詞,make,made,making等,他們都屬於一個詞,在單詞累加的時候,都歸於一個詞。解決的方法為用一個單獨的Mapreduce任務可以實現,單增加了多個Mapreduce作業,將增加整個作業處理的周期,還增加了I/O操作,因而處理效率不高。

一個較好的辦法就是在核心的MapReduce之外,增加一個輔助的Map過程,然后將這個輔助的Map過程和核心的Mapreudce過程合並為一個鏈式的Mapreduce,從而完成整個作業。hadoop提供了專門的鏈式ChainMapper和ChainReducer來處理鏈式任務,ChainMapper允許一個Map任務中添加多個Map的子任務,ChainReducer可以在Reducer執行之后,在加入多個Map的子任務。其調用形式如下:

ChainMapper.addMapper(...);
    ChainReducer.addMapper(...);
    //addMapper()調用的方法形式如下:
    public static void addMapper(JOb job,
            Class<? extends Mapper> mclass,
            Class<?> inputKeyClass,
            Class<?> inputValueClass,
            Class<?> outputKeyClass,
            Class<?> outputValueClass,
            Configuration conf
    ){
    }

其中,ChainReducer專門提供了一個setRreducer()方法來設置整個作業唯一的Reducer。

note:這些Mapper和Reducer之間傳遞的鍵和值都必須保持一致。

下面舉個例子:用ChainMapper把Map1加如並執行,然后用ChainReducer把Reduce和Map2加入到Reduce過程中。代碼如下:Map1.class 要實現map方法

public void function throws IOException {
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJobName("ChianJOb");
        // 在ChainMapper里面添加Map1
        Configuration map1conf = new Configuration(false);
        ChainMapper.addMapper(job, Map1.class, LongWritable.class, Text.class,
                Text.class, Text.class, true, map1conf);
        // 在ChainReduce中加入Reducer,Map2;
        Configuration reduceConf = new Configuration(false);
        ChainReducer.setReducer(job, Reduce.class, LongWritable.class,
                Text.class, Text.class, Text.class, true, map1conf);
        Configuration map2Conf = new Configuration();
        ChainReducer.addMapper(job, Map2.class, LongWritable.class, Text.class,
                Text.class, Text.class, true, map1conf);
        job.waitForCompletion(true);
    }

作者:BIGBIGBOAT/Liqizhou


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM