I. Logistic Regression Classifier
Logistic regression is a classic classification method in statistical learning and belongs to the family of log-linear models. The response variable of a logistic regression can be binary or multiclass.
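Concretely, the binomial logistic regression model specifies the conditional probability of the label given an input vector $x$:

$$P(Y=1 \mid x) = \frac{\exp(w \cdot x + b)}{1 + \exp(w \cdot x + b)}, \qquad P(Y=0 \mid x) = \frac{1}{1 + \exp(w \cdot x + b)}$$

where $w$ is the weight vector and $b$ the intercept. It is a log-linear model because the log-odds $\log \frac{P(Y=1 \mid x)}{P(Y=0 \mid x)} = w \cdot x + b$ is linear in $x$.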
Task description: we use the iris dataset as the running example (download: http://dblab.xmu.edu.cn/blog/wp-content/uploads/2017/03/iris.txt).
The iris dataset takes measurements of iris flowers as its data source. It contains 150 records divided into 3 classes of 50 records each, and each record has 4 attributes; it is a very commonly used training/test set in data mining and classification. For ease of understanding, the discussion here mainly uses the last two attributes (petal length and petal width) for classification.
1. Solving a binary classification problem with binomial logistic regression
First we take the last two of the three classes and run a binary classification analysis with binomial logistic regression.
(1) Import the required packages
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer, HashingTF, Tokenizer}
import org.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegression, LogisticRegressionModel}
import org.apache.spark.sql.functions
(2) Read in the data and take a quick look
scala> import spark.implicits._
import spark.implicits._

scala> case class Iris(features: org.apache.spark.ml.linalg.Vector, label: String)
defined class Iris

scala> val data = spark.sparkContext.textFile("file:///usr/local/spark/iris.txt").map(_.split(",")).map(p => Iris(Vectors.dense(p(0).toDouble, p(1).toDouble, p(2).toDouble, p(3).toDouble), p(4).toString())).toDF()
data: org.apache.spark.sql.DataFrame = [features: vector, label: string]

scala> data.show()
+-----------------+-----------+
|         features|      label|
+-----------------+-----------+
|[5.1,3.5,1.4,0.2]|Iris-setosa|
|[4.9,3.0,1.4,0.2]|Iris-setosa|
|[4.7,3.2,1.3,0.2]|Iris-setosa|
|[4.6,3.1,1.5,0.2]|Iris-setosa|
|[5.0,3.6,1.4,0.2]|Iris-setosa|
|[5.4,3.9,1.7,0.4]|Iris-setosa|
|[4.6,3.4,1.4,0.3]|Iris-setosa|
|[5.0,3.4,1.5,0.2]|Iris-setosa|
|[4.4,2.9,1.4,0.2]|Iris-setosa|
|[4.9,3.1,1.5,0.1]|Iris-setosa|
|[5.4,3.7,1.5,0.2]|Iris-setosa|
|[4.8,3.4,1.6,0.2]|Iris-setosa|
|[4.8,3.0,1.4,0.1]|Iris-setosa|
|[4.3,3.0,1.1,0.1]|Iris-setosa|
|[5.8,4.0,1.2,0.2]|Iris-setosa|
|[5.7,4.4,1.5,0.4]|Iris-setosa|
|[5.4,3.9,1.3,0.4]|Iris-setosa|
|[5.1,3.5,1.4,0.3]|Iris-setosa|
|[5.7,3.8,1.7,0.3]|Iris-setosa|
|[5.1,3.8,1.5,0.3]|Iris-setosa|
+-----------------+-----------+
only showing top 20 rows
Since we are now dealing with a binary classification problem, we do not need all 3 classes of data; we select two of them.
First, register the DataFrame we just obtained as a temporary view named iris; once it is registered, we can query the data with SQL statements.
scala> data.createOrReplaceTempView("iris")

scala> val df = spark.sql("select * from iris where label != 'Iris-setosa'")
df: org.apache.spark.sql.DataFrame = [features: vector, label: string]

scala> df.map(t => t(1)+":"+t(0)).collect().foreach(println)
Iris-versicolor:[7.0,3.2,4.7,1.4]
Iris-versicolor:[6.4,3.2,4.5,1.5]
Iris-versicolor:[6.9,3.1,4.9,1.5]
... ...
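Equivalently, the same selection could be done with the DataFrame API rather than SQL; a minimal sketch (df2 is an illustrative name):

// Equivalent to the SQL query above, using the DataFrame API
val df2 = data.filter($"label" =!= "Iris-setosa")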
(3) Build the ML pipeline
a.分別獲取標簽列和特征列,進行索引,並進行了重命名。
scala> val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(df)
labelIndexer: org.apache.spark.ml.feature.StringIndexerModel = strIdx_e53e67411169

scala> val featureIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").fit(df)
featureIndexer: org.apache.spark.ml.feature.VectorIndexerModel = vecIdx_53b988077b38
b. Next, randomly split the dataset into a training set and a test set, with 70% going to the training set.
scala> val Array(trainingData, testData) = df.randomSplit(Array(0.7, 0.3))
trainingData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, label: string]
testData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, label: string]
c. Then set the logistic regression parameters. Here we set them uniformly with setter methods; they can also be set with a ParamMap (see the Spark MLlib documentation for details; a ParamMap sketch follows the code below). Here we set the maximum number of iterations to 10, the regularization parameter to 0.3, and so on.
scala> val lr = new LogisticRegression().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_692899496c23
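For reference, a minimal sketch of the ParamMap alternative mentioned above (the values mirror the setters; lrParams is an illustrative name):

import org.apache.spark.ml.param.ParamMap

// Same parameters expressed as a ParamMap; these pairs override setter values at fit() time
val lrParams = ParamMap(lr.maxIter -> 10, lr.regParam -> 0.3, lr.elasticNetParam -> 0.8)
// e.g. lrPipeline.fit(trainingData, lrParams) would train with these settings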
d. Here we set up a labelConverter, whose purpose is to convert the predicted class indices back into strings.
scala> val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
labelConverter: org.apache.spark.ml.feature.IndexToString = idxToStr_c204eafabf57
e. Build the pipeline, set the stages, and then call fit() to train the model.
scala> val lrPipeline = new Pipeline().setStages(Array(labelIndexer, featureIndexer, lr, labelConverter))
lrPipeline: org.apache.spark.ml.Pipeline = pipeline_eb1b201af1e0

scala> val lrPipelineModel = lrPipeline.fit(trainingData)
lrPipelineModel: org.apache.spark.ml.PipelineModel = pipeline_eb1b201af1e0
f. A Pipeline is essentially an Estimator: calling fit() on the pipeline produces a PipelineModel, which is essentially a Transformer. This PipelineModel can then call transform() to make predictions and generate a new DataFrame; that is, we validate the trained model on the test set.
scala> val lrPredictions = lrPipelineModel.transform(testData)
lrPredictions: org.apache.spark.sql.DataFrame = [features: vector, label: string ... 6 more fields]
g. Finally, we can print the prediction results, where select picks the columns to output, collect gathers all rows, and foreach prints each row. The printed values are, in order: the true class and feature values of the row, the predicted probability of each class, and the predicted class.
scala> lrPredictions.select("predictedLabel", "label", "features", "probability").collect().foreach { case Row(predictedLabel: String, label: String, features: Vector, prob: Vector) => println(s"($label, $features) --> prob=$prob, predictedLabel=$predictedLabel") }
(Iris-virginica, [4.9,2.5,4.5,1.7]) --> prob=[0.4796551461409372,0.5203448538590628], predictedLabel=Iris-virginica
(Iris-versicolor, [5.1,2.5,3.0,1.1]) --> prob=[0.5892626391059901,0.41073736089401], predictedLabel=Iris-versicolor
(Iris-versicolor, [5.5,2.3,4.0,1.3]) --> prob=[0.5577310241453046,0.4422689758546954], predictedLabel=Iris-versicolor
(4) Model evaluation
Create a MulticlassClassificationEvaluator instance and use setter methods to set the column names of the predicted class and the true class; then compute the prediction accuracy and the error rate.
scala> val evaluator = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction")
evaluator: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_a80353e4211d

scala> val lrAccuracy = evaluator.evaluate(lrPredictions)
lrAccuracy: Double = 1.0

scala> println("Test Error = " + (1.0 - lrAccuracy))
Test Error = 0.0
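One caveat: MulticlassClassificationEvaluator's default metric is F1, not accuracy, so the value above is strictly an F1 score (the two coincide here because every prediction is correct). To report accuracy explicitly, as the decision tree section below does, the metric can be set by name; a minimal sketch:

// Explicitly request accuracy; without setMetricName the evaluator reports F1 by default
val accEvaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
println("Accuracy = " + accEvaluator.evaluate(lrPredictions))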
Next, we can retrieve the trained logistic regression model from the fitted pipeline. As noted above, the result is a PipelineModel, so we can get the model by indexing into its stages, as follows:
scala> val lrModel = lrPipelineModel.stages(2).asInstanceOf[LogisticRegressionModel]
lrModel: org.apache.spark.ml.classification.LogisticRegressionModel = logreg_692899496c23

scala> println("Coefficients: " + lrModel.coefficients + " Intercept: " + lrModel.intercept + " numClasses: " + lrModel.numClasses + " numFeatures: " + lrModel.numFeatures)
Coefficients: [-0.0396171957643483,0.0,0.0,0.07240315639651046] Intercept: -0.23127346342015379 numClasses: 2 numFeatures: 4
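Since BinaryLogisticRegressionSummary was imported at the start, it is also worth seeing how the training summary could be inspected; a minimal sketch following the Spark 2.x pattern (the cast assumes a binary model, as is the case here):

// Training summary, computed on the training data during fit()
val trainingSummary = lrModel.summary
// The cast to BinaryLogisticRegressionSummary assumes a 2-class model
val binarySummary = trainingSummary.asInstanceOf[BinaryLogisticRegressionSummary]
println("Area under ROC = " + binarySummary.areaUnderROC)
println("Objective history: " + trainingSummary.objectiveHistory.mkString(", "))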
2. Solving a binary classification problem with multinomial logistic regression
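Spark's LogisticRegression (2.1+) also supports a multinomial family, so the binary problem above can be re-run as a multinomial model; a minimal sketch reusing the pipeline stages from section 1 (mlr, mlrPipeline, and mlrModel are illustrative names):

// Same data and stages as section 1, but with the multinomial family
val mlr = new LogisticRegression()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
  .setFamily("multinomial")

val mlrPipeline = new Pipeline().setStages(Array(labelIndexer, featureIndexer, mlr, labelConverter))
val mlrModel = mlrPipeline.fit(trainingData).stages(2).asInstanceOf[LogisticRegressionModel]
// With the multinomial family, coefficients come back as a matrix with one row per class
println("Coefficient matrix:\n" + mlrModel.coefficientMatrix)
println("Intercept vector: " + mlrModel.interceptVector)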
3. Solving a multiclass classification problem with multinomial logistic regression
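For the genuinely multiclass case the approach is the same, except the stages are fit on the full three-class DataFrame data instead of the two-class subset df; a minimal sketch (the 3-suffixed names are illustrative):

// Index labels/features over all three classes and split the full dataset
val labelIndexer3 = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(data)
val featureIndexer3 = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").fit(data)
val labelConverter3 = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer3.labels)
val Array(train3, test3) = data.randomSplit(Array(0.7, 0.3))

val lr3 = new LogisticRegression()
  .setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")
  .setMaxIter(10).setRegParam(0.3).setFamily("multinomial")

val model3 = new Pipeline().setStages(Array(labelIndexer3, featureIndexer3, lr3, labelConverter3)).fit(train3)
val predictions3 = model3.transform(test3)
// numClasses is now 3; evaluate as before
println("Test accuracy = " + new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel").setPredictionCol("prediction")
  .setMetricName("accuracy").evaluate(predictions3))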
II. Decision Tree Classifier
A decision tree is a basic method for classification and regression; here we focus on decision trees for classification. A decision tree model has a tree structure in which each internal node represents a test on an attribute, each branch represents an outcome of the test, and each leaf node represents a class. During learning, the training data are used to build a decision tree model according to the principle of minimizing a loss function; during prediction, new data are classified with the decision tree model.
Decision tree learning usually consists of 3 steps: feature selection, tree generation, and tree pruning.
1. Feature selection
Feature selection means choosing features that can actually discriminate between the classes in the training data, which improves the efficiency of decision tree learning. The usual selection criterion is information gain (or the information gain ratio, the Gini index, etc.): compute the information gain of every feature, compare the values, and select the feature with the largest information gain (largest gain ratio, or smallest Gini index), as the sketch below illustrates.
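To make the criterion concrete, here is a small self-contained Scala sketch (illustrative code, not part of Spark) of entropy and of the information gain of one categorical feature:

// Shannon entropy (in bits) of a collection of class labels
def entropy(labels: Seq[String]): Double = {
  val n = labels.size.toDouble
  labels.groupBy(identity).values.map { group =>
    val p = group.size / n
    -p * math.log(p) / math.log(2)
  }.sum
}

// Information gain of splitting on a feature; rows are (featureValue, label) pairs
def infoGain(rows: Seq[(String, String)]): Double = {
  val n = rows.size.toDouble
  val before = entropy(rows.map(_._2))                   // entropy of the whole node
  val after = rows.groupBy(_._1).values.map { subset =>  // weighted entropy of the children
    (subset.size / n) * entropy(subset.map(_._2))
  }.sum
  before - after
}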
2. Decision tree generation
- Starting from the root node, compute the information gain of every candidate feature at the node and choose the feature with the largest information gain as the node's splitting feature; create child nodes according to the distinct values of that feature, then recursively apply the same procedure to the child nodes to build the tree, until the information gains of all features are very small or no features remain, which finally yields a decision tree (a compact sketch follows this list).
- A decision tree needs stopping conditions to terminate its growth. At a minimum, growth stops when all records under a node belong to the same class, or when all records have identical attribute values; these two conditions are the necessary, minimal stopping conditions. In practice, one usually wants the tree to stop growing early, e.g. by limiting the minimum number of records in a leaf node, to prevent the overfitting caused by unchecked growth.
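Putting the two ideas together, a compact illustrative sketch of the recursive generation procedure (again not Spark code; it reuses entropy and infoGain from the sketch above, and eps is the threshold below which gains count as "very small"):

sealed trait Tree
case class Leaf(label: String) extends Tree
case class Node(feature: Int, children: Map[String, Tree]) extends Tree

// rows: (featureValues, label); features: indices still available for splitting
def build(rows: Seq[(Vector[String], String)], features: Set[Int], eps: Double): Tree = {
  val labels = rows.map(_._2)
  val majority = labels.groupBy(identity).maxBy(_._2.size)._1
  if (labels.distinct.size == 1 || features.isEmpty) Leaf(majority)
  else {
    // choose the feature with the largest information gain
    val (best, gain) = features.map(f => f -> infoGain(rows.map(r => (r._1(f), r._2)))).maxBy(_._2)
    if (gain < eps) Leaf(majority)  // stop when all gains are very small
    else Node(best, rows.groupBy(_._1(best)).map {
      case (value, subset) => value -> build(subset, features - best, eps)
    })
  }
}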
3. Decision tree pruning
The tree-generation algorithm produces the tree recursively until it can go no further. A tree produced this way is often very accurate on the training data but noticeably less accurate on unseen test data, i.e., it overfits. The remedy is to take the complexity of the tree into account and simplify the tree that has already been grown; this process is called pruning.
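In the Spark ML API used below, tree complexity is controlled mainly through pre-pruning parameters set before training, rather than post-pruning; a minimal sketch on DecisionTreeClassifier (introduced below; the values are illustrative, not tuned):

// Pre-pruning controls: these stop growth early instead of pruning afterwards
val prunedDt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxDepth(4)             // cap the depth of the tree
  .setMinInstancesPerNode(5)  // each child must retain at least 5 records
  .setMinInfoGain(0.01)       // reject splits whose gain falls below this threshold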
We again use the iris dataset as the example (download: http://dblab.xmu.edu.cn/blog/wp-content/uploads/2017/03/iris.txt). The iris dataset takes measurements of iris flowers as its data source; it contains 150 records divided into 3 classes of 50 records each, each record with 4 attributes, and is a very commonly used training/test set in data mining and classification.
(1) Import the required packages
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
(2) Read in the data and take a quick look
scala> import spark.implicits._
import spark.implicits._

scala> case class Iris(features: org.apache.spark.ml.linalg.Vector, label: String)
defined class Iris

scala> val data = spark.sparkContext.textFile("file:///usr/local/spark/iris.txt").map(_.split(",")).map(p => Iris(Vectors.dense(p(0).toDouble, p(1).toDouble, p(2).toDouble, p(3).toDouble), p(4).toString())).toDF()
data: org.apache.spark.sql.DataFrame = [features: vector, label: string]

scala> data.createOrReplaceTempView("iris")

scala> val df = spark.sql("select * from iris")
df: org.apache.spark.sql.DataFrame = [features: vector, label: string]

scala> df.map(t => t(1)+":"+t(0)).collect().foreach(println)
Iris-setosa:[5.1,3.5,1.4,0.2]
Iris-setosa:[4.9,3.0,1.4,0.2]
Iris-setosa:[4.7,3.2,1.3,0.2]
Iris-setosa:[4.6,3.1,1.5,0.2]
Iris-setosa:[5.0,3.6,1.4,0.2]
Iris-setosa:[5.4,3.9,1.7,0.4]
Iris-setosa:[4.6,3.4,1.4,0.3]
... ...
(3)進一步處理特征和標簽,以及數據分組
//分別獲取標簽列和特征列,進行索引,並進行了重命名。 scala> val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol( "indexedLabel").fit(df) labelIndexer: org.apache.spark.ml.feature.StringIndexerModel = strIdx_107f7e530fa7 scala> val featureIndexer = new VectorIndexer().setInputCol("features").setOutpu tCol("indexedFeatures").setMaxCategories(4).fit(df) featureIndexer: org.apache.spark.ml.feature.VectorIndexerModel = vecIdx_0649803dfa70 //這里我們設置一個labelConverter,目的是把預測的類別重新轉化成字符型的。 scala> val labelConverter = new IndexToString().setInputCol("prediction").setOut putCol("predictedLabel").setLabels(labelIndexer.labels) labelConverter: org.apache.spark.ml.feature.IndexToString = idxToStr_046182b2e571 //接下來,我們把數據集隨機分成訓練集和測試集,其中訓練集占70%。 scala> val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3)) trainingData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, label: string] testData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, label: string]
(4) Build the decision tree classification model
// Import the required packages
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Train the decision tree model. Parameters can be set through setter methods or
// with a ParamMap (see the Spark MLlib documentation for details); the available
// parameters can be listed with explainParams().
scala> val dtClassifier = new DecisionTreeClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")
dtClassifier: org.apache.spark.ml.classification.DecisionTreeClassifier = dtc_029ea28aceb1

// Set up the pipeline
scala> val pipelinedClassifier = new Pipeline().setStages(Array(labelIndexer, featureIndexer, dtClassifier, labelConverter))
pipelinedClassifier: org.apache.spark.ml.Pipeline = pipeline_a254dfd6dfb9

// Train the decision tree model
scala> val modelClassifier = pipelinedClassifier.fit(trainingData)
modelClassifier: org.apache.spark.ml.PipelineModel = pipeline_a254dfd6dfb9

// Make predictions
scala> val predictionsClassifier = modelClassifier.transform(testData)
predictionsClassifier: org.apache.spark.sql.DataFrame = [features: vector, label: string ... 6 more fields]

// Look at part of the prediction results
scala> predictionsClassifier.select("predictedLabel", "label", "features").show(20)
+---------------+---------------+-----------------+
| predictedLabel|          label|         features|
+---------------+---------------+-----------------+
|    Iris-setosa|    Iris-setosa|[4.4,2.9,1.4,0.2]|
|    Iris-setosa|    Iris-setosa|[4.6,3.6,1.0,0.2]|
| Iris-virginica|Iris-versicolor|[4.9,2.4,3.3,1.0]|
|    Iris-setosa|    Iris-setosa|[4.9,3.1,1.5,0.1]|
|    Iris-setosa|    Iris-setosa|[4.9,3.1,1.5,0.1]|
... ...
(5) Evaluate the decision tree classification model
scala> val evaluatorClassifier = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")
evaluatorClassifier: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_4abc19f3a54d

scala> val accuracy = evaluatorClassifier.evaluate(predictionsClassifier)
accuracy: Double = 0.8648648648648649

scala> println("Test Error = " + (1.0 - accuracy))
Test Error = 0.1351351351351351

scala> val treeModelClassifier = modelClassifier.stages(2).asInstanceOf[DecisionTreeClassificationModel]
treeModelClassifier: org.apache.spark.ml.classification.DecisionTreeClassificationModel = DecisionTreeClassificationModel (uid=dtc_029ea28aceb1) of depth 5 with 13 nodes

scala> println("Learned classification tree model:\n" + treeModelClassifier.toDebugString)
Learned classification tree model:
DecisionTreeClassificationModel (uid=dtc_029ea28aceb1) of depth 5 with 13 nodes
  If (feature 2 <= 1.9)
   Predict: 2.0
  Else (feature 2 > 1.9)
   If (feature 2 <= 4.7)
    If (feature 0 <= 4.9)
     Predict: 1.0
    Else (feature 0 > 4.9)
     Predict: 0.0
   Else (feature 2 > 4.7)
    If (feature 3 <= 1.6)
     If (feature 2 <= 4.8)
      Predict: 0.0
     Else (feature 2 > 4.8)
      If (feature 0 <= 6.0)
       Predict: 0.0
      Else (feature 0 > 6.0)
       Predict: 1.0
    Else (feature 3 > 1.6)
     Predict: 1.0
(6) Build the decision tree regression model
// Import the required packages
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.DecisionTreeRegressionModel
import org.apache.spark.ml.regression.DecisionTreeRegressor

// Train the decision tree regression model
scala> val dtRegressor = new DecisionTreeRegressor().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")
dtRegressor: org.apache.spark.ml.regression.DecisionTreeRegressor = dtr_358e08c37f0c

// Set up the pipeline
scala> val pipelineRegressor = new Pipeline().setStages(Array(labelIndexer, featureIndexer, dtRegressor, labelConverter))
pipelineRegressor: org.apache.spark.ml.Pipeline = pipeline_ae699675d015

// Train the model
scala> val modelRegressor = pipelineRegressor.fit(trainingData)
modelRegressor: org.apache.spark.ml.PipelineModel = pipeline_ae699675d015

// Make predictions
scala> val predictionsRegressor = modelRegressor.transform(testData)
predictionsRegressor: org.apache.spark.sql.DataFrame = [features: vector, label: string ... 4 more fields]

// Look at part of the prediction results
scala> predictionsRegressor.select("predictedLabel", "label", "features").show(20)
+---------------+---------------+-----------------+
| predictedLabel|          label|         features|
+---------------+---------------+-----------------+
|    Iris-setosa|    Iris-setosa|[4.4,2.9,1.4,0.2]|
|    Iris-setosa|    Iris-setosa|[4.6,3.6,1.0,0.2]|
| Iris-virginica|Iris-versicolor|[4.9,2.4,3.3,1.0]|
|    Iris-setosa|    Iris-setosa|[4.9,3.1,1.5,0.1]|
|    Iris-setosa|    Iris-setosa|[4.9,3.1,1.5,0.1]|
... ...
(7) Evaluate the decision tree regression model
scala> val evaluatorRegressor = new RegressionEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("rmse")
evaluatorRegressor: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_425d2aeea2dd

scala> val rmse = evaluatorRegressor.evaluate(predictionsRegressor)
rmse: Double = 0.3676073110469039

scala> println("Root Mean Squared Error (RMSE) on test data = " + rmse)
Root Mean Squared Error (RMSE) on test data = 0.3676073110469039

scala> val treeModelRegressor = modelRegressor.stages(2).asInstanceOf[DecisionTreeRegressionModel]
treeModelRegressor: org.apache.spark.ml.regression.DecisionTreeRegressionModel = DecisionTreeRegressionModel (uid=dtr_358e08c37f0c) of depth 5 with 13 nodes

scala> println("Learned regression tree model:\n" + treeModelRegressor.toDebugString)
Learned regression tree model:
DecisionTreeRegressionModel (uid=dtr_358e08c37f0c) of depth 5 with 13 nodes
  If (feature 2 <= 1.9)
   Predict: 2.0
  Else (feature 2 > 1.9)
   If (feature 2 <= 4.7)
    If (feature 0 <= 4.9)
     Predict: 1.0
    Else (feature 0 > 4.9)
     Predict: 0.0
   Else (feature 2 > 4.7)
    If (feature 3 <= 1.6)
     If (feature 2 <= 4.8)
      Predict: 0.0
     Else (feature 2 > 4.8)
      If (feature 0 <= 6.0)
       Predict: 0.5
      Else (feature 0 > 6.0)
       Predict: 1.0
    Else (feature 3 > 1.6)
     Predict: 1.0
從上述結果可以看到模型的標准誤差為 0.3676073110469039以及訓練的決策樹模型結構。