進擊のpython
並發編程——生產者消費者模型
介紹這個模型,有助於更好的理解隊列在真正的項目開發過程中的使用場景
方便更好的理解隊列的數據處理方式
本小節針對生產者消費者模型的介紹與創建進行剖析
可以將進程的知識點進行串講,達到一個綜合的目的
生產者消費者模型
生產者:就是產生數據的任務;消費者:就是處理數據的任務
在並發的時候,如果生產者的生產速度很快,消費者處理速度很慢
那生產者就得等消費者處理完才能產生數據,同樣的道理
如果消費者處理速度很快,生產者生產速度很慢
那消費者就得等生產者生產完才能夠處理數據
為了解決這個問題,就引入了生產者消費者模式
生產者消費者模式
生產者消費者模式就是通過一個容器來解決生產者與消費者的強耦合問題
生產者和消費者彼此不直接通信,而是通過阻塞隊列來進行通訊
所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列
消費者不找生產者要數據,而是直接從阻塞隊列里取
阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力
這個阻塞隊列就是用來給生產者和消費者解耦的
其實就像吃包子一樣,不可能廚師做包子顧客直接吃
也不可能來顧客廚師才做包子
而正確的做法就是廚師把包子做完放在籠屜
顧客來直接從籠屜里拿包子,這樣才是合情合理的
模型實現
根據隊列,我們可以實現一下這個模型
import random
import time
from multiprocessing import Process, Queue
def pro(q, *args, **kwargs):
for i in range(1, 10):
time.sleep(random.random())
q.put(i)
print(f'生產出來{i}號包子.. ..')
pass
def cro(q, *args, **kwargs):
while 1:
time.sleep(random.random())
print(f'吃了{q.get()}號包子!')
pass
if __name__ == '__main__':
q = Queue()
p = Process(target=pro, args=(q,))
c = Process(target=cro, args=(q,))
p.start()
c.start()
生產出來1號包子.. ..
吃了1號包子!
生產出來2號包子.. ..
吃了2號包子!
生產出來3號包子.. ..
生產出來4號包子.. ..
生產出來5號包子.. ..
吃了3號包子!
生產出來6號包子.. ..
吃了4號包子!
生產出來7號包子.. ..
生產出來8號包子.. ..
生產出來9號包子.. ..
吃了5號包子!
吃了6號包子!
吃了7號包子!
吃了8號包子!
吃了9號包子!
這和我們想的是一樣的,你吃你的,我做我的
但是運行的時候你會發現,當包子吃完了,程序處於阻塞狀態,不會停止
根本原因就是,隊列空了,但是while還在從隊列里要值,就會導致阻塞
那這個問題怎么解決呢?我們可以給個標記,當遇到這個標記就說明吃完了
就退出程序:
import random
import time
from multiprocessing import Process, Queue
def pro(q, *args, **kwargs):
for i in range(1, 10):
time.sleep(random.random())
q.put(i)
print(f'生產出來{i}號包子.. ..')
q.put('None')
pass
def cro(q, *args, **kwargs):
while 1:
msg = q.get()
if msg == 'None': break
time.sleep(random.random())
print(f'吃了{msg}號包子!')
pass
if __name__ == '__main__':
q = Queue()
p = Process(target=pro, args=(q,))
c = Process(target=cro, args=(q,))
p.start()
c.start()
或者也可以這么寫:
import random
import time
from multiprocessing import Process, Queue
def pro(q, *args, **kwargs):
for i in range(1, 10):
time.sleep(random.random())
q.put(i)
print(f'生產出來{i}號包子.. ..')
q.put('None')
pass
def cro(q, *args, **kwargs):
while 1:
msg = q.get()
if msg == 'None': break
time.sleep(random.random())
print(f'吃了{msg}號包子!')
pass
if __name__ == '__main__':
q = Queue()
p = Process(target=pro, args=(q,))
c = Process(target=cro, args=(q,))
p.start()
c.start()
p.join()
q.put("None")
但是當生產者數量變多的時候,我就需要寫兩個三個,更多的q.put("None")
很明顯這么寫是不方便的
其實我們的思路無非是發送結束信號而已,有另外一種隊列提供了這種機制
JoinableQueue([maxsize])
這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理
通知進程是使用共享的信號和條件變量來實現的
JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:
q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理
如果調用此方法的次數大於從隊列中刪除項目的數量,將引發ValueError異常
q.join(): 生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理
阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止
那我們的生產者消費者模型,就可以這么寫:
from multiprocessing import Process, JoinableQueue
import time, random, os
def consumer(q, name):
while True:
res = q.get()
time.sleep(random.randint(1, 3))
print('\033[43m%s 吃 %s\033[0m' % (name, res))
q.task_done() # 發送信號給q.join(),說明已經從隊列中取走一個數據並處理完畢了
def producer(q, name, food):
for i in range(3):
time.sleep(random.randint(1, 3))
res = '%s%s' % (food, i)
q.put(res)
print('\033[45m%s 生產了 %s\033[0m' % (name, res))
q.join() # 等到消費者把自己放入隊列中的所有的數據都取走之后,生產者才結束
if __name__ == '__main__':
q = JoinableQueue() # 使用JoinableQueue()
# 生產者們:即廚師們
p1 = Process(target=producer, args=(q, 'ponny', '包子'))
p2 = Process(target=producer, args=(q, 'ponny', '骨頭'))
p3 = Process(target=producer, args=(q, 'ponny', '泔水'))
# 消費者們:即吃貨們
c1 = Process(target=consumer, args=(q, 'jevious'))
c2 = Process(target=consumer, args=(q, 'jevious'))
c1.daemon = True
c2.daemon = True
# 開始
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
# 1、主進程等生產者p1、p2、p3結束
# 2、而p1、p2、p3是在消費者把所有數據都取干凈之后才會結束
# 3、所以一旦p1、p2、p3結束了,證明消費者也沒必要存在了,應該隨着主進程一塊死掉,因而需要將生產者們設置成守護進程
模型總結
1、這個模型有兩個角色:生產者和消費者
生產者負責生產數據,消費者負責處理數據
2、模型解決的問題是平衡這兩者的速度
對這兩個身份的解耦
3、怎么實現呢?生產者⇠⇢隊列⇠⇢消費者