主要內容:
一. 鎖
二. 信號量
三. 事件
通過event來完成紅綠燈模型
四. 隊列(重點)
隊列實現進程間的通信
五. 生產者消費者模型
1. 初始版本(程序會阻塞住)
2. 升級版本一(通過拋出異常信號的方式結束進程)
3. 升級版本二(通過發送結束信號的方式結束進程)
第一種: 生產者發結束信號
第二種: 主進程發結束信號
4. 升級版本三(有多個消費者和生產者的時候需要發送多次結束信號)
六. JoinableQuene實現生產者消費者模型
一. 進程同步(鎖)
在之前muitiprocessing模塊的學習中,當我們使用Process創建子進程時就已經實現了進程的異步了.我們可以讓多個任務同時在幾個進程中並發處理,它們之間的運行沒有順序,一旦開啟也不受人們控制. 盡管並發編程讓我們能更加充分的利用IO資源,但與此同時它也帶來了新的問題: 進程之間數據不共享但卻共享同一套文件系統, 所以幾個進程同時訪問同一個文件或同一個打印終端,是沒有問題的. 可是, 共享帶來的是競爭, 競爭的結果就是錯亂. 如何進行控制?我們想到了"加鎖處理".

import random, os, time from multiprocessing import Process def work(n): print("{} >>> {}號進程正在執行".format(n, os.getpid())) time.sleep(random.random()) print("{} >>> {}號進程執行完畢".format(n, os.getpid())) if __name__ == '__main__': for i in range(5): p = Process(target=work, args=(i,)) p.start() # 執行結果: # 0 >>> 9748號進程正在執行 # 1 >>> 10904號進程正在執行 # 2 >>> 8976號進程正在執行 # 3 >>> 5784號進程正在執行 # 4 >>> 12132號進程正在執行 # 4 >>> 12132號進程執行完畢 # 2 >>> 8976號進程執行完畢 # 3 >>> 5784號進程執行完畢 # 1 >>> 10904號進程執行完畢 # 0 >>> 9748號進程執行完畢

import random, os, time from multiprocessing import Process, Lock # 引入Lock模塊 def work(n, lock): lock.acquire() # 加鎖,保證每次只有一個進程在執行鎖內的程序.此時對於所有加鎖的進程來說,都變成了串行. print("{} >>> {}號進程正在執行".format(n, os.getpid())) time.sleep(random.random()) print("{} >>> {}號進程執行完畢".format(n, os.getpid())) lock.release() # 解鎖,解鎖之后其他進程才能執行自己的程序 if __name__ == '__main__': lock = Lock() # 創建Lock對象 for i in range(5): p = Process(target=work, args=(i, lock)) p.start() # 執行結果 # 0 >>> 8056號進程正在執行 # 0 >>> 8056號進程執行完畢 # 1 >>> 3096號進程正在執行 # 1 >>> 3096號進程執行完畢 # 2 >>> 268號進程正在執行 # 2 >>> 268號進程執行完畢 # 3 >>> 8948號進程正在執行 # 3 >>> 8948號進程執行完畢 # 4 >>> 10612號進程正在執行 # 4 >>> 10612號進程執行完畢 # 得出結論: 加鎖后程序由並發變成了串行,犧牲了運行效率,但避免了競爭
上面這種情況雖然使用加鎖的形式實現了順序的執行,但是程序又重新變成串行了. 這種做法浪費了時間卻保證了數據的安全.
接下來,我們以模擬搶票為例,來看看數據安全的重要性:

# 注意:首先在當前文件目錄下創建一個名為db的文件 # 文件db的內容為:{"count":1},只有這一行數據,並且注意,每次運行完了之后,文件中的1變成了0,你需要手動將0改為1,然后再運行代碼. # 注意字典中一定要用雙引號,不然json無法識別 from multiprocessing import Process import time, json # 查看剩余票數 def search(): dic = json.load(open("db")) #打開文件,直接load文件中的內容,拿到文件中的包含剩余票數的字典 print("剩余票數{}".format(dic["count"])) # 搶票 def get(i): dic = json.load(open("db")) time.sleep(0.5) # 模擬讀取數據的網絡延遲 if dic['count'] > 0: dic['count'] -= 1 time.sleep(1) # 模擬寫入數據的網絡延遲 json.dump(dic, open("db", "w")) print("{}號用戶購票成功".format(i)) def task(i): search() get(i) if __name__ == '__main__': for i in range(3): # 模擬並發3個客戶端搶票 p = Process(target=task, args=(i,)) p.start() # 執行結果: # 剩余票數1 # 剩余票數1 # 剩余票數1 # 0號用戶購票成功 # 1號用戶購票成功 # 2號用戶購票成功 # 分析結果: 由於網絡延遲等原因使得進程切換,導致每個人都搶到了這最后一張票 # 得出結論: 並發運行,效率高,但競爭寫同一文件,數據寫入錯亂

# 注意:首先在當前文件目錄下創建一個名為db的文件 # 文件db的內容為:{"count":1},只有這一行數據,並且注意,每次運行完了之后,文件中的1變成了0,你需要手動將0改為1,然后再運行代碼. # 注意字典中一定要用雙引號,不然json無法識別 import time, json from multiprocessing import Process, Lock # 引入Lock模塊 # 查看剩余票數 def search(): dic = json.load(open("db")) #打開文件,直接load文件中的內容,拿到文件中的包含剩余票數的字典 print("剩余票數{}".format(dic["count"])) # 搶票 def get(i): dic = json.load(open("db")) time.sleep(0.5) # 模擬讀取數據的網絡延遲 if dic['count'] > 0: dic['count'] -= 1 time.sleep(1) # 模擬寫入數據的網絡延遲 json.dump(dic, open("db", "w")) print("{}號用戶購票成功".format(i)) def task(i, lock): search() lock.acquire() # 加鎖 get(i) lock.release() # 解鎖 if __name__ == '__main__': lock = Lock() # 創建一個鎖 for i in range(3): # 模擬並發3個客戶端搶票 p = Process(target=task, args=(i, lock)) # "鎖"也要作為參數傳遞給需要加鎖的函數 p.start() # 執行結果: # 剩余票數1 # 剩余票數1 # 剩余票數1 # 0號用戶購票成功 # 分析結果: 只有一個人搶到了票 # 得出結論: 加鎖保證數據安全,不出現混亂
進程鎖分析總結:

#加鎖可以保證:多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改.這種方式雖然犧牲了速度(效率)卻保證了數據安全. 雖然可以用文件共享數據實現進程間通信,但問題是: 1.效率低(共享數據基於文件,而文件是硬盤上的數據) 2.需要自己加鎖處理 #因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個進程共享一塊內存的數據)2、幫我們處理好鎖問題。這就是mutiprocessing模塊為我們提供的基於消息的IPC通信機制: 隊列和管道. 隊列和管道都是將數據存放於內存中 隊列又是基於(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性.
二. 信號量(Semaphore)
信號量可以規定同時進入鎖內的進程數量

# 互斥鎖同時只允許一個線程更改數據,而信號量Semaphore是同時允許一定數量的線程更改數據 。 假設商場里有4個迷你唱吧,所以同時可以進去4個人,如果來了第五個人就要在外面等待,等到有人出來才能再進去玩。 #實現: #信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。 #信號量與進程池的概念很像,但是要區分開,信號量涉及到加鎖的概念
舉例說明:

# 假設10個人去游戲廳玩: 提前設定好,一個房間只有4台游戲機(計數器現在為4),那么同時只能四個人進來,誰先來的誰先占一個游戲機(acquire,計數器減1),4台機器滿了之后(計數器為0),第五個人就要等着,等其中一個人出來(release,計數器加1),他就可以占用那台游戲機了. import random, time from multiprocessing import Process, Semaphore def play(i, s): s.acquire() print("{}號顧客來玩游戲了".format(i)) time.sleep(random.randrange(2, 5)) # 每位顧客游戲時間不同 s.release() if __name__ == '__main__': s = Semaphore(4) # 設定好一次只能4個人進來 for i in range(10): # 創建10位顧客 p = Process(target=play, args=(i, s)) p.start()
三. 事件(Event)

python線程的事件用於主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear.
事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 wait 方法時就會阻塞,如果“Flag”值為True,那么執行 wait 方法時便不再阻塞.
is_set(): 查看一個事件的狀態,默認為False
clear(): 將“Flag”設置為False
set(): 將“Flag”設置為True
方法詳述:

from multiprocessing import Event e = Event() # 創建一個事件對象 print(e.is_set()) # is_set()查看一個事件的狀態,默認為False,可通過set方法改為True print("Look here!") # 執行結果: # False # Look here!

from multiprocessing import Event e = Event() # 創建一個事件對象 print(e.is_set()) # 執行結果: False e.set() # 將is_set()的狀態改為True print(e.is_set()) # 執行結果: True e.clear() # 將is_set()的狀態改為True print(e.is_set()) # 執行結果: False
wait()方法示例說明:

from multiprocessing import Event e = Event() # 創建一個事件對象 print(e.is_set()) print("我在wait之前!") e.wait() # 依據事件的狀態來決定是否阻塞: False-->阻塞 True-->不阻塞 print("我在wait之后!") # 執行結果: # False # 我在wait之前 # 從上面的結果可以看出,is_set()為False,則此時事件的狀態為False,於是程序執行到wait()方法處就阻塞住了

from multiprocessing import Event e = Event() # 創建一個事件對象 e.set() # 將is_set()的狀態改為True print(e.is_set()) print("我在wait之前!") e.wait() # 依據事件的狀態來決定是否阻塞: False-->阻塞 True-->不阻塞 print("我在wait之后!") # 執行結果: # True # 我在wait之前! # 我在wait之后! # 從上面的結果可以看出,set()方法將is_set()的狀態改為True,則此時事件的狀態為True,於是程序執行到wait()方法處不會阻塞住,繼續向下執行

from multiprocessing import Event e = Event() # 創建一個事件對象 e.set() # 將is_set()的狀態改為True print(e.is_set()) e.clear() # 將is_set()的狀態改為False print(e.is_set()) print("我在wait之前!") e.wait() # 依據事件的狀態來決定是否阻塞: False-->阻塞 True-->不阻塞 print("我在wait之后!") # 執行結果: # True # False # 我在wait之前! # 從上面的結果可以看出,set()方法將is_set()的狀態改為True,clear()方法又重新將is_set()的狀態改為False,則此時事件的狀態為False,於是程序執行到wait()方法處就阻塞住了
通過事件來模擬紅綠燈:

import time from multiprocessing import Process, Event # 創建一個"模擬紅綠燈執行狀態"的函數 def traffic_lights(e): while 1: print("!!!紅燈亮!!!") time.sleep(6) e.set() # 把e改為True print("~~~綠燈亮~~~") time.sleep(3) e.clear() # 把e改為False def car(i, e): if not e.is_set(): # 新來的車看到的是紅燈,執行這里,車在等待 print("車{}在等待......".format(i)) e.wait() print("車{}走你........".format(i)) else: # 此時已經是綠燈,執行這里,車可以走了 print("車{}可以走了....".format(i)) if __name__ == '__main__': e = Event() # 創建一個紅綠燈 tra_lig = Process(target=traffic_lights,args=(e,)) tra_lig.start() while 1: time.sleep(1) # 創建3輛車 for i in range(3): c = Process(target=car, args=(i, e)) c.start()
四. 隊列(重點)
進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的。隊列就像一個特殊的列表,但是可以設置固定長度,並且從前面插入數據,從后面取出數據,先進先出。
語法: Queue([maxsize]) --> 創建共享的進程隊列 參數: maxsize --> 是隊列中允許的最大項數.如果省略此參數,則無大小限制 注意: 隊列的底層使用管道和鎖實現
Queue的方法介紹:

q = Queue([maxsize]) 創建共享的進程隊列. maxsize是隊列中允許的最大項數. 如果省略此參數, 則無大小限制. 底層隊列使用管道和鎖定實現. 另外, 還需要運行支持線程以便隊列中的數據傳輸到底層管道中. # Queue的實例q具有以下方法: q.get([block[,timeout]]) 返回q中的一個項目. 如果q為空, 此方法將阻塞, 直到隊列中有項目可用為止. block用於控制阻塞行為, 默認為True, 如果設置為False, 將引發Queue.Empty異常(定義在Queue模塊中). timeout是可選超時時間, 用在阻塞模式中, 如果在制定的時間間隔內沒有項目變為可用, 將引發Queue.Empty異常. q.get_nowait() 同q.get(False)方法. q.put(item[,block[,timeout]]) 將item放入隊列. 如果隊列已滿, 此方法將阻塞至有空間可用為止. block控制阻塞行為, 默認為True. 如果設置為False, 將引發Queue.Empty異常(定義在Queue庫模塊中). timeout指定在阻塞模式中等待可用空間的時間長短, 超時后將引發Queue.Full異常. q.qsize() 返回隊列中目前項目的正確數量. 此函數的結果並不可靠, 因為在返回結果和在稍后程序中使用結果之間, 隊列中可能添加或刪除了項目. 在某些系統上, 此方法可能引發NotImplementedError異常. q.empty() 如果調用此方法時q為空, 返回True. 如果其他進程或線程正在往隊列中添加項目, 結果是不可靠的. 也就是說, 在返回和使用結果之間, 隊列中可能已經加入新的項目. q.full() 如果q已滿, 返回為True. 由於線程的存在, 結果也可能是不可靠的(參考q.empty()方法).
Queue的其他方法介紹(了解):

q.close()
關閉隊列, 防止隊列中加入更多數據. 調用此方法時, 后台線程將繼續寫入那些已入隊列但尚未寫入的數據, 但將會在此方法完成時馬上關閉. 如果q被垃圾收集, 將自動調用此方法. 關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常. 例如, 如果某個使用者正被阻塞在get()操作上, 關閉生產者中的隊列不會導致get()方法返回錯誤.
q.cancel_join_thread()
不會在進程退出時自動連接后台線程. 這可以防止join_thread()方法阻塞.
q.join_thread()
連接隊列的后台線程. 此方法用於在調用q.close()方法后, 等待所有隊列項被消耗. 默認情況下, 此方法由不是q的原始創建者的所有進程調用. 調用q.cancel_join_thread()方法可以禁止這種行為.
Queue的方法詳述:

from multiprocessing import Queue q = Queue(3) #創建一個隊列對象,隊列長度為3 # 以下是方法詳述: put, get, put_nowait, get_nowait, full, empty q.put(1) # 往隊列中添加數據 q.put(2) q.put(3) # q.put(4) # 如果隊列已經滿了, 程序就會停在這里, 等待數據被別人取走, 再將數據放入隊列. 但如果隊列中的數據一直不被取走, 程序就會永遠停在這里. try: q.put_nowait(4) # 使用put_nowait(), 如果隊列滿了不會阻塞, 但是會因為隊列滿了而報錯. except: # 因此我們可以用一個try語句來處理這個錯誤, 這樣程序不會一直阻塞下去, 但是會丟掉這個消息. print("隊列已經滿了!") # 所以, 我們再放入數據之前, 可以先看一下隊列的狀態, 如果已經滿了, 就不繼續put了. print(q.full()) # 查看隊列是否滿了, 滿了返回True, 不滿返回False. print(q.get()) # 取出數據 print(q.get()) print(q.get()) # print(q.get()) # get()同put()方法一樣, 如果隊列已經空了, 那么繼續取就會出現阻塞現象. try: q.get_nowait(3) # 可以使用get_nowait()方法, 如果隊列滿了不會阻塞, 但是會因為沒取到值而報錯. except: # 因此我們可以用一個try語句來處理這個錯誤, 這樣程序不會一直阻塞下去. print("隊列已經空了") print(q.empty()) # 查看隊列是否空了, 空了返回True, 不空返回False.
隊列實現進程間的通信:

import time from multiprocessing import Process,Queue def girl(q): print("來自boy的信息>>>", q.get()) print("來自班主任的凝視>>>", q.get()) def boy(q): q.put("中午一起吃飯嗎?") if __name__ == '__main__': q = Queue(5) boy_p = Process(target=boy, args=(q,)) girl_p = Process(target=girl, args=(q,)) boy_p.start() girl_p.start() time.sleep(1) # 等待子進程執行完畢 q.put("好好上課,別開小差!") # 執行結果: # 來自boy的信息>>> 中午一起吃飯嗎? # 來自班主任的凝視>>> 好好上課,別開小差!
隊列是進程安全的: 同一時間只能一個進程拿到隊列中的一個數據, 你拿到了一個數據, 這個數據別人就拿不到了.

import os, time import multiprocessing # 向queue中輸入數據的函數 def inputQ(queue): info = str(os.getpid()) + '(put):' + str(time.asctime()) queue.put(info) # 向queue中輸出數據的函數 def outputQ(queue): info = queue.get() print('%s%s\033[32m%s\033[0m' % (str(os.getpid()), '(get):', info)) # Main if __name__ == '__main__': # windows下,如果開啟的進程比較多的話,程序會崩潰,為了防止這個問題,使用freeze_support()方法來解決(了解即可) multiprocessing.freeze_support() record1 = [] # store input processes record2 = [] # store output processes queue = multiprocessing.Queue(3) # 輸入進程 for i in range(10): process = multiprocessing.Process(target=inputQ, args=(queue,)) process.start() record1.append(process) # 輸出進程 for i in range(10): process = multiprocessing.Process(target=outputQ, args=(queue,)) process.start() record2.append(process) for p in record1: p.join() for p in record2: p.join()
五. 生產者消費者模型
在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題. 該模式通過平衡"生產線程"和"消費線程"的工作能力來提高程序的整體處理數據的速度.
為什么要使用生產者和消費者模式?
在線程世界里, 生產者就是生產數據的線程, 消費者就是消費數據的線程. 在多線程開發當中, 如果生產者處理速度很快, 而消費者處理速度很慢, 那么生產者就必須等待消費者處理完, 才能繼續生產數據. 同樣的道理, 如果消費者的處理能力大於生產者, 那么消費者就必須等待生產者. 為了解決這個問題於是引入了生產者和消費者模式.
什么是生產者消費者模式?
生產者和消費者模式是通過一個容器來解決生產者和消費者的強耦合問題的. 生產者和消費者彼此之間不直接通訊, 而通過阻塞隊列來進行通訊, 所以生產者生產完數據之后不用等待消費者處理, 直接扔給阻塞隊列, 消費者不找生產者要數據, 而是直接從阻塞隊列里取, 阻塞隊列就相當於一個緩沖區, 平衡了生產者和消費者的處理能力, 並且我可以根據生產速度和消費速度來均衡一下多少個生產者可以為多少個消費者提供足夠的服務, 就可以開多進程等等, 而這些進程都是到阻塞隊列或者說是緩沖區中去獲取或者添加數據的.
1. 初始版本(程序會阻塞住)

import time from multiprocessing import Process, Queue # 版本1 def producer(q): for i in range(1, 11): time.sleep(1) q.put(i) print("已生產了{}個產品".format(i)) def consumer(q): while 1: # 死循環,不停地往外取 time.sleep(2) s = q.get() print("消費者已拿走{}個產品".format(s)) if __name__ == '__main__': # 通過隊列來模擬緩沖區,大小設置為20 q = Queue(20) # 生產者進程 pro_p = Process(target=producer, args=(q,)) pro_p.start() # 消費者進程 con_p = Process(target=consumer, args=(q,)) con_p.start() # 從最后的執行結果中可以看出: 當消費者取出了所有產品之后,程序並沒有結束,而是阻塞在消費者進程的get()處了.
2. 升級版本一(通過拋出異常信號的方式結束進程)

def producer(q): for i in range(1, 11): time.sleep(1) q.put(i) print("已生產了{}個產品".format(i)) def consumer(q): while 1: time.sleep(2) try: s = q.get(False) # 如果隊列為空,則再次get()會拋出異常 # s = q.get_nowait() # get_nowait()與get(False)是等同的效果 print("消費者已拿走{}個產品".format(s)) except: # 捕獲異常 break # 結束循環 if __name__ == '__main__': # 通過隊列來模擬緩沖區,大小設置為20 q = Queue(20) # 生產者進程 pro_p = Process(target=producer, args=(q,)) pro_p.start() # 消費者進程 con_p = Process(target=consumer, args=(q,)) con_p.start()
3. 升級版本二(通過發送結束信號的方式結束進程)
第一種: 生產者發結束信號

import time from multiprocessing import Process, Queue def producer(q): for i in range(1, 11): time.sleep(1) q.put(i) print("{}號產品已生產完畢".format(i)) q.put(None) # 生產者在自己的子進程的最后加入一個結束信號 def consumer(q): while 1: time.sleep(2) s = q.get() if s == None: # 如果消費者最后拿到了結束信號(None)就會跳出循環 break else: print("消費者已拿走{}個產品".format(s)) if __name__ == '__main__': # 通過隊列來模擬緩沖區,大小設置為20 q = Queue(20) # 生產者進程 pro_p = Process(target=producer, args=(q,)) pro_p.start() # 消費者進程 con_p = Process(target=consumer, args=(q,)) con_p.start()
第二種: 主進程發結束信號

import time from multiprocessing import Process, Queue def producer(q): for i in range(1, 11): time.sleep(1) q.put(i) print("{}號產品已生產完畢".format(i)) def consumer(q): while 1: time.sleep(2) s = q.get() if s == None: break else: print("消費者已拿走{}個產品".format(s)) if __name__ == '__main__': # 通過隊列來模擬緩沖區,大小設置為20 q = Queue(20) # 生產者進程 pro_p = Process(target=producer, args=(q,)) pro_p.start() # 消費者進程 con_p = Process(target=consumer, args=(q,)) con_p.start() pro_p.join() # 生產者進程執行完畢后才會執行主進程 q.put(None) # 主進程在生產者生產結束后發送結束信號None
4. 升級版本三(有多個消費者和生產者的時候需要發送多次結束信號)

# 升級版本三: 有多個消費者和生產者的時候需要發送多次結束信號,有幾個消費者來取(有幾個get())就發送幾次結束信號 import time from multiprocessing import Process,Queue def producer1(q): for i in range(1, 11): time.sleep(1) q.put(i) print("生產者1號已經生產了{}個產品".format(i)) def producer2(q): for i in range(1, 11): time.sleep(1) q.put(i) print("生產者2號已經生產了{}個產品".format(i)) def producer3(q): for i in range(1, 11): time.sleep(1) q.put(i) print("生產者3號已經生產了{}個產品".format(i)) def consumer1(q): while 1: el = q.get() if el == None: # 跳出循環的條件 break print("消費者1號已經取走了{}個產品".format(el)) def consumer2(q): while 1: el = q.get() if el == None: # 跳出循環的條件 break print("消費者2號已經取走了{}個產品".format(el)) def consumer3(q): while 1: el = q.get() if el == None: # 跳出循環的條件 break print("消費者3號已經取走了{}個產品".format(el)) if __name__ == '__main__': q = Queue(50) # 創建隊列,通過隊列來模擬緩沖器,大小為50 producer1_process = Process(target=producer1, args=(q,)) # 創建所有生產者進程 producer2_process = Process(target=producer2, args=(q,)) producer3_process = Process(target=producer3, args=(q,)) consumer1_process = Process(target=consumer1, args=(q,)) # 創建所有消費者進程 consumer2_process = Process(target=consumer2, args=(q,)) consumer3_process = Process(target=consumer3, args=(q,)) producer1_process.start() # 啟動所有生產者進程 producer2_process.start() producer3_process.start() consumer1_process.start() # 啟動所有消費者進程 consumer2_process.start() consumer3_process.start() producer1_process.join() # 必須保證生產者全部執行完畢主進程才能繼續執行 producer2_process.join() producer3_process.join() q.put(None) # 有多少個消費者來取走產品就要發送幾次結束信號 q.put(None) q.put(None) print("主進程執行完畢")
六. JoinableQuene實現生產者消費者模型
#JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。 #參數介紹: maxsize是隊列中允許最大項數,省略則無大小限制。 #方法介紹: JoinableQueue的實例p除了與Queue對象相同的方法之外還具有: q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大於從隊列中刪除項目的數量,將引發ValueError異常 q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止,也就是隊列中的數據全部被get拿走了。

import time from multiprocessing import Process, JoinableQueue def producer(q): for i in range(1, 11): time.sleep(0.5) q.put(i) print("{}號產品已生產完畢".format(i)) q.join() print("在這里等待...") def consumer(q): while 1: time.sleep(1) s = q.get() if s == None: break else: print("消費者已拿走{}個產品".format(s)) q.task_done() # 給q對象發送一個任務結束的信號 if __name__ == '__main__': # 通過隊列來模擬緩沖區,大小設置為20 q = JoinableQueue(20) # 生產者進程 pro_p = Process(target=producer, args=(q,)) pro_p.start() # 消費者進程 con_p = Process(target=consumer, args=(q,)) con_p.daemon = True # 把生產者進程設置為守護進程,它會隨主進程的結束而結束 con_p.start() pro_p.join() # 主進程要等待生產者執行結束再繼續執行 print("主進程結束") """總而言之,先把消費者進程設置為守護進程,於是消費者進程與守護進程同生共死. 於是,在主進程和消費者進程結束之前,必須等待生產者進程執行完畢,如此一來,我們 便可以看到這樣一個過程: 主進程開啟,生產者首先執行,消費者緊跟其后執行,最后 待生產者執行結束后,消費者和主進程同時結束.整個程序全部結束."""