(借鑒於網絡資料,有修改)
一、概念介紹
K-means算法是硬聚類算法,是典型的局域原型的目標函數聚類方法的代表,它是數據點到原型的某種距離作為優化的目標函數,利用函數求極值的方法得到迭代運算的調整規則。K-means算法以歐式距離作為相似度測度,它是求對應某一初始聚類中心向量V最有分類,使得評價指標J最小。算法采用誤差平方和准則函數作為聚類准則函數。
K-means算法是很典型的基於距離的聚類算法,采用距離作為相似性的評價指標,即認為兩個對象的距離越近,其相似度就越大。該算法認為簇是由距離靠近的對象組成的,因此把得到緊湊且獨立的簇作為最終目標。
k個初始類聚類中心點的選取對聚類結果具有較大的影響,因為在該算法第一步中是隨機的選取任意k個對象作為初始聚類的中心,初始地代表一個簇。該算法在每次迭代中對數據集中剩余的每個對象,根據其與各個簇中心的距離將每個對象重新賦給最近的簇。當考察完所有數據對象后,一次迭代運算完成,新的聚類中心被計算出來。如果在一次迭代前后,評價指標J的值沒有發生變化,說明算法已經收斂。
二、基本思想
1.數學描述
給定d維實數向量(
),后面就將這個實數向量稱作點吧,短!K-Means算法會根據事先制定的參數k,將這些點划分出k個Cluster(k ≤ n),而划分的標准是最小化點與Cluster重心(均值)的距離平方和,假設這些Cluster為:
,則數學描述如下:
,其中
為第i個Cluster的“重心”(Cluster中所有點的平均值)。
聚類的效果類似下圖:

具體可見:http://en.wikipedia.org/wiki/K-means_clustering
2.K-means算法
它是一種迭代的算法:
(1)、根據事先給定的k值建立初始划分,得到k個Cluster,比如,可以隨機選擇k個點作為k個Cluster的重心,又或者用Canopy Clustering得到的Cluster作為初始重心(當然這個時候k的值由Canopy Clustering得結果決定);
(2)、計算每個點到各個Cluster重心的距離,將它加入到最近的那個Cluster;
(3)、重新計算每個Cluster的重心;
(4)、重復過程2~3,直到各個Cluster重心在某個精度范圍內不變化或者達到最大迭代次數。
別看算法簡單,很多復雜算法的實際效果或許都不如它,而且它的局部性較好,容易並行化,對大規模數據集很有意義;算法時間復雜度是:O(nkt),其中:n 是聚類點個數,k 是Cluster個數,t 是迭代次數。
三、並行化K-means
K-Means較好地局部性使它能很好的被並行化。第一階段,生成Cluster的過程可以並行化,各個Slaves讀取存在本地的數據集,用上述算法生成Cluster集合,最后用若干Cluster集合生成第一次迭代的全局Cluster集合,然后重復這個過程直到滿足結束條件,第二階段,用之前得到的Cluster進行聚類操作。
用map-reduce描述是:datanode在map階段讀出位於本地的數據集,輸出每個點及其對應的Cluster;combiner操作對位於本地包含在相同Cluster中的點進行reduce操作並輸出,reduce操作得到全局Cluster集合並寫入HDFS。
四、Mahout的K-means
mahout實現了標准K-Means Clustering,思想與前面相同,一共使用了2個map操作、1個combine操作和1個reduce操作,每次迭代都用1個map、1個combine和一個reduce操作得到並保存全局Cluster集合,迭代結束后,用一個map進行聚類操作。
1.數據結構模型
Mahout聚類算法將對象以Vector的方式表示,它同時支持dense vector和sparse vector,一共有三種表示方式(它們擁有共同的基類AbstractVector,里面實現了有關Vector的很多操作):
(1)、DenseVector
它實現的時候用一個double數組表示Vector(private double[] values), 對於dense data可以使用它;
(2)、RandomAccessSparseVector
它用來表示一個可以隨機訪問的sparse vector,只存儲非零元素,數據的存儲采用hash映射:OpenIntDoubleHashMap;
關於OpenIntDoubleHashMap,其key為int類型,value為double類型,解決沖突的方法是double hashing,
(3)、SequentialAccessSparseVector
它用來表示一個順序訪問的sparse vector,同樣只存儲非零元素,數據的存儲采用順序映射:OrderedIntDoubleMapping;
關於OrderedIntDoubleMapping,其key為int類型,value為double類型,存儲的方式讓我想起了Libsvm數據表示的形式:非零元素索引:非零元素的值,這里用一個int數組存儲indices,用double數組存儲非零元素,要想讀寫某個元素,需要在indices中查找offset,由於indices應該是有序的,所以查找操作用的是二分法。
2.K-means變量含義
可以從Cluster.java及其父類,對於Cluster,mahout實現了一個抽象類AbstractCluster封裝Cluster,具體說明可以參考上一篇文章,這里做個簡單說明:
(1)、private int id; #每個K-Means算法產生的Cluster的id
(2)、private long numPoints; #Cluster中包含點的個數,這里的點都是Vector
(3)、private Vector center; #Cluster的重心,這里就是平均值,由s0和s1計算而來。
(4)、private Vector Radius; #Cluster的半徑,這個半徑是各個點的標准差,反映組內個體間的離散程度,由s0、s1和s2計算而來。
(5)、private double s0; #表示Cluster包含點的權重之和,
(6)、private Vector s1; #表示Cluster包含點的加權和,
(7)、private Vector s2; #表示Cluster包含點平方的加權和,
(8)、public void computeParameters(); #根據s0、s1、s2計算numPoints、center和Radius:




這幾個操作很重要,最后三步很必要,在后面會做說明。
(9)、public void observe(VectorWritable x, double weight); #每當有一個新的點加入當前Cluster時都需要更新s0、s1、s2的值
(10)、public ClusterObservation getObservations(); #這個操作在combine操作時會經常被用到,它會返回由s0、s1、s2初始化的ClusterObservation對象,表示當前Cluster中包含的所有被觀察過的點
3.K-means的Map-Reduce實現
K-Means Clustering的實現同樣包含單機版和MR兩個版本,單機版就不說了,MR版用了兩個map操作、一個combine操作和一個reduce操作,是通過兩個不同的job觸發,用Dirver來組織的,map和reduce階段執行順序是:

(1)對於K初始化的形成
K-Means算法需要一個對數據點的初始划分,mahout里用了兩種方法(以Iris dataset前3個feature為例):
A、使用RandomSeedGenerator類
在指定clusters目錄生成k個初始划分並以Sequence File形式存儲,其選擇方法希望能盡可能不讓孤立點作為Cluster重心,大概流程如下:

圖2
B、使用Canopy Clustering
Canopy Clustering常常用來對初始數據做一個粗略的划分,它的結果可以為之后代價較高聚類提供幫助,Canopy Clustering可能用在數據預處理上要比單純拿來聚類更有用,比如對K-Means來說提供k值,另外還能很好的處理孤立點,當然,需要人工指定的參數由k變成了T1、T2,T1和T2所起的作用是缺一不可的,T1決定了每個Cluster包含點的數目,這直接影響了Cluster的“重心”和“半徑”,而T2則決定了Cluster的數目,T2太大會導致只有一個Cluster,而太小則會出現過多的Cluster。通過實驗,T1和T2取值會嚴重影響到算法的效果,如何確定T1和T2,似乎可以用AIC、BIC或者交叉驗證去做。。。
(2).配置Cluster信息
K-Means算法的MR實現,第一次迭代需要將隨機方法或者Canopy Clustering方法結果目錄作為kmeans第一次迭代的輸入目錄,接下來的每次迭代都需要將上次迭代的輸出目錄作為本次迭代的輸入目錄,這就需要能在每次kmeans map和kmeans reduce操作前從該目錄得到Cluster的信息,這個功能由KMeansUtil.configureWithClusterInfo實現,它依據指定的HDFS目錄將Canopy Cluster或者上次迭代Cluster的信息存儲到一個Collection中,這個方法在之后的每個map和reduce操作中都需要。
(3).KMeansMapper
public class KMeansMapper extends Mapper<WritableComparable<?>, VectorWritable, Text, ClusterObservations> { private KMeansClusterer clusterer; private final Collection<Cluster> clusters = new ArrayList<Cluster>(); @Override protected void map(WritableComparable<?> key, VectorWritable point, Context context) throws IOException, InterruptedException { this.clusterer.emitPointToNearestCluster(point.get(), this.clusters, context); } @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); Configuration conf = context.getConfiguration(); try { ClassLoader ccl = Thread.currentThread().getContextClassLoader(); DistanceMeasure measure = ccl.loadClass(conf.get(KMeansConfigKeys.DISTANCE_MEASURE_KEY)) .asSubclass(DistanceMeasure.class).newInstance(); measure.configure(conf); this.clusterer = new KMeansClusterer(measure); String clusterPath = conf.get(KMeansConfigKeys.CLUSTER_PATH_KEY); if (clusterPath != null && clusterPath.length() > 0) { KMeansUtil.configureWithClusterInfo(conf, new Path(clusterPath), clusters); if (clusters.isEmpty()) { throw new IllegalStateException("No clusters found. Check your -c path."); } } } catch (ClassNotFoundException e) { throw new IllegalStateException(e); } catch (IllegalAccessException e) { throw new IllegalStateException(e); } catch (InstantiationException e) { throw new IllegalStateException(e); } } void setup(Collection<Cluster> clusters, DistanceMeasure measure) { this.clusters.clear(); this.clusters.addAll(clusters); this.clusterer = new KMeansClusterer(measure); } }
A、KMeansMapper接收的是(WritableComparable<?>, VectorWritable) Pair,setup方法利用KMeansUtil.configureWithClusterInfo得到上一次迭代的Clustering結果,map操作需要依據這個結果聚類。
B、每個slave機器會分布式的處理存在硬盤上的數據,依據之前得到得Cluster信息,用emitPointToNearestCluster方法將每個點加入到與其距離最近的Cluster,輸出結果為(與當前點距離最近Cluster的ID, 由當前點包裝而成的ClusterObservations) Pair,值得注意的是Mapper只是將點加入最近的Cluster,並以(key,value)形式注明此點所離最近的cluster,等待combiner,reducer搜集,沒有更新Cluster重心等參數。
(4).KMeansCombiner
public class KMeansCombiner extends Reducer<Text, ClusterObservations, Text, ClusterObservations> { @Override protected void reduce(Text key, Iterable<ClusterObservations> values, Context context) throws IOException, InterruptedException { Cluster cluster = new Cluster(); for (ClusterObservations value : values) { cluster.observe(value); } context.write(key, cluster.getObservations()); } }
combiner操作是一個本地的reduce操作,發生在map之后,reduce之前:
(5).KMeansReducer
public class KMeansReducer extends Reducer<Text, ClusterObservations, Text, Cluster> { private Map<String, Cluster> clusterMap; private double convergenceDelta; private KMeansClusterer clusterer; @Override protected void reduce(Text key, Iterable<ClusterObservations> values, Context context) throws IOException, InterruptedException { Cluster cluster = clusterMap.get(key.toString()); for (ClusterObservations delta : values) { cluster.observe(delta); } // force convergence calculation boolean converged = clusterer.computeConvergence(cluster, convergenceDelta); if (converged) { context.getCounter("Clustering", "Converged Clusters").increment(1); } cluster.computeParameters(); context.write(new Text(cluster.getIdentifier()), cluster); } @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); Configuration conf = context.getConfiguration(); try { ClassLoader ccl = Thread.currentThread().getContextClassLoader(); DistanceMeasure measure = ccl.loadClass(conf.get(KMeansConfigKeys.DISTANCE_MEASURE_KEY)) .asSubclass(DistanceMeasure.class).newInstance(); measure.configure(conf); this.convergenceDelta = Double.parseDouble(conf.get(KMeansConfigKeys.CLUSTER_CONVERGENCE_KEY)); this.clusterer = new KMeansClusterer(measure); this.clusterMap = new HashMap<String, Cluster>(); String path = conf.get(KMeansConfigKeys.CLUSTER_PATH_KEY); if (path.length() > 0) { Collection<Cluster> clusters = new ArrayList<Cluster>(); KMeansUtil.configureWithClusterInfo(conf, new Path(path), clusters); setClusterMap(clusters); if (clusterMap.isEmpty()) { throw new IllegalStateException("Cluster is empty!"); } } } catch (ClassNotFoundException e) { throw new IllegalStateException(e); } catch (IllegalAccessException e) { throw new IllegalStateException(e); } catch (InstantiationException e) { throw new IllegalStateException(e); } } private void setClusterMap(Collection<Cluster> clusters) { clusterMap = new HashMap<String, Cluster>(); for (Cluster cluster : clusters) { clusterMap.put(cluster.getIdentifier(), cluster); } clusters.clear(); } public void setup(Collection<Cluster> clusters, DistanceMeasure measure) { setClusterMap(clusters); this.clusterer = new KMeansClusterer(measure); } }
很直白的的操作,只是在setup的時候稍復雜。
A、setup操作的目的是讀取初始划分或者上次迭代的結果,構建Cluster信息,同時做了Map<Cluster的ID,Cluster>映射,方便從ID找Cluster。
B、reduce操作非常直白,將從combiner傳來的<Cluster ID,ClusterObservations>進行匯總;
computeConvergence用來判斷當前Cluster是否收斂,即新的“重心”與老的“重心”距離是否滿足之前傳入的精度要求;
注意到有個cluster.computeParameters()操作,這個操作非常重要,它保證了本次迭代的結果不會影響到下次迭代,也就是保證了能夠“重新計算每個Cluster的重心”這一步驟。



前三個操作得到新的Cluster信息;

后三個步驟清空S0、S1、S2信息,保證下次迭代所需的Cluster信息是“干凈”的。
之后,reduce將(Cluster ID, Cluster) Pair寫入到HDFS中以”clusters-迭代次數“命名的文件夾中,供后面迭代時候使用。
Reduce操作搜集前面Combiner輸出的信息,並再一次對Canopy重心等信息進行了更新
(6).KMeansClusterMapper
之前的MR操作用於構建Cluster信息,KMeansClusterMapper則用構造好的Cluster信息來聚類。
public class KMeansClusterMapper extends Mapper<WritableComparable<?>,VectorWritable,IntWritable,WeightedVectorWritable> { private final Collection<Cluster> clusters = new ArrayList<Cluster>(); private KMeansClusterer clusterer; @Override protected void map(WritableComparable<?> key, VectorWritable point, Context context) throws IOException, InterruptedException { clusterer.outputPointWithClusterInfo(point.get(), clusters, context); } @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); Configuration conf = context.getConfiguration(); try { ClassLoader ccl = Thread.currentThread().getContextClassLoader(); DistanceMeasure measure = ccl.loadClass(conf.get(KMeansConfigKeys.DISTANCE_MEASURE_KEY)) .asSubclass(DistanceMeasure.class).newInstance(); measure.configure(conf); String clusterPath = conf.get(KMeansConfigKeys.CLUSTER_PATH_KEY); if (clusterPath != null && clusterPath.length() > 0) { KMeansUtil.configureWithClusterInfo(conf, new Path(clusterPath), clusters); if (clusters.isEmpty()) { throw new IllegalStateException("No clusters found. Check your -c path."); } } this.clusterer = new KMeansClusterer(measure); } catch (ClassNotFoundException e) { throw new IllegalStateException(e); } catch (IllegalAccessException e) { throw new IllegalStateException(e); } catch (InstantiationException e) { throw new IllegalStateException(e); } } }
A、setup依然是從指定目錄讀取並構建Cluster信息;
B、map操作通過計算每個點到各Cluster“重心”的距離完成聚類操作,可以看到map操作結束,所有點就都被放在唯一一個與之距離最近的Cluster中了,因此之后並不需要reduce操作。
(7).KMeansDriver
這里值得注意的是buildCluster中的迭代過程,runIteration中設置前面KMeanMapper,KMeansCombiner,KMeanReducer所在job的參數。
其中buildCluster代碼:
private static Path buildClustersMR(Configuration conf, Path input, Path clustersIn, Path output, DistanceMeasure measure, int maxIterations, String delta) throws IOException, InterruptedException, ClassNotFoundException { boolean converged = false; int iteration = 1; while (!converged && iteration <= maxIterations) { log.info("K-Means Iteration {}", iteration); // point the output to a new directory per iteration Path clustersOut = new Path(output, AbstractCluster.CLUSTERS_DIR + iteration); converged = runIteration(conf, input, clustersIn, clustersOut, measure.getClass().getName(), delta); // now point the input to the old output directory clustersIn = clustersOut; iteration++; } return clustersIn; }
如果把前面的KMeansMapper、KMeansCombiner、KMeansReducer、KMeansClusterMapper看做是磚的話,KMeansDriver就是蓋房子的人,它用來組織整個kmeans算法流程(包括單機版和MR版)。示意圖如下:
圖4
