python 異步編程


1:協程

協程不是操作系統提供的,是一種用戶狀態內的上下文切換技術,簡言而之,其實就是通過一個線程實現代碼塊相互切換執行。

def func1():
    print(1)
    ...
    print(2)
    
def func2():
    print(3)
    ...
    print(4)

func1()
func2()

上述代碼是普通的函數定義和執行,按流程分別執行兩個函數中的代碼,並先后會輸出:1、2、3、4。但如果介入協程技術那么就可以實現函數見代碼切換執行,最終輸入:1、3、2、4

2:協程的實現方式

python種有多種可以實現協程的方式,例如:

  • greenlet,是一個第三方實現協程的代碼,(Gevent協程就是通過greenlet實現的)
  • yield,生成器,借助生成器的特點亦可以實現協程代碼
  • asyncio,在python3.4 種引入的模塊,用於編寫協程代碼
  • async & awiat,在python3.5種引入的兩個關鍵字,結合asyncio模塊可以方便白那些協程代碼

2.1:greenlet 

greentlet是一個第三方模塊,需要提前安裝 pip3 install greenlet才能使用。

from greenlet import greenlet


def func1():
    print(1)        # 第1步:輸出 1
    gr2.switch()    # 第3步:切換到 func2 函數
    print(2)        # 第6步:輸出 2
    gr2.switch()    # 第7步:切換到 func2 函數,從上一次執行的位置繼續向后執行


def func2():
    print(3)        # 第4步:輸出 3
    gr1.switch()    # 第5步:切換到 func1 函數,從上一次執行的位置繼續向后執行
    print(4)        # 第8步:輸出 4


gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch() # 第1步:去執行 func1 函數
注意:switch中也可以傳遞參數用於在切換執行時相互傳遞值。

2.2 yield 基於python 的生成器yield 和 yield from 關鍵字實現協程代碼

def func1():
    yield 1
    yield from func2()
    yield 2


def func2():
    yield 3
    yield 4


f1 = func1()
for item in f1:
    print(item)  # 1,3,4,2

 

2.3 asyncio

在python3.4 之后官方提供的正式支持協程

import asyncio

@asyncio.coroutine
def func1():
    print(1)
    yield from asyncio.sleep(2)  # 遇到IO耗時操作,自動化切換到tasks中的其他任務
    print(2)


@asyncio.coroutine
def func2():
    print(3)
    yield from asyncio.sleep(2) # 遇到IO耗時操作,自動化切換到tasks中的其他任務
    print(4)


tasks = [
    asyncio.ensure_future( func1() ),
    asyncio.ensure_future( func2() )
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

2.4 async & awit

async & awit 關鍵字在Python3.5版本中正式引入,基於他編寫的協程代碼其實就是 上一示例 的加強版,讓代碼可以更加簡便。

Python3.8之后 @asyncio.coroutine 裝飾器就會被移除,推薦使用async & awit 關鍵字實現協程代碼。

import asyncio


async def func1():
    print(1)
    await asyncio.sleep(2)
    print(2)


async def func2():
    print(3)
    await asyncio.sleep(2)
    print(4)


tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

3:協程的意義

協程就是可以通過一個線程在多個上下文種進行來回切換執行。但是協程協程來回切換執行的意義何在呢?

計算型的操作,利用協程來回切換執行,沒有任何意義,來回切換並保存狀態 反倒會降低性能。
IO型的操作,利用協程在IO等待時間就去切換執行其他任務,當IO操作結束后再自動回調,那么就會大大節省資源並提供性能,從而實現異步編程(不等待任務結束就可以去執行其他代碼)

4:異步編程

基於async & await關鍵字的協程可以實現異步編程,這也是目前python異步相關的主流技術。

想要真正的了解Python中內置的異步編程,根據下文的順序一點點來看。

4.1 事件循環

事件循環,可以把他當做是一個while循環,這個while循環在周期性的運行並執行一些任務,在特定條件下終止循環。

# 偽代碼

任務列表 = [ 任務1, 任務2, 任務3,... ]

while True:
    可執行的任務列表,已完成的任務列表 = 去任務列表中檢查所有的任務,將'可執行''已完成'的任務返回
    
    for 就緒任務 in 已准備就緒的任務列表:
        執行已就緒的任務
        
    for 已完成的任務 in 已完成的任務列表:
        在任務列表中移除 已完成的任務

    如果 任務列表 中的任務都已完成,則終止循環

在編寫程序時候可以通過如下代碼來獲取和創建事件循環。

import asyncio

loop = asyncio.get_event_loop()

4.2 協程和異步編程

協程函數,定義形式為 async def 的函數。

協程對象,調用 協程函數 所返回的對象。

# 定義一個協程函數
async def func():
    pass

# 調用協程函數,返回一個協程對象
result = func()

注意:調用協程函數時,函數內部代碼不會執行,只是會返回一個協程對象。

4.2.1 基本使用

程序中,如果想要執行協程函數的內部代碼,需要 事件循環協程對象 配合才能實現,如:

import asyncio


async def func():
    print("協程內部代碼")

# 調用協程函數,返回一個協程對象。
result = func()

# 方式一
# loop = asyncio.get_event_loop() # 創建一個事件循環
# loop.run_until_complete(result) # 將協程當做任務提交到事件循環的任務列表中,協程執行完成之后終止。

# 方式二
# 本質上方式一是一樣的,內部先 創建事件循環 然后執行 run_until_complete,一個簡便的寫法。
# asyncio.run 函數在 Python 3.7 中加入 asyncio 模塊,
asyncio.run(result)

這個過程可以簡單理解為:將協程當做任務添加到 事件循環 的任務列表,然后事件循環檢測列表中的協程是否 已准備就緒(默認可理解為就緒狀態),如果准備就緒則執行其內部代碼。

4.2.2 await

await是一個只能在協程函數中使用的關鍵字,用於遇到IO操作時掛起 當前協程(任務),當前協程(任務)掛起過程中 事件循環可以去執行其他的協程(任務),當前協程IO處理完成時,可以再次切換回來執行await之后的代碼。代碼如下:

import asyncio


async def func():
    print("執行協程函數內部代碼")

    # 遇到IO操作掛起當前協程(任務),等IO操作完成之后再繼續往下執行。
    # 當前協程掛起時,事件循環可以去執行其他協程(任務)。
    response = await asyncio.sleep(2)

    print("IO請求結束,結果為:", response)

result = func()

asyncio.run(result)

例二:

import asyncio


async def others():
    print("start")
    await asyncio.sleep(2)
    print('end')
    return '返回值'


async def func():
    print("執行協程函數內部代碼")

    # 遇到IO操作掛起當前協程(任務),等IO操作完成之后再繼續往下執行。當前協程掛起時,事件循環可以去執行其他協程(任務)。
    response = await others()

    print("IO請求結束,結果為:", response)
    
asyncio.run( func() )

例三:

import asyncio


async def others():
    print("start")
    await asyncio.sleep(2)
    print('end')
    return '返回值'


async def func():
    print("執行協程函數內部代碼")

    # 遇到IO操作掛起當前協程(任務),等IO操作完成之后再繼續往下執行。當前協程掛起時,事件循環可以去執行其他協程(任務)。
    response1 = await others()
    print("IO請求結束,結果為:", response1)
    
    response2 = await others()
    print("IO請求結束,結果為:", response2)
    
asyncio.run( func() )

上述的所有示例都只是創建了一個任務,即:事件循環的任務列表中只有一個任務,所以在IO等待時無法演示切換到其他任務效果。

在程序想要創建多個任務對象,需要使用Task對象來實現。

4.2.3 Task對象

官方:

Tasks are used to schedule coroutines concurrently.

When a coroutine is wrapped into a Task with functions like asyncio.create_task() the coroutine is automatically scheduled to run soon。

Tasks用於並發調度協程,通過asyncio.create_task(協程對象)的方式創建Task對象,這樣可以讓協程加入事件循環中等待被調度執行。除了使用 asyncio.create_task() 函數以外,還可以用低層級的 loop.create_task()ensure_future() 函數。不建議手動實例化 Task 對象。

本質上是將協程對象封裝成task對象,並將協程立即加入事件循環,同時追蹤協程的狀態。

注意:asyncio.create_task() 函數在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低層級的 asyncio.ensure_future() 函數

 

例一:

import asyncio


async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"


async def main():
    print("main開始")

    # 創建協程,將協程封裝到一個Task對象中並立即添加到事件循環的任務列表中,等待事件循環去執行(默認是就緒狀態)。
    task1 = asyncio.create_task(func())

    # 創建協程,將協程封裝到一個Task對象中並立即添加到事件循環的任務列表中,等待事件循環去執行(默認是就緒狀態)。
    task2 = asyncio.create_task(func())

    print("main結束")

    # 當執行某協程遇到IO操作時,會自動化切換執行其他任務。
    # 此處的await是等待相對應的協程全都執行完畢並獲取結果
    ret1 = await task1
    ret2 = await task2
    print(ret1, ret2)


asyncio.run(main())

 

例二:

import asyncio


async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"


async def main():
    print("main開始")

    # 創建協程,將協程封裝到Task對象中並添加到事件循環的任務列表中,等待事件循環去執行(默認是就緒狀態)。
    # 在調用
    task_list = [
        asyncio.create_task(func(), name="n1"),
        asyncio.create_task(func(), name="n2")
    ]

    print("main結束")

    # 當執行某協程遇到IO操作時,會自動化切換執行其他任務。
    # 此處的await是等待所有協程執行完畢,並將所有協程的返回值保存到done
    # 如果設置了timeout值,則意味着此處最多等待的秒,完成的協程返回值寫入到done中,未完成則寫到pending中。
    done, pending = await asyncio.wait(task_list, timeout=None)
    print(done, pending)


asyncio.run(main())

注意:asyncio.wait 源碼內部會對列表中的每個協程執行ensure_future從而封裝為Task對象,所以在和wait配合使用時task_list的值為[func(),func()] 也是可以的。

例三:

import asyncio


async def func():
    print("執行協程函數內部代碼")

    # 遇到IO操作掛起當前協程(任務),等IO操作完成之后再繼續往下執行。當前協程掛起時,事件循環可以去執行其他協程(任務)。
    response = await asyncio.sleep(2)

    print("IO請求結束,結果為:", response)


coroutine_list = [func(), func()]

# 錯誤:coroutine_list = [ asyncio.create_task(func()), asyncio.create_task(func()) ]  
# 此處不能直接 asyncio.create_task,因為將Task立即加入到事件循環的任務列表,
# 但此時事件循環還未創建,所以會報錯。


# 使用asyncio.wait將列表封裝為一個協程,並調用asyncio.run實現執行兩個協程
# asyncio.wait內部會對列表中的每個協程執行ensure_future,封裝為Ta

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM