python 隊列(queue)阻塞


本文鏈接:https://www.cnblogs.com/tujia/p/13637535.html

 

背景:python 隊列 queue.Queue 或 multiprcessing.Queue 或其他隊列在寫入隊列或從隊列中讀取元素時,都有可能會發生線程阻塞。

 

下面來說一下阻塞的類型,然后怎么避免阻塞~

 

一、阻塞的類型

隊列的阻塞分為:入隊(put)時的阻塞、出隊(get)時的阻塞、整體(join)的阻塞(消費的阻塞)

 

二、入隊的阻塞

import queue


def 入隊阻塞():
    q = queue.Queue(maxsize=3)
    for i in range(4):
        q.put('任務' + str(i+1))

    print('Finished')


if __name__ == '__main__':
    入隊阻塞()

注:因為定義的隊列的 maxsize=3,但 put 了4個元素進隊列,第4個元素將無法 put 進隊列,發生阻塞;注意:就算不設置 maxsize,電腦的內存也是有限的,隊列也是會滿的。當隊列已滿,做 put 操作時,一樣會發生阻塞。

 

正確的處理方法:

import queue


def 入隊阻塞():
    q = queue.Queue(maxsize=3)
    for i in range(4):
        try:
            q.put('任務' + str(i+1), block=True, timeout=3)
        except queue.Full:
            print('任務%d: 隊列已滿,寫入失敗' % (i+1))

    print('Finished')


if __name__ == '__main__':
    入隊阻塞()

注:設置 timeout 超時時間,並捕捉 queue.Full 異常;設置tomeout一樣會阻塞線程,但timeout之后,可以繼續操行程序。如果不想使用 timeout 選項,也可以直接設置 block(阻塞) 為 False,或者直接使用 q.put_nowait 方法(注意:當隊列已滿的時候 ,put_nowait 一樣會觸發 queue.Full 異常)

 

三、出隊的阻塞

import queue


def 出隊阻塞():
    q = queue.Queue(maxsize=3)
    for i in range(3):
        try:
            q.put_nowait('任務' + str(i+1))
        except queue.Full:
            print('full')

    for i in range(4):
        task = q.get()
        print(task)

    print('Finished')


if __name__ == '__main__':
    出隊阻塞()

注:隊列里只有3個元素,但get了4次。第4次get的時候,不會返回空,而是會發生阻塞。

 

正確的處理方法:

import queue


def 出隊阻塞():
    q = queue.Queue(maxsize=3)
    for i in range(3):
        try:
            q.put_nowait('任務' + str(i+1))
        except queue.Full:
            print('full')

    for i in range(4):
        try:
            task = q.get(block=True, timeout=3)
            print(task)
        except queue.Empty:
            print('隊列為空,get失敗')

    print('Finished')


if __name__ == '__main__':
    出隊阻塞()

注:設置 timeout 超時時間,並捕捉 queue.Empty 異常;設置tomeout一樣會阻塞線程,但timeout之后,可以繼續操行程序。如果不想使用 timeout 選項,也可以直接設置 block(阻塞) 為 False,或者直接使用 q.get_nowait 方法(注意:當隊列為空的時候 ,get_nowait 一樣會觸發 queue.Empty 異常)

 

四、消費阻塞(正確來說,應該是未消費完時的阻塞)

import queue


def 消費阻塞():
    q = queue.Queue(maxsize=3)
    for i in range(3):
        try:
            q.put_nowait('任務' + str(i+1))
        except queue.Full:
            print('full')

    for i in range(2):
        try:
            task = q.get(block=True, timeout=3)
            print(task)
            q.task_done()
        except queue.Empty:
            print('隊列為空,get失敗')
    # 阻塞隊列
    q.join()
    print('Finished')


if __name__ == '__main__':
    消費阻塞()

注:隊列里設置了3個任務,但只調用了兩次 task_done(標記兩個任務已完成),還有一個任務未處理,隊列將阻塞至第三個任務被消費(標志為 task_done)

 


 

上面說完了各種阻塞,下面來說一下阻塞作用~~

 

五、入隊阻塞的作用

很明顯,當我要做入隊操作時,如果隊列已滿時,我不會說馬上掉頭就走,而是會等一下,看有沒有人出隊,然后,我就可以擠上去了。這就是入隊阻塞的作用。

例如異步(asyncio)或多線程(Thread)操作同一個隊列(queue),下面看一下使用 asyncio 異步操作 Queue 的例子:

import time
import queue
import asyncio


def get_now():
    return time.strftime('%X')


# 入隊
async def qput(q):
    for i in range(5):
        # 每1秒寫入一個元素
        await asyncio.sleep(1)
        try:
            await q.put(i)
            print('%s: %d 入隊' % (get_now(), i))
        except queue.Full:
            print('Full')


# 出隊
async def qget(q):
    for i in range(5):
        # 每2秒消費一個元素
        await asyncio.sleep(2)
        try:
            item = await q.get()
            print('%s: %d 出隊' % (get_now(), item))
        except queue.Empty:
            print('Empty')


async def main():
    q = asyncio.Queue(maxsize=3)

    print('%s: Start' % get_now())
    await asyncio.gather(qput(q), qget(q))
    print('%s: Finished' % get_now())


if __name__ == '__main__':
    asyncio.run(main())

運行結果大概是這樣:

 

 六、出隊阻塞的作用

出隊阻塞和入隊阻塞是一樣的。假設你是一個包工頭,看到應聘的隊列里沒有人,不要着急着馬上走啊,等一下可能就有人過來應聘了。這就是 get 阻塞的作用。

下面來看一下 asyncio 異步操作 queue 的例子:

import time
import queue
import asyncio
import random


def get_now():
    return time.strftime('%X')


# 招工
async def 招工(q):
    print('包工頭:招人了喂,管吃管喝、五險一金~')
    worker_count = 0
    for i in range(q.maxsize):
        try:
            # 就等10秒
            worker = await asyncio.wait_for(q.get(), timeout=10)
            worker_count = worker_count + 1
            print('%s: 面試【%s】,通過/入職' % (get_now(), worker))
        except asyncio.TimeoutError:
            # 10秒內都沒人來,直接提前下班了
            print('包工頭:唉,都沒人來應聘,今天只能提前下班了~')
            exit(0)

    print('包工頭:招夠了,可以下班了~~')


# 應聘
async def 應聘(q):
    workers = ['張三', '李四', '王五', '趙六', '陳七']
    for name in workers:
        # 不定時有人來應聘。注:時間要控制到10秒內,10秒內都沒人來,包工頭就要提前下班了
        await asyncio.sleep(random.randint(1, 10))
        try:
            await q.put(name)
            print('%s: 【%s】 去應聘了' % (get_now(), name))
        except queue.Full:
            print('Full')


async def main():
    # 上級給任務了,要招夠5個人
    q = asyncio.Queue(maxsize=5)

    print('%s: Start' % get_now())
    await asyncio.gather(招工(q), 應聘(q))
    print('%s: Finished' % get_now())


if __name__ == '__main__':
    asyncio.run(main())

運行結果大概是這樣:

 

注:須要注意一下,asyncio 操作 queue 時,不能用原生的 queue.Queue,要用 asyncio.Queue

 

參考鏈接:

https://docs.python.org/zh-cn/3.7/library/queue.html

https://docs.python.org/zh-cn/3.7/library/asyncio-task.html

https://docs.python.org/zh-cn/3.7/library/asyncio-queue.html

 

 

本文鏈接:https://www.cnblogs.com/tujia/p/13637535.html


完。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM