一 前言
本文算是一次隊列的學習筆記,Queue 模塊實現了三種類型的隊列,它們的區別僅僅是隊列中元素被取回的順序。在 FIFO
隊列中,先添加的任務先取回。在 LIFO
隊列中,最近被添加的元素先取回(操作類似一個堆棧)。優先級隊列中,元素將保持排序( 使用 heapq 模塊 ) 並且最小值的條目第一個返回。
值得注意的是 Python 2.X 版本中調用隊列需要引用
import Queue
而在Python 3.X版本中則需要import queue
二 隊列特性
2.1 Queue的常用函數
Queue常用的方法:
qsize() 獲取隊列的元素個數。
put(item [,block[, timeout]]): 往queue中放一個item
get(item [,block[, timeout]]): 從queue中取出一個item,並在隊列中刪除的這個item
需要特別說明的是:
如果 block 為 True , timeout 為 None(也是默認的選項),那么get()/put()可能會阻塞,直到隊列中出現可用的數據/位置。如果 timeout 是正整數,那么函數會阻塞直到超時N秒,然后拋出一個異常。
如果 block 為 False ,如果隊列無數據,調用get()或者有無空余位置時調用put(),就立即拋出異常(timeout 將會被忽略)。
task_done(): 表示前面排隊的任務已經被完成。被隊列的消費者線程使用。每個 get() 被用於獲取一個任務, 后續調用 task_done() 告訴隊列,該任務的處理已經完成。
join(): 隊列中所有的元素都被接收和處理完畢之前程序一直阻塞。
在應用程序中,如果主程序調用了join()則當前程序發生阻塞,當隊列中所有的元素都被處理后,將解除阻塞(意味着每個put()進隊列的條目的 task_done()
都被收到)。如果task_done()
被調用的次數多於放入隊列中的項目數量,將引發 ValueError 異常 。
我們通過程序向隊列添加元素的時候,未完成任務的計數就會增加。每當消費者線程調用
task_done()
時表示這個元素已經被回收,涉及到該元素的業務邏輯已經完成,未完成計數就會減少。當未完成計數降到零的時候,程序便會解除join()阻塞。
2.2 實踐
我們用一個比較經典的案例 生產者和消費者模型,生產者生產饅頭放到隊列,消費者去隊列里面獲取饅頭。
# encoding: utf-8
"""
author: yangyi@youzan.com
time: 2019/8/14 11:20 PM
func:
"""
from multiprocessing import Process, JoinableQueue, Lock
import time
import random
thread_lock = Lock()
def lock_print(msg):
with thread_lock:
print (msg)
def consumer(q):
while True:
res = q.get(block=True, timeout=3) # 如果為空 則等待3秒超時則報錯退出
print('消費者拿到了 %s' % res)
q.task_done()
def producer(q):
for item in range(4):
time.sleep(random.randrange(1, 2))
q.put('饅頭{0}'.format(item))
print('生產者做好了 %s' %'饅頭{0}'.format(item))
q.join()
lock_print("生產結束")
if __name__ == '__main__':
print('主進程開始')
q = JoinableQueue()
pd = Process(target=producer, args=(q,))
cp = Process(target=consumer, args=(q,))
cp.daemon = True ##
pd.start()
cp.start()
pd.join()
print('主進程結束')
說明
這里生產者生產饅頭並將饅頭通過put()
放到全局的隊列中,消費者從使用get()
隊列中獲取饅頭然后調用 task_done()
通知隊列中的饅頭已經被消費者獲取。
設置 cp.daemon = True
表示消費者進程會隨主進程一起結束而結束。還有一種寫法是
if __name__ == '__main__':
print('主進程開始')
q = JoinableQueue()
pd = Process(target=producer, args=(q,))
cp = Process(target=consumer, args=(q,))
pd.start()
cp.start()
pd.join()
cp.join()
print('主進程結束')
cp.join()
會讓消費者進程一直等待生產者往隊列放數據直到設置的超時時間。具體的邏輯需要結合自己程序的實際需求來定,是需要一直等待生產者生產數據還是隨着主進程結束而結束。
三 總結
本文結合前面文章中介紹的多進程中的 守護進程和 join()方法,學習如何使用隊列中的兩個函數 task_done
和 join
。其實還有其他比較多的函數用法,需要深入的學習探索,感興趣的朋友可以動手實踐一下。
推薦閱讀
https://docs.python.org/zh-cn/3/library/queue.html
https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter2/12_Thread_communication_using_a_queue.html
-The End-
本公眾號長期關注於數據庫技術以及性能優化,故障案例分析,數據庫運維技術知識分享,個人成長和自我管理等主題,歡迎掃碼關注。