從理論上來講用MapReduce技術實現KMeans算法是很Natural的想法:在Mapper中逐個計算樣本點離哪個中心最近,然后Emit(樣本點所屬的簇編號,樣本點);在Reducer中屬於同一個質心的樣本點在一個鏈表中,方便我們計算新的中心,然后Emit(質心編號,質心)。但是技術上的事並沒有理論層面那么簡單。
Mapper和Reducer都要用到K個中心(我習慣稱之為質心),Mapper要讀這些質心,Reducer要寫這些質心。另外Mapper還要讀存儲樣本點的數據文件。我先后嘗試以下3種方法,只有第3種是可行的,如果你不想被我誤導,請直接跳過前兩種。
一、用一個共享變量在存儲K個質心
由於K很小,所以我們認為用一個Vector<Sample>來存儲K個質心是沒有問題的。以下代碼是錯誤的:
class MyJob extends Tool{
static Vector<Sample> centers=new Vector<Sample>(K);
static class MyMapper extends Mapper{
//read centers
}
static class MyMapper extends Reducer{
//update centers
}
void run(){
until ( convergence ){
map();
reduce();
}
}
發生這種錯誤是因為對hadoop執行流程不清楚,對數據流不清楚。簡單地說Mapper和Reducer作為MyJob的內部靜態類,它們應該是獨立的--它們不應該與MyJob有任何交互,因為Mapper和Reducer分別在Task Tracker的不同JVM中運行,而MyJob以及MyJob的內部其他類都在客戶端上運行,自然不能在不同的JVM中共享一個變量。
詳細的流程是這樣的:
首先在客戶端上,JVM加載MyJob時先初始化靜態變量,執行static塊。然后提交作業到Job Tracker。
在Job Tracker上,分配Mapper和Reducer到不同的Task Tracker上。Mapper和Reducer線程獲得了MyJob類靜態變量的初始拷貝(這份拷貝是指MyJob執行完靜態塊之后靜態變量的模樣)。
在Task Tracker上,Mapper和Reducer分別地讀寫MyJob的靜態變量的本地拷貝,但是並不影響原始的MyJob中的靜態變量的值。
二、用分布式緩存文件存儲K個質心
既然不能通過共享外部類變量的方式,那我們通過文件在map和reduce之間傳遞數據總可以吧,Mapper從文件中讀取質心,Reducer把更新后的質心再寫入這個文件。這里的問題是:如果確定要把質心放在文件中,那Mapper就需要從2個文件中讀取數據--質心文件和樣本數據文件。雖然有MutipleInputs可以指定map()的輸入文件有多個,並可以為每個輸入文件分別指定解析方式,但是MutipleInputs不能保證每條記錄從不同文件中傳給map()的順序。在我們的KMeans中,我們希望質心文件全部被讀入后再逐條讀入樣本數據。
於是乎就想到了DistributedCache,它主要用於Mapper和Reducer之間共享數據。DistributedCacheFile是緩存在本地文件,在Mapper和Reducer中都可使用本地Java I/O的方式讀取它。於是我又有了一個錯誤的思路:
class MyMaper{
Vector<Sample> centers=new Vector<Sample>(K);
void setup(){
//讀取cacheFile,給centers賦值
}
void map(){
//計算樣本離哪個質心最近
}
}
class MyReducer{
Vector<Sample> centers=new Vector<Sample>(K);
void reduce(){
//更新centers
}
void cleanup(){
//把centers寫回cacheFile
}
}
錯因:DistributedCacheFile是只讀的,在任務運行前,TaskTracker從JobTracker文件系統復制文件到本地磁盤作為緩存,這是單向的復制,是不能寫回的。試想在分布式環境下,如果不同的mapper和reducer可以把緩存文件寫回的話,那豈不又需要一套復雜的文件共享機制,嚴重地影響hadoop執行效率。
三、用分布式緩存文件存儲樣本數據
其實DistributedCache還有一個特點,它更適合於“大文件”(各節點內存容不下)緩存在本地。僅存儲了K個質心的文件顯然是小文件,與之相比樣本數據文件才是大文件。
此時我們需要2個質心文件:一個存放上一次的質心prevCenterFile,一個存放reducer更新后的質心currCenterFile。Mapper從prevCenterFile中讀取質心,Reducer把更新后有質心寫入currCenterFile。在Driver中讀入prevCenterFile和currCenterFile,比較前后兩次的質心是否相同(或足夠地接近),如果相同則停止迭代,否則就用currCenterFile覆蓋prevCenterFile(使用fs.rename),進入下一次的迭代。
這時候Mapper就是這樣的:
class MyMaper{
Vector<Sample> centers=new Vector<Sample>(K);
void map(){
//逐條讀取質心,給centers賦值
}
void cleanup(){
//逐行讀取cacheFile,計算每個樣本點離哪個質心最近
//然后Emit(樣本點所屬的簇編號,樣本點)
}
}
源代碼
試驗數據是在Mahout項目中作為example提供的,600個樣本點,每個樣本是一個60維的浮點向量。點擊下載
為樣本數據建立一個類Sample.java。
package kmeans;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
public class Sample implements Writable{
private static final Log log=LogFactory.getLog(Sample.class);
public static final int DIMENTION=60;
public double arr[];
public Sample(){
arr=new double[DIMENTION];
}
public static double getEulerDist(Sample vec1,Sample vec2){
if(!(vec1.arr.length==DIMENTION && vec2.arr.length==DIMENTION)){
log.error("vector's dimention is not "+DIMENTION);
System.exit(1);
}
double dist=0.0;
for(int i=0;i<DIMENTION;++i){
dist+=(vec1.arr[i]-vec2.arr[i])*(vec1.arr[i]-vec2.arr[i]);
}
return Math.sqrt(dist);
}
public void clear(){
for(int i=0;i<arr.length;i++)
arr[i]=0.0;
}
@Override
public String toString(){
String rect=String.valueOf(arr[0]);
for(int i=1;i<DIMENTION;i++)
rect+="\t"+String.valueOf(arr[i]);
return rect;
}
@Override
public void readFields(DataInput in) throws IOException {
String str[]=in.readUTF().split("\\s+");
for(int i=0;i<DIMENTION;++i)
arr[i]=Double.parseDouble(str[i]);
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(this.toString());
}
}
KMeans.java
package kmeans;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class KMeans extends Configured implements Tool{
private static final Log log = LogFactory.getLog(KMeans2.class);
private static final int K = 10;
private static final int MAXITERATIONS = 300;
private static final double THRESHOLD = 0.01;
public static boolean stopIteration(Configuration conf) throws IOException{
FileSystem fs=FileSystem.get(conf);
Path pervCenterFile=new Path("/user/orisun/input/centers");
Path currentCenterFile=new Path("/user/orisun/output/part-r-00000");
if(!(fs.exists(pervCenterFile) && fs.exists(currentCenterFile))){
log.info("兩個質心文件需要同時存在");
System.exit(1);
}
//比較前后兩次質心的變化是否小於閾值,決定迭代是否繼續
boolean stop=true;
String line1,line2;
FSDataInputStream in1=fs.open(pervCenterFile);
FSDataInputStream in2=fs.open(currentCenterFile);
InputStreamReader isr1=new InputStreamReader(in1);
InputStreamReader isr2=new InputStreamReader(in2);
BufferedReader br1=new BufferedReader(isr1);
BufferedReader br2=new BufferedReader(isr2);
Sample prevCenter,currCenter;
while((line1=br1.readLine())!=null && (line2=br2.readLine())!=null){
prevCenter=new Sample();
currCenter=new Sample();
String []str1=line1.split("\\s+");
String []str2=line2.split("\\s+");
assert(str1[0].equals(str2[0]));
for(int i=1;i<=Sample.DIMENTION;i++){
prevCenter.arr[i-1]=Double.parseDouble(str1[i]);
currCenter.arr[i-1]=Double.parseDouble(str2[i]);
}
if(Sample.getEulerDist(prevCenter, currCenter)>THRESHOLD){
stop=false;
break;
}
}
//如果還要進行下一次迭代,就用當前質心替代上一次的質心
if(stop==false){
fs.delete(pervCenterFile,true);
if(fs.rename(currentCenterFile, pervCenterFile)==false){
log.error("質心文件替換失敗");
System.exit(1);
}
}
return stop;
}
public static class ClusterMapper extends Mapper<LongWritable, Text, IntWritable, Sample> {
Vector<Sample> centers = new Vector<Sample>();
@Override
//清空centers
public void setup(Context context){
for (int i = 0; i < K; i++) {
centers.add(new Sample());
}
}
@Override
//從輸入文件讀入centers
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String []str=value.toString().split("\\s+");
if(str.length!=Sample.DIMENTION+1){
log.error("讀入centers時維度不對");
System.exit(1);
}
int index=Integer.parseInt(str[0]);
for(int i=1;i<str.length;i++)
centers.get(index).arr[i-1]=Double.parseDouble(str[i]);
}
@Override
//找到每個數據點離哪個質心最近
public void cleanup(Context context) throws IOException,InterruptedException {
Path []caches=DistributedCache.getLocalCacheFiles(context.getConfiguration());
if(caches==null || caches.length<=0){
log.error("data文件不存在");
System.exit(1);
}
BufferedReader br=new BufferedReader(new FileReader(caches[0].toString()));
Sample sample;
String line;
while((line=br.readLine())!=null){
sample=new Sample();
String []str=line.split("\\s+");
for(int i=0;i<Sample.DIMENTION;i++)
sample.arr[i]=Double.parseDouble(str[i]);
int index=-1;
double minDist=Double.MAX_VALUE;
for(int i=0;i<K;i++){
double dist=Sample.getEulerDist(sample, centers.get(i));
if(dist<minDist){
minDist=dist;
index=i;
}
}
context.write(new IntWritable(index), sample);
}
}
}
public static class UpdateCenterReducer extends Reducer<IntWritable, Sample, IntWritable, Sample> {
int prev=-1;
Sample center=new Sample();;
int count=0;
@Override
//更新每個質心(除最后一個)
public void reduce(IntWritable key,Iterable<Sample> values,Context context) throws IOException,InterruptedException{
while(values.iterator().hasNext()){
Sample value=values.iterator().next();
if(key.get()!=prev){
if(prev!=-1){
for(int i=0;i<center.arr.length;i++)
center.arr[i]/=count;
context.write(new IntWritable(prev), center);
}
center.clear();
prev=key.get();
count=0;
}
for(int i=0;i<Sample.DIMENTION;i++)
center.arr[i]+=value.arr[i];
count++;
}
}
@Override
//更新最后一個質心
public void cleanup(Context context) throws IOException,InterruptedException{
for(int i=0;i<center.arr.length;i++)
center.arr[i]/=count;
context.write(new IntWritable(prev), center);
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf=getConf();
FileSystem fs=FileSystem.get(conf);
Job job=new Job(conf);
job.setJarByClass(KMeans.class);
//質心文件每行的第一個數字是索引
FileInputFormat.setInputPaths(job, "/user/orisun/input/centers");
Path outDir=new Path("/user/orisun/output");
fs.delete(outDir,true);
FileOutputFormat.setOutputPath(job, outDir);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapperClass(ClusterMapper.class);
job.setReducerClass(UpdateCenterReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Sample.class);
return job.waitForCompletion(true)?0:1;
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
FileSystem fs=FileSystem.get(conf);
//樣本數據文件中每個樣本不需要標記索引
Path dataFile=new Path("/user/orisun/input/data");
DistributedCache.addCacheFile(dataFile.toUri(), conf);
int iteration = 0;
int success = 1;
do {
success ^= ToolRunner.run(conf, new KMeans(), args);
log.info("iteration "+iteration+" end");
} while (success == 1 && iteration++ < MAXITERATIONS
&& (!stopIteration(conf)));
log.info("Success.Iteration=" + iteration);
//迭代完成后再執行一次mapper,輸出每個樣本點所屬的分類--在/user/orisun/output2/part-m-00000中
//質心文件保存在/user/orisun/input/centers中
Job job=new Job(conf);
job.setJarByClass(KMeans.class);
FileInputFormat.setInputPaths(job, "/user/orisun/input/centers");
Path outDir=new Path("/user/orisun/output2");
fs.delete(outDir,true);
FileOutputFormat.setOutputPath(job, outDir);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapperClass(ClusterMapper.class);
job.setNumReduceTasks(0);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Sample.class);
job.waitForCompletion(true);
}
}
注意在Driver中創建Job實例時一定要把Configuration類型的參數傳遞進去,否則在Mapper或Reducer中調用DistributedCache.getLocalCacheFiles(context.getConfiguration());返回值就為null。因為空構造函數的Job采用的Configuration是從hadoop的配置文件中讀出來的(使用new Configuration()創建的Configuration就是從hadoop的配置文件中讀出來的),請注意在main()函數中有一句:DistributedCache.addCacheFile(dataFile.toUri(), conf);即此時的Configuration中多了一個DistributedCacheFile,所以你需要把這個Configuration傳遞給Job構造函數,如果傳遞默認的Configuration,那在Job中當然不知道DistributedCacheFile的存在了。
Further
方案三還是不如人意,質心文件是很小的(因為質心總共就沒幾個),用map()函數僅僅是來讀一個質心文件根本就沒有發揮並行的作用,而且在map()中也沒有調用context.write(),所以Mapper中做的事情可以放在Reducer的setup()中來完成,這樣就不需要Mapper了,或者說上面設計的就不是MapReduce程序,跟平常的單線程串行程序是一樣的。sigh
