問題
實際處理和解決機器學習問題過程中,我們會遇到一些“大數據”問題,比如有上百萬條數據,上千上萬維特征,此時數據存儲已經達到10G這種級別。這種情況下,如果還是直接使用傳統的方式肯定行不通,比如當你想把數據load到內存中轉成numpy數組,你會發現要么創建不了那么大的numpy矩陣,要么直接加載時報MemeryError。
在這種情況下我了解了幾種選擇辦法,1. 對數據進行降維,2. 使用流式或類似流式處理,3. 上大機器,高內存的,或者用spark集群。
文檔
Sklearn里面提供一些流式處理方法。具體可以參考官方文檔:
講解了怎么處理 big data 文件:http://scikit-learn.org/stable/modules/scaling_strategies.html
通過一個例子講解了怎么用:http://scikit-learn.org/stable/auto_examples/applications/plot_out_of_core_classification.html
簡單介紹
我看了上面兩個文檔,並使用介紹的SGDClassifier進行分類,效果挺好的,這里記錄下用法。
要實現big data的處理,需要滿足三個條件:
1. 有流式數據
2. 能從數據中可以提取出特征
3. 增量學習算法
1. 流式數據
第一個條件,要給算法流式數據或小batch的數據,比如一次提供1000條這樣。這一塊是需要自己寫代碼提供的,可以實現一個生成器,每調用一次提供一份小batch數據。
2. 提取特征
第二個條件,可以使用任何一種sklearn中支持的特征提取方法。對於一些特殊情況,比如特征需要標准化或者是事先不知道特征值的情況下需要特殊處理。
3. 增量學習算法
對於第三個條件,sklearn中提供了很多增量學習算法。雖然不是所有的算法都可以增量學習,但是學習器提供了 partial_fit
的函數的都可以進行增量學習。事實上,使用小batch的數據中進行增量學習(有時候也稱為online learning)是這種學習方式的核心,因為它能讓任何一段時間內內存中只有少量的數據。
sklearn提供很多增量學習算法:
- Classification
- sklearn.naive_bayes.MultinomialNB
- sklearn.naive_bayes.BernoulliNB
- sklearn.linear_model.Perceptron
- sklearn.linear_model.SGDClassifier
- sklearn.linear_model.PassiveAggressiveClassifier
- Regression
- sklearn.linear_model.SGDRegressor
- sklearn.linear_model.PassiveAggressiveRegressor
- Clustering
- sklearn.cluster.MiniBatchKMeans
- Decomposition / feature Extraction
- sklearn.decomposition.MiniBatchDictionaryLearning
- sklearn.decomposition.IncrementalPCA
- sklearn.decomposition.LatentDirichletAllocation
- sklearn.cluster.MiniBatchKMeans
其中對於分類問題,在第一次調用partial_fit
時需要通過classes
參數指定分類的類別。
另外有一點需要考慮,所有的學習器在學習過程中不會對每個樣例賦予同樣的權重。對於感知機,它對於bad樣本會敏感,即使學習器已經學習了很多樣本了,而對於SGD和PassiveAggressive,對於這種情況會更魯棒一點,后者在學習的時候,后來學習樣本的權重會隨着學習器學習率的下降而降低。
實例
這里舉一個實際的例子。我這邊有上G的訓練文件和測試文件,都是csv格式。因為沒法直接都讀進內存處理,所以選擇增量學習的方式處理。
1. 生成一個文件流迭代器
1 def iter_minibatches(data_stream, minibatch_size=1000): 2 ''' 3 迭代器 4 給定文件流(比如一個大文件),每次輸出minibatch_size行,默認選擇1k行 5 將輸出轉化成numpy輸出,返回X, y 6 ''' 7 X = [] 8 y = [] 9 cur_line_num = 0 10 11 csvfile = file(data_stream, 'rb') 12 reader = csv.reader(csvfile) 13 for line in reader: 14 y.append(float(line[0])) 15 X.append(line[1:]) # 這里要將數據轉化成float類型 16 17 cur_line_num += 1 18 if cur_line_num >= minibatch_size: 19 X, y = np.array(X), np.array(y) # 將數據轉成numpy的array類型並返回 20 yield X, y 21 X, y = [], [] 22 cur_line_num = 0 23 csvfile.close() 24 25 # 生成測試文件 26 minibatch_test_iterators = iter_minibatches(test_file, minibatch_size=5000) 27 X_test, y_test = minibatch_test_iterators.next() # 得到一份測試文件
2. 增量訓練
1 from sklearn.linear_model import SGDClassifier 2 sgd_clf = SGDClassifier() # SGDClassifier的參數設置可以參考sklearn官網 3 minibatch_train_iterators = iter_minibatches(data_part_file, minibatch_size=2000) 4 5 for i, (X_train, y_train) in enumerate(minibatch_train_iterators): 6 # 使用 partial_fit ,並在第一次調用 partial_fit 的時候指定 classes 7 sgd_clf.partial_fit(X_train, y_train, classes=np.array([0, 1])) 8 print("{} time".format(i)) # 當前次數 9 print("{} score".format(sgd_clf.score(X_test, y_test))) # 在測試集上看效果
3. 結果
0 time 0.679 score 1 time 0.6954 score 2 time 0.712 score 3 time 0.7248 score ... 57 time 0.745 score 58 time 0.7394 score 59 time 0.7398 score
4. 一點補充
- 當SGD的損失函數為log時,SGD等價於LR。
- 數據只迭代一次分類器可能還沒完全收斂,可以多迭代幾次
- mini-batch的量不要設置太小,太小的話,需要多迭代幾次才能收斂