8.4 Classification and Regression


I. Logistic Regression Classifier

Logistic regression is a classic classification method in statistical learning and belongs to the family of log-linear models. The response variable of a logistic regression can be binary or multiclass.
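For the binomial case, the model expresses the conditional probability of the positive class as a sigmoid of a linear function of the features, which makes the log-odds linear in x (hence "log-linear model"):

$$P(Y=1 \mid x) = \frac{\exp(w \cdot x + b)}{1 + \exp(w \cdot x + b)}, \qquad \log\frac{P(Y=1 \mid x)}{P(Y=0 \mid x)} = w \cdot x + b$$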

Task description: we analyze the iris dataset as the running example (download: http://dblab.xmu.edu.cn/blog/wp-content/uploads/2017/03/iris.txt).

The iris dataset describes iris flowers by their measurements. It contains 150 records in 3 classes of 50 records each, and each record has 4 attributes; it is a very common training/test set in data mining and classification. For ease of understanding, we mainly use the last two attributes (petal length and petal width) for classification here.

1. Solving a binary classification problem with binomial logistic regression

First we take the last two of the three classes and run a binary classification analysis with binomial logistic regression.

(1) Import the required packages

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.linalg.{Vector,Vectors}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.{Pipeline,PipelineModel}
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer,HashingTF, Tokenizer}
import org.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegression, LogisticRegressionModel}
import org.apache.spark.sql.functions

(2) Read the data and take a quick look

scala> import spark.implicits._
import spark.implicits._
 
scala> case class Iris(features: org.apache.spark.ml.linalg.Vector, label: String)
defined class Iris
 
scala> val data = spark.sparkContext.textFile("file:///usr/local/spark/iris.txt").map(_.split(",")).map(p => Iris(Vectors.dense(p(0).toDouble,p(1).toDouble,p(2).toDouble,p(3).toDouble), p(4).toString())).toDF()
data: org.apache.spark.sql.DataFrame = [features: vector, label: string]

scala> data.show()
+-----------------+-----------+
| features| label|
+-----------------+-----------+
|[5.1,3.5,1.4,0.2]|Iris-setosa|
|[4.9,3.0,1.4,0.2]|Iris-setosa|
|[4.7,3.2,1.3,0.2]|Iris-setosa|
|[4.6,3.1,1.5,0.2]|Iris-setosa|
|[5.0,3.6,1.4,0.2]|Iris-setosa|
|[5.4,3.9,1.7,0.4]|Iris-setosa|
|[4.6,3.4,1.4,0.3]|Iris-setosa|
|[5.0,3.4,1.5,0.2]|Iris-setosa|
|[4.4,2.9,1.4,0.2]|Iris-setosa|
|[4.9,3.1,1.5,0.1]|Iris-setosa|
|[5.4,3.7,1.5,0.2]|Iris-setosa|
|[4.8,3.4,1.6,0.2]|Iris-setosa|
|[4.8,3.0,1.4,0.1]|Iris-setosa|
|[4.3,3.0,1.1,0.1]|Iris-setosa|
|[5.8,4.0,1.2,0.2]|Iris-setosa|
|[5.7,4.4,1.5,0.4]|Iris-setosa|
|[5.4,3.9,1.3,0.4]|Iris-setosa|
|[5.1,3.5,1.4,0.3]|Iris-setosa|
|[5.7,3.8,1.7,0.3]|Iris-setosa|
|[5.1,3.8,1.5,0.3]|Iris-setosa|
+-----------------+-----------+
only showing top 20 rows

Since we are dealing with a binary classification problem here, we do not need all 3 classes of data, so we select two of them.

First, register the DataFrame we just obtained as a temporary view named iris; after that, we can query the data with SQL statements.

scala> data.createOrReplaceTempView("iris")
 
scala> val df = spark.sql("select * from iris where label != 'Iris-setosa'")
df: org.apache.spark.sql.DataFrame = [features: vector, label: string]
 
scala> df.map(t => t(1)+":"+t(0)).collect().foreach(println)
Iris-versicolor:[7.0,3.2,4.7,1.4]
Iris-versicolor:[6.4,3.2,4.5,1.5]
Iris-versicolor:[6.9,3.1,4.9,1.5]
……
……

(3) Build the ML pipeline

a. Index the label column and the feature column separately, renaming the outputs.

scala> val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(df)
labelIndexer: org.apache.spark.ml.feature.StringIndexerModel = strIdx_e53e67411169
scala> val featureIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").fit(df)
featureIndexer: org.apache.spark.ml.feature.VectorIndexerModel = vecIdx_53b988077b38

b. Next, randomly split the dataset into a training set and a test set, with 70% going to the training set.

scala> val Array(trainingData, testData) = df.randomSplit(Array(0.7, 0.3))
trainingData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, label: string]
testData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, label: string]

c. Then set the logistic regression parameters. Here we use setter methods throughout; a ParamMap can be used instead, as shown in the sketch after this code (see the Spark MLlib website for details). We set the maximum number of iterations to 10, the regularization parameter to 0.3, and so on.

scala> val lr = new LogisticRegression().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_692899496c23
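For reference, the same parameters can also be supplied through a ParamMap at fit() time instead of with setters. A minimal sketch, mirroring the ParamMap example in the official Spark ML documentation:

import org.apache.spark.ml.param.ParamMap

// Parameter overrides collected in a ParamMap; any value given here takes
// precedence over the value set on the estimator itself
val paramMap = ParamMap(lr.maxIter -> 10).put(lr.regParam -> 0.3, lr.elasticNetParam -> 0.8)

// The map is then passed to fit(), e.g. lrPipeline.fit(trainingData, paramMap)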

d. Set up a labelConverter to map the predicted numeric labels back to the original string labels.

scala> val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
labelConverter: org.apache.spark.ml.feature.IndexToString = idxToStr_c204eafabf57

e. Build the pipeline, set its stages, and call fit() to train the model

scala> val lrPipeline = new Pipeline().setStages(Array(labelIndexer, featureIndexer, lr, labelConverter))
lrPipeline: org.apache.spark.ml.Pipeline = pipeline_eb1b201af1e0
 
scala> val lrPipelineModel = lrPipeline.fit(trainingData)
lrPipelineModel: org.apache.spark.ml.PipelineModel = pipeline_eb1b201af1e0

f. A pipeline is essentially an Estimator; calling fit() on it produces a PipelineModel, which is essentially a Transformer. The PipelineModel can then call transform() to make predictions and generate a new DataFrame, i.e., we validate the trained model on the test set.

scala> val lrPredictions = lrPipelineModel.transform(testData)
lrPredictions: org.apache.spark.sql.DataFrame = [features: vector, label: string ... 6 more fields]

g. Finally, we can print the prediction results: select picks the columns to output, collect gathers all rows, and foreach prints each row. The printed values are, in order, the true label and feature vector of the row, the predicted probability of each class, and the predicted label.

scala> lrPredictions.select("predictedLabel", "label", "features", "probability").collect().foreach { case Row(predictedLabel: String, label: String, features: Vector, prob: Vector) => println(s"($label, $features) --> prob=$prob, predictedLabel=$predictedLabel")}
(Iris-virginica, [4.9,2.5,4.5,1.7]) --> prob=[0.4796551461409372,0.5203448538590628], predictedLabel=Iris-virginica
(Iris-versicolor, [5.1,2.5,3.0,1.1]) --> prob=[0.5892626391059901,0.41073736089401], predictedLabel=Iris-versicolor
(Iris-versicolor, [5.5,2.3,4.0,1.3]) --> prob=[0.5577310241453046,0.4422689758546954], predictedLabel=Iris-versicolor

(4) Model evaluation

Create a MulticlassClassificationEvaluator instance and use setter methods to configure the prediction column and the true-label column; then compute the accuracy and the test error.

scala> val evaluator = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")
evaluator: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_a80353e4211d
 
scala> val lrAccuracy = evaluator.evaluate(lrPredictions)
lrAccuracy: Double = 1.0
 
scala> println("Test Error = " + (1.0 - lrAccuracy))
Test Error = 0.0

Next we can retrieve the trained logistic regression model itself. As noted above, lrPipelineModel is a PipelineModel, so we can extract the model by indexing into its stages, as follows:

scala> val lrModel = lrPipelineModel.stages(2).asInstanceOf[LogisticRegressionModel]
lrModel: org.apache.spark.ml.classification.LogisticRegressionModel = logreg_692899496c23
 
scala> println("Coefficients: " + lrModel.coefficients+"Intercept: "+lrModel.intercept+"numClasses: "+lrModel.numClasses+"numFeatures: "+lrModel.numFeatures)
Coefficients: [-0.0396171957643483,0.0,0.0,0.07240315639651046]Intercept: -0.23127346342015379numClasses: 2numFeatures: 4
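Substituting the printed coefficients into the binomial model above gives the fitted decision function; note that the elastic-net regularization has zeroed out the two middle coefficients, so only the first and fourth features contribute:

$$z = -0.0396\,x_1 + 0.0724\,x_4 - 0.2313, \qquad P(\text{indexedLabel}=1 \mid x) = \frac{1}{1 + e^{-z}}$$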

2. Solving a binary classification problem with multinomial logistic regression
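Spark ML's LogisticRegression supports a multinomial family directly (Spark 2.1+); applied to the same two-class df it reduces to a two-class softmax model. A minimal sketch, reusing the imports from step (1) and the labelIndexer, featureIndexer, labelConverter and trainingData defined above:

// Same estimator as before, but with the multinomial family
val mlr = new LogisticRegression().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8).setFamily("multinomial")

val mlrPipeline = new Pipeline().setStages(Array(labelIndexer, featureIndexer, mlr, labelConverter))
val mlrPipelineModel = mlrPipeline.fit(trainingData)

// With family = "multinomial" the model exposes one coefficient row and one
// intercept per class, via coefficientMatrix and interceptVector
val mlrModel = mlrPipelineModel.stages(2).asInstanceOf[LogisticRegressionModel]
println("Multinomial coefficients: " + mlrModel.coefficientMatrix)
println("Multinomial intercepts: " + mlrModel.interceptVector)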

3. Solving a multiclass classification problem with multinomial logistic regression
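For the three-class problem, the only changes are to keep all of the data (no SQL filter) and to refit the indexers on the full dataset. A minimal sketch; the names with a `3` suffix are introduced here purely for illustration:

// Refit the indexers on the full three-class dataset
val labelIndexer3 = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(data)
val featureIndexer3 = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").fit(data)
val labelConverter3 = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer3.labels)

val Array(trainingData3, testData3) = data.randomSplit(Array(0.7, 0.3))

val mlr3 = new LogisticRegression().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(10).setRegParam(0.3).setFamily("multinomial")

val pipeline3 = new Pipeline().setStages(Array(labelIndexer3, featureIndexer3, mlr3, labelConverter3))
val model3 = pipeline3.fit(trainingData3)
val predictions3 = model3.transform(testData3)

// numClasses should now be 3, with one row of coefficients per class
val m3 = model3.stages(2).asInstanceOf[LogisticRegressionModel]
println("numClasses: " + m3.numClasses)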

II. Decision Tree Classifier

  A decision tree is a basic method for classification and regression; here we focus mainly on decision trees for classification. A decision tree model has a tree structure in which each internal node represents a test on an attribute, each branch represents an outcome of the test, and each leaf node represents a class. During learning, a decision tree model is built from the training data by minimizing a loss function; during prediction, new data are classified with the trained tree.

  Decision tree learning usually consists of 3 steps: feature selection, tree generation, and tree pruning.

1. Feature selection

  Feature selection picks out the features that have classification power on the training data, which improves the efficiency of decision tree learning. The usual selection criterion is information gain (or the information gain ratio, the Gini index, etc.): at each step, compute the information gain of every feature, compare them, and choose the feature with the largest information gain (largest gain ratio, or smallest Gini index), as formalized below.
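Concretely, with the empirical entropy of the dataset D and the conditional entropy of D given feature A, the information gain is their difference:

$$H(D) = -\sum_{k=1}^{K} \frac{|C_k|}{|D|} \log_2 \frac{|C_k|}{|D|}, \qquad g(D, A) = H(D) - H(D \mid A)$$

where $C_k$ is the subset of D belonging to class k, and $H(D \mid A) = \sum_i \frac{|D_i|}{|D|} H(D_i)$ sums the entropies of the subsets $D_i$ induced by the values of A.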

2. Decision tree generation

  1. Starting from the root node, compute the information gain of every candidate feature at the node, choose the feature with the largest information gain as the node's splitting feature, and create a child node for each of its values; then recursively apply the same procedure to each child node. Continue until the information gain of every feature is very small or no features remain to choose from, which yields the final decision tree.
  2. A decision tree needs stopping conditions to terminate its growth. The minimal conditions are: all records under the node belong to the same class, or all records have identical attribute values. These two conditions are necessary for stopping and are the weakest possible. In practice one usually wants the tree to stop growing early, e.g. by setting a minimum number of records per leaf node, to prevent the overfitting caused by uncontrolled growth.

3. Decision tree pruning

  The tree generation algorithm grows the tree recursively until it cannot continue. The resulting tree is often very accurate on the training data but much less accurate on unseen test data, i.e. it overfits. The remedy is to take the complexity of the tree into account and simplify the tree that has been grown; this process is called pruning. In Spark ML, a form of pre-pruning can be configured directly on the estimator, as sketched below.
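A minimal sketch of the pre-pruning controls (the setters below exist on DecisionTreeClassifier; the particular values are illustrative, not tuned):

import org.apache.spark.ml.classification.DecisionTreeClassifier

// Pre-pruning controls: cap the tree depth, require a minimum number of
// training instances in each child, and require a minimum information gain
// for a split to be accepted (values here are illustrative only)
val prunedDt = new DecisionTreeClassifier().
  setMaxDepth(5).
  setMinInstancesPerNode(5).
  setMinInfoGain(0.01)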

  As before, we use the iris dataset as the example (download: http://dblab.xmu.edu.cn/blog/wp-content/uploads/2017/03/iris.txt). It contains 150 records in 3 classes of 50 records each, with 4 attributes per record, and is a very common training/test set in data mining and classification.

(1) Import the required packages

import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.linalg.{Vector,Vectors}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

(2) Read the data and take a quick look

scala> import spark.implicits._
import spark.implicits._
 
scala> case class Iris(features: org.apache.spark.ml.linalg.Vector, label: String)
defined class Iris
 
scala> val data = spark.sparkContext.textFile("file:///usr/local/spark/iris.txt").map(_.split(",")).map(p => Iris(Vectors.dense(p(0).toDouble,p(1).toDouble,p(2).toDouble, p(3).toDouble),p(4).toString())).toDF()
data: org.apache.spark.sql.DataFrame = [features: vector, label: string]

scala> data.createOrReplaceTempView("iris")
 
scala> val df = spark.sql("select * from iris")
df: org.apache.spark.sql.DataFrame = [features: vector, label: string]
 
scala> df.map(t => t(1)+":"+t(0)).collect().foreach(println)
Iris-setosa:[5.1,3.5,1.4,0.2]
Iris-setosa:[4.9,3.0,1.4,0.2]
Iris-setosa:[4.7,3.2,1.3,0.2]
Iris-setosa:[4.6,3.1,1.5,0.2]
Iris-setosa:[5.0,3.6,1.4,0.2]
Iris-setosa:[5.4,3.9,1.7,0.4]
Iris-setosa:[4.6,3.4,1.4,0.3]
 
... ... 

(3) Further process the features and labels, and split the data

//Index the label column and the feature column separately, renaming the outputs.
scala> val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(df)
labelIndexer: org.apache.spark.ml.feature.StringIndexerModel = strIdx_107f7e530fa7
 
scala> val featureIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(df)
featureIndexer: org.apache.spark.ml.feature.VectorIndexerModel = vecIdx_0649803dfa70

//Set up a labelConverter to map the predicted numeric labels back to the original string labels.
scala> val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
labelConverter: org.apache.spark.ml.feature.IndexToString = idxToStr_046182b2e571
//Next, randomly split the dataset into a training set and a test set, with 70% going to the training set.
scala> val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
trainingData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, label: string]
testData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, label: string]

(4) Build the decision tree classification model

//Import the required packages
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
//Create the decision tree classifier. Its parameters can be set with setter methods, as here, or with a ParamMap (see the Spark MLlib website for details); the full list of settable parameters can be printed with explainParams(), as shown below.
scala> val dtClassifier = new DecisionTreeClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")
dtClassifier: org.apache.spark.ml.classification.DecisionTreeClassifier = dtc_029ea28aceb1
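// For reference, every settable parameter together with its documentation and
// current value can be printed with explainParams(), e.g.:
//   println(dtClassifier.explainParams())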
//Configure the pipeline stages
scala> val pipelinedClassifier = new Pipeline().setStages(Array(labelIndexer, featureIndexer, dtClassifier, labelConverter))
pipelinedClassifier: org.apache.spark.ml.Pipeline = pipeline_a254dfd6dfb9

//Train the decision tree model
scala> val modelClassifier = pipelinedClassifier.fit(trainingData)
modelClassifier: org.apache.spark.ml.PipelineModel = pipeline_a254dfd6dfb9
//Make predictions
scala> val predictionsClassifier = modelClassifier.transform(testData)
predictionsClassifier: org.apache.spark.sql.DataFrame = [features: vector, label: string ... 6 more fields]
//Inspect some of the predictions
scala> predictionsClassifier.select("predictedLabel", "label", "features").show(20)
+---------------+---------------+-----------------+
| predictedLabel| label| features|
+---------------+---------------+-----------------+
| Iris-setosa| Iris-setosa|[4.4,2.9,1.4,0.2]|
| Iris-setosa| Iris-setosa|[4.6,3.6,1.0,0.2]|
| Iris-virginica|Iris-versicolor|[4.9,2.4,3.3,1.0]|
| Iris-setosa| Iris-setosa|[4.9,3.1,1.5,0.1]|
| Iris-setosa| Iris-setosa|[4.9,3.1,1.5,0.1]|

(5) Evaluate the decision tree classification model

scala> val evaluatorClassifier = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")
evaluatorClassifier: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_4abc19f3a54d
 
scala> val accuracy = evaluatorClassifier.evaluate(predictionsClassifier)
accuracy: Double = 0.8648648648648649
 
scala> println("Test Error = " + (1.0 - accuracy))
Test Error = 0.1351351351351351
 
scala> val treeModelClassifier = modelClassifier.stages(2).asInstanceOf[DecisionTreeClassificationModel]
treeModelClassifier: org.apache.spark.ml.classification.DecisionTreeClassificationModel = DecisionTreeClassificationModel (uid=dtc_029ea28aceb1) of depth 5 with 13 nodes

scala> println("Learned classification tree model:\n" + treeModelClassifier.toDebugString)
Learned classification tree model:
DecisionTreeClassificationModel (uid=dtc_029ea28aceb1) of depth 5 with 13 nodes
If (feature 2 <= 1.9)
 Predict: 2.0
Else (feature 2 > 1.9)
 If (feature 2 <= 4.7)
  If (feature 0 <= 4.9)
   Predict: 1.0
  Else (feature 0 > 4.9)
   Predict: 0.0
 Else (feature 2 > 4.7)
  If (feature 3 <= 1.6)
   If (feature 2 <= 4.8)
    Predict: 0.0
   Else (feature 2 > 4.8)
    If (feature 0 <= 6.0)
     Predict: 0.0
    Else (feature 0 > 6.0)
     Predict: 1.0
  Else (feature 3 > 1.6)
   Predict: 1.0

(6) Build the decision tree regression model

//Import the required packages
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.DecisionTreeRegressionModel
import org.apache.spark.ml.regression.DecisionTreeRegressor
//Create the decision tree regressor
scala> val dtRegressor = new DecisionTreeRegressor().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")
dtRegressor: org.apache.spark.ml.regression.DecisionTreeRegressor = dtr_358e08c37f0c
//Configure the pipeline stages
scala> val pipelineRegressor = new Pipeline().setStages(Array(labelIndexer, featureIndexer, dtRegressor, labelConverter))
pipelineRegressor: org.apache.spark.ml.Pipeline = pipeline_ae699675d015

//Train the decision tree regression model
scala> val modelRegressor = pipelineRegressor.fit(trainingData)
modelRegressor: org.apache.spark.ml.PipelineModel = pipeline_ae699675d015
//Make predictions
scala> val predictionsRegressor = modelRegressor.transform(testData)
predictionsRegressor: org.apache.spark.sql.DataFrame = [features: vector, label: string ... 4 more fields]
//Inspect some of the predictions
scala> predictionsRegressor.select("predictedLabel", "label", "features").show(20)
+---------------+---------------+-----------------+
| predictedLabel| label| features|
+---------------+---------------+-----------------+
| Iris-setosa| Iris-setosa|[4.4,2.9,1.4,0.2]|
| Iris-setosa| Iris-setosa|[4.6,3.6,1.0,0.2]|
| Iris-virginica|Iris-versicolor|[4.9,2.4,3.3,1.0]|
| Iris-setosa| Iris-setosa|[4.9,3.1,1.5,0.1]|
| Iris-setosa| Iris-setosa|[4.9,3.1,1.5,0.1]|

(7) Evaluate the decision tree regression model

scala> val evaluatorRegressor = new RegressionEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("rmse")
evaluatorRegressor: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_425d2aeea2dd
 
scala> val rmse = evaluatorRegressor.evaluate(predictionsRegressor)
rmse: Double = 0.3676073110469039
 
scala> println("Root Mean Squared Error (RMSE) on test data = " + rmse)
Root Mean Squared Error (RMSE) on test data = 0.3676073110469039
 
scala> val treeModelRegressor = modelRegressor.stages(2).asInstanceOf[DecisionTreeRegressionModel]
treeModelRegressor: org.apache.spark.ml.regression.DecisionTreeRegressionModel = DecisionTreeRegressionModel (uid=dtr_358e08c37f0c) of depth 5 with 13 nodes

scala> println("Learned regression tree model:\n" + treeModelRegressor.toDebugString)
Learned regression tree model:
DecisionTreeRegressionModel (uid=dtr_358e08c37f0c) of depth 5 with 13 nodes
If (feature 2 <= 1.9)
 Predict: 2.0
Else (feature 2 > 1.9)
 If (feature 2 <= 4.7)
  If (feature 0 <= 4.9)
   Predict: 1.0
  Else (feature 0 > 4.9)
   Predict: 0.0
 Else (feature 2 > 4.7)
  If (feature 3 <= 1.6)
   If (feature 2 <= 4.8)
    Predict: 0.0
   Else (feature 2 > 4.8)
    If (feature 0 <= 6.0)
     Predict: 0.5
    Else (feature 0 > 6.0)
     Predict: 1.0
  Else (feature 3 > 1.6)
   Predict: 1.0

From the output above we can see the structure of the trained regression tree and that the model's root mean squared error (RMSE) on the test data is 0.3676073110469039.
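Here the root mean squared error reported by the evaluator is

$$\mathrm{RMSE} = \sqrt{\frac{1}{n} \sum_{i=1}^{n} (\hat{y}_i - y_i)^2}$$

so on this test split the predictions deviate from the indexed labels by about 0.37 on average.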

