python 多進程和多線程3 —— asyncio - 異步IO


asyncio 被用作 提供高性能 Python 異步框架的基礎,包括網絡和網站服務,數據庫連接庫,分布式任務隊列等等。

asyncio 提供一組 高層級 API 用於:

此外,還有一些 低層級 API 以支持 庫和框架的開發者 實現:

 

event_loop 事件循環:程序開啟一個無限的循環,會把一些函數注冊到事件循環上。當滿足事件發生時,調用相應的協程函數。

asyncio.gather和asyncio.wait

  1. asyncio.gather能收集協程的結果,且會按照輸入協程的順序保存對應協程的執行結果,而asyncio.wait的返回值有兩項,第一項是完成的任務列表,第二項表示等待完成的任務列表。
  2. asyncio.wait 支持接受一個參數return_when,默認情況下asyncio.wait會等待全部任務完成(return_when='ALL_COMPLETED'),它還支持FIRST_COMPLETED(第一個協程完成就返回)和FIRST_EXCEPTION(出現第一個異常就返回):

 

# 定義協程coroutine:調用不會立即執行,而是會返回一個協程對象。協程對象需要注冊到事件循環,由事件循環調用。
async def my_task(x):
    print('Waiting: ', x)
    return "Done"
 
# 消息循環: 從asyncio模塊中直接獲取一個EventLoop的引用,然后把需要執行的協程扔到EventLoop中執行,就實現異步IO。
loop = asyncio.get_event_loop()       # new_event_loop
loop.run_until_complete(my_task(2))   # 堵塞直到所有tasks完成 # 創建task
task = loop.create_task(my_task(2))   # 或 task = asyncio.ensure_future(my_task(2))
loop.run_until_complete(task)         # 運行任務直到Future完成並返回它的結果,task.result()協程返回值
 
 
# 綁定回調
def callback(future):
    print('Callback: ', future.result())
task.add_done_callback(callback)
loop.run_until_complete(task)      # coroutine執行結束時會調用回調函數,並通過參數future獲取協程執行的結果, 創建的task和回調里的future對象,實際上是同一個對象。
 
 
# 在函數定義時用async def foo()代替 @gen.coroutine 裝飾器, 用 await 代替yield. 
# 協程遇到await,事件循環會將其掛起,執行別的協程,直到其他協程也掛起或執行完畢,再進行下一個協程的執行。
async def my_task(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return "Done"
# ====================================================================
def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)   # 同步阻塞
    print('Finished more work {}'.format(x))
 
 
# 動態添加協程到事件循環: 多線程
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)
 
 
# 子線程中進行事件循環的並發操作,同時主線程又不會被block
asyncio.run_coroutine_threadsafe(my_task(6), new_loop)
asyncio.run_coroutine_threadsafe(my_task(4), new_loop)
 
 
# master-worker主從模式: 主線程用來監聽隊列,子線程用於處理隊列
t.setDaemon(True)    # 設置子線程為守護線程, 當主線程結束的時候,子線程也隨機退出。
try:
    while True:
        task = rcon.rpop("queue")
        if not task:
            time.sleep(1)
            continue
        asyncio.run_coroutine_threadsafe(my_task(int(task)), new_loop)
except KeyboardInterrupt as e:
    print(e)
    new_loop.stop()
 
 
loop.run_until_complete(future):協程()、task、asyncio.wait(tasks)、asyncio.gather(
*tasks) # 阻塞調用,直到協程運行結束,它才返回。 loop.run_forever() # 一直運行,直到loop.stop被調用. 但不能直接調用,可用gather把多個協程合並成一個future,並添加回調,然后在回調里再去停止 loop。 @asyncio.coroutine把一個generator標記為coroutine類型,然后就把這個coroutine扔到EventLoop中執行。 asyncio.sleep() # 協程,可模擬IO操作如網絡請求,文件讀取等

參考: https://www.jianshu.com/p/b5e347b3a17c


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM