生產者消費者模型介紹
為什么要使用生產者消費者模型
生產者指的是生產數據的任務,消費者指的是處理數據的任務,在並發編程中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。
同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。
什么是生產者和消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,
消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。
這個阻塞隊列就是用來給生產者和消費者解耦的
生產者消費者模型實現
from multiprocessing import Process,Queue import time def producer(q,name,food): """ 生產者 :param q: 隊列 :param name: :param food: :return: """ for i in range(3): res = '%s,%s' %(food,i) time.sleep(1) # 生產food得有個過程,就先讓睡一會 print('生產者[%s] 生產了 [%s]' % (name, res)) q.put(res) def consumer(q,name): while True: res = q.get() if res is None:break time.sleep(1) print('消費者[%s]吃了[%s]' % (name,res) ) if __name__ == '__main__': # 容器 q=Queue() p = Process(target=producer,args=(q,'egon','包子')) c = Process(target=consumer,args=(q,'alex',)) p.start() c.start() ----------------輸出 生產者[egon] 生產了 [包子,0] 生產者[egon] 生產了 [包子,1] 消費者[alex]吃了[包子,0] 消費者[alex]吃了[包子,1] 生產者[egon] 生產了 [包子,2] 消費者[alex]吃了[包子,2]
此時的問題是主進程永遠不會結束,原因是:生產者p在生產完后就結束了,但是消費者c在取空了q之后,則一直處於死循環中且卡在q.get()這一步。
解決方式無非是讓生產者在生產完畢后,往隊列中再發一個結束信號,這樣消費者在接收到結束信號后就可以break出死循環
from multiprocessing import Process,Queue import time def producer(q,name,food): """ 生產者 :param q: 隊列 :param name: :param food: :return: """ for i in range(3): res = '%s,%s' %(food,i) time.sleep(1) # 生產food得有個過程,就先讓睡一會 print('生產者[%s] 生產了 [%s]' % (name, res)) q.put(res) def consumer(q,name): while True: res = q.get() if res is None:break time.sleep(2) print('消費者[%s]吃了[%s]' % (name,res) ) if __name__ == '__main__': # 容器 q=Queue() # 生產者 p1 = Process(target=producer, args=(q, 'egon1', '包子')) p2 = Process(target=producer,args=(q,'egon2','餃子')) p3 = Process(target=producer,args=(q,'egon3','玉米')) # 消費者 c1 = Process(target=consumer,args=(q,'alex1',)) c2 = Process(target=consumer, args=(q, 'alex2',)) pl = [p1,p2,p3] cl = [c1,c2] for p in pl: p.start() for c in cl: c.start() for p in pl: p.join() # 等待生產者生產完畢后,發送信號None,消費者在接收到結束信號后就可以break出死循環 for c in cl: # 有2個消費者,發送2個結束信號 q.put(None) -------------輸出 生產者[egon1] 生產了 [包子,0] 生產者[egon2] 生產了 [餃子,0] 生產者[egon3] 生產了 [玉米,0] 生產者[egon1] 生產了 [包子,1] 生產者[egon2] 生產了 [餃子,1] 生產者[egon3] 生產了 [玉米,1] 消費者[alex1]吃了[包子,0] 生產者[egon1] 生產了 [包子,2] 生產者[egon2] 生產了 [餃子,2] 消費者[alex2]吃了[餃子,0] 生產者[egon3] 生產了 [玉米,2] 消費者[alex1]吃了[玉米,0] 消費者[alex2]吃了[包子,1] 消費者[alex1]吃了[餃子,1] 消費者[alex2]吃了[玉米,1] 消費者[alex1]吃了[包子,2] 消費者[alex2]吃了[餃子,2] 消費者[alex1]吃了[玉米,2]
帶有bug的解決方式:請注意
from multiprocessing import Process,Queue import time def producer(q,name,food): """ 生產者 :param q: 隊列 :param name: :param food: :return: """ for i in range(2): res = '%s,%s' %(food,i) time.sleep(1) # 生產food得有個過程,就先讓睡一會 print('生產者[%s] 生產了 [%s]' % (name, res)) q.put(res) q.put(None) # 在多個生產者和消費者中會產生bug,比如3個生產者,2個消費者,會出現消費者少消費, # 因為生產者1發送None信號后,可能其他生產者還沒有生產完數據,但是消費者收到None信號后會提前結束 def consumer(q,name): while True: res = q.get() if res is None:break time.sleep(2) print('消費者[%s]吃了[%s]' % (name,res) ) if __name__ == '__main__': # 容器 q=Queue() # 生產者 p1 = Process(target=producer, args=(q, 'egon1', '包子')) p2 = Process(target=producer,args=(q,'egon2','餃子')) p3 = Process(target=producer,args=(q,'egon3','玉米')) # 消費者 c1 = Process(target=consumer,args=(q,'alex1',)) c2 = Process(target=consumer, args=(q, 'alex2',)) pl = [p1,p2,p3] cl = [c1,c2] for p in pl: p.start() for c in cl: c.start() ------輸出--------------- 生產者[egon3] 生產了 [玉米,0] 生產者[egon1] 生產了 [包子,0] 生產者[egon2] 生產了 [餃子,0] 生產者[egon3] 生產了 [玉米,1] 生產者[egon1] 生產了 [包子,1] 生產者[egon2] 生產了 [餃子,1] 消費者[alex1]吃了[玉米,0] 消費者[alex2]吃了[包子,0] 消費者[alex1]吃了[餃子,0] 消費者[alex2]吃了[玉米,1] 消費者[alex2]吃了[包子,1] # [餃子,1]沒有吃完,因為之前egon3和egon1已經發送了兩個None信號了,兩個消費收到None后就結束了消費了
JoinableQueue使用
就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
參數介紹
maxsize是隊列中允許最大項數,省略則無大小限制。
方法介紹
JoinableQueue的實例p除了與Queue對象相同的方法之外還具有: q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大於從隊列中刪除項目的數量,將引發ValueError異常 q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止
基於JoinableQueue實現生產者消費者模型
from multiprocessing import Process,JoinableQueue import time def producer(q,name,food): """ 生產者 :param q: 隊列 :param name: :param food: :return: """ for i in range(2): res = '%s,%s' %(food,i) time.sleep(1) # 生產food得有個過程,就先讓睡一會 print('生產者[%s] 生產了 [%s]' % (name, res)) q.put(res) q.join() # 等到消費者把自己放入隊列中的所有的數據都取走之后,生產者才結束 def consumer(q,name): while True: res = q.get() if res is None:break time.sleep(2) print('消費者[%s]吃了[%s]' % (name,res) ) q.task_done() # 發送信號給q.join(),說明已經從隊列中取走一個數據並處理完畢了 if __name__ == '__main__': # 容器 q=JoinableQueue() # 生產者 p1 = Process(target=producer, args=(q, 'egon1', '包子')) p2 = Process(target=producer,args=(q,'egon2','餃子')) p3 = Process(target=producer,args=(q,'egon3','玉米')) # 消費者 c1 = Process(target=consumer,args=(q,'alex1',)) c2 = Process(target=consumer, args=(q, 'alex2',)) pl = [p1,p2,p3] cl = [c1,c2] for p in pl: p.start() for c in cl: c.daemon = True # 設置守護進程,主進程結束后,守護進程跟着結束 c.start() for p in pl: p.join() # 1、主進程等生產者p1、p2、p3結束 # 2、而p1、p2、p3是在消費者把所有數據都取干凈之后才會結束 # 3、所以一旦p1、p2、p3結束了,證明消費者也沒必要存在了,應該隨着主進程一塊死掉,因而需要將生產者們設置成守護進程 print('主進程結束') --------輸出 生產者[egon1] 生產了 [包子,0] 生產者[egon3] 生產了 [玉米,0] 生產者[egon2] 生產了 [餃子,0] 生產者[egon1] 生產了 [包子,1] 生產者[egon3] 生產了 [玉米,1] 生產者[egon2] 生產了 [餃子,1] 消費者[alex1]吃了[包子,0] 消費者[alex2]吃了[玉米,0] 消費者[alex1]吃了[餃子,0] 消費者[alex2]吃了[包子,1] 消費者[alex1]吃了[玉米,1] 消費者[alex2]吃了[餃子,1] 主進程結束