(一)進程鎖
搶票的例子:
# -*- coding:utf-8 -*- from multiprocessing import Process, Lock import time import json count = {'count': 1} # 僅剩最后一張票 with open('db.txt', 'w', encoding='utf-8') as f: json.dump(count, f) # 返回剩余票數 def search(): dic = json.load(open('db.txt')) print('剩余票數%s' % dic['count']) return dic def get_ticket(dic): time.sleep(0.1) # 模擬讀數據的網絡延遲 if dic['count'] > 0: dic['count'] -= 1 time.sleep(0.2) # 模擬寫數據的網絡延遲 json.dump(dic, open('db.txt', 'w')) print('購票成功,剩余:{}'.format(dic['count'])) else: print('搶票失敗,去邀請好友助力!') def ticket_purchase(lock, i): print('第{}個用戶'.format(i)) # lock.acquire() get_ticket(search()) # lock.release() if __name__ == '__main__': lock = Lock() for i in range(10): # 模擬並發10個客戶端搶票 p = Process(target=ticket_purchase, args=(lock, i + 1)) p.start()
結果:
第6個用戶
剩余票數1
第4個用戶
剩余票數1
第7個用戶
剩余票數1
第1個用戶
剩余票數1
第10個用戶
剩余票數1
第3個用戶
剩余票數1
第5個用戶
剩余票數1
第8個用戶
剩余票數1
第2個用戶
剩余票數1
第9個用戶
剩余票數1
購票成功,剩余:0
購票成功,剩余:0
購票成功,剩余:0
購票成功,剩余:0
購票成功,剩余:0
購票成功,剩余:0
購票成功,剩余:0
購票成功,剩余:0
購票成功,剩余:0
購票成功,剩余:0
十個用戶會同時把票搶走,因為每次search同一時間能查到只有一個票
multipleprocessing.Lock
- 非遞歸的鎖定對象,非常類似threading.Lock.一旦進程或線程獲得了鎖,后續嘗試從任何進程或線程獲取它,將被阻塞直到被釋放; 任何進程或線程都可以釋放它。
- Lock支持上下文管理協議,可以在with中使用。
acquire(block=True, timeout=None)
- 獲取一個鎖,阻塞(block=True)或不阻塞(block=False)
- 當block設置為True的時候(默認設為True),如果鎖處於鎖定狀態,調用該方法會阻塞,直到鎖被釋放;然后將鎖設置為鎖定狀態,並返回True。
- 當block設置為False的時候,調用該方法不會阻塞。如果鎖處於鎖定狀態,則返回False,否則將鎖設置為鎖定狀態,並返回True。
- 當timeout為正數時,只要無法獲取鎖,最多阻塞超時指定的秒數。超時值為負值相當於超時值為零。超時值為“None”(默認值)的調用將超時時間設置為無限。請注意,超時的負值或無值的處理方式與threading.lock.acquire()中實現的行為不同。如果block參數設置為false,超時參數沒有實際意義,因此會忽略timeout參數,。如果獲取了鎖,則返回true;如果過了超時時間,則返回false。
release()
- 解鎖,可以從任何進程或線程調用,而不僅僅是最初獲取鎖的進程或線程。
- 大部分行為和threading.lock.release()相同,但在未鎖定狀態時調用引發ValueError 【后者引發RuntimeError】
加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
如果搶票步驟沒有加鎖,那么可能會有幾個人同時把票搶走,因為每次search都能查到有一個票,加了鎖以后只能一個一個搶
加鎖:
# -*- coding:utf-8 -*- from multiprocessing import Process, Lock import time import json count = {'count': 1} # 僅剩最后一張票 with open('db.txt', 'w', encoding='utf-8') as f: json.dump(count, f) # 返回剩余票數 def search(): dic = json.load(open('db.txt')) print('剩余票數%s' % dic['count']) return dic def get_ticket(dic): time.sleep(0.1) # 模擬讀數據的網絡延遲 if dic['count'] > 0: dic['count'] -= 1 time.sleep(0.2) # 模擬寫數據的網絡延遲 json.dump(dic, open('db.txt', 'w')) print('購票成功,剩余:{}'.format(dic['count'])) else: print('搶票失敗,去邀請好友助力!') def ticket_purchase(lock, i): print('第{}個用戶'.format(i)) lock.acquire() get_ticket(search()) lock.release() if __name__ == '__main__': lock = Lock() for i in range(10): # 模擬並發10個客戶端搶票 p = Process(target=ticket_purchase, args=(lock, i + 1)) p.start()
結果:
第2個用戶
剩余票數1
第1個用戶
第9個用戶
第10個用戶
第5個用戶
第7個用戶
第8個用戶
第3個用戶
第6個用戶
第4個用戶
購票成功,剩余:0
剩余票數0
搶票失敗,去邀請好友助力!
剩余票數0
搶票失敗,去邀請好友助力!
剩余票數0
搶票失敗,去邀請好友助力!
剩余票數0
搶票失敗,去邀請好友助力!
剩余票數0
搶票失敗,去邀請好友助力!
剩余票數0
搶票失敗,去邀請好友助力!
剩余票數0
搶票失敗,去邀請好友助力!
剩余票數0
搶票失敗,去邀請好友助力!
剩余票數0
搶票失敗,去邀請好友助力!
從結果可以看出,並不是手速最快的才能搶到
multiprocessing.RLock
- 遞歸的鎖對象,必須由獲取它的進程或線程釋放遞歸鎖。一旦進程或線程獲得了遞歸鎖定,相同的進程或線程就可以再次獲取它而不會阻塞; 該進程或線程獲取和釋放鎖的次數必須相等。
RLock
支持上下文管理器協議,因此可以在with
語句中使用。
acquire(block=True, timeout=None)
- 獲取一個鎖,阻塞(block=True)或不阻塞(block=False)
- 當在block參數設置為True的情況下調用時,除非該鎖已由當前進程或線程擁有,否則將一直阻塞到該鎖處於未鎖定狀態(此時它不屬於任何進程或線程)。然后,當前進程或線程取得鎖的所有權(如果它還沒有所有權),鎖內的遞歸級別將增加一,返回true。
- 當block設置為False時,調用時不阻塞。如果鎖已經被另一個進程或線程獲取,則當前進程或線程不占用所有權,並且鎖中的遞歸級別不會更改,返回值為False; 如果鎖處於解鎖狀態,則當前進程或線程將獲得所有權,遞歸級別將遞增,返回True.
release()
- 釋放鎖,遞減遞歸級別。如果在遞減遞歸級別為零之后,將鎖重置為解鎖狀態(不由任何進程或線程擁有),並且如果任何其他進程或線程正在阻塞,等待解鎖,僅允許其中一個繼續執行。如果在遞減之后遞歸級別仍然非零,則鎖保持鎖定並仍由調用進程或線程擁有。
- 只有在調用進程或線程擁有鎖時才調用此方法。如果此方法由所有者以外的進程或線程調用,或者鎖處於未鎖定(無主)狀態,則引發
AssertionError
。請注意,在這種情況下引發的異常類型與threading.rlock.release()中實現的行為不同 【后者引發RuntimeError
】。
若你的線程處理中會有一些比較復雜的代碼邏輯過程,比如很多層的函數調用,而這些函數其實都需要進行加鎖保護數據訪問。這樣就可能會反復的多次加鎖,因而用RLock就可以進行多次加鎖,解鎖,直到最終鎖被釋放,而如果用普通的lock,當你一個函數A已經加鎖,它內部調用另一個函數B,如果B內部也會對同一個鎖加鎖,那么這種情況就也會導致死鎖。
(二)信號量
multiprocessing.Semaphore
- 信號量對象:近似的類比
threading.Semaphore
此類實現信號量對象。信號量管理表示release()調用數減去acquire()調用數再加上原子計數器的初始值。當計數器為0時,acquire()方法將一直阻塞,直到它可以返回而不使計數器為負為止。如果未給定,則值默認為1。
- 可選參數提供內部計數器的初始值;默認值為1。如果給定的值小於0,則會引發ValueError。
- 其
acquire
方法的第一個參數被命名為block,和multiprocessing.Lock.acquire() 一致
acquire(block=True,timeout=None)
- 獲取信號量。
- 在不帶參數的情況下調用:
-
如果內部計數器大於零,則將其遞減1並立即返回true。
-
如果內部計數器為零,則阻塞直到調用
release()喚醒
。一旦喚醒(計數器大於0),將計數器遞減1並返回true。每次調用release()都會喚醒一個進程。不應依賴進程被喚醒的順序。
- 當block設置為false 時,不阻塞。如果此時調用沒有參數acquire()會阻塞(即此時計數器為0),則立即返回false; 否則,執行與不帶參數調用時相同的操作,並返回true。
- 當timeout是None以外的值,它將最多阻止timeout秒。如果在該時間間隔內(計數器一直為0),則返回false。否則返回true。
release()
- 釋放信號量,將內部計數器遞增1。當計數器為零並且另一個進程正在等待它再次大於零時喚醒該進程。
【acquire()和release()不一定一對一,是否阻塞要取決於計數器的值】
# -*- coding:utf-8 -*- from multiprocessing import Semaphore, Process import time import random def enter_room(smp, i): if smp.acquire(block=True, timeout=random.randint(1, 3)): # 超時還未獲取,返回false,反之返回True print('用戶%d進入了房間' % i) time.sleep(1) smp.release() print('用戶%d離開了房間' % i) else: print('等太久,走人') if __name__ == '__main__': smp = Semaphore(2) for i in range(10): p = Process(target=enter_room, args=(smp, i)) p.start()
結果:
用戶5進入了房間
用戶8進入了房間
用戶5離開了房間
用戶0進入了房間
用戶8離開了房間用戶9進入了房間
等太久,走人
用戶0離開了房間
用戶2進入了房間
用戶9離開了房間
用戶4進入了房間
等太久,走人
等太久,走人
用戶2離開了房間用戶6進入了房間
用戶4離開了房間
用戶6離開了房間
(三)事件
multiprocessing.Event
- 克隆
threading.Event
- 這是進程之間通信的最簡單機制之一:一個進程發出事件信號,其他進程等待它。
- 事件對象管理一個內部flag,該標志可以使用
set()
方法設置為true,並使用clear()方法重置為false 。flag = False,wait()
方法將阻塞,直到該flag為True。flag初始值是Flase。
is_set()
- 當且僅當內部標志為真時返回true。
set()
- 將內部標志設置為true。一旦標志為真,調用wait()的線程將不阻塞。
clear()
- 將內部標志重置為false。隨后,調用 wait的進程
將阻塞,直到set()被調用以再次將內部標志設置為true。
wait(timeout=None)
- 阻塞直到內部flag為True: 如果調用時flag就為True,則立即返回, 否則,直到另一個進程調用set()將內部flag設置為True,或者阻塞超時。
- 當timeout不為None時,它指定超時的時間(單位:秒)
- 除非給出超時參數並且阻塞超時返回False,其它情況皆為True
紅綠燈:
# -*- coding:utf-8 -*- from multiprocessing import Event, Process, Lock import time # 紅綠燈 def light(e): while 1: if e.is_set(): # 為True,flag為True print('紅燈') e.clear() # 重置為False,調用wait()的進程阻塞 time.sleep(5) else: print('綠燈') e.set() time.sleep(5) def car(e, i, l): while 1: l.acquire() # 先獲取鎖,確認下一輛通行的車 e.wait() # 紅燈停,綠燈行 print('奔馳{}以兩秒的時間飄過'.format(i)) time.sleep(2) l.release() if __name__ == '__main__': e = Event() l = Lock() p = Process(target=light, args=(e,)) p.start() for i in range(5): # 5輛車 p = Process(target=car, args=(e, i, l)) p.start()
結果:
綠燈
奔馳4以兩秒的時間飄過
奔馳2以兩秒的時間飄過
奔馳1以兩秒的時間飄過
紅燈
綠燈
奔馳3以兩秒的時間飄過
奔馳0以兩秒的時間飄過
奔馳4以兩秒的時間飄過
紅燈
闖紅燈的例子:
# -*- coding:utf-8 -*- from multiprocessing import Event, Process, Lock import time import random # 紅綠燈 def light(e): while 1: if e.is_set(): # 為True,flag為True print('紅燈') e.clear() # 重置為False,調用wait()的進程阻塞 time.sleep(5) else: print('綠燈') e.set() time.sleep(5) def car(e, i, l): while 1: l.acquire() # 先獲取鎖,確認下一輛通行的車,如果沒有鎖那么就同時過紅綠燈 if e.wait(random.randint(0, 3)): # 紅燈停,綠燈行 print('奔馳{}以兩秒的時間飄過'.format(i)) else: print('奔馳{}闖紅燈以兩秒的時間飄過'.format(i)) time.sleep(2) l.release() if __name__ == '__main__': e = Event() l = Lock() p = Process(target=light, args=(e,)) p.start() for i in range(5): # 5輛車 p = Process(target=car, args=(e, i, l)) p.start()
結果:
綠燈
奔馳4以兩秒的時間飄過
奔馳2以兩秒的時間飄過
奔馳1以兩秒的時間飄過
紅燈
奔馳3闖紅燈以兩秒的時間飄過
綠燈
奔馳0以兩秒的時間飄過
奔馳4以兩秒的時間飄過
紅燈
奔馳2闖紅燈以兩秒的時間飄過
奔馳1闖紅燈以兩秒的時間飄過
(四)管道
multipleprocessing.Pipe([duplex])
- 返回一對的 代表的配管的端部的對象。
(conn1, conn2)
Connection.
- 如果duplex是
True(
默認值),則管道是雙向的。如果duplex是False,
管道是單向的:conn1
只能用於接收消息,conn2
只能用於發送消息 - multipleprocessing.connection.Connection
send(obj)
- 將對象發送到應該使用的連接的另一端
recv()
。 - 該對象必須是可選擇的。非常大的pickle可能引發
ValueError
異常。(大約32 MiB +,雖然它取決於操作系統)
recv()
- 使用返回從連接另一端發送的對象
send()
。阻塞直到有東西要收到。如果沒有什么留下來接收,而另一端被關閉。拋出EOFError
fileno()
- 返回連接使用的文件描述符或句柄。
close()
- 關閉連接。
- 當連接被垃圾收集時,會自動調用此方法。
poll([timeout])
- 查詢是否有可供讀取的數據。
- 未指定timeout,立即返回,如果timeout是一個數字,則阻塞timeout時間(單位:秒),如果是None,一直阻塞。
【若另一端已關閉,則觸發BrokenPipeError異常】
send_bytes(buffer[, offset[, size]])
- 從類似bytes的對象中發送字節數據作為完整的消息。
- 如果給出offset,則從buffer中的該位置讀取數據。如果給出size,則將從緩沖區中讀取多個字節。非常大的緩沖區可能會引發
ValueError
異常(大約32 MiB +,取決於操作系統)
recv_bytes([maxlength])
- 返回從連接另一端發送的字節數據的完整消息。直到有數據要接收。如果沒有要接收的內容,並且另一端已關閉,則引發EOFError
recv_bytes_into(buffer[, offset])
- 將連接另一端發送的字節數據的完整消息讀入緩沖區,並返回消息中的字節數。直到有數據要接收。如果沒有要接收的內容並且另一端已關閉,則引發EOFError。
- 緩沖區必須是與可寫的和bytes類似的對象。如果給出了偏移量,那么消息將從該位置寫入緩沖區。偏移量必須是小於緩沖區長度的非負整數(以字節為單位)。
- 如果緩沖區太短,則會引發BufferToSort異常,發送的完整消息可從e.args[0]獲取,其中e是異常實例。
生產消費者模型:
傳輸字符串數據:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe, BufferTooShort import time def produce(right, name): for i in range(3): print('{}生產第{}包子'.format(name, i + 1)) right.send('包子') time.sleep(1) right.close() def consume(left, name): while 1: try: goods = left.recv() print('{}消費了一個{}'.format(name, goods)) except EOFError: # 關閉另一端,由recv觸發此異常 left.close() break if __name__ == '__main__': left, right = Pipe() print('文件描述符:{}'.format(left.fileno())) p = Process(target=produce, args=(right, '生產者1')) # 生產 c = Process(target=consume, args=(left, '消費者1')) # 消費 p.start() c.start() right.close() # 關閉多余的兩端 left.close()
結果:
文件描述符:436
生產者1生產第1包子
消費者1消費了一個包子
生產者1生產第2包子
消費者1消費了一個包子
生產者1生產第3包子
消費者1消費了一個包子
多個消費者:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe, BufferTooShort import time def produce(right, name): for i in range(3): print('{}生產第{}包子'.format(name, i + 1)) right.send('包子') time.sleep(1) right.close() def consume(left, name): while 1: try: goods = left.recv() print('{}消費了一個{}'.format(name, goods)) except EOFError: # 關閉另一端,由recv觸發此異常 left.close() break if __name__ == '__main__': left, right = Pipe() print('文件描述符:{}'.format(left.fileno())) p = Process(target=produce, args=(right, '生產者1')) # 生產 c1 = Process(target=consume, args=(left, '消費者1')) # 消費 c2 = Process(target=consume, args=(left, '消費者2')) # 消費 c3 = Process(target=consume, args=(left, '消費者3')) # 消費 p.start() c1.start() c2.start() c3.start() right.close() # 關閉多余的兩端 left.close()
結果:
文件描述符:432
生產者1生產第1包子
消費者2消費了一個包子
生產者1生產第2包子
消費者2消費了一個包子
生產者1生產第3包子
消費者3消費了一個包子
請注意,如果兩個進程(或線程)同時嘗試讀取或寫入管道的同一端,則管道中的數據可能會損壞。當然,同時使用管道的不同端的進程不存在損壞的風險。
傳輸字節:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe, BufferTooShort import time def produce(right, name): for i in range(3): print('{}生產第{}包子'.format(name, i + 1)) right.send_bytes('包子'.encode()) time.sleep(1) right.send_bytes('包子包子包子'.encode()) right.close() def consume(left, name): while 1: try: byte_content = bytearray(10) bytes_size = left.recv_bytes_into(byte_content) print('{}消費了一個{}'.format(name, byte_content.decode())) print('接收了{}個數據'.format(bytes_size)) except EOFError: # 關閉另一端,由recv觸發此異常 left.close() break except BufferTooShort as e: print('數據太長,完整數據為:{}'.format(e.args[0].decode())) if __name__ == '__main__': left, right = Pipe() print('文件描述符:{}'.format(left.fileno())) p = Process(target=produce, args=(right, '生產者1')) # 生產 c = Process(target=consume, args=(left, '消費者1')) # 消費 p.start() c.start() right.close() # 關閉多余的兩端 left.close()
結果:
文件描述符:476
生產者1生產第1包子
消費者1消費了一個包子
接收了6個數據
生產者1生產第2包子
消費者1消費了一個包子
接收了6個數據
生產者1生產第3包子
消費者1消費了一個包子
接收了6個數據
數據太長,完整數據為:包子包子包子
奇怪的poll(),分析下面兩個代碼結果:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe, BufferTooShort import time def produce(right, name): for i in range(3): print('{}生產第{}包子'.format(name, i + 1)) right.send('包子') time.sleep(3) right.close() print('right已關閉') def consume(left, name): while 1: try: print('poll阻塞') print('是否有可供讀取的數據:{}'.format(left.poll(None))) goods = left.recv() print('{}消費了一個{}'.format(name, goods)) except EOFError: # 已關閉另一端,由recv觸發此異常 left.close() break if __name__ == '__main__': left, right = Pipe() print('文件描述符:{}'.format(left.fileno())) p = Process(target=produce, args=(right, '生產者1')) # 生產 c = Process(target=consume, args=(left, '消費者1')) # 消費 p.start() c.start() right.close() # 關閉多余的兩端 left.close()
結果:
文件描述符:544
poll阻塞
生產者1生產第1包子
生產者1生產第2包子
生產者1生產第3包子
是否有可供讀取的數據:True
消費者1消費了一個包子
poll阻塞
是否有可供讀取的數據:True
消費者1消費了一個包子
poll阻塞
是否有可供讀取的數據:True
消費者1消費了一個包子
poll阻塞
right已關閉
是否有可供讀取的數據:True
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe, BufferTooShort import time def produce(right, name): for i in range(3): print('{}生產第{}包子'.format(name, i + 1)) right.send('包子') # time.sleep(3) right.close() print('right已關閉') def consume(left, name): while 1: try: print('poll阻塞') print('是否有可供讀取的數據:{}'.format(left.poll(None))) goods = left.recv() print('{}消費了一個{}'.format(name, goods)) except EOFError: # 已關閉另一端,由recv觸發此異常 left.close() break if __name__ == '__main__': left, right = Pipe() print('文件描述符:{}'.format(left.fileno())) p = Process(target=produce, args=(right, '生產者1')) # 生產 c = Process(target=consume, args=(left, '消費者1')) # 消費 p.start() c.start() right.close() # 關閉多余的兩端 left.close()
結果:
文件描述符:440
生產者1生產第1包子
生產者1生產第2包子
生產者1生產第3包子
right已關閉
poll阻塞
是否有可供讀取的數據:True
消費者1消費了一個包子
poll阻塞
是否有可供讀取的數據:True
消費者1消費了一個包子
poll阻塞
是否有可供讀取的數據:True
消費者1消費了一個包子
poll阻塞
Process Process-2:
Traceback (most recent call last):
......
BrokenPipeError: [WinError 109] 管道已結束。
第四次循環poll(None)的執行若先於管道的right端關閉代碼right.close()的執行,poll(None)返回True,並以recv引發的異常結束。反之,poll(None)引發BrokenPipeError異常
(五)隊列
隊列是線程和進程安全的。
multiprocessing.Queue
- 返回使用管道和一些鎖/信號量實現的進程共享隊列。當一個進程第一次將一個項目放入隊列時,就會啟動一個feeder線程,它將對象從緩沖區傳輸到管道中。
- 通常的queue.empty和queue.full異常從標准庫的queue模塊引發,以發出信號超時。
- queue實現queue.queue的所有方法,但task_done()和join()除外。
qsize()
- 返回隊列的大致大小。這個數字不可靠.
empty()
- 為空返回True,否則返回False。這個數字不可靠
full()
- 隊列已滿返回True,否則返回False。這個數字不可靠
put(obj[, block[, timeout]])
- 將obj放入隊列。如果可選參數block為“True”(默認值),timeout為“None”(默認值),則根據需要阻塞,直到插槽可用。如果超時為正數,則最多會阻塞timeout秒數,如果在此時間內沒有可用插槽,引發queue.Full異常。否則(block為False),如果空閑插槽立即可用,則將項目放入隊列,否則引發queue.Full異常(在這種情況下,超時將被忽略)。
put_nowait()
- 顧名思義,等效於put(obj,False)
get([block,[, timeout])
- 從隊列中移除並返回項目。如果可選參數block為True(默認值),timeout=None(默認值),則在item可用之前根據需要進行阻塞。如果超時為正數,則最多阻塞timeout秒,如果在該時間內沒有可用項,則引發queue.Empty異常。否則(block為False),如果item立即可用,則返回該項,否則引發queue.Empty異常(在這種情況下忽略超時)。
get_nowait()
- 顧名思義,等效於get(False)
close()
- 指示當前進程不會將更多數據放入此隊列。后台線程將所有緩沖數據刷新到管道后將退出。當隊列被垃圾收集時,將自動調用此函數。
join_thread()
- join()后台線程。這只能在調用close()之后使用。它將一直阻塞,直到后台線程退出,以確保緩沖區中的所有數據都已刷新到管道中。
- 默認情況下,如果進程不是隊列的創建者,那么在退出時,它將嘗試加入隊列的后台線程。進程可以調用cancel_join_thread()使join_thread()不做任何操作。
cancel_join_thread()
- 防止join_thread()阻塞。可防止后台線程在進程退出時自動join()——請參見join_thread()。
- 此方法的更好名稱可能是allow_exit_without_flush()。但它很可能會導致排隊的數據丟失,您幾乎肯定不需要使用它。只有當您需要當前進程立即退出而不等待將排隊的數據刷新到底層管道時,它才有存在的意義,而且您不關心丟失的數據。
隊列進程安全
生產消費者模型,隊列實現:
# -*- coding:utf-8 -*- from multiprocessing import Process, Queue, JoinableQueue import os def consumer(q): while True: print('消費者進程{}等吃'.format(os.getpid())) res = q.get() if res is None: print('消費者進程{}結束'.format(os.getpid(), res)) break # 收到結束信號則結束 else: print('消費者進程{}吃了{}'.format(os.getpid(), res)) def producer(food, q): for i in range(2): q.put(food) print('生產者進程{}生產了 第{}個{}'.format(os.getpid(), i + 1, food)) print('生產者進程{}生產完成'.format(os.getpid())) if __name__ == '__main__': q = Queue() # 生產者 p1 = Process(target=producer, args=('包子', q)) p2 = Process(target=producer, args=('水果', q)) p3 = Process(target=producer, args=('米飯', q)) # 消費者 c1 = Process(target=consumer, args=(q,)) c2 = Process(target=consumer, args=(q,)) # 開始 p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() # 有幾個消費者就put幾個None q.put(None) # 必須保證生產者全部生產完畢,才應該發送結束信號 q.put(None) q.put(None)
結果:
消費者進程12108等吃
消費者進程3648等吃
生產者進程19544生產了 第1個包子
生產者進程19544生產了 第2個包子
生產者進程19544生產完成
消費者進程12108吃了包子
消費者進程12108等吃
消費者進程3648吃了包子
消費者進程3648等吃
生產者進程828生產了 第1個米飯
消費者進程12108吃了米飯生產者進程828生產了 第2個米飯
消費者進程12108等吃
生產者進程828生產完成
消費者進程3648吃了米飯
消費者進程3648等吃
生產者進程20244生產了 第1個水果
消費者進程12108吃了水果生產者進程20244生產了 第2個水果
消費者進程12108等吃
生產者進程20244生產完成
消費者進程3648吃了水果
消費者進程3648等吃
消費者進程12108結束
消費者進程3648結束
由於消費者收到None才能結束,因此要注意兩個問題,None必須在隊列尾部,幾個消費者,尾部就應該有幾個None
multipleprocessing.JoinableQueue
task_done()
- 指示以前排隊的任務已完成。由隊列使用者使用。對於用於獲取任務的每個get(),對task_done()的后續調用會告訴隊列任務的處理已完成。
join()
- 阻塞,直到隊列中的所有項目都被獲取和處理。
- 每當將項目添加到隊列時,未完成任務的計數就會增加。每當使用者調用task_done()以指示已檢索到該項,並且對其進行的所有工作都已完成時,計數就會下降。當未完成任務的計數降至零時,join()將取消阻塞。
生產消費者模型,JoinableQueue實現
# -*- coding:utf-8 -*- from multiprocessing import Process,Queue, JoinableQueue import os def consumer(q): while 1: print('消費者進程{}等吃'.format(os.getpid())) res = q.get() q.task_done() # Semaphore - 1 print('消費者進程{}吃了{}'.format(os.getpid(), res)) def producer(food, q): for i in range(2): q.put(food) print('生產者進程{}生產了 第{}個{}'.format(os.getpid(), i + 1, food)) print('生產者進程{}生產完成,等待消費者消費'.format(os.getpid())) q.join() # 等待消費者進程 if __name__ == '__main__': q = JoinableQueue() # 生產者 p1 = Process(target=producer, args=('包子', q)) p2 = Process(target=producer, args=('水果', q)) p3 = Process(target=producer, args=('米飯', q)) # 消費者 c1 = Process(target=consumer, args=(q,)) c2 = Process(target=consumer, args=(q,)) c1.daemon = True c2.daemon = True # 開始 p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join()
結果:
消費者進程9952等吃
消費者進程3840等吃
生產者進程10980生產了 第1個包子
生產者進程10980生產了 第2個包子
生產者進程10980生產完成,等待消費者消費
消費者進程9952吃了包子
消費者進程9952等吃
消費者進程3840吃了包子
消費者進程3840等吃
生產者進程7452生產了 第1個水果
生產者進程18556生產了 第1個米飯
消費者進程9952吃了水果
消費者進程9952等吃
生產者進程7452生產了 第2個水果
生產者進程7452生產完成,等待消費者消費
生產者進程18556生產了 第2個米飯
生產者進程18556生產完成,等待消費者消費
消費者進程3840吃了米飯
消費者進程3840等吃
消費者進程9952吃了水果
消費者進程9952等吃
消費者進程3840吃了米飯
消費者進程3840等吃
其思路就是put之后,有個信號量計數器+1 ,每get一下調用一下taskdone,計數器就會-1。如果生產者很快生產完后,調用join,進程會等待,等到計數器為0的時候,所有調用join()的生產者會被喚醒。因此,生產者喚醒了-->意味着消費者已經消費完,消費者由於死循環還在等吃的(get阻塞)。設置消費者線程為守護線程,讓主進程隨着生產者進程的結束而結束,主進程 結束后,中止守護線程(消費者)
死鎖:
# -*- coding:utf-8 -*- from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) # q.cancel_join_thread() if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() print('join阻塞') p.join() # this deadlocks print('get阻塞') obj = queue.get() # q.cancel_join_thread()執行后,join()不阻塞,但是get()拿不到數據,數據丟失,導致阻塞 print(obj)
multiprocessing.Queue底層是基於Pipe構建的,但是數據傳遞時並不是直接寫入Pipe,而是寫入進程本地buffer,通過一個feeder線程寫入底層Pipe,
因此一次put數據很大的時候,會一直等待get()取出。沒有get()就join該進程,會導致死鎖
(六)進程池
你可以創建一個進程池,進程將使用Pool類執行提交給它的任務。
multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild[,context]]]]])
- 一個進程池對象,它控制可向其提交作業的工作進程池。它支持帶超時和回調的異步結果,並具有並行映射實現。
- processes是要使用的工作進程數。如果processes為None,則使用os.cpu_count()返回的數字。
- 如果initializer設定項不是“None”,則每個工作進程在啟動時都將調用initializer(*initargs) 。
- maxtasksperchild是工作進程在退出並用新的工作進程替換之前可以完成的任務數,以釋放未使用的資源。 默認的maxtasksperchild是None,這意味着工作進程將與池一樣長。
- context可用於指定用於啟動工作進程的上下文。 通常使用函數multiprocessing.Pool()或上下文對象的Pool()方法創建池。 在兩種情況下,上下文都是適當的。(如何指定上下文?怎么用?)
- 注意,pool對象的方法只能由創建pool的進程調用。
apply(fun[, args[, kwds]])
- 使用參數args和關鍵字參數kwds調用func。它會一直阻塞,直到結果准備就緒。apply_async() 更適合並行執行工作。此外,func只在池中的一個進程中執行。
apply_async(func[, args[, kwds[, callback[, error_callback]]]])
- apply()方法的一個變種,它返回一個結果對象o;o.get()獲取func返回的結果。
- 如果指定了callback,則它應該接受單個參數,並且可調用。當結果就緒時,調用callback函數,如果調用失敗,調用error_callback。
- 如果指定了error_callback,那么它應該是接受單個參數並且可調用。如果目標函數失敗,則使用異常實例調用error_callback。
- 回調應該立即完成,否則處理結果的線程將被阻塞。
# -*- coding:utf-8 -*- from multiprocessing import Pool import os import time def func(n): print('i={}, pid={}'.format(n, os.getpid())) time.sleep(1) return n**2 if __name__ == '__main__': p = Pool(5) for i in range(10): # p.apply(func, (i,)) # 只在一個進程中執行,會阻塞主進程 p.apply_async(func, (i,)) # 適合並行,一下由五個進程處理五個任務,不阻塞主進程 print('主進程') p.close() p.join()
結果:
主進程
i=0, pid=6540
i=1, pid=1348
i=2, pid=17060
i=3, pid=7632
i=4, pid=7396
i=5, pid=6540
i=6, pid=1348
i=7, pid=7396
i=8, pid=7632
i=9, pid=17060
map(func, [iterable,[chunksize]])
- 相當於內置函數map,它僅支持可迭代的參數,他會阻塞,直到全部結果准備就緒。
- 此方法將iterable切換為多個塊,並將其作為單獨的任務提交給進程池。可以通過將chunksize設置為正整數來指定這些塊的(近似)大小。
- 請注意,它可能會導致非常長的iterables的高內存使用率,考慮將imap() 或 imap_unordered() 與顯式chunksize選項一起使用,以提高效率
map_async(func, iterable[, chunksize[,callback[,error_callback]]])
- map()方法的一個變種,它返回一個結果對象。
- 如果指定了回調,則它應該接受單個參數並且可調用。當結果就緒時,調用callback,如果調用失敗,error_callback被應用。
- 如果指定了error_callback,那么它應該接受單個參數並可調用。如果目標函數調用失敗,則使用異常實例調用error_callback。
- 回調應該立即完成,否則處理結果的線程將被阻塞。
# -*- coding:utf-8 -*- from multiprocessing import Pool import os import time def fun(n): print('i={}, pid={}'.format(n, os.getpid())) time.sleep(1) return n if __name__ == '__main__': p = Pool(4) # result = p.map(fun, [(1, 2), (1, 2)], chunksize=1) # map阻塞主進程,結果出來后,再解除阻塞 result = p.map_async(fun, [(1, 2), (1, 2)], chunksize=1) # 異步,不阻塞主線程,任務還在子進程進行; print('主進程') # print(result) # map返回列表,可直接打印 print(result.get()) # map_async返回結果對象 p.close() p.join()
結果:
主進程
i=(1, 2), pid=2004
i=(1, 2), pid=5328
[(1, 2), (1, 2)]
imap(func, iterable[, chunksize])
- 惰性版本的map
- chunksize參數與map()方法使用的參數相同。 對於非常長的迭代,使用較大的chunksize值可以使作業比使用默認值1更快地完成。
- 此外,如果chunksize為1,則imap()方法返回的迭代器的next()方法具有可選的超時參數:如果在超時秒內無法返回結果,則next(timeout)將引發multiprocessing.TimeoutError。
imap_unorderable(func, iterable[,chunksize])
- 除了返回的結果是無序的(除非池中只有一個進程),其它跟imap一樣
# -*- coding:utf-8 -*- from multiprocessing import Pool import os import time def fun(n): print('i={}, pid={}'.format(n, os.getpid())) time.sleep(1) return n if __name__ == '__main__': p = Pool(4) # result = p.imap(fun, [(1, 2), (3, 4)], chunksize=1) # 異步 result = p.imap_unordered(fun, [(1, 2), (3, 4)], chunksize=1) # 異步,不阻塞主線程,任務還在子進程進行,結果無序; print('主進程') for i in result: # imap返回迭代器 print(i) p.close() p.join()
結果:
主進程
i=(1, 2), pid=17396
i=(3, 4), pid=12496
(1, 2)
(3, 4)
startmap(func, iterable[, chunksize])
- 與map()類似,只是迭代的元素應該作為參數解包的迭代。因此,iterable=[(1,2),(3,4)]的結果是[func(1,2),func(3,4)]。
startmap_async(func, iterable[, chunksize[,callback[,error_callback]]])
- starmap()和map_async()的組合
# -*- coding:utf-8 -*- from multiprocessing import Pool import os import time def fun(n, k): print('i={}, pid={}'.format(n, os.getpid())) time.sleep(1) return n, k if __name__ == '__main__': p = Pool(4) # result = p.starmap(fun, [(1, 2), (3, 4)], chunksize=1) # 阻塞,直到全部結果處理完 result = p.starmap_async(fun, [(1, 2), (3, 4)], chunksize=1) # 異步,不阻塞主線程,任務還在子進程進行; print('主進程') # print(result) # starmap返回列表,直接打印 print(result.get()) p.close() p.join()
結果:
主進程
i=1, pid=14660
i=3, pid=10564
[(1, 2), (3, 4)]
close()
- 防止將任何其他任務提交到池中。 完成所有任務后,工作進程將退出。
terminate()
- 立即停止工作進程而不完成未完成的工作。 當池對象被垃圾收集時,將立即調用terminate()。
join()
- 等待工作進程退出。 必須在使用join()之前調用close()或terminate()。
池對象在3.3版本支持上下文管理協議
使用進程池實現搶票:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pool, Manager import time import json count = {'count': 1} # 僅剩最后一張票 with open('db.txt', 'w', encoding='utf-8') as f: json.dump(count, f) # 返回剩余票數 def search(): dic = json.load(open('db.txt')) print('剩余票數%s' % dic['count']) return dic def get_ticket(dic): time.sleep(0.1) # 模擬讀數據的網絡延遲 if dic['count'] > 0: dic['count'] -= 1 time.sleep(0.2) # 模擬寫數據的網絡延遲 json.dump(dic, open('db.txt', 'w')) print('購票成功,剩余:{}'.format(dic['count'])) else: print('搶票失敗,去邀請好友助力!') def ticket_purchase(lock, i): print('第{}個用戶'.format(i)) lock.acquire() get_ticket(search()) lock.release() if __name__ == '__main__': lock = Manager().Lock() # 要使用Manager().Lock() p = Pool(5) for i in range(10): # 模擬並發10個客戶端搶票 p.apply_async(ticket_purchase, (lock, i + 1)) p.close() p.join()
結果:
第1個用戶
剩余票數1
第2個用戶
第3個用戶
第4個用戶
第5個用戶
購票成功,剩余:0
剩余票數0
第6個用戶
搶票失敗,去邀請好友助力!
剩余票數0
第7個用戶
搶票失敗,去邀請好友助力!
剩余票數0
第8個用戶
搶票失敗,去邀請好友助力!
剩余票數0
第9個用戶
搶票失敗,去邀請好友助力!
剩余票數0
第10個用戶
搶票失敗,去邀請好友助力!
剩余票數0
搶票失敗,去邀請好友助力!
剩余票數0
搶票失敗,去邀請好友助力!
剩余票數0
搶票失敗,去邀請好友助力!
剩余票數0
搶票失敗,去邀請好友助力!
maxtasksperchild和chunksize具體效果:
# -*- coding:utf-8 -*- from multiprocessing import Pool import os import sys def func(x): print("pid: ", os.getpid(), " got: ", x) sys.stdout.flush() return [x, x+1] def got(r): print("got result: ", r) if __name__ == '__main__': pool = Pool(processes=1, maxtasksperchild=9) # 進程執行了九個任務就會退出,換新的進程執行 keys = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] result = pool.map_async(func, keys, chunksize=1, callback=got) # chunksize指定每chuncksize個元素為一個任務 # result = pool.map_async(func, keys, chunksize=2, callback=got) # chunksize為2說明此時只有五個任務,沒有換新的進程執行 pool.close() pool.join()
結果:
pid: 8188 got: 1
pid: 8188 got: 2
pid: 8188 got: 3
pid: 8188 got: 4
pid: 8188 got: 5
pid: 8188 got: 6
pid: 8188 got: 7
pid: 8188 got: 8
pid: 8188 got: 9
pid: 10860 got: 10
got result: [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8], [8, 9], [9, 10], [10, 11]]
參考: