隊列 — 線程安全的FIFO實現
queue
模塊提供了一個適合多線程編程的先入、先出(FIFO)數據結構。它可以用來安全地傳遞生產者和消費者線程之間的消息或其他數據。由於線程安全,多線程可以安全地處理同一個Queue實例。
基本的FIFO隊列
Queue類實現基本的先進先出容器
put()
-- 從隊尾添加元素
get()
-- 從隊首刪除元素,並返回該元素
import queue
q = queue.Queue()
for i in range(5):
q.put(i)
while not q.empty():
print(q.get(), end=' ')
print()
這個例子使用一個線程來說明元素添加與刪除的順序相同,即先進先出。 結果:
0 1 2 3 4
一個小例子:擊鼓傳炸彈
代碼實現:
import queue
import random
q = queue.Queue()
nameList = ['A', 'B', 'C', 'D', 'E', 'F']
for name in nameList:
q.put(name)
while not q.qsize() == 1:
num = int(random.uniform(0, 20))
print(f'num={num}', end=' ')
while num: # 隨機轉動num次
q.put(q.get())
num -= 1
outName = q.get()
print(f'outName={outName}')
print(f'winner={q.get()}')
運行結果:
num=2 outName=C
num=6 outName=E
num=7 outName=D
num=0 outName=F
num=11 outName=B
winner=A
LIFO隊列
與隊列的標准FIFO實現不同,LifoQueue使用后進先出順序(通常與棧相關聯)。
import queue
q = queue.LifoQueue()
for i in range(5):
q.put(i)
while not q.empty():
print(q.get(), end=' ')
print()
結果:
4 3 2 1 0
優先級隊列 Priority Queue
有時候,隊列中的項目的處理順序需要基於這些項的特征,而不僅僅是它們被創建或添加到隊列中的順序。例如,來自工資部門的打印作業可能優先於開發人員想要打印的代碼清單。PriorityQueue使用隊列內容的排序順序來決定要檢索哪一項。
import functools
import queue
import threading
@functools.total_ordering
class Job:
def __init__(self, priority, description):
self.priority = priority
self.description = description
print('New job:', description)
return
def __eq__(self, other):
try:
return self.priority == other.priority
except AttributeError:
return NotImplemented
def __lt__(self, other):
try:
return self.priority < other.priority
except AttributeError:
return NotImplemented
q = queue.PriorityQueue()
q.put(Job(3, 'Mid-level job'))
q.put(Job(10, 'Low-level job'))
q.put(Job(1, 'Important job'))
def process_job(q):
while True:
next_job = q.get()
print('Processing job:', next_job.description)
q.task_done()
workers = [
threading.Thread(target=process_job, args=(q,)),
threading.Thread(target=process_job, args=(q,)),
]
for w in workers:
w.setDaemon(True)
w.start()
q.join()
結果:
New job: Mid-level job
New job: Low-level job
New job: Important job
Processing job: Important job
Processing job: Mid-level job
Processing job: Low-level job
一點解釋 :
task_done()
意味着之前入隊的一個任務已經完成。由隊列的消費者線程調用。每一個get()調用得到一個任務,接下來的task_done()調用告訴隊列該任務已經處理完畢。
即每一個由put()調用入隊的任務都有一個對應的task_done()調用。
join()
阻塞調用線程,直到隊列中的所有任務被處理掉。
只要有數據被加入隊列,未完成的任務數就會增加。當消費者線程調用task_done()(意味着有消費者取得任務並完成任務),未完成的任務數就會減少。當未完成的任務數降到0,join()解除阻塞。
put(item[, block[, timeout]])
將item放入隊列中。
- 如果可選的參數block為True且timeout為空對象(默認的情況,阻塞調用,無超時)。
- 如果timeout是個正整數,阻塞調用進程最多timeout秒,如果一直無空空間可用,拋出Full異常(帶超時的阻塞調用)。
- 如果block為False,如果有空閑空間可用將數據放入隊列,否則立即拋出Full異常
其非阻塞版本為put_nowait等同於put(item, False)
get([block[, timeout]])
從隊列中移除並返回一個數據。block跟timeout參數同put方法
其非阻塞方法為get_nowait()相當與get(False)
empty()
如果隊列為空,返回True,反之返回False