使用 Spark MLlib 做 K-means 聚類分析[轉]


原文地址:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice4/

引言

提起機器學習 (Machine Learning),相信很多計算機從業者都會對這個技術方向感到興奮。然而學習並使用機器學習算法來處理數據卻是一項復雜的工作,需要充足的知識儲備,如概率論,數理統計,數值逼近,最優化理論等。機器學習旨在使計算機具有人類一樣的學習能力和模仿能力,這也是實現人工智能的核心思想和方法。傳統的機器學習算法,由於技術和單機存儲的限制,只能在少量數據上使用,隨着 HDFS(Hadoop Distributed File System) 等分布式文件系統出現,存儲海量數據已經成為可能。然而由於 MapReduce 自身的限制,使得使用 MapReduce 來實現分布式機器學習算法非常耗時和消耗磁盤容量。因為通常情況下機器學習算法參數學習的過程都是迭代計算的,即本次計算的結果要作為下一次迭代的輸入,這個過程中,如果使用 MapReduce,我們只能把中間結果存儲磁盤,然后在下一次計算的時候從新讀取,這對於迭代 頻發的算法顯然是致命的性能瓶頸。Spark 立足於內存計算,天然的適應於迭代式計算,相信對於這點,讀者通過前面幾篇文章已經有了較為深入的了解。然而即便這樣,對於普通開發者來說,實現一個分布式機器學習算法仍然是一件極具挑戰的事情。MLlib 正是為了讓基於海量數據的機器學習變得更加簡單,它提供了常用機器學習算法的分布式實現,開發者只需要有 Spark 基礎並且了解機器學習算法的原理,以及方法相關參數的含義,就可以輕松的通過調用相應的 API 來實現基於海量數據的機器學習過程。當然,原始數據 ETL,特征指標提取,調節參數並優化學習過程,這依然需要有足夠的行業知識和數據敏感度,這往往也是經驗的體現。本文的重點在於向讀者介紹如何使用 MLlib 機器學習庫提供的 K-means 算法做聚類分析,這是一個有意義的過程,相信會對讀者特別是初學者有啟發意義。

 

Spark 機器學習庫簡介

Spark 機器學習庫提供了常用機器學習算法的實現,包括聚類,分類,回歸,協同過濾,維度縮減等。使用 Spark 機器學習庫來做機器學習工作,可以說是非常的簡單,通常只需要在對原始數據進行處理后,然后直接調用相應的 API 就可以實現。但是要想選擇合適的算法,高效准確地對數據進行分析,您可能還需要深入了解下算法原理,以及相應 Spark MLlib API 實現的參數的意義。

需要提及的是,Spark 機器學習庫從 1.2 版本以后被分為兩個包,分別是:

  • spark.mllib

Spark MLlib 歷史比較長了,1.0 以前的版本中已經包含了,提供的算法實現都是基於原始的 RDD,從學習角度上來講,其實比較容易上手。如果您已經有機器學習方面的經驗,那么您只需要熟悉下 MLlib 的 API 就可以開始數據分析工作了。想要基於這個包提供的工具構建完整並且復雜的機器學習流水線是比較困難的。

  • spark.ml

Spark ML Pipeline 從 Spark1.2 版本開始,目前已經從 Alpha 階段畢業,成為可用並且較為穩定的新的機器學習庫。ML Pipeline 彌補了原始 MLlib 庫的不足,向用戶提供了一個基於 DataFrame 的機器學習工作流式 API 套件,使用 ML Pipeline API,我們可以很方便的把數據處理,特征轉換,正則化,以及多個機器學習算法聯合起來,構建一個單一完整的機器學習流水線。顯然,這種新的方式給我們提供了更靈活的方法,而且這也更符合機器學習過程的特點。

從官方文檔來看,Spark ML Pipeline 雖然是被推薦的機器學習方式,但是並不會在短期內替代原始的 MLlib 庫,因為 MLlib 已經包含了豐富穩定的算法實現,並且部分 ML Pipeline 實現基於 MLlib。而且就筆者看來,並不是所有的機器學習過程都需要被構建成一個流水線,有時候原始數據格式整齊且完整,而且使用單一的算法就能實現目標,我們就沒有必要把事情復雜化,采用最簡單且容易理解的方式才是正確的選擇。

本文基於 Spark 1.5,向讀者展示使用 MLlib API 進行聚類分析的過程。讀者將會發現,使用 MLlib API 開發機器學習應用方式是比較簡單的,相信本文可以使讀者建立起信心並掌握基本方法,以便在后續的學習和工作中事半功倍。

 

K-means 聚類算法原理

聚類分析是一個無監督學習 (Unsupervised Learning) 過程, 一般是用來對數據對象按照其特征屬性進行分組,經常被應用在客戶分群,欺詐檢測,圖像分析等領域。K-means 應該是最有名並且最經常使用的聚類算法了,其原理比較容易理解,並且聚類效果良好,有着廣泛的使用。

和諸多機器學習算法一樣,K-means 算法也是一個迭代式的算法,其主要步驟如下:

  • 第一步,選擇 K 個點作為初始聚類中心。
  • 第二步,計算其余所有點到聚類中心的距離,並把每個點划分到離它最近的聚類中心所在的聚類中去。在這里,衡量距離一般有多個函數可以選擇,最常用的是歐幾里得距離 (Euclidean Distance), 也叫歐式距離。公式如下:

Figure xxx. Requires a heading

其中 C 代表中心點,X 代表任意一個非中心點。

  • 第三步,重新計算每個聚類中所有點的平均值,並將其作為新的聚類中心點。
  • 最后,重復 (二),(三) 步的過程,直至聚類中心不再發生改變,或者算法達到預定的迭代次數,又或聚類中心的改變小於預先設定的閥值。

在實際應用中,K-means 算法有兩個不得不面對並且克服的問題。

  1. 聚類個數 K 的選擇。K 的選擇是一個比較有學問和講究的步驟,我們會在后文專門描述如何使用 Spark 提供的工具選擇 K。
  2. 初始聚類中心點的選擇。選擇不同的聚類中心可能導致聚類結果的差異。

Spark MLlib K-means 算法的實現在初始聚類點的選擇上,借鑒了一個叫 K-means||的類 K-means++ 實現。K-means++ 算法在初始點選擇上遵循一個基本原則: 初始聚類中心點相互之間的距離應該盡可能的遠。基本步驟如下:

  • 第一步,從數據集 X 中隨機選擇一個點作為第一個初始點。
  • 第二步,計算數據集中所有點與最新選擇的中心點的距離 D(x)。
  • 第三步,選擇下一個中心點,使得最大。
  • 第四部,重復 (二),(三) 步過程,直到 K 個初始點選擇完成。
 

MLlib 的 K-means 實現

Spark MLlib 中 K-means 算法的實現類 (KMeans.scala) 具有以下參數,具體如下。

圖 1. MLlib K-means 算法實現類預覽

圖 1. MLlib K-means 算法實現類預覽

通過下面默認構造函數,我們可以看到這些可調參數具有以下初始值。

圖 2. MLlib K-means 算法參數初始值

圖 2. MLlib K-means 算法參數初始值

參數的含義解釋如下:

  • k 表示期望的聚類的個數。
  • maxInterations 表示方法單次運行最大的迭代次數。
  • runs 表示算法被運行的次數。K-means 算法不保證能返回全局最優的聚類結果,所以在目標數據集上多次跑 K-means 算法,有助於返回最佳聚類結果。
  • initializationMode 表示初始聚類中心點的選擇方式, 目前支持隨機選擇或者 K-means||方式。默認是 K-means||。
  • initializationSteps表示 K-means||方法中的部數。
  • epsilon 表示 K-means 算法迭代收斂的閥值。
  • seed 表示集群初始化時的隨機種子。

通常應用時,我們都會先調用 KMeans.train 方法對數據集進行聚類訓練,這個方法會返回 KMeansModel 類實例,然后我們也可以使用 KMeansModel.predict 方法對新的數據點進行所屬聚類的預測,這是非常實用的功能。

KMeans.train 方法有很多重載方法,這里我們選擇參數最全的一個展示。

圖 3. KMeans.train 方法預覽

圖 3. KMeans.train 方法預覽

KMeansModel.predict 方法接受不同的參數,可以是向量,或者 RDD,返回是入參所屬的聚類的索引號。

圖 4. KMeansModel.predict 方法預覽

圖 4. KMeansModel.predict 方法預覽

 

聚類測試數據集簡介

在本文中,我們所用到目標數據集是來自 UCI Machine Learning Repository 的 Wholesale customer Data Set。UCI 是一個關於機器學習測試數據的下載中心站點,里面包含了適用於做聚類,分群,回歸等各種機器學習問題的數據集。

Wholesale customer Data Set 是引用某批發經銷商的客戶在各種類別產品上的年消費數。為了方便處理,本文把原始的 CSV 格式轉化成了兩個文本文件,分別是訓練用數據和測試用數據。

圖 5. 客戶消費數據格式預覽

圖 5. 客戶消費數據格式預覽

讀者可以從標題清楚的看到每一列代表的含義,當然讀者也可以到 UCI 網站上去找到關於該數據集的更多信息。雖然 UCI 的數據可以自由獲取並使用,但是我們還是在此聲明,該數據集的版權屬 UCI 以及其原始提供組織或公司所有。

 

案例分析和編碼實現

本例中,我們將根據目標客戶的消費數據,將每一列視為一個特征指標,對數據集進行聚類分析。代碼實現步驟如下

清單 1. 聚類分析實現類源碼
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
object KMeansClustering {
def main (args: Array[String]) {
if (args.length < 5) {
println("Usage:KMeansClustering trainingDataFilePath testDataFilePath numClusters numIterations runTimes")
sys.exit(1)
}

val conf = new SparkConf().setAppName("Spark MLlib Exercise:K-Means Clustering")
val sc = new SparkContext(conf)
/**
*Channel Region Fresh Milk Grocery Frozen Detergents_Paper Delicassen
* 2 3 12669 9656 7561 214 2674 1338
* 2 3 7057 9810 9568 1762 3293 1776
* 2 3 6353 8808 7684 2405 3516 7844
*/
val rawTrainingData = sc.textFile(args(0))
val parsedTrainingData = rawTrainingData.filter(!isColumnNameLine(_)).map(line => {
Vectors.dense(line.split("\t").map(_.trim).filter(!"".equals(_)).map(_.toDouble))
}).cache()
// Cluster the data into two classes using KMeans
val numClusters = args(2).toInt
val numIterations = args(3).toInt
val runTimes = args(4).toInt
var clusterIndex:Int = 0
val clusters:KMeansModel = KMeans.train(parsedTrainingData, numClusters, numIterations,runTimes)
println("Cluster Number:" + clusters.clusterCenters.length)
println("Cluster Centers Information Overview:")
clusters.clusterCenters.foreach( x => {
println("Center Point of Cluster " + clusterIndex + ":")
println(x)
clusterIndex += 1
})
//begin to check which cluster each test data belongs to based on the clustering result
val rawTestData = sc.textFile(args(1))
val parsedTestData = rawTestData.map(line => {
Vectors.dense(line.split("\t").map(_.trim).filter(!"".equals(_)).map(_.toDouble))
})
parsedTestData.collect().foreach(testDataLine => {
val predictedClusterIndex: Int = clusters.predict(testDataLine)
println("The data " + testDataLine.toString + " belongs to cluster " + predictedClusterIndex)
})
println("Spark MLlib K-means clustering test finished.")
}

private def isColumnNameLine(line:String):Boolean = {
if (line != null && line.contains("Channel")) true
else false
}

該示例程序接受五個入參,分別是

  • 訓練數據集文件路徑
  • 測試數據集文件路徑
  • 聚類的個數
  • K-means 算法的迭代次數
  • K-means 算法 run 的次數
 

運行示例程序

和本系列其他文章一樣,我們依然選擇使用 HDFS 存儲數據文件。運行程序之前,我們需要將前文提到的訓練和測試數據集上傳到 HDFS。

圖 6. 測試數據的 HDFS 目錄

圖 6. 測試數據的 HDFS 目錄

清單 2. 示例程序運行命令
./spark-submit --class com.ibm.spark.exercise.mllib.KMeansClustering \
 --master spark://<spark_master_node_ip>:7077 \
 --num-executors 6 \
--driver-memory 3g \
--executor-memory 512m \
--total-executor-cores 6 \
 /home/fams/spark_exercise-1.0.jar \
 hdfs://<hdfs_namenode_ip>:9000/user/fams/mllib/wholesale_customers_data_training.txt \
 hdfs://<hdfs_namenode_ip>:9000/user/fams/mllib/wholesale_customers_data_test.txt \
 8 30 3
圖 7. K-means 聚類示例程序運行結果

圖 7. K-means 聚類示例程序運行結果

 

如何選擇 K

前面提到 K 的選擇是 K-means 算法的關鍵,Spark MLlib 在 KMeansModel 類里提供了 computeCost 方法,該方法通過計算所有數據點到其最近的中心點的平方和來評估聚類的效果。一般來說,同樣的迭代次數和算法跑的次數,這個值越小代表聚類的效果越好。但是在實際情況下,我們還要考慮到聚類結果的可解釋性,不能一味的選擇使 computeCost 結果值最小的那個 K。

清單 3. K 選擇示例代碼片段
val ks:Array[Int] = Array(3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20)
ks.foreach(cluster => {
 val model:KMeansModel = KMeans.train(parsedTrainingData, cluster,30,1)
 val ssd = model.computeCost(parsedTrainingData)
 println("sum of squared distances of points to their nearest center when k=" + cluster + " -> "+ ssd)
})
圖 8. K 選擇示例程序運行結果

圖 8. K 選擇示例程序運行結果

從上圖的運行結果可以看到,當 K=9 時,cost 值有波動,但是后面又逐漸減小了,所以我們選擇 8 這個臨界點作為 K 的個數。當然可以多跑幾次,找一個穩定的 K 值。理論上 K 的值越大,聚類的 cost 越小,極限情況下,每個點都是一個聚類,這時候 cost 是 0,但是顯然這不是一個具有實際意義的聚類結果。

 

結束語

通過本文的學習,讀者已經初步了解了 Spark 的機器學習庫,並且掌握了 K-means 算法的基本原理,以及如何基於 Spark MLlib 構建自己的機器學習應用。機器學習應用的構建是一個復雜的過程,我們通常還需要對數據進行預處理,然后特征提取以及數據清洗等,然后才能利用算法來分析數據。Spark MLlib 區別於傳統的機器學習工具,不僅是因為它提供了簡單易用的 API,更重要的是 Spark 在處理大數據上的高效以及在迭代計算時的獨特優勢。雖然本文所采用的測試數據集很小,並不能反映大數據的應用場景,但是對於掌握基本原理已經足夠,並且如果讀者擁有更大的數據集就可以輕松的將本文的測試程序推廣到大數據聚類的場景下,因為 Spark MLlib 的編程模型都是一致的,無非是數據讀取和處理的方式略有不同。希望讀者可以在本文中找到自己感興趣的知識,相信這對讀者今后深入學習是有幫助的。另外,讀者在閱讀本文的過程中,如果遇到問題或者發現不足之處,請不吝賜教,在文末留言,共同交流學習,謝謝。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM