由於思路比較簡單,而且代碼我寫的很詳細了,直接貼代碼吧。Mark.
/**
* @autor phh
* 相似度通常以對象到類質心的距離作為相似性的評價指標
* 算法流程如下:
* 1、從n個數據對象中選取k個不同的點作為初始質心,每個質心看成是一個類別的標識點
* 2、然后將數據集中的每一個點划分到距離最近的一個知心所對應的類別
* 3、完成一次聚類后根據此次聚類的結果重新計算各個類別的新質心
* 4、如果新的質心和之前的質心距離大於某個閾值,那么說明現在的聚類結果還沒有達到最佳結果,繼續直到不再變化
*/
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.calcite.rel.core.Collect
import akka.dispatch.Foreach
object Kmeans {
def main(args:Array[String]){
if(args.length < 0){
System.err.println("usag error input and output");
}
val input = args(0) ;
val conf = new SparkConf().setAppName("Kmeans-test").setMaster("local")
val sc = new SparkContext(conf)
//k指代聚類結果生成的類別個數
val k=2
//制定結束條件:要么距離閾值為0.1,要么最大迭代次數為5
val e=0.1
val maxIterations = 5
//起始迭代是0
var iteration =0
//將數據從input中讀取使用map生成K/V格式:
//???
val data = sc.textFile(input).map(x=>x.split(" ").map(_.toDouble)).cache
println(data);
//定義一個數組來保存質心
var centers:Array[Array[Double]]=null
//隨機選取兩個輸入數據作為質心
do{
//???
centers = data.takeSample(true, 2, System.nanoTime().toInt )
}while(centers.map(_.deep).toSet.size != k)
def euclideanDistance(xs:Array[Double],ys:Array[Double])={
Math.sqrt((xs zip ys).map{
case (x,y)=>Math.pow(y-x,2)
}.sum)
}
var changed =true
val dims =centers(0).length
while(changed && iteration<maxIterations){
iteration= iteration + 1
changed =false
}
val pointWithClass =data.map({point=>
val closestCenterIndex =centers.zipWithIndex.map({case (center,
index)=>{
val distance =euclideanDistance(point, center)
(distance,index)
}}).reduce((d1,d2)=>if(d1._1>d2._2) d2 else d1)._2
(closestCenterIndex,(point, 1))
})
val totalContribs =pointWithClass.reduceByKey({case ((xs,c1),(ys,c2))=>
((xs zip ys).map{case (x,y)=>x+y},c1+c2)}).collect
val newCenters =totalContribs.map{
case (centerIndex, (sum, counts))=>
(centerIndex,sum.map(_/counts))}.sortBy(_._1).map(_._2)
for(i<-0 until k){
if(euclideanDistance(centers(i), newCenters(i))>e){
changed=true;
centers(i)=newCenters(i)
}
}
centers.foreach(x=>println(x.mkString(",")))
}
}
