Python使用進程池管理進程和進程間通信


與線程池類似的是,如果程序需要啟動多個進程,也可以使用進程池來管理進程。程序可以通過 multiprocessing 模塊的 Pool() 函數創建進程池,進程池實際上是 multiprocessing.pool.Pool 類。

進程池具有如下常用方法:

              1.apply(func[, args[, kwds]]):將 func 函數提交給進程池處理。其中 args 代表傳給 func 的位置參數,kwds 代表傳給 func 的關鍵字參數。該方法會被阻塞直到 func 函數執行完成。

              2.apply_async(func[, args[, kwds[, callback[, error_callback]]]]):這是 apply() 方法的異步版本,該方法不會被阻塞。其中 callback 指定 func 函數完成后的回調函數,error_callback 指定 func 函數出錯后的回調函數。

              3.map(func, iterable[, chunksize]):類似於 Python 的 map() 全局函數,只不過此處使用新進程對 iterable 的每一個元素執行 func 函數。

              4.map_async(func, iterable[, chunksize[, callback[, error_callback]]]):這是 map() 方法的異步版本,該方法不會被阻塞。其中 callback 指定 func 函數完成后的回調函數,error_callback 指定 func 函數出錯后的回調函數。

              5.imap(func, iterable[, chunksize]):這是 map() 方法的延遲版本。

              6.imap_unordered(func, iterable[, chunksize]):功能類似於 imap() 方法,但該方法不能保證所生成的結果(包含多個元素)與原 iterable 中的元素順序一致。

              7.starmap(func, iterable[,chunksize]):功能類似於 map() 方法,但該方法要求 iterable 的元素也是 iterable 對象,程序會將每一個元素解包之后作為 func 函數的參數。

              8.close():關閉進程池。在調用該方法之后,該進程池不能再接收新任務,它會把當前進程池中的所有任務執行完成后再關閉自己。

              9.terminate():立即中止進程池。

              10.join():等待所有進程完成。

 

如果程序只是想將任務提交給進程池執行,則可調用 apply() 或 apply_async() 方法;

如果程序需要使用指定函數將 iterable 轉換成其他 iterable,則可使用 map() 或 imap() 方法

 

使用 apply_async() 方法啟動進程

import multiprocessing
import time
import os
def action(name='default'):
    print('(%s)進程正在執行,參數為: %s' % (os.getpid(), name))
    time.sleep(3)
if __name__ == '__main__':
    # 創建包含4條進程的進程池
    pool = multiprocessing.Pool(processes=4)
    # 將action分3次提交給進程池
    pool.apply_async(action)
    pool.apply_async(action, args=('位置參數', ))
    pool.apply_async(action, kwds={'name': '關鍵字參數'})
    pool.close()
    pool.join()

從上面程序可以看出,進程池同樣實現了上下文管理協議,因此程序可以使用 with 子句來管理進程池,這樣就可以避免程序主動關閉進程池。

 

 

使用 map() 方法來啟動進程

import multiprocessing
import time
import os
# 定義一個准備作為進程任務的函數
def action(max):
    my_sum = 0
    for i in range(max):
        print('(%s)進程正在執行: %d' % (os.getpid(), i))
        my_sum += i
    return my_sum
if __name__ == '__main__':
    # 創建一個包含4條進程的進程池
    with multiprocessing.Pool(processes=4) as pool:
        # 使用進程執行map計算
        # 后面元組有3個元素,因此程序啟動3條進程來執行action函數
        results = pool.map(action, (50, 100, 150))
        print('--------------')
        for r in results:
            print(r)

 

進程通信

Python為進程通信提供了兩種機制:

        1.Queue:一個進程向 Queue 中放入數據,另一個進程從 Queue 中讀取數據。

        2.Pipe:Pipe 代表連接兩個進程的管道。程序在調用 Pipe() 函數時會產生兩個連接端,分別交給通信的兩個進程,接下來進程既可從該連接端讀取數據,也可向該連接端寫入數據。

使用Queue實現進程間通信

multiprocessing 模塊下的 Queue 和 queue 模塊下的 Queue 基本類似,它們都提供了 qsize()、empty()、full()、put()、put_nowait()、get()、get_nowait() 等方法。區別只是 multiprocessing 模塊下的 Queue 為進程提供服務,而 queue 模塊下的 Queue 為線程提供服務。

下面程序使用 Queue 來實現進程之間的通信:

 1 import multiprocessing
 2 
 3 def f(q):
 4     print('(%s) 進程開始放入數據...' % multiprocessing.current_process().pid)
 5     q.put('Python')
 6 if __name__ == '__main__':
 7     # 創建進程通信的Queue
 8     q = multiprocessing.Queue()
 9     # 創建子進程
10     p = multiprocessing.Process(target=f, args=(q,))
11     # 啟動子進程
12     p.start()
13     print('(%s) 進程開始取出數據...' % multiprocessing.current_process().pid)
14     # 取出數據
15     print(q.get())  # Python
16     p.join()

上面程序中,第 5 行代碼(子進程)負責向 Queue 中放入一個數據,第 15 行代碼(父進程)負責從 Queue 中讀取一個數據,

這樣就實現了父、子兩個進程之間的通信。

 

使用Pipe實現進程間通信

使用 Pipe 實現進程通信,程序會調用 multiprocessing.Pipe() 函數來創建一個管道,該函數會返回兩個 PipeConnection 對象,代表管道的兩個連接端(一個管道有兩個連接端,分別用於連接通信的兩個進程)。

PipeConnection 對象包含如下常用方法:

       1.send(obj):發送一個 obj 給管道的另一端,另一端使用 recv() 方法接收。需要說明的是,該 obj 必須是可 picklable 的(Python 的序列化機制),

        如果該對象序列化之后超過 32MB,則很可能會引發 ValueError 異常。

       2.recv():接收另一端通過 send() 方法發送過來的數據。

       3.fileno():關於連接所使用的文件描述器。

       4.close():關閉連接。

       5.poll([timeout]):返回連接中是否還有數據可以讀取。

       6.send_bytes(buffer[, offset[, size]]):發送字節數據。如果沒有指定 offset、size 參數,則默認發送 buffer 字節串的全部數據;如果指定了 offset 和 size 參數,則只發送 buffer 字節串中從 offset 開始、長度為 size 的字節數據。通過該方法發送的數據,應該使用 recv_bytes() 或 recv_bytes_into 方法接收。

       7.recv_bytes([maxlength]):接收通過 send_bytes() 方法發迭的數據,maxlength 指定最多接收的字節數。該方法返回接收到的字節數據。

       8.recv_bytes_into(buffer[, offset]):功能與 recv_bytes() 方法類似,只是該方法將接收到的數據放在 buffer 中。

 

使用 Pipe 來實現兩個進程之間的通信:

 1 import multiprocessing
 2 def f(conn):
 3     print('(%s) 進程開始發送數據...' % multiprocessing.current_process().pid)
 4     # 使用conn發送數據
 5     conn.send('Python')
 6 if __name__ == '__main__':
 7     # 創建Pipe,該函數返回兩個PipeConnection對象
 8     parent_conn, child_conn = multiprocessing.Pipe()
 9     # 創建子進程
10     p = multiprocessing.Process(target=f, args=(child_conn, ))
11     # 啟動子進程
12     p.start()
13     print('(%s) 進程開始接收數據...' % multiprocessing.current_process().pid)
14     # 通過conn讀取數據
15     print(parent_conn.recv())  # Python
16     p.join()

上面程序中第 6 行代碼(子進程)通過 PipeConnection 向管道發送數據,數據將會被發送給管道另一端的父進程。

第 16 行代碼(父進程)通過 PipeConnection 從管道讀取數據,程序就可以讀取到另一端子進程寫入的數據,這樣就實現了父、子兩個進程之間的通信。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM