本文鏈接:https://www.cnblogs.com/tujia/p/13637535.html
背景:python 隊列 queue.Queue 或 multiprcessing.Queue 或其他隊列在寫入隊列或從隊列中讀取元素時,都有可能會發生線程阻塞。
下面來說一下阻塞的類型,然后怎么避免阻塞~
一、阻塞的類型
隊列的阻塞分為:入隊(put)時的阻塞、出隊(get)時的阻塞、整體(join)的阻塞(消費的阻塞)
二、入隊的阻塞
import queue def 入隊阻塞(): q = queue.Queue(maxsize=3) for i in range(4): q.put('任務' + str(i+1)) print('Finished') if __name__ == '__main__': 入隊阻塞()
注:因為定義的隊列的 maxsize=3,但 put 了4個元素進隊列,第4個元素將無法 put 進隊列,發生阻塞;注意:就算不設置 maxsize,電腦的內存也是有限的,隊列也是會滿的。當隊列已滿,做 put 操作時,一樣會發生阻塞。
正確的處理方法:
import queue def 入隊阻塞(): q = queue.Queue(maxsize=3) for i in range(4): try: q.put('任務' + str(i+1), block=True, timeout=3) except queue.Full: print('任務%d: 隊列已滿,寫入失敗' % (i+1)) print('Finished') if __name__ == '__main__': 入隊阻塞()
注:設置 timeout 超時時間,並捕捉 queue.Full 異常;設置tomeout一樣會阻塞線程,但timeout之后,可以繼續操行程序。如果不想使用 timeout 選項,也可以直接設置 block(阻塞) 為 False,或者直接使用 q.put_nowait 方法(注意:當隊列已滿的時候 ,put_nowait 一樣會觸發 queue.Full 異常)
三、出隊的阻塞
import queue def 出隊阻塞(): q = queue.Queue(maxsize=3) for i in range(3): try: q.put_nowait('任務' + str(i+1)) except queue.Full: print('full') for i in range(4): task = q.get() print(task) print('Finished') if __name__ == '__main__': 出隊阻塞()
注:隊列里只有3個元素,但get了4次。第4次get的時候,不會返回空,而是會發生阻塞。
正確的處理方法:
import queue def 出隊阻塞(): q = queue.Queue(maxsize=3) for i in range(3): try: q.put_nowait('任務' + str(i+1)) except queue.Full: print('full') for i in range(4): try: task = q.get(block=True, timeout=3) print(task) except queue.Empty: print('隊列為空,get失敗') print('Finished') if __name__ == '__main__': 出隊阻塞()
注:設置 timeout 超時時間,並捕捉 queue.Empty 異常;設置tomeout一樣會阻塞線程,但timeout之后,可以繼續操行程序。如果不想使用 timeout 選項,也可以直接設置 block(阻塞) 為 False,或者直接使用 q.get_nowait 方法(注意:當隊列為空的時候 ,get_nowait 一樣會觸發 queue.Empty 異常)
四、消費阻塞(正確來說,應該是未消費完時的阻塞)
import queue def 消費阻塞(): q = queue.Queue(maxsize=3) for i in range(3): try: q.put_nowait('任務' + str(i+1)) except queue.Full: print('full') for i in range(2): try: task = q.get(block=True, timeout=3) print(task) q.task_done() except queue.Empty: print('隊列為空,get失敗') # 阻塞隊列 q.join() print('Finished') if __name__ == '__main__': 消費阻塞()
注:隊列里設置了3個任務,但只調用了兩次 task_done(標記兩個任務已完成),還有一個任務未處理,隊列將阻塞至第三個任務被消費(標志為 task_done)
上面說完了各種阻塞,下面來說一下阻塞作用~~
五、入隊阻塞的作用
很明顯,當我要做入隊操作時,如果隊列已滿時,我不會說馬上掉頭就走,而是會等一下,看有沒有人出隊,然后,我就可以擠上去了。這就是入隊阻塞的作用。
例如異步(asyncio)或多線程(Thread)操作同一個隊列(queue),下面看一下使用 asyncio 異步操作 Queue 的例子:
import time import queue import asyncio def get_now(): return time.strftime('%X') # 入隊 async def qput(q): for i in range(5): # 每1秒寫入一個元素 await asyncio.sleep(1) try: await q.put(i) print('%s: %d 入隊' % (get_now(), i)) except queue.Full: print('Full') # 出隊 async def qget(q): for i in range(5): # 每2秒消費一個元素 await asyncio.sleep(2) try: item = await q.get() print('%s: %d 出隊' % (get_now(), item)) except queue.Empty: print('Empty') async def main(): q = asyncio.Queue(maxsize=3) print('%s: Start' % get_now()) await asyncio.gather(qput(q), qget(q)) print('%s: Finished' % get_now()) if __name__ == '__main__': asyncio.run(main())
運行結果大概是這樣:
六、出隊阻塞的作用
出隊阻塞和入隊阻塞是一樣的。假設你是一個包工頭,看到應聘的隊列里沒有人,不要着急着馬上走啊,等一下可能就有人過來應聘了。這就是 get 阻塞的作用。
下面來看一下 asyncio 異步操作 queue 的例子:
import time import queue import asyncio import random def get_now(): return time.strftime('%X') # 招工 async def 招工(q): print('包工頭:招人了喂,管吃管喝、五險一金~') worker_count = 0 for i in range(q.maxsize): try: # 就等10秒 worker = await asyncio.wait_for(q.get(), timeout=10) worker_count = worker_count + 1 print('%s: 面試【%s】,通過/入職' % (get_now(), worker)) except asyncio.TimeoutError: # 10秒內都沒人來,直接提前下班了 print('包工頭:唉,都沒人來應聘,今天只能提前下班了~') exit(0) print('包工頭:招夠了,可以下班了~~') # 應聘 async def 應聘(q): workers = ['張三', '李四', '王五', '趙六', '陳七'] for name in workers: # 不定時有人來應聘。注:時間要控制到10秒內,10秒內都沒人來,包工頭就要提前下班了 await asyncio.sleep(random.randint(1, 10)) try: await q.put(name) print('%s: 【%s】 去應聘了' % (get_now(), name)) except queue.Full: print('Full') async def main(): # 上級給任務了,要招夠5個人 q = asyncio.Queue(maxsize=5) print('%s: Start' % get_now()) await asyncio.gather(招工(q), 應聘(q)) print('%s: Finished' % get_now()) if __name__ == '__main__': asyncio.run(main())
運行結果大概是這樣:
注:須要注意一下,asyncio 操作 queue 時,不能用原生的 queue.Queue,要用 asyncio.Queue
參考鏈接:
https://docs.python.org/zh-cn/3.7/library/queue.html
https://docs.python.org/zh-cn/3.7/library/asyncio-task.html
https://docs.python.org/zh-cn/3.7/library/asyncio-queue.html
本文鏈接:https://www.cnblogs.com/tujia/p/13637535.html
完。