The machine learning algorithms in Mahout on Spark and those supported by MLlib are summarized below:
This summary focuses mainly on MLlib.
Classification and Regression
Classification and regression are both forms of supervised learning;
Supervised learning means training on labeled data (LabeledPoint); once a model has been obtained, it is used to predict results on test data. Labeled data is feature data whose outcome is already known (a small construction sketch follows this list).
The difference between classification and regression is the type of the predicted variable:
Classification predicts a discrete variable (for example, classifying email as spam or not spam); for binary classification the labels are 0 and 1, and for multiclass classification the labels range from 0 to C-1, where C is the number of classes;
Regression predicts a continuous variable (for example, predicting height from age and weight).
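A minimal sketch of constructing LabeledPoint training data for the two cases; the feature values are invented for illustration and assume the spark-mllib dependency is on the classpath:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Classification: the label is a class index (0.0 or 1.0 for binary, 0.0 to C-1 for multiclass)
val spam = LabeledPoint(1.0, Vectors.dense(2.0, 0.0, 5.0))
val ham  = LabeledPoint(0.0, Vectors.dense(0.0, 3.0, 1.0))

// Regression: the label is a continuous value (here, a height predicted from age and weight)
val height = LabeledPoint(175.0, Vectors.dense(30.0, 70.0))

// Sparse features can also be used: (vector size, non-zero indices, non-zero values)
val sparse = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))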
Linear Regression
Linear regression is one of the most commonly used regression methods; it predicts the output value as a linear combination of the features.
Classes that can be used for linear regression:
LinearRegressionWithSGD
RidgeRegressionWithSGD
LassoWithSGD
Ridge regression uses L2 regularization;
Lasso uses L1 regularization (a configuration sketch for these variants follows the parameter defaults below);
Parameters:
stepSize: the step size (learning rate) used by gradient descent
numIterations: the number of iterations
intercept: whether to add a bias (intercept) feature to the data, i.e. a feature whose value is always 1; the default is false (no intercept)
Defaults: {stepSize: 1.0, numIterations: 100, miniBatchFraction: 1.0}
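A minimal sketch of how these knobs can be set and how the ridge/lasso variants are trained; the tiny in-memory data set and the regParam value of 0.01 are made up for illustration:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LassoWithSGD, LinearRegressionWithSGD, RidgeRegressionWithSGD}

object RegressionVariants {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("RegressionVariants").setMaster("local"))

    // Tiny made-up training set; in practice use data such as lpsa.data in the example below
    val parsedData = sc.parallelize(Seq(
      LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
      LabeledPoint(2.0, Vectors.dense(2.0, 1.0, -1.0)),
      LabeledPoint(3.0, Vectors.dense(3.0, 0.9, -2.0)))).cache()

    // Set stepSize / numIterations / miniBatchFraction on the optimizer and ask for an intercept
    val lr = new LinearRegressionWithSGD().setIntercept(true)
    lr.optimizer.setStepSize(1.0).setNumIterations(100).setMiniBatchFraction(1.0)
    val lrModel = lr.run(parsedData)

    // Ridge (L2) and Lasso (L1) variants: train(data, numIterations, stepSize, regParam)
    val ridgeModel = RidgeRegressionWithSGD.train(parsedData, 100, 1.0, 0.01)
    val lassoModel = LassoWithSGD.train(parsedData, 100, 1.0, 0.01)

    println("linear: " + lrModel.weights + ", ridge: " + ridgeModel.weights + ", lasso: " + lassoModel.weights)
    sc.stop()
  }
}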
Using the model:
1. Predict on new data with model.predict()
2. Get the feature weights with model.weights
Evaluating the model:
Mean squared error (MSE)
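For reference, the mean squared error over n predictions, where y_i is the label and \hat{y}_i the predicted value (standard definition, not MLlib-specific):

\mathrm{MSE} = \frac{1}{n}\sum_{i=1}^{n}\left(y_i - \hat{y}_i\right)^2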
Example:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.LinearRegressionModel
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.linalg.Vectors

/**
 * Created by Edward on 2016/9/21.
 */
object LinearRegression {
  def main(args: Array[String]) {
    val conf: SparkConf = new SparkConf().setAppName("LinearRegression").setMaster("local")
    val sc = new SparkContext(conf)

    // Load and parse the data
    val data = sc.textFile("data/mllib/ridge-data/lpsa.data")
    val parsedData = data.map { line =>
      val parts = line.split(',')
      LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
    }.cache()

    // Building the model
    val numIterations = 100
    val model = LinearRegressionWithSGD.train(parsedData, numIterations)
    // var lr = new LinearRegressionWithSGD().setIntercept(true)
    // val model = lr.run(parsedData)

    // Print the feature weights and the intercept (bias) term
    println("weights:%s, intercept:%s".format(model.weights, model.intercept))

    // Evaluate model on training examples and compute training error
    val valuesAndPreds = parsedData.map { point =>
      val prediction = model.predict(point.features)
      (point.label, prediction)
    }

    // Compute the mean squared error
    val MSE = valuesAndPreds.map { case (v, p) => math.pow((v - p), 2) }.mean()
    println("training Mean Squared Error = " + MSE)

    // Save and load model
    model.save(sc, "myModelPath")
    val sameModel = LinearRegressionModel.load(sc, "myModelPath")
  }
}
Data:
-0.4307829,-1.63735562648104 -2.00621178480549 -1.86242597251066 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
-0.1625189,-1.98898046126935 -0.722008756122123 -0.787896192088153 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
-0.1625189,-1.57881887548545 -2.1887840293994 1.36116336875686 -1.02470580167082 -0.522940888712441 -0.863171185425945 0.342627053981254 -0.155348103855541
-0.1625189,-2.16691708463163 -0.807993896938655 -0.787896192088153 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
0.3715636,-0.507874475300631 -0.458834049396776 -0.250631301876899 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
0.7654678,-2.03612849966376 -0.933954647105133 -1.86242597251066 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
...
The first column is the label, i.e. the known outcome, and the remaining columns are the feature data;
Prediction means supplying a new set of feature values and predicting the corresponding result;
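For instance, inside the main method of the example above, once the model is trained (the feature values here are made up; they only need to match the 8-feature layout of lpsa.data):

// Predict the label for a new, unseen feature vector
val predicted = model.predict(Vectors.dense(
  -0.5, -0.4, -0.2, -1.0, -0.5, -0.8, -1.0, -0.8))
println("predicted = " + predicted)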
Result:
weights:[0.5808575763272221,0.18930001482946976,0.2803086929991066,0.1110834181777876,0.4010473965597895,-0.5603061626684255,-0.5804740464000981,0.8742741176970946], intercept:0.0
training Mean Squared Error = 6.207597210613579
Logistic Regression
Logistic regression is a binary classification method, and it can also be used for multiclass classification;
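The underlying model is the standard logistic (sigmoid) function applied to a linear combination of the features; predict() compares the resulting probability with a threshold (0.5 by default; the example below raises it to 0.8 with setThreshold):

P(y = 1 \mid \mathbf{x}) = \frac{1}{1 + e^{-(\mathbf{w}^{T}\mathbf{x} + b)}}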
Classes that can be used for logistic regression:
LogisticRegressionWithLBFGS (recommended)
LogisticRegressionWithSGD
Parameters:
Similar to linear regression
Using the model:
1. Predict on new data with model.predict()
2. Get the feature weights with model.weights
Evaluating the model:
Binary classification: AUC (Area Under the ROC Curve)
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionModel}
import org.apache.spark.mllib.evaluation.{BinaryClassificationMetrics, MulticlassMetrics}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils

/**
 * Created by Edward on 2016/9/21.
 */
object LogisticRegression {
  def main(args: Array[String]) {
    val conf: SparkConf = new SparkConf().setAppName("LogisticRegression").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // Load training data in LIBSVM format.
    val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

    // Split data into training (60%) and test (40%).
    val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
    val training = splits(0).cache()
    val test = splits(1)

    // Run training algorithm to build the model
    val model = new LogisticRegressionWithLBFGS()
      .setNumClasses(10)
      .run(training)

    model.setThreshold(0.8)

    // Compute raw scores on the test set.
    val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
      val prediction = model.predict(features)
      (prediction, label)
    }

    // Multiclass metrics
    //val metrics = new MulticlassMetrics(predictionAndLabels)
    //val precision = metrics.precision
    //println("Precision = " + precision)

    // Binary classification metrics
    val metrics = new BinaryClassificationMetrics(predictionAndLabels)

    // Evaluate the model by the area under the ROC curve (receiver operating characteristic); closer to 1 is better
    val auROC: Double = metrics.areaUnderROC()
    println("Area under ROC = " + auROC)

    // Evaluate the model by the area under the precision-recall (PR) curve; closer to 1 is better
    val underPR: Double = metrics.areaUnderPR()
    println("Area under PR = " + underPR)

    // Save and load model
    model.save(sc, "myModelPath")
    val sameModel = LogisticRegressionModel.load(sc, "myModelPath")
  }
}
Support Vector Machines (SVMs)
A classification algorithm, specifically a binary classification algorithm;
Similar to binary classification with logistic regression
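For reference, SVMWithSGD optimizes the usual linear-SVM objective, hinge loss plus L2 regularization (with regParam as the lambda below; 0/1 input labels are treated internally as -1/+1):

\min_{\mathbf{w}} \; \lambda\,\frac{1}{2}\lVert\mathbf{w}\rVert_2^2 + \frac{1}{n}\sum_{i=1}^{n}\max\{0,\; 1 - y_i\,\mathbf{w}^{T}\mathbf{x}_i\}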
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.util.MLUtils

/**
 * Created by Edward on 2016/9/21.
 */
object SVMs {
  def main(args: Array[String]) {
    val conf: SparkConf = new SparkConf().setAppName("SVM").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // Load training data in LIBSVM format.
    val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

    // Split data into training (60%) and test (40%).
    val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
    val training = splits(0).cache()
    val test = splits(1)

    // Run training algorithm to build the model
    val numIterations = 100
    val model = SVMWithSGD.train(training, numIterations)

    // Clear the default threshold so predict() returns raw scores.
    model.clearThreshold()

    // Compute raw scores on the test set.
    val scoreAndLabels = test.map { point =>
      println("feature=" + point.features)
      val score = model.predict(point.features)
      (score, point.label)
    }
    scoreAndLabels.foreach(println(_))

    // Get evaluation metrics.
    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    println("metrics=" + metrics)
    val auROC = metrics.areaUnderROC()
    println("Area under ROC = " + auROC)

    // Save and load model
    model.save(sc, "myModelPath")
    val sameModel = SVMModel.load(sc, "myModelPath")

    sc.stop()
  }
}
Data:
0 128:51 129:159 130:253 131:159 132:50 155:48 156:238 157:252 158:252 159:252 160:237 182:54 183:227 184:253 185:252 186:239 187:233 188:252 189:57 190:6 208:10 209:60 210:224
1 159:124 160:253 161:255 162:63 186:96 187:244 188:251 189:253 190:62 214:127 215:251 216:251 217:253 218:62
...
Collaborative Filtering
In Spark, collaborative filtering is implemented mainly with alternating least squares (ALS).
Parameters:
numBlocks: the number of blocks, used to control the degree of parallelism
rank: the number of latent features (the size of the feature vectors)
iterations: the number of iterations
lambda: the regularization parameter
alpha: a constant used to compute confidence in implicit-feedback ALS (see the implicit-feedback sketch after the example below)
Methods:
ALS.train
Evaluating the model:
Mean squared error (MSE)
Example:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating

/**
 * Created by Edward on 2016/9/22.
 */
object CollaborativeALS {
  def main(args: Array[String]) {
    val conf: SparkConf = new SparkConf().setAppName("CollaborativeALS").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // Load and parse the data
    val data = sc.textFile("data/mllib/als/test.data")
    val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>
      Rating(user.toInt, item.toInt, rate.toDouble)
    })

    // Build the recommendation model using ALS
    val rank = 10
    val numIterations = 10
    val model = ALS.train(ratings, rank, numIterations, 0.01)

    // Evaluate the model on rating data
    val usersProducts = ratings.map { case Rating(user, product, rate) =>
      (user, product)
    }

    val predictions = model.predict(usersProducts).map { case Rating(user, product, rate) =>
      ((user, product), rate)
    }

    val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
      ((user, product), rate)
    }.join(predictions)

    // Mean squared error between actual and predicted ratings
    val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
      val err = (r1 - r2)
      err * err
    }.mean()
    println("Mean Squared Error = " + MSE)

    // Save and load model
    model.save(sc, "target/tmp/myCollaborativeFilter")
    val sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
  }
}
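The alpha parameter above only matters for implicit-feedback data (view counts, clicks, and so on rather than explicit ratings). A minimal sketch using ALS.trainImplicit; the counts.data path and its user,item,count format are assumptions for illustration:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

object ImplicitALS {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("ImplicitALS").setMaster("local"))

    // Hypothetical implicit-feedback data: user,item,count (interaction strength, not an explicit rating)
    val ratings = sc.textFile("data/mllib/als/counts.data").map(_.split(',') match {
      case Array(user, item, count) => Rating(user.toInt, item.toInt, count.toDouble)
    })

    val rank = 10
    val numIterations = 10
    val lambda = 0.01
    val alpha = 1.0 // confidence constant: larger counts give higher confidence in the preference

    val model = ALS.trainImplicit(ratings, rank, numIterations, lambda, alpha)

    // Recommend the top 3 products for user 1, with predicted preference scores
    println(model.recommendProducts(1, 3).mkString("\n"))
    sc.stop()
  }
}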
To be continued...