Python多進程庫multiprocessing中進程池Pool類的使用


問題起因

最近要將一個文本分割成好幾個topic,每個topic設計一個regressor,各regressor是相互獨立的,最后匯總所有topic的regressor得到總得預測結果。沒錯!類似bagging ensemble!只是我沒有抽樣。文本不大,大概3000行,topic個數為8,於是我寫了一個串行的程序,一個topic算完之后再算另一個topic。可是我在每個topic中用了GridSearchCV來調參,又要選特征又要調整regressor的參數,導致參數組合一共有1782種。我真是低估了調參的時間,程序跑了一天一夜最后因為忘記import一個庫導致最終的預測精度沒有算出來。后來想到,既然每個topic的預測都是獨立的,那是不是可以並行呢?

Python中的多線程與多進程

但是聽聞Python的多線程實際上並不能真正利用多核,所以如果使用多線程實際上還是在一個核上做並發處理。不過,如果使用多進程就可以真正利用多核,因為各進程之間是相互獨立的,不共享資源,可以在不同的核上執行不同的進程,達到並行的效果。同時在我的問題中,各topic相互獨立,不涉及進程間的通信,只需最后匯總結果,因此使用多進程是個不錯的選擇。

multiprocessing

一個子進程

multiprocessing模塊提供process類實現新建進程。下述代碼是新建一個子進程。

1 from multiprocessing import Process
2 
3 def f(name):
4     print 'hello', name
5 
6 if __name__ == '__main__':
7     p = Process(target=f, args=('bob',)) # 新建一個子進程p,目標函數是f,args是函數f的參數列表
8     p.start() # 開始執行進程
9     p.join() # 等待子進程結束

 

上述代碼中p.join()的意思是等待子進程結束后才執行后續的操作,一般用於進程間通信。例如有一個讀進程pw和一個寫進程pr,在調用pw之前需要先寫pr.join(),表示等待寫進程結束之后才開始執行讀進程。

多個子進程

如果要同時創建多個子進程可以使用multiprocessing.Pool類。該類可以創建一個進程池,然后在多個核上執行這些進程。

import multiprocessing
import time

def func(msg):
    print multiprocessing.current_process().name + '-' + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4) # 創建4個進程
    for i in xrange(10):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))
    pool.close() # 關閉進程池,表示不能在往進程池中添加進程
    pool.join() # 等待進程池中的所有進程執行完畢,必須在close()之后調用
    print "Sub-process(es) done."

 

輸出結果如下:

 1 Sub-process(es) done.
 2 PoolWorker-34-hello 1
 3 PoolWorker-33-hello 0
 4 PoolWorker-35-hello 2
 5 PoolWorker-36-hello 3
 6 PoolWorker-34-hello 7
 7 PoolWorker-33-hello 4
 8 PoolWorker-35-hello 5
 9 PoolWorker-36-hello 6
10 PoolWorker-33-hello 8
11 PoolWorker-36-hello 9

 

上述代碼中的pool.apply_async()apply()函數的變體,apply_async()apply()的並行版本,apply()apply_async()的阻塞版本,使用apply()主進程會被阻塞直到函數執行結束,所以說是阻塞版本。apply()既是Pool的方法,也是Python內置的函數,兩者等價。可以看到輸出結果並不是按照代碼for循環中的順序輸出的。

多個子進程並返回值

apply_async()本身就可以返回被進程調用的函數的返回值。上一個創建多個子進程的代碼中,如果在函數func中返回一個值,那么pool.apply_async(func, (msg, ))的結果就是返回pool中所有進程的值的對象(注意是對象,不是值本身)。

 1 import multiprocessing
 2 import time
 3 
 4 def func(msg):
 5     return multiprocessing.current_process().name + '-' + msg
 6 
 7 if __name__ == "__main__":
 8     pool = multiprocessing.Pool(processes=4) # 創建4個進程
 9     results = []
10     for i in xrange(10):
11         msg = "hello %d" %(i)
12         results.append(pool.apply_async(func, (msg, )))
13     pool.close() # 關閉進程池,表示不能再往進程池中添加進程,需要在join之前調用
14     pool.join() # 等待進程池中的所有進程執行完畢
15     print ("Sub-process(es) done.")
16 
17     for res in results:
18         print (res.get())

 

上述代碼輸出結果如下:

 1 Sub-process(es) done.
 2 PoolWorker-37-hello 0
 3 PoolWorker-38-hello 1
 4 PoolWorker-39-hello 2
 5 PoolWorker-40-hello 3
 6 PoolWorker-37-hello 4
 7 PoolWorker-38-hello 5
 8 PoolWorker-39-hello 6
 9 PoolWorker-37-hello 7
10 PoolWorker-40-hello 8
11 PoolWorker-38-hello 9

 

與之前的輸出不同,這次的輸出是有序的。

如果電腦是八核,建立8個進程,在Ubuntu下輸入top命令再按下大鍵盤的1,可以看到每個CPU的使用率是比較平均的,如下圖:

八核CPU使用情況

在system monitor中也可以清楚看到執行多進程前后CPU使用率曲線的差異。 
CPU使用情況


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM