2019-5-20未命名文件
歡迎使用 小書匠(xiaoshujiang)編輯器,您可以通過 小書匠主按鈕>模板 里的模板管理來改變新建文章的內容。
joinablequeue實現生產者消費者模型
1、使用Queue實現的代碼
import time
import random
from multiprocessing import Process,Queue
def producer(q,name,food):
for i in range(2):
time.sleep(random.random())
fd = '%s%s'%(food,i)
q.put(fd)
print('%s生產了一個%s'%(name,food))
def consumer(q,name):
while True:
food = q.get()
if not food:break
time.sleep(random.randint(1,3))
print('%s吃了%s'%(name,food))
def cp(c_count,p_count):
q = Queue(10)
for i in range(c_count):
Process(target=consumer, args=(q, '灰太狼')).start()
p_l = []
for i in range(p_count):
p1 = Process(target=producer, args=(q, '喜洋洋', '包子'))
p1.start()
p_l.append(p1)
for p in p_l:p.join()
for i in range(c_count):
q.put(None)
if __name__ == '__main__':
cp(2,3)
----------------結果:
喜洋洋生產了一個包子
喜洋洋生產了一個包子
喜洋洋生產了一個包子
喜洋洋生產了一個包子
喜洋洋生產了一個包子
喜洋洋生產了一個包子
灰太狼吃了包子1
灰太狼吃了包子0
灰太狼吃了包子0
灰太狼吃了包子0
灰太狼吃了包子1
灰太狼吃了包子1
2、使用joinablequeue實現隊列
(1)消費者不需要判斷從隊列里拿到None再退出執行消費者函數了
(2)消費者每次從隊列里面q.get()一個數據,處理過后就使用隊列.task_done()
(3)生產者for循環生產完所有產品,需要q.join()阻塞一下,對這個隊列進行阻塞。
(4)啟動一個生產者,啟動一個消費者,並且這個消費者做成守護進程,然后生產者需要p.join()阻塞一下。
(5)我啟動了生產者之后,生產者函數一直在生成數據,直到生產完所有數據將隊列q.join()一下,意思是當我生產的數據都被消費者消費完之后 隊列的阻塞才結束。
(6)結束過程:消費者這邊是每消費完一個數據給隊列返回一個q.task_done(),直到所有的數據都被消費完之后,生產者函數這邊的隊列.阻塞結束了,隊列阻塞結束了生產者函數執行結束了。生產者函數結束了,那么p.join()生產者進程對象就結束了。生產者進程對象結束了整個主進程的代碼就執行結束了。主進程代碼結束了守護進程及消費者進程也結束了
import time
import random
from multiprocessing import JoinableQueue,Process
def producer(q,name,food):
for i in range(5):
time.sleep(random.random())
fd = '%s%s'%(food,i+1)
q.put(fd)
print('%s生產了一個%s'%(name,food))
q.join()#(3)生產者for循環生產完所有產品,需要q.join()阻塞一下,對這個隊列進行阻塞。
#(5)我啟動了生產者之后,生產者函數一直在生成數據,直到生產完所有數據將隊列q.join()一下,意思是當我生產的數據都被消費者消費完之后 隊列的阻塞才結束。
def consumer(q,name): #(1)消費者不需要像Queue那樣判斷從隊列里拿到None再退出執行消費者函數了
while True:
food = q.get()
time.sleep(random.random())
print('%s吃了%s'%(name,food))
q.task_done() #(2)消費者每次從隊列里面q.get()一個數據,處理過后就使用隊列.task_done()
if __name__ == '__main__':
jq = JoinableQueue()
p =Process(target=producer,args=(jq,'喜洋洋','包子')) #
p.start() #(4)啟動一個生產者,啟動一個消費者,並且這個消費者做成守護進程,然后生產者需要p.join()阻塞一下。
c = Process(target=consumer,args=(jq,'灰太狼'))
c.daemon = True #
c.start()
p.join()
#(6)結束過程:消費者這邊是每消費完一個數據給隊列返回一個q.task_done(),直到所有的數據都被消費完之后,生產者函數這邊的隊列.阻塞結束了,隊列阻塞結束了生產者函數執行結束了。生產者函數結束了,那么p.join()生產者進程對象就結束了。生產者進程對象結束了整個主進程的代碼就執行結束了。主進程代碼結束了守護進程即消費者進程也結束了
---------------結果:
喜洋洋生產了一個包子
灰太狼吃了包子1
喜洋洋生產了一個包子
喜洋洋生產了一個包子
喜洋洋生產了一個包子
喜洋洋生產了一個包子
灰太狼吃了包子2
灰太狼吃了包子3
灰太狼吃了包子4
灰太狼吃了包子5
import time
import random
from multiprocessing import JoinableQueue,Process
def producer(q,name,food):
for i in range(5):
time.sleep(random.random())
fd = '%s%s'%(food,i+1)
q.put(fd)
print('%s生產了一個%s'%(name,food))
q.join()
def consumer(q,name):
while True:
food = q.get()
time.sleep(random.random())
print('%s吃了%s'%(name,food))
q.task_done()
if __name__ == '__main__':
jq = JoinableQueue()
p =Process(target=producer,args=(jq,'喜洋洋','包子'))
p.start()
c = Process(target=consumer,args=(jq,'灰太狼'))
c.daemon = True
c.start()
p.join()
3、二者區別
1)Queue有多少消費者,就要put多少個None。要在消費者函數添加if 不是真(非None數據)就退出死循環
2)二者效果一樣但是從程序員角度看,joinablequeue更加嚴謹,更符合編程思維
