查看:https://blog.csdn.net/qq_27825451/article/details/86483493
聲明:python協程系列文章的上一篇,即第六篇,詳細介紹了asyncio的幾個底層API概念,asyncio的事件循環EventLoop,Future類的詳細使用,以及集中回答了關於異步編程的一些疑問,本文為系列文章的第七篇,將介紹如何使用多線程結合異步編程asyncio,開發出真正“不假死”的應用程序;以及如何模擬一個timer,實現定時操作。
一,異步方法依然會假死(freezing)
什么是程序的假死,這里不再多描述,特別是在編寫桌面程序的時候,如果是使用當個線程,同步函數的方式,假死是不可避免的,但是有時候我們即使使用了異步函數的方式依然是不可避免的,依然會假死,這是為什么呢?下面會通過幾個例子來詳細說明。
1,一般程序的調用方“假死”
asyncio結合多線程解決阻塞問題以及timer模擬.py
# 一般程序的調用方假死 start
import asyncio
import time
import threading
async def hello1(a,b):
print('異步函數開始執行')
await asyncio.sleep(3)
print('異步函數執行結束')
return a + b
async def main():
c = await hello1(10,20)
print(c)
print('主函數執行')
loop = asyncio.get_event_loop()
tasks = [main()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
# 一般程序的調用方假死 end
輸出如下
異步函數開始執行 異步函數執行結束 30 主函數執行
解析:
執行異步主函數main,首先執行的是
c = await hello1(10,20)
這個時候主函數阻塞,等待hello1(10,20)執行結束並返回結果,然后依次執行的是輸出 ”異步函數開始執行” ,等待3秒再輸出 “異步函數執行結束”,然后return a+b的值
返回主函數main打印返回值為30 最后打印 “主函數執行”
注意一個問題:我們前面所講的例子中,沒有出現等待,是因為各個異步方法之間是“完全並列”關系,彼此之間沒有依賴,所以我們可以將所有異步操作“gathrer”起來,然后通過事件循環,讓事件循環在多個異步方法之間來回調用,永不停止,故而沒有出現等待。
但是,現實中不可能所有的異步方法都是完全獨立的,沒有任何關系的,在上面的這個例子中,就是很好的說明,hello1是一個耗時任務,耗時大約為3秒,main也是一個異步方法,但是main中需要用到hello1中的返回結果,所以他必須要等到hello1運行結束之后才能繼續執行,這就是為什么得到上面結果的原因。這也再一次說明,異步依然是會有阻塞的。
我們也可以這樣理解,因為我給事件循環只注冊了一個異步方法,那就是main,當在main里面遇到await,事件循環掛起,轉而尋找其他的異步方法,但是由於只注冊了一個異步方法給事件循環,沒有其他方法可執行了,所以只能等待,讓hello1執行完了,再繼續執行。
2,窗口程序的假死
(1)同步假死
# 同步假死 start
import tkinter as tk # 導入 Tkinter 庫
import time
class Form:
def __init__(self):
self.root=tk.Tk()
self.root.geometry('500x300')
self.root.title('窗體程序') #設置窗口標題
self.button=tk.Button(self.root,text="開始計算",command=self.calculate)
self.label=tk.Label(master=self.root,text="等待計算結果")
self.button.pack()
self.label.pack()
self.root.mainloop()
def calculate(self):
time.sleep(3) #模擬耗時計算
self.label["text"]=300
if __name__=='__main__':
form=Form()
解析:tk模板用來創建一個窗口,以下語句代碼如果點擊該按鈕執行對應的函數,該函數模擬耗時計算,然后顯示設置的值
self.button=tk.Button(self.root,text="開始計算",command=self.calculate)
運行的結果會先顯示一個窗口,然后單擊"開始計算",這個時候就去執行對應的calculate()了,需要執行3秒,然后窗體會假死,這時候無法移動窗體,也無法最大化最小化,3秒之后,“等待計算結果”的label會顯示300,然后前面在假死時進行的操作會接着發生,假如在假死的時候移動了窗口則窗口會移動一段距離,如果最小化則窗口會最小化,顯示如下

上面的窗體假死,這無可厚非,因為,所有的操作都是同步方法,只有一個線程,負責維護窗體狀態的線程和執行計算的線程是同一個,當在執行計算遇到time.sleep()的時候自然會遇到阻塞。那如果我們將函數任務換成異步方法呢?代碼如下:
(2)異步假死
# 異步假死 start
import tkinter as tk # 導入 Tkinter 庫
import time
import asyncio
class Form:
def __init__(self):
self.root=tk.Tk()
self.root.geometry('500x300')
self.root.title('窗體程序') #設置窗口標題
self.button=tk.Button(self.root,text="開始計算",command=self.get_loop)
self.label=tk.Label(master=self.root,text="等待計算結果")
self.button.pack()
self.label.pack()
self.root.mainloop()
async def calculate(self):
await asyncio.sleep(3) #模擬耗時計算
self.label["text"]=300
def get_loop(self):
self.loop = asyncio.get_event_loop()
self.loop.run_until_complete(self.calculate())
#self.loop.close()
if __name__=='__main__':
form=Form()
# 異步假死 end
我們發現,窗體依然會造成阻塞,情況和前面的同步方法是一樣的,為什么會這樣呢?因為這個地方雖然啟動了事件循環,但是擁有事件循環的那個線程同時還需要維護窗體的狀態,始終只有一個線程在運行,當單擊“開始計算”按鈕,開始執行get_loop函數,在get_loop里面啟動異步方法calculate,然后遇到await,這個時候事件循環暫停,但是由於事件循環只注冊了calculate一個異步方法,也沒其他事情干,所以只能等待,造成假死阻塞。
解決辦法就是我專門再創建一個線程去執行一些計算任務,維護窗體狀態的線程就只專門負責維護狀態,后面再詳說。
二,多線程結合asyncio解決調用時的假死
1,asyncio咱們實現Concurrency and Multithreading(多線程和並發)的函數介紹
為了讓一個協程函數在不同的線程中執行,我們可以使用以下兩個函數
(1)loop.call_soon_threadsafe(callback, *args),這是一個很底層的API接口,一般很少使用,本文也暫時不做討論。
(2)asyncio.run_coroutine_threadsafe(coroutine,loop)
第一個參數為需要異步執行的協程函數,第二個loop參數為在新線程中創建的事件循環loop,注意一定要是在新線程中創建哦,該函數的返回值是一個 concurrent.futures.Future類的對象,用來獲取協程的返回結果。
future = asyncio.run_coroutine_threadsafe(coro_func(), loop) # 在新線程中運行協程
result = future.result() #等待獲取Future的結果
2、不阻塞的多線程並發實例
asyncio.run_coroutine_threadsafe(coroutine,loop)的意思很簡單,就是我在新線程中創建一個事件循環loop,然后在新線程的loop中不斷不停的運行一個或者是多個coroutine。參考下面代碼:
# 不阻塞的多線程實例 start
import asyncio
import asyncio,time,threading
# 需要執行的函數異步任務
async def func(num):
print(f'准備調用func,大約耗時{num}')
await asyncio.sleep(num)
print(threading.currentThread())
print(f'耗時{num}之后,func函數運行結束')
# 定義一個專門創建事件循環的loop函數,在另一個線程中啟動它
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
# 定義一個main函數
def main():
coroutine1 = func(3)
coroutine2 = func(2)
coroutine3 = func(1)
# 在當前線程下創建事件循環(未啟動),在start_loop里面啟動
new_loop = asyncio.new_event_loop()
# 通過當前線程開啟新的線程去啟動事件循環
t = threading.Thread(target=start_loop,args=(new_loop,))
t.start()
print(threading.currentThread())
# 這幾條語句是關鍵,代表在新線程中事件循環不斷"游走"執行
asyncio.run_coroutine_threadsafe(coroutine1,new_loop)
asyncio.run_coroutine_threadsafe(coroutine2,new_loop)
asyncio.run_coroutine_threadsafe(coroutine3,new_loop)
# 在main定義個循環用於檢測main和協程是否同時執行
for i in 'iloveu':
print(str(i)+" ")
if __name__ == '__main__':
main()
# 不阻塞的多線程實例 end
輸出如下
<_MainThread(MainThread, started 11988)> 准備調用func,大約耗時3 i 准備調用func,大約耗時2 l o 准備調用func,大約耗時1 v e u <Thread(Thread-1, started 9876)> 耗時1之后,func函數運行結束 <Thread(Thread-1, started 9876)> 耗時2之后,func函數運行結束 <Thread(Thread-1, started 9876)> 耗時3之后,func函數運行結束
可以看到主函數main的循環和3個協程幾乎是同時執行,3個協程因為設置的執行時長不同所以執行結束時間有先后
通過打印線程信息可以看到主線程id和協程的線程id是不一樣的,即主函數main使用一個線程執行,3個協程使用新創建的線程執行

我們發現,main是在主線程中的,而三個協程函數是在新線程中的,它們是在一起執行的,沒有造成主線程main的阻塞。下面再看一下窗體函數中的實現。
# 函數窗體無阻塞 start
import tkinter as tk # 導入 Tkinter 庫
import time
import asyncio
import threading
class Form:
def __init__(self):
self.root=tk.Tk()
self.root.geometry('500x300')
self.root.title('窗體程序') #設置窗口標題
self.button=tk.Button(self.root,text="開始計算",command=self.change_form_state)
self.label=tk.Label(master=self.root,text="等待計算結果")
self.button.pack()
self.label.pack()
self.root.mainloop()
async def calculate(self):
await asyncio.sleep(3)
self.label["text"]=300
def get_loop(self,loop):
self.loop=loop
asyncio.set_event_loop(self.loop)
self.loop.run_forever()
def change_form_state(self):
coroutine1 = self.calculate()
new_loop = asyncio.new_event_loop() #在當前線程下創建事件循環,(未啟用),在start_loop里面啟動它
t = threading.Thread(target=self.get_loop,args=(new_loop,)) #通過當前線程開啟新的線程去啟動事件循環
t.start()
asyncio.run_coroutine_threadsafe(coroutine1,new_loop) #這幾個是關鍵,代表在新線程中事件循環不斷“游走”執行
if __name__=='__main__':
form=Form()
# 函數窗體無阻塞 end
運行上面的代碼,我們發現,此時點擊“開始計算”按鈕執行耗時任務,沒有造成窗體的任何阻塞,我可以最大最小化、移動等等,然后3秒之后標簽會自動顯示運算結果。為什么會這樣?
上面的代碼中,get_loop()、change_form_state()、__init__()都是定義在主線程中的,窗體的狀態維護也是主線程,耗時計算calculate()是一個異步協程函數。
現在單擊“開始計算”按鈕,這個事件發生之后,會觸發主線程的chang_form_state函數,然后在該函數中,會創建新的線程,通過新的線程創建一個事件循環,然后將協程函數注冊到新線程中的事件循環中去,達到的效果就是,主線程做主線程的,新線程做新線程的,不會造成任何阻塞。
4、multithreading+asyncio總結
第一步:定義需要異步執行的一系列操作,及一系列協程函數;
第二步:在主線程中定義一個新的線程,然后再新線程中產生一個新的事件循環;
第三步:在主線程中,通過asyncio.run_coroutine_threadsade(coroutine,loop)這個方法,將一系列異步方法注冊到新線程的loop里面去,這樣就是新線程賦值事件循環的執行。
三,使用asyncio實現一個timer
所謂的timer指的是,指定一個時間間隔,讓某一個操作隔一個時間間隔執行一次,如此周而復始。很多編程語言都提供了專門的timer實現機制,包括C++,C#等。但是Python並沒有原生支持timer,不過可以用asyncio.sleep模擬。
大致的思想如下,將timer定義為一個異步協程,然后同事件循環去調用這個異步協程,讓事件循環不斷在這個協程中反反復復調用,只不過隔幾秒調用一次即可。
簡單的實現如下(本例基於python3.8):
# timer start
import asyncio
# 定義協程函數傳遞參數為整數然后休眠一段時間
async def delay(time):
await asyncio.sleep(time)
# 定義timer函數
# 該協程函數無限循環,首先創建一個future,該future調用協程函數delay
# 然后await future即等待delay(time)執行
# 然后調用回調函數function這個函數也是作為一個參數傳遞進來的
# future.all_done_all(fn) 附加可調用fn到future對象。當future對象取消或者運行完成時,調用fn
# 而這個future對象將作為它的唯一參數即函數fn只能有一個參數,這個參數就是future對象
async def timer(time,function):
while True:
future = asyncio.ensure_future(delay(time))
await future
future.add_done_callback(function)
# add_done_callback(fn)方法定義的函數只能有唯一參數,參數為future對象
def func(future):
#print(future)
print('done')
if __name__ == "__main__":
asyncio.run(timer(2, func))
# timer end
每隔2秒輸出done,無限循環
done done done done done done done done done done .....
調試模式分析



幾個注意點:asyncio.sleep()本身就是一個協程函數,故而可以將它封裝成一個Task或者Future,等待時間結束也就是任務完成,綁定回調函數。當然本身python語法靈活,上面只是其中一種實現而已。
