. 本文目錄#
- 協程中的並發
- 協程中的嵌套
- 協程中的狀態
- gather與wait
. 協程中的並發#
協程的並發,和線程一樣。舉個例子來說,就好像 一個人同時吃三個饅頭,咬了第一個饅頭一口,就得等這口咽下去,才能去啃第其他兩個饅頭。就這樣交替換着吃。
asyncio
實現並發,就需要多個協程來完成任務,每當有任務阻塞的時候就await,然后其他協程繼續工作。
第一步,當然是創建多個協程的列表。
# 協程函數 async def do_some_work(x): print('Waiting: ', x) await asyncio.sleep(x) return 'Done after {}s'.format(x) # 協程對象 coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) # 將協程轉成task,並組成list tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ]
第二步,如何將這些協程注冊到事件循環中呢。
有兩種方法,至於這兩種方法什么區別,稍后會介紹。
- 使用
asyncio.wait()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
- 使用
asyncio.gather()
# 千萬注意,這里的 「*」 不能省略 loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(*tasks))
最后,return的結果,可以用task.result()
查看。
for task in tasks: print('Task ret: ', task.result())
完整代碼如下
import asyncio # 協程函數 async def do_some_work(x): print('Waiting: ', x) await asyncio.sleep(x) return 'Done after {}s'.format(x) # 協程對象 coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) # 將協程轉成task,並組成list tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) for task in tasks: print('Task ret: ', task.result())
輸出結果
Waiting: 1 Waiting: 2 Waiting: 4 Task ret: Done after 1s Task ret: Done after 2s Task ret: Done after 4s
協程中的嵌套#
使用async可以定義協程,協程用於耗時的io操作,我們也可以封裝更多的io操作過程,這樣就實現了嵌套的協程,即一個協程中await了另外一個協程,如此連接起來。
來看個例子。
import asyncio # 用於內部的協程函數 async def do_some_work(x): print('Waiting: ', x) await asyncio.sleep(x) return 'Done after {}s'.format(x) # 外部的協程函數 async def main(): # 創建三個協程對象 coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) # 將協程轉為task,並組成list tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] # 【重點】:await 一個task列表(協程) # dones:表示已經完成的任務 # pendings:表示未完成的任務 dones, pendings = await asyncio.wait(tasks) for task in dones: print('Task ret: ', task.result()) loop = asyncio.get_event_loop() loop.run_until_complete(main())
如果這邊,使用的是asyncio.gather()
,是這么用的
# 注意這邊返回結果,與await不一樣 results = await asyncio.gather(*tasks) for result in results: print('Task ret: ', result)
輸出還是一樣的。
Waiting: 1 Waiting: 2 Waiting: 4 Task ret: Done after 1s Task ret: Done after 2s Task ret: Done after 4s
仔細查看,可以發現這個例子完全是由 上面「協程中的並發
」例子改編而來。結果完全一樣。只是把創建協程對象,轉換task任務,封裝成在一個協程函數里而已。外部的協程,嵌套了一個內部的協程。
其實你如果去看下asyncio.await()
的源碼的話,你會發現下面這種寫法
loop.run_until_complete(asyncio.wait(tasks))
看似沒有嵌套,實際上內部也是嵌套的。
這里也把源碼,貼出來,有興趣可以看下,沒興趣,可以直接跳過。
# 內部協程函數 async def _wait(fs, timeout, return_when, loop): assert fs, 'Set of Futures is empty.' waiter = loop.create_future() timeout_handle = None if timeout is not None: timeout_handle = loop.call_later(timeout, _release_waiter, waiter) counter = len(fs) def _on_completion(f): nonlocal counter counter -= 1 if (counter <= 0 or return_when == FIRST_COMPLETED or return_when == FIRST_EXCEPTION and (not f.cancelled() and f.exception() is not None)): if timeout_handle is not None: timeout_handle.cancel() if not waiter.done(): waiter.set_result(None) for f in fs: f.add_done_callback(_on_completion) try: await waiter finally: if timeout_handle is not None: timeout_handle.cancel() done, pending = set(), set() for f in fs: f.remove_done_callback(_on_completion) if f.done(): done.add(f) else: pending.add(f) return done, pending # 外部協程函數 async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED): if futures.isfuture(fs) or coroutines.iscoroutine(fs): raise TypeError(f"expect a list of futures, not {type(fs).__name__}") if not fs: raise ValueError('Set of coroutines/Futures is empty.') if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED): raise ValueError(f'Invalid return_when value: {return_when}') if loop is None: loop = events.get_event_loop() fs = {ensure_future(f, loop=loop) for f in set(fs)} # 【重點】:await一個內部協程 return await _wait(fs, timeout, return_when, loop)
. 協程中的狀態#
還記得我們在講生成器的時候,有提及過生成器的狀態。同樣,在協程這里,我們也了解一下協程(准確的說,應該是Future對象,或者Task任務)有哪些狀態。
Pending
:創建future,還未執行Running
:事件循環正在調用執行任務Done
:任務執行完畢Cancelled
:Task被取消后的狀態
可手工 python3 xx.py
執行這段代碼,
import asyncio import threading import time async def hello(): print("Running in the loop...") flag = 0 while flag < 1000: with open("F:\\test.txt", "a") as f: f.write("------") flag += 1 print("Stop the loop") if __name__ == '__main__': coroutine = hello() loop = asyncio.get_event_loop() task = loop.create_task(coroutine) # Pending:未執行狀態 print(task) try: t1 = threading.Thread(target=loop.run_until_complete, args=(task,)) # t1.daemon = True t1.start() # Running:運行中狀態 time.sleep(1) print(task) t1.join() except KeyboardInterrupt as e: # 取消任務 task.cancel() # Cacelled:取消任務 print(task) finally: print(task)
順利執行的話,將會打印 Pending
-> Pending:Runing
-> Finished
的狀態變化
假如,執行后 立馬按下 Ctrl+C,則會觸發task取消,就會打印 Pending
-> Cancelling
-> Cancelling
的狀態變化。
. gather與wait#
還記得上面我說,把多個協程注冊進一個事件循環中有兩種方法嗎?
- 使用
asyncio.wait()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
- 使用
asyncio.gather()
# 千萬注意,這里的 「*」 不能省略 loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(*tasks))
asyncio.gather
和 asyncio.wait
在asyncio中用得的比較廣泛,這里有必要好好研究下這兩貨。
還是照例用例子來說明,先定義一個協程函數
import asyncio async def factorial(name, number): f = 1 for i in range(2, number+1): print("Task %s: Compute factorial(%s)..." % (name, i)) await asyncio.sleep(1) f *= i print("Task %s: factorial(%s) = %s" % (name, number, f))
接收參數方式
asyncio.wait
接收的tasks,必須是一個list對象,這個list對象里,存放多個的task。
它可以這樣,用asyncio.ensure_future
轉為task對象
tasks=[ asyncio.ensure_future(factorial("A", 2)), asyncio.ensure_future(factorial("B", 3)), asyncio.ensure_future(factorial("C", 4)) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
也可以這樣,不轉為task對象。
loop = asyncio.get_event_loop() tasks=[ factorial("A", 2), factorial("B", 3), factorial("C", 4) ] loop.run_until_complete(asyncio.wait(tasks))
asyncio.gather
接收的就比較廣泛了,他可以接收list對象,但是 *
不能省略
tasks=[ asyncio.ensure_future(factorial("A", 2)), asyncio.ensure_future(factorial("B", 3)), asyncio.ensure_future(factorial("C", 4)) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(*tasks))
還可以這樣,和上面的 *
作用一致,這是因為asyncio.gather()
的第一個參數是 *coros_or_futures
,它叫 非命名鍵值可變長參數列表
,可以集合所有沒有命名的變量。
loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather( factorial("A", 2), factorial("B", 3), factorial("C", 4), ))
甚至還可以這樣
loop = asyncio.get_event_loop() group1 = asyncio.gather(*[factorial("A" ,i) for i in range(1, 3)]) group2 = asyncio.gather(*[factorial("B", i) for i in range(1, 5)]) group3 = asyncio.gather(*[factorial("B", i) for i in range(1, 7)]) loop.run_until_complete(asyncio.gather(group1, group2, group3))
返回結果不同
asyncio.wait
asyncio.wait
返回dones
和pendings
dones
:表示已經完成的任務pendings
:表示未完成的任務
如果我們需要獲取,運行結果,需要手工去收集獲取。
dones, pendings = await asyncio.wait(tasks) for task in dones: print('Task ret: ', task.result())
asyncio.gather
asyncio.gather
它會把值直接返回給我們,不需要手工去收集。
results = await asyncio.gather(*tasks) for result in results: print('Task ret: ', result)
wait有控制功能
import asyncio import random async def coro(tag): await asyncio.sleep(random.uniform(0.5, 5)) loop = asyncio.get_event_loop() tasks = [coro(i) for i in range(1, 11)] # 【控制運行任務數】:運行第一個任務就返回 # FIRST_COMPLETED :第一個任務完全返回 # FIRST_EXCEPTION:產生第一個異常返回 # ALL_COMPLETED:所有任務完成返回 (默認選項) dones, pendings = loop.run_until_complete( asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)) print("第一次完成的任務數:", len(dones)) # 【控制時間】:運行一秒后,就返回 dones2, pendings2 = loop.run_until_complete( asyncio.wait(pendings, timeout=1)) print("第二次完成的任務數:", len(dones2)) # 【默認】:所有任務完成后返回 dones3, pendings3 = loop.run_until_complete(asyncio.wait(pendings2)) print("第三次完成的任務數:", len(dones3)) loop.close()
輸出結果
第一次完成的任務數: 1 第二次完成的任務數: 4 第三次完成的任務數: 5