multiprocessing模塊介紹
multiprocessing
是一個支持使用與 threading
模塊類似的 API 來產生進程的包。 multiprocessing
包同時提供了本地和遠程並發操作,通過使用子進程而非線程有效地繞過了 全局解釋器鎖。 因此,multiprocessing
模塊允許程序員充分利用給定機器上的多個處理器。 它在 Unix 和 Windows 上均可運行。
multiprocessing模塊的功能眾多:支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。
一、Process類的介紹
在 multiprocessing
中,通過創建一個 Process
對象然后調用它的 start()
方法來生成進程
簡單實例:
from multiprocessing import Process def f(name): print('hello', name) if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() p.join()
創建進程的類:
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,可用來開啟一個子進程
強調:
1. 需要使用關鍵字的方式來指定參數
2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號
參數介紹:
group參數未使用,值始終為None
target表示調用對象,即子進程要執行的任務
args表示調用對象的位置參數元組,args=(1,2,'mike',)
kwargs表示調用對象的字典,kwargs={'name':'mike','age':18}
name為子進程的名稱
方法介紹:
p.start()
:# 啟動進程,並調用該子進程中的p.run()
p.run()
:# 進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法
p.terminate()
: # 強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖
p.is_alive()
:# 如果p仍然運行,返回True
p.join([timeout])
:# 主進程等待p終止(強調:是主進程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間。
屬性介紹
p.daemon
:默認值為False,如果設為True,代表p為后台運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True后,p不能創建自己的新進程,必須在p.start()之前設置
p.name
:進程的名稱
p.pid
:進程的pid
multiprocessing
支持進程之間的兩種通信通道:隊列和管道
1、隊列(Quene)
class multiprocessing.
Queue
([maxsize])
返回一個使用一個管道和少量鎖和信號量實現的共享隊列實例。當一個進程將一個對象放進隊列中時,一個寫入線程會啟動並將對象從緩沖區寫入管道中。
一旦超時,將拋出標准庫 queue
模塊中常見的異常 queue.Empty
和 queue.Full
。
方法:
qsize
()-----返回隊列大致長度,返回數字不可靠
empty
()-----如果隊列是空的,返回 True
,反之返回 False。狀態不可靠
full()-----如果隊列是滿的,返回 True
,反之返回 False
。狀態不可靠
put
(obj[, block[, timeout]])
將 obj 放入隊列。如果可選參數 block 是 True
(默認值) 而且 timeout 是 None
(默認值), 將會阻塞當前進程,直到有空的緩沖槽。如果 timeout 是正數,將會在阻塞了最多 timeout 秒之后還是沒有可用的緩沖槽時拋出 queue.Full
異常。反之 (block 是 False
時),僅當有可用緩沖槽時才放入對象,否則拋出 queue.Full
異常 (在這種情形下 timeout 參數會被忽略)。
-
put_nowait
(obj)-----相當於put(obj, False)
。
-
get
([block[, timeout]]) -
從隊列中取出並返回對象。如果可選參數 block 是
True
(默認值) 而且 timeout 是None
(默認值), 將會阻塞當前進程,直到隊列中出現可用的對象。如果 timeout 是正數,將會在阻塞了最多 timeout 秒之后還是沒有可用的對象時拋出queue.Empty
異常。反之 (block 是False
時),僅當有可用對象能夠取出時返回,否則拋出queue.Empty
異常 (在這種情形下 timeout 參數會被忽略)。在 3.8 版更改: 如果隊列已經關閉,會拋出
ValueError
而不是OSError
。
-
get_nowait
()-----相當於get(False)
。
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()
隊列是線程和進程安全的。
2、管道(Pipe)
multiprocessing.
Pipe
([duplex])
返回一對 Connection
對象 (conn1, conn2)
, 分別表示管道的兩端。
如果 duplex 被置為 True
(默認值),那么該管道是雙向的。如果 duplex 被置為 False
,那么該管道是單向的,即 conn1
只能用於接收消息,而 conn2
僅能用於發送消息。
from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()
返回的兩個連接對象 Pipe()
表示管道的兩端。每個連接對象都有 send()
和 recv()
方法(相互之間的)。
請注意,如果兩個進程(或線程)同時嘗試讀取或寫入管道的 同一 端,則管道中的數據可能會損壞。
當然,在不同進程中同時使用管道的不同端的情況下不存在損壞的風險。
三、進程間同步
對於所有在 threading
存在的同步原語,multiprocessing
中都有類似的等價物。例如,可以使用鎖來確保一次只有一個進程打印到標准輸出:
from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
不使用鎖的情況下,來自於多進程的輸出很容易產生混淆。
四、服務進程
由 Manager()
返回的管理器對象控制一個服務進程,該進程保存Python對象並允許其他進程使用代理操作它們。
Manager()
返回的管理器支持類型: list
、 dict
、 Namespace
、 Lock
、 RLock
、 Semaphore
、 BoundedSemaphore
、 Condition
、 Event
、 Barrier
、 Queue
、 Value
和 Array
。
from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l)
將打印
{0.25: None, 1: '1', '2': 2} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
使用服務進程的管理器比使用共享內存對象更靈活,因為它們可以支持任意對象類型。此外,單個管理器可以通過網絡由不同計算機上的進程共享。但是,它們比使用共享內存慢
五、使用工作進程
Pool
類表示一個工作進程池。它具有允許以幾種不同方式將任務分配到工作進程的方法。
from multiprocessing import Pool, TimeoutError import time import os def f(x): return x*x if __name__ == '__main__': # start 4 worker processes with Pool(processes=4) as pool: # print "[0, 1, 4,..., 81]" print(pool.map(f, range(10))) # print same numbers in arbitrary order for i in pool.imap_unordered(f, range(10)): print(i) # evaluate "f(20)" asynchronously res = pool.apply_async(f, (20,)) # runs in *only* one process print(res.get(timeout=1)) # prints "400" # evaluate "os.getpid()" asynchronously res = pool.apply_async(os.getpid, ()) # runs in *only* one process print(res.get(timeout=1)) # prints the PID of that process # launching multiple evaluations asynchronously *may* use more processes multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)] print([res.get(timeout=1) for res in multiple_results]) # make a single worker sleep for 10 secs res = pool.apply_async(time.sleep, (10,)) try: print(res.get(timeout=1)) except TimeoutError: print("We lacked patience and got a multiprocessing.TimeoutError") print("For the moment, the pool remains available for more work") # exiting the 'with'-block has stopped the pool print("Now the pool is closed and no longer available")
請注意,進程池的方法只能由創建它的進程使用。
六、連接(Connection)對象
class multiprocessing.connection.
Connection
-
send
(obj)-----將一個對象發送到連接的另一端,可以用recv()
讀取。 -
recv
()------返回一個由另一端使用send()
發送的對象。 -
fileno
()-----返回由連接對象使用的描述符或者句柄。 -
close
()-----關閉連接對象。 -
poll
([timeout])-----返回連接對象中是否有可以讀取的數據。 -
send_bytes
(buffer[, offset[, size]])-----從一個 bytes-like object (字節類對象)對象中取出字節數組並作為一條完整消息發送。 -
recv_bytes
([maxlength])-----以字符串形式返回一條從連接對象另一端發送過來的字節數據。此方法在接收到數據前將一直阻塞。 -
ecv_bytes_into
(buffer[, offset])-----將一條完整的字節數據消息讀入 buffer 中並返回消息的字節數。 此方法在接收到數據前將一直阻塞。 -
七、class
multiprocessing.
Lock一旦一個進程或者線程拿到了鎖,后續的任何其他進程或線程的其他請求都會被阻塞直到鎖被釋放。
-
方法:acquire
(block=True, timeout=None)-----獲得鎖,阻塞或非阻塞的。
如果 block 參數被設為 True
( 默認值 ) , 對該方法的調用在鎖處於釋放狀態之前都會阻塞,然后將鎖設置為鎖住狀態並返回 True
。
如果 block 參數被設置成 False
,方法的調用將不會阻塞。 如果鎖當前處於鎖住狀態,將返回 False
; 否則將鎖設置成鎖住狀態,並返回 True
。
方法:release
()---釋放鎖,可以在任何進程、線程使用,並不限於鎖的擁有者。
八、管理器--管理器提供了一種創建共享數據的方法,從而可以在不同進程中共享,甚至可以通過網絡跨機器共享數據。管理器維護一個用於管理 共享對象 的服務。其他進程可以通過代理訪問這些共享對象。
1、multiprocessing.
Manager
()---返回一個已啟動的 SyncManager
管理器對象,這個對象可以用於在不同進程中共享數據。返回的管理器對象對應了一個已經啟動的子進程,並且擁有一系列方法可以用於創建共享對象、返回對應的代理。
2、class multiprocessing.managers.
BaseManager
([address[, authkey]])---創建一個 BaseManager 對象。
一旦創建,應該及時調用 start()
或者 get_server().serve_forever()
以確保管理器對象對應的管理進程已經啟動。
address 是管理器服務進程監聽的地址。如果 address 是 None
,則允許和任意主機的請求建立連接。
authkey 是認證標識,用於檢查連接服務進程的請求合法性。如果 authkey 是 None
, 則會使用 current_process().authkey
, 否則,就使用 authkey , 需要保證它必須是 byte 類型的字符串。
方法:tart
([initializer[, initargs]])-----為管理器開啟一個子進程,如果 initializer 不是 None
, 子進程在啟動時將會調用 initializer(*initargs)
方法:get_server
()----返回一個 Server
對象,它是管理器在后台控制的真實的服務。 Server
對象擁有 serve_forever()
方法。
方法:connect
()----將本地管理器對象連接到一個遠程管理器進程:
方法:shutdown
()-----停止管理器的進程。這個方法只能用於已經使用 start()
啟動的服務進程。它可以被多次調用。
方法:register
(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])-----一個 classmethod,可以將一個類型或者可調用對象注冊到管理器類。
屬性:address----
管理器所用的地址。
九、進程池------可以創建一個進程池,它將使用 Pool
類執行提交給它的任務。
class multiprocessing.pool.
Pool
([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
一個進程池對象,它控制可以提交作業的工作進程池。它支持帶有超時和回調的異步結果,以及一個並行的 map 實現。
map
(func, iterable[, chunksize])-----內置 map()
函數的並行版本 (但它只支持一個 iterable 參數,對於多個可迭代對象請參閱 starmap()
)。 它會保持阻塞直到獲得結果。