隊列--線程安全的FIFO實現


隊列 — 線程安全的FIFO實現

queue 模塊提供了一個適合多線程編程的先入、先出(FIFO)數據結構。它可以用來安全地傳遞生產者和消費者線程之間的消息或其他數據。由於線程安全,多線程可以安全地處理同一個Queue實例。

基本的FIFO隊列

Queue類實現基本的先進先出容器
put() -- 從隊尾添加元素
get() -- 從隊首刪除元素,並返回該元素

import queue

q = queue.Queue()

for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get(), end=' ')
print()

這個例子使用一個線程來說明元素添加與刪除的順序相同,即先進先出。 結果:

0 1 2 3 4 

一個小例子:擊鼓傳炸彈

image
代碼實現:

import queue
import random
q = queue.Queue()
nameList = ['A', 'B', 'C', 'D', 'E', 'F']
for name in nameList:
    q.put(name)

while not q.qsize() == 1:
    num = int(random.uniform(0, 20))
    print(f'num={num}', end=' ')
    while num:   # 隨機轉動num次
        q.put(q.get())
        num -= 1
    outName = q.get()
    print(f'outName={outName}')
print(f'winner={q.get()}')

運行結果:

num=2 outName=C
num=6 outName=E
num=7 outName=D
num=0 outName=F
num=11 outName=B
winner=A

LIFO隊列

與隊列的標准FIFO實現不同,LifoQueue使用后進先出順序(通常與棧相關聯)。

import queue

q = queue.LifoQueue()

for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get(), end=' ')
print()

結果:

4 3 2 1 0 

優先級隊列 Priority Queue

有時候,隊列中的項目的處理順序需要基於這些項的特征,而不僅僅是它們被創建或添加到隊列中的順序。例如,來自工資部門的打印作業可能優先於開發人員想要打印的代碼清單。PriorityQueue使用隊列內容的排序順序來決定要檢索哪一項。

import functools
import queue
import threading


@functools.total_ordering
class Job:

    def __init__(self, priority, description):
        self.priority = priority
        self.description = description
        print('New job:', description)
        return

    def __eq__(self, other):
        try:
            return self.priority == other.priority
        except AttributeError:
            return NotImplemented

    def __lt__(self, other):
        try:
            return self.priority < other.priority
        except AttributeError:
            return NotImplemented


q = queue.PriorityQueue()

q.put(Job(3, 'Mid-level job'))
q.put(Job(10, 'Low-level job'))
q.put(Job(1, 'Important job'))


def process_job(q):
    while True:
        next_job = q.get()
        print('Processing job:', next_job.description)
        q.task_done()


workers = [
    threading.Thread(target=process_job, args=(q,)),
    threading.Thread(target=process_job, args=(q,)),
]
for w in workers:
    w.setDaemon(True)
    w.start()

q.join()

結果:

New job: Mid-level job
New job: Low-level job
New job: Important job
Processing job: Important job
Processing job: Mid-level job
Processing job: Low-level job

一點解釋 :

task_done()
意味着之前入隊的一個任務已經完成。由隊列的消費者線程調用。每一個get()調用得到一個任務,接下來的task_done()調用告訴隊列該任務已經處理完畢。
即每一個由put()調用入隊的任務都有一個對應的task_done()調用。

join()
阻塞調用線程,直到隊列中的所有任務被處理掉。

只要有數據被加入隊列,未完成的任務數就會增加。當消費者線程調用task_done()(意味着有消費者取得任務並完成任務),未完成的任務數就會減少。當未完成的任務數降到0,join()解除阻塞。

put(item[, block[, timeout]])
將item放入隊列中。

  • 如果可選的參數block為True且timeout為空對象(默認的情況,阻塞調用,無超時)。
  • 如果timeout是個正整數,阻塞調用進程最多timeout秒,如果一直無空空間可用,拋出Full異常(帶超時的阻塞調用)。
  • 如果block為False,如果有空閑空間可用將數據放入隊列,否則立即拋出Full異常

其非阻塞版本為put_nowait等同於put(item, False)

get([block[, timeout]])
從隊列中移除並返回一個數據。block跟timeout參數同put方法
其非阻塞方法為get_nowait()相當與get(False)

empty()
如果隊列為空,返回True,反之返回False


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM