Spark MLlib架構解析
- MLlib的底層基礎解析
- MLlib的算法庫分析
- 分類算法
- 回歸算法
- 聚類算法
- 協同過濾
- MLlib的實用程序分析
從架構圖可以看出MLlib主要包含三個部分:
- 底層基礎:包括Spark的運行庫、矩陣庫和向量庫;
- 算法庫:包含廣義線性模型、推薦系統、聚類、決策樹和評估的算法;
- 實用程序:包括測試數據的生成、外部數據的讀入等功能。

MLlib的底層基礎解析
底層基礎部分主要包括向量接口和矩陣接口,這兩種接口都會使用Scala語言基於Netlib和BLAS/LAPACK開發的線性代數庫Breeze。
MLlib支持本地的密集向量和稀疏向量,並且支持標量向量。
MLlib同時支持本地矩陣和分布式矩陣,支持的分布式矩陣分為RowMatrix、IndexedRowMatrix、CoordinateMatrix等。
關於密集型和稀疏型的向量Vector的示例如下所示。


疏矩陣在含有大量非零元素的向量Vector計算中會節省大量的空間並大幅度提高計算速度,如下圖所示。

標量LabledPoint在實際中也被大量使用,例如判斷郵件是否為垃圾郵件時就可以使用類似於以下的代碼:

可以把表示為1.0的判斷為正常郵件,而表示為0.0則作為垃圾郵件來看待。
對於矩陣Matrix而言,本地模式的矩陣如下所示。

分布式矩陣如下所示。


RowMatrix直接通過RDD[Vector]來定義並可以用來統計平均數、方差、協同方差等:


而IndexedRowMatrix是帶有索引的Matrix,但其可以通過toRowMatrix方法來轉換為RowMatrix,從而利用其統計功能,代碼示例如下所示。

CoordinateMatrix常用於稀疏性比較高的計算中,是由RDD[MatrixEntry]來構建的,MatrixEntry是一個Tuple類型的元素,其中包含行、列和元素值,代碼示例如下所示:

MLlib的算法庫分析
下圖是MLlib算法庫的核心內容。

在這里我們分析一些Spark中常用的算法:
1) 分類算法
分類算法屬於監督式學習,使用類標簽已知的樣本建立一個分類函數或分類模型,應用分類模型,能把數據庫中的類標簽未知的數據進行歸類。分類在數據挖掘中是一項重要的任務,目前在商業上應用最多,常見的典型應用場景有流失預測、精確營銷、客戶獲取、個性偏好等。MLlib 目前支持分類算法有:邏輯回歸、支持向量機、朴素貝葉斯和決策樹。
案例:導入訓練數據集,然后在訓練集上執行訓練算法,最后在所得模型上進行預測並計算訓練誤差。
import org.apache.spark.SparkContext import org.apache.spark.mllib.classification.SVMWithSGD import org.apache.spark.mllib.regression.LabeledPoint // 加載和解析數據文件 val data = sc.textFile("mllib/data/sample_svm_data.txt") val parsedData = data.map { line => val parts = line.split(' ') LabeledPoint(parts(0).toDouble, parts.tail.map(x => x.toDouble).toArray) } // 設置迭代次數並進行進行訓練 val numIterations = 20 val model = SVMWithSGD.train(parsedData, numIterations) // 統計分類錯誤的樣本比例 val labelAndPreds = parsedData.map { point => val prediction = model.predict(point.features) (point.label, prediction) } val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / parsedData.count println("Training Error = " + trainErr)
2) 回歸算法
回歸算法屬於監督式學習,每個個體都有一個與之相關聯的實數標簽,並且我們希望在給出用於表示這些實體的數值特征后,所預測出的標簽值可以盡可能接近實際值。MLlib 目前支持回歸算法有:線性回歸、嶺回歸、Lasso和決策樹。
案例:導入訓練數據集,將其解析為帶標簽點的RDD,使用 LinearRegressionWithSGD 算法建立一個簡單的線性模型來預測標簽的值,最后計算均方差來評估預測值與實際值的吻合度。
import org.apache.spark.mllib.regression.LinearRegressionWithSGD import org.apache.spark.mllib.regression.LabeledPoint // 加載和解析數據文件 val data = sc.textFile("mllib/data/ridge-data/lpsa.data") val parsedData = data.map { line => val parts = line.split(',') LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x => x.toDouble).toArray) } //設置迭代次數並進行訓練 val numIterations = 20 val model = LinearRegressionWithSGD.train(parsedData, numIterations) // 統計回歸錯誤的樣本比例 val valuesAndPreds = parsedData.map { point => val prediction = model.predict(point.features) (point.label, prediction) } val MSE = valuesAndPreds.map{ case(v, p) => math.pow((v - p), 2)}.reduce(_ + _)/valuesAndPreds.count println("training Mean Squared Error = " + MSE)
3) 聚類算法
聚類算法屬於非監督式學習,通常被用於探索性的分析,是根據“物以類聚”的原理,將本身沒有類別的樣本聚集成不同的組,這樣的一組數據對象的集合叫做簇,並且對每一個這樣的簇進行描述的過程。它的目的是使得屬於同一簇的樣本之間應該彼此相似,而不同簇的樣本應該足夠不相似,常見的典型應用場景有客戶細分、客戶研究、市場細分、價值評估。MLlib 目前支持廣泛使用的KMmeans聚類算法。
案例:導入訓練數據集,使用 KMeans 對象來將數據聚類到兩個類簇當中,所需的類簇個數會被傳遞到算法中,然后計算集內均方差總和(WSSSE),可以通過增加類簇的個數 k 來減小誤差。 實際上,最優的類簇數通常是 1,因為這一點通常是WSSSE圖中的 “低谷點”。
import org.apache.spark.mllib.clustering.KMeans // 加載和解析數據文件 val data = sc.textFile("kmeans_data.txt") val parsedData = data.map( _.split(' ').map(_.toDouble)) // 設置迭代次數、類簇的個數 val numIterations = 20 val numClusters = 2 // 進行訓練 val clusters = KMeans.train(parsedData, numClusters, numIterations) // 統計聚類錯誤的樣本比例 val WSSSE = clusters.computeCost(parsedData) println("Within Set Sum of Squared Errors = " + WSSSE)
4) 協同過濾
協同過濾常被應用於推薦系統,這些技術旨在補充用戶-商品關聯矩陣中所缺失的部分。MLlib當前支持基於模型的協同過濾,其中用戶和商品通過一小組隱語義因子進行表達,並且這些因子也用於預測缺失的元素。
案例:導入訓練數據集,數據每一行由一個用戶、一個商品和相應的評分組成。假設評分是顯性的,使用默認的ALS.train()方法,通過計算預測出的評分的均方差來評估這個推薦模型。
import org.apache.spark.mllib.recommendation.ALS import org.apache.spark.mllib.recommendation.Rating // 加載和解析數據文件 val data = sc.textFile("mllib/data/als/test.data") val ratings = data.map(_.split(',') match { case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble) }) // 設置迭代次數 val numIterations = 20 val model = ALS.train(ratings, 1, 20, 0.01) // 對推薦模型進行評分 val usersProducts = ratings.map{ case Rating(user, product, rate) => (user, product)} val predictions = model.predict(usersProducts).map{ case Rating(user, product, rate) => ((user, product), rate) } val ratesAndPreds = ratings.map{ case Rating(user, product, rate) => ((user, product), rate) }.join(predictions) val MSE = ratesAndPreds.map{ case ((user, product), (r1, r2)) => math.pow((r1- r2), 2) }.reduce(_ + _)/ratesAndPreds.count println("Mean Squared Error = " + MSE)
MLlib的實用程序分析
實用程序部分包括數據的驗證器、Label的二元和多元的分析器、多種數據生成器、數據加載器。

