什么是異步編程?
同步代碼(synchrnous code)我們都很熟悉,就是運行完一個步驟再運行下一個。要在同步代碼里面實現"同時"運行多個任務,最簡單也是最直觀地方式就是運行多個 threads 或者多個 processes。這個層次的『同時運行』多個任務,是操作系統協助完成的。 也就是操作系統的任務調度系統來決定什么時候運行這個任務,什么時候切換任務,你自己,作為一個應用層的程序員,是沒辦法進行干預的。
我相信你也已經聽說了什么關於 thread 和 process 的抱怨:process 太重,thread 又要牽涉到很多頭條的鎖問題。尤其是對於一個 Python 開發者來說,由於GIL(全局解釋器鎖)的存在,多線程無法真正使用多核,如果你用多線程來運行計算型任務,速度會更慢。
異步編程與之不同的是,值使用一個進程,不使用 threads,但是也能實現"同時"運行多個任務(這里的任務其實就是函數)。
這些函數有一個非常 nice 的 feature:必要的可以暫停,把運行的權利交給其他函數。等到時機恰當,又可以恢復之前的狀態繼續運行。這聽上去是不是有點像進程呢?可以暫停,可以恢復運行。只不過進程的調度是操作系統完成的,這些函數的調度是進程自己(或者說程序員你自己)完成的。這也就意味着這將省去了很多計算機的資源,因為進程的調度必然需要大量 syscall,而 syscall 是很昂貴的。
一 定義一個簡單的協程:
import asyncio async def execute(x): print('Number:', x) return x coroutine = execute(1) print('Coroutine:', coroutine) print('After calling execute') loop = asyncio.get_event_loop() task = loop.create_task(coroutine) print('Task:', task) loop.run_until_complete(task) print('Task:', task) print('After calling loop') # print('Task Result:', task.result()) 這樣也能查看task執行的結果
運行結果:
Coroutine: <coroutine object execute at 0x10e0f7830> After calling execute Task: <Task pending coro=<execute() running at demo.py:4>> Number: 1 Task: <Task finished coro=<execute() done, defined at demo.py:4> result=1> After calling loop
我們使用 async 定義了一個 execute() 方法,方法接收一個數字參數,方法執行之后會打印這個數字。
隨后我們直接調用了這個方法,然而這個方法並沒有執行,而是返回了一個 coroutine 協程對象。
隨后我們使用 get_event_loop() 方法創建了一個事件循環 loop,並調用了 loop 對象的 run_until_complete() 方法將協程注冊到事件循環 loop 中,然后啟動。最后我們才看到了 execute() 方法打印了輸出結果。
可見,async 定義的方法就會變成一個無法直接執行的 coroutine 對象,必須將其注冊到事件循環中才可以執行。
我們也可以不使用task來運行,它里面相比 coroutine 對象多了運行狀態,比如 running、finished 等,我們可以用這些狀態來獲取協程對象的執行情況。
將 coroutine 對象傳遞給 run_until_complete() 方法的時候,實際上它進行了一個操作就是將 coroutine 封裝成了 task 對象,如:

import asyncio async def execute(x): print('Number:', x) coroutine = execute(1) print('Coroutine:', coroutine) print('After calling execute') loop = asyncio.get_event_loop() loop.run_until_complete(coroutine) print('After calling loop')
查看了源碼,正好可以驗證上面這一觀點:
run_until_complete()這個方法位於源碼中的base_events.py,函數有句注釋:
Run until the Future is done.If the argument is a coroutine, it is wrapped in a Task.
二 發送網絡請求結合aiohttp實現異步:
我們用一個網絡請求作為示例,這就是一個耗時等待的操作,因為我們請求網頁之后需要等待頁面響應並返回結果。耗時等待的操作一般都是 IO 操作,比如文件讀取、網絡請求等等。協程對於處理這種操作是有很大優勢的,當遇到需要等待的情況的時候,程序可以暫時掛起,轉而去執行其他的操作,從而避免一直等待一個程序而耗費過多的時間,充分利用資源。為了測試,我自己先通過flask 創建一個實驗環境:
from flask import Flask import time app = Flask(__name__) @app.route('/') def index(): time.sleep(3) return 'Hello!' if __name__ == '__main__': app.run(threaded=True)
開始測試...
import asyncio import aiohttp import time start = time.time() async def get(url): session = aiohttp.ClientSession() response = await session.get(url) result = await response.text() session.close() return result async def request(): url = 'http://127.0.0.1:5000' # 訪問flask搭建的服務器(睡眠3秒),模仿IO阻塞 print('Waiting for', url) result = await get(url) print('Get response from', url, 'Result:', result) tasks = [asyncio.ensure_future(request()) for _ in range(5)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) end = time.time() print('Cost time:', end - start)
運行結果:
Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000 Result: Hello! Get response from http://127.0.0.1:5000 Result: Hello! Get response from http://127.0.0.1:5000 Result: Hello! Get response from http://127.0.0.1:5000 Result: Hello! Get response from http://127.0.0.1:5000 Result: Hello! Cost time: 3.0199508666992188
我們發現這次請求的耗時由 15 秒變成了 3 秒,耗時直接變成了原來的 1/5。
代碼里面我們使用了 await,后面跟了 get() 方法,在執行這五個協程的時候,如果遇到了 await,那么就會將當前協程掛起,轉而去執行其他的協程,直到其他的協程也掛起或執行完畢,再進行下一個協程的執行。
二 總結
協程"同時"運行多個任務的基礎是函數可以暫停(await實際就是用到了yield)。上面的代碼中使用到了 asyncio的 event_loop,它做的事情,本質上來說就是當函數暫停時,切換到下一個任務,當時機恰當(這個例子中是請求完成了)恢復函數讓他繼續運行(這有點像操作系統了)。