回顧:
梯度下降
梯度下降和梯度上升區別
一:加載數據和實現sigmoid函數(同梯度下降)
import numpy as np def loadDataSet(): data = np.loadtxt("testSet.txt") data_X = data[:,0:2] data_Y = data[:,-1] #注意,后面需要有一個常數項x0,設置為1即可 data_X = np.c_[np.ones(data_X.shape[0]),data_X] return data_X,np.array([data_Y]).T def sigmoid(Z): return 1/(1+np.exp(-Z))
二:實現批量梯度上升(重點)
(一)代碼實現
def gradientAsc(data_X,data_Y,iter_Count,alpha): #使用梯度上升,不需要求解代價函數,利用的是概率---我們想求概率最大值 及誤差最小 這里就體現了我們之前提及的sigmoid不只是表示0/1,還是一個表示概率的函數 m,n = data_X.shape W = np.ones((n,1)) #初始化權重矩陣 這里直接是n行1列 for i in range(iter_Count): #進行迭代 yPred = sigmoid(data_X@W)#開始計算sigmoid值---即預測值 error = data_Y - yPred #獲取實際標簽值和預測值的誤差 W += alpha*data_X.T@error #其中梯度上升---我們這里的W一直在上升,注意:yPred由於來自sigmoid函數,所以會一直<=1,所以error不會為負值,可能最后擬合出error正好為全0的結果,就是我們要的結果 return W
(二)結果預測
data_X,data_Y = loadDataSet() print(gradientAsc(data_X,data_Y,500,0.001))
三:繪制圖像決策邊界
#繪圖圖像,畫出決策邊界 plt.figure() plt.scatter(data_X[np.where(data_Y==1),1],data_X[np.where(data_Y==1),2],c="red",s=30) plt.scatter(data_X[np.where(data_Y==0),1],data_X[np.where(data_Y==0),2],c="green",s=30) #繪制決策邊界直線 x = np.linspace(-3,3,100) y = -(W[0]+W[1]*x)/W[2] plt.plot(x,y) plt.show()
四:隨機梯度下降法
(一)簡陋版隨機梯度下降法
def stocGradientAsc(data_X,data_Y,alpha): #隨機梯度算法(簡陋版) m,n = data_X.shape W = np.ones((n,1)) for i in range(m): #循環的次數和批量隨機梯度下降不一致 h = sigmoid(data_X[i]@W) #這里選擇一個數據,獲取預測(沒體現隨機) error = data_Y[i] - h W += alpha*error*(np.array([data_X[i]]).T) return W data_X,data_Y = loadDataSet() W = stocGradientAsc(data_X,data_Y,150,0.01) #繪圖圖像,畫出決策邊界 plt.figure() plt.scatter(data_X[np.where(data_Y==1),1],data_X[np.where(data_Y==1),2],c="red",s=30) plt.scatter(data_X[np.where(data_Y==0),1],data_X[np.where(data_Y==0),2],c="green",s=30) #繪制決策邊界直線 x = np.linspace(-3,3,100) y = -(W[0]+W[1]*x)/W[2] plt.plot(x,y) plt.show()
(二)改進版隨機梯度下降法
def stocGradientAsc2(data_X,data_Y,iter_Count,alpha): m,n = data_X.shape W = np.ones((n,1)) for j in range(iter_Count): #迭代次數和批量隨機梯度保持一致 dataInt = list(range(m)) #獲取全部索引0 - m-1 列表 for i in range(m): #迭代數據和批量保持一致 new_alpha = 4/(1+j+i)+alpha #適當調整alpha參數,隨着迭代次數上升,適當降低alpha的值。防止后面的波動。並且避免參數嚴格下降 randIdx = int(random.uniform(0,len(dataInt))) #uniform隨機選取范圍內的一個實數,所以要int h = sigmoid(data_X[randIdx]@W) error = data_Y[randIdx] - h W += new_alpha*error*np.array([data_X[randIdx]]).T del(dataInt[randIdx]) return W data_X,data_Y = loadDataSet() W = stocGradientAsc2(data_X,data_Y,150,0.01) #繪圖圖像,畫出決策邊界 plt.figure() plt.scatter(data_X[np.where(data_Y==1),1],data_X[np.where(data_Y==1),2],c="red",s=30) plt.scatter(data_X[np.where(data_Y==0),1],data_X[np.where(data_Y==0),2],c="green",s=30) #繪制決策邊界直線 x = np.linspace(-3,3,100) y = -(W[0]+W[1]*x)/W[2] plt.plot(x,y) plt.show()
五:從疝氣病症預測病馬的死亡率
(一)數據導入
import random import numpy as np import matplotlib.pyplot as plt def loadDataSet(): #獲取訓練集 data = np.loadtxt("horseColicTraining.txt") m,n = data.shape trainData_X = data[:,0:n-1] trainData_X = np.c_[np.ones(m),trainData_X] trainData_Y = np.array([data[:,n-1]]).T #注意,后面需要有一個常數項x0,設置為1即可 #獲取測試集 data = np.loadtxt("horseColicTest.txt") m, n = data.shape TestData_X = data[:, 0:n-1] TestData_X = np.c_[np.ones(m),TestData_X] TestData_Y = np.array([data[:, n-1]]).T return trainData_X,trainData_Y,TestData_X,TestData_Y
(二)改進sigmoid函數
def sigmoid(Z): if Z >= 0: return 1/(1+np.exp(-Z)) #-Z可能會是極大值,所以導致np.exp(-Z)過大,導致溢出。所以由下面分支處理 else: #若是Z<0,那么會出現結果為0。我們只是將上面的式子展開來了。這樣,不會出現np.exp(-Z)過大溢出 return np.exp(Z) / (1+np.exp(Z))
(三)實現預測誤差率
def stocGradientAsc2(data_X,data_Y,iter_Count,alpha): m,n = data_X.shape W = np.ones((n,1)) for j in range(iter_Count): #迭代次數和批量隨機梯度保持一致 dataInt = list(range(m)) #獲取全部索引0 - m-1 列表 for i in range(m): #迭代數據和批量保持一致 new_alpha = 4/(1+j+i)+alpha #適當調整alpha參數,隨着迭代次數上升,適當降低alpha的值。防止后面的波動。並且避免參數嚴格下降 randIdx = int(random.uniform(0,len(dataInt))) #uniform隨機選取范圍內的一個實數,所以要int h = sigmoid(data_X[randIdx]@W) error = data_Y[randIdx] - h W += new_alpha*error*np.array([data_X[randIdx]]).T del(dataInt[randIdx]) return W def classifyVector(PreDataVec,W): h = sigmoid(PreDataVec@W) if h > 0.5: return 1 else: return 0 def OneTestGetErr(trainData_X,trainData_Y,TestData_X,TestData_Y): W = stocGradientAsc2(trainData_X,trainData_Y,500,0.01) err = 0 for i in range(TestData_X.shape[0]): if classifyVector(TestData_X[i],W) != TestData_Y[i]: err += 1 return err / TestData_X.shape[0] def GetAvgErr(trainData_X,trainData_Y,TestData_X,TestData_Y,TestNums=10): ErrAvg = 0.0 for i in range(TestNums): err = OneTestGetErr(trainData_X,trainData_Y,TestData_X,TestData_Y) print(err) ErrAvg += err return ErrAvg / TestNums trainData_X,trainData_Y,TestData_X,TestData_Y = loadDataSet() print(GetAvgErr(trainData_X,trainData_Y,TestData_X,TestData_Y))