流暢的python第十八章使用asyncio包處理並發


對比一個簡單的多線程程序和對應的 asyncio 版,說明多線程和異步任務之間的關

asyncio.Future 類與 concurrent.futures.Future 類之間的區別
摒棄線程或進程,如何使用異步編程管理網絡應用中的高並發
在異步編程中,與回調相比,協程顯著提升性能的方式
如何把阻塞的操作交給線程池處理,從而避免阻塞事件循環
使用 asyncio 編寫服務器,重新審視 Web 應用對高並發的處理方式
為什么 asyncio 已經准備好對 Python 生態系統產生重大影響

 

線程與協程對比

import threading
import itertools
import time
import sys


class Signal:
    go = True


def spin(msg, signal):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        time.sleep(.1)
        if not signal.go:
            break
    write(' ' * len(status) + '\x08' * len(status))

def slow_function():
    time.sleep(3)
    return 42

def supervisor():
    signal = Signal()
    spinner = threading.Thread(target=spin, args=('thinking!', signal))
    print('spinner object:', spinner)
    spinner.start()
    result = slow_function()
    signal.go = False
    spinner.join()
    return result

def main():
    result = supervisor()
    print('Answer:', result)

if __name__ == '__main__':
    main()

以上是threading

import asyncio
import itertools
import sys


@asyncio.coroutine
def spin(msg):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        try:
            yield  from asyncio.sleep(.1)
        except asyncio.CancelledError:
            break
    write(' ' * len(status) + '\x08' * len(status))

@asyncio.coroutine
def slow_function():
    yield from asyncio.sleep(3)
    return 42

@asyncio.coroutine
def supervisor():
    spinner = asyncio.async(spin('thinking!'))
    print('spinner object:', spinner)
    result = yield from slow_function()
    spinner.cancel()
    return result

def main():
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(supervisor())
    loop.close()
    print('Answer:', result)


if __name__ == '__main__':
    main()

以上是asyncio

除非想阻塞主線程,從而凍結事件循環或整個應用,否則不要在 asyncio 協
程中使用 time.sleep(...)。如果協程需要在一段時間內什么也不做,應該使用
yield from asyncio.sleep(DELAY)

使用 @asyncio.coroutine 裝飾器不是強制要求,但是強烈建議這么做,因為這樣能在
一眾普通的函數中把協程凸顯出來,也有助於調試:如果還沒從中產出值,協程就被垃圾
回收了(意味着有操作未完成,因此有可能是個缺陷),那就可以發出警告。這個裝飾器
不會預激協程。

線程與協程之間的比較還有最后一點要說明:如果使用線程做過重要的編程,你就知道寫
出程序有多么困難,因為調度程序任何時候都能中斷線程。必須記住保留鎖,去保護程序
中的重要部分,防止多步操作在執行的過程中中斷,防止數據處於無效狀態。
而協程默認會做好全方位保護,以防止中斷。我們必須顯式產出才能讓程序的余下部分運
行。對協程來說,無需保留鎖,在多個線程之間同步操作,協程自身就會同步,因為在任
意時刻只有一個協程運行。想交出控制權時,可以使用 yield 或 yield from 把控制權
交還調度程序。這就是能夠安全地取消協程的原因:按照定義,協程只能在暫停的 yield
處取消,因此可以處理 CancelledError 異常,執行清理操作。

asyncio與concurrent.future的區別

期物只是調度執行某物的結果。在 asyncio 包
中,BaseEventLoop.create_task(...) 方法接收一個協程,排定它的運行時間,然后
返回一個 asyncio.Task 實例——也是 asyncio.Future 類的實例,因為 Task 是
Future 的子類,用於包裝協程。這與調用 Executor.submit(...) 方法創建
concurrent.futures.Future 實例是一個道理。
與 concurrent.futures.Future 類似,asyncio.Future 類也提供了
.done()、.add_done_callback(...) 和 .result() 等方法。前兩個方法的用法與
17.1.3 節所述的一樣,不過 .result() 方法差別很大。
asyncio.Future 類的 .result() 方法沒有參數,因此不能指定超時時間。此外,如果
調用 .result() 方法時期物還沒運行完畢,那么 .result() 方法不會阻塞去等待結果,
而是拋出 asyncio.InvalidStateError 異常。
然而,獲取 asyncio.Future 對象的結果通常使用 yield from,從中產出結果,如示例
18-8 所示。
使用 yield from 處理期物,等待期物運行完畢這一步無需我們關心,而且不會阻塞事件
循環,因為在 asyncio 包中,yield from 的作用是把控制權還給事件循環。
注意,使用 yield from 處理期物與使用 add_done_callback 方法處理協程的作用一
樣:延遲的操作結束后,事件循環不會觸發回調對象,而是設置期物的返回值;而 yield
from 表達式則在暫停的協程中生成返回值,恢復執行協程。
總之,因為 asyncio.Future 類的目的是與 yield from 一起使用,所以通常不需要使
用以下方法。
無需調用 my_future.add_done_callback(...),因為可以直接把想在期物運行結
束后執行的操作放在協程中 yield from my_future 表達式的后面。這是協程的一
大優勢:協程是可以暫停和恢復的函數。
無需調用 my_future.result(),因為 yield from 從期物中產出的值就是結果
(例如,result = yield from my_future)。
當然,有時也需要使用 .done()、.add_done_callback(...) 和 .result() 方法。但
是一般情況下,asyncio.Future 對象由 yield from 驅動,而不是靠調用這些方法驅
動。

對協程來說,獲取 Task 對象有兩種主要方式。
asyncio.async(coro_or_future, *, loop=None)
  這個函數統一了協程和期物:第一個參數可以是二者中的任何一個。如果是 Future
或 Task 對象,那就原封不動地返回。如果是協程,那么 async 函數會調用
loop.create_task(...) 方法創建 Task 對象。loop= 關鍵字參數是可選的,用於傳入
事件循環;如果沒有傳入,那么 async 函數會通過調用 asyncio.get_event_loop() 函
數獲取循環對象。
BaseEventLoop.create_task(coro)
  這個方法排定協程的執行時間,返回一個 asyncio.Task 對象。如果在自定義的
BaseEventLoop 子類上調用,返回的對象可能是外部庫(如 Tornado)中與 Task 類兼容
的某個類的實例。

使用 asyncio 包時,我們編寫的異步代碼中包含由 asyncio 本身驅動的
協程(即委派生成器),而生成器最終把職責委托給 asyncio 包或第三方庫(如
aiohttp)中的協程。這種處理方式相當於架起了管道,讓 asyncio 事件循環(通過我
們編寫的協程)驅動執行低層異步 I/O 操作的庫函數。

import asyncio

import aiohttp

from ..chapter17.flags import BASE_URL, save_flag, show, main


@asyncio.coroutine
def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = yield from aiohttp.request('GET', url)
    image = yield from resp.read()
    return image

@asyncio.coroutine
def download_one(cc):
    image = yield from get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc

def download_many(cc_list):
    loop = asyncio.get_event_loop()
    to_do = [download_one(cc) for cc in sorted(cc_list)]
    wait_coro = asyncio.wait(to_do)
    res, _ = loop.run_until_complete(wait_coro)
    loop.close()

    return len(res)


if __name__ == '__main__':
    main(download_many)

有兩種方法能避免阻塞型調用中止整個應用程序的進程:
在單獨的線程中運行各個阻塞型操作
把每個阻塞型操作轉換成非阻塞的異步調用使用

 

現在你應該能理解為什么 flags_asyncio.py 腳本的性能比 flags.py 腳本高 5 倍了:flags.py
腳本依序下載,而每次下載都要用幾十億個 CPU 周期等待結果。其實,CPU 同時做了很
多事,只是沒有運行你的程序。與此相比,在 flags_asyncio.py 腳本中,在
download_many 函數中調用 loop.run_until_complete 方法時,事件循環驅動各個
download_one 協程,運行到第一個 yield from 表達式處,那個表達式又驅動各個
get_flag 協程,運行到第一個 yield from 表達式處,調用 aiohttp.request(...)
函數。這些調用都不會阻塞,因此在零點幾秒內所有請求全部開始。
asyncio 的基礎設施獲得第一個響應后,事件循環把響應發給等待結果的 get_flag 協
程。得到響應后,get_flag 向前執行到下一個 yield from 表達式處,調用
resp.read() 方法,然后把控制權還給主循環。其他響應會陸續返回(因為請求幾乎同
時發出)。所有 get_ flag 協程都獲得結果后,委派生成器 download_one 恢復,保存
圖像文件。

因為異步操作是交叉執行的,所以並發下載多張圖像所需的總時間比依序下載少得多。我
使用 asyncio 包發起了 600 個 HTTP 請求,獲得所有結果的時間比依序下載快 70 倍。

 

關於concurrent.future模塊以及asyncio模塊的內容不容易理解,需要查閱其他資料,另寫一篇博文。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM