I. Using the Mahout Command
The synthetic control dataset synthetic_control.data can be downloaded from here. It consists of 600 rows × 60 columns of double values, i.e. 600 tuples, each of which is a time series.
1. Copy the data onto the cluster, into the kmeans/ directory
hadoop fs -put synthetic_control.data kmeans/synthetic_control.data
2. Run the following mahout command to perform the KMeans clustering analysis
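A representative invocation looks like the following. This is only a sketch based on the syntheticcontrol example job shipped with Mahout 0.x; the parameter values (6 clusters, t1=80, t2=55, 10 iterations) are illustrative, so adjust them and the paths to your own setup.
mahout org.apache.mahout.clustering.syntheticcontrol.kmeans.Job \
    --input kmeans/synthetic_control.data \
    --output kmeans/output \
    --numClusters 6 --t1 80 --t2 55 \
    --maxIter 10 --convergenceDelta 0.5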
When the command carries the --numClusters parameter (the number of clusters in the result), KMeans clustering is used directly. If that parameter is not given, Canopy clustering is run first to produce the initial clusters; -t1 and -t2 are the configuration parameters for that Canopy pass.
II. Studying the Source Code
The Mahout source shows that a KMeans clustering run goes through four steps:
- Preprocess and normalize the data
- Randomly pick several of those data points as the initial cluster centers
- Iterate, adjusting the centroids
- Assign the data points to the clusters
The first two steps are the preparation work for the KMeans algorithm proper.
The main flow can be read from the org.apache.mahout.clustering.syntheticcontrol.kmeans.Job#run() method:
public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, int k,
double convergenceDelta, int maxIterations) throws Exception {
//1. Convert synthetic_control.data from its text format into Key/Value format and store it under output/data. The Key is a Text holding an Integer, the Value a VectorWritable.
Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
log.info("Preparing Input");
InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
//2. Randomly generate a few clusters and store them in the output/clusters-0/part-randomSeed file. The Key is a Text, the Value a ClusterWritable.
log.info("Running random seed to get initial clusters");
Path clusters = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
clusters = RandomSeedGenerator.buildRandom(conf, directoryContainingConvertedInput, clusters, k, measure);
//3. Run the clustering iterations, recomputing each cluster's centroid
log.info("Running KMeans");
KMeansDriver.run(conf, directoryContainingConvertedInput, clusters, output, measure, convergenceDelta,
maxIterations, true, 0.0, false);
//4. Using the centers selected above, assign every record in output/data to a cluster. Output the result, converting the SequenceFile format into text for display
// run ClusterDumper
ClusterDumper clusterDumper = new ClusterDumper(new Path(output, "clusters-*-final"), new Path(output,
"clusteredPoints"));
clusterDumper.printClusters(null);
}
- RandomAccessSparseVector is a Vector implementation backed by an OpenIntDoubleHashMap. That map does not extend HashMap; it implements a similar hash map of its own, holding its data in an int array (the keys) and a double array (the values), so it cannot be traversed with an ordinary Iterator.
- RandomSeedGenerator#buildRandom() randomly samples k of the vectors above as the seed clusters (Kluster), using reservoir sampling: the first k elements go straight into the reservoir, and the (k+1)-th element is swapped in with probability k/(k+1), so that in the end every element enters the reservoir with the same probability k/n. The random numbers come from the powerful MersenneTwister pseudo-random generator, which has a period of 2^19937 − 1 and is equidistributed in up to 623 dimensions with 32-bit accuracy.
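To make the sampling scheme concrete, here is a minimal, self-contained Java sketch of reservoir sampling; the class and method names are hypothetical, and this is not Mahout's actual implementation:
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical illustration of reservoir sampling, not Mahout code:
// picks k items from a stream of unknown length so that every item
// ends up in the sample with the same probability k/n.
public class ReservoirSample {
  public static <T> List<T> sample(Iterable<T> stream, int k, Random rnd) {
    List<T> reservoir = new ArrayList<T>(k);
    int i = 0;
    for (T item : stream) {
      if (i < k) {
        reservoir.add(item);        // the first k items fill the reservoir
      } else {
        int j = rnd.nextInt(i + 1); // uniform index in [0, i]
        if (j < k) {
          reservoir.set(j, item);   // item is kept with probability k/(i+1)
        }
      }
      i++;
    }
    return reservoir;
  }
}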
1. Preparing the iterative computation
public static Path buildClusters(Configuration conf, Path input, Path clustersIn, Path output,
DistanceMeasure measure, int maxIterations, String delta, boolean runSequential) throws IOException,
InterruptedException, ClassNotFoundException {
double convergenceDelta = Double.parseDouble(delta);
//Read the Cluster data from the output/clusters-0/part-randomSeed file into the clusters variable.
List<Cluster> clusters = Lists.newArrayList();
KMeansUtil.configureWithClusterInfo(conf, clustersIn, clusters);
if (clusters.isEmpty()) {
throw new IllegalStateException("No input clusters found in " + clustersIn + ". Check your -c argument.");
}
//Write the clustering policy (which controls the degree of convergence) to the output/clusters-0/_policy file
//At the same time, each cluster gets its own part-000xx file under output/clusters-0/
Path priorClustersPath = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
ClusteringPolicy policy = new KMeansClusteringPolicy(convergenceDelta);
ClusterClassifier prior = new ClusterClassifier(clusters, policy);
prior.writeToSeqFiles(priorClustersPath);
//Start iterating: run the Map/Reduce job up to maxIterations times
if (runSequential) {
ClusterIterator.iterateSeq(conf, input, priorClustersPath, output, maxIterations);
} else {
ClusterIterator.iterateMR(conf, input, priorClustersPath, output, maxIterations);
}
return output;
}
2. The iterative computation
The code of the Job that adjusts the cluster centers is as follows:
public static void iterateMR(Configuration conf, Path inPath, Path priorPath, Path outPath, int numIterations)
throws IOException, InterruptedException, ClassNotFoundException {
ClusteringPolicy policy = ClusterClassifier.readPolicy(priorPath);
Path clustersOut = null;
int iteration = 1;
while (iteration <= numIterations) {
conf.set(PRIOR_PATH_KEY, priorPath.toString());
String jobName = "Cluster Iterator running iteration " + iteration + " over priorPath: " + priorPath;
Job job = new Job(conf, jobName);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(ClusterWritable.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(ClusterWritable.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
//The core algorithm lives in CIMapper and CIReducer
job.setMapperClass(CIMapper.class);
job.setReducerClass(CIReducer.class);
FileInputFormat.addInputPath(job, inPath);
clustersOut = new Path(outPath, Cluster.CLUSTERS_DIR + iteration);
priorPath = clustersOut;
FileOutputFormat.setOutputPath(job, clustersOut);
job.setJarByClass(ClusterIterator.class);
if (!job.waitForCompletion(true)) {
throw new InterruptedException("Cluster Iteration " + iteration + " failed processing " + priorPath);
}
ClusterClassifier.writePolicy(policy, clustersOut);
FileSystem fs = FileSystem.get(outPath.toUri(), conf);
iteration++;
if (isConverged(clustersOut, conf, fs)) {
break;
}
}
//Rename the output directory of the last iteration, appending -final to its name
Path finalClustersIn = new Path(outPath, Cluster.CLUSTERS_DIR + (iteration - 1) + Cluster.FINAL_ITERATION_SUFFIX);
FileSystem.get(clustersOut.toUri(), conf).rename(clustersOut, finalClustersIn);
}
2.1. The Map phase
The CIMapper code is as follows:
@Override
protected void map(WritableComparable<?> key, VectorWritable value, Context context) throws IOException,
InterruptedException {
Vector probabilities = classifier.classify(value.get());
Vector selections = policy.select(probabilities);
for (Iterator<Element> it = selections.iterateNonZero(); it.hasNext();) {
Element el = it.next();
classifier.train(el.index(), value.get(), el.get());
}
}
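Note that map() itself writes nothing to the context: it only classifies each point and trains the classifier with it. The per-cluster statistics accumulated this way are emitted once, keyed by cluster index, in CIMapper's cleanup() method; that output is what the reducer below receives.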
Two classes need to be kept apart here: org.apache.mahout.clustering.iterator.KMeansClusteringPolicy and org.apache.mahout.clustering.classify.ClusterClassifier. The former is the clustering policy; it supplies the core algorithm of the clustering. The latter is the clustering classifier; its job is to classify the data according to that policy.
2.1.1. ClusterClassifier: the distance from a point to each cluster centroid
ClusterClassifier.classify() computes the distance from a given point to every cluster center, returning the results as a vector with one entry per cluster.
@Override
public Vector classify(Vector data, ClusterClassifier prior) {
List<Cluster> models = prior.getModels();
int i = 0;
Vector pdfs = new DenseVector(models.size());
for (Cluster model : models) {
pdfs.set(i++, model.pdf(new VectorWritable(data)));
}
return pdfs.assign(new TimesFunction(), 1.0 / pdfs.zSum());
}
The org.apache.mahout.clustering.iterator.DistanceMeasureCluster.pdf(VectorWritable) called above turns the distance between the point and the cluster centroid into a similarity score; its code is:
@Override
public double pdf(VectorWritable vw) {
return 1 / (1 + measure.distance(vw.get(), getCenter()));
}
pdfs.zSum() is the sum of the entries of the pdfs vector; dividing by it normalizes the pdfs so that they add up to 1.
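A quick worked example with made-up numbers: if a point lies at distance 1 from cluster A and distance 3 from cluster B, then pdf_A = 1/(1+1) = 0.5 and pdf_B = 1/(1+3) = 0.25; zSum = 0.75, so classify() returns the normalized vector (0.667, 0.333).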
Finally, select() picks the index of the cluster with the highest similarity and gives it weight 1.0, as shown below:
@Override
public Vector select(Vector probabilities) {
int maxValueIndex = probabilities.maxValueIndex();
Vector weights = new SequentialAccessSparseVector(probabilities.size());
weights.set(maxValueIndex, 1.0);
return weights;
}
2.1.2. ClusterClassifier: preparing to compute the new centroids
Next, in order to obtain the new centers, org.apache.mahout.clustering.classify.ClusterClassifier.train(int, Vector, double) feeds the point to the selected cluster as training data; the actual accumulation happens in AbstractCluster:
public void observe(Vector x, double weight) {
if (weight == 1.0) {
observe(x);
} else {
setS0(getS0() + weight);
Vector weightedX = x.times(weight);
if (getS1() == null) {
setS1(weightedX);
} else {
getS1().assign(weightedX, Functions.PLUS);
}
Vector x2 = x.times(x).times(weight);
if (getS2() == null) {
setS2(x2);
} else {
getS2().assign(x2, Functions.PLUS);
}
}
}
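Reading off the code: S0 accumulates the total weight of the observed points, S1 their weighted component-wise sum, and S2 the weighted sum of their squares. These three running sums are all that is needed later to recompute the centroid and radius.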
2.2. The Reduce phase
In CIReducer, the data belonging to the same Cluster are merged and the new centroid is computed.
@Override
protected void reduce(IntWritable key, Iterable<ClusterWritable> values, Context context) throws IOException,
InterruptedException {
Iterator<ClusterWritable> iter = values.iterator();
Cluster first = iter.next().getValue(); // there must always be at least one
while (iter.hasNext()) {
Cluster cluster = iter.next().getValue();
first.observe(cluster);
}
List<Cluster> models = Lists.newArrayList();
models.add(first);
classifier = new ClusterClassifier(models, policy);
classifier.close();
context.write(key, new ClusterWritable(first));
}
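Here first.observe(cluster) merges the running sums (S0, S1, S2) of the partial clusters produced by the individual mappers, and classifier.close() then calls computeParameters() on the merged cluster, which is where the new centroid is actually computed (see 2.2.1 below).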
2.2.1. The centroid computation in the Reduce phase
The centroid is computed as follows:
@Override
public void computeParameters() {
if (getS0() == 0) {
return;
}
setNumObservations((long) getS0());
setTotalObservations(getTotalObservations() + getNumObservations());
setCenter(getS1().divide(getS0()));
// compute the component stds
if (getS0() > 1) {
setRadius(getS2().times(getS0()).minus(getS1().times(getS1())).assign(new SquareRootFunction()).divide(getS0()));
}
setS0(0);
setS1(center.like());
setS2(center.like());
}
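In other words, writing S0 = Σw, S1 = Σw·x and S2 = Σw·x² for the sums accumulated component-wise by observe() above, computeParameters() recovers center = S1/S0 (the weighted mean) and radius = sqrt(S0·S2 − S1²)/S0 = sqrt(S2/S0 − (S1/S0)²), i.e. the per-component standard deviation, and then resets the sums for the next iteration.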
3. Assigning the data to the clusters
The code that actually assigns the records in output/data to the individual clusters is:
private static void classifyClusterMR(Configuration conf, Path input, Path clustersIn, Path output,
Double clusterClassificationThreshold, boolean emitMostLikely) throws IOException, InterruptedException,
ClassNotFoundException {
conf.setFloat(ClusterClassificationConfigKeys.OUTLIER_REMOVAL_THRESHOLD,
clusterClassificationThreshold.floatValue());
conf.setBoolean(ClusterClassificationConfigKeys.EMIT_MOST_LIKELY, emitMostLikely);
conf.set(ClusterClassificationConfigKeys.CLUSTERS_IN, clustersIn.toUri().toString());
Job job = new Job(conf, "Cluster Classification Driver running over input: " + input);
job.setJarByClass(ClusterClassificationDriver.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
//Assign the records to clusters
job.setMapperClass(ClusterClassificationMapper.class);
job.setNumReduceTasks(0);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(WeightedVectorWritable.class);
FileInputFormat.addInputPath(job, input);
FileOutputFormat.setOutputPath(job, output);
if (!job.waitForCompletion(true)) {
throw new InterruptedException("Cluster Classification Driver Job failed processing " + input);
}
}
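To inspect the final assignment, the clusterdump utility can turn the SequenceFile output back into readable text. A sketch, assuming the Mahout 0.x CLI (verify the flag names against your version):
mahout clusterdump \
    --input kmeans/output/clusters-*-final \
    --pointsDir kmeans/output/clusteredPoints \
    --output result.txt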