用Hadoop1.0.3實現KMeans算法


從理論上來講用MapReduce技術實現KMeans算法是很Natural的想法:在Mapper中逐個計算樣本點離哪個中心最近,然后Emit(樣本點所屬的簇編號,樣本點);在Reducer中屬於同一個質心的樣本點在一個鏈表中,方便我們計算新的中心,然后Emit(質心編號,質心)。但是技術上的事並沒有理論層面那么簡單。

Mapper和Reducer都要用到K個中心(我習慣稱之為質心),Mapper要讀這些質心,Reducer要寫這些質心。另外Mapper還要讀存儲樣本點的數據文件。我先后嘗試以下3種方法,只有第3種是可行的,如果你不想被我誤導,請直接跳過前兩種。

一、用一個共享變量在存儲K個質心

由於K很小,所以我們認為用一個Vector<Sample>來存儲K個質心是沒有問題的。以下代碼是錯誤的:

class MyJob extends Tool{
  static Vector<Sample> centers=new Vector<Sample>(K);
  static class MyMapper extends Mapper{
    //read centers
  } 
  static class MyMapper extends Reducer{
    //update centers
  }
  void run(){
    until ( convergence ){
      map();
      reduce();
    }
}

發生這種錯誤是因為對hadoop執行流程不清楚,對數據流不清楚。簡單地說Mapper和Reducer作為MyJob的內部靜態類,它們應該是獨立的--它們不應該與MyJob有任何交互,因為Mapper和Reducer分別在Task Tracker的不同JVM中運行,而MyJob以及MyJob的內部其他類都在客戶端上運行,自然不能在不同的JVM中共享一個變量。

詳細的流程是這樣的:

首先在客戶端上,JVM加載MyJob時先初始化靜態變量,執行static塊。然后提交作業到Job Tracker。

在Job Tracker上,分配Mapper和Reducer到不同的Task Tracker上。Mapper和Reducer線程獲得了MyJob類靜態變量的初始拷貝(這份拷貝是指MyJob執行完靜態塊之后靜態變量的模樣)。

在Task Tracker上,Mapper和Reducer分別地讀寫MyJob的靜態變量的本地拷貝,但是並不影響原始的MyJob中的靜態變量的值。

二、用分布式緩存文件存儲K個質心

既然不能通過共享外部類變量的方式,那我們通過文件在map和reduce之間傳遞數據總可以吧,Mapper從文件中讀取質心,Reducer把更新后的質心再寫入這個文件。這里的問題是:如果確定要把質心放在文件中,那Mapper就需要從2個文件中讀取數據--質心文件和樣本數據文件。雖然有MutipleInputs可以指定map()的輸入文件有多個,並可以為每個輸入文件分別指定解析方式,但是MutipleInputs不能保證每條記錄從不同文件中傳給map()的順序。在我們的KMeans中,我們希望質心文件全部被讀入后再逐條讀入樣本數據。

於是乎就想到了DistributedCache,它主要用於Mapper和Reducer之間共享數據。DistributedCacheFile是緩存在本地文件,在Mapper和Reducer中都可使用本地Java I/O的方式讀取它。於是我又有了一個錯誤的思路:

class MyMaper{
	Vector<Sample> centers=new Vector<Sample>(K);
	void setup(){
		//讀取cacheFile,給centers賦值
	}
	void map(){
		//計算樣本離哪個質心最近
	}
}
class MyReducer{
	Vector<Sample> centers=new Vector<Sample>(K);
	void reduce(){
		//更新centers
	}
	void cleanup(){
		//把centers寫回cacheFile
	}
}

錯因:DistributedCacheFile是只讀的在任務運行前,TaskTracker從JobTracker文件系統復制文件到本地磁盤作為緩存,這是單向的復制,是不能寫回的。試想在分布式環境下,如果不同的mapper和reducer可以把緩存文件寫回的話,那豈不又需要一套復雜的文件共享機制,嚴重地影響hadoop執行效率。

三、用分布式緩存文件存儲樣本數據

其實DistributedCache還有一個特點,它更適合於“大文件”(各節點內存容不下)緩存在本地。僅存儲了K個質心的文件顯然是小文件,與之相比樣本數據文件才是大文件。

此時我們需要2個質心文件:一個存放上一次的質心prevCenterFile,一個存放reducer更新后的質心currCenterFile。Mapper從prevCenterFile中讀取質心,Reducer把更新后有質心寫入currCenterFile。在Driver中讀入prevCenterFile和currCenterFile,比較前后兩次的質心是否相同(或足夠地接近),如果相同則停止迭代,否則就用currCenterFile覆蓋prevCenterFile(使用fs.rename),進入下一次的迭代。

這時候Mapper就是這樣的:

class MyMaper{
	Vector<Sample> centers=new Vector<Sample>(K);
	void map(){
		//逐條讀取質心,給centers賦值
	}
	void cleanup(){
		//逐行讀取cacheFile,計算每個樣本點離哪個質心最近
		//然后Emit(樣本點所屬的簇編號,樣本點)
	}
}

源代碼

試驗數據是在Mahout項目中作為example提供的,600個樣本點,每個樣本是一個60維的浮點向量。點擊下載

為樣本數據建立一個類Sample.java。

package kmeans;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;

public class Sample implements Writable{
	private static final Log log=LogFactory.getLog(Sample.class);
	public static final int DIMENTION=60;
	public double arr[];
	
	public Sample(){
		arr=new double[DIMENTION];
	}
	
	public static double getEulerDist(Sample vec1,Sample vec2){
		if(!(vec1.arr.length==DIMENTION && vec2.arr.length==DIMENTION)){
			log.error("vector's dimention is not "+DIMENTION);
			System.exit(1);
		}
		double dist=0.0;
		for(int i=0;i<DIMENTION;++i){
			dist+=(vec1.arr[i]-vec2.arr[i])*(vec1.arr[i]-vec2.arr[i]);
		}
		return Math.sqrt(dist);
	}
	
	public void clear(){
		for(int i=0;i<arr.length;i++)
			arr[i]=0.0;
	}
	
	@Override
	public String toString(){
		String rect=String.valueOf(arr[0]);
		for(int i=1;i<DIMENTION;i++)
			rect+="\t"+String.valueOf(arr[i]);
		return rect;
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		String str[]=in.readUTF().split("\\s+");
		for(int i=0;i<DIMENTION;++i)
			arr[i]=Double.parseDouble(str[i]);
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(this.toString());
	}
}

KMeans.java

package kmeans;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Vector;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class KMeans extends Configured implements Tool{
	private static final Log log = LogFactory.getLog(KMeans2.class);

	private static final int K = 10;
	private static final int MAXITERATIONS = 300;
	private static final double THRESHOLD = 0.01;
	
	public static boolean stopIteration(Configuration conf) throws IOException{
		FileSystem fs=FileSystem.get(conf);
		Path pervCenterFile=new Path("/user/orisun/input/centers");
		Path currentCenterFile=new Path("/user/orisun/output/part-r-00000");
		if(!(fs.exists(pervCenterFile) && fs.exists(currentCenterFile))){
			log.info("兩個質心文件需要同時存在");
			System.exit(1);
		}
		//比較前后兩次質心的變化是否小於閾值,決定迭代是否繼續
		boolean stop=true;
		String line1,line2;
		FSDataInputStream in1=fs.open(pervCenterFile);
		FSDataInputStream in2=fs.open(currentCenterFile);
		InputStreamReader isr1=new InputStreamReader(in1);
		InputStreamReader isr2=new InputStreamReader(in2);
		BufferedReader br1=new BufferedReader(isr1);
		BufferedReader br2=new BufferedReader(isr2);
		Sample prevCenter,currCenter;
		while((line1=br1.readLine())!=null && (line2=br2.readLine())!=null){
			prevCenter=new Sample();
			currCenter=new Sample();
			String []str1=line1.split("\\s+");
			String []str2=line2.split("\\s+");
			assert(str1[0].equals(str2[0]));
			for(int i=1;i<=Sample.DIMENTION;i++){
				prevCenter.arr[i-1]=Double.parseDouble(str1[i]);
				currCenter.arr[i-1]=Double.parseDouble(str2[i]);
			}
			if(Sample.getEulerDist(prevCenter, currCenter)>THRESHOLD){
				stop=false;
				break;
			}
		}
		//如果還要進行下一次迭代,就用當前質心替代上一次的質心
		if(stop==false){
			fs.delete(pervCenterFile,true);
			if(fs.rename(currentCenterFile, pervCenterFile)==false){
				log.error("質心文件替換失敗");
				System.exit(1);
			}
		}
		return stop;
	}
	
	public static class ClusterMapper extends Mapper<LongWritable, Text, IntWritable, Sample> {
		Vector<Sample> centers = new Vector<Sample>();
		@Override
		//清空centers
		public void setup(Context context){
			for (int i = 0; i < K; i++) {
				centers.add(new Sample());
			}
		}
		@Override
		//從輸入文件讀入centers
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String []str=value.toString().split("\\s+");
			if(str.length!=Sample.DIMENTION+1){
				log.error("讀入centers時維度不對");
				System.exit(1);
			}
			int index=Integer.parseInt(str[0]);
			for(int i=1;i<str.length;i++)
				centers.get(index).arr[i-1]=Double.parseDouble(str[i]);
		}
		@Override
		//找到每個數據點離哪個質心最近
		public void cleanup(Context context) throws IOException,InterruptedException {
			Path []caches=DistributedCache.getLocalCacheFiles(context.getConfiguration());
			if(caches==null || caches.length<=0){
				log.error("data文件不存在");
				System.exit(1);
			}
			BufferedReader br=new BufferedReader(new FileReader(caches[0].toString()));
			Sample sample;
			String line;
			while((line=br.readLine())!=null){
				sample=new Sample();
				String []str=line.split("\\s+");
				for(int i=0;i<Sample.DIMENTION;i++)
					sample.arr[i]=Double.parseDouble(str[i]);
				
				int index=-1;
				double minDist=Double.MAX_VALUE;
				for(int i=0;i<K;i++){
					double dist=Sample.getEulerDist(sample, centers.get(i));
					if(dist<minDist){
						minDist=dist;
						index=i;
					}
				}
				context.write(new IntWritable(index), sample);
			}
		}
	}
	
	public static class UpdateCenterReducer extends Reducer<IntWritable, Sample, IntWritable, Sample> {
		int prev=-1;
		Sample center=new Sample();;
		int count=0;
		@Override
		//更新每個質心(除最后一個)
		public void reduce(IntWritable key,Iterable<Sample> values,Context context) throws IOException,InterruptedException{
			while(values.iterator().hasNext()){
				Sample value=values.iterator().next();
				if(key.get()!=prev){
					if(prev!=-1){
						for(int i=0;i<center.arr.length;i++)
							center.arr[i]/=count;		
						context.write(new IntWritable(prev), center);
					}
					center.clear();
					prev=key.get();
					count=0;
				}
				for(int i=0;i<Sample.DIMENTION;i++)
					center.arr[i]+=value.arr[i];
				count++;
			}
		}
		@Override
		//更新最后一個質心
		public void cleanup(Context context) throws IOException,InterruptedException{
			for(int i=0;i<center.arr.length;i++)
				center.arr[i]/=count;
			context.write(new IntWritable(prev), center);
		}
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf=getConf();
		FileSystem fs=FileSystem.get(conf);
		Job job=new Job(conf);
		job.setJarByClass(KMeans.class);
		
		//質心文件每行的第一個數字是索引
		FileInputFormat.setInputPaths(job, "/user/orisun/input/centers");
		Path outDir=new Path("/user/orisun/output");
		fs.delete(outDir,true);
		FileOutputFormat.setOutputPath(job, outDir);
		
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		job.setMapperClass(ClusterMapper.class);
		job.setReducerClass(UpdateCenterReducer.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(Sample.class);
		
		return job.waitForCompletion(true)?0:1;
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		FileSystem fs=FileSystem.get(conf);
		
		//樣本數據文件中每個樣本不需要標記索引
		Path dataFile=new Path("/user/orisun/input/data");
		DistributedCache.addCacheFile(dataFile.toUri(), conf);

		int iteration = 0;
		int success = 1;
		do {
			success ^= ToolRunner.run(conf, new KMeans(), args);
			log.info("iteration "+iteration+" end");
		} while (success == 1 && iteration++ < MAXITERATIONS
				&& (!stopIteration(conf)));
		log.info("Success.Iteration=" + iteration);
		
		//迭代完成后再執行一次mapper,輸出每個樣本點所屬的分類--在/user/orisun/output2/part-m-00000中
		//質心文件保存在/user/orisun/input/centers中
		Job job=new Job(conf);
		job.setJarByClass(KMeans.class);
		
		FileInputFormat.setInputPaths(job, "/user/orisun/input/centers");
		Path outDir=new Path("/user/orisun/output2");
		fs.delete(outDir,true);
		FileOutputFormat.setOutputPath(job, outDir);
		
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		job.setMapperClass(ClusterMapper.class);
		job.setNumReduceTasks(0);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(Sample.class);
		
		job.waitForCompletion(true);
	}
}

注意在Driver中創建Job實例時一定要把Configuration類型的參數傳遞進去,否則在Mapper或Reducer中調用DistributedCache.getLocalCacheFiles(context.getConfiguration());返回值就為null。因為空構造函數的Job采用的Configuration是從hadoop的配置文件中讀出來的(使用new Configuration()創建的Configuration就是從hadoop的配置文件中讀出來的),請注意在main()函數中有一句:DistributedCache.addCacheFile(dataFile.toUri(), conf);即此時的Configuration中多了一個DistributedCacheFile,所以你需要把這個Configuration傳遞給Job構造函數,如果傳遞默認的Configuration,那在Job中當然不知道DistributedCacheFile的存在了。

Further

方案三還是不如人意,質心文件是很小的(因為質心總共就沒幾個),用map()函數僅僅是來讀一個質心文件根本就沒有發揮並行的作用,而且在map()中也沒有調用context.write(),所以Mapper中做的事情可以放在Reducer的setup()中來完成,這樣就不需要Mapper了,或者說上面設計的就不是MapReduce程序,跟平常的單線程串行程序是一樣的。sigh


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM