對比一個簡單的多線程程序和對應的 asyncio 版,說明多線程和異步任務之間的關
系
asyncio.Future 類與 concurrent.futures.Future 類之間的區別
摒棄線程或進程,如何使用異步編程管理網絡應用中的高並發
在異步編程中,與回調相比,協程顯著提升性能的方式
如何把阻塞的操作交給線程池處理,從而避免阻塞事件循環
使用 asyncio 編寫服務器,重新審視 Web 應用對高並發的處理方式
為什么 asyncio 已經准備好對 Python 生態系統產生重大影響
線程與協程對比
import threading import itertools import time import sys class Signal: go = True def spin(msg, signal): write, flush = sys.stdout.write, sys.stdout.flush for char in itertools.cycle('|/-\\'): status = char + ' ' + msg write(status) flush() write('\x08' * len(status)) time.sleep(.1) if not signal.go: break write(' ' * len(status) + '\x08' * len(status)) def slow_function(): time.sleep(3) return 42 def supervisor(): signal = Signal() spinner = threading.Thread(target=spin, args=('thinking!', signal)) print('spinner object:', spinner) spinner.start() result = slow_function() signal.go = False spinner.join() return result def main(): result = supervisor() print('Answer:', result) if __name__ == '__main__': main()
以上是threading
import asyncio import itertools import sys @asyncio.coroutine def spin(msg): write, flush = sys.stdout.write, sys.stdout.flush for char in itertools.cycle('|/-\\'): status = char + ' ' + msg write(status) flush() write('\x08' * len(status)) try: yield from asyncio.sleep(.1) except asyncio.CancelledError: break write(' ' * len(status) + '\x08' * len(status)) @asyncio.coroutine def slow_function(): yield from asyncio.sleep(3) return 42 @asyncio.coroutine def supervisor(): spinner = asyncio.async(spin('thinking!')) print('spinner object:', spinner) result = yield from slow_function() spinner.cancel() return result def main(): loop = asyncio.get_event_loop() result = loop.run_until_complete(supervisor()) loop.close() print('Answer:', result) if __name__ == '__main__': main()
以上是asyncio
除非想阻塞主線程,從而凍結事件循環或整個應用,否則不要在 asyncio 協
程中使用 time.sleep(...)。如果協程需要在一段時間內什么也不做,應該使用
yield from asyncio.sleep(DELAY)
使用 @asyncio.coroutine 裝飾器不是強制要求,但是強烈建議這么做,因為這樣能在
一眾普通的函數中把協程凸顯出來,也有助於調試:如果還沒從中產出值,協程就被垃圾
回收了(意味着有操作未完成,因此有可能是個缺陷),那就可以發出警告。這個裝飾器
不會預激協程。
線程與協程之間的比較還有最后一點要說明:如果使用線程做過重要的編程,你就知道寫
出程序有多么困難,因為調度程序任何時候都能中斷線程。必須記住保留鎖,去保護程序
中的重要部分,防止多步操作在執行的過程中中斷,防止數據處於無效狀態。
而協程默認會做好全方位保護,以防止中斷。我們必須顯式產出才能讓程序的余下部分運
行。對協程來說,無需保留鎖,在多個線程之間同步操作,協程自身就會同步,因為在任
意時刻只有一個協程運行。想交出控制權時,可以使用 yield 或 yield from 把控制權
交還調度程序。這就是能夠安全地取消協程的原因:按照定義,協程只能在暫停的 yield
處取消,因此可以處理 CancelledError 異常,執行清理操作。
asyncio與concurrent.future的區別
期物只是調度執行某物的結果。在 asyncio 包
中,BaseEventLoop.create_task(...) 方法接收一個協程,排定它的運行時間,然后
返回一個 asyncio.Task 實例——也是 asyncio.Future 類的實例,因為 Task 是
Future 的子類,用於包裝協程。這與調用 Executor.submit(...) 方法創建
concurrent.futures.Future 實例是一個道理。
與 concurrent.futures.Future 類似,asyncio.Future 類也提供了
.done()、.add_done_callback(...) 和 .result() 等方法。前兩個方法的用法與
17.1.3 節所述的一樣,不過 .result() 方法差別很大。
asyncio.Future 類的 .result() 方法沒有參數,因此不能指定超時時間。此外,如果
調用 .result() 方法時期物還沒運行完畢,那么 .result() 方法不會阻塞去等待結果,
而是拋出 asyncio.InvalidStateError 異常。
然而,獲取 asyncio.Future 對象的結果通常使用 yield from,從中產出結果,如示例
18-8 所示。
使用 yield from 處理期物,等待期物運行完畢這一步無需我們關心,而且不會阻塞事件
循環,因為在 asyncio 包中,yield from 的作用是把控制權還給事件循環。
注意,使用 yield from 處理期物與使用 add_done_callback 方法處理協程的作用一
樣:延遲的操作結束后,事件循環不會觸發回調對象,而是設置期物的返回值;而 yield
from 表達式則在暫停的協程中生成返回值,恢復執行協程。
總之,因為 asyncio.Future 類的目的是與 yield from 一起使用,所以通常不需要使
用以下方法。
無需調用 my_future.add_done_callback(...),因為可以直接把想在期物運行結
束后執行的操作放在協程中 yield from my_future 表達式的后面。這是協程的一
大優勢:協程是可以暫停和恢復的函數。
無需調用 my_future.result(),因為 yield from 從期物中產出的值就是結果
(例如,result = yield from my_future)。
當然,有時也需要使用 .done()、.add_done_callback(...) 和 .result() 方法。但
是一般情況下,asyncio.Future 對象由 yield from 驅動,而不是靠調用這些方法驅
動。
對協程來說,獲取 Task 對象有兩種主要方式。
asyncio.async(coro_or_future, *, loop=None)
這個函數統一了協程和期物:第一個參數可以是二者中的任何一個。如果是 Future
或 Task 對象,那就原封不動地返回。如果是協程,那么 async 函數會調用
loop.create_task(...) 方法創建 Task 對象。loop= 關鍵字參數是可選的,用於傳入
事件循環;如果沒有傳入,那么 async 函數會通過調用 asyncio.get_event_loop() 函
數獲取循環對象。
BaseEventLoop.create_task(coro)
這個方法排定協程的執行時間,返回一個 asyncio.Task 對象。如果在自定義的
BaseEventLoop 子類上調用,返回的對象可能是外部庫(如 Tornado)中與 Task 類兼容
的某個類的實例。
使用 asyncio 包時,我們編寫的異步代碼中包含由 asyncio 本身驅動的
協程(即委派生成器),而生成器最終把職責委托給 asyncio 包或第三方庫(如
aiohttp)中的協程。這種處理方式相當於架起了管道,讓 asyncio 事件循環(通過我
們編寫的協程)驅動執行低層異步 I/O 操作的庫函數。
import asyncio import aiohttp from ..chapter17.flags import BASE_URL, save_flag, show, main @asyncio.coroutine def get_flag(cc): url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower()) resp = yield from aiohttp.request('GET', url) image = yield from resp.read() return image @asyncio.coroutine def download_one(cc): image = yield from get_flag(cc) show(cc) save_flag(image, cc.lower() + '.gif') return cc def download_many(cc_list): loop = asyncio.get_event_loop() to_do = [download_one(cc) for cc in sorted(cc_list)] wait_coro = asyncio.wait(to_do) res, _ = loop.run_until_complete(wait_coro) loop.close() return len(res) if __name__ == '__main__': main(download_many)
有兩種方法能避免阻塞型調用中止整個應用程序的進程:
在單獨的線程中運行各個阻塞型操作
把每個阻塞型操作轉換成非阻塞的異步調用使用
現在你應該能理解為什么 flags_asyncio.py 腳本的性能比 flags.py 腳本高 5 倍了:flags.py
腳本依序下載,而每次下載都要用幾十億個 CPU 周期等待結果。其實,CPU 同時做了很
多事,只是沒有運行你的程序。與此相比,在 flags_asyncio.py 腳本中,在
download_many 函數中調用 loop.run_until_complete 方法時,事件循環驅動各個
download_one 協程,運行到第一個 yield from 表達式處,那個表達式又驅動各個
get_flag 協程,運行到第一個 yield from 表達式處,調用 aiohttp.request(...)
函數。這些調用都不會阻塞,因此在零點幾秒內所有請求全部開始。
asyncio 的基礎設施獲得第一個響應后,事件循環把響應發給等待結果的 get_flag 協
程。得到響應后,get_flag 向前執行到下一個 yield from 表達式處,調用
resp.read() 方法,然后把控制權還給主循環。其他響應會陸續返回(因為請求幾乎同
時發出)。所有 get_ flag 協程都獲得結果后,委派生成器 download_one 恢復,保存
圖像文件。
因為異步操作是交叉執行的,所以並發下載多張圖像所需的總時間比依序下載少得多。我
使用 asyncio 包發起了 600 個 HTTP 請求,獲得所有結果的時間比依序下載快 70 倍。
關於concurrent.future模塊以及asyncio模塊的內容不容易理解,需要查閱其他資料,另寫一篇博文。