Spark Machine Learning 4: Classification Models (spark-shell)


  • Linear models
    • Logistic regression: logistic loss
    • Linear support vector machine (SVM): hinge loss
  • Naive Bayes
  • Decision trees
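The two loss functions named in the list can be written out directly. The following is a minimal sketch in plain Scala, assuming labels encoded as -1 or +1 and a raw score w·x; the object and method names are illustrative and are not part of MLlib.

```scala
object LossSketch {
  // Logistic loss for a label y in {-1, +1} and a raw score (w dot x).
  def logisticLoss(y: Double, score: Double): Double =
    math.log1p(math.exp(-y * score))

  // Hinge loss for a label y in {-1, +1}: zero once the margin y * score reaches 1.
  def hingeLoss(y: Double, score: Double): Double =
    math.max(0.0, 1.0 - y * score)
}
```

Both losses shrink as the margin y * score grows; the hinge loss becomes exactly zero beyond a margin of 1, while the logistic loss only approaches zero.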

0 Preparing the data

kaggle2.blob.core.windows.net/competitions-data/kaggle/3526/train.tsv

sed 1d train.tsv > train_noheader.tsv 

0 Runtime environment

cd /Users/erichan/Garden/spark-1.5.1-bin-cdh4
bin/spark-shell --name my_mlib --packages org.jblas:jblas:1.2.4-SNAPSHOT --driver-memory 4G --executor-memory 4G --driver-cores 2

import org.apache.spark.mllib.feature._
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.evaluation._
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.configuration.Algo
import org.apache.spark.mllib.tree.impurity._

1 Extracting features

val PATH = "/Users/erichan/sourcecode/book/Spark機器學習"
val rawData = sc.textFile(PATH+"/train_noheader.tsv")
val records = rawData.map(line => line.split("\t"))
records.first

Array[String] = Array("http://www.bloomberg.com/news/2010-12-23/ibm-predicts-holographic-calls-air-breathing-batteries-by-2015.html", "4042", "{""title"":""IBM Sees Holographic Calls Air Breathing Batteries ibm sees holographic calls, air-breathing batteries"",""body"":""A sign stands outside the International Business Machines Corp IBM Almaden Research Center campus in San Jose California Photographer Tony Avelar Bloomberg Buildings stand at the International Business Machines Corp IBM Almaden Research Center campus in the Santa Teresa Hills of San Jose California Photographer Tony Avelar Bloomberg By 2015 your mobile phone will project a 3 D image of anyone who calls and your laptop will be powered by kinetic energy At least that s what International Business Machines Corp sees ...

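val data = records.map { r =>
  // strip the quote characters from every field
  val trimmed = r.map(_.replaceAll("\"", ""))
  // the label is the last column
  val label = trimmed(r.size - 1).toInt
  // columns 4 onward (excluding the label) are numeric features; "?" marks a missing value
  val features = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble)
  LabeledPoint(label, Vectors.dense(features))
}
data.cache
val numData = data.count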

numData: Long = 7395

// Note that some of our data contains negative feature values. For naive Bayes we convert these to zeros.
val nbData = records.map { r =>
  val trimmed = r.map(_.replaceAll("\"", ""))
  val label = trimmed(r.size - 1).toInt
  val features = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble).map(d => if (d < 0) 0.0 else d)
  LabeledPoint(label, Vectors.dense(features))
}

2 Training the classification models

2.1 Logistic regression model

// train a logistic regression model
val numIterations = 10
val maxTreeDepth = 5
val lrModel = LogisticRegressionWithSGD.train(data, numIterations)

2.2 SVM model

val svmModel = SVMWithSGD.train(data, numIterations)

2.3 Naive Bayes

val nbModel = NaiveBayes.train(nbData)

2.4 Decision tree

val dtModel = DecisionTree.train(data, Algo.Classification, Entropy, maxTreeDepth)

3 Using the classification models

3.1 Prediction

Using the logistic regression model as an example:

val dataPoint = data.first
val prediction = lrModel.predict(dataPoint.features)

prediction: Double = 1.0

val trueLabel = dataPoint.label 

trueLabel: Double = 0.0

val predictions = lrModel.predict(data.map(lp => lp.features))
predictions.take(5)

Array[Double] = Array(1.0, 1.0, 1.0, 1.0, 1.0)

4 Evaluating performance

4.1 Accuracy of the logistic regression model

val lrTotalCorrect = data.map { point =>
  if (lrModel.predict(point.features) == point.label) 1 else 0
}.sum
val lrAccuracy = lrTotalCorrect / numData

lrAccuracy: Double = 0.5146720757268425

4.2 Accuracy of the SVM model

val svmTotalCorrect = data.map { point =>
  if (svmModel.predict(point.features) == point.label) 1 else 0
}.sum
val svmAccuracy = svmTotalCorrect / numData

svmAccuracy: Double = 0.5146720757268425

4.3 Accuracy of naive Bayes

val nbTotalCorrect = nbData.map { point =>
  if (nbModel.predict(point.features) == point.label) 1 else 0
}.sum
val nbAccuracy = nbTotalCorrect / numData

nbAccuracy: Double = 0.5803921568627451

4.4 Accuracy of the decision tree

// the decision tree threshold needs to be specified
val dtTotalCorrect = data.map { point =>
  val score = dtModel.predict(point.features)
  val predicted = if (score > 0.5) 1 else 0
  if (predicted == point.label) 1 else 0
}.sum
val dtAccuracy = dtTotalCorrect / numData

dtAccuracy: Double = 0.6482758620689655

4.5 ROC curve and AUC

val metrics = Seq(lrModel, svmModel).map { model =>
  val scoreAndLabels = data.map { point =>
    (model.predict(point.features), point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
}
val nbMetrics = Seq(nbModel).map { model =>
  val scoreAndLabels = nbData.map { point =>
    val score = model.predict(point.features)
    (if (score > 0.5) 1.0 else 0.0, point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
}
val dtMetrics = Seq(dtModel).map { model =>
  val scoreAndLabels = data.map { point =>
    val score = model.predict(point.features)
    (if (score > 0.5) 1.0 else 0.0, point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
}
val allMetrics = metrics ++ nbMetrics ++ dtMetrics
allMetrics.foreach { case (m, pr, roc) =>
  println(f"$m, Area under PR: ${pr * 100.0}%2.4f%%, Area under ROC: ${roc * 100.0}%2.4f%%")
}

LogisticRegressionModel, Area under PR: 75.6759%, Area under ROC: 50.1418%
SVMModel, Area under PR: 75.6759%, Area under ROC: 50.1418%
NaiveBayesModel, Area under PR: 68.0851%, Area under ROC: 58.3559%
DecisionTreeModel, Area under PR: 74.3081%, Area under ROC: 64.8837%
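The evaluation above feeds hard 0/1 predictions into the metrics as scores. In that situation the ROC curve has a single interior point, and the trapezoidal area under it reduces to (1 + TPR - FPR) / 2. The following is a standalone sketch of that arithmetic in plain Scala, not Spark's implementation; the object and method names are illustrative, and it assumes both classes are present in the input.

```scala
object HardLabelAuc {
  // scoreAndLabels: (predicted label, true label), both in {0.0, 1.0}.
  // Assumes at least one positive and one negative example.
  def auc(scoreAndLabels: Seq[(Double, Double)]): Double = {
    val pos = scoreAndLabels.count(_._2 == 1.0)
    val neg = scoreAndLabels.size - pos
    val tp = scoreAndLabels.count { case (p, l) => p == 1.0 && l == 1.0 }
    val fp = scoreAndLabels.count { case (p, l) => p == 1.0 && l == 0.0 }
    val tpr = tp.toDouble / pos // true positive rate
    val fpr = fp.toDouble / neg // false positive rate
    // area under the polyline (0,0) -> (fpr,tpr) -> (1,1)
    (1.0 + tpr - fpr) / 2.0
  }
}
```

A model that predicts the same class for every point gets TPR = FPR and therefore an area of exactly 0.5, which is consistent with the near-50% ROC figures for the unscaled logistic regression and SVM models, whose sampled predictions were all 1.0.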

5 Improving and tuning

5.1 Feature standardization

val vectors = data.map(lp => lp.features)
val matrix = new RowMatrix(vectors)
val matrixSummary = matrix.computeColumnSummaryStatistics()

println(matrixSummary.mean)
println(matrixSummary.min)
println(matrixSummary.max)
println(matrixSummary.variance)
println(matrixSummary.numNonzeros)

[0.4122580529952672,2.761823191986608,0.4682304732861389,0.21407992638350232,0.09206236071899916,0.04926216043908053,2.255103452212041,-0.10375042752143335,0.0,0.05642274498417851,0.02123056118999324,0.23377817665490194,0.2757090373659236,0.615551048005409,0.6603110209601082,30.07707910750513,0.03975659229208925,5716.598242055447,178.75456389452353,4.960649087221096,0.17286405047031742,0.10122079189276552]

[0.0,0.0,0.0,0.0,0.0,0.0,0.0,-1.0,0.0,0.0,0.0,0.045564223,-1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0]

[0.999426,363.0,1.0,1.0,0.980392157,0.980392157,21.0,0.25,0.0,0.444444444,1.0,0.716883117,113.3333333,1.0,1.0,100.0,1.0,207952.0,4997.0,22.0,1.0,1.0]

[0.10974244167559001,74.30082476809639,0.04126316989120241,0.02153343633200108,0.009211817450882448,0.005274933469767946,32.53918714591821,0.09396988697611545,0.0,0.0017177410346628928,0.020782634824610638,0.0027548394224293036,3.683788919674426,0.2366799607085986,0.22433071201674218,415.8785589543846,0.03818116876739597,7.877330081138463E7,32208.116247426184,10.45300904576431,0.03359363403832393,0.006277532884214705]

[5053.0,7354.0,7172.0,6821.0,6160.0,5128.0,7350.0,1257.0,0.0,7362.0,157.0,7395.0,7355.0,4552.0,4883.0,7347.0,294.0,7378.0,7395.0,6782.0,6868.0,7235.0]

val scaler = new StandardScaler(withMean = true, withStd = true).fit(vectors)
val scaledData = data.map(lp => LabeledPoint(lp.label, scaler.transform(lp.features)))
println(data.first.features)
println(scaledData.first.features)
// check the first feature by hand: (x - mean) / sqrt(variance)
println((0.789131 - 0.41225805299526636)/math.sqrt(0.1097424416755897))

[0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575]

[1.137647336497678,-0.08193557169294771,1.0251398128933331,-0.05586356442541689,-0.4688932531289357,-0.3543053263079386,-0.3175352172363148,0.3384507982396541,0.0,0.828822173315322,-0.14726894334628504,0.22963982357813484,-0.14162596909880876,0.7902380499177364,0.7171947294529865,-0.29799681649642257,-0.2034625779299476,-0.03296720969690391,-0.04878112975579913,0.9400699751165439,-0.10869848852526258,-0.2788207823137022]

1.137647336497682
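The hand check above is the whole transformation applied to one feature. A minimal sketch in plain Scala of the same arithmetic follows; the names are illustrative rather than MLlib's, and mapping a zero-variance column to 0.0 is an assumption chosen to match the scaled output shown above, where the constant ninth feature stays at 0.0.

```scala
object StandardizeSketch {
  // Subtract the column mean and divide by the column standard deviation.
  def standardize(x: Double, mean: Double, variance: Double): Double =
    (x - mean) / math.sqrt(variance)

  // Apply the transformation column by column.
  // Assumption: a column with zero variance is mapped to 0.0.
  def standardizeVector(xs: Array[Double],
                        means: Array[Double],
                        variances: Array[Double]): Array[Double] =
    xs.indices.map { i =>
      if (variances(i) == 0.0) 0.0 else standardize(xs(i), means(i), variances(i))
    }.toArray
}
```

Calling StandardizeSketch.standardize(0.789131, 0.41225805299526636, 0.1097424416755897) reproduces the value computed by hand above.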

Retraining with the standardized features

val lrModelScaled = LogisticRegressionWithSGD.train(scaledData, numIterations)
val lrTotalCorrectScaled = scaledData.map { point =>
  if (lrModelScaled.predict(point.features) == point.label) 1 else 0
}.sum
val lrAccuracyScaled = lrTotalCorrectScaled / numData
// lrAccuracyScaled: Double = 0.6204192021636241
val lrPredictionsVsTrue = scaledData.map { point =>
  (lrModelScaled.predict(point.features), point.label)
}
val lrMetricsScaled = new BinaryClassificationMetrics(lrPredictionsVsTrue)
val lrPr = lrMetricsScaled.areaUnderPR
val lrRoc = lrMetricsScaled.areaUnderROC
println(f"${lrModelScaled.getClass.getSimpleName}\nAccuracy: ${lrAccuracyScaled * 100}%2.4f%%\nArea under PR: ${lrPr * 100.0}%2.4f%%\nArea under ROC: ${lrRoc * 100.0}%2.4f%%")

LogisticRegressionModel
Accuracy: 62.0419%
Area under PR: 72.7254%
Area under ROC: 61.9663%

5.2 Additional features

// map each distinct category (column 3) to an index
val categories = records.map(r => r(3)).distinct.collect.zipWithIndex.toMap
val numCategories = categories.size
val dataCategories = records.map { r =>
  val trimmed = r.map(_.replaceAll("\"", ""))
  val label = trimmed(r.size - 1).toInt
  val categoryIdx = categories(r(3))
  // 1-of-k encoding of the category
  val categoryFeatures = Array.ofDim[Double](numCategories)
  categoryFeatures(categoryIdx) = 1.0
  val otherFeatures = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble)
  val features = categoryFeatures ++ otherFeatures
  LabeledPoint(label, Vectors.dense(features))
}
println(dataCategories.first)

(0.0,[0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575])
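The leading block of the vector printed above, a single 1.0 among zeros, is the 1-of-k encoding of the category column. A minimal sketch of that encoding in plain Scala follows; the category names in exampleIndex are hypothetical and are not taken from the dataset.

```scala
object OneOfKSketch {
  // Hypothetical category-to-index map, built the same way as `categories` above.
  val exampleIndex: Map[String, Int] =
    Seq("business", "sports", "recreation").zipWithIndex.toMap

  // Return a vector of zeros with a single 1.0 at the category's index.
  def encode(category: String, index: Map[String, Int]): Array[Double] = {
    val v = Array.ofDim[Double](index.size)
    v(index(category)) = 1.0
    v
  }
}
```

For example, OneOfKSketch.encode("sports", OneOfKSketch.exampleIndex) yields a three-element array with 1.0 in the second position.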

Standardizing the expanded features

// standardize the feature vectors
val scalerCats = new StandardScaler(withMean = true, withStd = true).fit(dataCategories.map(lp => lp.features))
val scaledDataCats = dataCategories.map(lp => LabeledPoint(lp.label, scalerCats.transform(lp.features)))
println(dataCategories.first.features)
println(scaledDataCats.first.features)

[0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575]

[-0.02326210589837061,2.7207366564548514,-0.4464212047941535,-0.22052688457880879,-0.028494000387023734,-0.2709990696925828,-0.23272797709480803,-0.2016540523193296,-0.09914991930875496,-0.38181322324318134,-0.06487757239262681,-0.6807527904251456,-0.20418221057887365,-0.10189469097220732,1.137647336497678,-0.08193557169294771,1.0251398128933331,-0.05586356442541689,-0.4688932531289357,-0.3543053263079386,-0.3175352172363148,0.3384507982396541,0.0,0.828822173315322,-0.14726894334628504,0.22963982357813484,-0.14162596909880876,0.7902380499177364,0.7171947294529865,-0.29799681649642257,-0.2034625779299476,-0.03296720969690391,-0.04878112975579913,0.9400699751165439,-0.10869848852526258,-0.2788207823137022]

Training the logistic regression model with the expanded features

val lrModelScaledCats = LogisticRegressionWithSGD.train(scaledDataCats, numIterations)
val lrTotalCorrectScaledCats = scaledDataCats.map { point =>
  if (lrModelScaledCats.predict(point.features) == point.label) 1 else 0
}.sum
val lrAccuracyScaledCats = lrTotalCorrectScaledCats / numData
val lrPredictionsVsTrueCats = scaledDataCats.map { point =>
  (lrModelScaledCats.predict(point.features), point.label)
}
val lrMetricsScaledCats = new BinaryClassificationMetrics(lrPredictionsVsTrueCats)
val lrPrCats = lrMetricsScaledCats.areaUnderPR
val lrRocCats = lrMetricsScaledCats.areaUnderROC
println(f"${lrModelScaledCats.getClass.getSimpleName}\nAccuracy: ${lrAccuracyScaledCats * 100}%2.4f%%\nArea under PR: ${lrPrCats * 100.0}%2.4f%%\nArea under ROC: ${lrRocCats * 100.0}%2.4f%%")

LogisticRegressionModel
Accuracy: 66.5720%
Area under PR: 75.7964%
Area under ROC: 66.5483%

5.3 Using the correct form of data

Build a dataset from the 1-of-k encoded category feature only

val dataNB = records.map { r =>
  val trimmed = r.map(_.replaceAll("\"", ""))
  val label = trimmed(r.size - 1).toInt
  val categoryIdx = categories(r(3))
  val categoryFeatures = Array.ofDim[Double](numCategories)
  categoryFeatures(categoryIdx) = 1.0
  LabeledPoint(label, Vectors.dense(categoryFeatures))
}

Retrain the naive Bayes model and evaluate its performance

val nbModelCats = NaiveBayes.train(dataNB)
val nbTotalCorrectCats = dataNB.map { point =>
  if (nbModelCats.predict(point.features) == point.label) 1 else 0
}.sum
val nbAccuracyCats = nbTotalCorrectCats / numData
val nbPredictionsVsTrueCats = dataNB.map { point =>
  (nbModelCats.predict(point.features), point.label)
}
val nbMetricsCats = new BinaryClassificationMetrics(nbPredictionsVsTrueCats)
val nbPrCats = nbMetricsCats.areaUnderPR
val nbRocCats = nbMetricsCats.areaUnderROC
println(f"${nbModelCats.getClass.getSimpleName}\nAccuracy: ${nbAccuracyCats * 100}%2.4f%%\nArea under PR: ${nbPrCats * 100.0}%2.4f%%\nArea under ROC: ${nbRocCats * 100.0}%2.4f%%")

NaiveBayesModel
Accuracy: 60.9601%
Area under PR: 74.0522%
Area under ROC: 60.5138%

5.4 Tuning model parameters

5.4.1 Linear models

Underlying optimization technique: stochastic gradient descent (SGD)

Helper function: train a model from the given inputs

// helper function to train a logistic regression model
def trainWithParams(input: RDD[LabeledPoint], regParam: Double, numIterations: Int, updater: Updater, stepSize: Double) = {
  val lr = new LogisticRegressionWithSGD
  lr.optimizer.setNumIterations(numIterations).setUpdater(updater).setRegParam(regParam).setStepSize(stepSize)
  lr.run(input)
}

Helper function: compute the AUC for the given input data and classification model

// helper function to create AUC metric
def createMetrics(label: String, data: RDD[LabeledPoint], model: ClassificationModel) = {
  val scoreAndLabels = data.map { point =>
    (model.predict(point.features), point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  (label, metrics.areaUnderROC)
}

Tuning the number of iterations

// cache the data to increase speed of multiple runs against the dataset
scaledDataCats.cache
// num iterations
val iterResults = Seq(1, 5, 10, 50).map { param =>
  val model = trainWithParams(scaledDataCats, 0.0, param, new SimpleUpdater, 1.0)
  createMetrics(s"$param iterations", scaledDataCats, model)
}
iterResults.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }

1 iterations, AUC = 64.97%
5 iterations, AUC = 66.62%
10 iterations, AUC = 66.55%
50 iterations, AUC = 66.81%

Tuning the step size

// step size
val numIterations = 10
val stepResults = Seq(0.001, 0.01, 0.1, 1.0, 10.0).map { param =>
  val model = trainWithParams(scaledDataCats, 0.0, numIterations, new SimpleUpdater, param)
  createMetrics(s"$param step size", scaledDataCats, model)
}
stepResults.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }

0.001 step size, AUC = 64.95%
0.01 step size, AUC = 65.00%
0.1 step size, AUC = 65.52%
1.0 step size, AUC = 66.55%
10.0 step size, AUC = 61.92%

Investigating the regularization parameter with SquaredL2Updater

// regularization
val regResults = Seq(0.001, 0.01, 0.1, 1.0, 10.0).map { param =>
  val model = trainWithParams(scaledDataCats, param, numIterations, new SquaredL2Updater, 1.0)
  createMetrics(s"$param L2 regularization parameter", scaledDataCats, model)
}
regResults.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }

0.001 L2 regularization parameter, AUC = 66.55%
0.01 L2 regularization parameter, AUC = 66.55%
0.1 L2 regularization parameter, AUC = 66.63%
1.0 L2 regularization parameter, AUC = 66.04%
10.0 L2 regularization parameter, AUC = 35.33%
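To make the roles of the step size and the L2 regularization parameter concrete, here is a simplified single-example gradient step for logistic regression in plain Scala. It is a sketch under stated assumptions (labels in {0, 1}, one example per step, a fixed step size), and the names are illustrative; MLlib's own optimizer differs in details such as mini-batching and step-size scheduling.

```scala
object SgdStepSketch {
  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // One update: w <- w - stepSize * ((sigmoid(w.x) - y) * x + regParam * w)
  def step(w: Array[Double], x: Array[Double], y: Double,
           stepSize: Double, regParam: Double): Array[Double] = {
    val margin = w.zip(x).map { case (wi, xi) => wi * xi }.sum
    val error = sigmoid(margin) - y // gradient of the logistic loss w.r.t. the margin
    w.zip(x).map { case (wi, xi) =>
      wi - stepSize * (error * xi + regParam * wi) // L2 term pulls weights toward zero
    }
  }
}
```

A larger stepSize moves the weights further per update, and a larger regParam shrinks them more strongly toward zero, which gives one way to read the drop in AUC at the largest values in the two sweeps above.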

5.4.2 Decision trees

Helper function: takes the tree depth and the impurity measure

// investigate decision tree
def trainDTWithParams(input: RDD[LabeledPoint], maxDepth: Int, impurity: Impurity) = {
  DecisionTree.train(input, Algo.Classification, impurity, maxDepth)
}

Using the Entropy impurity

// investigate tree depth impact for Entropy impurity
val dtResultsEntropy = Seq(1, 2, 3, 4, 5, 10, 20).map { param =>
  val model = trainDTWithParams(data, param, Entropy)
  val scoreAndLabels = data.map { point =>
    val score = model.predict(point.features)
    (if (score > 0.5) 1.0 else 0.0, point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  (s"$param tree depth", metrics.areaUnderROC)
}
dtResultsEntropy.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }

1 tree depth, AUC = 59.33%
2 tree depth, AUC = 61.68%
3 tree depth, AUC = 62.61%
4 tree depth, AUC = 63.63%
5 tree depth, AUC = 64.88%
10 tree depth, AUC = 76.26%
20 tree depth, AUC = 98.45%

Using the Gini impurity

// investigate tree depth impact for Gini impurity
val dtResultsGini = Seq(1, 2, 3, 4, 5, 10, 20).map { param =>
  val model = trainDTWithParams(data, param, Gini)
  val scoreAndLabels = data.map { point =>
    val score = model.predict(point.features)
    (if (score > 0.5) 1.0 else 0.0, point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  (s"$param tree depth", metrics.areaUnderROC)
}
dtResultsGini.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }

1 tree depth, AUC = 59.33%
2 tree depth, AUC = 61.68%
3 tree depth, AUC = 62.61%
4 tree depth, AUC = 63.63%
5 tree depth, AUC = 64.89%
10 tree depth, AUC = 78.37%
20 tree depth, AUC = 98.87%
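For reference, the two impurity measures compared above can be written out for the binary case. This is a minimal sketch in plain Scala using the textbook definitions, with p the fraction of positive examples in a node; the object and method names are illustrative and are not MLlib's.

```scala
object ImpuritySketch {
  // Entropy in bits: -p*log2(p) - (1-p)*log2(1-p), with 0*log(0) taken as 0.
  def entropy(p: Double): Double = {
    def term(q: Double): Double =
      if (q == 0.0) 0.0 else -q * math.log(q) / math.log(2.0)
    term(p) + term(1.0 - p)
  }

  // Gini impurity: 1 - p^2 - (1-p)^2.
  def gini(p: Double): Double =
    1.0 - p * p - (1.0 - p) * (1.0 - p)
}
```

Both measures are zero for a pure node and peak at p = 0.5, so they rank candidate splits similarly, in line with the nearly identical AUC values at shallow depths above. Note that these AUC values are computed on the training data itself, so the figures at depths 10 and 20 are best read as a sign of the tree fitting the training set very closely.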

5.4.3 Naive Bayes

Helper function: takes the lambda parameter

// investigate Naive Bayes parameters
def trainNBWithParams(input: RDD[LabeledPoint], lambda: Double) = {
  val nb = new NaiveBayes
  nb.setLambda(lambda)
  nb.run(input)
}
val nbResults = Seq(0.001, 0.01, 0.1, 1.0, 10.0).map { param =>
  val model = trainNBWithParams(dataNB, param)
  val scoreAndLabels = dataNB.map { point =>
    (model.predict(point.features), point.label)
  }
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  (s"$param lambda", metrics.areaUnderROC)
}
nbResults.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }

0.001 lambda, AUC = 60.51%
0.01 lambda, AUC = 60.51%
0.1 lambda, AUC = 60.51%
1.0 lambda, AUC = 60.51%
10.0 lambda, AUC = 60.51%
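The lambda parameter controls additive smoothing of the estimated feature probabilities. A minimal sketch of the textbook formula in plain Scala follows; the names are illustrative, and this is the general technique rather than a copy of MLlib's internals.

```scala
object SmoothingSketch {
  // Smoothed estimate of P(feature | class):
  //   (count + lambda) / (total + lambda * numFeatures)
  // count: occurrences of the feature within the class
  // total: occurrences of all features within the class
  def smoothed(count: Double, total: Double, numFeatures: Int, lambda: Double): Double =
    (count + lambda) / (total + lambda * numFeatures)
}
```

With lambda = 0 the estimate is the raw frequency, and a positive lambda keeps unseen features from getting probability zero. The flat AUC across all lambda values above plausibly reflects that, with a single 1-of-k feature over thousands of rows, the counts dominate any of the tested smoothing values.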

5.4.4 Cross-validation

Splitting into training and test sets

// illustrate cross-validation
// create a 60% / 40% train/test data split
val trainTestSplit = scaledDataCats.randomSplit(Array(0.6, 0.4), 123)
val train = trainTestSplit(0)
val test = trainTestSplit(1)

Model performance on the test set

val regResultsTest = Seq(0.0, 0.001, 0.0025, 0.005, 0.01).map { param =>
  val model = trainWithParams(train, param, numIterations, new SquaredL2Updater, 1.0)
  createMetrics(s"$param L2 regularization parameter", test, model)
}
regResultsTest.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.6f%%") }

0.0 L2 regularization parameter, AUC = 66.480874%
0.001 L2 regularization parameter, AUC = 66.480874%
0.0025 L2 regularization parameter, AUC = 66.515027%
0.005 L2 regularization parameter, AUC = 66.515027%
0.01 L2 regularization parameter, AUC = 66.549180%

Model performance on the training set

// training set results
val regResultsTrain = Seq(0.0, 0.001, 0.0025, 0.005, 0.01).map { param =>
  val model = trainWithParams(train, param, numIterations, new SquaredL2Updater, 1.0)
  createMetrics(s"$param L2 regularization parameter", train, model)
}
regResultsTrain.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.6f%%") }

0.0 L2 regularization parameter, AUC = 66.260311%
0.001 L2 regularization parameter, AUC = 66.260311%
0.0025 L2 regularization parameter, AUC = 66.260311%
0.005 L2 regularization parameter, AUC = 66.238294%
0.01 L2 regularization parameter, AUC = 66.238294%

