asyncio 被用作 提供高性能 Python 異步框架的基礎,包括網絡和網站服務,數據庫連接庫,分布式任務隊列等等。
asyncio 提供一組 高層級 API 用於:
-
並發地 運行 Python 協程 並對其執行過程實現完全控制;
-
執行 網絡 IO 和 IPC;
-
控制 子進程;
-
通過 隊列 實現分布式任務;
-
同步 並發代碼;
此外,還有一些 低層級 API 以支持 庫和框架的開發者 實現:
-
使用 transports 實現高效率協議;
-
通過 async/await 語法 橋接 基於回調的庫和代碼。
event_loop 事件循環:程序開啟一個無限的循環,會把一些函數注冊到事件循環上。當滿足事件發生時,調用相應的協程函數。
asyncio.gather和asyncio.wait
asyncio.gather
能收集協程的結果,且會按照輸入協程的順序保存對應協程的執行結果,而asyncio.wait
的返回值有兩項,第一項是完成的任務列表,第二項表示等待完成的任務列表。asyncio.wait
支持接受一個參數return_when
,默認情況下asyncio.wait
會等待全部任務完成(return_when='ALL_COMPLETED')
,它還支持FIRST_COMPLETED
(第一個協程完成就返回)和FIRST_EXCEPTION
(出現第一個異常就返回):
# 定義協程coroutine:調用不會立即執行,而是會返回一個協程對象。協程對象需要注冊到事件循環,由事件循環調用。 async def my_task(x): print('Waiting: ', x) return "Done" # 消息循環: 從asyncio模塊中直接獲取一個EventLoop的引用,然后把需要執行的協程扔到EventLoop中執行,就實現異步IO。 loop = asyncio.get_event_loop() # new_event_loop loop.run_until_complete(my_task(2)) # 堵塞直到所有tasks完成 # 創建task task = loop.create_task(my_task(2)) # 或 task = asyncio.ensure_future(my_task(2)) loop.run_until_complete(task) # 運行任務直到Future完成並返回它的結果,task.result()協程返回值 # 綁定回調 def callback(future): print('Callback: ', future.result()) task.add_done_callback(callback) loop.run_until_complete(task) # coroutine執行結束時會調用回調函數,並通過參數future獲取協程執行的結果, 創建的task和回調里的future對象,實際上是同一個對象。 # 在函數定義時用async def foo()
代替@gen.coroutine
裝飾器, 用await
代替yield. # 協程遇到await,事件循環會將其掛起,執行別的協程,直到其他協程也掛起或執行完畢,再進行下一個協程的執行。 async def my_task(x): print('Waiting: ', x) await asyncio.sleep(x) return "Done" # ==================================================================== def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() def more_work(x): print('More work {}'.format(x)) time.sleep(x) # 同步阻塞 print('Finished more work {}'.format(x)) # 動態添加協程到事件循環: 多線程 new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.start() new_loop.call_soon_threadsafe(more_work, 6) new_loop.call_soon_threadsafe(more_work, 3) # 子線程中進行事件循環的並發操作,同時主線程又不會被block asyncio.run_coroutine_threadsafe(my_task(6), new_loop) asyncio.run_coroutine_threadsafe(my_task(4), new_loop) # master-worker主從模式: 主線程用來監聽隊列,子線程用於處理隊列 t.setDaemon(True) # 設置子線程為守護線程, 當主線程結束的時候,子線程也隨機退出。 try: while True: task = rcon.rpop("queue") if not task: time.sleep(1) continue asyncio.run_coroutine_threadsafe(my_task(int(task)), new_loop) except KeyboardInterrupt as e: print(e) new_loop.stop()
loop.run_until_complete(future):協程()、task、asyncio.wait(tasks)、asyncio.gather(*tasks) # 阻塞調用,直到協程運行結束,它才返回。 loop.run_forever() # 一直運行,直到loop.stop被調用. 但不能直接調用,可用gather把多個協程合並成一個future,並添加回調,然后在回調里再去停止 loop。 @asyncio.coroutine把一個generator標記為coroutine類型,然后就把這個coroutine扔到EventLoop中執行。 asyncio.sleep() # 協程,可模擬IO操作如網絡請求,文件讀取等