python協程系列(七)——asyncio結合多線程解決阻塞問題以及timer模擬


  查看:https://blog.csdn.net/qq_27825451/article/details/86483493

  聲明:python協程系列文章的上一篇,即第六篇,詳細介紹了asyncio的幾個底層API概念,asyncio的事件循環EventLoop,Future類的詳細使用,以及集中回答了關於異步編程的一些疑問,本文為系列文章的第七篇,將介紹如何使用多線程結合異步編程asyncio,開發出真正“不假死”的應用程序;以及如何模擬一個timer,實現定時操作。

  一,異步方法依然會假死(freezing)

  什么是程序的假死,這里不再多描述,特別是在編寫桌面程序的時候,如果是使用當個線程,同步函數的方式,假死是不可避免的,但是有時候我們即使使用了異步函數的方式依然是不可避免的,依然會假死,這是為什么呢?下面會通過幾個例子來詳細說明。

  1,一般程序的調用方“假死”

  asyncio結合多線程解決阻塞問題以及timer模擬.py

# 一般程序的調用方假死 start
import asyncio
import time
import threading

async def hello1(a,b):
    print('異步函數開始執行')
    await asyncio.sleep(3)
    print('異步函數執行結束')
    return a + b

async def main():
    c = await hello1(10,20)
    print(c)
    print('主函數執行')

loop = asyncio.get_event_loop()
tasks = [main()]
loop.run_until_complete(asyncio.wait(tasks))

loop.close()

# 一般程序的調用方假死 end

  輸出如下

異步函數開始執行
異步函數執行結束
30
主函數執行

  解析:

  執行異步主函數main,首先執行的是

c = await hello1(10,20)

  這個時候主函數阻塞,等待hello1(10,20)執行結束並返回結果,然后依次執行的是輸出 ”異步函數開始執行” ,等待3秒再輸出 “異步函數執行結束”,然后return a+b的值

  返回主函數main打印返回值為30 最后打印 “主函數執行”

  注意一個問題:我們前面所講的例子中,沒有出現等待,是因為各個異步方法之間是“完全並列”關系,彼此之間沒有依賴,所以我們可以將所有異步操作“gathrer”起來,然后通過事件循環,讓事件循環在多個異步方法之間來回調用,永不停止,故而沒有出現等待。

  但是,現實中不可能所有的異步方法都是完全獨立的,沒有任何關系的,在上面的這個例子中,就是很好的說明,hello1是一個耗時任務,耗時大約為3秒,main也是一個異步方法,但是main中需要用到hello1中的返回結果,所以他必須要等到hello1運行結束之后才能繼續執行,這就是為什么得到上面結果的原因。這也再一次說明,異步依然是會有阻塞的。

  我們也可以這樣理解,因為我給事件循環只注冊了一個異步方法,那就是main,當在main里面遇到await,事件循環掛起,轉而尋找其他的異步方法,但是由於只注冊了一個異步方法給事件循環,沒有其他方法可執行了,所以只能等待,讓hello1執行完了,再繼續執行。

  2,窗口程序的假死

  (1)同步假死

# 同步假死 start
import tkinter as tk          # 導入 Tkinter 庫
import time
 
class Form:
    def __init__(self):
        self.root=tk.Tk()
        self.root.geometry('500x300')
        self.root.title('窗體程序')  #設置窗口標題
 
        self.button=tk.Button(self.root,text="開始計算",command=self.calculate)
        self.label=tk.Label(master=self.root,text="等待計算結果")
 
        self.button.pack()
        self.label.pack()
        self.root.mainloop()
 
    def calculate(self):
        time.sleep(3)  #模擬耗時計算
        self.label["text"]=300
 
if __name__=='__main__':
    form=Form()

  解析:tk模板用來創建一個窗口,以下語句代碼如果點擊該按鈕執行對應的函數,該函數模擬耗時計算,然后顯示設置的值

self.button=tk.Button(self.root,text="開始計算",command=self.calculate)

  運行的結果會先顯示一個窗口,然后單擊"開始計算",這個時候就去執行對應的calculate()了,需要執行3秒,然后窗體會假死,這時候無法移動窗體,也無法最大化最小化,3秒之后,“等待計算結果”的label會顯示300,然后前面在假死時進行的操作會接着發生,假如在假死的時候移動了窗口則窗口會移動一段距離,如果最小化則窗口會最小化,顯示如下

 

 

   上面的窗體假死,這無可厚非,因為,所有的操作都是同步方法,只有一個線程,負責維護窗體狀態的線程和執行計算的線程是同一個,當在執行計算遇到time.sleep()的時候自然會遇到阻塞。那如果我們將函數任務換成異步方法呢?代碼如下:

  (2)異步假死

# 異步假死 start
import tkinter as tk          # 導入 Tkinter 庫
import time
import asyncio 
 
class Form:
    def __init__(self):
        self.root=tk.Tk()
        self.root.geometry('500x300')
        self.root.title('窗體程序')  #設置窗口標題
 
        self.button=tk.Button(self.root,text="開始計算",command=self.get_loop)
        self.label=tk.Label(master=self.root,text="等待計算結果")
 
        self.button.pack()
        self.label.pack()
        self.root.mainloop()
 
    async def calculate(self):
        await asyncio.sleep(3)  #模擬耗時計算
        self.label["text"]=300

    def get_loop(self):
        self.loop = asyncio.get_event_loop()
        self.loop.run_until_complete(self.calculate())
        #self.loop.close()

if __name__=='__main__':
    form=Form()

# 異步假死 end

  我們發現,窗體依然會造成阻塞,情況和前面的同步方法是一樣的,為什么會這樣呢?因為這個地方雖然啟動了事件循環,但是擁有事件循環的那個線程同時還需要維護窗體的狀態,始終只有一個線程在運行,當單擊“開始計算”按鈕,開始執行get_loop函數,在get_loop里面啟動異步方法calculate,然后遇到await,這個時候事件循環暫停,但是由於事件循環只注冊了calculate一個異步方法,也沒其他事情干,所以只能等待,造成假死阻塞。

  解決辦法就是我專門再創建一個線程去執行一些計算任務,維護窗體狀態的線程就只專門負責維護狀態,后面再詳說。
  

  二,多線程結合asyncio解決調用時的假死

  1,asyncio咱們實現Concurrency and Multithreading(多線程和並發)的函數介紹

  為了讓一個協程函數在不同的線程中執行,我們可以使用以下兩個函數

  (1)loop.call_soon_threadsafe(callback, *args),這是一個很底層的API接口,一般很少使用,本文也暫時不做討論。

  (2)asyncio.run_coroutine_threadsafe(coroutine,loop)

  第一個參數為需要異步執行的協程函數,第二個loop參數為在新線程中創建的事件循環loop,注意一定要是在新線程中創建哦,該函數的返回值是一個 concurrent.futures.Future類的對象,用來獲取協程的返回結果。

  future = asyncio.run_coroutine_threadsafe(coro_func(), loop) # 在新線程中運行協程

  result = future.result() #等待獲取Future的結果

  2、不阻塞的多線程並發實例

  asyncio.run_coroutine_threadsafe(coroutine,loop)的意思很簡單,就是我在新線程中創建一個事件循環loop,然后在新線程的loop中不斷不停的運行一個或者是多個coroutine。參考下面代碼:

# 不阻塞的多線程實例 start
import asyncio
import asyncio,time,threading
# 需要執行的函數異步任務
async def func(num):
    print(f'准備調用func,大約耗時{num}')
    await asyncio.sleep(num)
    print(threading.currentThread())
    print(f'耗時{num}之后,func函數運行結束')

# 定義一個專門創建事件循環的loop函數,在另一個線程中啟動它
def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
 

# 定義一個main函數
def main():
    coroutine1 = func(3)
    coroutine2 = func(2)
    coroutine3 = func(1)
    # 在當前線程下創建事件循環(未啟動),在start_loop里面啟動
    new_loop = asyncio.new_event_loop()
    # 通過當前線程開啟新的線程去啟動事件循環
    t = threading.Thread(target=start_loop,args=(new_loop,))
    t.start()
    print(threading.currentThread())
    # 這幾條語句是關鍵,代表在新線程中事件循環不斷"游走"執行
    asyncio.run_coroutine_threadsafe(coroutine1,new_loop)
    asyncio.run_coroutine_threadsafe(coroutine2,new_loop)
    asyncio.run_coroutine_threadsafe(coroutine3,new_loop)
    # 在main定義個循環用於檢測main和協程是否同時執行
    for i in 'iloveu':
        print(str(i)+"    ")

if __name__ == '__main__':
    main()
# 不阻塞的多線程實例 end

  輸出如下

<_MainThread(MainThread, started 11988)>
准備調用func,大約耗時3
i
准備調用func,大約耗時2
l
o
准備調用func,大約耗時1
v
e
u
<Thread(Thread-1, started 9876)>
耗時1之后,func函數運行結束
<Thread(Thread-1, started 9876)>
耗時2之后,func函數運行結束
<Thread(Thread-1, started 9876)>
耗時3之后,func函數運行結束

  可以看到主函數main的循環和3個協程幾乎是同時執行,3個協程因為設置的執行時長不同所以執行結束時間有先后

  通過打印線程信息可以看到主線程id和協程的線程id是不一樣的,即主函數main使用一個線程執行,3個協程使用新創建的線程執行

 

 

   我們發現,main是在主線程中的,而三個協程函數是在新線程中的,它們是在一起執行的,沒有造成主線程main的阻塞。下面再看一下窗體函數中的實現。

# 函數窗體無阻塞 start
import tkinter as tk          # 導入 Tkinter 庫
import time
import asyncio
import threading
 
class Form:
    def __init__(self):
        self.root=tk.Tk()
        self.root.geometry('500x300')
        self.root.title('窗體程序')  #設置窗口標題
        
        self.button=tk.Button(self.root,text="開始計算",command=self.change_form_state)
        self.label=tk.Label(master=self.root,text="等待計算結果")
 
        self.button.pack()
        self.label.pack()
 
        self.root.mainloop()
 
    async def calculate(self):
        await asyncio.sleep(3)
        self.label["text"]=300
 
    def get_loop(self,loop):
        self.loop=loop
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()
    def change_form_state(self):
        coroutine1 = self.calculate()
        new_loop = asyncio.new_event_loop()                        #在當前線程下創建事件循環,(未啟用),在start_loop里面啟動它
        t = threading.Thread(target=self.get_loop,args=(new_loop,))   #通過當前線程開啟新的線程去啟動事件循環
        t.start()
 
        asyncio.run_coroutine_threadsafe(coroutine1,new_loop)  #這幾個是關鍵,代表在新線程中事件循環不斷“游走”執行
 
 
if __name__=='__main__':
    form=Form()
# 函數窗體無阻塞 end

  運行上面的代碼,我們發現,此時點擊“開始計算”按鈕執行耗時任務,沒有造成窗體的任何阻塞,我可以最大最小化、移動等等,然后3秒之后標簽會自動顯示運算結果。為什么會這樣?

  上面的代碼中,get_loop()、change_form_state()、__init__()都是定義在主線程中的,窗體的狀態維護也是主線程,耗時計算calculate()是一個異步協程函數。

  現在單擊“開始計算”按鈕,這個事件發生之后,會觸發主線程的chang_form_state函數,然后在該函數中,會創建新的線程,通過新的線程創建一個事件循環,然后將協程函數注冊到新線程中的事件循環中去,達到的效果就是,主線程做主線程的,新線程做新線程的,不會造成任何阻塞。
   

  4、multithreading+asyncio總結

  第一步:定義需要異步執行的一系列操作,及一系列協程函數;

  第二步:在主線程中定義一個新的線程,然后再新線程中產生一個新的事件循環;

  第三步:在主線程中,通過asyncio.run_coroutine_threadsade(coroutine,loop)這個方法,將一系列異步方法注冊到新線程的loop里面去,這樣就是新線程賦值事件循環的執行。

  三,使用asyncio實現一個timer

  所謂的timer指的是,指定一個時間間隔,讓某一個操作隔一個時間間隔執行一次,如此周而復始。很多編程語言都提供了專門的timer實現機制,包括C++,C#等。但是Python並沒有原生支持timer,不過可以用asyncio.sleep模擬。

  大致的思想如下,將timer定義為一個異步協程,然后同事件循環去調用這個異步協程,讓事件循環不斷在這個協程中反反復復調用,只不過隔幾秒調用一次即可。

  簡單的實現如下(本例基於python3.8):

# timer start
import asyncio
# 定義協程函數傳遞參數為整數然后休眠一段時間
async def delay(time):
    await asyncio.sleep(time)

# 定義timer函數
# 該協程函數無限循環,首先創建一個future,該future調用協程函數delay
# 然后await future即等待delay(time)執行
# 然后調用回調函數function這個函數也是作為一個參數傳遞進來的
# future.all_done_all(fn) 附加可調用fn到future對象。當future對象取消或者運行完成時,調用fn
# 而這個future對象將作為它的唯一參數即函數fn只能有一個參數,這個參數就是future對象
async def timer(time,function):
    while True:
        future = asyncio.ensure_future(delay(time))
        await future
        future.add_done_callback(function)

# add_done_callback(fn)方法定義的函數只能有唯一參數,參數為future對象
def func(future):
    #print(future)
    print('done')

if __name__ == "__main__":
    asyncio.run(timer(2, func))
# timer end

  每隔2秒輸出done,無限循環

done
done
done
done
done
done
done
done
done
done
.....

  調試模式分析

 

 

 

 

 

   幾個注意點:asyncio.sleep()本身就是一個協程函數,故而可以將它封裝成一個Task或者Future,等待時間結束也就是任務完成,綁定回調函數。當然本身python語法靈活,上面只是其中一種實現而已。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM