參考:https://blog.csdn.net/qq_27825451/article/details/86218230
聲明:本文針對的是python3.4以后的版本的,因為從3.4開始才引入asyncio,后面的3.5 3.6 3.7版本是向前兼容的,只不過語法上面有稍微的改變。比如在3.4版本中使用@asyncio.coroutine裝飾器和yield from語句,但是在3.5以后的版本中使用async、await兩個關鍵字代替,雖然語法上稍微有所差異,但是原理是一樣的。本文用最通俗的語言解釋了pythonasyncio背后的一些核心概念,簡要解析了asyncio的設計架構,並給出了使用python進行asyncio異步編程的一般模板。
一,一些重要的概念
1,協程(coroutine)-----本質就是一個函數
所謂"協程"就是一個函數,這個函數需要有兩個基本的組成要是,第一,需要使用@asyncio.coroutine進行裝飾;第二,函數體內一定要有yield from返回的generator,或者使用yield from返回的一個另一個協程對象。
當然,這兩個條件並不是硬性規定的,如果沒有這兩個條件,依然是函數,只不過是普通函數而已。
怎么判斷一個函數是不是協程?通過asyncio.iscoroutine(obj)和asyncio.iscoroutinefunction
(func)加以判斷,返回true,則是。
個人理解:協程函數需要使用@asyncio.coroutine裝飾或者是使用關鍵字async定義,但是函數體內部不一定有yield from返回
示例如下
import time import asyncio # 協程函數需要使用@asyncio.coroutine進行裝飾,但是不一定函數體類有yield from返回 @asyncio.coroutine def hello(): pass # 因為函數使用@asyncio.coroutine所以所以使用函數創建的對象h是一個協程對象 # 函數hello是一個協程函數,以下兩個表達式都返回True h = hello() print(asyncio.iscoroutine(h)) # True print(asyncio.iscoroutinefunction(hello)) # True
那協程函數有什么作用
(1) result = yield from future
作用一:返回future的結果。什么是future?后面會講到,當協程函數執行到這一句,協程會被懸掛起來,直到future的結果被返回。如果是future被中途取消,則會觸發CancelledError異常。由於task是future的子類,后面也會解釋,關於future的所有應用,都同樣適用於task
注意:暫無舉例,因為我還不知道怎么創建一個future對象
(2)result = yield from coroutine
等待另一個協程函數返回結果或者觸發異常
# 協程函數等待另一個coroutine返回或者觸發異常 async def hello(): print('begin') # asyncio.sleep(1)是一個coroutine # 但任務運行到coroutine時線程不會等待asyncio.sleep()運行 # 而是直接中斷並執行下一個消息循環,本次因為只有一個任務,所以沒有看到線程執行其他的 result = await asyncio.sleep(1) #result = time.sleep(6) print('hello返回的結果是{0}'.format(result)) print('end') loop = asyncio.get_event_loop() loop.run_until_complete(hello())
輸出如下
begin hello返回的結果是None end
asyncio.sleep(1)是一個協程這里用來模擬一個耗時的IO操作,沒有返回所以返回None
這里需要理解的是asyncio.sleep(1)於time.sleep(1)的不同,任務執行到asyncio.sleep(1)則立即中斷,去執行其他任務,如果修改成time.sleep(1)則任務執行到這里是線程等待,不會去執行其他任務。
示例說明
定義一個tasks執行兩次協程函數hello()
# 理解asyncio.sleep()和time.sleep()的不同 async def hello(): print('begin') # asyncio.sleep(1)是一個coroutine # 但任務運行到coroutine時線程不會等待asyncio.sleep()運行 result = await asyncio.sleep(1) #result = time.sleep(6) print('hello返回的結果是{0}'.format(result)) print('end') loop = asyncio.get_event_loop() tasks = [hello(),hello()] loop.run_until_complete(asyncio.wait(tasks))
輸出如下
begin begin hello返回的結果是None end hello返回的結果是None end
從輸出結果可以看到兩個begin是同時輸出的而不是先完一次begin...end再輸出一次
解析:
本次任務執行兩次hello()當執行第一個hello()遇到await asyncio.sleep(1)時立即中斷取執行第二個hello()所以開始打印了兩個begin
兩個hello()幾乎是同時執行的
使用調試模式查看
省略前面步驟直接到執行任務
為了查看asyncio.sleep()和time.sleep()的區別,下面修改代碼
# 把asyncio.sleep()改成time.sleep()則是線程sleep而不是協程的並行了 # 理解asyncio.sleep()和time.sleep()的不同 async def hello(): print('begin') # asyncio.sleep(1)是一個coroutine # result = await asyncio.sleep(1) result = time.sleep(1) print('hello返回的結果是{0}'.format(result)) print('end') loop = asyncio.get_event_loop() tasks = [hello(),hello()] loop.run_until_complete(asyncio.wait(tasks))
輸出如下
begin hello返回的結果是None end begin hello返回的結果是None end
解析:因為在函數hello()內部沒有使用asyncio.sleep()而是使用time.sleep()則任務運行到time.sleep()是當前線程暫停一段時間,線程不會中斷而是繼續等待一段時間再繼續執行。所以兩個任務並沒有並發執行而是順序執行。
使用調試模式查看執行步驟
省略前面若干步
以下步驟為重復步驟,省略
可以看到假如函數內部沒有定義yield from或者是使用關鍵字await則函數還是一個普通函數,因為沒有遇到協程則不會中斷。
如果想更加清晰看到協程的並發執行過程可以定義多個協程函數,內部使用asyncio.sleep()設置不同
async def hello(): print('begin') # asyncio.sleep(1)是一個coroutine # 但任務運行到coroutine時線程不會等待asyncio.sleep()運行 # 而是直接中斷並執行下一個消息循環,本次因為只有一個任務,所以沒有看到線程執行其他的 result = await asyncio.sleep(4) # result = time.sleep(1) print('hello返回的結果是{0}'.format(result)) print('end') async def hello2(): print('begin2') result = await asyncio.sleep(6) #result = time.sleep(4) print('hello2返回的結果是{0}'.format(result)) print('end2') loop = asyncio.get_event_loop() tasks = [hello(),hello2()] loop.run_until_complete(asyncio.wait(tasks))
輸出如下
begin2 begin hello返回的結果是None end hello2返回的結果是None end2
為什么是先執行hello2()我也不清楚
解析:執行hello2()輸出完begin2后的await asyncio.sleep(6)同時開始執行hello所以先輸出begin2 然后輸出begin
然后hello()和hello2()在同時執行因為hello()的sleep時間更短所以先執行完hello()執行完hello()過了2秒hello2()也執行完畢
(3)result= yield from task
返回一個task結果
task 任務:一個協程對象就是一個原生可以掛起的函數,任務則是對協程進一步封裝,其中包含任務的各種狀態。
示例如下
# yield from task async def hello(): print('begin') # 創建一個task,以下兩種方法都可以創建task # task = asyncio.create_task(asyncio.sleep(5)) task = asyncio.ensure_future(asyncio.sleep(1)) result = await task print('hello返回的結果是{0}'.format(result)) print('end') return 1 loop = asyncio.get_event_loop() loop.run_until_complete(hello())
輸出如下
begin hello返回的結果是None end
分析輸出結果前了解怎么創建一個task
# asyncio.ensure_future創建一個task傳遞參數為一個coroutine對象 task = asyncio.ensure_future(asyncio.sleep(10)) # 類型為_asyncio.Task print(type(task)) # <class '_asyncio.Task'> # 運行 loop.run_until_complete(task)
使用調試模式分析執行過程
直接從執行協程函數開始分析
以上執行的task是asyncio.sleep(1)返回為None,如果定義hello()協程函數一個返回為1 然后把hello()放入到task中,使用其他協程函數調用則可以獲取一個return返回結果
async def hello2(): task = asyncio.ensure_future(hello()) result = await task print('hello2獲取的返回{0}'.format(result)) loop.run_until_complete(hello2())
輸出如下
hello2獲取的返回1
因為task封裝了協程hello()而在hello()定義了return 1所以執行協程hello2() await task的時候返回了hello()定義的返回值
(4)return expression
作為一個函數本身也是可以返回一個結果的
示例如下
# return expression async def hello(): return '協程的返回結果' loop = asyncio.get_event_loop() result = loop.run_until_complete(hello()) print(result) # 協程的返回結果
個人理解:本次執行協程返回一個結果,和一個普通函數沒什么區別。
(5)raise exception
拋出錯誤
示例如下
# raise exception async def hello(): raise StopIteration() loop = asyncio.get_event_loop() result = loop.run_until_complete(hello())
拋出錯誤中斷執行
RuntimeError: coroutine raised StopIteration
2,事件循環
event_loop
協程函數,不是想普通函數那樣直接調用運行,必須添加到事件循環中,然后由事件循環去運行,單獨運行協程函數是不會有結果的(單獨運行協程函數返回一個coroutine對象)看一個例子
# 事件event_loop async def say_after_time(delay,what): await asyncio.sleep(delay) print(what) async def main(): print(f"開始時間為: {time.time()}") await say_after_time(1,"hello") await say_after_time(2,"world") print(f"結束時間為: {time.time()}") # 創建事件循環對象 loop=asyncio.get_event_loop() #與上面等價,創建新的事件循環 # loop=asyncio.new_event_loop() #通過事件循環對象運行協程函數 loop.run_until_complete(main()) # tasks = [main(),main()] # loop.run_until_complete(asyncio.wait(tasks)) # 關閉 loop.close()
輸出如下
開始時間為: 1634363895.1841946 hello world 結束時間為: 1634363898.1979322
解析:
本次定義了兩個協程函數,協程函數say_after_time(delay,what)傳遞兩個參數delay,what分別為整數和字符串,在協程函數內部又調用協程asyncio.sleep(delay)模擬協程操作,然后打印出傳遞的字符串what。函數main()也是一個協程函數,在函數內部分別調用兩次協程函數say_after_time(),分別傳遞不同的參數。創建循環對象運行了協程函數main(),首先打印開始時間,然后調用協程函數say_after_time(delay,what),這個時候線程中斷,等待協程函數say_after_time(delay,what)執行,如果本次任務執行了多個其他協程任務則不會等待去執行其他協程,但是本次任務隊列只執行了main(),所以接着去執行第一個say_after_time(delay,what)了,在執行第一個say_after_time(delay,what)的時候遇到協程asyncio.sleep(delay)了,如果有其他協程也不會等待返回而去執行其他協程,本次沒有,所以等這個asyncio.sleep(delay)返回結果以后又執行打印what了,然后第一個say_after_time(delay,what)執行完畢,接着執行第二個say_after_time(delay,what),過程類似不重復了。
使用調試模式分析執行過程,從執行協程函數main()開始
重復過程不列出
以上代碼看起來是順序運行,是因為任務只定義了一個協程所以好像還是順序運行並沒有實現協程的並發運行,修改代碼定義一個task運行兩個協程
# 事件event_loop async def say_after_time(delay,what): await asyncio.sleep(delay) print(what) async def main(): print(f"main開始時間為: {time.time()}") await say_after_time(1,"hello1") await say_after_time(2,"world1") print(f"main結束時間為: {time.time()}") async def main2(): print(f"main2開始時間為: {time.time()}") await say_after_time(1,"hello2") await say_after_time(2,"world2") print(f"main2結束時間為: {time.time()}") # 創建事件循環對象 loop=asyncio.get_event_loop() #通過事件循環對象運行協程函數 # loop.run_until_complete(main()) tasks = [main(),main2()] loop.run_until_complete(asyncio.wait(tasks)) # 關閉 loop.close()
為了區分本次定義了兩個協程函數
運行輸出如下
main2開始時間為: 1634371608.927798 main開始時間為: 1634371608.927798 hello2 hello1 world2 main2結束時間為: 1634371611.9465902 world1 main結束時間為: 1634371611.9465902
通過輸出可以看出main2和main1幾乎是同時運行,同時結束的,如果不是協程而是普通函數輸出應該是向下面這樣有順序的分別輸出,不可能交叉輸出
main2開始時間為: 1634371608.927798 hello2 world2 main2結束時間為: 1634371611.9465902 main開始時間為: 1634371608.927798 hello1 world1 main結束時間為: 1634371611.9465902
如果我們單獨像執行普通函數那樣執行一個協程函數,只會返回一個coroutine對象(python3.7)如下所示:
<coroutine object hello at 0x000001B3B5345D40>
如果直接運行會返回coroutine對象並且報一個警告,注意是警告不是錯誤
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
(1)獲取事件循環對象的幾種方法
下面幾種方法可以用來獲取,設置,創建事件循環對象loop
loop=asyncio.
get_running_loop
() 返回(獲取)在當前線程中正在運行的事件循環,如果沒有正在運行的事件循環,則會顯示錯誤;它是python3.7中新添加的
使用該方法獲取事件循環需要在事件運行期間才能獲取,不能再事件運行之后獲取,因為事件運行之后相當於該事件已經運行完畢就無法獲取了
舉例說明
# 獲取事件循環對象的幾種方式 # asyncio.get_running_loop()獲取當前的事件循環對象 async def hello(): result = await asyncio.sleep(1) # 獲取當前事件循環對象並打印 loop = asyncio.get_running_loop() print(loop) # 創建事件循環對象 loop = asyncio.get_event_loop() # 運行事件循環對象 loop.run_until_complete(hello())
輸出如下
<ProactorEventLoop running=True closed=False debug=False>
其中各個選項含義如下
# 正在運行 running=True # 還沒有關閉 closed=False # 沒有開啟debug debug=False
如果是在事件循環外打印事件循環對象
# 獲取事件循環對象的幾種方式 # asyncio.get_running_loop()獲取當前的事件循環對象 async def hello(): result = await asyncio.sleep(1) # 獲取當前事件循環對象並打印 loop = asyncio.get_running_loop() print(loop) # 創建事件循環對象 loop = asyncio.get_event_loop() # 運行事件循環對象 loop.run_until_complete(hello()) # <ProactorEventLoop running=True closed=False debug=False> print(loop)
則輸出如下,事件循環對象已經創建但是running的狀態為False即事件循環對象沒有運行或者是已經運行完畢
<ProactorEventLoop running=False closed=False debug=False>
如果在事件循環對象運行之外使用get_running_loop()獲取事件循環對象則會報錯
# 獲取事件循環對象的幾種方式 # asyncio.get_running_loop()獲取當前的事件循環對象 async def hello(): result = await asyncio.sleep(1) # 獲取當前事件循環對象並打印 loop = asyncio.get_running_loop() print(loop) # 創建事件循環對象 loop = asyncio.get_event_loop() asyncio.get_running_loop()
報錯信息如下
RuntimeError: no running event loop
為什么會報錯,因為事件循環對象沒有運行或者是運行結束,當代碼執行到asyncio.get_running_loop()這一步時,當前線程並沒有正在運行事件循環對象。
loop=asyncio.get_event_loop() 獲得一個事件循環,如果當前線程還沒有事件循環,則創建一個新的事件循環loop;
不詳述,前面代碼都是使用這個方法創建一個新的事件循環loop
loop=asyncio.set_event_loop(loop) 設置一個事件循環為當前線程的事件循環;
個人注釋:使用這個方法返回為None,不知道這個怎么使用
loop=asyncio.new_event_loop() 創建一個新的事件循環
和asyncio.get_event_loop() 效果一樣
(2)通過事件循環運行協程函數的兩種方式
(1)方式一:創建循環對象loop,即asyncio.get_event_loop(),然后通過loop.run_until_complete()方法來運行。
(2)方式二:直接通過asyncio.run(function_name)運行協程函數。但是需要注意的是,首先run函數是python3.7版本新添加的,前面的版本是沒有的;其次,這個run函數總是創建一個新的實踐循環並在run結束之后關閉事件循環,所以,如果在同一個線程中已經有了一個事件循環,則不能再使用再給函數了,因為同一個線程不能有兩個事件循環,而且這個run函數不能同時運行兩次,因為它已經創建一個了。即同一個線程是不允許有多個事件循環loop的。
舉例說明
# 運行協程 async def hello(): print('begin') result = await asyncio.sleep(1) print('end') loop = asyncio.get_event_loop() loop.run_until_complete(hello()) loop.close() asyncio.run(hello())
輸出如下
begin end begin end
上述例子中,以下代碼實現的效果是一樣的
loop = asyncio.get_event_loop() loop.run_until_complete(hello()) loop.close()
使用run
asyncio.run(hello())
個人理解:使用asyncio.run會自動關閉,類似於使用with命令打開文件。
以下所說同一個線程不允許有多個事件循環,但是上例有兩個事件循環,是因為雖然有兩個事件循環但是其實他們是順序執行的,執行第一個事件循環結束以后再執行第二個事件循環,所以不沖突,是允許的。
以下例子示例一個線程只能運行一個事件循環
# 一個線程值同時只允許執行一個事件循環 async def hello(): print('begin') await asyncio.sleep(1) asyncio.run(asyncio.sleep(1)) print('end') asyncio.run(hello())
運行報錯
即不能在事件循環內再運行一個事件循環
注意:到底什么是事件循環?如何理解?
可以這樣理解:線程一直在各個協程方法直接永不停歇的游走,遇到一個yield from或await就懸掛起來,然后又走到另外一個方法,依次進行下去,直到事件循環所有的方法執行完畢。實際上loop是BaseEventLoop的一個實例,我們可以查看定義,它到底有哪些方法可調用。
3,什么是awaitable對象----即可暫停等待對象
有三類對象是可等待的,即 coroutines, Tasks, and Futures.
coroutine:本質上就是一個函數,一前面的生成器yield和yield from為基礎,不再贅述;
Tasks: 任務,顧名思義,就是要完成某件事情,其實就是對協程函數進一步的封裝;
Future:它是一個“更底層”的概念,他代表一個一步操作的最終結果,因為一步操作一般用於耗時操作,結果不會立即得到,會在“將來”得到異步運行的結果,故而命名為Future。
三者的關系,coroutine可以自動封裝成task,而Task是Future的子類。
4,什么是task任務
如前所述,Task用來並發調度的協程,即對協程函數的進一步包裝?那為什么還需要包裝呢?因為單純的協程函數僅僅是一個函數而已,將其包裝成任務,任務是可以包含各種狀態的,異步編程最重要的就是對異常操作狀態的把控了。
(1)創建任務(兩種方法)
方法一:task = asyncio.create_task(coro()) # 這是3.7版本新添加的
方法二:task = asyncio.ensure_future(coro())
兩種方法都可以創建task有什么區別?
使用create_task創建task需要在一個循環事件內創建,即在運行循環事件的時候創建,而不能在循環事件之外創建,舉例說明
# create_task創建task需要在一個循環事件內創建 async def hello(): print('start') task = asyncio.create_task(asyncio.sleep(1)) result = await task print('task返回結果為{0}'.format(result)) loop = asyncio.get_event_loop() loop.run_until_complete(hello())
輸出如下
start task返回結果為None
分析,運行協程函數hello()首先打印start,然后使用create_task(asyncio.sleep(1))創建一個task,該task完成一個sleep的操作,這個時候只是創建task並沒有運行,到下一步await的時候才執行然后等待1秒以后輸出 task返回結果為None
以上運行的task沒有返回所以返回None下面例子給協程函數hello()定義一個返回,然后在協程函數main里面創建task調用hello()
# create_task創建task需要在一個循環事件內創建 async def hello(): print('start') task = asyncio.create_task(asyncio.sleep(1)) result = await task print('task返回結果為{0}'.format(result)) # 自定義返回 return 'hello的返回' async def main(): task = asyncio.create_task(hello()) result = await task print(result) loop = asyncio.get_event_loop() # loop.run_until_complete(hello()) loop.run_until_complete(main())
輸出如下
start task返回結果為None hello的返回
解析
不能在一個循環事件之外使用該方法創建task,運行報錯
task = asyncio.create_task(hello())
運行報錯如下
RuntimeError: no running event loop
創建task的方法二示例,在循環事件之外創建task
# asyncio.ensure_future在循環事件外創建task async def hello(): print('start') task = asyncio.create_task(asyncio.sleep(1)) result = await task print('task返回結果為{0}'.format(result)) task = asyncio.ensure_future(hello()) loop.run_until_complete(task)
也可以使用
loop.
create_future
()
loop.
create_task
(coro)
示例
# create_future創建task示例start def hello(future): print('start') # result = await asyncio.sleep(1) future.set_result(1) print('end') loop = asyncio.get_event_loop() task = loop.create_future() loop.call_soon(hello,task) loop.run_until_complete(task) # create_future創建task示例end
loop.call_soon()傳遞參數第一個為函數名,第二個為創建的task ,定義的函數是普通函數
注意:create_future具體怎么使用還不了解
使用create_task創建task示例
# create_task創建task示例start async def hello(): print('start') result = await asyncio.sleep(1) print('end') loop = asyncio.get_event_loop() # 創建task傳遞參數為運行函數hello()即一個協程對象 task = loop.create_task(hello()) # 運行 loop.run_until_complete(task) # create_task創建task示例end
備注:關於任務的詳解,會在后面的系列文章繼續講解,本文只是概括性的說明。
(2)獲取某一個任務的方法
方法一:task=asyncio.current_task(loop=None)
返回在某一個指定的loop中,當前正在運行的任務,如果沒有任務正在運行,則返回None;
如果loop為None,則默認為在當前的事件循環中獲取,
方法二:asyncio.all_tasks(loop=None)
返回某一個loop中還沒有結束的任務
示例:
# asyncio.current_task獲取一個任務的方法start async def hello(): print('start') result = await asyncio.sleep(1) task = asyncio.current_task() # task = asyncio.all_tasks() print(task) print('end') loop = asyncio.get_event_loop() task = asyncio.ensure_future(hello()) loop.run_until_complete(task) # asyncio.current_task獲取一個任務的方法end
輸出如下
PS D:\learn-python3\函數式編程> & C:/ProgramData/Anaconda3/python.exe d:/learn-python3/學習腳本/協程系列/use_asyncio.py start <Task pending name='Task-1' coro=<hello() running at d:/learn-python3/學習腳本/協程系列/use_asyncio.py:256> cb=[_run_until_complete_cb() at C:\ProgramData\Anaconda3\lib\asyncio\base_events.py:184]> end
同理返回任務需要在一個事件循環內,方法asyncio.all_tasks()在本次得到到輸出和asyncio.current_task()是一致的
5,什么是future
Future是一個較底層的可等待(awaitable)對象,它表示的是異步操作的最終結果,當一個Future對象被等待的時候,協程會一直等待,直到Future已經運算完畢。
Future是Task的父類,一般情況下,已不用去管它們兩者的詳細區別,也沒有必要去用Future,用Task就可以了。
返回 future 對象的低級函數的一個很好的例子是 loop.run_in_executor()
.
二,asyncio的基本構架
前面介紹了asyncio里面最為核心的幾個概念,如果能夠很好地理解這些概念,對於學習協程是非常有幫助的,但是按照我個人的風格,我會先說asyncio的架構,理解asyncio的設計架構有助於更好地應用和理解。
asyncio分為高層API和低層API,我們都可以使用,就像我前面在講matplotlib的架構的時候所講的一樣,我們前面所講的Coroutine和Tasks屬於高層API,而Event Loop 和Future屬於低層API。當然asyncio所涉及到的功能遠不止於此,我們只看這么多。下面是是高層API和低層API的概覽:
High-level APIs
Coroutines and Tasks(本文要寫的)
Streams
Synchronization Primitives
Subprocesses
Queues
Exceptions
Low-level APIs
Event Loop(下一篇要寫的)
Futures
Transports and Protocols
Policies
Platform Support
所謂的高層API主要是指那些asyncio.xxx()的方法,
1,常見的以下高層API方法
(1)運行異步協程
asyncio.run
(coro, *, debug=False) #運行一個一步程序,參見上面
(2)創建任務
task=asyncio.
create_task
(coro) #python3.7 ,參見上面
task = asyncio.ensure_future(coro())
(3)睡眠
await asyncio.sleep(delay, result=None, *, loop=None)
這個函數表示的是:當前的那個任務(協程函數)睡眠多長時間,而允許其他任務執行。這是它與time.sleep()的區別,time.sleep()是當前線程休息,注意他們的區別哦。
另外如果提供了參數result,當當前任務(協程)結束的時候,它會返回;
loop參數將會在3.10中移除,這里就不再說了。
(4)並發運行多個任務
await asyncio.
gather
(*coros_or_futures, loop=None, return_exceptions=False)
它本身也是awaitable的。
*coros_or_futures是一個序列拆分操作,如果是以個協程函數,則會自動轉換成Task。
當所有的任務都完成之后,返回的結果是一個列表的形式,列表中值的順序和*coros_or_futures完成的順序是一樣的。
return_exceptions:False,這是他的默認值,第一個出發異常的任務會立即返回,然后其他的任務繼續執行;
True,對於已經發生了異常的任務,也會像成功執行了任務那樣,等到所有的任務執行結束一起將錯誤的結果返回到最終的結果列表里面。
如果gather()本身被取消了,那么綁定在它里面的任務也就取消了。
(5)防止任務取消
await asyncio.
shield
(*arg, *, loop=None)
它本身也是awaitable的。顧名思義,shield為屏蔽、保護的意思,即保護一個awaitable 對象防止取消,一般情況下不推薦使用,而且在使用的過程中,最好使用try語句塊更好。
try: res = await shield(something()) except CancelledError: res = None
(6)設置timeout——一定要好好理解
await asyncio.
wait_for
(aw, timeout, *, loop=None)
如果aw是一個協程函數,會自動包裝成一個任務task。參見下面的例子:
import asyncio async def eternity(): print('我馬上開始執行') await asyncio.sleep(3600) #當前任務休眠1小時,即3600秒 print('終於輪到我了') async def main(): # Wait for at most 1 second try: print('等你3秒鍾哦') await asyncio.wait_for(eternity(), timeout=3) #休息3秒鍾了執行任務 except asyncio.TimeoutError: print('超時了!') asyncio.run(main())
運行結果如下
等你3秒鍾哦 我馬上開始執行 超時了!
解析:首先調用main()入口函數,當輸出"等你3秒哦",main()掛起,執行eternity(),然后打印“我馬上開始執行”然后遇到await掛起,而且需要掛起3600秒,大於timeout設置的3秒,這時候觸發TimeoutError輸出“超時了!”
修改代碼把eternity的模擬等待時間始終為2秒
import asyncio async def eternity(): print('我馬上開始執行') await asyncio.sleep(2) #當前任務休眠1小時,即3600秒 print('終於輪到我了') async def main(): # Wait for at most 1 second try: print('等你3秒鍾哦') await asyncio.wait_for(eternity(), timeout=3) #休息3秒鍾了執行任務 except asyncio.TimeoutError: print('超時了!') asyncio.run(main())
輸出如下
等你3秒鍾哦 我馬上開始執行 終於輪到我了
因為eternity等待的時間為2秒,小於設置的timeout時間3秒,所以沒有錯發TimeoutError所以eternity完整執行了。
總結:當異步操作需要執行的時間超過waitfor設置的timeout,就會觸發異常,所以在編寫程序的時候,如果要給異步操作設置timeout,一定要選擇合適,如果異步操作本身的耗時較長,而你設置的timeout太短,會涉及到她還沒做完,就拋出異常了。
(7)多個協程函數時候的等待
await asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
與上面的區別是,第一個參數aws是一個集合,要寫成集合set的形式,比如:
{func(),func(),func3()}
表示的是一系列的協程函數或者是任務,其中協程會自動包裝成任務。事實上,寫成列表的形式也是可以的。
注意:該函數的返回值是兩個Tasks/Futures的集合:
(done, pending)
其中done是一個集合,表示已經完成的任務tasks;pending也是一個集合,表示還沒有完成的任務。
常見的使用方法為:done, pending = await asyncio.wait(aws)
參數解釋:
timeout (a float or int), 同上面的含義一樣,需要注意的是,這個不會觸發asyncio.TimeoutError異常,如果到了timeout還有任務沒有執行完,那些沒有執行完的tasks和futures會被返回到第二個集合pending里面。
return_when參數,顧名思義,他表示的是,什么時候wait函數該返回值。只能夠取下面的幾個值。
Constant | Description |
FIRST_COMPLETED | 當任何一個task或者是future完成或者是取消,wait函數就返回 |
FIRST_EXCEPTION | 當任何一個task或者是future觸發了某一個異常,就返回,.如果是所有的task和future都沒有觸發異常,則等價與下面的 ALL_COMPLETED |
ALL_COMPLETED | 當所有的task或者是future都完成或者是都取消的時候,再返回。 |
如下面例子所示
import asyncio import time a=time.time() async def hello1(): #大約2秒 print("Hello world 01 begin") await asyncio.sleep(2) print("Hello again 01 end") async def hello2(): #大約3秒 print("Hello world 02 begin") await asyncio.sleep(3) print("Hello again 02 end") async def hello3(): #大約4秒 print("Hello world 03 begin") await asyncio.sleep(4) print("Hello again 03 end") async def main(): #入口函數 done,pending=await asyncio.wait({hello1(),hello2(),hello3()},return_when=asyncio.FIRST_COMPLETED) for i in done: print(i) for j in pending: print(j) asyncio.run(main()) #運行入口函數 b=time.time() print('---------------------------------------') print(b-a)
輸出如下
Hello world 02 begin Hello world 01 begin Hello world 03 begin Hello again 01 end <Task finished name='Task-3' coro=<hello1() done, defined at d:/learn-python3/學習腳本/協程系列/use_asyncio.py:298> result=None> <Task pending name='Task-2' coro=<hello2() running at d:/learn-python3/學習腳本/協程系列/use_asyncio.py:305> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000002BAE8B04AF0>()]>> <Task pending name='Task-4' coro=<hello3() running at d:/learn-python3/學習腳本/協程系列/use_asyncio.py:310> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000002BAE8B04B50>()]>> --------------------------------------- 2.030433177947998
從上面可以看出,hello1()運行結束了,hello2()和hello3()還沒有結束
因為參數設置為
return_when=asyncio.FIRST_COMPLETED
所以當任何一個task或者是future完成或者是取消,wait函數就返回,因為hello1()等待的時間最短所以執行完就返回了,但是這個時候hello2()和hello3()還沒有執行完畢,強迫中斷了,所以done為已完成的task集合即hello1(),pending為未完成的task集合即hello2() hello3()
(8)asyncio.as_completed()函數
asyncio.as_completed(aws, *, loop=None, timeout=None)
第一個參數aws:同上面一樣,是一個集合{}集合里面的元素是coroutine、task或者future
第三個參數timeout:意義和上面講的的一樣
那到底什么作用呢?
# asyncio.as_completed start import asyncio import time import threading a=time.time() async def hello1(): print("Hello world 01 begin") await asyncio.sleep(5) #大約5秒 print("Hello again 01 end") return '哈哈1' async def hello2(): print("Hello world 02 begin") await asyncio.sleep(3) #大約3秒 print("Hello again 02 end") return '哈哈2' async def hello3(): print("Hello world 03 begin") await asyncio.sleep(4) #大約4秒 print("Hello again 03 end") return '哈哈3' async def main(): s=asyncio.as_completed({hello1(),hello2(),hello3()}) for f in s: result=await f print(result) asyncio.run(main()) b=time.time() print('---------------------------------------') print(b-a) # asyncio.as_completed end
輸出如下
Hello world 03 begin Hello world 01 begin Hello world 02 begin Hello again 02 end 哈哈2 Hello again 03 end 哈哈3 Hello again 01 end 哈哈1 --------------------------------------- 5.02417516708374
結論:asyncio.as_completed()函數返回的是一個可迭代(iterator)的對象,對象的每個元素就是一個future對象,很多小伙伴說,這不是相當於沒變嗎?其實返回的future集合是對參數的future集合重新組合,組合的順序就是,最先執行完的協程函數(coroutine、task、future)最先返回,從上面的代碼可知,參數為
aws={hello1(),hello2(),hello3()},因為hello1大約花費5秒、hello2大約花費3秒、hello3大約花費4秒。返回的結果為
s={hello2()、hello3()、hello(1)},因為hello2時間最短,故而放在前面,hello1時間最長,故而放在最后面。然后對返回的集合s開始迭代。
2,task類詳解
官方英文文檔如下
class asyncio.Task(coro, *, loop=None) A Future-like object that runs a Python coroutine. Not thread-safe. Tasks are used to run coroutines in event loops. If a coroutine awaits on a Future, the Task suspends the execution of the coroutine and waits for the completion of the Future. When the Future is done, the execution of the wrapped coroutine resumes. Event loops use cooperative scheduling: an event loop runs one Task at a time. While a Task awaits for the completion of a Future, the event loop runs other Tasks, callbacks, or performs IO operations. Use the high-level asyncio.create_task() function to create Tasks, or the low-level loop.create_task() or ensure_future() functions. Manual instantiation of Tasks is discouraged. To cancel a running Task use the cancel() method. Calling it will cause the Task to throw a CancelledError exception into the wrapped coroutine. If a coroutine is awaiting on a Future object during cancellation, the Future object will be cancelled. cancelled() can be used to check if the Task was cancelled. The method returns True if the wrapped coroutine did not suppress the CancelledError exception and was actually cancelled. asyncio.Task inherits from Future all of its APIs except Future.set_result() and Future.set_exception(). Tasks support the contextvars module. When a Task is created it copies the current context and later runs its coroutine in the copied context.
上面的文字描述中推出了幾個非常重要的信息,特在此總結如下:
(1)他是作為一個python協程對象,和Future對象很像的這么一個對象,但不是線程安全的;他繼承了Future所有的API,,除了Future.set_result()和Future.set_Exception();
(2)使用高層API asyncio.ccreate_task()創建任務,或者是使用低層API loop.create_task()或者是loop.ensure_future()創建任務對象;
(3)相比於協程函數,任務時有狀態的,可以使用Task.cancel()進行取消,這會觸發CancelledError異常,使用cancelled()檢查是否取消。
下面介紹Task類常見的一些使用函數
(1)cancel()
Request the Task to be cancelled.
其實前面已經有所介紹,最好是使用他會出發CancelledError異常,所以需要取消的協程函數里面的代碼最好在try-except語句塊中進行,這樣方便觸發異常,打印相關信息,但是Task.cancel()沒有辦法保證任務一定會取消,而Future.cancel()是可以保證任務一定取消的。可以參見下面的一個例子:
# task.cancel() start import asyncio async def cancel_me(): print('cancel_me(): before sleep') try: await asyncio.sleep(3600) #模擬一個耗時任務 except asyncio.CancelledError: print('cancel_me(): cancel sleep') raise finally: print('cancel_me(): after sleep') async def main(): #通過協程創建一個任務,需要注意的是,在創建任務的時候,就會跳入到異步開始執行 #因為是3.7版本,創建一個任務就相當於是運行了異步函數cancel_me task = asyncio.create_task(cancel_me()) #等待一秒鍾 await asyncio.sleep(1) print('main函數休息完了') #發出取消任務的請求 task.cancel() try: await task #因為任務被取消,觸發了異常 except asyncio.CancelledError: print("main(): cancel_me is cancelled now") asyncio.run(main()) # task.cancel() end
輸出如下
cancel_me(): before sleep main函數休息完了 cancel_me(): cancel sleep cancel_me(): after sleep main(): cancel_me is cancelled now
運行過程分析:
首先run函數啟動主函數入口main,在main,因為第一個話就是調用異步函數cancel_me(),函數,所以先打印出第一句話;
然后進入cancle_me中的tty語句,遇到await,暫停,這時返回main中執行,但是又在main中遇到await,也會暫停,但是由於main中只需要暫停1秒,而camcel_me中要暫停3600秒,所以等到main暫停結束后,接着運行main,所以打印出第二句話;
接下來遇到取消任務的請求task.cancel(),然后繼續執行main里面的try,又遇到await,接着main進入暫停,接下來進入到cancle_me函數中,但是由於main中請求取消任務,所以那個耗時3600秒的任務就不再執行了,直接觸發了CancelldeError異常,打印出第三句話,接下來raise一個異常信息
接下來執行cancel_me的finally,打印出第四句話,此時cancel_me執行完畢,由於它拋出了一個異常,返回到主程序main中,觸發異常,打印出第五句話。
(2)done()
當一個被包裝得協程既沒有觸發異常、也沒有被取消的時候,意味着它是done的,返回true。
(3)result()
返回任務的執行結果,
當任務被正常執行完畢,則返回結果;
當任務被取消了,調用這個方法,會觸發CancelledError異常;
當任務返回的結果是無用的時候,則調用這個方法會觸發InvalidStateError;
當任務出發了一個異常而中斷,調用這個方法還會再次觸發這個使程序中斷的異常。
(4)exception()
返回任務的異常信息,觸發了什么異常,就返回什么異常,如果任務是正常執行的無異常,則返回None;
當任務被取消了,調用這個方法會觸發CancelledError異常;
當任務沒有做完,調用這個方法會觸發InvalidStateError異常。
下面還有一些不常用的方法,如下:
(5)add_done_callback(callback, *, context=None)
(6)remove_done_callback(callback)
(7)get_stack(*, limit=None)
(8)print_stack(*, limit=None, file=None)
(9)all_tasks(loop=None),這是一個類方法
(10)current_task(loop=None),這是一個類方法
3,異步函數結果的獲取
對於異步編程,異步函數而言,最重要的就是異步函數調用結束之后,獲取異步函數的返回值,我們可以用以下幾種方式來獲取函數的返回值,第一個直接通過Task.reslut()來獲取;第二種是綁定一個回調函數來獲取,即函數執行完畢后調用一個函數來獲取異步函數的返回值。
(1)直接通過result獲取
# 通過result獲取 start async def hello1(a,b): print("Hello world 01 begin") await asyncio.sleep(3) #模擬耗時任務3秒 print("Hello again 01 end") return a+b coroutine=hello1(10,5) loop = asyncio.get_event_loop() #第一步:創建事件循環 task=asyncio.ensure_future(coroutine) #第二步:將多個協程函數包裝成任務列表 loop.run_until_complete(task) #第三步:通過事件循環運行 print('-------------------------------------') print(task.result()) loop.close() # 通過result獲取 end
輸出如下
Hello world 01 begin Hello again 01 end ------------------------------------- 15
(2)通過定義回調函數來獲取
# 通過回調函數獲取 start import asyncio import time async def hello1(a,b): print("Hello world 01 begin") await asyncio.sleep(3) #模擬耗時任務3秒 print("Hello again 01 end") return a+b def callback(future): #定義的回調函數 print(future.result()) loop = asyncio.get_event_loop() #第一步:創建事件循環 task=asyncio.ensure_future(hello1(10,5)) #第二步:將多個協程函數包裝成任務 task.add_done_callback(callback) #給任務綁定一個回調函數 loop.run_until_complete(task) #第三步:通過事件循環運行 loop.close() # 通過回調函數獲取 end
注意:所謂的回調函數,就是指協程函數coroutine執行結束時候會調用回調函數。並通過參數future獲取協程執行的結果。我們創建的task和回調里的future對象,實際上是同一個對象,因為task是future的子類。
三,asyncio異步編程的基本模板
事實上,在使用asyncio進行異步編程的時候,語法形式往往是多樣性的,雖然理解異步編程的核心思想很重要,但是實現的時候終究還是要編寫語句的,本次給出的模板,是兩個不同的例子,例子一是三個異步方法,它們都沒有參數,沒有返回值,都模擬一個耗時任務;例子二是三個異步方法,都有參數,都有返回值。
1,Python3.7之前版本
(1)例子一:無參數,無返回值
# 3.7版本之前 無參數無返回值 start import asyncio import time a=time.time() async def hello1(): print("Hello world 01 begin") await asyncio.sleep(3) #模擬耗時任務3秒 print("Hello again 01 end") async def hello2(): print("Hello world 02 begin") await asyncio.sleep(2) #模擬耗時任務2秒 print("Hello again 02 end") async def hello3(): print("Hello world 03 begin") await asyncio.sleep(4) #模擬耗時任務4秒 print("Hello again 03 end") loop = asyncio.get_event_loop() #第一步:創建事件循環 tasks = [hello1(), hello2(),hello3()] #第二步:將多個協程函數包裝成任務列表 loop.run_until_complete(asyncio.wait(tasks)) #第三步:通過事件循環運行 loop.close() #第四步:取消事件循環 # 3.7版本之前 無參數無返回值 end
(2)例子2 有參數有返回值
# 3.7版本之前 有參數有返回值 start import asyncio import time async def hello1(a,b): print("Hello world 01 begin") await asyncio.sleep(3) #模擬耗時任務3秒 print("Hello again 01 end") return a+b async def hello2(a,b): print("Hello world 02 begin") await asyncio.sleep(2) #模擬耗時任務2秒 print("Hello again 02 end") return a-b async def hello3(a,b): print("Hello world 03 begin") await asyncio.sleep(4) #模擬耗時任務4秒 print("Hello again 03 end") return a*b loop = asyncio.get_event_loop() #第一步:創建事件循環 task1=asyncio.ensure_future(hello1(10,5)) task2=asyncio.ensure_future(hello2(10,5)) task3=asyncio.ensure_future(hello3(10,5)) tasks = [task1,task2,task3] #第二步:將多個協程函數包裝成任務列表 loop.run_until_complete(asyncio.wait(tasks)) #第三步:通過事件循環運行 print(task1.result()) #並且在所有的任務完成之后,獲取異步函數的返回值 print(task2.result()) print(task3.result()) loop.close() #第四步:關閉事件循環 # 3.7版本之前 有參數有返回值 end
輸出如下
Hello world 01 begin Hello world 02 begin Hello world 03 begin Hello again 02 end Hello again 01 end Hello again 03 end 15 5 50
(3)總結:四步走(針對3.7之前版本)
第一步:構建事假循環
loop=asyncio.get_running_loop() #返回(獲取)在當前線程中正在運行的事件循環,如果沒有正在運行的事件循環,則會顯示錯誤;它是python3.7中新添加的 loop=asyncio.get_event_loop() #獲得一個事件循環,如果當前線程還沒有事件循環,則創建一個新的事件循環loop; loop=asyncio.set_event_loop(loop) #設置一個事件循環為當前線程的事件循環; loop=asyncio.new_event_loop() #創建一個新的事件循環
第二步:將一個或是多個協程函數包裝成任務Task
#高層API task = asyncio.create_task(coro(參數列表)) # 這是3.7版本新添加的 task = asyncio.ensure_future(coro(參數列表)) #低層API loop.create_future(coro) loop.create_task(coro)
第三步:通過實踐循環運行
loop.run_until_complete(asyncio.wait(tasks)) #通過asyncio.wait()整合多個task loop.run_until_complete(asyncio.gather(tasks)) #通過asyncio.gather()整合多個task loop.run_until_complete(task_1) #單個任務則不需要整合 loop.run_forever() #但是這個方法在新版本已經取消,不再推薦使用,因為使用起來不簡潔 ''' 使用gather或者wait可以同時注冊多個任務,實現並發,但他們的設計是完全不一樣的,在前面的2.1.(4)中已經討論過了,主要區別如下: (1)參數形式不一樣 gather的參數為 *coroutines_or_futures,即如這種形式 tasks = asyncio.gather(*[task1,task2,task3])或者 tasks = asyncio.gather(task1,task2,task3) loop.run_until_complete(tasks) wait的參數為列表或者集合的形式,如下 tasks = asyncio.wait([task1,task2,task3]) loop.run_until_complete(tasks) (2)返回的值不一樣 gather的定義如下,gather返回的是每一個任務運行的結果, results = await asyncio.gather(*tasks) wait的定義如下,返回dones是已經完成的任務,pending是未完成的任務,都是集合類型 done, pending = yield from asyncio.wait(fs) (3)后面還會講到他們的進一步使用
簡單來說:async.wait會返回兩個值:done和pending,done為已完成的協程Task,pending為超時未完成的協程Task,需通過future.result調用Task的result。而async.gather返回的是已完成Task的result。
第四步:關閉事件循環
loop.close() ''' 以上示例都沒有調用 loop.close,好像也沒有什么問題。所以到底要不要調 loop.close 呢? 簡單來說,loop 只要不關閉,就還可以再運行: loop.run_until_complete(do_some_work(loop, 1)) loop.run_until_complete(do_some_work(loop, 3)) loop.close() 但是如果關閉了,就不能再運行了: loop.run_until_complete(do_some_work(loop, 1)) loop.close() loop.run_until_complete(do_some_work(loop, 3)) # 此處異常 建議調用 loop.close,以徹底清理 loop 對象防止誤用
2,Python3.7版本
在最新的python3.7版本中,asyncio又引進了一些新的特性和API,
例子以:無參數,無返回值
# 3.7版本或之后 無參數無返回值 start import asyncio import time async def hello1(): print("Hello world 01 begin") await asyncio.sleep(3) #模擬耗時任務3秒 print("Hello again 01 end") async def hello2(): print("Hello world 02 begin") await asyncio.sleep(2) #模擬耗時任務2秒 print("Hello again 02 end") async def hello3(): print("Hello world 03 begin") await asyncio.sleep(4) #模擬耗時任務4秒 print("Hello again 03 end") async def main(): results=await asyncio.gather(hello1(),hello2(),hello3()) for result in results: print(result) #因為沒返回值,故而返回None asyncio.run(main()) # 3.7版本或之后 無參數無返回值 end
輸出如下
Hello world 01 begin Hello world 02 begin Hello world 03 begin Hello again 02 end Hello again 01 end Hello again 03 end None None None
例子二:有參數有返回值
# 3.7版本或之后 有參數有返回值 start import asyncio import time async def hello1(a,b): print("Hello world 01 begin") await asyncio.sleep(3) #模擬耗時任務3秒 print("Hello again 01 end") return a+b async def hello2(a,b): print("Hello world 02 begin") await asyncio.sleep(2) #模擬耗時任務2秒 print("Hello again 02 end") return a-b async def hello3(a,b): print("Hello world 03 begin") await asyncio.sleep(4) #模擬耗時任務4秒 print("Hello again 03 end") return a*b async def main(): results=await asyncio.gather(hello1(10,5),hello2(10,5),hello3(10,5)) for result in results: print(result) asyncio.run(main()) # 3.7版本或之后 有參數有返回值 end
輸出如下
Hello world 01 begin Hello world 02 begin Hello world 03 begin Hello again 02 end Hello again 01 end Hello again 03 end 15 5 50
(3)總結:兩步走(針對3.7和以后版本)
第一步:構建一個入口函數main
它也是一個異步協程函數,即通過async定義,並且要在main函數里面await一個或多個協程,和前面一樣,我們可以通過gather或者是wait進行組合,對於有返回值的協程函數,一般就在main里面進行結果的獲取。
第二步:啟動主函數main
這是python3.7新添加的函數,就一句話,即
asyncio.run(main())
注意:
不再需要顯式的創建事件循環,因為在啟動run函數的時候,就會自動創建一個新的事件循環。而且在main中也不需要通過事件循環去掉用被包裝的協程函數,只需要向普通函數那樣調用即可 ,只不過使用了await關鍵字而已。
四,協程編程的優點
1、無cpu分時切換線程保存上下文問題(協程上下文怎么保存)
2、遇到io阻塞切換(怎么實現的)
3、無需共享數據的保護鎖(為什么)
4、系列文章下篇預告——介紹低層的API,事件循環到底是怎么實現的以及future類的實現。