python之異步編程


一、異步編程概述

異步編程是一種並發編程的模式,其關注點是通過調度不同任務之間的執行和等待時間,通過減少處理器的閑置時間來達到減少整個程序的執行時間;異步編程跟同步編程模型最大的不同就是其任務的切換,當遇到一個需要等待長時間執行的任務的時候,我們可以切換到其他的任務執行;

與多線程和多進程編程模型相比,異步編程只是在同一個線程之內的的任務調度,無法充分利用多核CPU的優勢,所以特別適合IO阻塞性任務;

python版本 3.9.5

二、python的異步框架模型

python提供了asyncio模塊來支持異步編程,其中涉及到coroutines、event loops、futures三個重要概念;

event loops主要負責跟蹤和調度所有異步任務,編排具體的某個時間點執行的任務;

coroutines是對具體執行任務的封裝,是一個可以在執行中暫停並切換到event loops執行流程的特殊類型的函數;其一般還需要創建task才能被event loops調度;

futures負責承載coroutines的執行結果,其隨着任務在event loops中的初始化而創建,並隨着任務的執行來記錄任務的執行狀態;

異步編程框架的整個執行過程涉及三者的緊密協作;

首先event loops啟動之后,會從任務隊列獲取第一個要執行的coroutine,並隨之創建對應task和future;

然后隨着task的執行,當遇到coroutine內部需要切換任務的地方,task的執行就會暫停並釋放執行線程給event loop,event loop接着會獲取下一個待執行的coroutine,並進行相關的初始化之后,執行這個task;

隨着event loop執行完隊列中的最后一個coroutine才會切換到第一個coroutine;

隨着task的執行結束,event loops會將task清除出隊列,對應的執行結果會同步到future中,這個過程會持續到所有的task執行結束;

image

三、順序執行多個可重疊的任務

每個任務執行中間會暫停給定的時間,循序執行的時間就是每個任務執行的時間加和;

import time

def count_down(name, delay):
    indents = (ord(name) - ord('A')) * '\t'

    n = 3
    while n:
        time.sleep(delay)
        duration = time.perf_counter() - start
        print('-' * 40)
        print(f'{duration:.4f} \t{indents}{name} = {n}')
        n -= 1


start = time.perf_counter()

count_down('A', 1)
count_down('B', 0.8)
count_down('C', 0.5)
print('-' * 40)
print('Done')

# ----------------------------------------
# 1.0010 	A = 3
# ----------------------------------------
# 2.0019 	A = 2
# ----------------------------------------
# 3.0030 	A = 1
# ----------------------------------------
# 3.8040 		B = 3
# ----------------------------------------
# 4.6050 		B = 2
# ----------------------------------------
# 5.4059 		B = 1
# ----------------------------------------
# 5.9065 			C = 3
# ----------------------------------------
# 6.4072 			C = 2
# ----------------------------------------
# 6.9078 			C = 1
# ----------------------------------------
# Done

四、異步化同步代碼

python在語法上提供了async、await兩個關鍵字來簡化將同步代碼修改為異步;

async使用在函數的def關鍵字前邊,標記這是一個coroutine函數;

await用在conroutine里邊,用於標記需要暫停釋放執行流程給event loops;

await 后邊的表達式需要返回waitable的對象,例如conroutine、task、future等;

asyncio模塊主要提供了操作event loop的方式;

我們可以通過async將count_down標記為coroutine,然后使用await和asyncio.sleep來實現異步的暫停,從而將控制權交給event loop;

async def count_down(name, delay, start):
    indents = (ord(name) - ord('A')) * '\t'

    n = 3
    while n:
        await asyncio.sleep(delay)
        duration = time.perf_counter() - start
        print('-' * 40)
        print(f'{duration:.4f} \t{indents}{name} = {n}')
        n -= 1

我們定義一個異步的main方法,主要完成task的創建和等待任務執行結束;

async def main():
    start = time.perf_counter()
    tasks = [asyncio.create_task(count_down(name,delay,start)) for name, delay in [('A', 1),('B', 0.8),('C', 0.5)]]
    await asyncio.wait(tasks)
    print('-' * 40)
    print('Done')

執行我們可以看到時間已經變為了執行時間最長的任務的時間了;

asyncio.run(main())

# ----------------------------------------
# 0.5010 			C = 3
# ----------------------------------------
# 0.8016 		B = 3
# ----------------------------------------
# 1.0011 	A = 3
# ----------------------------------------
# 1.0013 			C = 2
# ----------------------------------------
# 1.5021 			C = 1
# ----------------------------------------
# 1.6026 		B = 2
# ----------------------------------------
# 2.0025 	A = 2
# ----------------------------------------
# 2.4042 		B = 1
# ----------------------------------------
# 3.0038 	A = 1
# ----------------------------------------
# Done

五、使用多線程克服具體任務的異步限制

異步編程要求具體的任務必須是coroutine,也就是要求方法是異步的,否則只有任務執行完了,才能將控制權釋放給event loop;

python中的concurent.futures提供了ThreadPoolExecutor和ProcessPoolExecutor,可以直接在異步編程中使用,從而可以在單獨的線程或者進程至今任務;

import time
import asyncio
from concurrent.futures import ThreadPoolExecutor

def count_down(name, delay, start):
    indents = (ord(name) - ord('A')) * '\t'

    n = 3
    while n:
        time.sleep(delay)

        duration = time.perf_counter() - start
        print('-'*40)
        print(f'{duration:.4f} \t{indents}{name} = {n}')
        n -=1

async def main():
    start = time.perf_counter()
    loop = asyncio.get_running_loop()
    executor = ThreadPoolExecutor(max_workers=3)
    fs = [
       loop.run_in_executor(executor, count_down, *args)  for args in [('A', 1, start), ('B', 0.8, start), ('C', 0.5, start)]
    ]

    await asyncio.wait(fs)
    print('-'*40)
    print('Done.')

asyncio.run(main())

# ----------------------------------------
# 0.5087 			C = 3
# ----------------------------------------
# 0.8196 		B = 3
# ----------------------------------------
# 1.0073 	A = 3
# ----------------------------------------
# 1.0234 			C = 2
# ----------------------------------------
# 1.5350 			C = 1
# ----------------------------------------
# 1.6303 		B = 2
# ----------------------------------------
# 2.0193 	A = 2
# ----------------------------------------
# 2.4406 		B = 1
# ----------------------------------------
# 3.0210 	A = 1
# ----------------------------------------
# Done.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM