三種方法實現 生產者消費者模型


在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。

該模式通過平衡生產進程和消費進程的工作能力提高程序的整體處理數據的速度

 

舉個應用栗子:

全棧開發時候,前端接收客戶請求,后端處理請求邏輯。

當某時刻客戶請求過於多的時候,后端處理不過來,

此時完全可以借助隊列來輔助,將客戶請求放入隊列中,

后端邏輯代碼處理完一批客戶請求后馬上從隊列中繼續獲取,

這樣平衡兩端的效率。

 

為什么要使用生產者和消費者模式

進程世界里,生產者就是生產數據的進程,消費者就是消費數據的進程

多進程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。

同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。

為了解決這個問題於是引入了生產者和消費者模式。

什么是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。

生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,

所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,

消費者不找生產者要數據,而是直接從阻塞隊列里取,

阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

 

 

三種方法實現 生產者消費者模型:

 1 from multiprocessing import Queue, Process
 2 
 3 
 4 def producer(q, name):
 5     for i in range(20):  # 生產20個
 6         info = name + '的娃娃%s' % str(i)  # 產生數據
 7         q.put(info)  # 把數據放入隊列
 8     q.put(None)  # 生產者生產完數據后,放入一個不再生產的標示None,消費者讀取到None后可以知道數據已取完
 9 
10 
11 def consumer(q, name):
12     while 1:  # 循環接收數據
13         info = q.get()  # 從隊列中拿取數據
14         if info:  # 如果隊列中存在數據
15             print('%s拿走了%s' % (name, info))  # 打印
16         else:
17             break  # 否則退出接收循環
18 
19 
20 if __name__ == '__main__':
21     q = Queue(10)  # 先建立一個10長度的隊列
22     p_pro = Process(target=producer, args=(q, '生產者'))  # 創建生產者進程
23     p_con = Process(target=consumer, args=(q, '消費者'))  # 創建消費者進程
24     p_pro.start()  # 進程開始
25     p_con.start()  # 進程開始
隊列1
 1 from multiprocessing import Process,Queue
 2 
 3 def producer(q,name):
 4     for i in range(1,21):
 5         info = name + '的娃娃%s' % str(i)
 6         q.put(info)
 7 
 8 def consumer(q,name):
 9     while 1:
10         info = q.get()
11         if info:
12             print(name+'拿了%s' % info)
13         else:
14             break
15 
16 if __name__ == '__main__':
17     q = Queue(10)
18     p_pro1 = Process(target=producer,args=(q,'\033[31m生產者1\033[0m'))
19     p_pro2 = Process(target=producer,args=(q,'\033[32m生產者2\033[0m'))
20     p_pro3 = Process(target=producer,args=(q,'\033[33m生產者3\033[0m'))
21     p_con1 = Process(target=consumer,args=(q,'\033[34m消費者1\033[0m'))
22     p_con2 = Process(target=consumer,args=(q,'\033[35m消費者2\033[0m'))
23     p_l = [p_pro1,p_pro2,p_pro3,p_con1,p_con2]
24     [p.start() for p in p_l]  #建立進程列表循環 開始每個進程
25     p_pro1.join() # 需要join 讓主程序等待子程序結束,然后添加標示
26     p_pro2.join() # 有幾個生產者 就需要幾個join
27     p_pro3.join() #
28     q.put(None) # 有幾個消費者就有幾個結束標示
29     q.put(None) # 有幾個消費者就有幾個結束標示
30     # q.put(None) # 有幾個消費者就有幾個結束標示
31     # 這個None,先進后出,在隊列的最后,所以最后讀取到這個None的時候,表示隊列已經取完了
隊列2
 1 from multiprocessing import Process, Pipe
 2 
 3 
 4 def consumer(p, name):
 5     produce, consume = p
 6     produce.close()
 7     while True:
 8         try:
 9             baozi = consume.recv()
10             print('%s 收到包子:%s' % (name, baozi))
11         except EOFError:
12             break
13 
14 
15 def producer(seq, p):
16     produce, consume = p
17     consume.close()
18     for i in seq:
19         produce.send(i)
20 
21 
22 if __name__ == '__main__':
23     produce, consume = Pipe()
24 
25     c1 = Process(target=consumer, args=((produce, consume), 'c1'))
26     c1.start()
27 
28     seq = (i for i in range(10))
29     producer(seq, (produce, consume))
30 
31     produce.close()
32     consume.close()
33 
34     c1.join()
35     print('主進程')
管道

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM