asyncio:python3未來並發編程主流、充滿野心的模塊


介紹

asyncio是Python在3.5中正式引入的標准庫,這是Python未來的並發編程的主流,非常重要的一個模塊。有一個web框架叫sanic,就是基於asyncio,語法和flask類似,使用sanic可以達到匹配go語言的並發量,但無奈第三方組件太少。

asyncio模塊提供了使用協程構建並發應用的工具。threading模塊通過應用線程實現並發,multiprocessing使用系統進程實現並發,asyncio使用一種單線程、單進程模式實現並發,應用的各個部分會彼此合作,在最優的時刻顯式的切換任務。大多數情況下,會在程序阻塞等待讀寫數據時發生這種上下文切換,不過asyncio也支持調度代碼在將來的某個特定時間運行,從而支持一個協程等待另一個協程完成,以處理系統信號和識別其他一些事件(這些事件可能導致應用改變其工作內容)

asyncio中,有幾個非常重要的概念。

  • coroutine 協程協程對象,指一個使用async關鍵字定義的函數,它的調用不會立即執行函數,而是會返回一個協程對象。協程對象需要注冊到事件循環,由事件循環調用。
  • future 未來對象:在asyncio中,如何才能得知異步調用的結果呢?先設計一個對象,異步調用執行完的時候,就把結果放在它里面。這種對象稱之為未來對象。未來對象有一個result屬性,用於存放未來的執行結果。還有個set_result()方法,是用於設置result的,future可以看作為下面的task的容器。
  • task 任務:一個協程對象就是一個原生可以掛起的函數,任務則是對協程進一步封裝,其中包含任務的各種狀態。
  • event_loop 事件循環:程序開啟一個無限的循環,程序員會把一些函數注冊到事件循環上。當滿足事件發生的時候,調用相應的協程函數。
  • async/await 關鍵字python3.5 開始引入的用於定義協程的關鍵字,async定義一個協程,await用於掛起阻塞的異步調用接口。

特點

使用其他並發模型的大多數程序都采用線性方式編寫,而且依賴於語言運行時系統或操作系統的底層線程或進程管理來適當地改變上下文。基於asyncio的應用要求應用代碼顯式地處理上下文切換,要正確地使用相關技術,這取決於是否能正確理解一些相關聯的概念。

asyncio提供的框架以一個事件循環(event loop)為中心,這是一個首類對象,負責高效地處理I/O事件、系統事件、和應用上下文切換。目前已經提供了多個循環實現來高效地利用操作系統的功能。盡管通常會自動選擇一個合理的默認實現,但也完全可以在應用中選擇某個特定的事件循環實現。

與事件循環交互的應用要顯式地注冊將運行的代碼,讓事件循環在資源可用時向應用代碼發出必要的調用。

例如:一個網絡服務器打開套接字,然后注冊為當這些套接字上出現輸入事件時服務器要得到的通知。

事件循環在建立一個新的進入鏈接或者在數據可讀取時都會提醒服務器代碼。當前上下文中沒有更多工作可做時,應用代碼要再次短時間地交出控制權。

例如:如果一個套接字沒有更多的數據可以接收,那么服務器會把控制權交給事件循環

所以,就是把事件注冊到事件循環中,不斷地循環這些事件,可以處理了那么就去處理,如果卡住了,那么把控制權交給事件循環,繼續執行其他可執行的任務。

像傳統的twisted、gevent、以及tornado,都是采用了事件循環的方式,這種模式只適用於高I/O,低CPU的場景,一旦出現了耗時的復雜運算,那么所有任務都會被卡住。

將控制權交給事件循環的機制依賴於協程(coroutine),這是一些特殊的函數,可以將控制返回給調用者而不丟失其狀態。

協程與生成器非常類似,實際上,在python3.5版本之前還未對協程提供原生支持時,可以用生成器來實現協程。asyncio還為協議(protocol)和傳輸(transport)提供了一個基於類的抽象層,可以使用回調編寫代碼而不是直接編寫協程。在基於類的模型和協程模型時,可以通過重新進入事件循環顯式地改變上下文,以取代python多線程實現中隱式的上下文改變

創建一個協程並執行

import asyncio

'''
協程是一個專門設計用來實現並發操作的語言構造。
在早期版本,是使用yield來模擬協程,但它本質上是一個生成器。
但是從python3.5開始,python已經支持原生協程。
調用協程會創建一個協程對象,協程可以使用await關鍵字(並提供另一個協程)暫停執行。
暫停時,這個協程的狀態會保留,使得下一次被喚醒時可以從暫停的地方恢復執行
'''


# 使用async def可以直接定義一個協程
async def coroutine():
    print("in coroutine")


# 創建事件循環
loop = asyncio.get_event_loop()
try:
    print("start coroutine")
    # 協程是無法直接運行的,必須要扔到事件循環里,讓事件循環驅動運行
    coro = coroutine()
    print("entering event loop")
    # 必須扔到事件循環里,這個方法的含義從名字也能看出來,直到協程運行完成
    loop.run_until_complete(coro)
finally:
    print("closing event loop")
    # 關閉事件循環
    loop.close()
'''
start coroutine
entering event loop
in coroutine
closing event loop
'''

# 第一步是得到事件循環的引用。
# 可以使用默認地循環類型,也可以實例化一個特定的循環類。
# run_until_complete方法啟動協程,協程退出時這個方法會停止循環

值得一提的是,從python3.7開始,async和await已經是關鍵字了,我們之前說的關鍵字其實是保留關鍵字,意思是你可以使用async和await作為變量名,但是在python3.7之后,就不可以了。另外在python3.7中還提供了另一種運行協程的方法

import asyncio


async def coroutine():
    print("in coroutine")


coro = coroutine()

# 我們可以直接調用asyncio.run(coroutine)來啟動一個協程
# 使用這種方法運行必須確保python的版本不低於python3.7
asyncio.run(coro)
"""
in coroutine
"""

如果協程有返回值呢?

import asyncio

'''
我們也可以獲取協程的返回值
'''


async def coroutine():
    print("in coroutine")
    return "result"


loop = asyncio.get_event_loop()
try:
    coro = coroutine()
    result = loop.run_until_complete(coro)
    print(result)
finally:
    loop.close()
'''
in coroutine
result
'''
# 在這里,run_until_complete還會返回它等待的協程的結果

對於asyncio.run來說,也是一樣的

import asyncio

'''
我們也可以獲取協程的返回值
'''


async def coroutine():
    print("in coroutine")
    return "result"


coro = coroutine()
result = asyncio.run(coro)
print(result)
"""
in coroutine
result
"""

多個協程合作

如果有多個協程呢?

import asyncio

'''
一個協程還可以驅動另一個協程並等待結果,從而可以更容易地將一個任務分解為可重用的部分。
'''


async def worker():
    print("worker....")
    # 使用await方法會驅動協程consumer執行,並得到其返回值
    # 這里類似於函數調用一樣,但是呢,協程需要加上一個await
    res = await consumer()
    print(res)


async def consumer():
    return "i am consumer"


asyncio.run(worker())
"""
worker....
i am consumer
"""
# 在這里,使用await關鍵字,而不是向循環中增加新的協程。因為控制流已經在循環管理的一個協程中,所以沒必要告訴事件循環來管理這些協程。
# 另外,協程可以並發運行,但前提是多個協程。這個協程卡住了,可以切換到另一個協程。但是就卡住的協程本身來說,該卡多長時間還是多長時間,不可能說跳過卡住的部分執行下面的代碼。

另外我們還可以通過裝飾器來模擬協程

import asyncio

'''
協程函數時asyncio設計中的關鍵部分。
它們提供了一個語言構造,可以停止程序某一部分的執行,保留這個調用的狀態,並在以后重新進入這個狀態,這些動作都是並發框架很重要的功能。

python3.5中引入了一些新的語言特性,可以使用async def以原生方式定義這些協程,以及使用await交出控制,asyncio的例子應用了這些新特性。
但是早期版本,可以使用asyncio.coroutine裝飾器將函數裝飾成一個協程並使用yield from來達到同樣的效果。
'''


@asyncio.coroutine
def worker():
    print("worker....")
    res = yield from consumer()
    print(res)


@asyncio.coroutine
def consumer():
    return "i am consumer"


asyncio.run(worker())
"""
worker....
i am consumer
"""

# 我們看到使用生成器依舊可以達到這樣的效果,然而盡管使用生成器可以達到同樣的效果,但還是推薦使用async和await
'''
生成器既可以做生成器,又可以包裝為協程,那么它到底是協程還是生成器呢?這會使得代碼出現混亂
生成器既然叫生成器,那么就應該做自己
基於async的原生協程比使用yield裝飾器的協程要快,大概快10-20%
'''
# 並且在python3.8中,已經警告了,不建議使用這種方法,定義一個協程應該使用async def

調用常規函數

call_soon

可以使用這個函數給協程綁定一個回調,從名字也能看出來是立即執行,只不過是遇到阻塞立即執行。

import asyncio
from functools import partial

'''
除了管理協程和I/P回調,asyncio事件循環還可以根據循環中保存的一個定時器值來調度常規函數調用。
'''
# 如果回調的時間不重要,那么可以使用call_soon調度下一次循環迭代的調用


def callback(*args, **kwargs):
    print("callback:", args, kwargs)


async def main(loop):
    print("register callback")
    # 接收一個回調函數,和參數
    loop.call_soon(callback, "mashiro", 16)
    print("********")
    # 另外call_soon不支持使用關鍵字參數來向回調傳遞參數
    # 所以如果想使用關鍵字參數,需要使用偏函數轉換一下
    # 其實不僅是這里的call_sonn,以及后面要介紹的call_later和call_at都不支持使用關鍵字參數來向回調傳遞參數
    # 因此如果不想使用偏函數來包裝的話,就直接使用位置參數就可以了
    wrapped = partial(callback, kwargs={"name": "satori", "age": 16})
    loop.call_soon(wrapped, "mahsiro", 16)
    print("—————————")

    await asyncio.sleep(0.6)


loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
"""
register callback
********
—————————
callback: ('mashiro', 16) {}
callback: ('mahsiro', 16) {'kwargs': {'name': 'satori', 'age': 16}}
"""
# 另外我們發現我們在調用了call_soon之后沒有立刻執行,而是先進性了print
# 這是因為只有在遇到阻塞才會立刻執行,所以當遇到await asyncio.sleep的時候會去執行
# 另外這里的阻塞,不能是time.sleep,必須是可以awaitable的阻塞

call_later

同樣是給一個協程綁定一個回調,但是從名字也能看出來這需要指定一個時間,表示多長時間之后調用。

import asyncio

'''
要將回調推遲到將來的某個時間調用,可以使用call_later。這個方法的第一個參數是延遲時間(單位為秒),第二個參數是回調。
'''


def callback(cb, n):
    print(f"{cb} {n}")


async def main(loop):
    print("register callback")
    loop.call_later(0.2, callback, "call_later", "0.2s")
    loop.call_later(0.1, callback, "call_later", "0.1s")
    loop.call_soon(callback, "call_soon", "None")
    print("-----------")
    await asyncio.sleep(0.6)


event_loop = asyncio.get_event_loop()
try:
    print("entering event loop")
    event_loop.run_until_complete(main(event_loop))
finally:
    print("closing event loop")
    event_loop.close()

'''
entering event loop
register callback
-----------
call_soon None
call_later 0.1s
call_later 0.2s
closing event loop
'''
# 我們注意一下main里面的第二個print
# 我們看到無論是call_soon還是call_later都是在第二個print結束之后才調用
# 說明call_later和call_soon一樣,都是在遇到異步io阻塞、比如asyncio.sleep之后才會執行

# 但是一提的是,對於call_later來說,計時是從注冊回調的那一刻就已經開始了
# 比如call_later中注冊了5s后調用,但是遇見了asyncio.sleep(4)的時候過去了兩秒,那么執行call_later注冊的回調只需要再過3s就可以了。
# 如果回調瞬間執行完畢的話,那么asyncio.sleep了4秒,只需再過1秒,asyncio.sleep也執行完畢

# 但是如果其他條件不變,而asyncio.sleep(2)怎么辦?執行call_later注冊的回調需要3s,但是asyncio.sleep異步阻塞只有2s
# 那么不好意思,程序會繼續往下走,因為asyncio.sleep結束之后,還需要1s才會執行call_later指定的回調。
# 所以程序向下執行,直到出現下一個異步io阻塞,如果不是異步io阻塞的話,那么call_later指定的回調也是不會執行的
# 因此:執行回調,是指在遇見異步io阻塞的時候才會執行
# 而call_soon則是只要遇見異步io就會執行,即使遇見異步io,call_later已經等待完畢,執行的先后順序依舊是call_soon先執行

我們來驗證一下

import asyncio
import time


def callback(cb, n):
    print(f"{cb} {n}")


async def main(loop):
    print("register callback")
    loop.call_later(0.2, callback, "call_later", "0.2s")
    loop.call_later(0.1, callback, "call_later", "0.1s")
    loop.call_soon(callback, "call_soon", "None")
    # time.sleep不是異步io,它是一個同步io
    time.sleep(1)
    # 當time.sleep(1)之后call_later和call_soon肯定都會執行,因為call_later里面指定的是0.2和0.1,比1小
    await asyncio.sleep(0.6)


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

'''
register callback
call_soon None
call_later 0.1s
call_later 0.2s
'''

再來看看call_later

import asyncio
import time


def callback(cb, n):
    print(f"{cb} {n}")


async def main(loop):
    print("register callback")
    loop.call_later(2, callback, "call_later", "0.2s")
    print("call_later注冊完畢")
    # 這里執行完畢,call_later還沒有開始
    await asyncio.sleep(1.5)
    # 1.5 + 1肯定比2大,所以time.sleep(1)之后call_later里面的指定的時間肯定已經過了
    time.sleep(1)
    print("就算時間過了,我還是比call_later指定的回調先執行,因為沒有異步io阻塞")
    print("就算時間過了,我還是比call_later指定的回調先執行,因為沒有異步io阻塞")
    print("就算時間過了,我還是比call_later指定的回調先執行,因為沒有異步io阻塞")
    print("就算時間過了,我還是比call_later指定的回調先執行,因為沒有異步io阻塞")
    await asyncio.sleep(0.1)
    print("完了,我上面出現了異步io阻塞,我要比call_later指定的回調后執行了")


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

'''
register callback
call_later注冊完畢
就算時間過了,我還是比call_later指定的回調先執行,因為沒有異步io阻塞
就算時間過了,我還是比call_later指定的回調先執行,因為沒有異步io阻塞
就算時間過了,我還是比call_later指定的回調先執行,因為沒有異步io阻塞
就算時間過了,我還是比call_later指定的回調先執行,因為沒有異步io阻塞
call_later 0.2s
完了,我上面出現了異步io阻塞,我要比call_later指定的回調后執行了
'''
import asyncio


def callback(cb, n):
    print(f"{cb} {n}")


async def main(loop):
    print("register callback")
    loop.call_later(2, callback, "call_later", "0.2s")
    print("call_later注冊完畢")
    await asyncio.sleep(1)
    print("call_later指定的回調能執行嗎")
    print("call_later指定的回調能執行嗎")
    print("call_later指定的回調能執行嗎")
    print("不能")


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

'''
register callback
call_later注冊完畢
call_later指定的回調能執行嗎
call_later指定的回調能執行嗎
call_later指定的回調能執行嗎
不能
'''
# 我們看到call_later指定的回調沒有執行程序就退出了
# 這是因為main里面的代碼全部執行完,之后call_later指定的時間還沒有到
# 所以直接退出了
import asyncio
import time


def callback(cb, n):
    print(f"{cb} {n}")


async def main(loop):
    print("register callback")
    loop.call_later(2, callback, "call_later", "0.2s")
    print("call_later注冊完畢")
    await asyncio.sleep(1)
    time.sleep(1)
    print("call_later指定的回調能執行嗎")
    print("call_later指定的回調能執行嗎")
    print("call_later指定的回調能執行嗎")
    print("能,因為時間已經到了")


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

'''
register callback
call_later注冊完畢
call_later指定的回調能執行嗎
call_later指定的回調能執行嗎
call_later指定的回調能執行嗎
能,因為時間已經到了
call_later 0.2s
'''
# 盡管一致沒有出現異步io,但是當代碼全部執行完之后,call_later指定的時間已經到了
# 所以會在最后執行它

call_at

call_at,在指定的時間執行

import asyncio
import time

'''
除了call_soon瞬間執行,和call_later延遲執行之外,還有一個call_at在指定之間內執行。
實現這個目的的循環依賴於一個單調時鍾,而不是牆上的時鍾時間,以確保now時間絕對不會逆轉。
要為一個調度回調選擇時間,必須使用循環的time方法從這個時鍾的內部開始
'''


def callback(cb, loop):
    print(f"callback {cb} invoked at {loop.time()}")


async def main(loop):
    now = loop.time()
    print("clock time:", time.time())
    print("loop time:", now)
    print("register callback")
    # 表示在當前時間(time = loop.time())之后的0.2s執行,個人覺得類似於call_later
    loop.call_at(now + 0.2, callback, "call_at", loop)
    time.sleep(1)
    print("是先打印我呢?還是先執行call_at或者call_sonn呢")
    await asyncio.sleep(1)


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

'''
clock time: 1573291054.068545
loop time: 160079.265
register callback
是先打印我呢?還是先執行call_at或者call_sonn呢
callback call_at invoked at 160079.453
'''
# 所以這和call_later也是類似的,都是在遇到io阻塞之后才會執行

以上三者的執行順序

首先在遇到異步io阻塞的時候,call_soon是立刻執行,call_later和call_at是需要等指定過了才會執行,如果時間沒到,那么執行順序肯定是call_soon最先,這沒問題。但是,如果當遇到一個異步io阻塞的時候,call_later和call_at所指定的時間都過了,那么這個三者的執行順序是怎么樣的呢?

import asyncio


def callback(cb):
    print(f"callback {cb}")


async def main(loop):
    now = loop.time()
    loop.call_at(now + 0.2, callback, "call_at")
    loop.call_later(0.2, callback, "call_later")
    loop.call_soon(callback, "call_soon")

    #await asyncio.sleep(1)


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

'''
callback call_soon
'''
# 首先我們發現,如果沒有異步io阻塞,那么最終只有call_soon會執行
import asyncio
import time


def callback(cb):
    print(f"callback {cb}")


async def main(loop):
    now = loop.time()
    loop.call_at(now + 0.3, callback, "call_at")
    loop.call_later(0.2, callback, "call_later")
    loop.call_soon(callback, "call_soon")
    time.sleep(1)
    await asyncio.sleep(1)


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

'''
callback call_soon
callback call_later
callback call_at
'''
# 遇到異步io,那么call_soon仍然最先執行
# 至於call_later和call_at,則是兩者指定的時間哪個先到,先執行哪個

task與future

對於協程來說,是沒有辦法直接放到事件循環里面運行的,需要的是task。而我們剛才之所以直接將協程扔進去,是因為asyncio內部會有檢測機制,如果是協程的話,會自動將協程包裝成一個task

import asyncio


async def coroutine():
    print(123)


loop = asyncio.get_event_loop()
# 如何創建一個任務呢?
task = loop.create_task(coroutine())
loop.run_until_complete(task)
"""
123
"""

因此一個協程是一個可以原生掛起的函數,而一個task則是對協程的進一步封裝,里面包含了任務的各種執行狀態。

future被稱之為未來對象,我覺得可以把它看成是task的容器,每一個task運行之后都會返回一個future,這個future可以獲取task的執行狀態、返回值等等,並且還能夠給task設置相應的返回值。所以執行的是任務,future是用來管理和查看的。

import asyncio

'''
Future表示還未完成的工作的結果。事件循環可以通過監視一個Future對象的狀態來指示它已經完成,從而允許應用的一部分等待另一部分完成一些工作。
'''


# Future的做法類似於協程,所以等待協程所用的技術同樣可以用於等待Future。


# 接收一個future和result
def mark_done(future, result):
    print("setting result")
    future.set_result(result)


event_loop = asyncio.get_event_loop()
# 如何創建一個future,使用asyncio.Future
future = asyncio.Future()

# 為future注冊回調,協程、task、future三者是遞進關系,都可以設置回調,都可以扔進事件循環
event_loop.call_later(2, mark_done, future, "我是返回值")
# future什么時候才能執行完畢呢?當future執行了set_result函數的時候
event_loop.run_until_complete(future)

print(future.result())
"""
setting result
我是返回值
"""
# 對於一個任務是否完成,就是看起對應的future有沒有set_result,如果set_result之后,那么這個任務便執行完畢了
# 通過future.result()便可以拿到任務的返回值,所以說future是task的容器,當任務執行完畢,future會調用set_result設置返回值

future = asyncio.Future()
# 設置只能設置一次,設置多次會報錯
future.set_result("xxx")
# 但是取可以取多次
print(future.result())  # xxx
print(future.result())  # xxx
print(future.result())  # xxx
print(future.result())  # xxx
print(future.result())  # xxx


# 不過concurrent.futures里面的future可以設置多次,但是以最后一次為准
from concurrent.futures import Future
future = Future()
future.set_result("aaa")
future.set_result("bbb")
print(future.result())  # bbb
print(future.result())  # bbb
print(future.result())  # bbb

future還可以結合await使用

import asyncio

'''
Future還可以結合await關鍵字使用
'''


def mark_done(future, result):
    print("setting result")
    future.set_result(result)


async def main(loop):
    future = asyncio.Future()
    print("scheduling mark_done")
    loop.call_later(2, mark_done, future, "the result")
    # 會等到這個future完成,什么時候完成,當這個future執行set_result的時候,然后await future的值則是future.result()
    # 其實我們await一個協程也是一樣,也是當協程對應的任務執行完畢、future將返回值進行set_result的時候
    # 然后我們知道await coroutine的得到的就是當前定義的coroutine的返回值,其實准確來說,應該是當前coroutine對應的future的result()
    # 只不過result()得到的就是set_result設置進去的,而set_result設置進去的正式當前定義的coroutine的返回值。盡管是一樣的,但是這個邏輯還是要理清。
    res = await future
    print("res =", res)


loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
"""
scheduling mark_done
setting result
res = the result
"""

除了做法與協程類似,future還可以綁定回調

import asyncio
import functools

'''
除了做法與協程類似,Future也可以調用回調,回調的順序按照其注冊的順序調用
'''


def callback(future, n):
    print(f"future result: {future.result()} n:{n}")


async def register_callback(future):
    print("register callback on futures")
    # 設置一個回調,同樣是只能傳遞函數名,觸發回調的時候,會自動將future本身作為第一個參數傳遞給回調函數
    # 回調什么時候執行,還是那句話,當future執行set_result的時候執行
    future.add_done_callback(functools.partial(callback, n=1))
    future.add_done_callback(functools.partial(callback, n=2))


async def main(future):
    # 等待回調注冊完成
    await register_callback(future)
    print("setting result of future")
    future.set_result("the result")


event_loop = asyncio.get_event_loop()
future = asyncio.Future()
event_loop.run_until_complete(main(future))
"""
register callback on futures
setting result of future
future result: the result n:1
future result: the result n:2
"""

執行task

任務是與事件循環交互的主要途徑之一,任務可以包裝協程,並跟蹤協程何時完成。另外future被稱之為未來對象,它是可以等待的,並且task的類對象在源碼中實際上是future的類對象的一個子類。每個任務都有一個結果,是通過set_result設置的,並且可以通過result()獲取這些結果,當然這些上面說過了,這里不再贅述了

import asyncio

'''
任務是與事件循環交互的主要途徑之一。
任務可以包裝協程,並跟蹤協程何時完成。
由於任務是Future的子類,所以其他協程可以等待任務,而且每個任務可以有一個結果,在它完成時可以獲取這些結果
'''


# 啟動一個任務,可以使用create_task函數創建一個Task實例。
# 只要循環還在運行而且協程沒有返回,create_task得到的任務便會作為事件循環管理的並發操作的一部分運行
async def task_func():
    print("in task func")
    return "the result"


async def main(loop):
    print("creating task")
    # 除了使用loop.create_task,我們還可以使用asyncio.ensure_future
    # 對於傳入一個協程的話,asyncio.ensure_future還是調用了loop.create_task
    task = loop.create_task(task_func())
    print(f"wait for {task}")
    return_value = await task
    print(f"task completed {task}")
    print(f"return value {return_value}")



loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
"""
creating task
wait for <Task pending coro=<task_func() running at D:/satori/1.py:12>>
in task func
task completed <Task finished coro=<task_func() done, defined at D:/satori/1.py:12> result='the result'>
return value the result
"""
# 在我們還沒有await驅動任務執行的時候,是Task pending
# 當await之后,已處於finished狀態
import asyncio

'''
通過create_task可以創建對象,那么也可以在任務完成前取消操作
'''


async def task_func():
    print("in task func")
    return "the result"


async def main(loop):
    print("creating task")
    task = loop.create_task(task_func())

    print("canceling task")
    # 任務創建之后,可以調用cancel函數取消
    task.cancel()
    print(f"canceled task: {task}")

    try:
        # 任務取消之后再await則會引發CancelledError
        await task
    except asyncio.CancelledError:
        print("caught error from canceled task")
    else:
        print(f"task result: {task.result()}")


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()
"""
creating task
canceling task
canceled task: <Task cancelling coro=<task_func() running at D:/satori/1.py:8>>
caught error from canceled task
"""

多個task並發執行

首先我們來看一個例子。

import asyncio
import time


async def task_func():
    await asyncio.sleep(1)


async def main():
    # 兩者是一樣的
    await task_func()
    await asyncio.ensure_future(task_func())


start = time.perf_counter()
asyncio.run(main())
print(f"總用時:{time.perf_counter() - start}")  # 總用時:2.0023553

我們發現總用時為2s,這是為什么?不是說遇見異步io會自動切換么?那么整體用時應該為1s才對啊。確實理論上是這樣的,但是觀察我們的代碼是怎么寫的,我們兩個await是分開寫的,而且await coroutine是能得到當前coroutine的返回值的,如果這個coroutine對應的task都還沒執行完畢、future還沒有set_result,我們又怎么能拿到呢?還是那句話,異步是在多個協程之間進行切換,至於當前的協程阻塞了只會切換到另一個協程里面去執行,但是對於當前協程來說,該阻塞多長還是阻塞多長,不可能說這一步阻塞還沒過去,就直接調到下一行代碼去執行,這是不可能的。

因此兩個await,必須等第一個await完畢之后,才會執行下一個await。至於我們剛才的call_soon、call_later、call_at,可以看做是另一個協程,在遇到了asyncio.sleep之后就切換過去了,但是對於協程本身來說,該asyncio.sleep多少秒還是多少秒, 只有sleep結束了,才會執行await asyncio.sleep下面的代碼。還是那句話,切換是指多個協程之間切換,而我們上面代碼是兩個await,這兩個await本身來說相當於還是串行,就是main協程里面的兩行代碼,只有第一個await結束了,才會執行第二個await。

感覺說的有點啰嗦了,到后面的async for會再提一遍

那么問題來了,我們如何才能讓這兩個協程並發的執行呢?

asyncio.wait

import asyncio
import time


async def task_func():
    await asyncio.sleep(1)


async def main():
    # 將多個協程或者任務放在一個列表里面,傳給asyncio.wait
    # 里面還可以再傳其他參數:
    # timeout:超時時間,如果在這個時間段內任務沒有執行完畢,那么任務直接取消
    # return_when:FIRST_COMPLETED,第一個任務完成時結束;FIRST_EXCEPTION,第一次出現異常時結束;ALL_COMPLETED,所有任務都完成是結束。默認是ALL_COMPLETED
    await asyncio.wait([task_func(), task_func()])


start = time.perf_counter()
asyncio.run(main())
print(f"總用時:{time.perf_counter() - start}")  # 總用時:1.0012839
"""
此時總共就只用了1s
"""

獲取任務的返回值

import asyncio
import time


async def task_func(n):
    await asyncio.sleep(1)
    return f"task{n}"


async def main():
    # 這個wait函數有兩個返回值,一個是內部任務已完成的future,一個是未完成的future
    finished, pending = await asyncio.wait([task_func(_) for _ in range(1, 5)])
    # 我們說過一旦任務完成,future就會通過set_result方法設置返回值
    # 然后我們通過future.result()就能拿到返回值
    print(f"results: {[future.result() for future in finished]}")


start = time.perf_counter()
asyncio.run(main())
print(f"總用時:{time.perf_counter() - start}")  
"""
results: ['task2', 'task3', 'task1', 'task4']
總用時:1.0015823
"""

但是我們發現執行的順序貌似不是我們添加的順序,因此wait返回的future的順序是無序的,如果希望有序,那么需要使用另一個函數

asyncio.gather

import asyncio
import time


async def task_func(n):
    await asyncio.sleep(1)
    return f"task{n}"


async def main():

    # gather只有一個返回值,直接返回已完成的任務,注意是返回值,不是future,也就是說返回的是future.result()
    # 但是傳遞的時候就不要傳遞列表,而是需要傳遞一個個的task,因此我們這里要將列表打散
    finished = await asyncio.gather(*[task_func(_) for _ in range(1, 5)])
    print(f"results: {[res for res in finished]}")


start = time.perf_counter()
asyncio.run(main())
print(f"總用時:{time.perf_counter() - start}")
"""
results: ['task1', 'task2', 'task3', 'task4']
總用時:1.0012363
"""

使用gather是可以保證順序的,順序就是我們添加任務的順序

asyncio.as_completed

我們看到這個wait類似於concurrent.futures里面的submit,gather類似於map,而concurrent.futures里面還有一個as_completed,那么同理asyncio里面也有一個as_completed。另外個人覺得asyncio借鑒了concurrent.futures里的不少理念,而且wait里面還有一個return_when,這個里面的參數,官網就是從concurrent.futures包里面導入的。

那這個函數是用來干什么的呢?從名字也能看出來,是哪個先完成哪個就先返回

import asyncio
import time

'''
as_completed函數是一個生成器,會管理指定的一個協程列表,並生成它們的結果,每個協程結束運行時一次生成一個結果。
與wait類似,as_completed不能保證順序,從名字也能看出來,哪個先完成哪個先返回
'''


async def task_func(n):
    await asyncio.sleep(n)
    return f"task{n}"


async def main():

    # 同樣需要傳遞一個列表
    completed = asyncio.as_completed([task_func(2), task_func(1), task_func(3), task_func(4)])
    # 遍歷每一個task,進行await,哪個先完成,就先返回
    for task in completed:
        res = await task
        print(res)


start = time.perf_counter()
asyncio.run(main())
print(f"總用時:{time.perf_counter() - start}")
"""
task1
task2
task3
task4
總用時:4.0048034999999995
"""

同步原語

盡管asyncio應用通常作為單線程的進程運行,不過仍被構建為並發應用。

由於I/O以及其他外部事件的延遲和中斷,每個協程或任務可能按照一種不可預知的順序執行。

為了支持安全的並發執行,asyncio包含了threading和multiprocessing模塊中一些底層原語的實現

import asyncio

'''
Lock可以用來保護對一個共享資源的訪問,只有鎖的持有者可以使用這個資源。
如果有多個請求要得到這個鎖,那么其將會阻塞,以保證一次只有一個持有者
'''


def unlock(lock):
    print("回調釋放鎖,不然其他協程獲取不到。")
    print("但我是1秒后被調用,鎖又在只能通過調用我才能釋放,所以很遺憾,其他協程要想執行,至少要1秒后了")
    lock.release()


async def coro1(lock):
    print("coro1在等待鎖")
    # 使用async with語句很方便,是一個上下文。
    # 我們知道在多線程中,也可以使用with,相當於開始的lock.acquire和結尾lock.release
    # 那么在協程中,也有await lock.acquire和lock.release,以及專業寫法async with
    async with lock:
        print("coro1獲得了鎖")
        print("coro1釋放了鎖")


async def coro2(lock):
    print("coro2在等待鎖")
    # 使用await lock.acquire()和lock.release()這種方式也是一樣的
    await lock.acquire()
    print("coro2獲得了鎖")
    print("coro2釋放了鎖")
    # 注意release是不需要await的
    lock.release()


async def main(loop):
    # 創建共享鎖
    lock = asyncio.Lock()

    print("在開始協程之前創建一把鎖")
    await lock.acquire()  # 這里先把鎖給鎖上
    print("鎖是否被獲取:", lock.locked())

    # 執行回調將鎖釋放,不然協程無法獲取鎖
    loop.call_later(1, unlock, lock)

    # 運行想要使用鎖的協程
    print("等待所有協程")
    await asyncio.wait([coro1(lock), coro2(lock)])



loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
"""
在開始協程之前創建一把鎖
鎖是否被獲取: True
等待所有協程
coro2在等待鎖
coro1在等待鎖
回調釋放鎖,不然其他協程獲取不到。
但我是1秒后被調用,鎖又在只能通過調用我才能釋放,所以很遺憾,其他協程要想執行,至少要1秒后了
coro2獲得了鎖
coro2釋放了鎖
coro1獲得了鎖
coro1釋放了鎖
"""

事件

和線程一樣,協程里面也有事件的概念。asyncio.Event基於threading.Event,它允許多個消費者等待某個事件發生。

Event對象可以使用set、wait、clear

  • set:設置標志位,調用is_set可以查看標志位是否被設置。一個剛創建的Event對象默認是沒有設置的
  • wait:等待,在沒有調用set的情況下,會阻塞。如果設置了set,wait則不會阻塞
  • clear:清空標志位
import asyncio


def set_event(event):
    print("設置標志位,因為協程會卡住,只有設置了標志位才會往下走")
    print("但我是一秒后才被調用,所以協程想往下走起碼也要等到1秒后了")
    event.set()


async def coro1(event):
    print("coro1在這里卡住了,快設置標志位啊")
    await event.wait()
    print(f"coro1飛起來了,不信你看現在標志位,是否設置標志位:{event.is_set()}")


async def coro2(event):
    print("coro2在這里卡住了,快設置標志位啊")
    await event.wait()
    print(f"coro2飛起來了,不信你看現在標志位,是否設置標志位:{event.is_set()}")


async def main(loop):
    # 創建共享事件
    event = asyncio.Event()
    # 現在設置標志位了嗎?
    print("是否設置標志位:", event.is_set())

    # 執行回調將標志位設置,不然協程卡住了
    loop.call_later(1, set_event, event)

    # 運行卡住的的協程
    print("等待所有協程")
    await asyncio.wait([coro1(event), coro2(event)])


loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
"""
是否設置標志位: False
等待所有協程
coro2在這里卡住了,快設置標志位啊
coro1在這里卡住了,快設置標志位啊
設置標志位,因為協程會卡住,只有設置了標志位才會往下走
但我是一秒后才被調用,所以協程想往下走起碼也要等到1秒后了
coro2飛起來了,不信你看現在標志位,是否設置標志位:True
coro1飛起來了,不信你看現在標志位,是否設置標志位:True
"""

隊列

asyncio.Queue為協程提供了一個先進先出的數據結構,這與線程的queue.Queue或者進程里面的Queue很類似

import asyncio
import time


async def consumer(q: asyncio.Queue, n):
    print(f"消費者{n}號 開始")
    while True:
        await asyncio.sleep(2)
        item = await q.get()
        print(f"消費者{n}號: 消費元素{item}")
        # 由於我們要開啟多個消費者,為了讓其停下來,我們添加None作為停下來的信號
        if item is None:
            # task_done是什么意思?隊列有一個屬性,叫做unfinished_tasks
            # 每當我們往隊列里面put一個元素的時候,這個值就會加1,
            q.task_done()
            # 並且隊列還有一個join方法,表示阻塞,什么時候不阻塞呢?當unfinished_tasks為0的時候。
            # 因此我們每put一個元素的時候,unfinished_tasks都會加上1,那么當我get一個元素的時候,unfinished_tasks是不是也應該要減去1啊,但是我們想多了
            # get方法不會自動幫我們做這件事,需要手動調用task_done方法實現
            break
        else:
            q.task_done()


async def producer(q: asyncio.Queue, consumer_num):
    print(f"生產者 開始")
    for i in range(1, 22):
        await q.put(i)
        print(f"生產者: 生產元素{i},並放在了隊列里")
    # 為了讓消費者停下來,我就把None添加進去吧
    # 開啟幾個消費者,就添加幾個None
    for i in range(consumer_num):
        await q.put(None)

    # 等待所有消費者執行完畢
    # 只要unfinished_tasks不為0,那么q.join就會卡住,直到消費者全部消費完為止
    await q.join()
    print("生產者生產的東西全被消費者消費了")


async def main(consumer_num):
    q = asyncio.Queue()
    consumers = [consumer(q, i) for i in range(consumer_num)]
    await asyncio.wait(consumers + [producer(q, consumer_num)])


start = time.perf_counter()
asyncio.run(main(3))
print(f"總用時:{time.perf_counter() - start}")
"""
生產者 開始
生產者: 生產元素1,並放在了隊列里
生產者: 生產元素2,並放在了隊列里
生產者: 生產元素3,並放在了隊列里
生產者: 生產元素4,並放在了隊列里
生產者: 生產元素5,並放在了隊列里
生產者: 生產元素6,並放在了隊列里
生產者: 生產元素7,並放在了隊列里
生產者: 生產元素8,並放在了隊列里
生產者: 生產元素9,並放在了隊列里
生產者: 生產元素10,並放在了隊列里
生產者: 生產元素11,並放在了隊列里
生產者: 生產元素12,並放在了隊列里
生產者: 生產元素13,並放在了隊列里
生產者: 生產元素14,並放在了隊列里
生產者: 生產元素15,並放在了隊列里
生產者: 生產元素16,並放在了隊列里
生產者: 生產元素17,並放在了隊列里
生產者: 生產元素18,並放在了隊列里
生產者: 生產元素19,並放在了隊列里
生產者: 生產元素20,並放在了隊列里
生產者: 生產元素21,並放在了隊列里
消費者1號 開始
消費者2號 開始
消費者0號 開始
消費者1號: 消費元素1
消費者2號: 消費元素2
消費者0號: 消費元素3
消費者1號: 消費元素4
消費者2號: 消費元素5
消費者0號: 消費元素6
消費者1號: 消費元素7
消費者2號: 消費元素8
消費者0號: 消費元素9
消費者1號: 消費元素10
消費者2號: 消費元素11
消費者0號: 消費元素12
消費者1號: 消費元素13
消費者2號: 消費元素14
消費者0號: 消費元素15
消費者1號: 消費元素16
消費者2號: 消費元素17
消費者0號: 消費元素18
消費者1號: 消費元素19
消費者2號: 消費元素20
消費者0號: 消費元素21
消費者1號: 消費元素None
消費者2號: 消費元素None
消費者0號: 消費元素None
生產者生產的東西全被消費者消費了
總用時:16.0055946
"""

我們對隊列進行循環,然后await的時候,實際上有一個更加pythonic的寫法,也就是async for

import asyncio
import time
from tornado.queues import Queue
from tornado import gen


# 注意asyncio中的Queue不支持async for,我們需要使用tornado中的Queue
async def consumer(q: Queue, n):
    print(f"消費者{n}號 開始")
    async for item in q:
        await gen.sleep(1)
        if item is None:
            print(f"生產者{n}號:消費元素{item}")
            q.task_done()
            break
        print(f"生產者{n}號:消費元素{item}")
        q.task_done()


async def producer(q: Queue, consumer_num):
    print(f"生產者 開始")
    for i in range(1, 22):
        await q.put(i)
        print(f"生產者: 生產元素{i},並放在了隊列里")
    for i in range(consumer_num):
        await q.put(None)

    await q.join()
    print("生產者生產的東西全被消費者消費了")


async def main(consumer_num):
    q = Queue()
    consumers = [consumer(q, i) for i in range(consumer_num)]
    await asyncio.wait(consumers + [producer(q, consumer_num)])


start = time.perf_counter()
asyncio.run(main(3))
print(f"總用時:{time.perf_counter() - start}")
"""
消費者2號 開始
消費者1號 開始
消費者0號 開始
生產者 開始
生產者: 生產元素1,並放在了隊列里
生產者: 生產元素2,並放在了隊列里
生產者: 生產元素3,並放在了隊列里
生產者: 生產元素4,並放在了隊列里
生產者: 生產元素5,並放在了隊列里
生產者: 生產元素6,並放在了隊列里
生產者: 生產元素7,並放在了隊列里
生產者: 生產元素8,並放在了隊列里
生產者: 生產元素9,並放在了隊列里
生產者: 生產元素10,並放在了隊列里
生產者: 生產元素11,並放在了隊列里
生產者: 生產元素12,並放在了隊列里
生產者: 生產元素13,並放在了隊列里
生產者: 生產元素14,並放在了隊列里
生產者: 生產元素15,並放在了隊列里
生產者: 生產元素16,並放在了隊列里
生產者: 生產元素17,並放在了隊列里
生產者: 生產元素18,並放在了隊列里
生產者: 生產元素19,並放在了隊列里
生產者: 生產元素20,並放在了隊列里
生產者: 生產元素21,並放在了隊列里
生產者2號:消費元素1
生產者1號:消費元素2
生產者0號:消費元素3
生產者2號:消費元素4
生產者1號:消費元素5
生產者0號:消費元素6
生產者2號:消費元素7
生產者1號:消費元素8
生產者0號:消費元素9
生產者2號:消費元素10
生產者1號:消費元素11
生產者0號:消費元素12
生產者2號:消費元素13
生產者1號:消費元素14
生產者0號:消費元素15
生產者2號:消費元素16
生產者1號:消費元素17
生產者0號:消費元素18
生產者2號:消費元素19
生產者1號:消費元素20
生產者0號:消費元素21
生產者2號:消費元素None
生產者1號:消費元素None
生產者0號:消費元素None
生產者生產的東西全被消費者消費了
總用時:8.008154500000002
"""

協程與線程結合

如果出現了一個同步耗時的任務,我們可以將其扔到線程池里面去運行。對於協程來說,仍然是單線程的,我們是可以將耗時的任務單獨開啟一個線程來執行的

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def foo(n):
    time.sleep(n)
    print(f"foo睡了{n}秒")


async def bar():
    await asyncio.sleep(3)
    return "bar"


async def main():
    # 線程池最多裝兩個任務
    executor = ThreadPoolExecutor(max_workers=2)
    # loop.run_in_executor表示扔到線程池里面運行,這個過程是瞬間返回的
    loop.run_in_executor(executor, foo, 3)
    loop.run_in_executor(executor, foo, 2)
    print("瞬間返回")
    res = await bar()
    print(res)


loop = asyncio.get_event_loop()
start = time.perf_counter()
loop.run_until_complete(main())
print(f"總用時:{time.perf_counter() - start}")
"""
瞬間返回
foo睡了2秒
foo睡了3秒
bar
總用時:3.0015592
"""

關於async with 和async for

如果讓你定義一個類支持一個with和for語句,我相信肯定沒有問題,但是async with和async for呢?我們要怎么實現呢?

async with

我們知道自定義一個類支持with語句,需要實現__enter____exit__這兩個魔法方法,那么如果想支持async with,則需要實現__aenter____aexit__

import asyncio


class Open:

    def __init__(self, file, mode, encoding):
        self.file = file
        self.mode = mode
        self.encoding = encoding
        self.__fd = None

    # 要使用async def定義
    async def __aenter__(self):
        self.__fd = open(file=self.file, mode=self.mode, encoding=self.encoding)
        return self.__fd

    # 同樣使用async def定義
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.__fd.close()


# 既然是async就必須要創建協程,扔到事件循環里面運行
async def main(file, mode="r", encoding="utf-8"):
    async with Open(file, mode, encoding) as f:
        print(f.read())


asyncio.run(main("白色相簿.txt"))
"""
為什么你那么熟練啊
"""

可以看到我們自己實現了一個async with,但是注意這個不是異步的,我們還是調用了底層的open函數。

當然還可以使用contextlib

import asyncio
import contextlib


@contextlib.asynccontextmanager
async def foo():
    print("xxx")
    l = list()
    yield l 
    print(l)


async def main():
    async with foo() as l:
        l.append(1)
        l.append(2)
        l.append(3)


asyncio.run(main())
"""
xxx
[1, 2, 3]
"""

async for

我們知道自定義一個類支持for語句,需要實現__iter____next__這兩個魔法方法,那么如果想支持async for,則需要實現__aiter____anext__

import asyncio


class A:

    def __init__(self):
        self.l = [1, 2, 3, 4]
        self.__index = 0

    # 注意:定義__aiter__是不需要async的
    def __aiter__(self):
        return self

    # 但是定義__anext__需要async
    async def __anext__(self):
        try:
            res = self.l[self.__index]
            self.__index += 1
            return res
        except IndexError:
            # 捕獲異常,協程則要raise一個StopAsyncIteration
            raise StopAsyncIteration


async def main():
    async for _ in A():
        print(_)


asyncio.run(main())
"""
1
2
3
4
"""

另外我們知道,可以對for循環可以作用於生成器,那么async for則也可以作用於異步生成器中

import asyncio


# 具體版本記不清了,不知是3.5還是3.6,記得那時候引入async和await的時候,python是不允許async和yield兩個關鍵字同時出現的
# 但是現在至少python3.7是允許的,這種方式叫做異步生成器。
# 但是注意:如果async里面出現了yield,那么就不可以有return xxx了。
async def foo():
    yield 123
    yield 456
    yield 789
    print("xxx")


async def main():
    async for _ in foo():
        print(_)


asyncio.run(main())
"""
123
456
789
xxx
"""

await

很多人可能對python中的await這個關鍵字很懵逼,到底什么對象才可以被await呢?

從抽象基類的源碼中我們可以看到一個對象如果想被await,就必須要實現__await__這個魔法方法。

import asyncio


class A:

    def __await__(self):
        return "xxx"


async def main():
    res = await A()
    print(res)

try:
    asyncio.run(main())
except Exception as e:
    print(e)  # __await__() returned non-iterator of type 'str'

# 但是它報錯了,意思是必須返回一個迭代器
import asyncio


class A:

    def __await__(self):
        return "xxx".__iter__()


async def main():
    res = await A()
    print(res)

try:
    asyncio.run(main())
except Exception as e:
    print(e)  # Task got bad yield: 'x'

# 說明要返回一個迭代器,然后yield,但是這里提示我們Task got bad yield: 'x'
# 我們來分析一下這句話,bad yield: 'x',肯定是告訴我們yield出了一個不好的值
# 這個不好的值被Task獲取了,也就是不應該給Task一個'x'
# 咦,Task,這是啥?我們首先想到了asyncio里面task,而task對應的類正是Task
# 這是不是說明我們返回一個Task對象就是可以了
import asyncio


async def foo():
    return "我是foo"


class A:

    def __await__(self):
        # 同樣需要調用__iter__
        return asyncio.ensure_future(foo()).__iter__()


async def main():
    res = await A()
    print(res)

try:
    asyncio.run(main())
except Exception as e:
    print(e)  
"""
我是foo
"""
# 可以看到成功執行
# 其實兜了這么大圈子,完全沒必要
# await后面肯定是一個task(或者future),而我們await的是A(),那么A()的__await__方法返回的肯定是一個task(或者future)
# await后面如果是coroutine,會自動包裝成task,但是用我們自己的類返回的話,那么我們必須在__await__中手動的使用ensure_future或者create_task進行包裝,再返回其迭代器

手動實現異步

盡管我們實現了async for、async with、await等等方法,但是它們並沒有達到異步的效果,比如之前的async for底層還是使用了open。再比如網絡請求,對於像requests這種庫,是屬於同步的,因此在協程函數中使用requests.get是沒有用的。正如在協程函數中使用time.sleep一樣,如果想切換,就必須使用asyncio.sleep,因為這本質上還是一個協程。所以如果想在獲取網絡請求的時候,使用requests來達到異步的效果是行不通的,只能通過底層的socket重新設計,比如aiohttp。再比如那些數據庫驅動,也是一樣的。

目前來說,我們想實現異步,最好的方式就是通過線程池

現在我們有兩個文件

import asyncio
import time


class Reader:

    def __init__(self, file):
        self.fd= open(file, "r", encoding="utf-8")

    async def __aenter__(self):
        return self

    def __read(self):
        # 假設讀取文件需要兩秒鍾
        time.sleep(2)
        return self.fd.read()

    async def read(self):
        # 這個是可以await的,如果我們需要其返回值,那么需要進行await
        # 並且不傳線程池,那么默認會創建一個
        return await loop.run_in_executor(None, self.__read)

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.fd.close()


async def foo(name):
    async with Reader(name) as f:
        print(await f.read())


async def main():
    await asyncio.wait([foo("白色相簿1.txt"), foo("白色相簿2.txt")])


loop = asyncio.get_event_loop()
start = time.perf_counter()
loop.run_until_complete(main())
print(f"總用時:{time.perf_counter() - start}")
"""
白色相簿什么的,已經無所謂了。
因為已經不再有歌,值得去唱了。
傳達不了的戀情,已經不需要了。
因為已經不再有人,值得去愛了。
為什么你會這么熟練啊!
你和雪菜親過多少次了啊!?
你到底要把我甩開多遠你才甘心啊!?
總用時:2.0032377
"""
# 可以看到總用時是2s
import asyncio
import time


class Reader:

    def __init__(self, file):
        self.fd= open(file, "r", encoding="utf-8").readlines()
        self.__index = 0

    def __aiter__(self):
        return self

    def __read(self):
        # 假設讀一行需要1s
        time.sleep(1)
        res = self.fd[self.__index]
        self.__index += 1
        return res

    async def __anext__(self):
        try:
            return await loop.run_in_executor(None, self.__read)
        except IndexError:
            raise StopAsyncIteration


async def foo(name):
    async for line in Reader(name):
        # 由於文件結尾含有換行,我們將其去掉,因為print自帶換行
        print(line[: -1])


async def main():
    await asyncio.wait([foo("白色相簿1.txt"), foo("白色相簿2.txt")])


loop = asyncio.get_event_loop()
start = time.perf_counter()
loop.run_until_complete(main())
print(f"總用時:{time.perf_counter() - start}")
"""
為什么你會這么熟練啊!
白色相簿什么的,已經無所謂了。
你和雪菜親過多少次了啊!?
因為已經不再有歌,值得去唱了。
你到底要把我甩開多遠你才甘心啊!
傳達不了的戀情,已經不需要了。
因為已經不再有人,值得去愛了
總用時:5.0069811
"""
# 可以看到是並發交錯打印的,其實我們可以使用__next__,而不是先全部讀取再使用索引的方式
# 但是如果那么做會報出一個TypeError:StopIteration interacts badly with generators and cannot be raised into a Future
# 目前沒找到原因,不知道是bug還是本人水平問題。

並發請求網絡

asyncio是可以向web服務發送請求的,但是它不支持http請求,只支持tcp請求。但是這里不介紹了,建議使用aiohttp。

其實主要是累了,此刻凌晨3點06分,想休息了。或許有點疲憊,如果沒注意寫錯了,歡迎指正。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM