1. asyncio異步I/O、事件循環和並發工具
asyncio模塊提供了使用協程構建並發應用的工具。threading模塊通過應用線程實現並發,multiprocessing使用系統進程實現並發,asyncio則使用一種單線程單進程方法來實現並發,應用的各個部分會彼此合作,在最優的時刻顯式地切換任務。大多數情況下,會在程序阻塞等待讀寫數據時發生這種上下文切換,不過asyncio也支持調度代碼在將來的某個特定時間運行,從而支持一個協程等待另一個協程完成,以處理系統信號和識別其他一些事件(這些事件可能導致應用改變其工作內容)。
1.1 異步並發概念
使用其他並發模型的大多數程序都采用線性方式編寫,而且依賴於語言運行時系統或操作系統的底層線程或進程管理來適當地改變上下文。基於asyncio的應用要求應用代碼顯式地處理上下文切換,要正確地使用相關技術,這取決於是否能正確理解一些相關聯的概念。asyncio提供的框架以一個事件循環(event loop)為中心,這是一個首類對象,負責高效地處理I/O事件、系統事件和應用上下文切換。目前已經提供了多個循環實現來高效地利用操作系統的功能。盡管通常會自動地選擇一個合理的默認實現,但也完全可以在應用中選擇某個特定的事件循環實現。在很多情況下這會很有用,例如,在Windows下,一些循環類增加了對外部進程的支持,這可能會以犧牲一些網絡I/O效率為代價。與事件循環交互的應用要顯式地注冊將運行的代碼,讓事件循環在資源可用時向應用代碼發出必要的調用。例如,一個網絡服務器打開套接字,然后注冊為當這些套接字上出現輸入事件時服務器要得到通知。事件循環在建立一個新的進入連接或者在數據可讀取時都會提醒服務器代碼。當前上下文中沒有更多工作可做時,應用代碼要再次短時間地交出控制。例如,如果一個套接字再沒有更多的數據可以讀取,那么服務器會把控制交回給事件循環。
將控制交還給事件循環的機制依賴於Python的協程(coroutine),這是一些特殊的函數,可以將控制交回給調用者而不丟失其狀態。協程與生成器函數非常類似;實際上,在Python3.5版本之前對協程未提供原生支持時,可以用生成器來實現協程。asyncio還為協議(protocol)和傳輸(transport)提供了一個基於類的抽象層,可以使用回調編寫代碼而不是直接編寫協程。在基於類的模型和協程模型中,可以通過重新進入事件循環顯式地改
變上下文,以取代Python多線程實現中隱式的上下文改變。future是一個數據結構,表示還未完成的工作結果。事件循環可以監視Future對象是否完成,從而允許應用的一部分等待另一部分完成一些工作。除了future,asyncio還包括其他並發原語,如鎖和信號量。
Task是Future的一個子類,它知道如何包裝和管理一個協程的執行。任務所需的資源可用時,事件循環會調度任務運行,並生成一個結果,從而可以由其他協程消費。
1.2 利用協程合作完成多任務
協程是一個專門設計用來實現並發操作的語言構造。調用協程函數時會創建一個協程對象,然后調用者使用協程的send()方法運行這個函數的代碼。協程可以使用await關鍵字(並提供另一個協程)暫停執行。暫停時,這個協程的狀態會保留,使得下一次被喚醒時可以從暫停的地方恢復執行。
1.2.1 啟動一個協程
asyncio事件循環可以采用多種不同的方法啟動一個協程。最簡單的方法是使用run_until complete(),並把協程直接傳人這個方法。
import asyncio async def coroutine(): print('in coroutine') event_loop = asyncio.get_event_loop() try: print('starting coroutine') coro = coroutine() print('entering event loop') event_loop.run_until_complete(coro) finally: print('closing event loop') event_loop.close()
第一步是得到事件循環的一個引用。可以使用默認的循環類型,也可以實例化一個特定的循環類。在這個例子中使用了默認循環。run_until_complete()方法用這個協程啟動循環;協程返回退出時這個方法會停止循環。
1.2.2 從協程返回值
協程的返回值傳回給啟動並等待這個協程的代碼。
import asyncio async def coroutine(): print('in coroutine') return 'result' event_loop = asyncio.get_event_loop() try: return_value = event_loop.run_until_complete( coroutine() ) print('it returned: {!r}'.format(return_value)) finally: event_loop.close()
在這里,run_unitil_complete()還會返回它等待的協程的結果。
1.2.3 串鏈協程
一個協程可以啟動另一個協程並等待結果,從而可以更容易地將一個任務分解為可重用的部分。下面的例子有兩個階段,它們必須按順序執行,不過可以與其他操作並發運行。
import asyncio async def outer(): print('in outer') print('waiting for result1') result1 = await phase1() print('waiting for result2') result2 = await phase2(result1) return (result1, result2) async def phase1(): print('in phase1') return 'result1' async def phase2(arg): print('in phase2') return 'result2 derived from {}'.format(arg) event_loop = asyncio.get_event_loop() try: return_value = event_loop.run_until_complete(outer()) print('return value: {!r}'.format(return_value)) finally: event_loop.close()
這里使用了await關鍵字而不是向循環增加新的協程。因為控制流已經在循環管理的一個協程中,所以沒有必要告訴循環管理這些新協程。
1.2.4 生成器代替協程
協程函數是asyncio設計中的關鍵部分。它們提供了一個語言構造,可以停止程序某一部分的執行,保留這個調用的狀態,並在以后重新進人這個狀態。所有這些動作都是並發框架很重要的功能。
Python3.5引入了一些新的語言特性,可以使用async def以原生方式定義這些協程,以及使用await交出控制,asyncio的例子利用了這些新特性。Python3的早期版本可以使用由 asyncio.coroutine()修飾符包裝的生成器函數和yield from來達到同樣的效果。
import asyncio @asyncio.coroutine def outer(): print('in outer') print('waiting for result1') result1 = yield from phase1() print('waiting for result2') result2 = yield from phase2(result1) return (result1, result2) @asyncio.coroutine def phase1(): print('in phase1') return 'result1' @asyncio.coroutine def phase2(arg): print('in phase2') return 'result2 derived from {}'.format(arg) event_loop = asyncio.get_event_loop() try: return_value = event_loop.run_until_complete(outer()) print('return value: {!r}'.format(return_value)) finally: event_loop.close()
前面的例子使用生成器函數而不是原生協程重新實現。
1.3 調度常規函數調用
除了管理協程和/O回調,asyncio事件循環還可以根據循環中保存的一個定時器值來調度常規函數調用。
1.3.1 迅速調度一個回調
如果回調的時間不重要,那么可以用callsoon()調度下一次循環迭代的調用。調用回調時,函數后面額外的位置參數會傳入回調。要向回調傳入關鍵字參數,可以使用functools模塊的partial()。
import asyncio import functools def callback(arg, *, kwarg='default'): print('callback invoked with {} and {}'.format(arg, kwarg)) async def main(loop): print('registering callbacks') loop.call_soon(callback, 1) wrapped = functools.partial(callback, kwarg='not default') loop.call_soon(wrapped, 2) await asyncio.sleep(0.1) event_loop = asyncio.get_event_loop() try: print('entering event loop') event_loop.run_until_complete(main(event_loop)) finally: print('closing event loop') event_loop.close()
回調會按其調度的順序來調用。
1.3.2 用Delay調度回調
要將一個回調推遲到將來某個時間調用,可以使用call_later()。這個方法的第一個參數是推遲時間(單位為秒),第二個參數是回調。
import asyncio def callback(n): print('callback {} invoked'.format(n)) async def main(loop): print('registering callbacks') loop.call_later(0.2, callback, 1) loop.call_later(0.1, callback, 2) loop.call_soon(callback, 3) await asyncio.sleep(0.4) event_loop = asyncio.get_event_loop() try: print('entering event loop') event_loop.run_until_complete(main(event_loop)) finally: print('closing event loop') event_loop.close()
在這個例子中,同一個回調函數調度了多次,每次提供了不同的參數。最后一個調用使用了call_soon(),這會在所有按時間調用的實例之前基於參數3來調用這個回調,由此可以看出“迅速”調用的延遲往往最小。
1.3.3 在指定時間內調度一個回調
還可以安排在指定時間內調度一個調用。實現這個目的的循環依賴於一個單調時鍾,而不是牆上時鍾時間,以確保“now”時間絕對不會逆轉。要為一個調度回調選擇時間,必須使用循環的time()方法從這個時鍾的內部狀態開始。
import asyncio import time def callback(n, loop): print('callback {} invoked at {}'.format(n, loop.time())) async def main(loop): now = loop.time() print('clock time: {}'.format(time.time())) print('loop time: {}'.format(now)) print('registering callbacks') loop.call_at(now + 0.2, callback, 1, loop) loop.call_at(now + 0.1, callback, 2, loop) loop.call_soon(callback, 3, loop) await asyncio.sleep(1) event_loop = asyncio.get_event_loop() try: print('entering event loop') event_loop.run_until_complete(main(event_loop)) finally: print('closing event loop') event_loop.close()
需要注意,循環的時間與time.time()返回的值並不一致。
1.4 異步的生成結果
Future表示還未完成的工作的結果。事件循環可以通過監視一個Future對象的狀態來指示它已經完成,從而允許應用的一部分等待另一部分完成一些工作。
1.4.1 等待future
Future的做法類似於協程,所以等待協程所用的技術同樣可以用於等待future被標記為完成。下面的例子將future傳遞到事件循環的run_until_complete()方法。
import asyncio def mark_done(future, result): print('setting future result to {!r}'.format(result)) future.set_result(result) event_loop = asyncio.get_event_loop() try: all_done = asyncio.Future() print('scheduling mark_done') event_loop.call_soon(mark_done, all_done, 'the result') print('entering event loop') result = event_loop.run_until_complete(all_done) print('returned result: {!r}'.format(result)) finally: print('closing event loop') event_loop.close() print('future result: {!r}'.format(all_done.result()))
調用set_result()時,Future的狀態改為完成,Future實例會保留提供給方法的結果,以備以后獲取。
Future還可以結合await關鍵字使用。
import asyncio def mark_done(future, result): print('setting future result to {!r}'.format(result)) future.set_result(result) async def main(loop): all_done = asyncio.Future() print('scheduling mark_done') loop.call_soon(mark_done, all_done, 'the result') result = await all_done print('returned result: {!r}'.format(result)) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
Future的結果由await返回,所以經常會讓同樣的代碼處理一個常規的協程和一個Future實例。
1.4.2 Future回調
除了做法與協程類似,Future完成時也可以調用回調。回調會按其注冊的順序調用。
import asyncio import functools def callback(future, n): print('{}: future done: {}'.format(n, future.result())) async def register_callbacks(all_done): print('registering callbacks on future') all_done.add_done_callback(functools.partial(callback, n=1)) all_done.add_done_callback(functools.partial(callback, n=2)) async def main(all_done): await register_callbacks(all_done) print('setting result of future') all_done.set_result('the result') event_loop = asyncio.get_event_loop() try: all_done = asyncio.Future() event_loop.run_until_complete(main(all_done)) finally: event_loop.close()
這個回調只希望得到一個參數,即一個Future實例。要想為回調傳遞額外的參數,可以使用functools.partial()創建一個包裝器。
1.5 並發地執行任務
任務是與事件循環交互的主要途徑之一。任務可以包裝協程,並跟蹤協程何時完成。由於任務是Future的子類,所以其他協程可以等待任務,而且每個任務可以有一個結果,在它完成之后可以獲取這個結果。
1.5.1 啟動一個任務
要啟動一個任務,可以使用create_task()創建一個Task實例。只要循環還在運行而且協程沒有返回,create_task()得到的任務便會作為事件循環管理的並發操作的一部分運行。
import asyncio async def task_func(): print('in task_func') return 'the result' async def main(loop): print('creating task') task = loop.create_task(task_func()) print('waiting for {!r}'.format(task)) return_value = await task print('task completed {!r}'.format(task)) print('return value: {!r}'.format(return_value)) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
這個例子中,在main()函數退出之前,會等待任務返回一個結果。
1.5.2 取消一個任務
通過保留create_task()返回的Task對象,可以在任務完成之前取消它的操作。
import asyncio async def task_func(): print('in task_func') return 'the result' async def main(loop): print('creating task') task = loop.create_task(task_func()) print('canceling task') task.cancel() print('canceled task {!r}'.format(task)) try: await task except asyncio.CancelledError: print('caught error from canceled task') else: print('task result: {!r}'.format(task.result())) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
這個例子會在啟動事件循環之前創建一個任務,然后取消這個任務。結果是run_unitl_complete()方法拋出一個CancelledError異常。
如果一個任務正在等待另一個並發運行的操作完成,那么倘若在這個等待時刻取消任務,則其會通過此時產生的一個CancelledError異常通知任務將其取消。
import asyncio async def task_func(): print('in task_func, sleeping') try: await asyncio.sleep(1) except asyncio.CancelledError: print('task_func was canceled') raise return 'the result' def task_canceller(t): print('in task_canceller') t.cancel() print('canceled the task') async def main(loop): print('creating task') task = loop.create_task(task_func()) loop.call_soon(task_canceller, task) try: await task except asyncio.CancelledError: print('main() also sees task as canceled') event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
捕捉異常會提供一個機會,如果必要,可以利用這個機會清理已經完成的工作。
1.5.3 從協程創建任務
ensure_future()函數返回一個與協程執行綁定的Task。這個Task實例再傳遞到其他代碼,這個代碼可以等待這個實例,而無須知道原來的協程是如何構造或調用的。
import asyncio async def wrapped(): print('wrapped') return 'result' async def inner(task): print('inner: starting') print('inner: waiting for {!r}'.format(task)) result = await task print('inner: task returned {!r}'.format(result)) async def starter(): print('starter: creating task') task = asyncio.ensure_future(wrapped()) print('starter: waiting for inner') await inner(task) print('starter: inner returned') event_loop = asyncio.get_event_loop() try: print('entering event loop') result = event_loop.run_until_complete(starter()) finally: event_loop.close()
需要說明,對於提供給ensure_future()的協程,在使用await之前這個協程不會啟動,只有await才會讓它執行。
1.6 組合協程和控制結構
一系列協程之間的線性控制流用內置關鍵字await可以很容易地管理。更復雜的結構可能允許一個協程等待多個其他協程並行完成,可以使用asyncio中的工具創建這些更復雜的結構。
1.6.1 等待多個協程
通常可以把一個操作划分為多個部分,然后分別執行,這會很有用。例如,采用這種方法,可以高效地下載多個遠程資源或者查詢遠程API。有些情況下,執行順序並不重要,而且可能有任意多個操作,可以使用wait()暫停一個協程,直到其他后台操作完成。
import asyncio async def phase(i): print('in phase {}'.format(i)) await asyncio.sleep(0.1 * i) print('done with phase {}'.format(i)) return 'phase {} result'.format(i) async def main(num_phases): print('starting main') phases = [ phase(i) for i in range(num_phases) ] print('waiting for phases to complete') completed, pending = await asyncio.wait(phases) results = [t.result() for t in completed] print('results: {!r}'.format(results)) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(3)) finally: event_loop.close()
在內部,wait()使用一個set來保存它創建的Task實例,這說明這些實例會按一種不可預知的順序啟動和完成。wait()的返回值是一個元組,包括兩個集合,分別包括已完成和未完成的任務。
如果使用wait()時提供了一個超時值,那么達到這個超時時間后,將只保留未完成的操作。
import asyncio async def phase(i): print('in phase {}'.format(i)) try: await asyncio.sleep(0.1 * i) except asyncio.CancelledError: print('phase {} canceled'.format(i)) raise else: print('done with phase {}'.format(i)) return 'phase {} result'.format(i) async def main(num_phases): print('starting main') phases = [ phase(i) for i in range(num_phases) ] print('waiting 0.1 for phases to complete') completed, pending = await asyncio.wait(phases, timeout=0.1) print('{} completed and {} pending'.format( len(completed), len(pending), )) # Cancel remaining tasks so they do not generate errors # as we exit without finishing them. if pending: print('canceling tasks') for t in pending: t.cancel() print('exiting main') event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(3)) finally: event_loop.close()
其余的后台操作要顯式地處理,這有多方面的原因。盡管wait()返回時未完成的任務是掛起的,但只要控制返回到事件循環它們就會恢復運行。如果沒有另一個wait()調用,則將沒有對象接收任務的輸出;也就是說,任務會運行並消費資源,但不會帶來任何好處。另外,如果程序退出時還有未完成的任務,那么asyncio會發出一個警告。這些警告可能打印到控制台上,應用的用戶便會看到。因此,最好取消所有剩余的后台操作,或者使用wait()讓它們結束運行。
1.6.2 從協程收集結果
如果后台階段是明確的,而且這些階段的結果很重要,那么gather()可能對等待多個操作很有用。
import asyncio async def phase1(): print('in phase1') await asyncio.sleep(2) print('done with phase1') return 'phase1 result' async def phase2(): print('in phase2') await asyncio.sleep(1) print('done with phase2') return 'phase2 result' async def main(): print('starting main') print('waiting for phases to complete') results = await asyncio.gather( phase1(), phase2(), ) print('results: {!r}'.format(results)) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main()) finally: event_loop.close()
gather()創建的任務不會對外提供,所以無法將其取消。返回值是一個結果列表,結果的順序與傳入gather()的參數順序相同,而不論后台操作實際上是按什么順序完成的。
1.6.3 后台操作完成時進行處理
as_completed()是一個生成器,會管理指定的一個協程列表,並生成它們的結果,每個協程結束運行時一次生成一個結果。與wait()類似,as_completed()不能保證順序,不過執行其他動作之前沒有必要等待所有后台操作完成。
import asyncio async def phase(i): print('in phase {}'.format(i)) await asyncio.sleep(0.5 - (0.1 * i)) print('done with phase {}'.format(i)) return 'phase {} result'.format(i) async def main(num_phases): print('starting main') phases = [ phase(i) for i in range(num_phases) ] print('waiting for phases to complete') results = [] for next_to_complete in asyncio.as_completed(phases): answer = await next_to_complete print('received answer {!r}'.format(answer)) results.append(answer) print('results: {!r}'.format(results)) return results event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(3)) finally: event_loop.close()
這個例子啟動了多個后台階段,它們會按其啟動順序的逆序完成。消費生成器時,循環會使用await等待協程的結果。
1.7 同步原語
盡管asyncio應用通常作為單線程的進程運行,不過仍被構建為並發應用。由於I/0以及其他外部事件的延遲和中斷,每個協程或任務可能按一種不可預知的順序執行。為了支持安全的並發執行,asyncio包含了threading和multiprocessing模塊中一些底層原語的實現。
1.7.1 鎖
Lock可以用來保護對一個共享資源的訪問。只有鎖的持有者可以使用這個資源。如果有多個請求要得到這個鎖,那么其將會阻塞,以保證一次只有一個持有者。
import asyncio import functools def unlock(lock): print('callback releasing lock') lock.release() async def coro1(lock): print('coro1 waiting for the lock') async with lock: print('coro1 acquired lock') print('coro1 released lock') async def coro2(lock): print('coro2 waiting for the lock') await lock.acquire() try: print('coro2 acquired lock') finally: print('coro2 released lock') lock.release() async def main(loop): # Create and acquire a shared lock. lock = asyncio.Lock() print('acquiring the lock before starting coroutines') await lock.acquire() print('lock acquired: {}'.format(lock.locked())) # Schedule a callback to unlock the lock. loop.call_later(0.1, functools.partial(unlock, lock)) # Run the coroutines that want to use the lock. print('waiting for coroutines') await asyncio.wait([coro1(lock), coro2(lock)]), event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
鎖可以直接調用,使用await來得到,並且使用結束時可以調用release()方法釋放鎖,如這個例子中的coro2()所示。還可以結合with await關鍵字使用鎖作為異步上下文管理器,如coro1()中所示。
1.7.2 事件
asyncio.Event基於threading.Event。它允許多個消費者等待某個事件發生,而不必尋找一個特定值與通知關聯。
import asyncio import functools def set_event(event): print('setting event in callback') event.set() async def coro1(event): print('coro1 waiting for event') await event.wait() print('coro1 triggered') async def coro2(event): print('coro2 waiting for event') await event.wait() print('coro2 triggered') async def main(loop): # Create a shared event event = asyncio.Event() print('event start state: {}'.format(event.is_set())) loop.call_later( 0.1, functools.partial(set_event, event) ) await asyncio.wait([coro1(event), coro2(event)]) print('event end state: {}'.format(event.is_set())) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
與Lock一樣,coro1()和coro2()會等待設置事件。區別是一旦事件狀態改變,它們便可以立即啟動,並且它們不需要得到事件對象上的唯一的鎖。
1.7.3 條件
Condition的做法與Event類似,只不過不是通知所有等待的協程,被喚醒的等待協程的數目由notify()的一個參數控制。
import asyncio async def consumer(condition, n): async with condition: print('consumer {} is waiting'.format(n)) await condition.wait() print('consumer {} triggered'.format(n)) print('ending consumer {}'.format(n)) async def manipulate_condition(condition): print('starting manipulate_condition') # pause to let consumers start await asyncio.sleep(0.1) for i in range(1, 3): async with condition: print('notifying {} consumers'.format(i)) condition.notify(n=i) await asyncio.sleep(0.1) async with condition: print('notifying remaining consumers') condition.notify_all() print('ending manipulate_condition') async def main(loop): # Create a condition condition = asyncio.Condition() # Set up tasks watching the condition consumers = [ consumer(condition, i) for i in range(5) ] # Schedule a task to manipulate the condition variable loop.create_task(manipulate_condition(condition)) # Wait for the consumers to be done await asyncio.wait(consumers) event_loop = asyncio.get_event_loop() try: result = event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
這個例子啟動Condition的5個消費者。它們分別使用wait()方法來等待通知讓它繼續。manipulate_condition()通知一個消費者,再通知兩個消費者,然后通知所有其余的消費者。
1.7.4 隊列
asyncio.Queue為協程提供了一個先進先出的數據結構,這與線程的queue.Queue或進程的multiprocessing.Queue很類似。
import asyncio async def consumer(n, q): print('consumer {}: starting'.format(n)) while True: print('consumer {}: waiting for item'.format(n)) item = await q.get() print('consumer {}: has item {}'.format(n, item)) if item is None: # None is the signal to stop. q.task_done() break else: await asyncio.sleep(0.01 * item) q.task_done() print('consumer {}: ending'.format(n)) async def producer(q, num_workers): print('producer: starting') # Add some numbers to the queue to simulate jobs for i in range(num_workers * 3): await q.put(i) print('producer: added task {} to the queue'.format(i)) # Add None entries in the queue # to signal the consumers to exit print('producer: adding stop signals to the queue') for i in range(num_workers): await q.put(None) print('producer: waiting for queue to empty') await q.join() print('producer: ending') async def main(loop, num_consumers): # Create the queue with a fixed size so the producer # will block until the consumers pull some items out. q = asyncio.Queue(maxsize=num_consumers) # Scheduled the consumer tasks. consumers = [ loop.create_task(consumer(i, q)) for i in range(num_consumers) ] # Schedule the producer task. prod = loop.create_task(producer(q, num_consumers)) # Wait for all of the coroutines to finish. await asyncio.wait(consumers + [prod]) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop, 2)) finally: event_loop.close()
用put()增加元素和用get()刪除元素都是異步操作,因為隊列大小可能是固定的(阻塞增加操作),或者隊列可能為空(阻塞獲取元素的調用)。