從理論上來講用MapReduce技術實現KMeans算法是很Natural的想法:在Mapper中逐個計算樣本點離哪個中心最近,然后Emit(樣本點所屬的簇編號,樣本點);在Reducer中屬於同一個質心的樣本點在一個鏈表中,方便我們計算新的中心,然后Emit(質心編號,質心)。但是技術上的事並沒有理論層面那么簡單。
Mapper和Reducer都要用到K個中心(我習慣稱之為質心),Mapper要讀這些質心,Reducer要寫這些質心。另外Mapper還要讀存儲樣本點的數據文件。我先后嘗試以下3種方法,只有第3種是可行的,如果你不想被我誤導,請直接跳過前兩種。
一、用一個共享變量在存儲K個質心
由於K很小,所以我們認為用一個Vector<Sample>來存儲K個質心是沒有問題的。以下代碼是錯誤的:
class MyJob extends Tool{ static Vector<Sample> centers=new Vector<Sample>(K); static class MyMapper extends Mapper{ //read centers } static class MyMapper extends Reducer{ //update centers } void run(){ until ( convergence ){ map(); reduce(); } }
發生這種錯誤是因為對hadoop執行流程不清楚,對數據流不清楚。簡單地說Mapper和Reducer作為MyJob的內部靜態類,它們應該是獨立的--它們不應該與MyJob有任何交互,因為Mapper和Reducer分別在Task Tracker的不同JVM中運行,而MyJob以及MyJob的內部其他類都在客戶端上運行,自然不能在不同的JVM中共享一個變量。
詳細的流程是這樣的:
首先在客戶端上,JVM加載MyJob時先初始化靜態變量,執行static塊。然后提交作業到Job Tracker。
在Job Tracker上,分配Mapper和Reducer到不同的Task Tracker上。Mapper和Reducer線程獲得了MyJob類靜態變量的初始拷貝(這份拷貝是指MyJob執行完靜態塊之后靜態變量的模樣)。
在Task Tracker上,Mapper和Reducer分別地讀寫MyJob的靜態變量的本地拷貝,但是並不影響原始的MyJob中的靜態變量的值。
二、用分布式緩存文件存儲K個質心
既然不能通過共享外部類變量的方式,那我們通過文件在map和reduce之間傳遞數據總可以吧,Mapper從文件中讀取質心,Reducer把更新后的質心再寫入這個文件。這里的問題是:如果確定要把質心放在文件中,那Mapper就需要從2個文件中讀取數據--質心文件和樣本數據文件。雖然有MutipleInputs可以指定map()的輸入文件有多個,並可以為每個輸入文件分別指定解析方式,但是MutipleInputs不能保證每條記錄從不同文件中傳給map()的順序。在我們的KMeans中,我們希望質心文件全部被讀入后再逐條讀入樣本數據。
於是乎就想到了DistributedCache,它主要用於Mapper和Reducer之間共享數據。DistributedCacheFile是緩存在本地文件,在Mapper和Reducer中都可使用本地Java I/O的方式讀取它。於是我又有了一個錯誤的思路:
class MyMaper{ Vector<Sample> centers=new Vector<Sample>(K); void setup(){ //讀取cacheFile,給centers賦值 } void map(){ //計算樣本離哪個質心最近 } } class MyReducer{ Vector<Sample> centers=new Vector<Sample>(K); void reduce(){ //更新centers } void cleanup(){ //把centers寫回cacheFile } }
錯因:DistributedCacheFile是只讀的,在任務運行前,TaskTracker從JobTracker文件系統復制文件到本地磁盤作為緩存,這是單向的復制,是不能寫回的。試想在分布式環境下,如果不同的mapper和reducer可以把緩存文件寫回的話,那豈不又需要一套復雜的文件共享機制,嚴重地影響hadoop執行效率。
三、用分布式緩存文件存儲樣本數據
其實DistributedCache還有一個特點,它更適合於“大文件”(各節點內存容不下)緩存在本地。僅存儲了K個質心的文件顯然是小文件,與之相比樣本數據文件才是大文件。
此時我們需要2個質心文件:一個存放上一次的質心prevCenterFile,一個存放reducer更新后的質心currCenterFile。Mapper從prevCenterFile中讀取質心,Reducer把更新后有質心寫入currCenterFile。在Driver中讀入prevCenterFile和currCenterFile,比較前后兩次的質心是否相同(或足夠地接近),如果相同則停止迭代,否則就用currCenterFile覆蓋prevCenterFile(使用fs.rename),進入下一次的迭代。
這時候Mapper就是這樣的:
class MyMaper{ Vector<Sample> centers=new Vector<Sample>(K); void map(){ //逐條讀取質心,給centers賦值 } void cleanup(){ //逐行讀取cacheFile,計算每個樣本點離哪個質心最近 //然后Emit(樣本點所屬的簇編號,樣本點) } }
源代碼
試驗數據是在Mahout項目中作為example提供的,600個樣本點,每個樣本是一個60維的浮點向量。點擊下載
為樣本數據建立一個類Sample.java。
package kmeans; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.Writable; public class Sample implements Writable{ private static final Log log=LogFactory.getLog(Sample.class); public static final int DIMENTION=60; public double arr[]; public Sample(){ arr=new double[DIMENTION]; } public static double getEulerDist(Sample vec1,Sample vec2){ if(!(vec1.arr.length==DIMENTION && vec2.arr.length==DIMENTION)){ log.error("vector's dimention is not "+DIMENTION); System.exit(1); } double dist=0.0; for(int i=0;i<DIMENTION;++i){ dist+=(vec1.arr[i]-vec2.arr[i])*(vec1.arr[i]-vec2.arr[i]); } return Math.sqrt(dist); } public void clear(){ for(int i=0;i<arr.length;i++) arr[i]=0.0; } @Override public String toString(){ String rect=String.valueOf(arr[0]); for(int i=1;i<DIMENTION;i++) rect+="\t"+String.valueOf(arr[i]); return rect; } @Override public void readFields(DataInput in) throws IOException { String str[]=in.readUTF().split("\\s+"); for(int i=0;i<DIMENTION;++i) arr[i]=Double.parseDouble(str[i]); } @Override public void write(DataOutput out) throws IOException { out.writeUTF(this.toString()); } }
KMeans.java
package kmeans; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.Vector; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class KMeans extends Configured implements Tool{ private static final Log log = LogFactory.getLog(KMeans2.class); private static final int K = 10; private static final int MAXITERATIONS = 300; private static final double THRESHOLD = 0.01; public static boolean stopIteration(Configuration conf) throws IOException{ FileSystem fs=FileSystem.get(conf); Path pervCenterFile=new Path("/user/orisun/input/centers"); Path currentCenterFile=new Path("/user/orisun/output/part-r-00000"); if(!(fs.exists(pervCenterFile) && fs.exists(currentCenterFile))){ log.info("兩個質心文件需要同時存在"); System.exit(1); } //比較前后兩次質心的變化是否小於閾值,決定迭代是否繼續 boolean stop=true; String line1,line2; FSDataInputStream in1=fs.open(pervCenterFile); FSDataInputStream in2=fs.open(currentCenterFile); InputStreamReader isr1=new InputStreamReader(in1); InputStreamReader isr2=new InputStreamReader(in2); BufferedReader br1=new BufferedReader(isr1); BufferedReader br2=new BufferedReader(isr2); Sample prevCenter,currCenter; while((line1=br1.readLine())!=null && (line2=br2.readLine())!=null){ prevCenter=new Sample(); currCenter=new Sample(); String []str1=line1.split("\\s+"); String []str2=line2.split("\\s+"); assert(str1[0].equals(str2[0])); for(int i=1;i<=Sample.DIMENTION;i++){ prevCenter.arr[i-1]=Double.parseDouble(str1[i]); currCenter.arr[i-1]=Double.parseDouble(str2[i]); } if(Sample.getEulerDist(prevCenter, currCenter)>THRESHOLD){ stop=false; break; } } //如果還要進行下一次迭代,就用當前質心替代上一次的質心 if(stop==false){ fs.delete(pervCenterFile,true); if(fs.rename(currentCenterFile, pervCenterFile)==false){ log.error("質心文件替換失敗"); System.exit(1); } } return stop; } public static class ClusterMapper extends Mapper<LongWritable, Text, IntWritable, Sample> { Vector<Sample> centers = new Vector<Sample>(); @Override //清空centers public void setup(Context context){ for (int i = 0; i < K; i++) { centers.add(new Sample()); } } @Override //從輸入文件讀入centers public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String []str=value.toString().split("\\s+"); if(str.length!=Sample.DIMENTION+1){ log.error("讀入centers時維度不對"); System.exit(1); } int index=Integer.parseInt(str[0]); for(int i=1;i<str.length;i++) centers.get(index).arr[i-1]=Double.parseDouble(str[i]); } @Override //找到每個數據點離哪個質心最近 public void cleanup(Context context) throws IOException,InterruptedException { Path []caches=DistributedCache.getLocalCacheFiles(context.getConfiguration()); if(caches==null || caches.length<=0){ log.error("data文件不存在"); System.exit(1); } BufferedReader br=new BufferedReader(new FileReader(caches[0].toString())); Sample sample; String line; while((line=br.readLine())!=null){ sample=new Sample(); String []str=line.split("\\s+"); for(int i=0;i<Sample.DIMENTION;i++) sample.arr[i]=Double.parseDouble(str[i]); int index=-1; double minDist=Double.MAX_VALUE; for(int i=0;i<K;i++){ double dist=Sample.getEulerDist(sample, centers.get(i)); if(dist<minDist){ minDist=dist; index=i; } } context.write(new IntWritable(index), sample); } } } public static class UpdateCenterReducer extends Reducer<IntWritable, Sample, IntWritable, Sample> { int prev=-1; Sample center=new Sample();; int count=0; @Override //更新每個質心(除最后一個) public void reduce(IntWritable key,Iterable<Sample> values,Context context) throws IOException,InterruptedException{ while(values.iterator().hasNext()){ Sample value=values.iterator().next(); if(key.get()!=prev){ if(prev!=-1){ for(int i=0;i<center.arr.length;i++) center.arr[i]/=count; context.write(new IntWritable(prev), center); } center.clear(); prev=key.get(); count=0; } for(int i=0;i<Sample.DIMENTION;i++) center.arr[i]+=value.arr[i]; count++; } } @Override //更新最后一個質心 public void cleanup(Context context) throws IOException,InterruptedException{ for(int i=0;i<center.arr.length;i++) center.arr[i]/=count; context.write(new IntWritable(prev), center); } } @Override public int run(String[] args) throws Exception { Configuration conf=getConf(); FileSystem fs=FileSystem.get(conf); Job job=new Job(conf); job.setJarByClass(KMeans.class); //質心文件每行的第一個數字是索引 FileInputFormat.setInputPaths(job, "/user/orisun/input/centers"); Path outDir=new Path("/user/orisun/output"); fs.delete(outDir,true); FileOutputFormat.setOutputPath(job, outDir); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setMapperClass(ClusterMapper.class); job.setReducerClass(UpdateCenterReducer.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(Sample.class); return job.waitForCompletion(true)?0:1; } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); FileSystem fs=FileSystem.get(conf); //樣本數據文件中每個樣本不需要標記索引 Path dataFile=new Path("/user/orisun/input/data"); DistributedCache.addCacheFile(dataFile.toUri(), conf); int iteration = 0; int success = 1; do { success ^= ToolRunner.run(conf, new KMeans(), args); log.info("iteration "+iteration+" end"); } while (success == 1 && iteration++ < MAXITERATIONS && (!stopIteration(conf))); log.info("Success.Iteration=" + iteration); //迭代完成后再執行一次mapper,輸出每個樣本點所屬的分類--在/user/orisun/output2/part-m-00000中 //質心文件保存在/user/orisun/input/centers中 Job job=new Job(conf); job.setJarByClass(KMeans.class); FileInputFormat.setInputPaths(job, "/user/orisun/input/centers"); Path outDir=new Path("/user/orisun/output2"); fs.delete(outDir,true); FileOutputFormat.setOutputPath(job, outDir); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setMapperClass(ClusterMapper.class); job.setNumReduceTasks(0); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(Sample.class); job.waitForCompletion(true); } }
注意在Driver中創建Job實例時一定要把Configuration類型的參數傳遞進去,否則在Mapper或Reducer中調用DistributedCache.getLocalCacheFiles(context.getConfiguration());返回值就為null。因為空構造函數的Job采用的Configuration是從hadoop的配置文件中讀出來的(使用new Configuration()創建的Configuration就是從hadoop的配置文件中讀出來的),請注意在main()函數中有一句:DistributedCache.addCacheFile(dataFile.toUri(), conf);即此時的Configuration中多了一個DistributedCacheFile,所以你需要把這個Configuration傳遞給Job構造函數,如果傳遞默認的Configuration,那在Job中當然不知道DistributedCacheFile的存在了。
Further
方案三還是不如人意,質心文件是很小的(因為質心總共就沒幾個),用map()函數僅僅是來讀一個質心文件根本就沒有發揮並行的作用,而且在map()中也沒有調用context.write(),所以Mapper中做的事情可以放在Reducer的setup()中來完成,這樣就不需要Mapper了,或者說上面設計的就不是MapReduce程序,跟平常的單線程串行程序是一樣的。sigh