Python內置庫:multiprocessing(多進程)


Python的多進程因為可以充分利用CPU多核的特點,所以通常用於計算密集型的場景或者需要大量數據操作的場景,而對於多線程,在某些語言中因為可以充分利用CPU,所以可能多線程的場景使用得多一點,但是在Python中,多線程只能在CPU的單核中運行,不能充分利用CPU多核的特點,所以Python多線程通常用於IO密集型的場景或者少量數據的並發操作場景。總而言之,Python的多線程只是並發執行,而不是真正的並行執行,而且只能在CPU單核上進行,所以如果需要進行大量的數據操作或者比較耗時的並行操作,那么就可以考慮使用多進程了。
本文只是根據官方文檔簡單記了一下multiprocessing模塊中進程的基本操作,包括創建進程、進程啟動方式、進程間通信、進程間同步、進程池,如果需要其他更多操作,可以參考此模塊的官方中文文檔

創建進程

實例化Process類創建一個進程對象,然后調用它的start方法即可生成一個新的進程(子進程)。Process進程對象的使用其實和多線程模塊threading中的Thread線程對象非常相似,可以參考着來使用。

"""
簡單示例:創建一個子進程
"""
import os
from multiprocessing import Process


def func(s):
    # 輸出傳入的參數,當前子進程的進程ID,當前進程的父進程ID
    print(s, os.getpid(), os.getppid())


# 注意:此處的if __name__ == '__main__'語句不能少
if __name__ == '__main__':
    # 打印當前進程的進程ID
    print(os.getpid())
    print('main process start...')
    # 創建進程對象
    p = Process(target=func, args=('hello', ))
    # 生成一個進程,並開始運行新的進程
    p.start()
    # 等待子進程運行完畢
    p.join()
    print('main process end!')

打印輸出

13888
main process start...
hello 12484 13888
main process end!

Process類
Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)group不用特別指定,使用默認就行;target表示需要調用的對象;name表示新進程的名稱;argskwargs表示傳給target對象的元組參數和字典參數;daemon是一個關鍵字參數,使用時必須指定參數名,表示是否為守護進程,如果不指定則默認繼承自調用者進程。
注:需要注意的是如果重寫了Process__init__方法,那么在做任何操作之前需要先調用Process.__init__()方法。
常用的方法和屬性:

  • run:表示進程活動的方法,即此方法的運行是在新開啟的進程中,如果在子類中重寫了此方法,應該在此方法中調用target對象。
  • start():用於啟動進程活動(注意此方法是在調用者進程中,而不是在新的進程中),並用於保證run方法在一個新的進程中被調用。
  • join([timeout]):如果timeout參數沒有指定(默認),則會阻塞當前進程直到調用join方法的進程(子進程)運行結束,如果指定了timeout參數,則會阻塞指定的秒數。注意,join方法不能在start方法之前調用,但join方法可以調用多次。如果想要知道進程的狀態(包括是否結束),可以查看進程對象的exitcode值來進行判斷。
  • name:進程名稱,沒什么實際意義,只是用來表示進程,多個進程可能有相同的名稱。如果沒有特別指定,則默認命名格式為Process-N1:N2:N3...
  • is_alive():此進程是否存活。
  • damemon:表示進程是否為守護進程,這個標識必須在start()方法調用之前進行設置,如果不設置,默認繼承創建者進程。當一個進程終止時,會嘗試終止它的所有守護子進程,需要注意的是,守護進程是不允許創建子進程的。
  • pid:進程ID。
  • exitcode:進程退出狀態,當進程還未結束時,值為None,如果進程結束了,會用一個負值-N表示結束信號。
  • authkey:進程的身份驗證密鑰(字節字符串),當multiprocessing被初始化時,主進程會使用os.urandom()分配一個隨機的字符串,當創建Process子進程時,子進程會繼承其父進程的身份密鑰,當然,你也可以修改子進程的身份密鑰。
  • sentinel:系統對象的數字句柄,當進程結束時將變為“ready”。如果想要使用multiprocessing.connection.wait()一次等待多個事件,那可以使用這個值,否則調用join()方法會更簡單。
  • terminate():終止進程,在Unix上使用的是SIGTERM信號,在Windows上使用的是TerminateProcess()。注意,進程的后代進程不會被終止(會變成“孤兒”進程)。另外,如果被終止的進程在使用PipeQueue時,它們有可能會被損害,並無法被其他進程使用;如果被終止的進程已獲得鎖或信號量等,則有可能導致其他進程死鎖。所以請謹慎使用此方法。
  • kill():也是終止進程,但是在Unix上使用的是SIGKILL信號。
  • close():關閉Process對象,並釋放與之關聯的所有資源,如果底層進程仍在運行,則會引發ValueError。而且,一旦close()方法成功返回,Process對象的大多數方法和屬性也可能會引發ValueError

進程啟動方式

multiprocessing模塊中進程的啟動方式有三種spawn、fork和forkserver,在不同的系統平台上它們的使用和默認設置也會有所不同:

  • spawn:由父進程啟動一個新的Python解釋器Process子進程,子進程只會繼承run()方法中所必需的資源,而父進程中那些非必需的文件描述符和句柄是不會被繼承的。而且,相對於使用fork和forkserver來啟動進程,spawn方法啟動是非常慢的。spawn啟動方式可以在Unix和Windows上使用,且Windows上默認使用此方法啟動。
  • fork:父進程使用os.fork()來產生一個新的Python解釋器分叉(fork)子進程,子進程在開始時與父進程是相同的,即子進程會繼承父進程擁有的所有資源。這種方式的問題在於當父進程中存在多線程時,啟動的新的子進程的安全性需要自己留意。fort啟動方式只能在Unix中使用,且也是Unix中默認的啟動方式。
  • forkserver:程序會先使用forkserver啟動一個服務器進程,然后當需要運行一個新的進程時,父進程會先連接到服務器並請求其分叉(fork)一個新的進程。 相比於fork啟動方式,由於forkserver啟動的服務器進程是單線程的進程,所以由它通過os.fork()啟動的進程是安全的(此服務器進程沒有多線程的情況)。forkserver啟動方式可以在Unix平台使用,並支持通過Unix管道傳遞文件描述符。

設置統一的啟動方式:可以在程序運行開始時,即if __name__ == "__main__"中使用multiprocessing.set_start_method(method)函數來設置啟動方式,設置時傳入對應啟動方式的字符串即可("spawn"/"fork"/"forkserver")。但是需要注意兩點,一是需要在if __name__ == "__main__"子句中指定,二是只能指定一次,指定之后就不能在其他地方再次指定。
設置特定的啟動方式:可以使用multiprocessing.get_context(method)函數來設置上下文中的啟動方式,需要注意的是在此上下文中創建的對象可能與其他上下文中的對象不兼容,比如,使用fork方式的上下文中的鎖不能傳遞給spawn或forkserver中使用,另外,如果你不想采用默認的方式或者全局統一的方式,就可以考慮使用get_context(method)方法來指定自己的啟動方式。
注:在Unix上,spawn和forkserver啟動方法不能和“凍結的”可執行內容一同使用(例如,PyInstaller和cx_Freeze包產生的二進制文件),但是fork啟動方法可以。

進程間通信

使用多進程時,一般使用消息機制(Pipe()管道和Queue()隊列)實現進程間的通信,而且應該盡可能地避免同步操作,例如鎖。(如果這兩種方式不能滿足你的要求,可以參考下官方文檔中關於multiprocessing.connection的描述,它提供了如監聽器對象Listener和客戶端對象Client等通信方式,感興趣的話也可以去看下)
Pipe類
Pipe([duplex]):返回一對連接對象(conn1,conn2),它們代表了管道的兩端。參數duplex默認True,表示雙向的(雙工通信),表示管道每一端都可以進行發送和接收數據;如果設置False,則表示單向的(單工通信),此時conn1只能接受數據,conn2只能發送數據。

"""
簡單示例:使用管道Pipe進行進程間通信
"""
from multiprocessing import Process, Pipe


def func(conn):
    print('send a list object ot other side...')
    # 從管道對象的一端發送數據對象
    conn.send(['33', 44, None])
    conn.close()


if __name__ == '__main__':
    # 默認創建一個雙工管道對象,返回的兩個對象代表管道的兩端,
    # 雙工表示兩端的對象都可以發送和接收數據,但是需要注意,
    # 需要避免多個進程或線程從一端同時讀或寫數據
    parent_conn, child_conn = Pipe()
    p = Process(target=func, args=(child_conn, ))
    p.start()
    # 從管道的另一端接收數據對象
    print(parent_conn.recv())
    p.join()

Connection類
multiprocessing.connection.ConnectionConnection對象允許收發可以序列化的對象或字符串,Connection對象通常使用Pipe來創建。
常用的方法:

  • send(obj):將一個對象發送到連接的另一端,另一端可以使用recv()方法來讀取。注意,發送的對象必須是可以序列化的,對象如果過大可能會引發ValueError異常。
  • recv():返回一個對端使用send()方法發送的對象,該方法會一直阻塞直到接收到對象為止。如果對端關閉了連接或者沒有東西可以接收時,將會拋出EOFError異常。
  • fileno():返回由連接對象使用的描述符或句柄。
  • close():關閉連接。當連接對象被垃圾回收時,這個方法會被自動調用。
  • poll([timeout]):返回連接對象中是否有可以讀取的數據,如果未指定參數timeout(默認),此方法會立刻返回結果,如果指定了timeout,則會阻塞對應timeout秒數,如果timeout為None,則會一直阻塞,不會發生超時。
  • send_bytes(buffer[, offset[, size]]):從一個bytes-like object(字節類對象)中取出字節數組作為一條完整消息發送。offset參數表示偏移量或者buffer中數據的位置,size表示從offset開始讀取多少數據。如果buffer過大,可能會引發ValueError異常。
  • recv_bytes([maxlength]):以字符串的形式返回一條對端發送過來的字節數據,此方法會一直阻塞直到接收到消息,如果對端關閉了連接或者沒有數據可以接收時,將會拋出EOFError異常。如果接收的數據長度大於了maxlength指定的長度,那么也會拋出EOFError異常,並且此時此連接對象不再可讀。
  • recv_bytes_into(buffer[, offset]):將一條完整的字節數據讀入buffer,並返回數據的字節數。此方法會一直阻塞直到接收到數據,如果對端關閉或者沒有數據可以讀取,則會拋出EOFError異常。buffer必須是一個可寫入的字節類對象,如果指定了offset參數,將會從offset指定的位置開始寫入buffer,如果buffer過小,也會引發BufferTooShort異常。

Queue類
Queue隊列采用的是FIFO(先進先出)的通信方式。(另外還有SimpleQueueJoinableQueue,感興趣的可以參考下官方文檔)
當一個對象被放入隊列中時,這個對象首先會被一個后台線程序列化,然后會將序列化的數據通過一個底層管道傳遞到隊列中,從隊列中將數據取出來時也會進行反序列化的操作。
注意一點,在一個空隊列中放入對象后,它的empty()方法會在一個極小的延遲后才會返回False。
注:如果一個子進程將一些對象放入隊列中,那么這個進程在所有緩沖區的對象被刷新進管道之前,是不會終止的,所以,通常在終止這類進程時,應該保證隊列中的數據都已被使用了。(見示例中的注釋)

"""
簡單示例:使用隊列Queue進行進程間通信
"""
from multiprocessing import Process, Queue


def func(q):
    print('put a list object to queue...')
    # 向Queue對象中添加一個對象
    q.put(['33', 44, None])
    # q.put('X' * 1000000)


if __name__ == '__main__':
    # 創建一個隊列
    q = Queue()
    p = Process(target=func, args=(q, ))
    p.start()
    # 從Queue對象中獲取一個對象
    print(q.get())
    # 這里需要注意,當向隊列中放入的數據較大時,比如將['33', 44, None]替換為'X' * 1000000時,
    # 就會在join()處卡死,為了避免這種情況,
    # 通常的做法是先使用get()將數據取出來,再使用join()方法
    p.join()

Queue([maxsize]):返回一個使用Pipe管道和少量鎖和信號量實現的共享隊列實例,當一個進程將一個對象放入隊列時,一個寫入線程將會啟動並將對象從緩沖區寫入管道中。
注:multiprocessing.Queue實現了標准庫queue.Queue中除了task_done()join()的所有方法。
常用的方法和屬性:

  • qsize():返回隊列的大致長度,但這個數字在多進程或多線程的環境中通常是不可靠的。注意,在Unix平台上,例如Mac OS X,這個方法可能會拋出NotImplementedError,因為該平台沒有實現sem_getvalue()
  • empty():隊列為空則返回True,否則返回False,在多進程或多線程的環境中,此方法是不可靠的。
  • full():隊列滿則返回True,否則返回False,在多進程或多線程的環境中,此方法是不可靠的。
  • put(obj[, block[, timeout]]):將對象obj放入隊列。如果參數block為True(默認)且timeout為None(默認),則會阻塞當前進程,直到有空的緩沖槽。如果設置了timeout,則會阻塞指定的timeout秒數,如果阻塞timeout指定秒數后還是沒有可用的緩沖槽,則會拋出queue.Full異常。如果block為False,此時會忽略timeout參數,並且當前有空的緩沖槽可用時才能放入對象,否則會拋出queue.Full異常。
  • put_nowait(obj):相當於put(obj, False)
  • get([block[, timeout]]):從隊列中獲取一個對象。如果參數block為True(默認)且timeout為None(默認),則會阻塞當前進程,直到獲取到對象。如果設置了timeout,則會阻塞指定的timeout秒數,如果阻塞timeout指定秒數后還是沒有獲取到對象,則會拋出queue.Empty異常。如果block設置為False,此時會忽略timeout參數,並且當前有對象可以獲取時才能獲取,否則會拋出queue.Empty異常。
  • get_nowait():相當於get(False)
  • close():表示當前進程將不會再往隊列中放入對象了。一旦緩沖區的所有數據被寫入管道后,對應的后台線程就會退出。而且這個方法在隊列被gc回收時會自動調用。
  • join_thread():等待后台線程,這個方法僅在調用了close()方法之后可以被調用,並且會阻塞當前進程,當變為非阻塞狀態之后,隊列的后台線程會退出,以此確保緩沖區中的所有數據都被寫入管道中。默認情況下,如果一個進程不是此隊列的創建者進程,當它退出時,默認會嘗試等待這個隊列的后台線程,當然這個進程也可以使用cancel_join_thread()方法使join_thread()方法什么都不做直接跳過。
  • cancel_join_thread():用於防止join_thread()方法阻塞當前進程,即防止進程退出時自動等待隊列后線程的情況。使用這個方法有可能會導致隊列中的數據丟失,因此大多情況下這個方法並不需要用到,當然,如果你只是想要進程馬上退出,也不在意數據的丟失,那么可以使用這個方法。

注:multiprocessing使用了queue.Emptyqueue.Full異常去表示超時,需要從內置的queue模塊中導入它們,而不是從multiprocessing中導入。

進程間同步

通常來說同步原語在多進程環境中並不像在多線程環境中那么必要,但是也可以參考下。注意,也可以使用Manager()對象創建同步原語。
multiprocessing.Barrier(parties[, action[, timeout]]):類似threading.Barrier的柵欄對象。
multiprocessing.Semaphore([value]):信號量對象,類似於threading.Semaphore
multiprocessing.BoundedSemaphore([value]):類似threading.BoundedSemaphore的有界信號量對象。
multiprocessing.Condition([lock]):是threading.Condition的別名,參數lock應該是multiprocessing中的Lock或者RLock對象。
multiprocessing.Event:類似threading.Event的事件對象。
multiprocessing.Lock:原始鎖,除非特別說明,否則用法與threading.Lock是一致的。

  • acquire(block=True, timeout=None):獲取鎖,需要注意一下參數blocktimeoutthreading.Lock中的名稱和用法的區別。如果block設置為True(默認值),此方法會阻塞進程直到獲取鎖;如果block參數設置為False,進程將不會阻塞,且會忽略timeout參數;如果設置了timeout參數且為正數,則會阻塞指定秒數,如果設置為負數,則等效於值為0的情況,如果timeout為None(默認值),則會一直阻塞,需要注意timeout設置為負數和0時,其作用和threading.Lock是不一致的。此方法的返回值,在獲取到鎖並將鎖的狀態設置為“鎖住”時返回True,超時或者沒有獲取到鎖時返回False。
  • release():釋放鎖,注意,任何進程或線程都可釋放這種鎖,並不是只有獲取鎖的進程或線程才可以釋放鎖。當試圖釋放一個未“鎖住”的鎖時會引發ValueError異常。其他用法與threading.Lock是一致的。

multiprocessing.RLock:遞歸鎖,類似於threading.RLock,只能由獲取鎖的進程或線程來進行釋放,並且可以獲取多次,注意,釋放次數必須要與獲取次數一致。

  • acquire(block=True, timeout=None):當block設置為True時(默認值),會阻塞進程直到獲取鎖,如果當前進程已經獲取到了鎖(遞歸鎖可以多次獲取),那么不會阻塞,並且鎖內的遞歸等級加1,並返回True。如果block設置為False,則不會阻塞,此時如果沒有獲取到鎖,則鎖內的遞歸等級不會變,並返回False。timeout的使用與multiprocessing.Lock.acquire是一樣的,但是注意,此參數與threading.RLock中的使用是有區別的。
  • release():釋放鎖,使鎖內的遞歸等級減1。如果釋放后鎖的遞歸等級降低為0,則會重置鎖的狀態為“釋放”狀態,表名此時鎖沒有被任何進程或線程持有;如果釋放后鎖的遞歸等級不為0,則鎖定狀態還是“未釋放”的狀態,當前進程或線程仍然是鎖的持有者。如果鎖已經處於“釋放”狀態,或者是非鎖的持有者調用了此方法,則會拋出AssertionError異常,注意這個異常與threading.RLock.release()中拋出的異常是不同的。
"""
簡單示例:使用鎖保證進程間的同步操作
"""
from multiprocessing import Process, Lock


def func(lc, num):
    # 使用鎖保證以下代碼同一時間只有一個進程在執行
    lc.acquire()
    print('process num: ', num)
    lc.release()


if __name__ == '__main__':
    lock = Lock()
    for i in range(5):
        Process(target=func, args=(lock, i)).start()

打印輸出

process num:  0
process num:  1
process num:  3
process num:  2
process num:  4

進程間共享數據

在多進程的並發編程中應當盡量避免使用共享狀態,但是如果必須要使用的話,multiprocessing模塊提供了兩種方式來使用:共享內存和服務進程管理器(Manager()管理器對象會開啟一個服務進程,允許不同機器上的進程通過網絡共享數據,本文就不寫了,感興趣的可以去官方文檔了解下(有對應的中文文檔))。
共享內存
可以在共享內存中創建可被子進程繼承的共享ctypes對象,特點是快捷方便。
multiprocessing.Value(typecode_or_type, *args, lock=True):返回一個在共享內存上創建的ctypes對象,可以通過它的value屬性來訪問它的值。

  • typecode_or_type:指定返回的對象類型,可以是一個ctypes類型,也可以是array模塊中每個類型對應的單字符的字符串。
  • args:這個參數會傳給這個類的構造函數。
  • lock:默認為True,則會新建一個遞歸鎖用於對這個值的同步訪問操作;如果lock指定為一個LockRLock,則會使用這個鎖來控制這個變量的同步操作;如果lock為False,那么這個值將沒有鎖進行保護,也就是說這個變量不是進程安全的。
  • 注:如果想要進行+=這種操作時,因為這種操作並不是原子性的,它是分開的讀和寫操作,所以可以考慮使用如下方式進行這種操作:with my_value_obj.get_lock(): my_value_obj.value += 1

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True):返回一個在共享內存中創建的ctypes類型的數組。

  • typecode_or_type:指定數組中元素的類型,可以是一個ctypes類型,也可以是array模塊中每個類型對應的單字符的字符串。
  • size_or_initializer:如果是一個整數,則用於指定數組的長度,否則,應該傳入一個序列用於初始化這個數組對象,這個序列的長度就是這個數組對象的長度。
  • lock:默認為True,則會新建一個鎖用於對這個值的同步訪問操作;如果lock指定為一個LockRLock,則會使用這個鎖來控制這個變量的同步操作;如果lock為False,那么這個值將沒有鎖進行保護,也就是說這個變量不是進程安全的。
  • 注:ctypes.c_char類型的數組具有 value和raw屬性,可以用來保存和提取字符串。
"""
簡單示例:使用共享內存的方式,共享值Value對象和數據Array對象
"""
from multiprocessing import Process, Value, Array


def func(n, a):
    n.value = 3.333
    for i in range(len(a)):
        a[i] = -a[i]


if __name__ == '__main__':
    num = Value('d', 0.0)  # 第一個參數d表示數據類型“double”雙精度浮點類型
    arr = Array('i', range(6))  # 第一個參數i表示數據類型“integer”整型
    p = Process(target=func, args=(num, arr))
    p.start()
    p.join()
    print(num.value)
    print(arr[:])

打印輸出

3.333
[0, -1, -2, -3, -4, -5]

進程池

創建一個Pool進程池對象,並執行提交給它的任務,進程池對象允許其中的進程以不同的方式運行,但是需要注意,Pool對象的方法只能是創建它的進程才能調用。
Pool類
multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]]):創建一個進程池對象,支持帶有超時和回調的異步結果,以及一個並行的map實現。

  • processes:指定進程池中的工作進程數量,如果為None,則使用os.cpu_count()的返回值。
  • initializerinitargs:如果initializer不為None,則每個工作進程將會在啟動時調用initializer(*initargs)
  • maxtasksperchild:指定一個工作進程在退出或者被一個新的進程替代之前能完成的任務數量,以便保證資源的釋放。默認為None,表示工作進程的壽命與進程池是相同的。
  • context:指定啟動的工作進程的上下文。通常一個進程池是通過multiprocessing.Pool()或者上下文對象的Pool()來創建的,而這兩種創建進程池的方式都是可以的。
  • 注:使用進程池對象時,應該正確終結該對象,應該將進程池對象當做上下文管理器來使用(with語句),或者手動調用close()terminate()方法,而依賴於垃圾回收器來銷毀進程池對象是不正確的做法。

進程池對象的常用方法:

  • apply(func[, args[, kwds]]):在進程池中開啟一個新的進程並執行func函數,另外兩個參數則是函數的參數,在這個函數執行完之前,當前進程會一直阻塞。
  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]):這是apply方法的一個變體,會返回一個AsyncResult結果對象。如果指定了callback參數,則會在func函數執行成功后將返回結果當做參數傳入callback指定的可調用對象,執行失敗則會調用error_callback指定的可調用對象。
  • map(func, iterable[, chunksize]):內置map()函數的一個並行版本,會一直阻塞當前進程直到運行完可迭代對象中的所有元素,並返回結果。此方法會將可迭代對象分割為許多塊,chunksize參數用於指定每個塊的大小,並行的進程,每個進程會對應一個塊,每次會運行塊中的一個元素。注意,對於比較大的迭代對象,可能會很耗時,此時可以考慮使用imap()或者imap_unordered(),並且使用時指定chunksize參數可能會得到更好的效率。
  • map_async(func, iterable[, chunksize[, callback[, error_callback]]])map方法的一個變體,可以返回一個處理后的AsyncResult結果對象。其他參數的使用與appply_async方法是一致的。
  • imap(func, iterable[, chunksize])map()方法的延遲執行版本,對於較大的迭代,chunksize設置一個較大的值會比默認值1會有更高的執行效率,同樣,對於比較消耗內存的迭代,建議使用這個方法,而不是使用map()方法。如果chunksize1,則imap()方法返回的迭代器的next()方法擁有一個可選的參數timeout,如果在指定的timeout時間內未得到執行結果,next(timeout)就會拋出multiprocessing.TimeoutError異常。
  • imap_unordered(func, iterable[, chunksize]):和imap()類似,只不過返回的結果是無序的,當然只有一個進程的時候,返回的結果就是有序的。
  • starmap(func, iterable[, chunksize]):和map()類似,不過iterable中的每個元素都會被再次解包作為func的參數傳入進去,如[(1, 2), (3, 4)]會轉化為類似[func(1, 2), func(3, 4)]
  • starmap_async(func, iterable[, chunksize[, callback[, error_callback]]]):和starmap類似,會返回一個結果對象。
  • close():會阻止后續任務提交到進程池,當所有任務都執行完成后,工作進程就會退出。
  • terminate():不用等待未完成的任務,立即停止工作進程,當進程池被垃圾回收時,此方法會被立即調用。
  • join():等待工作進程結束,注意,調用此方法前必須先調用close()方法或terminate()方法。

AsyncResult類
multiprocessing.pool.AsyncResultapply_async()map_async()這兩個方法返回的結果對象對應的類。
常用的方法:

  • get([timeout]):用於獲取執行結果。如果timeout參數不是None,並且在指定時間內沒有得到執行結果,則會拋出multiprocessing.TimeoutError異常。
  • wait([timeout]):阻塞當前進程,直到返回結果,或者timeout超時。
  • ready():判斷執行是否完成。
  • successful():判斷調用是否已經完成,並且未引發異常,如果未執行完成,則會引發ValueError異常。
"""
這是官方文檔上給出的示例,我就直接貼在這兒了
"""
from multiprocessing import Pool
import time


def f(x):
    return x * x


if __name__ == '__main__':
    with Pool(processes=4) as pool:  # start 4 worker processes
        result = pool.apply_async(f, (10,))  # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))  # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))  # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))  # prints "0"
        print(next(it))  # prints "1"
        print(it.next(timeout=1))  # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))  # raises multiprocessing.TimeoutError

編程指導

這是官方文檔中對於multiprocessing模塊給出的一些編程建議,我放在這里了,可以參考下。
對於所有啟動方法

  • 避免共享狀態:應該避免在進程間傳遞大量數據,傳遞的數據應該越少越好。最好使用隊列或者管道進行進程間的通信,而不是使用底層的同步原語。
  • 可序列化:保證代理的方法的參數是可序列化的。
  • 代理的線程安全:不要在多線程之間同時使用一個代理對象,除非你用鎖保護它,但是在多進程之間使用相同的代理對象是不會有問題的。
  • 使用join避免僵屍進程:在Unix上,如果一個進程執行完成但是沒有被join,就會變成僵屍進程。一般來說,僵屍進程不會很多,因為每次啟動新進程或者active_children()被調用時,所有已執行完成且沒有被join的進程都會被自動被join,而且對一個執行完成的進程調用Process.is_alive也會join這個進程。盡管如此,對自己啟動的進程顯式調用join依然是最佳的實踐。
  • 繼承優於序列化、反序列化:當使用spawn或者forkserver的啟動方式時,multiprocessing模塊中的許多類型都必須是可序列化的,這樣子進程才能使用它們。但是,通常我們都應該避免使用管道和隊列來發送共享對象到另一個進程,而是應該優先采用讓子進程通過繼承的方式從父進程中訪問這些共享對象。
  • 避免手動殺死進程:通過Process.terminate終止一個進程很容易導致這個進程正在使用的資源(如鎖、信號量、管道和隊列)損壞或者變得不可用,導致其他需要使用這些資源的進程無法使用。所以,最好是那些從來不使用這些共享資源的進程才調用Process.terminate
  • 使用隊列的進程的join:如果一個進程使用了隊列,並往隊列中放入數據,那么這個進程會一直阻塞,直到所有的緩存項都被feeder線程傳遞給底層管道,這意味着,在這個進程使用join方法之前,需要保證放入隊列的全部數據都已經被其他的線程或進程消費完,否則不能保證這個隊列的進程可以正常終止(注意,非守護進程都會自動join)。如下是一個死鎖的示例:解決辦法是交換最后兩行或者刪除p.join()這一行。
from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()
  • 顯式傳遞資源給子進程:在Unix上,使用fork方式啟動的子進程可以使用父進程中全局創建的共享資源,但是還是建議顯式的傳遞資源給子進程,這樣保證了子進程結束后,這個資源也不會被回收,如果直接使用,有可能會導致子進程結束時這個資源被釋放掉。如下,示例1為錯誤示范,應該為示例2的方式。
"""示例1"""
from multiprocessing import Process, Lock


def f():
    ... do something using "lock" ...


if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()
"""示例2"""
from multiprocessing import Process, Lock


def f(l):
    ... do something using "l" ...


if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

spawn和forkserver啟動方式
spawn和forkserver的以下一些特點,相對於另外一種fork啟動方式,會有一些區別和限制。

  • 更依賴序列化:Process.__init__()的所有參數都必須是可序列化的,同樣的,Process的子類實例在調用start方法時也必須保證是可以被序列化的。
  • 全局變量:如果子進程在代碼中嘗試訪問一個全局變量時,需要小心,它此時的值可能與父進程中執行Process.start方法時的值不一樣了,當然,如果它是模塊級別的常量時,是沒問題的。
  • 安全導入主模塊:需要確保主模塊可以被新啟動的Python解釋器(比如啟動了一個子進程)安全導入而不會引發其他問題。見示例1和示例2。

示例1:以下代碼會引發RuntimeError

from multiprocessing import Process


def foo():
    print('hello')


p = Process(target=foo)
p.start()

示例2:對於以上代碼,應該使用if __name__ == '__main__'來保護程序入口點。

from multiprocessing import Process, freeze_support, set_start_method


def foo():
    print('hello')


# 這個入口點可以允許子進程安全導入此模塊並使用此模塊中的foo函數
if __name__ == '__main__':
    freeze_support()  # 如果正常運行程序而不是需要打包“凍結”,則可以忽略此句。
    set_start_method('spawn')
    p = Process(target=foo)
    p.start()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM