與線程池類似的是,如果程序需要啟動多個進程,也可以使用進程池來管理進程。程序可以通過 multiprocessing 模塊的 Pool() 函數創建進程池,進程池實際上是 multiprocessing.pool.Pool 類。
進程池具有如下常用方法:
1.apply(func[, args[, kwds]]):將 func 函數提交給進程池處理。其中 args 代表傳給 func 的位置參數,kwds 代表傳給 func 的關鍵字參數。該方法會被阻塞直到 func 函數執行完成。
2.apply_async(func[, args[, kwds[, callback[, error_callback]]]]):這是 apply() 方法的異步版本,該方法不會被阻塞。其中 callback 指定 func 函數完成后的回調函數,error_callback 指定 func 函數出錯后的回調函數。
3.map(func, iterable[, chunksize]):類似於 Python 的 map() 全局函數,只不過此處使用新進程對 iterable 的每一個元素執行 func 函數。
4.map_async(func, iterable[, chunksize[, callback[, error_callback]]]):這是 map() 方法的異步版本,該方法不會被阻塞。其中 callback 指定 func 函數完成后的回調函數,error_callback 指定 func 函數出錯后的回調函數。
5.imap(func, iterable[, chunksize]):這是 map() 方法的延遲版本。
6.imap_unordered(func, iterable[, chunksize]):功能類似於 imap() 方法,但該方法不能保證所生成的結果(包含多個元素)與原 iterable 中的元素順序一致。
7.starmap(func, iterable[,chunksize]):功能類似於 map() 方法,但該方法要求 iterable 的元素也是 iterable 對象,程序會將每一個元素解包之后作為 func 函數的參數。
8.close():關閉進程池。在調用該方法之后,該進程池不能再接收新任務,它會把當前進程池中的所有任務執行完成后再關閉自己。
9.terminate():立即中止進程池。
10.join():等待所有進程完成。
如果程序只是想將任務提交給進程池執行,則可調用 apply() 或 apply_async() 方法;
如果程序需要使用指定函數將 iterable 轉換成其他 iterable,則可使用 map() 或 imap() 方法
使用 apply_async() 方法啟動進程
import multiprocessing import time import os def action(name='default'): print('(%s)進程正在執行,參數為: %s' % (os.getpid(), name)) time.sleep(3) if __name__ == '__main__': # 創建包含4條進程的進程池 pool = multiprocessing.Pool(processes=4) # 將action分3次提交給進程池 pool.apply_async(action) pool.apply_async(action, args=('位置參數', )) pool.apply_async(action, kwds={'name': '關鍵字參數'}) pool.close() pool.join()
從上面程序可以看出,進程池同樣實現了上下文管理協議,因此程序可以使用 with 子句來管理進程池,這樣就可以避免程序主動關閉進程池。
使用 map() 方法來啟動進程
import multiprocessing import time import os # 定義一個准備作為進程任務的函數 def action(max): my_sum = 0 for i in range(max): print('(%s)進程正在執行: %d' % (os.getpid(), i)) my_sum += i return my_sum if __name__ == '__main__': # 創建一個包含4條進程的進程池 with multiprocessing.Pool(processes=4) as pool: # 使用進程執行map計算 # 后面元組有3個元素,因此程序啟動3條進程來執行action函數 results = pool.map(action, (50, 100, 150)) print('--------------') for r in results: print(r)
進程通信
Python為進程通信提供了兩種機制:
1.Queue:一個進程向 Queue 中放入數據,另一個進程從 Queue 中讀取數據。
2.Pipe:Pipe 代表連接兩個進程的管道。程序在調用 Pipe() 函數時會產生兩個連接端,分別交給通信的兩個進程,接下來進程既可從該連接端讀取數據,也可向該連接端寫入數據。
使用Queue實現進程間通信
multiprocessing 模塊下的 Queue 和 queue 模塊下的 Queue 基本類似,它們都提供了 qsize()、empty()、full()、put()、put_nowait()、get()、get_nowait() 等方法。區別只是 multiprocessing 模塊下的 Queue 為進程提供服務,而 queue 模塊下的 Queue 為線程提供服務。
下面程序使用 Queue 來實現進程之間的通信:
1 import multiprocessing 2 3 def f(q): 4 print('(%s) 進程開始放入數據...' % multiprocessing.current_process().pid) 5 q.put('Python') 6 if __name__ == '__main__': 7 # 創建進程通信的Queue 8 q = multiprocessing.Queue() 9 # 創建子進程 10 p = multiprocessing.Process(target=f, args=(q,)) 11 # 啟動子進程 12 p.start() 13 print('(%s) 進程開始取出數據...' % multiprocessing.current_process().pid) 14 # 取出數據 15 print(q.get()) # Python 16 p.join()
上面程序中,第 5 行代碼(子進程)負責向 Queue 中放入一個數據,第 15 行代碼(父進程)負責從 Queue 中讀取一個數據,
這樣就實現了父、子兩個進程之間的通信。
使用Pipe實現進程間通信
使用 Pipe 實現進程通信,程序會調用 multiprocessing.Pipe() 函數來創建一個管道,該函數會返回兩個 PipeConnection 對象,代表管道的兩個連接端(一個管道有兩個連接端,分別用於連接通信的兩個進程)。
PipeConnection 對象包含如下常用方法:
1.send(obj):發送一個 obj 給管道的另一端,另一端使用 recv() 方法接收。需要說明的是,該 obj 必須是可 picklable 的(Python 的序列化機制),
如果該對象序列化之后超過 32MB,則很可能會引發 ValueError 異常。
2.recv():接收另一端通過 send() 方法發送過來的數據。
3.fileno():關於連接所使用的文件描述器。
4.close():關閉連接。
5.poll([timeout]):返回連接中是否還有數據可以讀取。
6.send_bytes(buffer[, offset[, size]]):發送字節數據。如果沒有指定 offset、size 參數,則默認發送 buffer 字節串的全部數據;如果指定了 offset 和 size 參數,則只發送 buffer 字節串中從 offset 開始、長度為 size 的字節數據。通過該方法發送的數據,應該使用 recv_bytes() 或 recv_bytes_into 方法接收。
7.recv_bytes([maxlength]):接收通過 send_bytes() 方法發迭的數據,maxlength 指定最多接收的字節數。該方法返回接收到的字節數據。
8.recv_bytes_into(buffer[, offset]):功能與 recv_bytes() 方法類似,只是該方法將接收到的數據放在 buffer 中。
使用 Pipe 來實現兩個進程之間的通信:
1 import multiprocessing 2 def f(conn): 3 print('(%s) 進程開始發送數據...' % multiprocessing.current_process().pid) 4 # 使用conn發送數據 5 conn.send('Python') 6 if __name__ == '__main__': 7 # 創建Pipe,該函數返回兩個PipeConnection對象 8 parent_conn, child_conn = multiprocessing.Pipe() 9 # 創建子進程 10 p = multiprocessing.Process(target=f, args=(child_conn, )) 11 # 啟動子進程 12 p.start() 13 print('(%s) 進程開始接收數據...' % multiprocessing.current_process().pid) 14 # 通過conn讀取數據 15 print(parent_conn.recv()) # Python 16 p.join()
上面程序中第 6 行代碼(子進程)通過 PipeConnection 向管道發送數據,數據將會被發送給管道另一端的父進程。
第 16 行代碼(父進程)通過 PipeConnection 從管道讀取數據,程序就可以讀取到另一端子進程寫入的數據,這樣就實現了父、子兩個進程之間的通信。