如何使用 asyncio 限制協程的並發數


有同學問道,如果使用 asyncio + httpx 實現並發請求,怎么限制請求的頻率呢?怎么限制最多只能有 x 個請求同時發出呢?我們今天給出兩種方案。

提出問題

假設如果我們同時發起12個請求,每個請求的時間不同,那么總共的請求時間大概跟最長耗時的請求差不多。我們先來寫一個用於測試的例子:

import asyncio
import httpx
import time


async def req(delay):
    print(f'請求一個延遲為{delay}秒的接口')
    async with httpx.AsyncClient(timeout=20) as client:
        resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
        result = resp.json()
        print(result)


async def main():
    start = time.time()
    delay_list = [3, 6, 1, 8, 2, 4, 5, 2, 7, 3, 9, 8]
    task_list = []
    for delay in delay_list:
        task = asyncio.create_task(req(delay))
        task_list.append(task)
    await asyncio.gather(*task_list)
    end = time.time()
    print(f'一共耗時:{end - start}')

asyncio.run(main())

這段代碼,使用 for 循環創建了12個協程任務,這些任務幾乎同時運行,於是,請求完成所有的接口,總共耗時 11s

現在的問題是,由於網站有反爬蟲機制,最多只能同時發起3個請求。那么我們怎么確保同一時間最多只有3個協程在請求網絡呢?

限制協程任務數

第一個方案跟以前限制多線程的線程數的方案相同。我們創建一個列表,確保列表里面最多只有3個任務,然后持續循環檢查,發現有任務完成了,就移除這個完成的任務,並加入一個新的任務,直到待爬的列表為空,這個任務列表也為空。代碼如下:

import asyncio
import httpx
import time


async def req(delay):
    print(f'請求一個延遲為{delay}秒的接口')
    async with httpx.AsyncClient(timeout=20) as client:
        resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
        result = resp.json()
        print(result)


async def main():
    start = time.time()
    delay_list = [3, 6, 1, 8, 2, 4, 5, 2, 7, 3, 9, 8]
    task_list = []
    while True:
        if not delay_list and not task_list:
            break
        while len(task_list) < 3:
            if delay_list:
                delay = delay_list.pop()
                task = asyncio.create_task(req(delay))
                task_list.append(task)
            else:
                break
        task_list = [task for task in task_list if not task.done()]
        await asyncio.sleep(1)
    end = time.time()
    print(f'一共耗時:{end - start}')

asyncio.run(main())

 

總共耗時大概28秒左右。比串行需要的58秒快了一半,但比全部同時並發多了一倍。

使用 Semaphore

asyncio 實際上自帶了一個限制協程數量的類,叫做Semaphore。我們只需要初始化它,傳入最大允許的協程數量,然后就可以通過上下文管理器來使用。我們看一下代碼:

import asyncio
import httpx
import time


async def req(delay, sem):
    print(f'請求一個延遲為{delay}秒的接口')
    async with sem:
        async with httpx.AsyncClient(timeout=20) as client:
            resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
            result = resp.json()
            print(result)


async def main():
    start = time.time()
    delay_list = [3, 6, 1, 8, 2, 4, 5, 2, 7, 3, 9, 8]
    task_list = []
    sem = asyncio.Semaphore(3)
    for delay in delay_list:
        task = asyncio.create_task(req(delay, sem))
        task_list.append(task)
    await asyncio.gather(*task_list)

    end = time.time()
    print(f'一共耗時:{end - start}')

asyncio.run(main())

耗時為22秒,比第一個方案更快。

我們來看看Semaphore的用法,它的格式為:

sem = asyncio.Semaphore(同時運行的協程數量)

async def func(sem):
    async with sem:
        這里是並發執行的代碼

task_list = []
for _ in range(總共需要執行的任務數):
    task = asyncio.create_task(func(sem))
    task_list.append(task)
await asyncio.gather(*task_list)

當我們要限制一個協程的並發數的時候,可以在調用協程之前,先初始化一個Semaphore對象。然后把這個對象傳到需要限制並發的協程里面,在協程里面,使用異步上下文管理器包住你的正式代碼:

async with sem:
    正式代碼

這樣一來,如果並發數沒有達到限制,那么async with sem會瞬間執行完成,進入里面的正式代碼中。如果並發數已經達到了限制,那么其他的協程會阻塞在async with sem這個地方,直到正在運行的某個協程完成了,退出了,才會放行一個新的協程去替換掉這個已經完成的協程。

這個寫法其實跟多線程的加鎖很像。只不過鎖是確保同一個時間只有一個線程在運行,而Semaphore可以人為指定能有多少個協程同時運行。

如何限制1分鍾內能夠運行的協程數

可能同學看了上面的例子以后,只知道如何限制同時運行的協程數。但是怎么限制在一段時間里同時運行的協程數呢?

其實非常簡單,在並發的協程里面加個 asyncio.sleep 就可以了。例如上面的例子,我想限制每分鍾只能有3個協程,那么可以把代碼改為:

async def req(delay, sem):
    print(f'請求一個延遲為{delay}秒的接口')
    async with sem:
        async with httpx.AsyncClient(timeout=20) as client:
            resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
            result = resp.json()
            print(result)
    await asyncio.sleep(60)

總結

如果大家要限制協程的並發數,那么最簡單的辦法就是使用asyncio.Semaphore。但需要注意的是,只能在啟動協程之前初始化它,然后傳給協程。要確保所有並發協程拿到的是同一個Semaphore對象。

當然,你的程序里面,可能有多個不同的部分,有些部分限制並發數為 a,有些部分限制並發數為 b。那么你可以初始化多個Semaphore對象,分別傳給不同的協程


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM