前面介紹一些怎樣用戶類制定自己的類,來達到減少中間數據:http://www.cnblogs.com/liqizhou/archive/2012/05/14/2499498.html
1.迭代式mapreduce
一些復雜的任務難以用一次mapreduce處理完成,需要多次mapreduce才能完成任務,例如Pagrank,Kmeans算法都需要多次的迭代,關於mapreduce迭代在mahout中運用較多。有興趣的可以參考一下mahout的源碼。
在map/reduce迭代過程中,思想還是比較簡單,就像類似for循環一樣,前一個mapreduce的輸出結果,作為下一個mapreduce的輸入,任務完成后中間結果都可以刪除。如代碼所以:
Configuration conf1 = new Configuration(); Job job1 = new Job(conf1,"job1"); ..... FileInputFormat.addInputPath(job1,InputPaht1); FileOutputFromat.setOoutputPath(job1,Outpath1); job1.waitForCompletion(true); //sub Mapreduce Configuration conf2 = new Configuration(); Job job2 = new Job(conf1,"job1"); ..... FileInputFormat.addInputPath(job2,Outpath1); FileOutputFromat.setOoutputPath(job2,Outpath2); job2.waitForCompletion(true); //sub Mapreduce Configuration conf3 = new Configuration(); Job job3 = new Job(conf1,"job1"); ..... FileInputFormat.addInputPath(job3,Outpath2); FileOutputFromat.setOoutputPath(job3,Outpath3); job3.waitForCompletion(true); .....
下面列舉一個mahout怎樣運用mapreduce迭代的,下面的代碼快就是mahout中kmeans的算法的代碼,在main函數中用一個while循環來做mapreduce的迭代,其中:runIteration()是一次mapreduce的過程。
但個人感覺現在的mapreduce迭代設計不太滿意的地方。
1. 每次迭代,如果所有Job(task)重復創建,代價將非常高。
2.每次迭代,數據都寫入本地和讀取本地,I/O和網絡傳輸的代價比較大。
好像Twister和Haloop的模型能過比較好的解決這些問題,但他們抽象度不夠高,支持的計算有限。
期待着下個版本hadoop更好的支持迭代算法。
//main function while (!converged && iteration <= maxIterations) { log.info("K-Means Iteration {}", iteration); // point the output to a new directory per iteration Path clustersOut = new Path(output, AbstractCluster.CLUSTERS_DIR + iteration); converged = runIteration(conf, input, clustersIn, clustersOut, measure.getClass().getName(), delta); // now point the input to the old output directory clustersIn = clustersOut; iteration++; } private static boolean runIteration(Configuration conf, Path input, Path clustersIn, Path clustersOut, String measureClass, String convergenceDelta) throws IOException, InterruptedException, ClassNotFoundException { conf.set(KMeansConfigKeys.CLUSTER_PATH_KEY, clustersIn.toString()); conf.set(KMeansConfigKeys.DISTANCE_MEASURE_KEY, measureClass); conf.set(KMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, convergenceDelta); Job job = new Job(conf, "KMeans Driver running runIteration over clustersIn: " + clustersIn); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(ClusterObservations.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Cluster.class); job.setInputFormatClass(SequenceFileInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); job.setMapperClass(KMeansMapper.class); job.setCombinerClass(KMeansCombiner.class); job.setReducerClass(KMeansReducer.class); FileInputFormat.addInputPath(job, input); FileOutputFormat.setOutputPath(job, clustersOut); job.setJarByClass(KMeansDriver.class); HadoopUtil.delete(conf, clustersOut); if (!job.waitForCompletion(true)) { throw new InterruptedException("K-Means Iteration failed processing " + clustersIn); } FileSystem fs = FileSystem.get(clustersOut.toUri(), conf); return isConverged(clustersOut, conf, fs); }
2.依賴關系組合式MapReduce
我們可以設想一下MapReduce有3個子任務job1,job2,job3構成,其中job1和job2相互獨立,job3要在job1和job2完成之后才執行。這種關系就叫復雜數據依賴關系的組合時mapreduce。hadoop為這種組合關系提供了一種執行和控制機制,hadoop通過job和jobControl類提供具體的編程方法。Job除了維護子任務的配置信息,還維護子任務的依賴關系,而jobControl控制整個作業流程,把所有的子任務作業加入到JobControl中,執行JobControl的run()方法即可運行程序。
下面給出偽代碼:
Configuration job1conf = new Configuration(); Job job1 = new Job(job1conf,"Job1"); .........//job1 其他設置 Configuration job2conf = new Configuration(); Job job2 = new Job(job2conf,"Job2"); .........//job2 其他設置 Configuration job3conf = new Configuration(); Job job3 = new Job(job3conf,"Job3"); .........//job3 其他設置 job3.addDepending(job1);//設置job3和job1的依賴關系 job3.addDepending(job2); JobControl JC = new JobControl("123"); JC.addJob(job1);//把三個job加入到jobcontorl中 JC.addJob(job2); JC.addJob(job3); JC.run();
3.鏈式MapReduce
首先看一下例子,來說明為什么要有鏈式MapReduce,假設在統計單詞是,會出現這樣的詞,make,made,making等,他們都屬於一個詞,在單詞累加的時候,都歸於一個詞。解決的方法為用一個單獨的Mapreduce任務可以實現,單增加了多個Mapreduce作業,將增加整個作業處理的周期,還增加了I/O操作,因而處理效率不高。
一個較好的辦法就是在核心的MapReduce之外,增加一個輔助的Map過程,然后將這個輔助的Map過程和核心的Mapreudce過程合並為一個鏈式的Mapreduce,從而完成整個作業。hadoop提供了專門的鏈式ChainMapper和ChainReducer來處理鏈式任務,ChainMapper允許一個Map任務中添加多個Map的子任務,ChainReducer可以在Reducer執行之后,在加入多個Map的子任務。其調用形式如下:
ChainMapper.addMapper(...); ChainReducer.addMapper(...); //addMapper()調用的方法形式如下: public static void addMapper(JOb job, Class<? extends Mapper> mclass, Class<?> inputKeyClass, Class<?> inputValueClass, Class<?> outputKeyClass, Class<?> outputValueClass, Configuration conf ){ }
其中,ChainReducer專門提供了一個setRreducer()方法來設置整個作業唯一的Reducer。
note:這些Mapper和Reducer之間傳遞的鍵和值都必須保持一致。
下面舉個例子:用ChainMapper把Map1加如並執行,然后用ChainReducer把Reduce和Map2加入到Reduce過程中。代碼如下:Map1.class 要實現map方法
public void function throws IOException { Configuration conf = new Configuration(); Job job = new Job(conf); job.setJobName("ChianJOb"); // 在ChainMapper里面添加Map1 Configuration map1conf = new Configuration(false); ChainMapper.addMapper(job, Map1.class, LongWritable.class, Text.class, Text.class, Text.class, true, map1conf); // 在ChainReduce中加入Reducer,Map2; Configuration reduceConf = new Configuration(false); ChainReducer.setReducer(job, Reduce.class, LongWritable.class, Text.class, Text.class, Text.class, true, map1conf); Configuration map2Conf = new Configuration(); ChainReducer.addMapper(job, Map2.class, LongWritable.class, Text.class, Text.class, Text.class, true, map1conf); job.waitForCompletion(true); }
作者:BIGBIGBOAT/Liqizhou