目錄
1 使用sklearn進行數據挖掘
1.1 數據挖掘的步驟
1.2 數據初貌
1.3 關鍵技術
2 並行處理
2.1 整體並行處理
2.2 部分並行處理
3 流水線處理
4 自動化調參
5 持久化
6 回顧
7 總結
8 參考資料
1 使用sklearn進行數據挖掘
1.1 數據挖掘的步驟
數據挖掘通常包括數據采集,數據分析,特征工程,訓練模型,模型評估等步驟。使用sklearn工具可以方便地進行特征工程和模型訓練工作,在《使用sklearn做單機特征工程》中,我們最后留下了一些疑問:特征處理類都有三個方法fit、transform和fit_transform,fit方法居然和模型訓練方法fit同名(不光同名,參數列表都一樣),這難道都是巧合?
顯然,這不是巧合,這正是sklearn的設計風格。我們能夠更加優雅地使用sklearn進行特征工程和模型訓練工作。此時,不妨從一個基本的數據挖掘場景入手:
我們使用sklearn進行虛線框內的工作(sklearn也可以進行文本特征提取)。通過分析sklearn源碼,我們可以看到除訓練,預測和評估以外,處理其他工作的類都實現了3個方法:fit、transform和fit_transform。從命名中可以看到,fit_transform方法是先調用fit然后調用transform,我們只需要關注fit方法和transform方法即可。
transform方法主要用來對特征進行轉換。從可利用信息的角度來說,轉換分為無信息轉換和有信息轉換。無信息轉換是指不利用任何其他信息進行轉換,比如指數、對數函數轉換等。有信息轉換從是否利用目標值向量又可分為無監督轉換和有監督轉換。無監督轉換指只利用特征的統計信息的轉換,統計信息包括均值、標准差、邊界等等,比如標准化、PCA法降維等。有監督轉換指既利用了特征信息又利用了目標值信息的轉換,比如通過模型選擇特征、LDA法降維等。通過總結常用的轉換類,我們得到下表:
包 | 類 | 參數列表 | 類別 | fit方法有用 | 說明 |
sklearn.preprocessing | StandardScaler | 特征 | 無監督 | Y | 標准化 |
sklearn.preprocessing | MinMaxScaler | 特征 | 無監督 | Y | 區間縮放 |
sklearn.preprocessing | Normalizer | 特征 | 無信息 | N | 歸一化 |
sklearn.preprocessing | Binarizer | 特征 | 無信息 | N | 定量特征二值化 |
sklearn.preprocessing | OneHotEncoder | 特征 | 無監督 | Y | 定性特征編碼 |
sklearn.preprocessing | Imputer | 特征 | 無監督 | Y | 缺失值計算 |
sklearn.preprocessing | PolynomialFeatures | 特征 | 無信息 | N | 多項式變換(fit方法僅僅生成了多項式的表達式) |
sklearn.preprocessing | FunctionTransformer | 特征 | 無信息 | N | 自定義函數變換(自定義函數在transform方法中調用) |
sklearn.feature_selection | VarianceThreshold | 特征 | 無監督 | Y | 方差選擇法 |
sklearn.feature_selection | SelectKBest | 特征/特征+目標值 | 無監督/有監督 | Y | 自定義特征評分選擇法 |
sklearn.feature_selection | SelectKBest+chi2 | 特征+目標值 | 有監督 | Y | 卡方檢驗選擇法 |
sklearn.feature_selection | RFE | 特征+目標值 | 有監督 | Y | 遞歸特征消除法 |
sklearn.feature_selection | SelectFromModel | 特征+目標值 | 有監督 | Y | 自定義模型訓練選擇法 |
sklearn.decomposition | PCA | 特征 | 無監督 | Y | PCA降維 |
sklearn.lda | LDA | 特征+目標值 | 有監督 | Y | LDA降維 |
不難看到,只有有信息的轉換類的fit方法才實際有用,顯然fit方法的主要工作是獲取特征信息和目標值信息,在這點上,fit方法和模型訓練時的fit方法就能夠聯系在一起了:都是通過分析特征和目標值,提取有價值的信息,對於轉換類來說是某些統計量,對於模型來說可能是特征的權值系數等。另外,只有有監督的轉換類的fit和transform方法才需要特征和目標值兩個參數。fit方法無用不代表其沒實現,而是除合法性校驗以外,其並沒有對特征和目標值進行任何處理,Normalizer的fit方法實現如下:
1 def fit(self, X, y=None): 2 """Do nothing and return the estimator unchanged 3 This method is just there to implement the usual API and hence 4 work in pipelines. 5 """ 6 X = check_array(X, accept_sparse='csr') 7 return self
基於這些特征處理工作都有共同的方法,那么試想可不可以將他們組合在一起?在本文假設的場景中,我們可以看到這些工作的組合形式有兩種:流水線式和並行式。基於流水線組合的工作需要依次進行,前一個工作的輸出是后一個工作的輸入;基於並行式的工作可以同時進行,其使用同樣的輸入,所有工作完成后將各自的輸出合並之后輸出。sklearn提供了包pipeline來完成流水線式和並行式的工作。
1.2 數據初貌
在此,我們仍然使用IRIS數據集來進行說明。為了適應提出的場景,對原數據集需要稍微加工:
1 from numpy import hstack, vstack, array, median, nan 2 from numpy.random import choice 3 from sklearn.datasets import load_iris 4 5 #特征矩陣加工 6 #使用vstack增加一行含缺失值的樣本(nan, nan, nan, nan) 7 #使用hstack增加一列表示花的顏色(0-白、1-黃、2-紅),花的顏色是隨機的,意味着顏色並不影響花的分類 8 iris.data = hstack((choice([0, 1, 2], size=iris.data.shape[0]+1).reshape(-1,1), vstack((iris.data, array([nan, nan, nan, nan]).reshape(1,-1))))) 9 #目標值向量加工 10 #增加一個目標值,對應含缺失值的樣本,值為眾數 11 iris.target = hstack((iris.target, array([median(iris.target)])))
1.3 關鍵技術
並行處理,流水線處理,自動化調參,持久化是使用sklearn優雅地進行數據挖掘的核心。並行處理和流水線處理將多個特征處理工作,甚至包括模型訓練工作組合成一個工作(從代碼的角度來說,即將多個對象組合成了一個對象)。在組合的前提下,自動化調參技術幫我們省去了人工調參的反鎖。訓練好的模型是貯存在內存中的數據,持久化能夠將這些數據保存在文件系統中,之后使用時無需再進行訓練,直接從文件系統中加載即可。
2 並行處理
並行處理使得多個特征處理工作能夠並行地進行。根據對特征矩陣的讀取方式不同,可分為整體並行處理和部分並行處理。整體並行處理,即並行處理的每個工作的輸入都是特征矩陣的整體;部分並行處理,即可定義每個工作需要輸入的特征矩陣的列。
2.1 整體並行處理
pipeline包提供了FeatureUnion類來進行整體並行處理:
1 from numpy import log1p 2 from sklearn.preprocessing import FunctionTransformer 3 from sklearn.preprocessing import Binarizer 4 from sklearn.pipeline import FeatureUnion 5 6 #新建將整體特征矩陣進行對數函數轉換的對象 7 step2_1 = ('ToLog', FunctionTransformer(log1p)) 8 #新建將整體特征矩陣進行二值化類的對象 9 step2_2 = ('ToBinary', Binarizer()) 10 #新建整體並行處理對象 11 #該對象也有fit和transform方法,fit和transform方法均是並行地調用需要並行處理的對象的fit和transform方法 12 #參數transformer_list為需要並行處理的對象列表,該列表為二元組列表,第一元為對象的名稱,第二元為對象 13 step2 = ('FeatureUnion', FeatureUnion(transformer_list=[step2_1, step2_2, step2_3]))
2.2 部分並行處理
整體並行處理有其缺陷,在一些場景下,我們只需要對特征矩陣的某些列進行轉換,而不是所有列。pipeline並沒有提供相應的類(僅OneHotEncoder類實現了該功能),需要我們在FeatureUnion的基礎上進行優化:

1 from sklearn.pipeline import FeatureUnion, _fit_one_transformer, _fit_transform_one, _transform_one 2 from sklearn.externals.joblib import Parallel, delayed 3 from scipy import sparse 4 import numpy as np 5 6 #部分並行處理,繼承FeatureUnion 7 class FeatureUnionExt(FeatureUnion): 8 #相比FeatureUnion,多了idx_list參數,其表示每個並行工作需要讀取的特征矩陣的列 9 def __init__(self, transformer_list, idx_list, n_jobs=1, transformer_weights=None): 10 self.idx_list = idx_list 11 FeatureUnion.__init__(self, transformer_list=map(lambda trans:(trans[0], trans[1]), transformer_list), n_jobs=n_jobs, transformer_weights=transformer_weights) 12 13 #由於只部分讀取特征矩陣,方法fit需要重構 14 def fit(self, X, y=None): 15 transformer_idx_list = map(lambda trans, idx:(trans[0], trans[1], idx), self.transformer_list, self.idx_list) 16 transformers = Parallel(n_jobs=self.n_jobs)( 17 #從特征矩陣中提取部分輸入fit方法 18 delayed(_fit_one_transformer)(trans, X[:,idx], y) 19 for name, trans, idx in transformer_idx_list) 20 self._update_transformer_list(transformers) 21 return self 22 23 #由於只部分讀取特征矩陣,方法fit_transform需要重構 24 def fit_transform(self, X, y=None, **fit_params): 25 transformer_idx_list = map(lambda trans, idx:(trans[0], trans[1], idx), self.transformer_list, self.idx_list) 26 result = Parallel(n_jobs=self.n_jobs)( 27 #從特征矩陣中提取部分輸入fit_transform方法 28 delayed(_fit_transform_one)(trans, name, X[:,idx], y, 29 self.transformer_weights, **fit_params) 30 for name, trans, idx in transformer_idx_list) 31 32 Xs, transformers = zip(*result) 33 self._update_transformer_list(transformers) 34 if any(sparse.issparse(f) for f in Xs): 35 Xs = sparse.hstack(Xs).tocsr() 36 else: 37 Xs = np.hstack(Xs) 38 return Xs 39 40 #由於只部分讀取特征矩陣,方法transform需要重構 41 def transform(self, X): 42 transformer_idx_list = map(lambda trans, idx:(trans[0], trans[1], idx), self.transformer_list, self.idx_list) 43 Xs = Parallel(n_jobs=self.n_jobs)( 44 #從特征矩陣中提取部分輸入transform方法 45 delayed(_transform_one)(trans, name, X[:,idx], self.transformer_weights) 46 for name, trans, idx in transformer_idx_list) 47 if any(sparse.issparse(f) for f in Xs): 48 Xs = sparse.hstack(Xs).tocsr() 49 else: 50 Xs = np.hstack(Xs) 51 return Xs
在本文提出的場景中,我們對特征矩陣的第1列(花的顏色)進行定性特征編碼,對第2、3、4列進行對數函數轉換,對第5列進行定量特征二值化處理。使用FeatureUnionExt類進行部分並行處理的代碼如下:
1 from numpy import log1p 2 from sklearn.preprocessing import OneHotEncoder 3 from sklearn.preprocessing import FunctionTransformer 4 from sklearn.preprocessing import Binarizer 5 6 #新建將部分特征矩陣進行定性特征編碼的對象 7 step2_1 = ('OneHotEncoder', OneHotEncoder(sparse=False)) 8 #新建將部分特征矩陣進行對數函數轉換的對象 9 step2_2 = ('ToLog', FunctionTransformer(log1p)) 10 #新建將部分特征矩陣進行二值化類的對象 11 step2_3 = ('ToBinary', Binarizer()) 12 #新建部分並行處理對象 13 #參數transformer_list為需要並行處理的對象列表,該列表為二元組列表,第一元為對象的名稱,第二元為對象 14 #參數idx_list為相應的需要讀取的特征矩陣的列 15 step2 = ('FeatureUnionExt', FeatureUnionExt(transformer_list=[step2_1, step2_2, step2_3], idx_list=[[0], [1, 2, 3], [4]]))
3 流水線處理
pipeline包提供了Pipeline類來進行流水線處理。流水線上除最后一個工作以外,其他都要執行fit_transform方法,且上一個工作輸出作為下一個工作的輸入。最后一個工作必須實現fit方法,輸入為上一個工作的輸出;但是不限定一定有transform方法,因為流水線的最后一個工作可能是訓練!
根據本文提出的場景,結合並行處理,構建完整的流水線的代碼如下:
1 from numpy import log1p 2 from sklearn.preprocessing import Imputer 3 from sklearn.preprocessing import OneHotEncoder 4 from sklearn.preprocessing import FunctionTransformer 5 from sklearn.preprocessing import Binarizer 6 from sklearn.preprocessing import MinMaxScaler 7 from sklearn.feature_selection import SelectKBest 8 from sklearn.feature_selection import chi2 9 from sklearn.decomposition import PCA 10 from sklearn.linear_model import LogisticRegression 11 from sklearn.pipeline import Pipeline 12 13 #新建計算缺失值的對象 14 step1 = ('Imputer', Imputer()) 15 #新建將部分特征矩陣進行定性特征編碼的對象 16 step2_1 = ('OneHotEncoder', OneHotEncoder(sparse=False)) 17 #新建將部分特征矩陣進行對數函數轉換的對象 18 step2_2 = ('ToLog', FunctionTransformer(log1p)) 19 #新建將部分特征矩陣進行二值化類的對象 20 step2_3 = ('ToBinary', Binarizer()) 21 #新建部分並行處理對象,返回值為每個並行工作的輸出的合並 22 step2 = ('FeatureUnionExt', FeatureUnionExt(transformer_list=[step2_1, step2_2, step2_3], idx_list=[[0], [1, 2, 3], [4]])) 23 #新建無量綱化對象 24 step3 = ('MinMaxScaler', MinMaxScaler()) 25 #新建卡方校驗選擇特征的對象 26 step4 = ('SelectKBest', SelectKBest(chi2, k=3)) 27 #新建PCA降維的對象 28 step5 = ('PCA', PCA(n_components=2)) 29 #新建邏輯回歸的對象,其為待訓練的模型作為流水線的最后一步 30 step6 = ('LogisticRegression', LogisticRegression(penalty='l2')) 31 #新建流水線處理對象 32 #參數steps為需要流水線處理的對象列表,該列表為二元組列表,第一元為對象的名稱,第二元為對象 33 pipeline = Pipeline(steps=[step1, step2, step3, step4, step5, step6])
4 自動化調參
網格搜索為自動化調參的常見技術之一,grid_search包提供了自動化調參的工具,包括GridSearchCV類。對組合好的對象進行訓練以及調參的代碼如下:
1 from sklearn.grid_search import GridSearchCV 2 3 #新建網格搜索對象 4 #第一參數為待訓練的模型 5 #param_grid為待調參數組成的網格,字典格式,鍵為參數名稱(格式“對象名稱__子對象名稱__參數名稱”),值為可取的參數值列表 6 grid_search = GridSearchCV(pipeline, param_grid={'FeatureUnionExt__ToBinary__threshold':[1.0, 2.0, 3.0, 4.0], 'LogisticRegression__C':[0.1, 0.2, 0.4, 0.8]}) 7 #訓練以及調參 8 grid_search.fit(iris.data, iris.target)
5 持久化
externals.joblib包提供了dump和load方法來持久化和加載內存數據:
1 #持久化數據 2 #第一個參數為內存中的對象 3 #第二個參數為保存在文件系統中的名稱 4 #第三個參數為壓縮級別,0為不壓縮,3為合適的壓縮級別 5 dump(grid_search, 'grid_search.dmp', compress=3) 6 #從文件系統中加載數據到內存中 7 grid_search = load('grid_search.dmp')
6 回顧
包 | 類或方法 | 說明 |
sklearn.pipeline | Pipeline | 流水線處理 |
sklearn.pipeline | FeatureUnion | 並行處理 |
sklearn.grid_search | GridSearchCV | 網格搜索調參 |
externals.joblib | dump | 數據持久化 |
externals.joblib | load | 從文件系統中加載數據至內存 |
注意:組合和持久化都會涉及pickle技術,在sklearn的技術文檔中有說明,將lambda定義的函數作為FunctionTransformer的自定義轉換函數將不能pickle化。
7 總結
2015年我設計了一個基於sklearn的自動化特征工程的工具,其以Mysql數據庫作為原始數據源,提供了“靈活的”特征提取、特征處理的配置方法,同時重新封裝了數據、特征和模型,以方便調度系統識別。說靈活,其實也只是通過配置文件的方式定義每個特征的提取和處理的sql語句。但是純粹使用sql語句來進行特征處理是很勉強的,除去特征提取以外,我又造了一回輪子,原來sklearn提供了這么優秀的特征處理、工作組合等功能。所以,我在這個博客中先不提任何算法和模型,先從數據挖掘工作的第一步開始,使用基於Python的各個工具把大部分步驟都走了一遍(抱歉,我暫時忽略了特征提取),希望這樣的梳理能夠少讓初學者走彎路吧。