(Repost) Notes on Mahout KMeans Clustering


I. Using the Mahout Command

The synthetic control dataset synthetic_control.data can be downloaded from here. It consists of 600 rows by 60 columns of double values, i.e. 600 tuples, each of which is a time series.

1. Copy the data onto the cluster, into the kmeans/ directory

  
hadoop fs -put synthetic_control.data kmeans/synthetic_control.data
 

 

2. Run the following mahout command to perform the KMeans clustering analysis

 

mahout org.apache.mahout.clustering.syntheticcontrol.kmeans.Job --input kmeans/synthetic_control.data  --numClusters 3 -t1 3 -t2 6 --maxIter 3 --output kmeans/output

 When the --numClusters parameter (the number of clusters in the result) is given, KMeans clustering is used directly. If it is not given, Canopy clustering is run first to produce the initial clusters; -t1 and -t2 are the distance thresholds used by that Canopy step.
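
For example, the following variant of the command above (shown only to illustrate the parameter behaviour just described, not re-tested here) omits --numClusters, so the initial clusters would come from a Canopy pass driven by -t1 and -t2:

mahout org.apache.mahout.clustering.syntheticcontrol.kmeans.Job --input kmeans/synthetic_control.data -t1 3 -t2 6 --maxIter 3 --output kmeans/output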

 

II. Source Code Walkthrough

From the Mahout source code we can see that a KMeans clustering run consists of four steps:

  1. Preprocess and normalize the data
  2. Randomly pick several points from that data as the initial cluster centers
  3. Iterate, adjusting the centroids
  4. Assign the data points to the clusters

The first two steps are the preparation work for the KMeans algorithm.

The main flow can be seen in the org.apache.mahout.clustering.syntheticcontrol.kmeans.Job#run() method.

  

  public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, int k,
      double convergenceDelta, int maxIterations) throws Exception {
    //1. Convert the text stored in synthetic_control.data into key/value pairs and write them to output/data. The key is a Text holding an Integer; the value is a VectorWritable.
    Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
    log.info("Preparing Input");
    InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
    //2. Randomly generate a few initial clusters and write them to output/clusters-0/part-randomSeed. The key is a Text; the value is a ClusterWritable.
    log.info("Running random seed to get initial clusters");
    Path clusters = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
    clusters = RandomSeedGenerator.buildRandom(conf, directoryContainingConvertedInput, clusters, k, measure);
    //3. Run the clustering iterations, recomputing the centroid of each cluster
    log.info("Running KMeans");
    KMeansDriver.run(conf, directoryContainingConvertedInput, clusters, output, measure, convergenceDelta,
        maxIterations, true, 0.0, false);
    //4. Using the centroids chosen above, assign every record in output/data to a cluster, then output the results, converting the SequenceFile format into text for display
    // run ClusterDumper
    ClusterDumper clusterDumper = new ClusterDumper(new Path(output, "clusters-*-final"), new Path(output,
        "clusteredPoints"));
    clusterDumper.printClusters(null);
  }
  1. RandomAccessSparseVector is a Vector implementation backed by an OpenIntDoubleHashMap. That map does not extend HashMap; it is a separate hash-map-like implementation that keeps its data in an int array of keys and a double array of values, so it cannot be traversed with a standard Iterator.
  2. RandomSeedGenerator#buildRandom() randomly samples k clusters (Kluster) from the vectors above using reservoir sampling: the first k elements are placed in the reservoir, and the i-th element (i > k) is swapped in with probability k/i, so in the end every element enters the reservoir with the same probability k/n (a small sketch follows below). The random numbers come from the MersenneTwister pseudo-random generator, which has a period of 2^19937 - 1, is equidistributed in up to 623 dimensions, and produces uniformly distributed values with 32-bit precision.
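
A minimal sketch of the reservoir sampling idea mentioned in point 2 (illustrative only, not Mahout's actual RandomSeedGenerator code):

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class ReservoirSample {
  // Keep k items from the stream so that each of the n inputs ends up in the sample with probability k/n.
  public static <T> List<T> sample(Iterable<T> data, int k, Random rnd) {
    List<T> reservoir = new ArrayList<>(k);
    int seen = 0;
    for (T item : data) {
      seen++;
      if (reservoir.size() < k) {
        reservoir.add(item);        // the first k items fill the reservoir
      } else {
        int j = rnd.nextInt(seen);  // uniform in [0, seen)
        if (j < k) {
          reservoir.set(j, item);   // the item replaces a random slot with probability k/seen
        }
      }
    }
    return reservoir;
  }
}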

1. Preparing the Iterative Computation

The code that actually performs the KMeans clustering is:
 
  public static Path buildClusters(Configuration conf, Path input, Path clustersIn, Path output,
      DistanceMeasure measure, int maxIterations, String delta, boolean runSequential) throws IOException,
      InterruptedException, ClassNotFoundException {
    
    double convergenceDelta = Double.parseDouble(delta);
    //Read the Cluster data from output/clusters-0/part-randomSeed into the clusters list.
    List<Cluster> clusters = Lists.newArrayList();
    KMeansUtil.configureWithClusterInfo(conf, clustersIn, clusters);
    
    if (clusters.isEmpty()) {
      throw new IllegalStateException("No input clusters found in " + clustersIn + ". Check your -c argument.");
    }
    //Write the clustering policy (which controls the convergence criterion) into output/clusters-0/_policy
    //and, at the same time, write each cluster to a corresponding part-000xx file under output/clusters-0/
    Path priorClustersPath = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
    ClusteringPolicy policy = new KMeansClusteringPolicy(convergenceDelta);
    ClusterClassifier prior = new ClusterClassifier(clusters, policy);
    prior.writeToSeqFiles(priorClustersPath);
    //Iterate up to maxIterations times, running Map/Reduce
    if (runSequential) {
      ClusterIterator.iterateSeq(conf, input, priorClustersPath, output, maxIterations);
    } else {
      ClusterIterator.iterateMR(conf, input, priorClustersPath, output, maxIterations);
    }
    return output;
  }
  

2. The Iterative Computation

The code of the Job that adjusts the cluster centers is as follows:

 
  public static void iterateMR(Configuration conf, Path inPath, Path priorPath, Path outPath, int numIterations)
    throws IOException, InterruptedException, ClassNotFoundException {
    ClusteringPolicy policy = ClusterClassifier.readPolicy(priorPath);
    Path clustersOut = null;
    int iteration = 1;
    while (iteration <= numIterations) {
      conf.set(PRIOR_PATH_KEY, priorPath.toString());
      
      String jobName = "Cluster Iterator running iteration " + iteration + " over priorPath: " + priorPath;
      Job job = new Job(conf, jobName);
      job.setMapOutputKeyClass(IntWritable.class);
      job.setMapOutputValueClass(ClusterWritable.class);
      job.setOutputKeyClass(IntWritable.class);
      job.setOutputValueClass(ClusterWritable.class);
      
      job.setInputFormatClass(SequenceFileInputFormat.class);
      job.setOutputFormatClass(SequenceFileOutputFormat.class);
      //The core algorithm lives in CIMapper and CIReducer
      job.setMapperClass(CIMapper.class);
      job.setReducerClass(CIReducer.class);
      
      FileInputFormat.addInputPath(job, inPath);
      clustersOut = new Path(outPath, Cluster.CLUSTERS_DIR + iteration);
      priorPath = clustersOut;
      FileOutputFormat.setOutputPath(job, clustersOut);
      
      job.setJarByClass(ClusterIterator.class);
      if (!job.waitForCompletion(true)) {
        throw new InterruptedException("Cluster Iteration " + iteration + " failed processing " + priorPath);
      }
      ClusterClassifier.writePolicy(policy, clustersOut);
      FileSystem fs = FileSystem.get(outPath.toUri(), conf);
      iteration++;
      if (isConverged(clustersOut, conf, fs)) {
        break;
      }
    }
    //Rename the output directory of the final iteration, appending a -final suffix
    Path finalClustersIn = new Path(outPath, Cluster.CLUSTERS_DIR + (iteration - 1) + Cluster.FINAL_ITERATION_SUFFIX);
    FileSystem.get(clustersOut.toUri(), conf).rename(clustersOut, finalClustersIn);
  }

  

2.1. The Map Phase

The CIMapper code is as follows:

 

 
 @Override
  protected void map(WritableComparable<?> key, VectorWritable value, Context context) throws IOException,
      InterruptedException {
    // Compute the normalized similarity of this point to every cluster center
    Vector probabilities = classifier.classify(value.get());
    // Pick the winning cluster: weight 1.0 at its index, 0 everywhere else
    Vector selections = policy.select(probabilities);
    for (Iterator<Element> it = selections.iterateNonZero(); it.hasNext();) {
      Element el = it.next();
      // Accumulate this point into the winning cluster's statistics (S0, S1, S2)
      classifier.train(el.index(), value.get(), el.get());
    }
  }
 

 

 

Two classes need to be clearly understood here:

org.apache.mahout.clustering.iterator.KMeansClusteringPolicy

org.apache.mahout.clustering.classify.ClusterClassifier

The former is the clustering policy; it supplies the core of the clustering algorithm.

The latter is the cluster classifier; its job is to assign the data to clusters based on that policy.

 

2.1.1. ClusterClassifier: Distance from a Point to Each Cluster Centroid

 ClusterClassifier.classify() computes, for a given point, a similarity score (derived from the distance) against every cluster center and returns the scores as one vector.

 

 
@Override
  public Vector classify(Vector data, ClusterClassifier prior) {
    List<Cluster> models = prior.getModels();
    int i = 0;
    Vector pdfs = new DenseVector(models.size());
    // pdf = 1 / (1 + distance) to each cluster center
    for (Cluster model : models) {
      pdfs.set(i++, model.pdf(new VectorWritable(data)));
    }
    // normalize so that the components sum to 1
    return pdfs.assign(new TimesFunction(), 1.0 / pdfs.zSum());
  }
 

 

In the code above, org.apache.mahout.clustering.iterator.DistanceMeasureCluster.pdf(VectorWritable) turns the distance from the point to the cluster centroid into that similarity score; its code is as follows:

 

@Override
  public double pdf(VectorWritable vw) {
    return 1 / (1 + measure.distance(vw.get(), getCenter()));
  }
 
After every iteration, the centroid is recomputed via AbstractCluster.computeParameters() (see section 2.2.1 below).
 

pdfs.zSum() is the sum of the components of the pdfs vector; classify() then normalizes pdfs by that sum.

Finally, select() picks the index of the cluster with the highest similarity and gives it a weight of 1.0, as shown below:

 

 
@Override
  public Vector select(Vector probabilities) {
    int maxValueIndex = probabilities.maxValueIndex();
    Vector weights = new SequentialAccessSparseVector(probabilities.size());
    weights.set(maxValueIndex, 1.0);
    return weights;
  }
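
As a small worked example (a sketch written against the Mahout math classes already used above, such as DenseVector, TimesFunction, zSum() and maxValueIndex(); it is not taken from the Mahout sources): for a point at distances 1, 2 and 4 from three centers, pdf() yields 0.5, 0.333 and 0.2, classify() normalizes them to roughly 0.48, 0.32 and 0.19, and select() puts weight 1.0 at index 0.

import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.function.TimesFunction;

public class ClassifySelectDemo {
  public static void main(String[] args) {
    // pdf = 1 / (1 + distance) for distances 1, 2 and 4 to three cluster centers
    Vector pdfs = new DenseVector(new double[] {0.5, 1.0 / 3.0, 0.2});
    // classify(): normalize so that the components sum to 1
    Vector probabilities = pdfs.assign(new TimesFunction(), 1.0 / pdfs.zSum());
    // select(): the index with the largest probability wins -> 0, the closest center
    System.out.println(probabilities.maxValueIndex());
  }
}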
 

 

2.1.2. ClusterClassifier: Preparing to Compute the New Centroids

 Next, to obtain the new centers, org.apache.mahout.clustering.classify.ClusterClassifier.train(int, Vector, double) feeds the point to the selected cluster as training data; the accumulation ultimately happens inside AbstractCluster:

 
public void observe(Vector x, double weight) {
    if (weight == 1.0) {
      observe(x);
    } else {
      // S0 accumulates the total weight (the number of observed points when all weights are 1)
      setS0(getS0() + weight);
      Vector weightedX = x.times(weight);
      // S1 accumulates the weighted sum of the points
      if (getS1() == null) {
        setS1(weightedX);
      } else {
        getS1().assign(weightedX, Functions.PLUS);
      }
      // S2 accumulates the weighted sum of the squared components
      Vector x2 = x.times(x).times(weight);
      if (getS2() == null) {
        setS2(x2);
      } else {
        getS2().assign(x2, Functions.PLUS);
      }
    }
  }

 

2.2. The Reduce Phase

In CIReducer, the data belonging to the same cluster is merged and the new centroid is computed.

 
@Override
  protected void reduce(IntWritable key, Iterable<ClusterWritable> values, Context context) throws IOException,
      InterruptedException {
    Iterator<ClusterWritable> iter = values.iterator();
    Cluster first = iter.next().getValue(); // there must always be at least one
    while (iter.hasNext()) {
      Cluster cluster = iter.next().getValue();
      // merge the partial statistics (S0, S1, S2) of this cluster into the first one
      first.observe(cluster);
    }
    List<Cluster> models = Lists.newArrayList();
    models.add(first);
    classifier = new ClusterClassifier(models, policy);
    // close() finalizes the cluster, recomputing its parameters (see computeParameters() below)
    classifier.close();
    context.write(key, new ClusterWritable(first));
  }

 

2.2.1. Computing the Centroid in the Reduce Phase

 The centroid computation code is as follows:

@Override
  public void computeParameters() {
    if (getS0() == 0) {
      return;
    }
    setNumObservations((long) getS0());
    setTotalObservations(getTotalObservations() + getNumObservations());
    setCenter(getS1().divide(getS0()));
    // compute the component stds
    if (getS0() > 1) {
      setRadius(getS2().times(getS0()).minus(getS1().times(getS1())).assign(new SquareRootFunction()).divide(getS0()));
    }
    setS0(0);
    setS1(center.like());
    setS2(center.like());
  }
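
Written out, with S0 being the accumulated weight, S1 the weighted sum of the points and S2 the weighted sum of their squared components (as accumulated by observe() above), this computes, per component j:

\[ \mathrm{center}_j = \frac{S1_j}{S0}, \qquad \mathrm{radius}_j = \frac{\sqrt{S0\,S2_j - S1_j^2}}{S0} = \sqrt{\frac{S2_j}{S0} - \Bigl(\frac{S1_j}{S0}\Bigr)^2} \]

that is, the new centroid is the (weighted) mean of the points assigned to the cluster, and the radius is their per-component standard deviation.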

 

 

3. Assigning the Data to the Clusters

 

The code that actually assigns the records in output/data to the individual clusters is:

 
 private static void classifyClusterMR(Configuration conf, Path input, Path clustersIn, Path output,
      Double clusterClassificationThreshold, boolean emitMostLikely) throws IOException, InterruptedException,
      ClassNotFoundException {
    
    conf.setFloat(ClusterClassificationConfigKeys.OUTLIER_REMOVAL_THRESHOLD,
                  clusterClassificationThreshold.floatValue());
    conf.setBoolean(ClusterClassificationConfigKeys.EMIT_MOST_LIKELY, emitMostLikely);
    conf.set(ClusterClassificationConfigKeys.CLUSTERS_IN, clustersIn.toUri().toString());
    
    Job job = new Job(conf, "Cluster Classification Driver running over input: " + input);
    job.setJarByClass(ClusterClassificationDriver.class);
    
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    //the mapper assigns each record to a cluster
    job.setMapperClass(ClusterClassificationMapper.class);
    job.setNumReduceTasks(0);
    
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(WeightedVectorWritable.class);
    
    FileInputFormat.addInputPath(job, input);
    FileOutputFormat.setOutputPath(job, output);
    if (!job.waitForCompletion(true)) {
      throw new InterruptedException("Cluster Classification Driver Job failed processing " + input);
    }
  }
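
As a hedged illustration (an assumption written against the plain Hadoop SequenceFile API, not code from the Mahout job above), the clusteredPoints output of this driver, which consists of IntWritable cluster ids mapped to WeightedVectorWritable points as set on the job, could be inspected roughly like this (the part file name is only an example):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.mahout.clustering.classify.WeightedVectorWritable;

public class DumpClusteredPoints {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // example part file; the classification job is map-only, so its outputs are part-m-xxxxx files
    Path part = new Path("kmeans/output/clusteredPoints/part-m-00000");
    FileSystem fs = FileSystem.get(part.toUri(), conf);
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, part, conf);
    IntWritable clusterId = new IntWritable();
    WeightedVectorWritable point = new WeightedVectorWritable();
    while (reader.next(clusterId, point)) {
      System.out.println(clusterId + " -> " + point);  // cluster id and the weighted point
    }
    reader.close();
  }
}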
  

 Source: http://zcdeng.iteye.com/blog/1859711

