機器學習實戰---決策樹CART簡介及分類樹實現
一:對比分類樹
CART回歸樹和CART分類樹的建立算法大部分是類似的,所以這里我們只討論CART回歸樹和CART分類樹的建立算法不同的地方。
首先,我們要明白,什么是回歸樹,什么是分類樹。
兩者的區別在於樣本輸出:
如果樣本輸出是離散值,那么這是一顆分類樹。 如果果樣本輸出是連續值,那么那么這是一顆回歸樹。
除了概念的不同,CART回歸樹和CART分類樹的建立和預測的區別主要有下面兩點:
1)連續值的處理方法不同
2)決策樹建立后做預測的方式不同。
對於連續值的處理,我們知道CART分類樹采用的是用基尼系數的大小來度量特征的各個划分點的優劣情況,這比較適合分類模型。
但是對於回歸模型,我們使用了常見的和方差的度量方式。
CART回歸樹的度量目標是,對於任意划分特征A,對應的任意划分點s兩邊划分成的數據集D1和D2,求出使D1和D2各自集合的均方差最小,同時D1和D2的均方差之和最小所對應的特征和特征值划分點。
表達式為:
其中,c1為D1數據集的樣本輸出均值,c2為D2數據集的樣本輸出均值。
對於決策樹建立后做預測的方式,上面講到了CART分類樹采用葉子節點里概率最大的類別作為當前節點的預測類別。而回歸樹輸出不是類別,它采用的是用最終葉子的均值或者中位數來預測輸出結果。
除了上面提到了以外,CART回歸樹和CART分類樹的建立算法和預測沒有什么區別。
二:回歸樹的實現
(一)實現葉子節點均值計算
def regLeaf(data_Y): #用於計算指定樣本中標簽均值表示回歸y值 return np.mean(data_Y)
(二)實現計算數據集總方差
def regErr(data_Y): #使用均方誤差作為划分依據 return np.var(data_Y)*data_Y.size #np.var是求解平均誤差,我們這里需要總方差進行比較
(三)實現數據集切分
def binSplitDataSet(data_X,data_Y,fea_axis,fea_val): #進行數據集划分 dataGtIdx = np.where(data_X[:,fea_axis]>fea_val) dataLgIdx = np.where(data_X[:,fea_axis]<=fea_val) return data_X[dataGtIdx],data_Y[dataGtIdx],data_X[dataLgIdx],data_Y[dataLgIdx]
(四)實現選取最優特征及特征值(含預剪枝處理)
def chooseBestSplit(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): """ 選取的最好切分方式,使用回調方式調用葉節點計算和誤差計算,函數中含有預剪枝操作 :param data_X: 傳入數據集 :param data_Y: 傳入標簽值 :param leafType: 要調用計算的葉節點值 --- 雖然靈活,但是沒必要 :param errType: 要計算誤差的函數,這里是均方誤差 --- 雖然靈活,但是沒必要 :param ops: 包含了兩個重要信息, tolS tolN用於控制函數的停止時機,tolS是容許的誤差下降值,誤差小於則不再切分,tosN是切分的最少樣本數 :return: """ m,n = data_X.shape tolS = ops[0] tolN = ops[1] #之前都是將判斷是否繼續划分子樹放入createTree方法中,這里可以提到chooseBestSplit中進行判別。 #當然可以放入createTree方法中處理 if np.unique(data_Y).size == 1: #1.如果標簽值全部相同,則返回特征None表示不需要進行下一步划分,返回葉節點 return None,leafType(data_Y) #遍歷獲取最優特征和特征值 TosErr = errType(data_Y) #獲取全部數據集的誤差,后面計算划分后兩個子集的總誤差,如果誤差下降小於tolS,則不進行划分,返回該葉子節點即可(預剪枝操作) bestErr = np.inf bestFeaIdx = 0 #注意:這里兩個我們設置為0,而不是-1,因為我們必須保證可以取到一個特征(后面循環可能一直continue),我們需要在后面進行額外處理 bestFeaVal = 0 for i in range(n): #遍歷所有特征 for feaval in np.unique(data_X[:,i]): dataGt_X,dataGt_Y,dataLg_X,dataLg_Y = binSplitDataSet(data_X,data_Y,i,feaval) #數據集划分 # print(dataGt_X.shape,dataLg_X.shape) if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: #不符合最小數據集,不進行計算 continue concErr = errType(dataLg_Y)+errType(dataGt_Y) # print(concErr) if concErr < bestErr: bestFeaIdx = i bestFeaVal = feaval bestErr = concErr #2.如果最后求解的誤差,小於我們要求的誤差距離,則不進行下一步划分數據集(預剪枝) if (TosErr - bestErr) < tolS: return None,leafType(data_Y) #3.如果我們上面的數據集本身較小,則無論如何切分,數據集都<tolN,我們就需要在這里再處理一遍,進行一下判斷 dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, bestFeaIdx, bestFeaVal) # 數據集划分 if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: # 不符合最小數據集,不進行計算 return None,leafType(data_Y) return bestFeaIdx,bestFeaVal #正常情況下的返回結果
(五)實現決策樹創建
def createTree(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): #建立回歸樹 feaIdx,feaVal = chooseBestSplit(data_X,data_Y,leafType,errType,ops) if feaIdx == None: #是葉子節點 return feaVal #遞歸建樹 myTree = {} myTree['feaIdx'] = feaIdx myTree['feaVal'] = feaVal dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, feaIdx, feaVal) # 數據集划分 myTree['left'] = createTree(dataGt_X,dataGt_Y,leafType,errType,ops) myTree['right'] = createTree(dataLg_X,dataLg_Y,leafType,errType,ops) return myTree
(六)數據集加載及測試
import numpy as np def loadDataSet(filename): dataSet = np.loadtxt(filename) m,n = dataSet.shape data_X = dataSet[:,0:n-1] data_Y = dataSet[:,n-1] return data_X,data_Y
data_X,data_Y = loadDataSet("ex0.txt") print(createTree(data_X,data_Y))
結果顯示:
{'feaIdx': 1, 'feaVal': 0.39435, 'left': { 'feaIdx': 1, 'feaVal': 0.582002, 'left': { 'feaIdx': 1, 'feaVal': 0.797583, 'left': 3.9871632, 'right': 2.9836209534883724 }, 'right': 1.980035071428571 }, 'right': { 'feaIdx': 1, 'feaVal': 0.197834, 'left': 1.0289583666666666, 'right': -0.023838155555555553 } }
(七)全部代碼

import numpy as np def loadDataSet(filename): dataSet = np.loadtxt(filename) m,n = dataSet.shape data_X = dataSet[:,0:n-1] data_Y = dataSet[:,n-1] return data_X,data_Y def regLeaf(data_Y): #用於計算指定樣本中標簽均值表示回歸y值 return np.mean(data_Y) def regErr(data_Y): #使用均方誤差作為划分依據 return np.var(data_Y)*data_Y.size def binSplitDataSet(data_X,data_Y,fea_axis,fea_val): #進行數據集划分 dataGtIdx = np.where(data_X[:,fea_axis]>fea_val) dataLgIdx = np.where(data_X[:,fea_axis]<=fea_val) return data_X[dataGtIdx],data_Y[dataGtIdx],data_X[dataLgIdx],data_Y[dataLgIdx] def chooseBestSplit(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): """ 選取的最好切分方式,使用回調方式調用葉節點計算和誤差計算,函數中含有預剪枝操作 :param data_X: 傳入數據集 :param data_Y: 傳入標簽值 :param leafType: 要調用計算的葉節點值 --- 雖然靈活,但是沒必要 :param errType: 要計算誤差的函數,這里是均方誤差 --- 雖然靈活,但是沒必要 :param ops: 包含了兩個重要信息, tolS tolN用於控制函數的停止時機,tolS是容許的誤差下降值,誤差小於則不再切分,tosN是切分的最少樣本數 :return: """ m,n = data_X.shape tolS = ops[0] tolN = ops[1] #之前都是將判斷是否繼續划分子樹放入createTree方法中,這里可以提到chooseBestSplit中進行判別。 #當然可以放入createTree方法中處理 if np.unique(data_Y).size == 1: #1.如果標簽值全部相同,則返回特征None表示不需要進行下一步划分,返回葉節點 return None,leafType(data_Y) #遍歷獲取最優特征和特征值 TosErr = errType(data_Y) #獲取全部數據集的誤差,后面計算划分后兩個子集的總誤差,如果誤差下降小於tolS,則不進行划分,返回該葉子節點即可(預剪枝操作) bestErr = np.inf bestFeaIdx = 0 #注意:這里兩個我們設置為0,而不是-1,因為我們必須保證可以取到一個特征(后面循環可能一直continue),我們需要在后面進行額外處理 bestFeaVal = 0 for i in range(n): #遍歷所有特征 for feaval in np.unique(data_X[:,i]): dataGt_X,dataGt_Y,dataLg_X,dataLg_Y = binSplitDataSet(data_X,data_Y,i,feaval) #數據集划分 # print(dataGt_X.shape,dataLg_X.shape) if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: #不符合最小數據集,不進行計算 continue concErr = errType(dataLg_Y)+errType(dataGt_Y) # print(concErr) if concErr < bestErr: bestFeaIdx = i bestFeaVal = feaval bestErr = concErr #2.如果最后求解的誤差,小於我們要求的誤差距離,則不進行下一步划分數據集(預剪枝) if (TosErr - bestErr) < tolS: return None,leafType(data_Y) #3.如果我們上面的數據集本身較小,則無論如何切分,數據集都<tolN,我們就需要在這里再處理一遍,進行一下判斷 dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, bestFeaIdx, bestFeaVal) # 數據集划分 if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: # 不符合最小數據集,不進行計算 return None,leafType(data_Y) return bestFeaIdx,bestFeaVal #正常情況下的返回結果 def createTree(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): #建立回歸樹 feaIdx,feaVal = chooseBestSplit(data_X,data_Y,leafType,errType,ops) if feaIdx == None: #是葉子節點 return feaVal #遞歸建樹 myTree = {} myTree['feaIdx'] = feaIdx myTree['feaVal'] = feaVal dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, feaIdx, feaVal) # 數據集划分 myTree['left'] = createTree(dataGt_X,dataGt_Y,leafType,errType,ops) myTree['right'] = createTree(dataLg_X,dataLg_Y,leafType,errType,ops) return myTree data_X,data_Y = loadDataSet("ex0.txt") print(createTree(data_X,data_Y))
三:樹剪枝
一棵樹如果節點過多,表示該模型可能對數據進行了過擬合(使用測試集交叉驗證法即可),這時就需要我們進行剪枝處理,避免過擬合
(一)預剪枝
前面建立決策樹過程中,我們已經進行了預剪枝操作。即設置的ops參數,包含了兩個重要信息, tolS tolN用於控制函數的停止時機,tolS是容許的誤差下降值,誤差小於則不再切分,tosN是切分的最少樣本數。用於在建立決策樹過程中進行預剪枝操作。
下面實例中,查看ops參數設置對剪枝的影響:

import numpy as np def loadDataSet(filename): dataSet = np.loadtxt(filename) m,n = dataSet.shape data_X = dataSet[:,0:n-1] data_Y = dataSet[:,n-1] return data_X,data_Y def regLeaf(data_Y): #用於計算指定樣本中標簽均值表示回歸y值 return np.mean(data_Y) def regErr(data_Y): #使用均方誤差作為划分依據 return np.var(data_Y)*data_Y.size def binSplitDataSet(data_X,data_Y,fea_axis,fea_val): #進行數據集划分 dataGtIdx = np.where(data_X[:,fea_axis]>fea_val) dataLgIdx = np.where(data_X[:,fea_axis]<=fea_val) return data_X[dataGtIdx],data_Y[dataGtIdx],data_X[dataLgIdx],data_Y[dataLgIdx] def chooseBestSplit(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): """ 選取的最好切分方式,使用回調方式調用葉節點計算和誤差計算,函數中含有預剪枝操作 :param data_X: 傳入數據集 :param data_Y: 傳入標簽值 :param leafType: 要調用計算的葉節點值 --- 雖然靈活,但是沒必要 :param errType: 要計算誤差的函數,這里是均方誤差 --- 雖然靈活,但是沒必要 :param ops: 包含了兩個重要信息, tolS tolN用於控制函數的停止時機,tolS是容許的誤差下降值,誤差小於則不再切分,tosN是切分的最少樣本數 :return: """ m,n = data_X.shape tolS = ops[0] tolN = ops[1] #之前都是將判斷是否繼續划分子樹放入createTree方法中,這里可以提到chooseBestSplit中進行判別。 #當然可以放入createTree方法中處理 if np.unique(data_Y).size == 1: #1.如果標簽值全部相同,則返回特征None表示不需要進行下一步划分,返回葉節點 return None,leafType(data_Y) #遍歷獲取最優特征和特征值 TosErr = errType(data_Y) #獲取全部數據集的誤差,后面計算划分后兩個子集的總誤差,如果誤差下降小於tolS,則不進行划分,返回該葉子節點即可(預剪枝操作) bestErr = np.inf bestFeaIdx = 0 #注意:這里兩個我們設置為0,而不是-1,因為我們必須保證可以取到一個特征(后面循環可能一直continue),我們需要在后面進行額外處理 bestFeaVal = 0 for i in range(n): #遍歷所有特征 for feaval in np.unique(data_X[:,i]): dataGt_X,dataGt_Y,dataLg_X,dataLg_Y = binSplitDataSet(data_X,data_Y,i,feaval) #數據集划分 # print(dataGt_X.shape,dataLg_X.shape) if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: #不符合最小數據集,不進行計算 continue concErr = errType(dataLg_Y)+errType(dataGt_Y) # print(concErr) if concErr < bestErr: bestFeaIdx = i bestFeaVal = feaval bestErr = concErr #2.如果最后求解的誤差,小於我們要求的誤差距離,則不進行下一步划分數據集(預剪枝) if (TosErr - bestErr) < tolS: return None,leafType(data_Y) #3.如果我們上面的數據集本身較小,則無論如何切分,數據集都<tolN,我們就需要在這里再處理一遍,進行一下判斷 dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, bestFeaIdx, bestFeaVal) # 數據集划分 if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: # 不符合最小數據集,不進行計算 return None,leafType(data_Y) return bestFeaIdx,bestFeaVal #正常情況下的返回結果 def createTree(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): #建立回歸樹 feaIdx,feaVal = chooseBestSplit(data_X,data_Y,leafType,errType,ops) if feaIdx == None: #是葉子節點 return feaVal #遞歸建樹 myTree = {} myTree['feaIdx'] = feaIdx myTree['feaVal'] = feaVal dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, feaIdx, feaVal) # 數據集划分 myTree['left'] = createTree(dataGt_X,dataGt_Y,leafType,errType,ops) myTree['right'] = createTree(dataLg_X,dataLg_Y,leafType,errType,ops) return myTree
1.默認參數ops(1,4)---表示誤差大於1,樣本數大於4的划分結果
data_X,data_Y = loadDataSet("ex2.txt") print(createTree(data_X,data_Y,ops=(1,4)))
出現大量樹分叉,過擬合
3.設置參數ops(1000,4)---表示誤差大於1000,樣本數大於4的划分結果
data_X,data_Y = loadDataSet("ex2.txt") print(createTree(data_X,data_Y,ops=(1000,4)))
擬合狀態還不錯。
3.設置參數ops(10000,4)---表示誤差大於10000,樣本數大於4的划分結果
data_X,data_Y = loadDataSet("ex2.txt") print(createTree(data_X,data_Y,ops=(10000,4)))
有點欠擬合。
(二)后剪枝
后剪枝通常比預剪枝保留更多的分支,欠擬合風險小。但是后剪枝是在決策樹構造完成后進行的,其訓練時間的開銷會大於預剪枝。
后剪枝是基於已經建立好的樹,進行的葉子節點合並操作。
使用后剪枝方法需要將數據集分為測試集和訓練集。通過訓練集和參數ops使用預剪枝方法構建決策樹。然后使用構建的決策樹和測試集數據進行后剪枝處理
后剪枝算法實現:
#開啟后剪枝處理 def isTree(tree): return type(tree) == dict #是樹的話返回字典,否則是數據 def getMean(tree): #獲取當前樹的合並均值REP---塌陷處理: 我們對一棵樹進行塌陷處理,就是遞歸將這棵樹進行合並返回這棵樹的平均值。 if isTree(tree['right']): tree['right'] = getMean(tree['right']) if isTree(tree['left']): tree['left'] = getMean(tree['left']) return (tree['left'] + tree['right'])/2 #返回均值 def prune(tree,testData_X,testData_Y): #根據決策樹和測試集數據進行后剪枝處理,不能按照訓練集進行后剪枝,因為創建決策樹時預剪枝操作中已經要求子樹誤差值小於根節點 #1.若是當測試集數據為空,則不需要后面的子樹了,直接進行塌陷處理 if testData_X.shape[0] == 0: return getMean(tree) #2.如果當前測試集不為空,而且決策樹含有左/右子樹,則需要進入子樹中進行剪枝操作---這里我們先將測試集數據划分 if isTree(tree['left']) or isTree(tree['right']): TestDataGT_X,TestDataGT_Y,TestDataLG_X,TestDataLG_Y = binSplitDataSet(testData_X,testData_Y,tree['feaIdx'],tree['feaVal']) #3.根據子樹進行下一步剪枝 if isTree(tree['left']): tree['left'] = prune(tree['left'],TestDataGT_X,TestDataGT_Y) #注意:這里是賦值操作,對樹進行剪枝 if isTree(tree['right']): tree['right'] = prune(tree['right'],TestDataLG_X,TestDataLG_Y) #注意:這里是賦值操作,對樹進行剪枝 #4.如果兩個是葉子節點,我們開始計算誤差,進行合並 if not isTree(tree['left']) and not isTree(tree['right']): #先划分測試集數據 TestDataGT_X,TestDataGT_Y,TestDataLG_X,TestDataLG_Y = binSplitDataSet(testData_X,testData_Y,tree['feaIdx'],tree['feaVal']) #進行誤差比較 #4-1.先獲取沒有合並的誤差 errorNoMerge = np.sum(np.power(TestDataGT_Y-tree['left'],2)) + np.sum(np.power(TestDataLG_Y-tree['right'],2)) #4-2.再獲取合並后的誤差 treemean = (tree['left'] + tree['right'])/2 #因為是葉子節點,可以直接計算 errorMerge = np.sum(np.power(testData_Y- treemean,2)) #4-3.進行判斷 if errorMerge < errorNoMerge: #可以剪枝 print("merging") #打印提示信息 return treemean #返回合並后的塌陷值 else: return tree #不進行合並,返回原樹 return tree #返回樹(但是該樹的子樹中可能存在剪枝合並情況由3可以知道
后剪枝算法測試:
data_X,data_Y = loadDataSet("ex2.txt") myTree = createTree(data_X,data_Y,ops=(0,1)) #設置0,1表示不進行預剪枝,我們只對比后剪枝 print(myTree) Testdata_X,Testdata_Y = loadDataSet("ex2test.txt") #獲取測試集,開始進行后剪枝 myTree2 = prune(myTree,Testdata_X,Testdata_Y) print(myTree2)
可以看到進行了大量的剪枝操作!

import numpy as np def loadDataSet(filename): dataSet = np.loadtxt(filename) m,n = dataSet.shape data_X = dataSet[:,0:n-1] data_Y = dataSet[:,n-1] return data_X,data_Y def regLeaf(data_Y): #用於計算指定樣本中標簽均值表示回歸y值 return np.mean(data_Y) def regErr(data_Y): #使用均方誤差作為划分依據 return np.var(data_Y)*data_Y.size def binSplitDataSet(data_X,data_Y,fea_axis,fea_val): #進行數據集划分 dataGtIdx = np.where(data_X[:,fea_axis]>fea_val) dataLgIdx = np.where(data_X[:,fea_axis]<=fea_val) return data_X[dataGtIdx],data_Y[dataGtIdx],data_X[dataLgIdx],data_Y[dataLgIdx] def chooseBestSplit(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): """ 選取的最好切分方式,使用回調方式調用葉節點計算和誤差計算,函數中含有預剪枝操作 :param data_X: 傳入數據集 :param data_Y: 傳入標簽值 :param leafType: 要調用計算的葉節點值 --- 雖然靈活,但是沒必要 :param errType: 要計算誤差的函數,這里是均方誤差 --- 雖然靈活,但是沒必要 :param ops: 包含了兩個重要信息, tolS tolN用於控制函數的停止時機,tolS是容許的誤差下降值,誤差小於則不再切分,tosN是切分的最少樣本數 :return: """ m,n = data_X.shape tolS = ops[0] tolN = ops[1] #之前都是將判斷是否繼續划分子樹放入createTree方法中,這里可以提到chooseBestSplit中進行判別。 #當然可以放入createTree方法中處理 if np.unique(data_Y).size == 1: #1.如果標簽值全部相同,則返回特征None表示不需要進行下一步划分,返回葉節點 return None,leafType(data_Y) #遍歷獲取最優特征和特征值 TosErr = errType(data_Y) #獲取全部數據集的誤差,后面計算划分后兩個子集的總誤差,如果誤差下降小於tolS,則不進行划分,返回該葉子節點即可(預剪枝操作) bestErr = np.inf bestFeaIdx = 0 #注意:這里兩個我們設置為0,而不是-1,因為我們必須保證可以取到一個特征(后面循環可能一直continue),我們需要在后面進行額外處理 bestFeaVal = 0 for i in range(n): #遍歷所有特征 for feaval in np.unique(data_X[:,i]): dataGt_X,dataGt_Y,dataLg_X,dataLg_Y = binSplitDataSet(data_X,data_Y,i,feaval) #數據集划分 # print(dataGt_X.shape,dataLg_X.shape) if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: #不符合最小數據集,不進行計算 continue concErr = errType(dataLg_Y)+errType(dataGt_Y) # print(concErr) if concErr < bestErr: bestFeaIdx = i bestFeaVal = feaval bestErr = concErr #2.如果最后求解的誤差,小於我們要求的誤差距離,則不進行下一步划分數據集(預剪枝) if (TosErr - bestErr) < tolS: return None,leafType(data_Y) #3.如果我們上面的數據集本身較小,則無論如何切分,數據集都<tolN,我們就需要在這里再處理一遍,進行一下判斷 dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, bestFeaIdx, bestFeaVal) # 數據集划分 if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: # 不符合最小數據集,不進行計算 return None,leafType(data_Y) return bestFeaIdx,bestFeaVal #正常情況下的返回結果 def createTree(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): #建立回歸樹 feaIdx,feaVal = chooseBestSplit(data_X,data_Y,leafType,errType,ops) if feaIdx == None: #是葉子節點 return feaVal #遞歸建樹 myTree = {} myTree['feaIdx'] = feaIdx myTree['feaVal'] = feaVal dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, feaIdx, feaVal) # 數據集划分 myTree['left'] = createTree(dataGt_X,dataGt_Y,leafType,errType,ops) myTree['right'] = createTree(dataLg_X,dataLg_Y,leafType,errType,ops) return myTree #開啟后剪枝處理 def isTree(tree): return type(tree) == dict #是樹的話返回字典,否則是數據 def getMean(tree): #獲取當前樹的合並均值REP---塌陷處理: 我們對一棵樹進行塌陷處理,就是遞歸將這棵樹進行合並返回這棵樹的平均值。 if isTree(tree['right']): tree['right'] = getMean(tree['right']) if isTree(tree['left']): tree['left'] = getMean(tree['left']) return (tree['left'] + tree['right'])/2 #返回均值 def prune(tree,testData_X,testData_Y): #根據決策樹和測試集數據進行后剪枝處理,不能按照訓練集進行后剪枝,因為創建決策樹時預剪枝操作中已經要求子樹誤差值小於根節點 #1.若是當測試集數據為空,則不需要后面的子樹了,直接進行塌陷處理 if testData_X.shape[0] == 0: return getMean(tree) #2.如果當前測試集不為空,而且決策樹含有左/右子樹,則需要進入子樹中進行剪枝操作---這里我們先將測試集數據划分 if isTree(tree['left']) or isTree(tree['right']): TestDataGT_X,TestDataGT_Y,TestDataLG_X,TestDataLG_Y = binSplitDataSet(testData_X,testData_Y,tree['feaIdx'],tree['feaVal']) #3.根據子樹進行下一步剪枝 if isTree(tree['left']): tree['left'] = prune(tree['left'],TestDataGT_X,TestDataGT_Y) #注意:這里是賦值操作,對樹進行剪枝 if isTree(tree['right']): tree['right'] = prune(tree['right'],TestDataLG_X,TestDataLG_Y) #注意:這里是賦值操作,對樹進行剪枝 #4.如果兩個是葉子節點,我們開始計算誤差,進行合並 if not isTree(tree['left']) and not isTree(tree['right']): #先划分測試集數據 TestDataGT_X,TestDataGT_Y,TestDataLG_X,TestDataLG_Y = binSplitDataSet(testData_X,testData_Y,tree['feaIdx'],tree['feaVal']) #進行誤差比較 #4-1.先獲取沒有合並的誤差 errorNoMerge = np.sum(np.power(TestDataGT_Y-tree['left'],2)) + np.sum(np.power(TestDataLG_Y-tree['right'],2)) #4-2.再獲取合並后的誤差 treemean = (tree['left'] + tree['right'])/2 #因為是葉子節點,可以直接計算 errorMerge = np.sum(np.power(testData_Y- treemean,2)) #4-3.進行判斷 if errorMerge < errorNoMerge: #可以剪枝 print("merging") #打印提示信息 return treemean #返回合並后的塌陷值 else: return tree #不進行合並,返回原樹 return tree #返回樹(但是該樹的子樹中可能存在剪枝合並情況由3可以知道 data_X,data_Y = loadDataSet("ex2.txt") myTree = createTree(data_X,data_Y,ops=(0,1)) #設置0,1表示不進行預剪枝,我們只對比后剪枝 print(myTree) Testdata_X,Testdata_Y = loadDataSet("ex2test.txt") #獲取測試集,開始進行后剪枝 myTree2 = prune(myTree,Testdata_X,Testdata_Y) print(myTree2)
四:模型樹實現
(一)實現模型樹葉節點生成函數和誤差計算函數
import numpy as np import matplotlib.pyplot as plt def linearSolve(data_X,data_Y): X = np.c_[np.ones(data_X.shape[0]), data_X] XTX = X.T @ X if np.linalg.det(XTX) == 0: raise NameError("this matrix can`t inverse") W = np.linalg.inv(XTX) @ (X.T @ data_Y) return W,X,data_Y def modelLeaf(data_X,data_Y): W,X,Y = linearSolve(data_X,data_Y) return W def modelErr(data_X,data_Y): W,X,Y = linearSolve(data_X,data_Y) yPred = X@W return sum(np.power(yPred-data_Y,2))
(二)修改原有函數
def binSplitDataSet(data_X,data_Y,fea_axis,fea_val): #進行數據集划分 dataGtIdx = np.where(data_X[:,fea_axis]>fea_val) dataLgIdx = np.where(data_X[:,fea_axis]<=fea_val) return data_X[dataGtIdx],data_Y[dataGtIdx],data_X[dataLgIdx],data_Y[dataLgIdx] def chooseBestSplit(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): """ 選取的最好切分方式,使用回調方式調用葉節點計算和誤差計算,函數中含有預剪枝操作 :param data_X: 傳入數據集 :param data_Y: 傳入標簽值 :param leafType: 要調用計算的葉節點值 --- 雖然靈活,但是沒必要 :param errType: 要計算誤差的函數,這里是均方誤差 --- 雖然靈活,但是沒必要 :param ops: 包含了兩個重要信息, tolS tolN用於控制函數的停止時機,tolS是容許的誤差下降值,誤差小於則不再切分,tosN是切分的最少樣本數 :return: """ m,n = data_X.shape tolS = ops[0] tolN = ops[1] #之前都是將判斷是否繼續划分子樹放入createTree方法中,這里可以提到chooseBestSplit中進行判別。 #當然可以放入createTree方法中處理 if np.unique(data_Y).size == 1: #1.如果標簽值全部相同,則返回特征None表示不需要進行下一步划分,返回葉節點 return None,leafType(data_X,data_Y) #遍歷獲取最優特征和特征值 TosErr = errType(data_X,data_Y) #獲取全部數據集的誤差,后面計算划分后兩個子集的總誤差,如果誤差下降小於tolS,則不進行划分,返回該葉子節點即可(預剪枝操作) bestErr = np.inf bestFeaIdx = 0 #注意:這里兩個我們設置為0,而不是-1,因為我們必須保證可以取到一個特征(后面循環可能一直continue),我們需要在后面進行額外處理 bestFeaVal = 0 for i in range(n): #遍歷所有特征 for feaval in np.unique(data_X[:,i]): dataGt_X,dataGt_Y,dataLg_X,dataLg_Y = binSplitDataSet(data_X,data_Y,i,feaval) #數據集划分 # print(dataGt_X.shape,dataLg_X.shape) if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: #不符合最小數據集,不進行計算 continue concErr = errType(dataLg_X,dataLg_Y)+errType(dataGt_X,dataGt_Y) # print(concErr) if concErr < bestErr: bestFeaIdx = i bestFeaVal = feaval bestErr = concErr #2.如果最后求解的誤差,小於我們要求的誤差距離,則不進行下一步划分數據集(預剪枝) if (TosErr - bestErr) < tolS: return None,leafType(data_X,data_Y) #3.如果我們上面的數據集本身較小,則無論如何切分,數據集都<tolN,我們就需要在這里再處理一遍,進行一下判斷 dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, bestFeaIdx, bestFeaVal) # 數據集划分 if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: # 不符合最小數據集,不進行計算 return None,leafType(data_X,data_Y) return bestFeaIdx,bestFeaVal #正常情況下的返回結果 def createTree(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): #建立回歸樹 feaIdx,feaVal = chooseBestSplit(data_X,data_Y,leafType,errType,ops) if feaIdx == None: #是葉子節點 return feaVal #遞歸建樹 myTree = {} myTree['feaIdx'] = feaIdx myTree['feaVal'] = feaVal dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, feaIdx, feaVal) # 數據集划分 myTree['left'] = createTree(dataGt_X,dataGt_Y,leafType,errType,ops) myTree['right'] = createTree(dataLg_X,dataLg_Y,leafType,errType,ops) return myTree #開啟后剪枝處理 def isTree(tree): return type(tree) == dict #是樹的話返回字典,否則是數據 def getMean(tree): #獲取當前樹的合並均值REP---塌陷處理: 我們對一棵樹進行塌陷處理,就是遞歸將這棵樹進行合並返回這棵樹的平均值。 if isTree(tree['right']): tree['right'] = getMean(tree['right']) if isTree(tree['left']): tree['left'] = getMean(tree['left']) return (tree['left'] + tree['right'])/2 #返回均值 def prune(tree,testData_X,testData_Y): #根據決策樹和測試集數據進行后剪枝處理,不能按照訓練集進行后剪枝,因為創建決策樹時預剪枝操作中已經要求子樹誤差值小於根節點 #1.若是當測試集數據為空,則不需要后面的子樹了,直接進行塌陷處理 if testData_X.shape[0] == 0: return getMean(tree) #2.如果當前測試集不為空,而且決策樹含有左/右子樹,則需要進入子樹中進行剪枝操作---這里我們先將測試集數據划分 if isTree(tree['left']) or isTree(tree['right']): TestDataGT_X,TestDataGT_Y,TestDataLG_X,TestDataLG_Y = binSplitDataSet(testData_X,testData_Y,tree['feaIdx'],tree['feaVal']) #3.根據子樹進行下一步剪枝 if isTree(tree['left']): tree['left'] = prune(tree['left'],TestDataGT_X,TestDataGT_Y) #注意:這里是賦值操作,對樹進行剪枝 if isTree(tree['right']): tree['right'] = prune(tree['right'],TestDataLG_X,TestDataLG_Y) #注意:這里是賦值操作,對樹進行剪枝 #4.如果兩個是葉子節點,我們開始計算誤差,進行合並 if not isTree(tree['left']) and not isTree(tree['right']): #先划分測試集數據 TestDataGT_X,TestDataGT_Y,TestDataLG_X,TestDataLG_Y = binSplitDataSet(testData_X,testData_Y,tree['feaIdx'],tree['feaVal']) #進行誤差比較 #4-1.先獲取沒有合並的誤差 errorNoMerge = np.sum(np.power(TestDataGT_Y-tree['left'],2)) + np.sum(np.power(TestDataLG_Y-tree['right'],2)) #4-2.再獲取合並后的誤差 treemean = (tree['left'] + tree['right'])/2 #因為是葉子節點,可以直接計算 errorMerge = np.sum(np.power(testData_Y- treemean,2)) #4-3.進行判斷 if errorMerge < errorNoMerge: #可以剪枝 print("merging") #打印提示信息 return treemean #返回合並后的塌陷值 else: return tree #不進行合並,返回原樹 return tree #返回樹(但是該樹的子樹中可能存在剪枝合並情況由3可以知道
(三)測試函數
data_X,data_Y = loadDataSet("exp2.txt") myTree = createTree(data_X,data_Y,modelLeaf,modelErr,ops=(1,10)) #設置0,1表示不進行預剪枝,我們只對比后剪枝 print(myTree) plt.figure() plt.scatter(data_X.flatten(),data_Y.flatten()) plt.show()
五:實現回歸樹預測,對比決策樹和線性回歸
由於我上面沒有很好的處理回歸樹和模型樹的參數保持一致性,所以這里我對每一個預測使用不同代碼(就是同上面一樣,各自改變了參數,也可以該一下即可)
(一)實現決策樹--回歸樹和模型樹預測函數
#實現預測回歸樹 def regTreeEval(model,data_X): #對於回歸樹,直接返回model(預測值),對於模型樹,通過model和我們傳遞的測試集數據進行預測 return model #實現預測模型樹 def modelTreeEval(model,data_X): #為了使得回歸樹和模型樹保持一致,所以我們上面為regTreeEval加了data_X X = np.c_[np.ones(data_X.shape[0]),data_X] return X@model #開始遞歸預測 def treeForeCast(tree,TestData,modelEval=regTreeEval): if not isTree(tree): return modelEval(tree,TestData) #如果是葉子節點,直接返回預測值 if TestData[tree['feaIdx']] > tree['feaVal']: #如果測試集指定特征上的值大於決策樹特征值,則進入左子樹 if isTree(tree['left']): return treeForeCast(tree['left'],TestData,modelEval) else: #如果左子樹是葉子節點,直接返回預測值 return modelEval(tree['left'],TestData) else: #進入右子樹 if isTree(tree['right']): return treeForeCast(tree['right'],TestData,modelEval) else: #如果左子樹是葉子節點,直接返回預測值 return modelEval(tree['right'],TestData) def createForecast(tree,testData_X,modelEval = regTreeEval): #進行測試集數據預測 m,n = testData_X.shape yPred = np.zeros((m,1)) for i in range(m): #開始預測 yPred[i] = treeForeCast(tree,testData_X[i],modelEval) return yPred
(三)測試回歸樹預測結果和測試集標簽相關性(R2越接近1越好)

import numpy as np import matplotlib.pyplot as plt def loadDataSet(filename): dataSet = np.loadtxt(filename) m,n = dataSet.shape data_X = dataSet[:,0:n-1] data_Y = dataSet[:,n-1] return data_X,data_Y def regLeaf(data_Y): #用於計算指定樣本中標簽均值表示回歸y值 return np.mean(data_Y) def regErr(data_Y): #使用均方誤差作為划分依據 return np.var(data_Y)*data_Y.size def binSplitDataSet(data_X,data_Y,fea_axis,fea_val): #進行數據集划分 dataGtIdx = np.where(data_X[:,fea_axis]>fea_val) dataLgIdx = np.where(data_X[:,fea_axis]<=fea_val) return data_X[dataGtIdx],data_Y[dataGtIdx],data_X[dataLgIdx],data_Y[dataLgIdx] def chooseBestSplit(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): """ 選取的最好切分方式,使用回調方式調用葉節點計算和誤差計算,函數中含有預剪枝操作 :param data_X: 傳入數據集 :param data_Y: 傳入標簽值 :param leafType: 要調用計算的葉節點值 --- 雖然靈活,但是沒必要 :param errType: 要計算誤差的函數,這里是均方誤差 --- 雖然靈活,但是沒必要 :param ops: 包含了兩個重要信息, tolS tolN用於控制函數的停止時機,tolS是容許的誤差下降值,誤差小於則不再切分,tosN是切分的最少樣本數 :return: """ m,n = data_X.shape tolS = ops[0] tolN = ops[1] #之前都是將判斷是否繼續划分子樹放入createTree方法中,這里可以提到chooseBestSplit中進行判別。 #當然可以放入createTree方法中處理 if np.unique(data_Y).size == 1: #1.如果標簽值全部相同,則返回特征None表示不需要進行下一步划分,返回葉節點 return None,leafType(data_Y) #遍歷獲取最優特征和特征值 TosErr = errType(data_Y) #獲取全部數據集的誤差,后面計算划分后兩個子集的總誤差,如果誤差下降小於tolS,則不進行划分,返回該葉子節點即可(預剪枝操作) bestErr = np.inf bestFeaIdx = 0 #注意:這里兩個我們設置為0,而不是-1,因為我們必須保證可以取到一個特征(后面循環可能一直continue),我們需要在后面進行額外處理 bestFeaVal = 0 for i in range(n): #遍歷所有特征 for feaval in np.unique(data_X[:,i]): dataGt_X,dataGt_Y,dataLg_X,dataLg_Y = binSplitDataSet(data_X,data_Y,i,feaval) #數據集划分 # print(dataGt_X.shape,dataLg_X.shape) if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: #不符合最小數據集,不進行計算 continue concErr = errType(dataLg_Y)+errType(dataGt_Y) # print(concErr) if concErr < bestErr: bestFeaIdx = i bestFeaVal = feaval bestErr = concErr #2.如果最后求解的誤差,小於我們要求的誤差距離,則不進行下一步划分數據集(預剪枝) if (TosErr - bestErr) < tolS: return None,leafType(data_Y) #3.如果我們上面的數據集本身較小,則無論如何切分,數據集都<tolN,我們就需要在這里再處理一遍,進行一下判斷 dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, bestFeaIdx, bestFeaVal) # 數據集划分 if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: # 不符合最小數據集,不進行計算 return None,leafType(data_Y) return bestFeaIdx,bestFeaVal #正常情況下的返回結果 def createTree(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): #建立回歸樹 feaIdx,feaVal = chooseBestSplit(data_X,data_Y,leafType,errType,ops) if feaIdx == None: #是葉子節點 return feaVal #遞歸建樹 myTree = {} myTree['feaIdx'] = feaIdx myTree['feaVal'] = feaVal dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, feaIdx, feaVal) # 數據集划分 myTree['left'] = createTree(dataGt_X,dataGt_Y,leafType,errType,ops) myTree['right'] = createTree(dataLg_X,dataLg_Y,leafType,errType,ops) return myTree #開啟后剪枝處理 def isTree(tree): return type(tree) == dict #是樹的話返回字典,否則是數據 def getMean(tree): #獲取當前樹的合並均值REP---塌陷處理: 我們對一棵樹進行塌陷處理,就是遞歸將這棵樹進行合並返回這棵樹的平均值。 if isTree(tree['right']): tree['right'] = getMean(tree['right']) if isTree(tree['left']): tree['left'] = getMean(tree['left']) return (tree['left'] + tree['right'])/2 #返回均值 def prune(tree,testData_X,testData_Y): #根據決策樹和測試集數據進行后剪枝處理,不能按照訓練集進行后剪枝,因為創建決策樹時預剪枝操作中已經要求子樹誤差值小於根節點 #1.若是當測試集數據為空,則不需要后面的子樹了,直接進行塌陷處理 if testData_X.shape[0] == 0: return getMean(tree) #2.如果當前測試集不為空,而且決策樹含有左/右子樹,則需要進入子樹中進行剪枝操作---這里我們先將測試集數據划分 if isTree(tree['left']) or isTree(tree['right']): TestDataGT_X,TestDataGT_Y,TestDataLG_X,TestDataLG_Y = binSplitDataSet(testData_X,testData_Y,tree['feaIdx'],tree['feaVal']) #3.根據子樹進行下一步剪枝 if isTree(tree['left']): tree['left'] = prune(tree['left'],TestDataGT_X,TestDataGT_Y) #注意:這里是賦值操作,對樹進行剪枝 if isTree(tree['right']): tree['right'] = prune(tree['right'],TestDataLG_X,TestDataLG_Y) #注意:這里是賦值操作,對樹進行剪枝 #4.如果兩個是葉子節點,我們開始計算誤差,進行合並 if not isTree(tree['left']) and not isTree(tree['right']): #先划分測試集數據 TestDataGT_X,TestDataGT_Y,TestDataLG_X,TestDataLG_Y = binSplitDataSet(testData_X,testData_Y,tree['feaIdx'],tree['feaVal']) #進行誤差比較 #4-1.先獲取沒有合並的誤差 errorNoMerge = np.sum(np.power(TestDataGT_Y-tree['left'],2)) + np.sum(np.power(TestDataLG_Y-tree['right'],2)) #4-2.再獲取合並后的誤差 treemean = (tree['left'] + tree['right'])/2 #因為是葉子節點,可以直接計算 errorMerge = np.sum(np.power(testData_Y- treemean,2)) #4-3.進行判斷 if errorMerge < errorNoMerge: #可以剪枝 print("merging") #打印提示信息 return treemean #返回合並后的塌陷值 else: return tree #不進行合並,返回原樹 return tree #返回樹(但是該樹的子樹中可能存在剪枝合並情況由3可以知道 #實現預測回歸樹 def regTreeEval(model,data_X): #對於回歸樹,直接返回model(預測值),對於模型樹,通過model和我們傳遞的測試集數據進行預測 return model #實現預測模型樹 def modelTreeEval(model,data_X): #為了使得回歸樹和模型樹保持一致,所以我們上面為regTreeEval加了data_X X = np.c_[np.ones(data_X.shape[0]),data_X] return X@model #開始遞歸預測 def treeForeCast(tree,TestData,modelEval=regTreeEval): if not isTree(tree): return modelEval(tree,TestData) #如果是葉子節點,直接返回預測值 if TestData[tree['feaIdx']] > tree['feaVal']: #如果測試集指定特征上的值大於決策樹特征值,則進入左子樹 if isTree(tree['left']): return treeForeCast(tree['left'],TestData,modelEval) else: #如果左子樹是葉子節點,直接返回預測值 return modelEval(tree['left'],TestData) else: #進入右子樹 if isTree(tree['right']): return treeForeCast(tree['right'],TestData,modelEval) else: #如果左子樹是葉子節點,直接返回預測值 return modelEval(tree['right'],TestData) def createForecast(tree,testData_X,modelEval = regTreeEval): #進行測試集數據預測 m,n = testData_X.shape yPred = np.zeros((m,1)) for i in range(m): #開始預測 yPred[i] = treeForeCast(tree,testData_X[i],modelEval) return yPred data_X,data_Y = loadDataSet("bikeSpeedVsIq_train.txt") #訓練集數據 myTree = createTree(data_X,data_Y,ops=(1,20)) #訓練集數據建決策模型樹 print(myTree) testData_X,testData_Y = loadDataSet('bikeSpeedVsIq_test.txt') #測試集數據 yPred = createForecast(myTree,testData_X) #使用模型樹預測 print(np.corrcoef(yPred,testData_Y,rowvar=0)[0,1]) plt.figure() plt.scatter(data_X.flatten(),data_Y.flatten()) plt.show()
data_X,data_Y = loadDataSet("bikeSpeedVsIq_train.txt") #訓練集數據 myTree = createTree(data_X,data_Y,ops=(1,20)) #訓練集數據建決策模型樹 print(myTree) testData_X,testData_Y = loadDataSet('bikeSpeedVsIq_test.txt') #測試集數據 yPred = createForecast(myTree,testData_X) #使用模型樹預測 print(np.corrcoef(yPred,testData_Y,rowvar=0)[0,1]) plt.figure() plt.scatter(data_X.flatten(),data_Y.flatten()) plt.show()
(四)測試模型樹預測結果和測試集標簽相關性(R2越接近1越好)

import numpy as np import matplotlib.pyplot as plt def loadDataSet(filename): dataSet = np.loadtxt(filename) m,n = dataSet.shape data_X = dataSet[:,0:n-1] data_Y = dataSet[:,n-1] return data_X,data_Y def regLeaf(data_Y): #用於計算指定樣本中標簽均值表示回歸y值 return np.mean(data_Y) def regErr(data_Y): #使用均方誤差作為划分依據 return np.var(data_Y)*data_Y.size def linearSolve(data_X,data_Y): X = np.c_[np.ones(data_X.shape[0]), data_X] XTX = X.T @ X if np.linalg.det(XTX) == 0: raise NameError("this matrix can`t inverse") W = np.linalg.inv(XTX) @ (X.T @ data_Y) return W,X,data_Y def modelLeaf(data_X,data_Y): W,X,Y = linearSolve(data_X,data_Y) return W def modelErr(data_X,data_Y): W,X,Y = linearSolve(data_X,data_Y) yPred = X@W return sum(np.power(yPred-data_Y,2)) def binSplitDataSet(data_X,data_Y,fea_axis,fea_val): #進行數據集划分 dataGtIdx = np.where(data_X[:,fea_axis]>fea_val) dataLgIdx = np.where(data_X[:,fea_axis]<=fea_val) return data_X[dataGtIdx],data_Y[dataGtIdx],data_X[dataLgIdx],data_Y[dataLgIdx] def chooseBestSplit(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): """ 選取的最好切分方式,使用回調方式調用葉節點計算和誤差計算,函數中含有預剪枝操作 :param data_X: 傳入數據集 :param data_Y: 傳入標簽值 :param leafType: 要調用計算的葉節點值 --- 雖然靈活,但是沒必要 :param errType: 要計算誤差的函數,這里是均方誤差 --- 雖然靈活,但是沒必要 :param ops: 包含了兩個重要信息, tolS tolN用於控制函數的停止時機,tolS是容許的誤差下降值,誤差小於則不再切分,tosN是切分的最少樣本數 :return: """ m,n = data_X.shape tolS = ops[0] tolN = ops[1] #之前都是將判斷是否繼續划分子樹放入createTree方法中,這里可以提到chooseBestSplit中進行判別。 #當然可以放入createTree方法中處理 if np.unique(data_Y).size == 1: #1.如果標簽值全部相同,則返回特征None表示不需要進行下一步划分,返回葉節點 return None,leafType(data_X,data_Y) #遍歷獲取最優特征和特征值 TosErr = errType(data_X,data_Y) #獲取全部數據集的誤差,后面計算划分后兩個子集的總誤差,如果誤差下降小於tolS,則不進行划分,返回該葉子節點即可(預剪枝操作) bestErr = np.inf bestFeaIdx = 0 #注意:這里兩個我們設置為0,而不是-1,因為我們必須保證可以取到一個特征(后面循環可能一直continue),我們需要在后面進行額外處理 bestFeaVal = 0 for i in range(n): #遍歷所有特征 for feaval in np.unique(data_X[:,i]): dataGt_X,dataGt_Y,dataLg_X,dataLg_Y = binSplitDataSet(data_X,data_Y,i,feaval) #數據集划分 # print(dataGt_X.shape,dataLg_X.shape) if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: #不符合最小數據集,不進行計算 continue concErr = errType(dataLg_X,dataLg_Y)+errType(dataGt_X,dataGt_Y) # print(concErr) if concErr < bestErr: bestFeaIdx = i bestFeaVal = feaval bestErr = concErr #2.如果最后求解的誤差,小於我們要求的誤差距離,則不進行下一步划分數據集(預剪枝) if (TosErr - bestErr) < tolS: return None,leafType(data_X,data_Y) #3.如果我們上面的數據集本身較小,則無論如何切分,數據集都<tolN,我們就需要在這里再處理一遍,進行一下判斷 dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, bestFeaIdx, bestFeaVal) # 數據集划分 if dataGt_X.shape[0] < tolN or dataLg_X.shape[0] < tolN: # 不符合最小數據集,不進行計算 return None,leafType(data_X,data_Y) return bestFeaIdx,bestFeaVal #正常情況下的返回結果 def createTree(data_X,data_Y,leafType=regLeaf,errType=regErr,ops=(1,4)): #建立回歸樹 feaIdx,feaVal = chooseBestSplit(data_X,data_Y,leafType,errType,ops) if feaIdx == None: #是葉子節點 return feaVal #遞歸建樹 myTree = {} myTree['feaIdx'] = feaIdx myTree['feaVal'] = feaVal dataGt_X, dataGt_Y, dataLg_X, dataLg_Y = binSplitDataSet(data_X, data_Y, feaIdx, feaVal) # 數據集划分 myTree['left'] = createTree(dataGt_X,dataGt_Y,leafType,errType,ops) myTree['right'] = createTree(dataLg_X,dataLg_Y,leafType,errType,ops) return myTree #開啟后剪枝處理 def isTree(tree): return type(tree) == dict #是樹的話返回字典,否則是數據 def getMean(tree): #獲取當前樹的合並均值REP---塌陷處理: 我們對一棵樹進行塌陷處理,就是遞歸將這棵樹進行合並返回這棵樹的平均值。 if isTree(tree['right']): tree['right'] = getMean(tree['right']) if isTree(tree['left']): tree['left'] = getMean(tree['left']) return (tree['left'] + tree['right'])/2 #返回均值 def prune(tree,testData_X,testData_Y): #根據決策樹和測試集數據進行后剪枝處理,不能按照訓練集進行后剪枝,因為創建決策樹時預剪枝操作中已經要求子樹誤差值小於根節點 #1.若是當測試集數據為空,則不需要后面的子樹了,直接進行塌陷處理 if testData_X.shape[0] == 0: return getMean(tree) #2.如果當前測試集不為空,而且決策樹含有左/右子樹,則需要進入子樹中進行剪枝操作---這里我們先將測試集數據划分 if isTree(tree['left']) or isTree(tree['right']): TestDataGT_X,TestDataGT_Y,TestDataLG_X,TestDataLG_Y = binSplitDataSet(testData_X,testData_Y,tree['feaIdx'],tree['feaVal']) #3.根據子樹進行下一步剪枝 if isTree(tree['left']): tree['left'] = prune(tree['left'],TestDataGT_X,TestDataGT_Y) #注意:這里是賦值操作,對樹進行剪枝 if isTree(tree['right']): tree['right'] = prune(tree['right'],TestDataLG_X,TestDataLG_Y) #注意:這里是賦值操作,對樹進行剪枝 #4.如果兩個是葉子節點,我們開始計算誤差,進行合並 if not isTree(tree['left']) and not isTree(tree['right']): #先划分測試集數據 TestDataGT_X,TestDataGT_Y,TestDataLG_X,TestDataLG_Y = binSplitDataSet(testData_X,testData_Y,tree['feaIdx'],tree['feaVal']) #進行誤差比較 #4-1.先獲取沒有合並的誤差 errorNoMerge = np.sum(np.power(TestDataGT_Y-tree['left'],2)) + np.sum(np.power(TestDataLG_Y-tree['right'],2)) #4-2.再獲取合並后的誤差 treemean = (tree['left'] + tree['right'])/2 #因為是葉子節點,可以直接計算 errorMerge = np.sum(np.power(testData_Y- treemean,2)) #4-3.進行判斷 if errorMerge < errorNoMerge: #可以剪枝 print("merging") #打印提示信息 return treemean #返回合並后的塌陷值 else: return tree #不進行合並,返回原樹 return tree #返回樹(但是該樹的子樹中可能存在剪枝合並情況由3可以知道 #實現預測回歸樹 def regTreeEval(model,data_X): #對於回歸樹,直接返回model(預測值),對於模型樹,通過model和我們傳遞的測試集數據進行預測 return model #實現預測模型樹 def modelTreeEval(model,data_X): #為了使得回歸樹和模型樹保持一致,所以我們上面為regTreeEval加了data_X X = np.c_[np.ones(data_X.shape[0]),data_X] return X@model #開始遞歸預測 def treeForeCast(tree,TestData,modelEval=regTreeEval): if not isTree(tree): return modelEval(tree,TestData) #如果是葉子節點,直接返回預測值 if TestData[tree['feaIdx']] > tree['feaVal']: #如果測試集指定特征上的值大於決策樹特征值,則進入左子樹 if isTree(tree['left']): return treeForeCast(tree['left'],TestData,modelEval) else: #如果左子樹是葉子節點,直接返回預測值 return modelEval(tree['left'],TestData) else: #進入右子樹 if isTree(tree['right']): return treeForeCast(tree['right'],TestData,modelEval) else: #如果左子樹是葉子節點,直接返回預測值 return modelEval(tree['right'],TestData) def createForecast(tree,testData_X,modelEval = regTreeEval): #進行測試集數據預測 m,n = testData_X.shape yPred = np.zeros((m,1)) for i in range(m): #開始預測 yPred[i] = treeForeCast(tree,testData_X[i],modelEval) return yPred data_X,data_Y = loadDataSet("bikeSpeedVsIq_train.txt") #訓練集數據 myTree = createTree(data_X,data_Y,modelLeaf,modelErr,ops=(1,20)) #訓練集數據建決策模型樹 print(myTree) testData_X,testData_Y = loadDataSet('bikeSpeedVsIq_test.txt') #測試集數據 yPred = createForecast(myTree,testData_X,modelTreeEval) #使用模型樹預測 print(np.corrcoef(yPred,testData_Y,rowvar=0)[0,1]) plt.figure() plt.scatter(data_X.flatten(),data_Y.flatten()) plt.show()
data_X,data_Y = loadDataSet("bikeSpeedVsIq_train.txt") #訓練集數據 myTree = createTree(data_X,data_Y,modelLeaf,modelErr,ops=(1,20)) #訓練集數據建決策模型樹 print(myTree) testData_X,testData_Y = loadDataSet('bikeSpeedVsIq_test.txt') #測試集數據 yPred = createForecast(myTree,testData_X,modelTreeEval) #使用模型樹預測 print(np.corrcoef(yPred,testData_Y,rowvar=0)[0,1]) plt.figure() plt.scatter(data_X.flatten(),data_Y.flatten()) plt.show()
可以看到模型樹優於回歸樹
(五)一般線性回歸
利用我們上面實現的linearSolve方法,獲取訓練集的參數向量權重即可!!
data_X,data_Y = loadDataSet("bikeSpeedVsIq_train.txt") #訓練集數據 testData_X,testData_Y = loadDataSet('bikeSpeedVsIq_test.txt') #測試集數據 W,X,Y = linearSolve(data_X,data_Y) yPred2 = np.zeros((testData_X.shape[0],1)) testDX = np.c_[np.ones(testData_X.shape[0]),testData_X] for i in range(testData_X.shape[0]): yPred2[i] = testDX[i]@W print(np.corrcoef(yPred2,testData_Y,rowvar=0)[0,1])
所以,樹回歸方法在預測復雜數據時,會比簡單的線性模型更加有效