參考:https://blog.csdn.net/qq_27825451/article/details/86292513
聲明:python協程系列文章的上一篇,即第五篇,詳細介紹了asyncio的核心概念,asyncio的設計架構,Task類的詳細作用,本文為系列文章的第六篇,將介紹更加底層的API,以EventLoop和Future為主,介紹他們的設計理念,包含的方法以及使用技巧。
一,事件循環EventLoop
事件循環是asyncio的核心,異步任務的運行,任務完成之后的回調,網絡IO操作,子進程的運行,都是通過事件循環完成的。在前一篇文章中,已經提到過,在Python3.7中,我們甚至完全不用管事件循環,只需要使用高層API,即asyncio中的方法,我們很少直接與事件循環打交道,但是為了更加熟悉asyncio的運行原理,最好還是要理解EventLoop的設計原理。
1,事件循環的,創建,獲取,設置(上文已經介紹過了)
在上文我們已經介紹過事件循環的創建,獲取,設置本文復習一下
- loop=asyncio.get_running_loop() 返回(獲取)在當前線程中正在運行的事件循環,如果沒有正在運行的事件循環,則會顯示錯誤;它是python3.7中新添加的
- loop=asyncio.get_event_loop() 獲得一個事件循環,如果當前線程還沒有事件循環,則創建一個新的事件循環loop;
- loop=asyncio.set_event_loop(loop) 設置一個事件循環為當前線程的事件循環;
- loop=asyncio.new_event_loop() 創建一個新的事件循環
舉例說明
(1)loop=asyncio.get_running_loop()
獲取的是正在運行的事件循環,也就是說需要在事件循環內部函數運行,否則如果是在事件循環外部運行,事件循環已經運行完畢則會報錯
示例:
# asyncio.get_running_loop()獲取正在運行的事件循環 start import asyncio async def hello(): print('start') loop = asyncio.get_running_loop() print('現在運行的事件循環是{0}'.format(loop)) await asyncio.sleep(1) print('end') # 創建事件循環 loop = asyncio.get_event_loop() # 運行事件循環 loop.run_until_complete(hello()) # 輸出如下 # start # 現在運行的事件循環是<ProactorEventLoop running=True closed=False debug=False> # end # asyncio.get_running_loop()獲取正在運行的事件循環 end
如果在沒有事件循環的位置運行asyncio.get_running_loop()則會報錯
# 創建事件循環 loop = asyncio.get_event_loop() # 運行事件循環 loop.run_until_complete(hello()) # 在這個位置運行獲取在運行的事件循環會報錯,因為事件循環處於未運行狀態 asyncio.get_running_loop() # 報錯信息如下 # RuntimeError: no running event loop
(2) loop=asyncio.get_event_loop()
獲得一個事件循環,如果當前線程沒有事件循環則創建一個新的事件循環,等同於asyncio.new_event_loop()
舉例經常用到,暫無示例
(3)loop=asyncio.set_event_loop(loop)
不清楚怎么使用,沒有示例
(4)loop=asyncio.new_event_loop()
創建一個新的事件循環,和get_event_loop()差不多
2,運行和停止事件循環
(1)loop.run_until_complete(future)。運行事件循環,直到future運行結束
(2)loop.run_forever()。在python3.7中已經取消了,表示事件循環會一直運行,直到遇到stop。
(3)loop.stop()。停止事件循環
(4)loop.is_running()。如果事件循環依然在運行,則返回True
(5)loop.is_closed()。如果事件循環已經close,則返回True
(6)loop.close()。關閉事件循環
3,創建Future和Task
(1)loop.create_future(coroutine) ,返回future對象
(2)loop.create_task(corootine) ,返回task對象
(3)loop.set_task_factory(factory)
(4)loop.get_task_factory()
關於future可以參考
https://www.cnblogs.com/minseo/p/15440234.html
4,事件循環的時鍾
loop.time().可以這么理解,事件循環內部也維護着一個時鍾,可以查看事件循環現在運行的時間點是多少,就像普通的time.time()類似,它返回的是一個浮點數值,比如下面的代碼。
# loop.time() start import asyncio async def hello(a,b): print('准備做加法運算') await asyncio.sleep(3) return a + b loop = asyncio.get_event_loop() t1 = loop.time() print(t1) r1 = loop.run_until_complete(hello(3,4)) t2 = loop.time() print(t2) print(t2-t1) # loop.time() end
5,計划執行回調函數(CallBacks)
(1)loop.call_later(delay, callback, *args, context=None)
首先簡單的說一下它的含義,就是事件循環在delay多長時間之后才執行callback函數,它的返回值是asyncio.TimerHandle類的一個實例對象。
(2)loop.call_at(when, callback, *args, context=None)
即在某一個時刻進行調用計划的回調函數,第一個參數不再是delay而是when,表示一個絕對的時間點,結合前面的loop.time使用,它的使用方法和call_later()很類似。它的返回值是asyncio.TimerHandle類的一個實例對象。
(3)loop.call_soon(callback, *args, context=None)
在下一個迭代的時間循環中立刻調用回調函數,用法同上面。它的返回值是asyncio.Handle類的一個實例對象。
(4)loop.call_soon_threadsafe(callback, *args, context=None)
這是call_soon()函數的線程安全版本,計划回調函數必須在另一個線程中使用。
需要注意的是:上面的幾個回調函數都只使用了“位置參數”哦,asyncio中,大部分的計划回調函數都不支持“關鍵字參數”,如果是想要使用關鍵字參數,則推薦使用functools.aprtial()對方法進一步包裝,詳細可以參考前面的python標准庫系列文章。
如:
# will schedule "print("Hello", flush=True)" loop.call_soon( functools.partial(print, "Hello", flush=True))
下面來看一下具體的使用例子。
# 回調函數 start import asyncio def callback(n): print('我是回調函數,參數為: {0} '.format(n)) async def main(loop): print('在異步函數中注冊回調函數') loop.call_later(2, callback, 1) loop.call_later(1, callback, 2) loop.call_soon(callback, 3) await asyncio.sleep(10) loop = asyncio.get_event_loop() print('進入事件循環') loop.run_until_complete(main(loop)) print('關閉事件循環') loop.close() # 回調函數 end
輸出如下
進入事件循環 在異步函數中注冊回調函數 我是回調函數,參數為: 3 我是回調函數,參數為: 2 我是回調函數,參數為: 1 關閉事件循環
解析:
主函數main(loop)傳遞的參數為一個事件循環loop然后給該事件循環綁定3個回調函數,分別為延遲2秒,1秒和立即執行
所以在執行的時候先輸出參數為3的因為該回調函數是立即執行,然后等待1秒輸出參數為2的再等待1秒輸出參數為1的,然后運行asyncio.sleep(10)等待
最后輸出"關閉事件循環"
注意:這個例子如果使用asyncio.run()方式運行則不會執行回調函數
loop = asyncio.get_event_loop() print('進入事件循環') # loop.run_until_complete(main(loop)) asyncio.run(main(loop)) print('關閉事件循環')
輸出如下
進入事件循環 在異步函數中注冊回調函數 關閉事件循環
在輸出完“在異步函數中注冊回調函數”以后等待一段時間輸出"關閉事件循環"
再看一個簡答的例子
import asyncio def callback(a, loop): print("我的參數為 {0},執行的時間為{1}".format(a,loop.time())) #call_later, call_at if __name__ == "__main__": try: loop = asyncio.get_event_loop() now = loop.time() loop.call_later(5, callback, 5, loop) #第一個參數設置的時間5.5秒后執行, loop.call_at(now+2, callback, 2, loop) #在指定的時間,運行,當前時間+2秒 loop.call_at(now+1, callback, 1, loop) loop.call_at(now+3, callback, 3, loop) loop.call_soon(callback, 4, loop) loop.run_forever() #要用這個run_forever運行,因為沒有傳入協程,這個函數在3.7中已經被取消 except KeyboardInterrupt: print("Goodbye!")
輸出如下
PS D:\learn-python3\函數式編程> & C:/ProgramData/Anaconda3/python.exe d:/learn-python3/學習腳本/future對象/use_future.py 我的參數為 4,執行的時間為863856.046 我的參數為 1,執行的時間為863857.062 我的參數為 2,執行的時間為863858.062 我的參數為 3,執行的時間為863859.046 我的參數為 5,執行的時間為863861.046
解析:
參數為4的立即執行,然后按照事件順序分別執行輸出參數為1 2 3的最后執行參數為5的
本次輸出參數5的輸出以后程序會阻塞,因為沒有關閉,需要綁定一個關閉的回調函數,該回調函數使用loop.stop()關閉事件循環,
設置運行該回調函數的時間要長一點,要不在運行該結束回調函數的時候,其他回調函數還沒有運行完畢。
import asyncio def callback(a, loop): print("我的參數為 {0},執行的時間為{1}".format(a,loop.time())) # 設置停止loop函數 def stop_loop(loop): loop.stop() #call_later, call_at if __name__ == "__main__": try: loop = asyncio.get_event_loop() now = loop.time() loop.call_later(5, callback, 5, loop) #第一個參數設置的時間5秒后執行, loop.call_at(now+2, callback, 2, loop) #在指定的時間,運行,當前時間+2秒 loop.call_at(now+1, callback, 1, loop) loop.call_at(now+3, callback, 3, loop) loop.call_soon(callback, 4, loop) # 停止loop在運行事件循環的6秒以后即等其他事件循環都運行完畢再運行 loop.call_later(6,stop_loop,loop) loop.run_forever() #要用這個run_forever運行,因為沒有傳入協程,這個函數在3.7中已經被取消 except KeyboardInterrupt: print("Goodbye!")
總結注意事項:
(1)CallBack函數只能夠定義為同步方法,不能夠定義為async方法,及不能使用async和@asyncio.coroutine修飾;
(2)每一個CallBack方法只會調用一次,如果在同一個時刻有另個CallBack方法需要調用,則他們的執行順序是不確定的;
(3)注意使用functools.partial()去修飾帶有關鍵字參數的CallBack方法;
(4)如何理解?對於一般的異步函數,我們需要將它放在時間循環里面,然后通過事件循環去循環調用它,而因為CallBack並不是異步函數,它是定義為普通的同步方法,所以不能夠放在時間循環里面,但是如果我依然想要讓事件循環去執行它怎么辦呢?那就不放進事件循環,直接讓事件循環“立即、稍后、在什么時候”去執行它不就行了嘛,call的含義就是“執行”。
二,底層API之Future
1、Future的定義概覽
Future的本質是一個類。他表示的是異步操作的最終將要返回的結果,故而命名為Future,它不是線程安全的。Future對象是awaitable的,參見系類文章的前面,
class asyncio.Future(*, loop=None)
2、asyncio中關於Future的幾個方法
(1)asyncio.isfuture(obj) 。判斷一個對象是不是Future,注意python中一切皆對象哦,包括函數,當obj是下面幾種情況時返回true:
asyncio.Future的實例對象
asyncio.Task的實例對象
一個具有 _asyncio_future_blocking 屬性的對象
(2)asyncio.ensure_future(obj, *, loop=None)。將一個obj包裝成Future
(3)asyncio.wrap_future(future, *, loop=None)
將concurrent.futures.Future對象包裝成一個 asyncio.Future 對象。
3、Future對象的常用方法
(1)result()。返回Future執行的結果返回值
如果Future被執行完成,如果使用set_result()方法設置了一個結果,那個設置的value就會被返回;
如果Future被執行完成,如果使用set_exception()方法設置了一個異常,那么使用這個方法也會觸發異常;
如果Future被取消了,那么使用這個方法會觸發CancelledError異常;
如果Future的結果不可用或者是不可達,那么使用這個方法也會觸發InvalidStateError異常;
(2)set_result(result)
標記Future已經執行完畢,並且設置它的返回值。
(3)set_exception(exception)
標記Future已經執行完畢,並且觸發一個異常。
(4)done()
如果Future1執行完畢,則返回 True 。
(5)cancelled()
判斷任務是否取消。
(6)add_done_callback(callback, *, context=None)
在Future完成之后,給它添加一個回調方法,這個方法就相當於是loop.call_soon()方法,參見前面,如下例子:
如果要回調帶有關鍵字參數的函數,也需要使用partial方法哦。
(7)remove_done_callback(callback)
(8)cancel()
(9)exception()
(10)get_loop()。返回Future所綁定的事件循環
三,集中回答以下幾個問題
通過前面的講解,我們已經清楚了asyncio架構里面的一些基本東西,現在可以集中回答以下一些常見的問題了,弄清楚這些問題,可以方便我們更加深入的理解協程。
1,多個協程一起運行是否創建新的線程?
協程運行時,都是在一個線程中運行的,沒有創建新的線程。如下
# 多個協程運行是否創建新的線程 start import asyncio import time import threading a=time.time() async def hello1(): print(f"Hello world 01 begin,my thread is:{threading.currentThread()}") await asyncio.sleep(3) print("Hello again 01 end") async def hello2(): print(f"Hello world 02 begin,my thread is:{threading.currentThread()}") await asyncio.sleep(2) print("Hello again 02 end") async def hello3(): print(f"Hello world 03 begin,my thread is:{threading.currentThread()}") await asyncio.sleep(1) print("Hello again 03 end") loop = asyncio.get_event_loop() tasks = [hello1(), hello2(),hello3()] loop.run_until_complete(asyncio.wait(tasks)) loop.close() b=time.time() print('---------------------------------------') print(b-a) # 多個協程運行是否創建新的線程 end
輸出如下
PS D:\learn-python3\函數式編程> & C:/ProgramData/Anaconda3/python.exe d:/learn-python3/學習腳本/協程系列/eventloop_future.py Hello world 02 begin,my thread is:<_MainThread(MainThread, started 13296)> Hello world 03 begin,my thread is:<_MainThread(MainThread, started 13296)> Hello world 01 begin,my thread is:<_MainThread(MainThread, started 13296)> Hello again 03 end Hello again 02 end Hello again 01 end --------------------------------------- 3.009370803833008
3個任務同時運行,每個任務等待的時間不一樣所以結束時間不一樣
通過打印線程可以看到線程id都是一樣的13296所以3個協程運行在同一個線程
從上面那個可以看出,三個不同的協程函數都是在一個線程完成的。但是並不是意味着,多個協程函數只能在一個線程中執行,同樣可以創建新的線程,其實我們完全可以在新的線程中重新創建一個事件循環,具體的實例參見后面。
2,線程效率一定更高嗎?
也不是絕對的,當然在一般情況下,異步方式的執行效率是更高的,就比如上面的三個函數,如果按照同步的方式執行,則一共需要6秒的時間,但是采用協程則只需要最長的那個時間3秒,這自然是提高了工作效率,那是不是一定會提高呢?也不一定,這與協程的調用方式是由密切關系的。如下所示:
# 協程效率一定高嗎? start import asyncio import time import threading a=time.time() async def hello1(): print(f"Hello world 01 begin,my thread is:{threading.currentThread()}") await asyncio.sleep(3) print("Hello again 01 end") async def hello2(): print(f"Hello world 02 begin,my thread is:{threading.currentThread()}") await asyncio.sleep(2) print("Hello again 02 end") async def hello3(): print(f"Hello world 03 begin,my thread is:{threading.currentThread()}") await hello2() await hello1() print("Hello again 03 end") loop = asyncio.get_event_loop() tasks = [hello3()] loop.run_until_complete(asyncio.wait(tasks)) loop.close() b=time.time() print('---------------------------------------') print(b-a) # 協程效率一定高嗎? end
輸出如下
Hello world 03 begin,my thread is:<_MainThread(MainThread, started 14872)> Hello world 02 begin,my thread is:<_MainThread(MainThread, started 14872)> Hello again 02 end Hello world 01 begin,my thread is:<_MainThread(MainThread, started 14872)> Hello again 01 end Hello again 03 end --------------------------------------- 5.018410921096802
在協程函數hello3()中調用hello1() hello2()所以先執行輸出
Hello world 03 begin,my thread is:<_MainThread(MainThread, started 14872)>
然后等待hello2()和hello1()的執行結果,注意這里是順序執行的,而不是同時執行,因為他們都放在了hello3()內,而不是放在同一個任務里面。
我們發現一個問題,上面執行的順序完全不是異步執行,執行的時間也沒有得到改善,究其原因,是因為上面是通過hello3去調用hello1和hello2的,這和同步調用的方式完全是一樣的,即使我定義的都是異步方法,它既沒有提高執行效率,還會有阻塞。
結論:在有很多個異步方式的時候,一定要盡量避免這種異步函數的直接調用,這和同步是沒什么區別的,一定要通過事件循環loop,“讓事件循環在各個異步函數之間不停游走”,這樣才不會造成阻塞。
3,協程會不會阻塞?
異步方式依然會有阻塞的,當我們定義的很多個異步方法彼此之間有一來的時候,比如,我必須要等到函數1執行完畢,函數2需要用到函數1的返回值,如上面的例子2所示,就會造成阻塞,這也是異步編程的難點之一,如何合理配置這些資源,盡量減少函數之間的明確依賴,這是很重要的。
4,協程的4種狀態
協程函數相比於一般的函數來說,我們可以將協程包裝成任務Task,任務Task就在於可以跟蹤它的狀態,我就知道它具體執行到哪一步了,一般來說,協程函數具有4種狀態,可以通過相關的模塊進行查看,請參見前面的文章,他的四種狀態為:
Pending
Running
Done
Cacelled
創建future的時候,task為pending,事件循環調用執行的時候當然就是running,調用完畢自然就是done,如果需要停止事件循環,中途需要取消,就需要先把task取消,即為cancelled。
四,多任務實現並發
python異步協程函數的最終目的是實現並發,這樣才能提高工作效率。
我們經常看見西門這樣的代碼,即:
tasks = asyncio.gather(*[task1,task2,task3]) loop.run_until_complete(tasks) #或者是 tasks = asyncio.wait([task1,task2,task3]) loop.run_until_complete(tasks) #甚至可以寫在一起,即 loop.run_until_complete(asyncio.gather(*[task1,task2,task3]) #或者是 asyncio.gather(asyncio.wait([task1,task2,task3]))
上面這些都是一些簡單的應用,可以同時進行多任務,進行並發,但是如果我們每一個任務都有返回值,而且需要獲取這些返回值,這樣做顯然還不夠,還需要做進一步的處理。
asyncio實現並發的思想是一樣的,只是實現的手段稍有區別,主要有以下幾種實現方式:
(1)使用gather同時注冊多個任務,實現並發
awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)
注意事項:
gather的返回值是它所綁定的所有任務的執行結果,而且順序是不變的,即返回的result的順序和綁定的順序是保持一致的。
除此之外,它是awaitable的,所以,如果需要獲取多個任務的返回值,既然是awaitable的,就需要將它放在一個函數里面,所以我們引入一個包裝多個任務的入口main,這也是python3.7的思想。如下:
# asyncio.gather() start import asyncio import time async def hello1(a,b): print("Hello world 01 begin") await asyncio.sleep(3) #模擬耗時任務3秒 print("Hello again 01 end") return a+b async def hello2(a,b): print("Hello world 02 begin") await asyncio.sleep(2) #模擬耗時任務2秒 print("Hello again 02 end") return a-b async def hello3(a,b): print("Hello world 03 begin") await asyncio.sleep(4) #模擬耗時任務4秒 print("Hello again 03 end") return a*b async def main(): #封裝多任務的入口函數 task1=asyncio.ensure_future(hello1(10,5)) task2=asyncio.ensure_future(hello2(10,5)) task3=asyncio.ensure_future(hello3(10,5)) results=await asyncio.gather(task1,task2,task3) for result in results: #通過迭代獲取函數的結果,每一個元素就是相對應的任務的返回值,順序都沒變 print(result) loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close() # asyncio.gather() end
輸出如下
Hello world 01 begin Hello world 02 begin Hello world 03 begin Hello again 02 end Hello again 01 end Hello again 03 end 15 5 50
(2)使用wait可以同時注冊多個任務,實現並發
await asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
它與gather不同的地方是他的參數是集合類型,而且他的返回類型是這樣一個形式,即
(done, pending). #返回dones是已經完成的任務,pending是未完成的任務,都是集合類型,不同的是每一個元素不再是返回值,而是某一個task哦,
相同的是它依然也是awaitable的,故而也需要定義在一個異步函數main()中,如下。
#前面的代碼和上面一樣 async def main(): #封裝多任務的入口函數 task1=asyncio.ensure_future(hello1(10,5)) task2=asyncio.ensure_future(hello2(10,5)) task3=asyncio.ensure_future(hello3(10,5)) done,pending=await asyncio.wait([task1,task2,task3]) for done_task in done: print(done_task.result()) #這里返回的是一個任務,不是直接的返回值,故而需要使用result函數進行獲取 loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close() #運行結果也一樣
(3)使用as_completed可以同時注冊多個任務,實現並發
這個方法使用的比較少,與前面的兩個gather和wait不同的是,它不是awaitable。使用實例參見前面的一篇文章
這個方法時按照任務執行完成的先后順返回,先執行完的任務先返回,后執行完的任務后返回,組成一個集合
(4)主調方獲取任務的運行結果
上面的運行結果,都是在main()函數里面獲取的運行結果,那可不可以不再main()里面獲取結果呢,,當然是可以的,我們可以這樣做
async def main(): #封裝多任務的入口函數 task1=asyncio.ensure_future(hello1(10,5)) task2=asyncio.ensure_future(hello2(10,5)) task3=asyncio.ensure_future(hello3(10,5)) return await asyncio.gather(task1,task2,task3) #不在這里獲取結果,只是返回 loop = asyncio.get_event_loop() results=loop.run_until_complete(main()) #在這里再獲取返回函數值,然后迭代獲取 for result in results: print(result) loop.close()
或者是如下:
async def main(): #封裝多任務的入口函數 task1=asyncio.ensure_future(hello1(10,5)) task2=asyncio.ensure_future(hello2(10,5)) task3=asyncio.ensure_future(hello3(10,5)) return await asyncio.wait([task1,task2,task3]) #不在這里獲取結果,只是返回 loop = asyncio.get_event_loop() done,pending=loop.run_until_complete(main()) #在這里再獲取返回函數值,然后迭代獲取 for done_task in done: print(done_task.result()) loop.close()
五,Future補充下一篇預告
1、Future補充
asyncio中的Future類是模仿concurrent.futures.Future類而設計的,關於concurrent.futures.Future,可以查閱相關的文檔。它們之間的主要區別是:
(1)asyncio.Future對象是awaitable的,但是concurrent.futures.Future對象是不能夠awaitable的;
(2)asyncio.Future.result()和asyncio.Future.exception()是不接受關鍵字參數timeout的;
(3)當Future沒有完成的時候,asyncio.Future.result()和asyncio.Future.exception()將會觸發一個InvalidStateError異常;
(4)使用asyncio.Future.add_done_callback()注冊的回調函數不會立即執行,它可以使用loop.call_soon代替;
(5)asyncio里面的Future和concurrent.futures.wait()以及concurrent.futures.as_completed()是不兼容的。
有興趣的小伙伴可以自己學一下concurrent.futures哦!
2、下一篇預告
多線程+asyncio協程。實現更加強健的程序。