JoinableQueue同樣通過multiprocessing使用。
創建隊列的另外一個類:
JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
參數介紹:
maxsize是隊列中允許最大項數,省略則無大小限制。
方法介紹:
JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:
q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大於從隊列中刪除項目的數量,將引發ValueError異常
q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止
示例1:
from multiprocessing import Process,JoinableQueue import time,random def consumer(q): while True: time.sleep(random.randint(1,5)) res=q.get() print('消費者拿到了 %s' %res) q.task_done() def producer(seq,q): for item in seq: time.sleep(random.randrange(1,2)) q.put(item) print('生產者做好了 %s' %item) q.join() if __name__ == '__main__': q=JoinableQueue() seq=('包子%s' %i for i in range(10)) p=Process(target=consumer,args=(q,)) p.daemon=True #設置為守護進程,在主線程停止時p也停止,但是不用擔心,producer內調用q.join保證了consumer已經處理完隊列中的所有元素 p.start() producer(seq,q) print('主線程')
示例2:

from multiprocessing import Process,JoinableQueue import time,random def consumer(name,q): while True: time.sleep(random.randint(1,2)) res=q.get() print('\033[45m%s拿到了 %s\033[0m' %(name,res)) q.task_done() def producer(seq,q): for item in seq: time.sleep(random.randrange(1,2)) q.put(item) print('\033[46m生產者做好了 %s\033[0m' %item) q.join() if __name__ == '__main__': q=JoinableQueue() seq=('包子%s' %i for i in range(10)) p1=Process(target=consumer,args=('消費者1',q,)) p2=Process(target=consumer,args=('消費者2',q,)) p3=Process(target=consumer,args=('消費者3',q,)) p1.daemon=True p2.daemon=True p3.daemon=True p1.start() p2.start() p3.start() producer(seq,q) print('主線程')
示例3:

from multiprocessing import Process,JoinableQueue import time,random def consumer(name,q): while True: # time.sleep(random.randint(1,2)) res=q.get() print('\033[45m%s拿到了 %s\033[0m' %(name,res)) q.task_done() def producer(seq,q): for item in seq: # time.sleep(random.randrange(1,2)) q.put(item) print('\033[46m生產者做好了 %s\033[0m' %item) q.join() if __name__ == '__main__': q=JoinableQueue() seq=['包子%s' %i for i in range(10)] #在windows下無法傳入生成器,我們可以用列表解析測試 p1=Process(target=consumer,args=('消費者1',q,)) p2=Process(target=consumer,args=('消費者2',q,)) p3=Process(target=consumer,args=('消費者3',q,)) p1.daemon=True p2.daemon=True p3.daemon=True p1.start() p2.start() p3.start() # producer(seq,q) #也可以是下面三行的形式,開啟一個新的子進程當生產者,不用主線程當生產者 p4=Process(target=producer,args=(seq,q)) p4.start() p4.join() print('主線程')