邏輯斯蒂回歸:
邏輯斯蒂回歸是統計學習中的經典分類方法,屬於對數線性模型。logistic回歸的因變量可以是二分類的,
也可以是多分類的
基本原理
logistic 分布
折X是連續的隨機變量,X服從logistic分布是指X具有下列分布函數和密度函數:

其中
為位置參數,
為形狀參數。
與
圖像如下,其中分布函數是以
為中心對陣,
越小曲線變化越快

二項logistic回歸模型;
二項logistic回歸模型如下:

其中
是輸入,
輸出,W稱為權值向量,b稱為偏置,
是w和x的內積
參數估計
假設:
![]()
則似然函數為:
![Rendered by QuickLaTeX.com \[\prod_{i=1}^N [\pi (x_i)]^{y_i} [1 - \pi(x_i)]^{1-y_i}\]](/image/aHR0cDovL2RibGFiLnhtdS5lZHUuY24vYmxvZy93cC1jb250ZW50L3FsLWNhY2hlL3F1aWNrbGF0ZXguY29tLWY1ZDYzZWE4MGE5ZmQ3OGE5ZjZhMzg0MWI0MWI5NTVjX2wzLnN2Zw==.png)
求對數似然函數:
![Rendered by QuickLaTeX.com \[L(w) = \sum_{i=1}^N [y_i \log{\pi(x_i)} + (1-y_i) \log{(1 - \pi(x_i)})]\]](/image/aHR0cDovL2RibGFiLnhtdS5lZHUuY24vYmxvZy93cC1jb250ZW50L3FsLWNhY2hlL3F1aWNrbGF0ZXguY29tLTdiNWI0NjEwMDU0NGRmYWRlZmUxMDBkMmY1Y2U5NzM2X2wzLnN2Zw==.png)
![Rendered by QuickLaTeX.com \[\sum_{i=1}^N [y_i \log{\frac {\pi (x_i)} {1 - \pi(x_i)}} + \log{(1 - \pi(x_i)})]=\sum_{i=1}^N [y_i \log{\frac {\pi (x_i)} {1 - \pi(x_i)}} + \log{(1 - \pi(x_i)})]\]](/image/aHR0cDovL2RibGFiLnhtdS5lZHUuY24vYmxvZy93cC1jb250ZW50L3FsLWNhY2hlL3F1aWNrbGF0ZXguY29tLWQzNTViZTE5Y2MxNWE2ODQyYjlmMTk0ODdjZDUyODdhX2wzLnN2Zw==.png)
從而對
![]()
求極大值,得到w的估計值。求極值的方法可以是梯度下降法,梯度上升法等。
示例代碼:
#導入需要的包:
from pyspark import SparkContext
from pyspark.sql import SparkSession,Row,functions
from pyspark.ml.linalg import Vector,Vectors
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString,StringIndexer,VectorIndexer,HashingTF,Tokenizer
from pyspark.ml.classification import LogisticRegression,LogisticRegressionModel,BinaryLogisticRegressionSummary,LogisticRegression
#用二項邏輯斯蒂回歸解決 二分類 問題
sc = SparkContext('local','用二項邏輯斯蒂回歸解決二分類問題')
spark = SparkSession.builder.master('local').appName('用二項邏輯斯蒂回歸解決二分類問題').getOrCreate()
#讀取數據,簡要分析
#我們定制一個函數,來返回一個指定的數據,然后讀取文本文件,第一個map把每行的數據用","
#隔開,比如在我們的數據集中,每行被分成了5部分,目前4部分是鳶尾花的四個特征,最后一部分鳶尾花的分類;
#我們這里把特征存儲在Vector中,創建一個Iris模式的RDd,然后轉化成DataFrame;最后調用show()方法查看數據
def f(x):
rel ={}
rel['features'] = Vectors.dense(float(x[0]),float(x[1]),float(x[2]),float(x[3]))
rel['label'] = str(x[4])
return rel
data= sc.textFile("file:///usr/local/spark/mycode/exercise/iris.txt").map(lambda line : line.split(',')).map(lambda p : Row(**f(p))).toDF()
# 因為我們現在處理的是2分類問題,所以我們不需要全部的3類數據,我們要從中選出兩類的
#數據。這里首先把剛剛得到的數據注冊成一個表iris,注冊成這個表之后,我們就可以
#通過sql語句進行數據查詢,比如我們這里選出了所有不屬於“Iris-setosa”類別的數
#據;選出我們需要的數據后,我們可以把結果打印出來看一下,這時就已經沒有“Iris-setosa”類別的數據
data.createOrReplaceTempView("iris")
df = spark.sql("select * from iris where label != 'Iris-setosa'")
rel = df.rdd.map(lambda t : str(t[1])+":"+str(t[0])).collect()
for item in rel:
print(item)
如圖:
#構建ML的pipeline
#分別獲取標簽列和特征列,進行索引,並進行了重命名
labelIndexer = StringIndexer().setInputCol('label').setOutputCol('indexedLabel').fit(df)
featureIndexer = VectorIndexer().setInputCol('features').setOutputCol('indexedFeatures').fit(df)
#把數據集隨機分成訓練集和測試集,其中訓練集占70%
trainingData, testData =df.randomSplit([0.7,0.3])
#設置logistic的參數,這里我們統一用setter的方法來設置,也可以用ParamMap來設置
#(具體的可以查看spark mllib的官網)。這里我們設置了循環次數為10次,正則化項為
#0.3等,具體的可以設置的參數可以通過explainParams()來獲取,還能看到我們已經設置
#的參數的結果。
lr= LogisticRegression().setLabelCol("indexedLabel").setFeaturesCol('indexedFeatures').setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
print("LogisticRegression parameters:\n"+ lr.explainParams())
如圖:
#設置一個labelConverter,目的是把預測的類別重新轉化成字符型的
labelConverter = IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
#構建pipeline,設置stage,然后調用fit()來訓練模型
LrPipeline = Pipeline().setStages([labelIndexer, featureIndexer, lr, labelConverter])
LrPipelineModel = LrPipeline.fit(trainingData)
#用訓練得到的模型進行預測,即對測試數據集進行驗證
lrPredictions = LrPipelineModel.transform(testData)
preRel = lrPredictions.select("predictedLabel",'label','features','probability').collect()
for item in preRel:
print(str(item['label'])+','+str(item['features'])+'-->prob='+str(item['probability'])+',predictedLabel'+str(item['predictedLabel']))
如圖:
#模型評估1
#創建一個MulticlassClassificationEvaluator實例,用setter方法把預測分類的列名和真實分類的列名進行設置;然后計算預測准確率和錯誤率
evaluator = MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction")
lrAccuracy = evaluator.evaluate(lrPredictions)
print("Test Error=" + str(1.0- lrAccuracy))
如圖:
#從上面可以看到預測的准確性達到94%,接下來我們可以通過model來獲取我們訓練得到
#的邏輯斯蒂模型。前面已經說過model是一個PipelineModel,因此我們可以通過調用它的
#stages來獲取模型
lrModel = LrPipelineModel.stages[2]
print("Coefficients: " + str(lrModel.coefficients)+"Intercept: "+str(lrModel.intercept)+"numClasses: "+str(lrModel.numClasses)+"numFeatures: "+str(lrModel.numFeatures))
如圖:
#模型評估2
#spark的ml庫還提供了一個對模型的摘要總結(summary),不過目前只支持二項邏輯斯
#蒂回歸,而且要顯示轉化成
BinaryLogisticRegressionSummary 。在下面的代碼中,首
#先獲得二項邏輯斯模型的摘要;然后獲得10次循環中損失函數的變化,並將結果打印出來
#,可以看到損失函數隨着循環是逐漸變小的,損失函數越小,模型就越好;接下來,我們
#把摘要強制轉化為
BinaryLogisticRegressionSummary,來獲取用來評估模型性能的矩陣;
#通過獲取ROC,我們可以判斷模型的好壞,areaUnderROC達到了 0.969551282051282,說明
#我們的分類器還是不錯的;最后,我們通過最大化fMeasure來選取最合適的閾值,其中fMeasure
#是一個綜合了召回率和准確率的指標,通過最大化fMeasure,我們可以選取到用來分類的最合適的閾值
trainingSummary = lrModel.summary
objectiveHistory = trainingSummary.objectiveHistory
for item in objectiveHistory:
print (item)
print("areaUnderRoC:"+str(trainingSummary.areaUnderROC))
如圖:
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
print(maxFMeasure)
如圖:
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']).select('threshold').head()['threshold']
print(bestThreshold)
lr.setThreshold(bestThreshold)
#用多項邏輯斯蒂回歸解決 二分類 問題
mlr = LogisticRegression().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8).setFamily("multinomial")
mlrPipeline = Pipeline().setStages([labelIndexer, featureIndexer, mlr, labelConverter])
mlrPipelineModel = mlrPipeline.fit(trainingData)
mlrPrediction = mlrPipelineModel.transform(testData)
mlrPreRel =mlrPrediction.select("predictedLabel", "label", "features", "probability").collect()
for item in mlrPreRel:
print('('+str(item['label'])+','+str(item['features'])+')-->prob='+str(item['probability'])+',predictLabel='+str(item['predictedLabel']))
如圖:
mlrAccuracy = evaluator.evaluate(mlrPrediction)
print("mlr Test Error ="+ str(1.0-mlrAccuracy))
如圖:
mlrModel = mlrPipelineModel.stages[2]
print("Multinomial coefficients: " +str(mlrModel.coefficientMatrix)+"Multinomial intercepts: "+str(mlrModel.interceptVector)+"numClasses: "+str(mlrModel.numClasses)+"numFeatures: "+str(mlrModel.numFeatures))
如圖;
#用多項邏輯斯蒂回歸解決多分類問題
mlrPreRel2 = mlrPrediction.select("predictedLabel", "label", "features", "probability").collect()
for item in mlrPreRel2:
print('('+str(item['label'])+','+str(item['features'])+')-->prob='+str(item['probability'])+',predictLabel='+str(item['predictedLabel']))
如圖:

mlr2Accuracy = evaluator.evaluate(mlrPrediction)
print("Test Error = " + str(1.0 - mlr2Accuracy))
mlr2Model = mlrPipelineModel.stages[2]
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix)+"Multinomial intercepts: "+str(mlrModel.interceptVector)+"numClasses: "+str(mlrModel.numClasses)+"numFeatures: "+str(mlrModel.numFeatures))
