python自帶庫--multiprocessing庫


multiprocessing模塊介紹

multiprocessing 是一個支持使用與 threading 模塊類似的 API 來產生進程的包。 multiprocessing 包同時提供了本地和遠程並發操作,通過使用子進程而非線程有效地繞過了 全局解釋器鎖。 因此,multiprocessing 模塊允許程序員充分利用給定機器上的多個處理器。 它在 Unix 和 Windows 上均可運行。

multiprocessing模塊的功能眾多:支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。

一、Process類的介紹

在 multiprocessing 中,通過創建一個 Process 對象然后調用它的 start() 方法來生成進程

簡單實例:

from multiprocessing import Process def f(name): print('hello', name) if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() p.join()

創建進程的類:

Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,可用來開啟一個子進程

強調:

1. 需要使用關鍵字的方式來指定參數 

2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號

參數介紹:

group參數未使用,值始終為None

target表示調用對象,即子進程要執行的任務

args表示調用對象的位置參數元組,args=(1,2,'mike',)

kwargs表示調用對象的字典,kwargs={'name':'mike','age':18}

name為子進程的名稱

方法介紹:

p.start() :# 啟動進程,並調用該子進程中的p.run() 

p.run() :# 進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法

p.terminate() : # 強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖

p.is_alive() :# 如果p仍然運行,返回True

p.join([timeout]) :# 主進程等待p終止(強調:是主進程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間。

屬性介紹

p.daemon:默認值為False,如果設為True,代表p為后台運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True后,p不能創建自己的新進程,必須在p.start()之前設置

p.name:進程的名稱

p.pid:進程的pid

二、在進程之間交換對象

multiprocessing 支持進程之間的兩種通信通道:隊列和管道

1、隊列(Quene)

class multiprocessing.Queue([maxsize])

返回一個使用一個管道和少量鎖和信號量實現的共享隊列實例。當一個進程將一個對象放進隊列中時,一個寫入線程會啟動並將對象從緩沖區寫入管道中。

一旦超時,將拋出標准庫 queue 模塊中常見的異常 queue.Empty 和 queue.Full

方法:

qsize()-----返回隊列大致長度,返回數字不可靠

empty()-----如果隊列是空的,返回 True ,反之返回 False。狀態不可靠

full()-----如果隊列是滿的,返回 True ,反之返回 False 。狀態不可靠

put(obj[, block[, timeout]])

將 obj 放入隊列。如果可選參數 block 是 True (默認值) 而且 timeout 是 None (默認值), 將會阻塞當前進程,直到有空的緩沖槽。如果 timeout 是正數,將會在阻塞了最多 timeout 秒之后還是沒有可用的緩沖槽時拋出 queue.Full  異常。反之 (block 是 False 時),僅當有可用緩沖槽時才放入對象,否則拋出 queue.Full 異常 (在這種情形下 timeout 參數會被忽略)。

put_nowait (obj)-----相當於  put(obj, False)
get ([block[, timeout]])

從隊列中取出並返回對象。如果可選參數 block 是 True (默認值) 而且 timeout 是 None (默認值), 將會阻塞當前進程,直到隊列中出現可用的對象。如果 timeout 是正數,將會在阻塞了最多 timeout 秒之后還是沒有可用的對象時拋出 queue.Empty 異常。反之 (block 是 False 時),僅當有可用對象能夠取出時返回,否則拋出 queue.Empty 異常 (在這種情形下 timeout 參數會被忽略)。

在 3.8 版更改: 如果隊列已經關閉,會拋出 ValueError 而不是 OSError 。

get_nowait ()-----相當於  get(False)
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join() 

    隊列是線程和進程安全的。

2、管道(Pipe)

multiprocessing.Pipe([duplex])

返回一對 Connection 對象 (conn1, conn2) , 分別表示管道的兩端。

如果 duplex 被置為 True (默認值),那么該管道是雙向的。如果 duplex 被置為 False ,那么該管道是單向的,即 conn1 只能用於接收消息,而 conn2 僅能用於發送消息。

from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()
返回的兩個連接對象 Pipe() 表示管道的兩端。每個連接對象都有 send() 和 recv() 方法(相互之間的)。
請注意,如果兩個進程(或線程)同時嘗試讀取或寫入管道
的 同一 端,則管道中的數據可能會損壞。
當然,在不同進程中同時使用管道的不同端的情況下不存在損壞的風險。

三、進程間同步

對於所有在 threading 存在的同步原語,multiprocessing 中都有類似的等價物。例如,可以使用鎖來確保一次只有一個進程打印到標准輸出:

from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start() 

不使用鎖的情況下,來自於多進程的輸出很容易產生混淆。

四、服務進程

由 Manager() 返回的管理器對象控制一個服務進程,該進程保存Python對象並允許其他進程使用代理操作它們。

Manager() 返回的管理器支持類型: list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array 。

from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l) 

將打印

{0.25: None, 1: '1', '2': 2} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0] 

使用服務進程的管理器比使用共享內存對象更靈活,因為它們可以支持任意對象類型。此外,單個管理器可以通過網絡由不同計算機上的進程共享。但是,它們比使用共享內存慢

五、使用工作進程

Pool 類表示一個工作進程池。它具有允許以幾種不同方式將任務分配到工作進程的方法。

from multiprocessing import Pool, TimeoutError import time import os def f(x): return x*x if __name__ == '__main__': # start 4 worker processes with Pool(processes=4) as pool: # print "[0, 1, 4,..., 81]" print(pool.map(f, range(10))) # print same numbers in arbitrary order for i in pool.imap_unordered(f, range(10)): print(i) # evaluate "f(20)" asynchronously res = pool.apply_async(f, (20,)) # runs in *only* one process print(res.get(timeout=1)) # prints "400" # evaluate "os.getpid()" asynchronously res = pool.apply_async(os.getpid, ()) # runs in *only* one process print(res.get(timeout=1)) # prints the PID of that process # launching multiple evaluations asynchronously *may* use more processes multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)] print([res.get(timeout=1) for res in multiple_results]) # make a single worker sleep for 10 secs res = pool.apply_async(time.sleep, (10,)) try: print(res.get(timeout=1)) except TimeoutError: print("We lacked patience and got a multiprocessing.TimeoutError") print("For the moment, the pool remains available for more work") # exiting the 'with'-block has stopped the pool print("Now the pool is closed and no longer available") 

請注意,進程池的方法只能由創建它的進程使用。

六、連接(Connection)對象

class multiprocessing.connection.Connection

send (obj)-----將一個對象發送到連接的另一端,可以用  recv() 讀取。
recv ()------返回一個由另一端使用  send() 發送的對象。
fileno ()-----返回由連接對象使用的描述符或者句柄。
close ()-----關閉連接對象。
poll ([timeout])-----返回連接對象中是否有可以讀取的數據。
send_bytes (buffer[, offset[, size]])-----從一個  bytes-like object  (字節類對象)對象中取出字節數組並作為一條完整消息發送。
recv_bytes ([maxlength])-----以字符串形式返回一條從連接對象另一端發送過來的字節數據。此方法在接收到數據前將一直阻塞。
ecv_bytes_into (buffer[, offset])-----將一條完整的字節數據消息讀入  buffer 中並返回消息的字節數。 此方法在接收到數據前將一直阻塞。
七、class multiprocessing.Lock一旦一個進程或者線程拿到了鎖,后續的任何其他進程或線程的其他請求都會被阻塞直到鎖被釋放。
方法:acquire (block=Truetimeout=None)-----獲得鎖,阻塞或非阻塞的。

如果 block 參數被設為 True ( 默認值 ) , 對該方法的調用在鎖處於釋放狀態之前都會阻塞,然后將鎖設置為鎖住狀態並返回 True 。

如果 block 參數被設置成 False ,方法的調用將不會阻塞。 如果鎖當前處於鎖住狀態,將返回 False ; 否則將鎖設置成鎖住狀態,並返回 True 。

方法:release()---釋放鎖,可以在任何進程、線程使用,並不限於鎖的擁有者。

八、管理器--管理器提供了一種創建共享數據的方法,從而可以在不同進程中共享,甚至可以通過網絡跨機器共享數據。管理器維護一個用於管理 共享對象 的服務。其他進程可以通過代理訪問這些共享對象。

1、multiprocessing.Manager()---返回一個已啟動的 SyncManager 管理器對象,這個對象可以用於在不同進程中共享數據。返回的管理器對象對應了一個已經啟動的子進程,並且擁有一系列方法可以用於創建共享對象、返回對應的代理。

2、class multiprocessing.managers.BaseManager([address[, authkey]])---創建一個 BaseManager 對象。

一旦創建,應該及時調用 start() 或者 get_server().serve_forever() 以確保管理器對象對應的管理進程已經啟動。

address 是管理器服務進程監聽的地址。如果 address 是 None ,則允許和任意主機的請求建立連接。

authkey 是認證標識,用於檢查連接服務進程的請求合法性。如果 authkey 是 None, 則會使用 current_process().authkey , 否則,就使用 authkey , 需要保證它必須是 byte 類型的字符串。

方法:tart([initializer[, initargs]])-----為管理器開啟一個子進程,如果 initializer 不是 None , 子進程在啟動時將會調用 initializer(*initargs)

方法:get_server()----返回一個 Server  對象,它是管理器在后台控制的真實的服務。 Server  對象擁有 serve_forever() 方法。

方法:connect()----將本地管理器對象連接到一個遠程管理器進程:

方法:shutdown()-----停止管理器的進程。這個方法只能用於已經使用 start() 啟動的服務進程。它可以被多次調用。

方法:register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])-----一個 classmethod,可以將一個類型或者可調用對象注冊到管理器類。

屬性:address----管理器所用的地址。

九、進程池------可以創建一個進程池,它將使用 Pool 類執行提交給它的任務。

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

一個進程池對象,它控制可以提交作業的工作進程池。它支持帶有超時和回調的異步結果,以及一個並行的 map 實現。

map(funciterable[, chunksize])-----內置 map() 函數的並行版本 (但它只支持一個 iterable 參數,對於多個可迭代對象請參閱 starmap())。 它會保持阻塞直到獲得結果。

 

參考文檔:https://docs.python.org/zh-cn/3/library/multiprocessing.html#multiprocessing.managers.BaseManager.connect


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM