一 生產者消費者模型介紹
為什么要使用生產者消費者模型
生產者指的是生產數據的任務,消費者指的是處理數據的任務,在並發編程中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。
什么是生產者和消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題,生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。
這個阻塞隊列就是用來給生產者和消費者解耦的
二 生產者消費者模型實現
from multiprocessing import Process, Queue import time import random import os def consumer(q, name): while True: res = q.get() time.sleep(random.randint(1, 3)) print("\033[43m %s 吃%s\033[0m" % (name, res)) def producer(q, name, food): for i in range(3): time.sleep(random.randint(1, 3)) res = "%s%s" % (food, i) q.put(res) print("\033[45m %s 生產了 %s\033[0m" % (name, res)) if __name__ == '__main__': q = Queue() # 生產者們:即廚師們 p1 = Process(target=producer, args=(q, 'egon', '包子')) # 消費者們:即吃貨們 c1 = Process(target=consumer, args=(q, 'mike')) # 開始 p1.start() c1.start() print('主')
執行結果
主 egon 生產了 包子0 egon 生產了 包子1 mike 吃包子0 egon 生產了 包子2 mike 吃包子1 mike 吃包子2
此時的問題是主進程永遠不會結束,原因是:生產者p在生產完后就結束了,但是消費者c在取空了q之后,則一直處於死循環中且卡在q.get()這一步
解決方式無非是讓生產者在生產完畢后,往隊列中再發一個結束信號,這樣消費者在接收到結束信號后就可以break出死循環
def consumer(q, name): while True: res = q.get() if res is None: break time.sleep(random.randint(1, 3)) print("\033[43m %s 吃%s\033[0m" % (name, res)) def producer(q, name, food): for i in range(3): time.sleep(random.randint(1, 3)) res = "%s%s" % (food, i) q.put(res) print("\033[45m %s 生產了 %s\033[0m" % (name, res)) if __name__ == '__main__': q = Queue() # 生產者們:即廚師們 p1 = Process(target=producer, args=(q, 'egon', '包子')) # 消費者們:即吃貨們 c1 = Process(target=consumer, args=(q, 'mike')) # 開始 p1.start() c1.start() p1.join() q.put(None) print('主')
但上述解決方法,在有多個生產者和多個消費者時,我們則需要用一個很low的方式去解決,有幾個消費者就需要發送幾次結束信號:相當low,例如:
def consumer(q, name): while True: res = q.get() if res is None: break time.sleep(random.randint(1, 3)) print("\033[43m %s 吃%s\033[0m" % (name, res)) def producer(q, name, food): for i in range(3): time.sleep(random.randint(1, 3)) res = "%s%s" % (food, i) q.put(res) print("\033[45m %s 生產了 %s\033[0m" % (name, res)) if __name__ == '__main__': q = Queue() # 生產者們:即廚師們 p1 = Process(target=producer, args=(q, 'egon1', '包子')) p2 = Process(target=producer, args=(q, 'egon2', '燒麥')) p3 = Process(target=producer, args=(q, 'egon3', '豆漿')) # 消費者們:即吃貨們 c1 = Process(target=consumer, args=(q, 'mike1')) c2 = Process(target=consumer, args=(q, 'mike2')) # 開始 p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() q.put(None) q.put(None) q.put(None) print('主')
其實,我們的思路無非是發送結束信號而已,有另外一種隊列提供了這種機制
JoinableQueue(maxsize)
這就是一個Queue對象,但隊列允許項目的使用者通知生產者項目已經被成功處理,通知進程時使用共享的信號和條件變量來實現的
參數實現
maxsize是隊列中允許最大項數,省略則無大小限制
方法介紹
JoinableQueue的實例p除了與Queue對象相同的方法之外還具有: q.task_done():使用者使用此方法發出信號,表示q.get()的返回項已經被處理,如果調用此方法的次數大於從隊列中刪除項目的數量,將引發ValueError異常 q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止
基於JoinableQueue實現生產者消費者模型
from multiprocessing import Process, JoinableQueue import time import random def consumer(q, name): while True: res = q.get() time.sleep(random.randint(1, 3)) print('%s 吃 %s' % (name, res)) q.task_done() # 發送信號給q.join(),說明已經從隊列中取走一個數據並處理完畢 def producer(q, name, food): for i in range(3): time.sleep(random.randint(1, 3)) res = '%s%s' % (food, i) q.put(res) print('%s 生產了 %s' % (name, res)) q.join() # 等到消費者把自己放入隊列中的所有的數據都取走之后,生產者才結束 if __name__ == '__main__': q = JoinableQueue() # 使用JoinableQueue() # 生產者:即廚師們 p1 = Process(target=producer, args=(q, 'egon1', '包子')) p2 = Process(target=producer, args=(q, 'egon2', '燒麥')) p3 = Process(target=producer, args=(q, 'egon3', '豆漿')) # 消費者們:即吃貨們 c1 = Process(target=consumer, args=(q, 'mike1')) c2 = Process(target=consumer, args=(q, 'mike2')) c1.daemon = True c2.daemon = True # 開始 p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() # 1、主進程等生產者p1,p2,p3結束 # 2、而p1,p2,p3,是在消費者把所有數據都取干凈之后才會結束 # 3、所以一旦p1,p2,p3結束了,證明消費者也沒必要存在了,應該隨着主進程一塊死掉,因而需要將生產者們設置成守護進程 print("主")
三 生產者消費者模型總結
1、程序中有兩類角色
一類負責生產數據(生產者) 一類負責處理數據(消費者)
2、引入生產者消費者模型為了解決的問題是
平衡生產者與消費者之間的速度差 程序解開耦合
3、如何實現生產者消費者模型
生產者<----->隊列<------>消費者