Python異步IO之協程(一):從yield from到async的使用


引言:協程(coroutine)是Python中一直較為難理解的知識,但其在多任務協作中體現的效率又極為的突出。眾所周知,Python中執行多任務還可以通過多進程或一個進程中的多線程來執行,但兩者之中均存在一些缺點。因此,我們引出了協程。

Tips

欲看完整代碼請見:我的GitHub

為什么需要協程?
首先,我們需要知道同步和異步是什么東東,不知道的看詳解。
簡單來說:
【同步】:就是發出一個“調用”時,在沒有得到結果之前,該“調用”就不返回,“調用者”需要一直等待該“調用”結束,才能進行下一步工作。
【異步】:“調用”在發出之后,就直接返回了,也就沒有返回結果。“被調用者”完成任務后,通過狀態來通知“調用者”繼續回來處理該“調用”。

下面我們先來看一個用普通同步代碼實現多個IO任務的案例。

# 普通同步代碼實現多個IO任務
import time
def taskIO_1():
    print('開始運行IO任務1...')
    time.sleep(2)  # 假設該任務耗時2s
    print('IO任務1已完成,耗時2s')
def taskIO_2():
    print('開始運行IO任務2...')
    time.sleep(3)  # 假設該任務耗時3s
    print('IO任務2已完成,耗時3s')

start = time.time()
taskIO_1()
taskIO_2()
print('所有IO任務總耗時%.5f秒' % float(time.time()-start))

執行結果:

開始運行IO任務1...
IO任務1已完成,耗時2s
開始運行IO任務2...
IO任務2已完成,耗時3s
所有IO任務總耗時5.00604秒

上面,我們順序實現了兩個同步IO任務taskIO_1()和taskIO_2(),則最后總耗時就是5秒。我們都知道,在計算機中CPU的運算速率要遠遠大於IO速率,而當CPU運算完畢后,如果再要閑置很長時間去等待IO任務完成才能進行下一個任務的計算,這樣的任務執行效率很低。

所以我們需要有一種異步的方式來處理類似上述任務,會極大增加效率(當然就是協程啦~)。而我們最初很容易想到的,是能否在上述IO任務執行前中斷當前IO任務(對應於上述代碼time.sleep(2)),進行下一個任務,當該IO任務完成后再喚醒該任務。

而在Python中生成器中的關鍵字yield可以實現中斷功能。所以起初,協程是基於生成器的變形進行實現的,之后雖然編碼形式有變化,但基本原理還是一樣的。戳我查看生成器及迭代器和可迭代對象的講解和區別。

一、使用yield from和@asyncio.coroutine實現協程
在Python3.4中,協程都是通過使用yield from和asyncio模塊中的@asyncio.coroutine來實現的。asyncio專門被用來實現異步IO操作。

(1)什么是yield from?和yield有什么區別?
【1】我們都知道,yield在生成器中有中斷的功能,可以傳出值,也可以從函數外部接收值,而yield from的實現就是簡化了yield操作。
讓我們先來看一個案例:

def generator_1(titles):
    yield titles
def generator_2(titles):
    yield from titles

titles = ['Python','Java','C++']
for title in generator_1(titles):
    print('生成器1:',title)
for title in generator_2(titles):
    print('生成器2:',title)

執行結果如下:

生成器1: ['Python', 'Java', 'C++']
生成器2: Python
生成器2: Java
生成器2: C++

在這個例子中yield titles返回了titles完整列表,而yield from titles實際等價於:

for title in titles: # 等價於yield from titles
    yield title 

【2】而yield from功能還不止於此,它還有一個主要的功能是省去了很多異常的處理,不再需要我們手動編寫,其內部已經實現大部分異常處理。

【舉個例子】:下面通過生成器來實現一個整數加和的程序,通過send()函數向生成器中傳入要加和的數字,然后最后以返回None結束,total保存最后加和的總數。

def generator_1():
    total = 0
    while True:
        x = yield 
        print('',x)
        if not x:
            break
        total += x
    return total
def generator_2(): # 委托生成器
    while True:
        total = yield from generator_1() # 子生成器
        print('加和總數是:',total)
def main(): # 調用方
    g1 = generator_1()
    g1.send(None)
    g1.send(2)
    g1.send(3)
    g1.send(None)
    # g2 = generator_2()
    # g2.send(None)
    # g2.send(2)
    # g2.send(3)
    # g2.send(None)
    
main()

執行結果如下。可見對於生成器g1,在最后傳入None后,程序退出,報StopIteration異常並返回了最后total值是5。

加 23
加 None
------------------------------------------
StopIteration       
<ipython-input-37-cf298490352b> in main()
---> 19     g1.send(None)
StopIteration: 5

如果把g1.send()那5行注釋掉,解注下面的g2.send()代碼,則結果如下。可見yield from封裝了處理常見異常的代碼。對於g2即便傳入None也不報異常,其中total = yield from generator_1()返回給total的值是generator_1()最終的return total

加 23
加 None
加和總數是: 5

【3】借用上述例子,這里有幾個概念需要理一下:

【子生成器】:yield from后的generator_1()生成器函數是子生成器
【委托生成器】:generator_2()是程序中的委托生成器,它負責委托子生成器完成具體任務。
【調用方】:main()是程序中的調用方,負責調用委托生成器。
yield from在其中還有一個關鍵的作用是:建立調用方和子生成器的通道,

在上述代碼中main()每一次在調用send(value)時,value不是傳遞給了委托生成器generator_2(),而是借助yield from傳遞給了子生成器generator_1()中的yield
同理,子生成器中的數據也是通過yield直接發送到調用方main()中。
之后我們的代碼都依據調用方-子生成器-委托生成器的規范形式書寫。

(2)如何結合@asyncio.coroutine實現協程
那yield from通常用在什么地方呢?在協程中,只要是和IO任務類似的、耗費時間的任務都需要使用yield from來進行中斷,達到異步功能!
我們在上面那個同步IO任務的代碼中修改成協程的用法如下:

# 使用同步方式編寫異步功能
import time
import asyncio
@asyncio.coroutine # 標志協程的裝飾器
def taskIO_1():
    print('開始運行IO任務1...')
    yield from asyncio.sleep(2)  # 假設該任務耗時2s
    print('IO任務1已完成,耗時2s')
    return taskIO_1.__name__
@asyncio.coroutine # 標志協程的裝飾器
def taskIO_2():
    print('開始運行IO任務2...')
    yield from asyncio.sleep(3)  # 假設該任務耗時3s
    print('IO任務2已完成,耗時3s')
    return taskIO_2.__name__
@asyncio.coroutine # 標志協程的裝飾器
def main(): # 調用方
    tasks = [taskIO_1(), taskIO_2()]  # 把所有任務添加到task中
    done,pending = yield from asyncio.wait(tasks) # 子生成器
    for r in done: # done和pending都是一個任務,所以返回結果需要逐個調用result()
        print('協程無序返回值:'+r.result())

if __name__ == '__main__':
    start = time.time()
    loop = asyncio.get_event_loop() # 創建一個事件循環對象loop
    try:
        loop.run_until_complete(main()) # 完成事件循環,直到最后一個任務結束
    finally:
        loop.close() # 結束事件循環
    print('所有IO任務總耗時%.5f秒' % float(time.time()-start))

執行結果如下:

開始運行IO任務1...
開始運行IO任務2...
IO任務1已完成,耗時2s
IO任務2已完成,耗時3s
協程無序返回值:taskIO_2
協程無序返回值:taskIO_1
所有IO任務總耗時3.00209秒

【使用方法】: @asyncio.coroutine飾器是協程函數的標志,我們需要在每一個任務函數前加這個裝飾器,並在函數中使用yield from。在同步IO任務的代碼中使用的time.sleep(2)來假設任務執行了2秒。但在協程中yield from后面必須是子生成器函數,而time.sleep()並不是生成器,所以這里需要使用內置模塊提供的生成器函數asyncio.sleep()
【功能】:通過使用協程,極大增加了多任務執行效率,最后消耗的時間是任務隊列中耗時最多的時間。上述例子中的總耗時3秒就是taskIO_2()的耗時時間。
【執行過程】:

   1、上面代碼先通過get_event_loop()獲取了一個標准事件循環loop(因為是一個,所以協程是單線程)
   2、然后,我們通過run_until_complete(main())來運行協程(此處把調用方協程main()作為參數,調用方負責調用其他委托生成器),run_until_complete的特點就像該函數的名                    字,直到循環事件的所有事件都處理完才能完整結束。
           3、進入調用方協程,我們把多個任務[taskIO_1()taskIO_2()]放到一個task列表中,可理解為打包任務。
           4、現在,我們使用asyncio.wait(tasks)來獲取一個awaitable objects即可等待對象的集合(此處的aws是協程的列表),並發運行傳入的aws,同時通過yield from返回一個包含                    (done, pending)的元組,done表示已完成的任務列表pending表示未完成的任務列表;如果使用asyncio.as_completed(tasks)則會按完成順序生成協程的迭代器(常用於                    for循環中),因此當你用它迭代時,會盡快得到每個可用的結果。【此外,當輪詢到某個事件時(如taskIO_1()),直到遇到該任務中的yield from中斷開始處理下一個事件                  (如taskIO_2())),當yield from后面的子生成器完成任務時,該事件才再次被喚醒】
           5、因為done里面有我們需要的返回結果,但它目前還是個任務列表,所以要取出返回的結果值,我們遍歷它並逐個調用result()取出結果即可。(注:對於asyncio.wait()和                        asyncio.as_completed()返回的結果均是先完成的任務結果排在前面,所以此時打印出的結果不一定和原始順序相同,但使用gather()的話可以得到原始順序的結果集,兩                    者更詳細的案例說明見此)
           6、最后我們通過loop.close()關閉事件循環。
   綜上所述:協程的完整實現是靠①事件循環+②協程。

二、使用async和await實現協程
在Python 3.4中,我們發現很容易將協程和生成器混淆(雖然協程底層就是用生成器實現的),所以在后期加入了其他標識來區別協程和生成器。

在Python 3.5開始引入了新的語法async和await,以簡化並更好地標識異步IO。

要使用新的語法,只需要做兩步簡單的替換:

  把@asyncio.coroutine替換為async
  把yield from替換為await
 更改上面的代碼如下,可得到同樣的結果:

import time
import asyncio
async def taskIO_1():
    print('開始運行IO任務1...')
    await asyncio.sleep(2)  # 假設該任務耗時2s
    print('IO任務1已完成,耗時2s')
    return taskIO_1.__name__
async def taskIO_2():
    print('開始運行IO任務2...')
    await asyncio.sleep(3)  # 假設該任務耗時3s
    print('IO任務2已完成,耗時3s')
    return taskIO_2.__name__
async def main(): # 調用方
    tasks = [taskIO_1(), taskIO_2()]  # 把所有任務添加到task中
    done,pending = await asyncio.wait(tasks) # 子生成器
    for r in done: # done和pending都是一個任務,所以返回結果需要逐個調用result()
        print('協程無序返回值:'+r.result())

if __name__ == '__main__':
    start = time.time()
    loop = asyncio.get_event_loop() # 創建一個事件循環對象loop
    try:
        loop.run_until_complete(main()) # 完成事件循環,直到最后一個任務結束
    finally:
        loop.close() # 結束事件循環
    print('所有IO任務總耗時%.5f秒' % float(time.time()-start))

三、總結

最后我們將整個過程串一遍。

【引出問題】:

  1. 同步編程的並發性不高
  2. 多進程編程效率受CPU核數限制,當任務數量遠大於CPU核數時,執行效率會降低。
  3. 多線程編程需要線程之間的通信,而且需要鎖機制來防止共享變量被不同線程亂改,而且由於Python中的GIL(全局解釋器鎖),所以實際上也無法做到真正的並行

【產生需求】:

  1. 可不可以采用同步的方式來編寫異步功能代碼?
  2. 能不能只用一個單線程就能做到不同任務間的切換?這樣就沒有了線程切換的時間消耗,也不用使用鎖機制來削弱多任務並發效率!
  3. 對於IO密集型任務,可否有更高的處理方式來節省CPU等待時間?

【結果】:所以協程應運而生。當然,實現協程還有其他方式和函數,以上僅展示了一種較為常見的實現方式。此外,多進程和多線程是內核級別的程序,而協程是函數級別的程序,是可以通過程序員進行調用的。以下是協程特性的總結:

協程 屬性
所需線程

單線程

(因為僅定義一個loop,所有event均在一個loop中)

編程方式 同步
實現效果 異步
是否需要鎖機制
程序級別 函數級
實現機制 事件循環+協程
總耗時 最耗時事件的時間
應用場景 IO密集型任務等

【額外加餐】:使用tqdm庫實現進度條
這是一個免費的庫:tqdm是一個用來生成進度條的優秀的庫。這個協程就像asyncio.wait一樣工作,不過會顯示一個代表完成度的進度條。詳情見:python進度可視化

async def wait_with_progress(coros):
    for f in tqdm.tqdm(asyncio.as_completed(coros), total=len(coros)):
        await f
from tqdm import tqdm
for i in tqdm(range(10000)):
    pass

使用tqdm實現效果:

四、結束語
感謝大家能耐心讀到這里,寫了這么多文字,再來個真實的案例實戰一下效果更佳哦~!
以下是一個協程在爬蟲的應用實戰案例,其中對比了分布式多進程爬蟲,最后將異步爬蟲和多進程爬蟲融合(含案例以及耗時對比),效果更好。我們可以先提前來通過一幅圖看清多進程和協程的爬蟲之間的原理及其區別。(圖片來源於網絡)

這里,異步爬蟲不同於多進程爬蟲,它使用單線程(即僅創建一個事件循環,然后把所有任務添加到事件循環中)就能並發處理多任務。在輪詢到某個任務后,當遇到耗時操作(如請求URL)時,掛起該任務並進行下一個任務,當之前被掛起的任務更新了狀態(如獲得了網頁響應),則被喚醒,程序繼續從上次掛起的地方運行下去。極大的減少了中間不必要的等待時間。

【參考文獻】:

[1] Python協程:從yield/send到async/await

[2] 廖雪峰.協程

[3] Python 3.7.2文檔.協程與任務

[4] 加速爬蟲: 異步加載 Asyncio

[5]《python高級編程》異步IO和協程(視頻)

[6] 關於同步、異步與阻塞、非阻塞的理解

[7] python:利用asyncio進行快速抓取(aiohttp)

[8] 控制組合式 Coroutines


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM