python asyncio
網絡模型有很多中,為了實現高並發也有很多方案,多線程,多進程。無論多線程和多進程,IO的調度更多取決於系統,而協程的方式,調度來自用戶,用戶可以在函數中yield一個狀態。使用協程可以實現高效的並發任務。Python的在3.4中引入了協程的概念,可是這個還是以生成器對象為基礎,3.5則確定了協程的語法。下面將簡單介紹asyncio的使用。實現協程的不僅僅是asyncio,tornado和gevent都實現了類似的功能。
- event_loop 事件循環:程序開啟一個無限的循環,程序員會把一些函數注冊到事件循環上。當滿足事件發生的時候,調用相應的協程函數。
- coroutine 協程:協程對象,指一個使用async關鍵字定義的函數,它的調用不會立即執行函數,而是會返回一個協程對象。協程對象需要注冊到事件循環,由事件循環調用。
- task 任務:一個協程對象就是一個原生可以掛起的函數,任務則是對協程進一步封裝,其中包含任務的各種狀態。
- future: 代表將來執行或沒有執行的任務的結果。它和task上沒有本質的區別
- async/await 關鍵字:python3.5 用於定義協程的關鍵字,async定義一個協程,await用於掛起阻塞的異步調用接口。
上述的概念單獨拎出來都不好懂,比較他們之間是相互聯系,一起工作。下面看例子,再回溯上述概念,更利於理解。
定義一個協程
定義一個協程很簡單,使用async關鍵字,就像定義普通函數一樣:
通過async關鍵字定義一個協程(coroutine),協程也是一種對象。協程不能直接運行,需要把協程加入到事件循環(loop),由后者在適當的時候調用協程。asyncio.get_event_loop
方法可以創建一個事件循環,然后使用run_until_complete
將協程注冊到事件循環,並啟動事件循環。因為本例只有一個協程,於是可以看見如下輸出:
Waiting: 2
TIME:
創建一個task
協程對象不能直接運行,在注冊事件循環的時候,其實是run_until_complete方法將協程包裝成為了一個任務(task)對象。所謂task對象是Future類的子類。保存了協程運行后的狀態,用於未來獲取協程的結果。
可以看到輸出結果為:
創建task后,task在加入事件循環之前是pending狀態,因為do_some_work中沒有耗時的阻塞操作,task很快就執行完畢了。后面打印的finished狀態。
asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以創建一個task,run_until_complete的參數是一個futrue對象。當傳入一個協程,其內部會自動封裝成task,task是Future的子類。isinstance(task, asyncio.Future)
將會輸出True。
綁定回調
綁定回調,在task執行完畢的時候可以獲取執行的結果,回調的最后一個參數是future對象,通過該對象可以獲取協程返回值。如果回調需要多個參數,可以通過偏函數導入。
def callback(t, future): print('Callback:', t, future.result()) task.add_done_callback(functools.partial(callback, 2))
future 與 result
回調一直是很多異步編程的惡夢,程序員更喜歡使用同步的編寫方式寫異步代碼,以避免回調的惡夢。回調中我們使用了future對象的result方法。前面不綁定回調的例子中,我們可以看到task有fiinished狀態。在那個時候,可以直接讀取task的result方法。
可以看到輸出的結果:
阻塞和await
使用async可以定義協程對象,使用await可以針對耗時的操作進行掛起,就像生成器里的yield一樣,函數讓出控制權。協程遇到await,事件循環將會掛起該協程,執行別的協程,直到其他的協程也掛起或者執行完畢,再進行下一個協程的執行。
耗時的操作一般是一些IO操作,例如網絡請求,文件讀取等。我們使用asyncio.sleep函數來模擬IO操作。協程的目的也是讓這些IO操作異步化。
在 sleep的時候,使用await讓出控制權。即當遇到阻塞調用的函數的時候,使用await方法將協程的控制權讓出,以便loop調用其他的協程。現在我們的例子就用耗時的阻塞操作了。
並發和並行
並發和並行一直是容易混淆的概念。並發通常指有多個任務需要同時進行,並行則是同一時刻有多個任務執行。用上課來舉例就是,並發情況下是一個老師在同一時間段輔助不同的人功課。並行則是好幾個老師分別同時輔助多個學生功課。簡而言之就是一個人同時吃三個饅頭還是三個人同時分別吃一個的情況,吃一個饅頭算一個任務。
asyncio實現並發,就需要多個協程來完成任務,每當有任務阻塞的時候就await,然后其他協程繼續工作。創建多個協程的列表,然后將這些協程注冊到事件循環中。
結果如下
總時間為4s左右。4s的阻塞時間,足夠前面兩個協程執行完畢。如果是同步順序的任務,那么至少需要7s。此時我們使用了aysncio實現了並發。asyncio.wait(tasks) 也可以使用 asyncio.gather(*tasks) ,前者接受一個task列表,后者接收一堆task。
協程嵌套
使用async可以定義協程,協程用於耗時的io操作,我們也可以封裝更多的io操作過程,這樣就實現了嵌套的協程,即一個協程中await了另外一個協程,如此連接起來。
如果使用的是 asyncio.gather創建協程對象,那么await的返回值就是協程運行的結果。
不在main協程函數里處理結果,直接返回await的內容,那么最外層的run_until_complete將會返回main協程的結果。
或者返回使用asyncio.wait方式掛起協程。
也可以使用asyncio的as_completed方法
由此可見,協程的調用和組合十分靈活,尤其是對於結果的處理,如何返回,如何掛起,需要逐漸積累經驗和前瞻的設計。
協程停止
上面見識了協程的幾種常用的用法,都是協程圍繞着事件循環進行的操作。future對象有幾個狀態:
- Pending
- Running
- Done
- Cancelled
創建future的時候,task為pending,事件循環調用執行的時候當然就是running,調用完畢自然就是done,如果需要停止事件循環,就需要先把task取消。可以使用asyncio.Task獲取事件循環的task
啟動事件循環之后,馬上ctrl+c,會觸發run_until_complete的執行異常 KeyBorardInterrupt。然后通過循環asyncio.Task取消future。可以看到輸出如下:
True表示cannel成功,loop stop之后還需要再次開啟事件循環,最后在close,不然還會拋出異常:
循環task,逐個cancel是一種方案,可是正如上面我們把task的列表封裝在main函數中,main函數外進行事件循環的調用。這個時候,main相當於最外出的一個task,那么處理包裝的main函數即可。