第29天:Python queue 模塊詳解


by 吳刀釣魚

queue 模塊即隊列,特別適合處理信息在多個線程間安全交換的多線程程序中。下面我們對 queue 模塊進行一個詳細的使用介紹。

1 queue 模塊定義的類和異常

queue 模塊定義了以下四種不同類型的隊列,它們之間的區別在於數據入隊列之后出隊列的順序不同。

1.1 queue.Queue(maxsize=0)

先進先出(First In First Out: FIFO)隊列,最早進入隊列的數據擁有出隊列的優先權,就像看電影入場時排隊一樣,排在隊伍前頭的優先進入電影院。

入參 maxsize 是一個整數,用於設置隊列的最大長度。一旦隊列達到上限,插入數據將會被阻塞,直到有數據出隊列之后才可以繼續插入。如果 maxsize 設置為小於或等於零,則隊列的長度沒有限制。

示例如下:

import queue
q = queue.Queue()  # 創建 Queue 隊列
for i in range(3):
    q.put(i)  # 在隊列中依次插入0、1、2元素
for i in range(3):
    print(q.get())  # 依次從隊列中取出插入的元素,數據元素輸出順序為0、1、2

1.2 queue.LifoQueue(maxsize=0)

后進先出(Last In First Out: LIFO)隊列,最后進入隊列的數據擁有出隊列的優先權,就像棧一樣。

入參 maxsize 與先進先出隊列的定義一樣。

示例如下:

import queue
q = queue.LifoQueue()  # 創建 LifoQueue 隊列
for i in range(3):
    q.put(i)  # 在隊列中依次插入0、1、2元素
for i in range(3):
    print(q.get())  # 依次從隊列中取出插入的元素,數據元素輸出順序為2、1、0

1.3 PriorityQueue(maxsize=0)

優先級隊列,比較隊列中每個數據的大小,值最小的數據擁有出隊列的優先權。數據一般以元組的形式插入,典型形式為(priority_number, data)。如果隊列中的數據沒有可比性,那么數據將被包裝在一個類中,忽略數據值,僅僅比較優先級數字。

入參 maxsize 與先進先出隊列的定義一樣。

示例如下:

import queue
q = queue.PriorityQueue()  # 創建 PriorityQueue 隊列
data1 = (1, 'python')
data2 = (2, '-')
data3 = (3, '100')
style = (data2, data3, data1)
for i in style:
    q.put(i)  # 在隊列中依次插入元素 data2、data3、data1
for i in range(3):
    print(q.get())  # 依次從隊列中取出插入的元素,數據元素輸出順序為 data1、data2、data3

1.4 queue.SimpleQueue

先進先出類型的簡單隊列,沒有大小限制。由於它是簡單隊列,相比於 Queue 隊列會缺少一些高級功能,下面第2-3小節將會介紹。

示例如下:

import queue
q = queue.SimpleQueue()  # 創建 SimpleQueue 隊列
for i in range(3):
    q.put(i)  # 在隊列中依次插入0、1、2元素
for i in range(3):
    print(q.get())  # 依次從隊列中取出插入的元素,數據元素輸出順序為0、1、2

1.5 queue.Empty 異常

當隊列中沒有數據元素時,取出隊列中的數據會引發 queue.Empty 異常,主要是不正當使用 get() 和 get_nowait() 引起的。

示例如下:

import queue
try:
    q = queue.Queue(3)  # 設置隊列上限為3
    q.put('python')  # 在隊列中插入字符串 'python'
    q.put('-') # 在隊列中插入字符串 '-'
    q.put('100') # 在隊列中插入字符串 '100'
    for i in range(4):  # 從隊列中取數據,取出次數為4次,引發 queue.Empty 異常
        print(q.get(block=False))
except queue.Empty:
    print('queue.Empty')

1.6 queue.Full 異常

當隊列數據元素容量達到上限時,繼續往隊列中放入數據會引發 queue.Empty 異常,主要是不正當使用 put() 和 put_nowait() 引起的。

示例如下:

import queue
try:
    q = queue.Queue(3)  # 設置隊列上限為3
    q.put('python')  # 在隊列中插入字符串 'python'
    q.put('-') # 在隊列中插入字符串 '-'
    q.put('100') # 在隊列中插入字符串 '100'
    q.put('stay hungry, stay foolish', block=False)  # 隊列已滿,繼續往隊列中放入數據,引發 queue.Full 異常
except queue.Full:
    print('queue.Full')

2 Queue、LifoQueue、PriorityQueue 和 SimpleQueue 對象的基本使用方法

Queue、LifoQueue、PriorityQueue 和 SimpleQueue 四種隊列定義的對象均提供了以下函數使用方法,下面以 Queue 隊列為例進行介紹。

2.1 Queue.qsize()

返回隊列中數據元素的個數。

示例如下:

import queue
q = queue.Queue()
q.put('python-100')  # 在隊列中插入元素 'python-100'
print(q.qsize())  # 輸出隊列中元素個數為1

2.2 Queue.empty()

如果隊列為空,返回 True,否則返回 False。

示例如下:

import queue
q = queue.Queue()
print(q.empty())  # 對列為空,返回 True
q.put('python-100')  # 在隊列中插入元素 'python-100'
print(q.empty())  # 對列不為空,返回 False

2.3 Queue.full()

如果隊列中元素個數達到上限,返回 True,否則返回 False。

示例如下:

import queue
q = queue.Queue(3)  # 定義一個長度為3的隊列
print(q.full())  # 元素個數未達到上限,返回 False
q.put('python')  # 在隊列中插入字符串 'python'
q.put('-') # 在隊列中插入字符串 '-'
q.put('100') # 在隊列中插入字符串 '100'
print(q.full())  # 元素個數達到上限,返回 True

2.4 Queue.put(item, block=True, timeout=None)

  • item,放入隊列中的數據元素。
  • block,當隊列中元素個數達到上限繼續往里放數據時:如果 block=False,直接引發 queue.Full 異常;如果 block=True,且 timeout=None,則一直等待直到有數據出隊列后可以放入數據;如果 block=True,且 timeout=N,N 為某一正整數時,則等待 N 秒,如果隊列中還沒有位置放入數據就引發 queue.Full 異常。
  • timeout,設置超時時間。

示例如下:

import queue
try:
    q = queue.Queue(2)  # 設置隊列上限為2
    q.put('python')  # 在隊列中插入字符串 'python'
    q.put('-') # 在隊列中插入字符串 '-'
    q.put('100', block = True, timeout = 5) # 隊列已滿,繼續在隊列中插入字符串 '100',等待5秒后會引發 queue.Full 異常
except queue.Full:
    print('queue.Full')

2.5 Queue.put_nowait(item)

相當於 Queue.put(item, block=False),當隊列中元素個數達到上限繼續往里放數據時直接引發 queue.Full 異常。

import queue
try:
    q = queue.Queue(2)  # 設置隊列上限為2
    q.put_nowait('python')  # 在隊列中插入字符串 'python'
    q.put_nowait('-') # 在隊列中插入字符串 '-'
    q.put_nowait('100') # 隊列已滿,繼續在隊列中插入字符串 '100',直接引發 queue.Full 異常
except queue.Full:
    print('queue.Full')

2.6 Queue.get(block=True, timeout=None)

從隊列中取出數據並返回該數據內容。

  • block,當隊列中沒有數據元素繼續取數據時:如果 block=False,直接引發 queue.Empty 異常;如果 block=True,且 timeout=None,則一直等待直到有數據入隊列后可以取出數據;如果 block=True,且 timeout=N,N 為某一正整數時,則等待 N 秒,如果隊列中還沒有數據放入的話就引發 queue.Empty 異常。
  • timeout,設置超時時間。

示例如下:

import queue
try:
    q = queue.Queue()
    q.get(block = True, timeout = 5) # 隊列為空,往隊列中取數據時,等待5秒后會引發 queue.Empty 異常
except queue.Empty:
    print('queue.Empty')

2.7 Queue.get_nowait()

相當於 Queue.get(block=False)block,當隊列中沒有數據元素繼續取數據時直接引發 queue.Empty 異常。

示例如下:

import queue
try:
    q = queue.Queue()
    q.get_nowait() # 隊列為空,往隊列中取數據時直接引發 queue.Empty 異常
except queue.Empty:
    print('queue.Empty')

3 Queue、LifoQueue 和 PriorityQueue 對象的高級使用方法

SimpleQueue 是 Python 3.7 版本中新加入的特性,與 Queue、LifoQueue 和 PriorityQueue 三種隊列相比缺少了 task_done 和 join 的高級使用方法,所以才會取名叫 Simple 了,下面介紹一下 task_done 和 join 的使用方法。

  • task_done,表示隊列內的數據元素已經被取出,即每個 get 用於獲取一個數據元素, 后續調用 task_done 告訴隊列,該數據的處理已經完成。如果被調用的次數多於放入隊列中的元素個數,將引發 ValueError 異常。
  • join,一直阻塞直到隊列中的所有數據元素都被取出和執行,只要有元素添加到 queue 中就會增加。當未完成任務的計數等於0,join 就不會阻塞。

示例如下:

import queue
q = queue.Queue()
q.put('python')
q.put('-')
q.put('100')
for i in range(3):
    print(q.get())
    q.task_done()  # 如果不執行 task_done,join 會一直處於阻塞狀態,等待 task_done 告知它數據的處理已經完成
q.join()

下面是一個經典示例,生產者和消費者線程分別生產數據和消費數據,先生產后消費。采用 task_done 和 join 確保處理信息在多個線程間安全交換,生產者生產的數據能夠全部被消費者消費掉。

from queue import Queue
import random
import threading
import time

#生產者線程
class Producer(threading.Thread):
    def __init__(self, t_name, queue):
        threading.Thread.__init__(self, name=t_name)
        self.data=queue
    def run(self):
        for i in range(5):
            print ("%s: %s is producing %d to the queue!" %(time.ctime(), self.getName(), i))
            self.data.put(i)  # 將生產的數據放入隊列
            time.sleep(random.randrange(10)/5)
        print ("%s: %s finished!" %(time.ctime(), self.getName()))

#消費者線程
class Consumer(threading.Thread):
    def __init__(self, t_name, queue):
        threading.Thread.__init__(self, name=t_name)
        self.data=queue
    def run(self):
        for i in range(5):
            val = self.data.get()  # 拿出已經生產好的數據
            print ("%s: %s is consuming. %d in the queue is consumed!" %(time.ctime(), self.getName(), val))
            time.sleep(random.randrange(5))
            self.data.task_done() # 告訴隊列有關這個數據的任務已經處理完成
        print ("%s: %s finished!" %(time.ctime(), self.getName()))

#主線程
def main():
    queue = Queue()
    producer = Producer('Pro.', queue)
    consumer = Consumer('Con.', queue)
    producer.start()
    consumer.start()
    queue.join()  # 阻塞,直到生產者生產的數據全都被消費掉
    producer.join() # 等待生產者線程結束
    consumer.join() # 等待消費者線程結束
    print ('All threads terminate!')
 
if __name__ == '__main__':
    main()

4 總結

本節給大家介紹了 Python 的 queue 模塊,為 Python 工程師對該模塊的使用提供了支撐,讓大家對 queue 模塊的相關概念和使用有一個初步的了解。

參考資料

[1] https://docs.python.org/3/library/queue.html

[2] https://www.cnblogs.com/God-Li/p/7745408.html

示例代碼:Python-100-days-day029

關注公眾號:python技術,回復"python"一起學習交流


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM