問題起因
最近要將一個文本分割成好幾個topic,每個topic設計一個regressor,各regressor是相互獨立的,最后匯總所有topic的regressor得到總得預測結果。沒錯!類似bagging ensemble!只是我沒有抽樣。文本不大,大概3000行,topic個數為8,於是我寫了一個串行的程序,一個topic算完之后再算另一個topic。可是我在每個topic中用了GridSearchCV
來調參,又要選特征又要調整regressor的參數,導致參數組合一共有1782種。我真是低估了調參的時間,程序跑了一天一夜最后因為忘記import一個庫導致最終的預測精度沒有算出來。后來想到,既然每個topic的預測都是獨立的,那是不是可以並行呢?
Python中的多線程與多進程
但是聽聞Python的多線程實際上並不能真正利用多核,所以如果使用多線程實際上還是在一個核上做並發處理。不過,如果使用多進程就可以真正利用多核,因為各進程之間是相互獨立的,不共享資源,可以在不同的核上執行不同的進程,達到並行的效果。同時在我的問題中,各topic相互獨立,不涉及進程間的通信,只需最后匯總結果,因此使用多進程是個不錯的選擇。
multiprocessing
一個子進程
multiprocessing模塊提供process類實現新建進程。下述代碼是新建一個子進程。
1 from multiprocessing import Process 2 3 def f(name): 4 print 'hello', name 5 6 if __name__ == '__main__': 7 p = Process(target=f, args=('bob',)) # 新建一個子進程p,目標函數是f,args是函數f的參數列表 8 p.start() # 開始執行進程 9 p.join() # 等待子進程結束
上述代碼中p.join()
的意思是等待子進程結束后才執行后續的操作,一般用於進程間通信。例如有一個讀進程pw和一個寫進程pr,在調用pw之前需要先寫pr.join()
,表示等待寫進程結束之后才開始執行讀進程。
多個子進程
如果要同時創建多個子進程可以使用multiprocessing.Pool
類。該類可以創建一個進程池,然后在多個核上執行這些進程。
import multiprocessing import time def func(msg): print multiprocessing.current_process().name + '-' + msg if __name__ == "__main__": pool = multiprocessing.Pool(processes=4) # 創建4個進程 for i in xrange(10): msg = "hello %d" %(i) pool.apply_async(func, (msg, )) pool.close() # 關閉進程池,表示不能在往進程池中添加進程 pool.join() # 等待進程池中的所有進程執行完畢,必須在close()之后調用 print "Sub-process(es) done."
輸出結果如下:
1 Sub-process(es) done. 2 PoolWorker-34-hello 1 3 PoolWorker-33-hello 0 4 PoolWorker-35-hello 2 5 PoolWorker-36-hello 3 6 PoolWorker-34-hello 7 7 PoolWorker-33-hello 4 8 PoolWorker-35-hello 5 9 PoolWorker-36-hello 6 10 PoolWorker-33-hello 8 11 PoolWorker-36-hello 9
上述代碼中的pool.apply_async()
是apply()
函數的變體,apply_async()
是apply()
的並行版本,apply()
是apply_async()
的阻塞版本,使用apply()
主進程會被阻塞直到函數執行結束,所以說是阻塞版本。apply()
既是Pool
的方法,也是Python內置的函數,兩者等價。可以看到輸出結果並不是按照代碼for循環中的順序輸出的。
多個子進程並返回值
apply_async()
本身就可以返回被進程調用的函數的返回值。上一個創建多個子進程的代碼中,如果在函數func
中返回一個值,那么pool.apply_async(func, (msg, ))
的結果就是返回pool中所有進程的值的對象(注意是對象,不是值本身)。
1 import multiprocessing 2 import time 3 4 def func(msg): 5 return multiprocessing.current_process().name + '-' + msg 6 7 if __name__ == "__main__": 8 pool = multiprocessing.Pool(processes=4) # 創建4個進程 9 results = [] 10 for i in xrange(10): 11 msg = "hello %d" %(i) 12 results.append(pool.apply_async(func, (msg, ))) 13 pool.close() # 關閉進程池,表示不能再往進程池中添加進程,需要在join之前調用 14 pool.join() # 等待進程池中的所有進程執行完畢 15 print ("Sub-process(es) done.") 16 17 for res in results: 18 print (res.get())
上述代碼輸出結果如下:
1 Sub-process(es) done. 2 PoolWorker-37-hello 0 3 PoolWorker-38-hello 1 4 PoolWorker-39-hello 2 5 PoolWorker-40-hello 3 6 PoolWorker-37-hello 4 7 PoolWorker-38-hello 5 8 PoolWorker-39-hello 6 9 PoolWorker-37-hello 7 10 PoolWorker-40-hello 8 11 PoolWorker-38-hello 9
與之前的輸出不同,這次的輸出是有序的。
如果電腦是八核,建立8個進程,在Ubuntu下輸入top命令再按下大鍵盤的1,可以看到每個CPU的使用率是比較平均的,如下圖:
在system monitor中也可以清楚看到執行多進程前后CPU使用率曲線的差異。