學習Python使得我快樂無比!!!
首先先解釋一下什么是生產者和消費者模型
在我們的日常生活中,無處不是生產者和消費者,加入有一個買包子的人家,而你是一個想要買包子的人。那么買包子的商家就是生產者,而你就是一個消費者。
在編程中映入這一個概念是非常有必要的,我們都知道現在的計算機是多核心的,很多手機都是八核心起步了,多核心也就意味着多進程同時運行,你的計算機可以同時處理多個指令這樣處理的速度的以加快,所以我們映入該概念。
概念:首先舉一個例子就是我們的爬蟲程序,我們都知道爬蟲就是一次性的吧網絡上的數據快速的爬取下來並且放入本地計算機或者雲端服務器供我們未來使用,那么我們的爬蟲程序其實就是這樣的一個生產者,但是我們知道計算機的性能和效率都很高,假如一個爬蟲程序不停工作可以生產出大量的數據,但是我們的處理程序運行起來卻比較的滿不可能一次性處理這么多的程序這就會爆內存,實在是放不下了只好先放在內存里面,但是越來越多的數據會導致內存被占滿結果就爆了。
生產者--->流水線---->消費者
這樣就出現了一個中繼的東西我們占時管它叫做流水線,我們有很多的生產者同時生產數據到流水線上,然后消費者在從流水線上取下,這樣我們可以有多個生產者同時生產提升了效率,同時也可以容納多個消費者在流水線上消費,這樣的話就可以解決上面的問題了,同時:!!我們也可以引入多進程的工作進入了,生產者太少了的時候就多開幾個進程生產,如果消費者的處理能力太弱了就多開幾個進程同時處理這樣就可以快很多了!
列子1
注意:代碼的思想(這是初步的思想)
導入了隨機數時間來休眠是為了模擬網絡延時的不可控因素,可以看到我們創建了10個生產者生產了一輪產品,但是消費者過多了說以代碼會處於阻塞狀態接下來會解決這個問題!
1 from multiprocessing import Process, Queue 2 from time import sleep 3 from random import randint 4 5 # 首先需要先構建一個生產者 6 def producer(food, name, q): 7 for i in range(1, 11): 8 sleep(randint(1, 2)) 9 f = "\033[34m%s生產了%s ,代號:%s\033[0m" % (name, food, i) 10 print(f) 11 q.put(food) 12 13 #構建一個消費者 14 def consumer(q, name): 15 while 1: 16 food = q.get() 17 sleep(randint(1, 2)) 18 print("\033[31m%s消費了%s\033[0m" % (name, food)) 19 20 if __name__ == '__main__': 21 q = Queue() 22 p1 = Process(target=producer, args=("漢堡", "王師傅", q)) 23 p2 = Process(target=producer, args=("三明治", "金師傅", q)) 24 c1 = Process(target=consumer, args=(q, "陳先生")) 25 c2 = Process(target=consumer, args=(q, "王先生")) 26 p1.start() 27 p2.start() 28 c1.start() 29 c2.start()
接下來的代碼就是為了解決阻塞的問題,我們希望當生產者生產完畢了就告訴消費者結束了賣完了,join的方法可以保證生產者先生產然后再對比;
Event是事件,可以實現多進程間的通信!默認False,可以使用set方法改為true,也可以使用clear改回false,is_set可以判斷目標是否是true
q.join()表示等待隊列中的所有數據全部執行處理完畢
q.tast_done()表示該元素已經完成了處理,這樣上面的join就可以感知到
守護進程是為了保證主進程結束時子進程也要一並結書
!在代碼的末尾我們設置了兩個join,目的是感知進程結束,由於我們生產者末尾的join,所以代碼不會一下結束,而是會等待消費者去消費隊列的數據,由於消費者一直購買,所以當消費者購買不到產品的時候也就表示流水線結束了,這是整個代碼底部的join就會感應到代碼的結束也就可以結束所有進程了。
from multiprocessing import Process,JoinableQueue
from time import sleep
from random import randint
# 首先需要先構建一個生產者
def producer(food, name, q):
for i in range(1, 5):
sleep(randint(1, 2))
f = "\033[34m%s生產了%s ,代號:%s\033[0m" % (name, food, i)
print(f)
q.put(food)
q.join()#把隊列q給設置為同步模式等待期內部的所有數據都被處理過代碼才會結束
#構建一個消費者
def consumer(q, name):
while 1:
food = q.get()
sleep(randint(1, 2))
print("\033[31m%s消費了%s\033[0m" % (name, food))
q.task_done()#每當從隊列當中獲取一個值得時候就會自動計數
if __name__ == '__main__':
q = JoinableQueue()
p1 = Process(target=producer, args=("漢堡", "王師傅", q))
p2 = Process(target=producer, args=("三明治", "金師傅", q))
c1 = Process(target=consumer, args=(q, "陳先生"))
c2 = Process(target=consumer, args=(q, "王先生"))
c1.daemon = True #設置成守護進程這樣就可以保證代碼回一路運行到底
c2.daemon = True
p1.start()
p2.start()
c1.start()
c2.start()
p1.join()
p2.join()#感知主進程的結束這樣就會自動結束子進程了
