Python|隊列Queue


一 前言

本文算是一次隊列的學習筆記,Queue 模塊實現了三種類型的隊列,它們的區別僅僅是隊列中元素被取回的順序。在 FIFO 隊列中,先添加的任務先取回。在 LIFO 隊列中,最近被添加的元素先取回(操作類似一個堆棧)。優先級隊列中,元素將保持排序( 使用 heapq 模塊 ) 並且最小值的條目第一個返回。

值得注意的是 Python 2.X 版本中調用隊列需要引用 import Queue 而在Python 3.X版本中則需要 import queue

二 隊列特性

2.1 Queue的常用函數

Queue常用的方法:

qsize() 獲取隊列的元素個數。
put(item [,block[, timeout]]): 往queue中放一個item
get(item [,block[, timeout]]): 從queue中取出一個item,並在隊列中刪除的這個item

需要特別說明的是:

如果 block 為 True , timeout 為 None(也是默認的選項),那么get()/put()可能會阻塞,直到隊列中出現可用的數據/位置。如果 timeout 是正整數,那么函數會阻塞直到超時N秒,然后拋出一個異常。

如果 block 為 False ,如果隊列無數據,調用get()或者有無空余位置時調用put(),就立即拋出異常(timeout 將會被忽略)。

task_done(): 表示前面排隊的任務已經被完成。被隊列的消費者線程使用。每個 get() 被用於獲取一個任務, 后續調用 task_done() 告訴隊列,該任務的處理已經完成。
join(): 隊列中所有的元素都被接收和處理完畢之前程序一直阻塞。

在應用程序中,如果主程序調用了join()則當前程序發生阻塞,當隊列中所有的元素都被處理后,將解除阻塞(意味着每個put()進隊列的條目的 task_done() 都被收到)。如果task_done()被調用的次數多於放入隊列中的項目數量,將引發 ValueError 異常 。

我們通過程序向隊列添加元素的時候,未完成任務的計數就會增加。每當消費者線程調用 task_done() 時表示這個元素已經被回收,涉及到該元素的業務邏輯已經完成,未完成計數就會減少。當未完成計數降到零的時候,程序便會解除join()阻塞。

2.2 實踐

我們用一個比較經典的案例 生產者和消費者模型,生產者生產饅頭放到隊列,消費者去隊列里面獲取饅頭。

# encoding: utf-8
"""
author: yangyi@youzan.com
time: 2019/8/14 11:20 PM
func:
"""

from multiprocessing import Process, JoinableQueue, Lock
import time
import random

thread_lock = Lock()


def lock_print(msg):
    with thread_lock:
        print (msg)


def consumer(q):
    while True:
        res = q.get(block=True, timeout=3) # 如果為空 則等待3秒超時則報錯退出
        print('消費者拿到了 %s' % res)
        q.task_done()


def producer(q):
    for item in range(4):
        time.sleep(random.randrange(1, 2))
        q.put('饅頭{0}'.format(item))
        print('生產者做好了 %s' %'饅頭{0}'.format(item))
    q.join()
    lock_print("生產結束")


if __name__ == '__main__':
    print('主進程開始')
    q = JoinableQueue()
    pd = Process(target=producer, args=(q,))
    cp = Process(target=consumer, args=(q,))
    cp.daemon = True ## 
    pd.start()
    cp.start()
    pd.join()
    print('主進程結束')

說明
這里生產者生產饅頭並將饅頭通過put()放到全局的隊列中,消費者從使用get()隊列中獲取饅頭然后調用 task_done() 通知隊列中的饅頭已經被消費者獲取。

設置 cp.daemon = True 表示消費者進程會隨主進程一起結束而結束。還有一種寫法是

if __name__ == '__main__':
    print('主進程開始')
    q = JoinableQueue()
    pd = Process(target=producer, args=(q,))
    cp = Process(target=consumer, args=(q,))
    pd.start()
    cp.start()
    pd.join()
    cp.join() 
    print('主進程結束')

cp.join() 會讓消費者進程一直等待生產者往隊列放數據直到設置的超時時間。具體的邏輯需要結合自己程序的實際需求來定,是需要一直等待生產者生產數據還是隨着主進程結束而結束。

三 總結

本文結合前面文章中介紹的多進程中的 守護進程和 join()方法,學習如何使用隊列中的兩個函數 task_donejoin。其實還有其他比較多的函數用法,需要深入的學習探索,感興趣的朋友可以動手實踐一下。

推薦閱讀

https://docs.python.org/zh-cn/3/library/queue.html
https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter2/12_Thread_communication_using_a_queue.html

-The End-

本公眾號長期關注於數據庫技術以及性能優化,故障案例分析,數據庫運維技術知識分享,個人成長和自我管理等主題,歡迎掃碼關注。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM