Random Forest
Overview
Prerequisites
Random Forest: can be understood as Bagging with CARTs.
Bagging is short for bootstrap aggregating.
CART (Classification and Regression Tree): a binary tree used for both classification and regression.
This builds on the idea of ensemble learning, which divides into Bagging and Boosting (a minimal sketch of the Bagging mechanics follows below).
Bagging: bootstrap sampling with replacement; an ensemble of weak classifiers combined by majority vote ("minority obeys majority"); the members can be trained in parallel.
Boosting: adaptive ensemble learning; the members are fitted in sequential iterations, i.e. serially. Representative algorithm: AdaBoost (Adaptive Boosting).
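To make the two Bagging ingredients concrete, here is a minimal sketch using only the standard library (my own illustration, not the article's sonar code): bootstrap sampling with replacement, and combining per-model predictions by majority vote.

# Sketch of the Bagging mechanics (illustration only)
from random import randrange, seed

def bootstrap_sample(dataset):
    # draw len(dataset) rows with replacement; duplicates are expected
    return [dataset[randrange(len(dataset))] for _ in range(len(dataset))]

def majority_vote(predictions):
    # "minority obeys majority": the most frequent label wins
    return max(set(predictions), key=predictions.count)

seed(1)
print(bootstrap_sample([1, 2, 3, 4, 5]))  # e.g. [2, 5, 1, 3, 1]
print(majority_vote(['M', 'R', 'M']))     # 'M'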
CART uses a divide-and-conquer strategy.
Regression trees apply that strategy to targets that no single global linear regression can fit well: the input space is partitioned and each part is handled separately, which usually gives more accurate results. Predicting the mean of each segment, however, is not always a wise choice; the leaf nodes can instead hold linear functions, giving a piecewise-linear model tree (see the sketch below).
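A quick sketch of that "mean leaf vs. linear-function leaf" point, on toy data of my own (not from the article): the points follow two different linear trends, a split at x = 3 separates them, and a per-segment line captures each trend far better than a per-segment mean.

# Mean leaf vs. linear-function leaf on two linear regimes (toy data)
def fit_line(points):
    # closed-form simple linear regression: returns (slope, intercept)
    n = len(points)
    mx = sum(x for x, _ in points) / n
    my = sum(y for _, y in points) / n
    slope = (sum((x - mx) * (y - my) for x, y in points)
             / sum((x - mx) ** 2 for x, _ in points))
    return slope, my - slope * mx

points = [(0, 0.1), (1, 1.0), (2, 2.1), (3, 9.0), (4, 12.2), (5, 14.8)]
left = [p for p in points if p[0] < 3]    # one split, at x = 3
right = [p for p in points if p[0] >= 3]

for leaf in (left, right):
    mean_leaf = sum(y for _, y in leaf) / len(leaf)   # constant prediction
    slope, intercept = fit_line(leaf)                 # linear prediction
    print('mean leaf: %.2f   linear leaf: %.2f*x + %.2f' % (mean_leaf, slope, intercept))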
Algorithm walkthrough: https://www.tuicool.com/articles/iiUfeim
Dataset source: https://archive.ics.uci.edu/ml/datasets/Connectionist+Bench+(Sonar,+Mines+vs.+Rocks)
Flattening a 3-D list into a 2-D list in Python:
Run: print(sum([[[1,2,3],[4,5,5]],[[1,2,3],[4,5,5]]], []))
Output: [[1, 2, 3], [4, 5, 5], [1, 2, 3], [4, 5, 5]]
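Note that sum(..., []) rebuilds the accumulator list at every step (quadratic time); itertools.chain.from_iterable is the idiomatic linear-time alternative and produces the same result:

# Same flattening with itertools instead of sum(..., [])
from itertools import chain

nested = [[[1, 2, 3], [4, 5, 5]], [[1, 2, 3], [4, 5, 5]]]
print(list(chain.from_iterable(nested)))  # [[1, 2, 3], [4, 5, 5], [1, 2, 3], [4, 5, 5]]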
In Python, a parameter declared with a leading * collects any number of extra positional arguments into one tuple, so a function can accept multiple arguments through a single name.
*args: positional arguments beyond the declared parameters are all packed into args, presented as a tuple.
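A one-line demonstration of that packing behavior:

# Extra positional arguments are collected into the tuple args
def demo(first, *args):
    print(first, args)

demo(1, 2, 3)  # prints: 1 (2, 3)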
Criteria for judging classification/regression quality:
Information Entropy, Gini Index, Gini Split, Misclassification Error
For these measures, the smaller the value, the better the model.
Information Gain: the larger the value, the better the split (a worked example follows below).
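As a worked example of two of these measures (my own illustration): for a group of labels with proportions p, the Gini index is sum(p * (1 - p)) and the entropy is -sum(p * log2(p)); both are 0 for a pure group.

# Gini index and information entropy for one group of labels
from math import log2

labels = ['M', 'M', 'M', 'R']
proportions = [labels.count(c) / len(labels) for c in set(labels)]  # [0.75, 0.25]

gini = sum(p * (1.0 - p) for p in proportions)      # 0.375
entropy = -sum(p * log2(p) for p in proportions)    # ~0.811
print(gini, entropy)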
Practice
Dataset Description
[sonar-all-data.csv]
The 60 input variables give the strength of sonar returns at different angles. This is a binary classification problem: the model must distinguish rocks from metal cylinders by their material and shape. There are 208 observations in total.
Code
# coding: utf-8
from random import seed
from random import randrange
from csv import reader
from math import sqrt


class randomForest:
    def __init__(self):
        print('randomforest==start==')
        seed(1)  # fix the random seed for reproducibility

    # Load the CSV file into a list of rows
    def load_csv(self, filename):
        dataset = list()
        with open(filename, 'r') as file:
            csv_reader = reader(file)
            for row in csv_reader:
                if not row:
                    continue
                dataset.append(row)
        return dataset

    # Convert a string column to float
    def str_column_to_float(self, dataset, column):
        for row in dataset:
            row[column] = float(row[column].strip())

    # Convert a string column to integer labels
    def str_column_to_int(self, dataset, column):
        class_values = [row[column] for row in dataset]
        unique = set(class_values)
        lookup = dict()
        # enumerate() pairs each unique value with an index
        for i, value in enumerate(unique):
            lookup[value] = i
        for row in dataset:
            row[column] = lookup[row[column]]
        return lookup

    # Create a random subsample from the dataset with replacement
    def subsample(self, dataset, ratio):
        sample = list()
        n_sample = round(len(dataset) * ratio)
        while len(sample) < n_sample:
            # Sampling with replacement: some rows appear several times in a
            # tree's training set, others not at all. This bootstrap sampling
            # is what keeps the training set of each tree different.
            index = randrange(len(dataset))
            sample.append(dataset[index])
        return sample

    # Split a dataset based on an attribute and an attribute value
    def test_split(self, index, value, dataset):
        left, right = list(), list()
        for row in dataset:
            if row[index] < value:
                left.append(row)
            else:
                right.append(row)
        return left, right

    # Calculate the Gini index for a split
    def gini_index(self, groups, class_values):
        gini = 0.0
        for class_value in class_values:
            for group in groups:
                size = len(group)
                if size == 0:
                    continue
                # list.count() counts how often class_value occurs in the group
                proportion = [row[-1] for row in group].count(class_value) / float(size)
                gini += (proportion * (1.0 - proportion))
        return gini

    # Select the best split point for a dataset: returns the best feature index,
    # the split value row[index], and the resulting groups (left, right)
    def get_split(self, dataset, n_features):
        class_values = list(set(row[-1] for row in dataset))  # e.g. ['M', 'R']
        b_index, b_value, b_score, b_groups = 999, 999, 999, None
        features = list()
        while len(features) < n_features:
            # Draw n_features candidate feature indices at random (typically
            # about the square root of the feature count). Not scanning every
            # feature is what keeps the individual trees diverse.
            index = randrange(len(dataset[0]) - 1)
            if index not in features:
                features.append(index)
        for index in features:
            for row in dataset:
                # Try row[index] as the split value and keep the feature/value
                # pair with the lowest Gini index
                groups = self.test_split(index, row[index], dataset)
                gini = self.gini_index(groups, class_values)
                if gini < b_score:
                    b_index, b_value, b_score, b_groups = index, row[index], gini, groups
        return {'index': b_index, 'value': b_value, 'groups': b_groups}

    # Create a terminal node: output the most frequent label in the group
    def to_terminal(self, group):
        outcomes = [row[-1] for row in group]
        # max() with key=outcomes.count returns the most common label
        return max(set(outcomes), key=outcomes.count)

    # Create child splits recursively until classification is finished
    def split(self, node, max_depth, min_size, n_features, depth):
        left, right = node['groups']
        del(node['groups'])
        # check for a no-split
        if not left or not right:
            node['left'] = node['right'] = self.to_terminal(left + right)
            return
        # Check the depth: once max_depth is reached, take the majority label of
        # each branch so the tree stops early, which limits overfitting
        if depth >= max_depth:
            node['left'], node['right'] = self.to_terminal(left), self.to_terminal(right)
            return
        # process the left child
        if len(left) <= min_size:
            node['left'] = self.to_terminal(left)
        else:
            # node['left'] is itself a dict {'index': ..., 'value': ..., 'groups': ...},
            # so the tree is a nested dictionary; depth+1 tracks the recursion level
            node['left'] = self.get_split(left, n_features)
            self.split(node['left'], max_depth, min_size, n_features, depth + 1)
        # process the right child
        if len(right) <= min_size:
            node['right'] = self.to_terminal(right)
        else:
            node['right'] = self.get_split(right, n_features)
            self.split(node['right'], max_depth, min_size, n_features, depth + 1)

    # Build a decision tree
    def build_tree(self, train, max_depth, min_size, n_features):
        # find the best initial split, then split recursively until done
        root = self.get_split(train, n_features)
        self.split(root, max_depth, min_size, n_features, 1)
        return root

    # Make a prediction with a single decision tree
    def predict(self, node, row):
        if row[node['index']] < node['value']:
            if isinstance(node['left'], dict):
                return self.predict(node['left'], row)
            else:
                return node['left']
        else:
            if isinstance(node['right'], dict):
                return self.predict(node['right'], row)
            else:
                return node['right']

    # Make a prediction with the list of bagged trees
    def bagging_predict(self, trees, row):
        # predict row with every tree, then pick the winner by simple majority vote
        predictions = [self.predict(tree, row) for tree in trees]
        return max(set(predictions), key=predictions.count)

    # Random Forest algorithm
    def random_forest(self, train, test, max_depth, min_size, sample_size, n_trees, n_features):
        trees = list()
        # n_trees is the number of decision trees
        for i in range(n_trees):
            # random subsampling at rate sample_size keeps each tree's
            # training set different
            sample = self.subsample(train, sample_size)
            tree = self.build_tree(sample, max_depth, min_size, n_features)
            trees.append(tree)
        # predict every test row with the ensemble of bagged trees
        predictions = [self.bagging_predict(trees, row) for row in test]
        return predictions

    # Split a dataset into n_folds folds of len(dataset)/n_folds rows each,
    # drawn at random from the dataset, each row used once (for cross-validation)
    def cross_validation_split(self, dataset, n_folds):
        dataset_split = list()
        # copy the dataset so the original is not modified
        dataset_copy = list(dataset)
        fold_size = int(len(dataset) / n_folds)
        for i in range(n_folds):
            # start a fresh fold each iteration
            fold = list()
            while len(fold) < fold_size:
                index = randrange(len(dataset_copy))
                # pop() removes the chosen row from dataset_copy and returns it,
                # so no row can be drawn twice
                fold.append(dataset_copy.pop(index))
            dataset_split.append(fold)
        return dataset_split

    # Calculate the accuracy percentage from actual and predicted labels
    def accuracy_metric(self, actual, predicted):
        correct = 0
        for i in range(len(actual)):
            if actual[i] == predicted[i]:
                correct += 1
        return correct / float(len(actual)) * 100.0

    def evaluate_algorithm(self, dataset, algorithm, n_folds, *args):
        folds = self.cross_validation_split(dataset, n_folds)
        scores = list()
        # Each pass holds out one fold as the test set and trains on the rest,
        # traversing all of folds: cross-validation
        for fold in folds:
            train_set = list(folds)
            train_set.remove(fold)
            # sum(train_set, []) flattens the list of folds into one train_set list
            train_set = sum(train_set, [])
            test_set = list()
            # fold is the test set held out from the original dataset;
            # blank out its labels so the algorithm cannot see them
            for row in fold:
                row_copy = list(row)
                test_set.append(row_copy)
                row_copy[-1] = None
            predicted = algorithm(train_set, test_set, *args)
            actual = [row[-1] for row in fold]
            accuracy = self.accuracy_metric(actual, predicted)
            scores.append(accuracy)
        return scores


if __name__ == '__main__':
    rf = randomForest()
    # load the data
    filename = 'sonar-all-data.csv'
    dataset = rf.load_csv(filename)
    # convert every column except the label from str to float, left to right
    for i in range(0, len(dataset[0]) - 1):
        rf.str_column_to_float(dataset, i)
    # optionally map the label column to the integers 0, 1
    # rf.str_column_to_int(dataset, len(dataset[0]) - 1)
    # evaluate the algorithm with 5-fold cross-validation
    n_folds = 5
    # maximum tree depth
    max_depth = 10
    min_size = 1
    sample_size = 1.0
    # number of features per split: a tuning knob (TODO) that trades
    # accuracy against tree diversity
    n_features = 15
    # n_features = int(sqrt(len(dataset[0]) - 1))
    # number of trees in the forest; in theory, more is better
    for n_trees in [1, 10, 20]:
        # rf.random_forest is passed as an argument into evaluate_algorithm
        scores = rf.evaluate_algorithm(dataset, rf.random_forest, n_folds,
                                       max_depth, min_size, sample_size,
                                       n_trees, n_features)
        print('Trees: %d' % n_trees)
        print('Scores: %s' % scores)
        print('Mean Accuracy: %.3f%%' % (sum(scores) / float(len(scores))))
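If scikit-learn happens to be installed, the same experiment can be cross-checked in a few lines. This is only a comparison aid under that assumption, not part of the article's from-scratch implementation:

# Cross-check with scikit-learn (assumes scikit-learn is installed)
from csv import reader
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score

with open('sonar-all-data.csv') as f:
    rows = [row for row in reader(f) if row]
X = [[float(v) for v in row[:-1]] for row in rows]
y = [row[-1] for row in rows]

clf = RandomForestClassifier(n_estimators=20, max_features=15, random_state=1)
print(cross_val_score(clf, X, y, cv=5).mean())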