1.什么是MLBase
MLBase是Spark生態圈的一部分,專注於機器學習,包含三個組件:MLlib、MLI、ML Optimizer。
- ML Optimizer: This layer aims to automating the task of ML pipeline construction. The optimizer solves a search problem over feature extractors and ML algorithms included inMLI and MLlib. The ML Optimizer is currently under active development.
- MLI: An experimental API for feature extraction and algorithm development that introduces high-level ML programming abstractions. A prototype of MLI has been implemented against Spark, and serves as a testbed for MLlib.
- MLlib: Apache Spark's distributed ML library. MLlib was initially developed as part of the MLbase project, and the library is currently supported by the Spark community. Many features in MLlib have been borrowed from ML Optimizer and MLI, e.g., the model and algorithm APIs, multimodel training, sparse data support, design of local / distributed matrices, etc.
2.MLbase機器學習算法的流程
用戶可以容易地使用MLbase這個工具來處理自己的數據。大部分的機器學習算法都包含訓練以及預測兩個部分,訓練出模型,然后對未知樣本進行預測。Spark中的機器學習包也是如此。
Spark將機器學習算法都分成了兩個模塊:
- 訓練模塊:通過訓練樣本輸出模型參數
- 預測模塊:利用模型參數初始化,預測測試樣本,輸出與測值。
MLbase提供了函數式編程語言Scala,利用MLlib可以很方便的實現機器學習的常用算法。
比如說,我們要做分類,只需要寫如下scala代碼:
1 var X = load("some_data", 2 to 10) 2 var y = load("some_data", 1) 3 var (fn-model, summary) = doClassify(X, y)
代碼解釋:X是需要分類的數據集,y是從這個數據集里取的一個分類標簽,doClassify()分類。
這樣的處理有兩個主要好處:
- 每一步數據處理很清楚,可以很容易地可視化出來;
- 對用戶來說,用ML算法處理是透明的,不用關心和考慮用什么分類方法,是SVM還是AdaBoost,SVM用的kernel是線性的還是RBF的,original和scaled的參數調成多少等等。
MLbase的三大組成部分之一:ML Optimizer,會選擇它認為最適合的已經在內部實現好了的機器學習算法和相關參數,來處理用戶輸入的數據,並返回模型或別的幫助分析的結果。總體上的處理流程如下圖:
- 用戶輸入的Task比如doClassify(X, y)或者做協同過濾doCollabFilter(X, y)、圖計算findTopKDegreeNodes(G, k = 1000)之類的事情,先會傳給Parser處理,然后交給LLP。LLP是logical learning plan,即是邏輯上的一個學習選擇過程,在這個過程里選擇該用什么算法,特征提取應該用什么做,參數應該選什么,數據集怎么拆子數據集的策略等事情。
- LLP決定之后交給Optimizer。Optimizer是MLbase的核心,它會把數據拆分成若干份,對每一份使用不同的算法和參數來運算出結果,看哪一種搭配方式得到的結果最優(注意這次最優結果是初步的),優化器做完這些事之后就交給PLP。
- PLP是physical learning plan,即物理(實際)執行的計划,讓MLbase的master把任務分配給具體slave去最后執行之前選好的算法方案,把結果計算出來返回,同時返回這次計算的學習模型。
- 這個流程是Task -> Parser -> LLP -> Optimizer -> PLP -> Execute -> Result/Model,先從邏輯上,在已有的算法里選幾個適合這個場景的,讓優化器都去做一遍,把認為當時最優的方案給實際執行的部分去執行,返回結果。
MLbase不僅僅把結果返回給用戶。在LLP、Optimizer,MLbase會存儲一些中間結果和特征,然后會繼續搜尋和測試結果更好的算法和相關參數,並且會通知用戶。LLP內部實現的算法是可以擴充的。
總之,MLbase會自動尋找合適的算法,自動選擇和優化,還可以進行擴充。
3.Scala實現KMeans算法
3.1 什么是KMeans算法
K-Means算法是一種cluster analysis的算法,其主要是來計算數據聚集的算法,主要通過不斷地取離種子點最近均值的算法。
具體來說,通過輸入聚類個數k,以及包含 n個數據對象的數據庫,輸出滿足方差最小標准的k個聚類。
3.2 k-means 算法基本步驟
(1) 從 n個數據對象任意選擇 k 個對象作為初始聚類中心;
(2) 根據每個聚類對象的均值(中心對象),計算每個對象與這些中心對象的距離;並根據最小距離重新對相應對象進行划分;
(3) 重新計算每個(有變化)聚類的均值(中心對象);
(4) 計算標准測度函數,當滿足一定條件,如函數收斂時,則算法終止;如果條件不滿足則回到步驟(2)。
算法的時間復雜度上界為O(n*k*t), 其中t是迭代次數,n個數據對象划分為 k個聚類。
3.3 MLlib實現KMeans
以MLlib實現KMeans算法,利用MLlib KMeans訓練出來的模型,可以對新的數據作出分類預測,具體見代碼和輸出結果。
Scala代碼:

1 package com.hq 2 3 import org.apache.spark.mllib.clustering.KMeans 4 import org.apache.spark.mllib.linalg.Vectors 5 import org.apache.spark.{SparkContext, SparkConf} 6 7 object KMeansTest { 8 def main(args: Array[String]) { 9 if (args.length < 1) { 10 System.err.println("Usage: <file>") 11 System.exit(1) 12 } 13 14 val conf = new SparkConf() 15 val sc = new SparkContext(conf) 16 val data = sc.textFile(args(0)) 17 val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))) 18 val numClusters = 2 19 val numIterations = 20 20 val clusters = KMeans.train(parsedData,numClusters,numIterations) 21 22 println("------Predict the existing line in the analyzed data file: "+args(0)) 23 println("Vector 1.0 2.1 3.8 belongs to clustering "+ clusters.predict(Vectors.dense("1.0 2.1 3.8".split(' ').map(_.toDouble)))) 24 println("Vector 5.6 7.6 8.9 belongs to clustering "+ clusters.predict(Vectors.dense("5.6 7.6 8.9".split(' ').map(_.toDouble)))) 25 println("Vector 3.2 3.3 6.6 belongs to clustering "+ clusters.predict(Vectors.dense("3.2 3.3 6.6".split(' ').map(_.toDouble)))) 26 println("Vector 8.1 9.2 9.3 belongs to clustering "+ clusters.predict(Vectors.dense("8.1 9.2 9.3".split(' ').map(_.toDouble)))) 27 println("Vector 6.2 6.5 7.3 belongs to clustering "+ clusters.predict(Vectors.dense("6.2 6.5 7.3".split(' ').map(_.toDouble)))) 28 29 println("-------Predict the non-existent line in the analyzed data file: ----------------") 30 println("Vector 1.1 2.2 3.9 belongs to clustering "+ clusters.predict(Vectors.dense("1.1 2.2 3.9".split(' ').map(_.toDouble)))) 31 println("Vector 5.5 7.5 8.8 belongs to clustering "+ clusters.predict(Vectors.dense("5.5 7.5 8.8".split(' ').map(_.toDouble)))) 32 33 println("-------Evaluate clustering by computing Within Set Sum of Squared Errors:-----") 34 val wssse = clusters.computeCost(parsedData) 35 println("Within Set Sum of Squared Errors = "+ wssse) 36 sc.stop() 37 } 38 }
3.4 以Spark集群standalone方式運行
①在IDEA打成jar包(如果忘記了,參見Spark:用Scala和Java實現WordCount),上傳到用戶目錄下/home/ebupt/test/kmeans.jar
②准備訓練樣本數據:hdfs://eb170:8020/user/ebupt/kmeansData,內容如下
[ebupt@eb170 ~]$ hadoop fs -cat ./kmeansData
1.0 2.1 3.8
5.6 7.6 8.9
3.2 3.3 6.6
8.1 9.2 9.3
6.2 6.5 7.3
③spark-submit提交運行
[ebupt@eb174 test]$ spark-submit --master spark://eb174:7077 --name KmeansWithMLib --class com.hq.KMeansTest --executor-memory 2G --total-executor-cores 4 ~/test/kmeans.jar hdfs://eb170:8020/user/ebupt/kmeansData
輸出結果摘要:
1 ------Predict the existing line in the analyzed data file: hdfs://eb170:8020/user/ebupt/kmeansData 2 Vector 1.0 2.1 3.8 belongs to clustering 0 3 Vector 5.6 7.6 8.9 belongs to clustering 1 4 Vector 3.2 3.3 6.6 belongs to clustering 0 5 Vector 8.1 9.2 9.3 belongs to clustering 1 6 Vector 6.2 6.5 7.3 belongs to clustering 1 7 -------Predict the non-existent line in the analyzed data file: ---------------- 8 Vector 1.1 2.2 3.9 belongs to clustering 0 9 Vector 5.5 7.5 8.8 belongs to clustering 1 10 -------Evaluate clustering by computing Within Set Sum of Squared Errors:----- 11 Within Set Sum of Squared Errors = 16.393333333333388
4.MLbase總結
本文主要介紹了MLbase如何實現機器學習算法,簡單介紹了MLBase的設計思想。總的來說,Mlbase的核心是ML Optimizer,把聲明式的任務轉化成復雜的學習計划,輸出最優的模型和計算結果。
與其它機器學習系統Weka、mahout不同:
- MLbase是分布式的,Weka是單機的。
- Mlbase是自動化的,Weka和mahout都需要使用者具備機器學習技能,來選擇自己想要的算法和參數來做處理。
- MLbase提供了不同抽象程度的接口,可以擴充ML算法。
5.參考文獻
- MLbase
- apache mlbase
- A. Talwalkar, T. Kraska, R. Griffith, J. Duchi, J. Gonzalez, D. Britz, X. Pan, V. Smith, E. Sparks, A. Wibisono, M. J. Franklin, M. I. Jordan. MLbase: A Distributed Machine Learning Wrapper. In Big Learning Workshop at NIPS, 2012.
- Spark MLlib系列——程序框架
- MLBase:Spark生態圈里的分布式機器學習系統
- Apache Spark MLlib KMeans