一、logistic回歸概述
主要是進行二分類預測,也即是對於0~1之間的概率值,當概率大於0.5預測為1,小於0.5預測為0.顯然,我們不能不提到一個函數,即sigmoid=1/(1+exp(-inX)),該函數的曲線類似於一個s型,在x=0處,函數值為0.5.
於是,為了實現logistic分類器,我們可以在每個特征上都乘以一個回歸系數,然后所有的相乘結果進行累加,將這個總結作為輸入,輸入到sigmoid函數中,從而得到一個大小為0~1之間的值,當該值大於0.5歸類為1,否則歸類為0,這樣就完成了二分類的任務。所以logistic回歸可以看成是一種概率估計。
二,基於最優化方法的最佳回歸系數確定
sigmoid函數的輸入記為z,即z=w0x0+w1x1+w2x2+...+wnxn,如果用向量表示即為z=wTx,它表示將這兩個數值向量對應元素相乘然后累加起來。其中,向量x是分類器的輸入數據,w即為我們要擬合的最佳參數,從而使分類器預測更加准確。也就是說,logistic回歸最重要的是要找到最佳的擬合參數。
1 梯度上升法
梯度上升法(等同於我們熟知的梯度下降法,前者是尋找最大值,后者尋找最小值),它的基本思想是:要找到某函數的最大值,最好的方法就是沿着該函數的梯度方向搜尋。如果函數為f,梯度記為D,a為步長,那么梯度上升法的迭代公式為:w:w+a*Dwf(w)。該公式停止的條件是迭代次數達到某個指定值或者算法達到某個允許的誤差范圍。我們熟知的梯度下降法迭代公式為:w:w-a*Dwf(w)
2 使用梯度上升法尋找最佳參數
假設有100個樣本點,每個樣本有兩個特征:x1和x2.此外為方便考慮,我們額外添加一個x0=1,將線性函數z=wTx+b轉為z=wTx(此時向量w和x的維度均價1).那么梯度上升法的偽代碼如下:
初始化每個回歸系數為1
重復R次:
計算整個數據集梯度
使用alpha*gradient更新回歸系數的向量
返回回歸系數
代碼如下:
#預處理數據 def loadDataSet(): dataMat=[];labelMat=[] fr=open('testSet.txt') #遍歷文本的每一行 for line in fr.readlines(): #對當前行去除首尾空格,並按空格進行分離 lineArr=line.strip().split() #將每一行的兩個特征x1,x2,加上x0=1,組成列表並添加到數據集列表中 dataMat.append([1.0,float(lineArr[0]),float(lineArr[1])]) #將當前行標簽添加到標簽列表 labelMat.append(int(lineArr[2])) #返回數據列表,標簽列表 return dataMat,labelMat #定義sigmoid函數 def sigmoid(inx): return 1.0/(1+exp(-inx)) #梯度上升法更新最優擬合參數 #@dataMatIn:數據集 #@classLabels:數據標簽 def gradAscent(dataMatIn,classLabels): #將數據集列表轉為Numpy矩陣 dataMatrix=mat(dataMatIn) 將數據集標簽列表轉為Numpy矩陣,並轉置 labelMat=mat(classLabels).transpose() #獲取數據集矩陣的行數和列數 m,n=shape(dataMat) #學習步長 alpha=0.001 #最大迭代次數 maxCycles=500 #初始化權值參數向量每個維度均為1.0 weights=one((n,1)) #循環迭代次數 for k in range(maxCycles): #求當前的sigmoid函數預測概率 h=sigmoid(dataMatrix*weights) #*********************************************** #此處計算真實類別和預測類別的差值 #對logistic回歸函數的對數釋然函數的參數項求偏導 error=(labelMat-h) #更新權值參數(這里的梯度是由最大似然損失函數的梯度求得) weights=weights+alpha*dataMatrix.transpose()*error #*********************************************** return weights
我們知道對回歸系數進行更新的公式為:w:w+alpha*gradient,其中gradient是對參數w求偏導數。則我們可以通過求導驗證logistic回歸函數對參數w的梯度為(yi-sigmoid(wTx))*x,證明過程如下所示:
我們還可以通過matpotlib畫出決策的邊界,集體代碼為:
def plotBestFit(wei):
import matplotlib.pyplot as plt
weights = wei.getA()
dataMat, labelMat = loadDataSet()
dataArr = array(dataMat)
n = shape(dataArr)[0]
xcord1 = []; ycord1 = []
xcord2 = []; ycord2 = []
for i in range(n):
if int(labelMat[i]) == 1:
xcord1.append(dataArr[i,1]); ycord1.append(dataArr[i,2])
else: xcord2.append(dataArr[i,1]); ycord2.append(dataArr[i,2])
fig = plt.figure()
ax = fig.add_subplot(111)
ax.scatter(xcord1, ycord1, s = 30, c = 'red', marker='s')
ax.scatter(xcord2, ycord2, s = 30, c = 'green')
x = arange(-3.0, 3.0, 0.1)
y = (-weights[0]- weights[1]*x)/weights[2]
ax.plot(x, y)
plt.xlabel('X1');
plt.ylabel('X2');
plt.show()
3 隨機梯度上升法
我們知道梯度上升法每次更新回歸系數都需要遍歷整個數據集,當樣本數量較小時,該方法尚可,但是當樣本數據集非常大且特征非常多時,那么隨機梯度下降法的計算復雜度就會特別高。一種改進的方法是一次僅用一個樣本點來更新回歸系數,即歲集梯度上升法。由於可以在新樣本到來時對分類器進行增量式更新,因此隨機梯度上升法是一個在線學習算法。隨機梯度上升法可以寫成如下偽代碼:
所有回歸系數初始化為1
對數據集每個樣本
計算該樣本的梯度
使用alpha*gradient更新回顧系數值
返回回歸系數值
#梯度上升算法 def stocGradAscent(dataMatrix,classLabels): #為便於計算,轉為Numpy數組 dataMat=array(dataMatrix) 獲取數據集的行數和列數 m,n=shape(dataMatrix) #初始化權值向量各個參數為1.0 weights=ones(n) #設置步長為0.01 alpha=0.01 #循環m次,每次選取數據集一個樣本更新參數 for i in range(m): #計算當前樣本的sigmoid函數值 h=sigmoid(dataMatrix[i]+weights) #計算當前樣本的殘差(代替梯度) error=(classLabels[i]-h) #更新權值參數 weights=weights+alpha*error*dataMatrix[i] return weights
評判一個優化算法的優劣的可靠方法是看其是否收斂,也就是說參數的值是否達到穩定值。此外,當參數值接近穩定時,仍然可能會出現一些小的周期性的波動。這種情況發生的原因是樣本集中存在一些不能正確分類的樣本點(數據集並非線性可分),所以這些點在每次迭代時會引發系數的劇烈改變,造成周期性的波動。顯然我們希望算法能夠避免來回波動,從而收斂到某個值,並且收斂速度也要足夠快。
為此,需要對上述隨機梯度上升法代碼進行適當修改,代碼如下:
#@dataMatrix:數據集列表 #@classLabels:標簽列表 #@numIter:迭代次數,默認150 def stocGradAscent1(dataMatrix,classLabels,numIter=150): #將數據集列表轉為Numpy數組 dataMat=array(dataMatrix) #獲取數據集的行數和列數 m,n=shape(dataMat) #初始化權值參數向量每個維度均為1 weights=ones(n) #循環每次迭代次數 for j in range(numIter): #獲取數據集行下標列表 dataIndex=range(m) #遍歷行列表 for i in range(m): #每次更新參數時設置動態的步長,且為保證多次迭代后對新數據仍然具有一定影響 #添加了固定步長0.1 alpha=4/(1.0+j+i)+0.1 #隨機獲取樣本 randomIndex=int(random.nuiform(0,len(dataIndex))) #計算當前sigmoid函數值 h=sigmoid(dataMat[randomIndex]*weights) #計算權值更新 #*********************************************** error=classLabels-h weights=weights+alpha*error*dataMat[randomIndex] #*********************************************** #選取該樣本后,將該樣本下標刪除,確保每次迭代時只使用一次 del(dataIndex[randomIndex]) return weights
上述代碼中有兩處改進的地方:
1 alpha在每次迭代更新是都會調整,這會緩解數據波動或者高頻運動。此外,alpha還有一個常數項,目的是為了保證在多次迭代后仍然對新數據具有一定的影響,如果要處理的問題是動態變化的,可以適當加大該常數項,從而確保新的值獲得更大的回歸系數。
2 第二個改進的地方是選擇隨機的樣本對參數進行更新,由於增加了隨機性,這就防止參數值發生周期性的波動。
3 並且采用上述改進算法后,收斂速度更快
三,logistic回歸實例:從疝氣病症預測病馬死亡率
現有數據集有100個樣本和20個特征,但是數據存在一定的問題,即數據集有30%的缺失,因此,我們在對病馬進行預測死亡率前,首先要解決數據的缺失問題。
我們可能會遇到數據缺失的情況,但有時候數據相當昂貴,扔掉和重新獲取均不可取,這顯然是會帶來更多的成本負擔,所以我們可以選取一些有效的方法來解決該類問題。比如:
1 使用可用特征的均值填補缺失值
2 使用特殊值來填補缺失的特征,如-1
3 忽略有缺失值的樣本
4 使用相似樣本的平均值填補缺失值
5 使用另外的機器學習算法預測缺失值
這里我們根據logstic回歸的函數特征,選擇實數0來替換所有缺失值,而這恰好能適用logistic回歸。因此,它在參數更新時不會影響參數的值。即如果某特征對應值為0 ,那么由公式w:w+alpha*gradient,可知w不會發生改變。
此外,由於sigmoid(0)=0.5,表面該特征對結果的預測不具有任何傾向性,因此不會對誤差造成影響。
當然,如果是發生有樣本的類標簽缺失的情況,此時我們最好的辦法是將該樣本舍棄,這是因為標簽與特征不同,我們很難確定采用某個合適的值替換掉。
下面來看回歸分類器代碼:
#------------------------------實例:從疝氣病預測病馬的死亡率---------------------------- #1 准備數據:處理數據的缺失值 #這里將特征的缺失值補0,從而在更新時不影響系數的值 #2 分類決策函數 def clasifyVector(inX,weights): #計算logistic回歸預測概率 prob=sigmoid(inX*weights) #大於0.5預測為1 if prob>0.5: return 1.0 #否則預測為0 else: return 0.0 #logistic回歸預測算法 def colicTest(): #打開訓練數據集 frTrain=open('horseColicTraining.txt') #打開測試數據集 frTest=open('horseColicTest.txt') #新建兩個孔列表,用於保存訓練數據集和標簽 trainingSet=[];trainingLabels=[] #讀取訓練集文檔的每一行 for line in frTrain.readlines(): #對當前行進行特征分割 currLine=line.strip().split() #新建列表存儲每個樣本的特征向量 lineArr=[] #遍歷每個樣本的特征 for i in range(21): 將該樣本的特征存入lineArr列表 lineArr.append(float(currLine[i])) #將該樣本標簽存入標簽列表 trainingLabels.append(currLine[21]) #將該樣本的特征向量添加到數據集列表 trainingSet.append(lineArr) #調用隨機梯度上升法更新logistic回歸的權值參數 trainWeights=stocGradAscent1(trainingSet,trainingLabels,500) #統計測試數據集預測錯誤樣本數量和樣本總數 errorCount=0;numTestVec=0.0 #遍歷測試數據集的每個樣本 for line in frTest.readlines(): #樣本總數加1 numTestVec+=1.0 #對當前行進行處理,分割出各個特征及樣本標簽 currLine=line.strip().split() #新建特征向量 lineArr=[] #將各個特征構成特征向量 for i in range(21): lineArr.append(float(currLine[i])) #利用分類預測函數對該樣本進行預測,並與樣本標簽進行比較 if(clasifyVector(lineArr,trainWeights)!=currLine[21]): #如果預測錯誤,錯誤數加1 errorCount+=1 #計算測試集總的預測錯誤率 errorRate=(float(errorCount)/numTestVec) #打印錯誤率大小 print('the error rate of this test is: %f', %(errorRate)) #返回錯誤率 return errorRate #多次測試算法求取預測誤差平均值 def multTest(): #設置測試次數為10次,並統計錯誤率總和 numTests=10;errorRateSum=0.0 #每一次測試算法並統計錯誤率 for k in range(numTests): errorRateSum+=colicTest() #打印出測試10次預測錯誤率平均值 print('after %d iterations the average error rate is: %f',\ %(numTests,errorRateSum/float(numTests)))