Python協程與異步asyncio總結


異步IO:就是發起一個IO操作(如:網絡請求,文件讀寫等),這些操作一般是比較耗時的,不用等待它結束,可以繼續做其他事情,結束時會發來通知。
協程:又稱為微線程,在一個線程中執行,執行函數時可以隨時中斷,由程序(用戶)自身控制,執行效率極高,與多線程比較,沒有切換線程的開銷和多線程鎖機制。

Python中異步IO操作是通過asyncio來實現的。



異步IO(asyncio)
異步IO的asyncio庫使用事件循環驅動的協程實現並發。用戶可主動控制程序,在認為耗時IO處添加await(yield from)。在asyncio庫中,協程使用@asyncio.coroutine裝飾,使用yield from來驅動,在python3.5中作了如下更改:

@asyncio.coroutine -> async

yield from -> await

Python3.8之后 @asyncio.coroutine 裝飾器就會被移除,推薦使用async & await 關鍵字實現協程代碼。

asyncio中幾個重要概念
1.事件循環

管理所有的事件,在整個程序運行過程中不斷循環執行並追蹤事件發生的順序將它們放在隊列中,空閑時調用相應的事件處理者來處理這些事件。

2.Future

Future對象表示尚未完成的計算,還未完成的結果

3.Task

是Future的子類,作用是在運行某個任務的同時可以並發的運行多個任務。

asyncio.Task用於實現協作式多任務的庫,且Task對象不能用戶手動實例化,通過下面2個函數創建:

asyncio.async()

loop.create_task() 或 asyncio.ensure_future()

最簡單的異步IO示例
run_until_complete():

阻塞調用,直到協程運行結束才返回。參數是future,傳入協程對象時內部會自動變為future

asyncio.sleep():

模擬IO操作,這樣的休眠不會阻塞事件循環,前面加上await后會把控制權交給主事件循環,在休眠(IO操作)結束后恢復這個協程。

提示:

若在協程中需要有延時操作,應該使用 await asyncio.sleep(),而不是使用time.sleep(),因為使用time.sleep()后會釋放GIL,阻塞整個主線程,從而阻塞整個事件循環。

import asyncio
 
async def coroutine_example():
    await asyncio.sleep(1)
    print('zhihu ID: Zarten')
 
coro = coroutine_example()
 
loop = asyncio.get_event_loop()
loop.run_until_complete(coro)
loop.close()
上面輸出:會暫停1秒,等待 asyncio.sleep(1) 返回后打印

創建Task
loop.create_task():

接收一個協程,返回一個asyncio.Task的實例,也是asyncio.Future的實例,畢竟Task是Future的子類。返回值可直接傳入run_until_complete()

返回的Task對象可以看到協程的運行情況

import asyncio
 
async def coroutine_example():
    await asyncio.sleep(1)
    print('zhihu ID: Zarten')
 
coro = coroutine_example()
 
loop = asyncio.get_event_loop()
task = loop.create_task(coro)
print('運行情況:', task)
 
loop.run_until_complete(task)
print('再看下運行情況:', task)
loop.close()
輸出結果:

從下圖可看到,當task為finished狀態時,有個result()的方法,我們可以通過這個方法來獲取協程的返回值

 

 
         

獲取協程返回值
有2種方案可以獲取返回值。

第1種方案:通過task.result()

可通過調用 task.result() 方法來獲取協程的返回值,但是只有運行完畢后才能獲取,若沒有運行完畢,result()方法不會阻塞去等待結果,而是拋出 asyncio.InvalidStateError 錯誤

import asyncio
 
async def coroutine_example():
    await asyncio.sleep(1)
    return 'zhihu ID: Zarten'
 
coro = coroutine_example()
 
loop = asyncio.get_event_loop()
task = loop.create_task(coro)
print('運行情況:', task)
try:
    print('返回值:', task.result())
except asyncio.InvalidStateError:
    print('task狀態未完成,捕獲了 InvalidStateError 異常')
 
loop.run_until_complete(task)
print('再看下運行情況:', task)
print('返回值:', task.result())
loop.close()
運行結果可以看到:只有task狀態運行完成時才能捕獲返回值

 


第2種方案:通過add_done_callback()回調

import asyncio
 
def my_callback(future):
    print('返回值:', future.result())
 
async def coroutine_example():
    await asyncio.sleep(1)
    return 'zhihu ID: Zarten'
 
coro = coroutine_example()
 
loop = asyncio.get_event_loop()
 
task = loop.create_task(coro)
task.add_done_callback(my_callback)
 
loop.run_until_complete(task)
loop.close()

 

 
         

 


控制任務
通過asyncio.wait()可以控制多任務

asyncio.wait()是一個協程,不會阻塞,立即返回,返回的是協程對象。傳入的參數是future或協程構成的可迭代對象。最后將返回值傳給run_until_complete()加入事件循環

最簡單控制多任務

下面代碼asyncio.wait()中,參數傳入的是由協程構成的可迭代對象

import asyncio
 
async def coroutine_example(name):
    print('正在執行name:', name)
    await asyncio.sleep(1)
    print('執行完畢name:', name)
 
loop = asyncio.get_event_loop()
 
tasks = [coroutine_example('Zarten_' + str(i)) for i in range(3)]
wait_coro = asyncio.wait(tasks)
loop.run_until_complete(wait_coro)
loop.close()
輸出結果:

 

 
         

 


多任務中獲取返回值

方案1:需要通過loop.create_task()創建task對象,以便后面來獲取返回值

下面代碼asyncio.wait()中,參數傳入的是由future(task)對象構成的可迭代對象

import asyncio
 
async def coroutine_example(name):
    print('正在執行name:', name)
    await asyncio.sleep(1)
    print('執行完畢name:', name)
    return '返回值:' + name
 
loop = asyncio.get_event_loop()
 
tasks = [loop.create_task(coroutine_example('Zarten_' + str(i))) for i in range(3)]
wait_coro = asyncio.wait(tasks)
loop.run_until_complete(wait_coro)
 
for task in tasks:
    print(task.result())
 
loop.close()
方案2:通過回調add_done_callback()來獲取返回值

import asyncio
 
def my_callback(future):
    print('返回值:', future.result())
 
async def coroutine_example(name):
    print('正在執行name:', name)
    await asyncio.sleep(1)
    print('執行完畢name:', name)
    return '返回值:' + name
 
loop = asyncio.get_event_loop()
 
tasks = []
for i in range(3):
    task = loop.create_task(coroutine_example('Zarten_' + str(i)))
    task.add_done_callback(my_callback)
    tasks.append(task)
 
wait_coro = asyncio.wait(tasks)
loop.run_until_complete(wait_coro)
 
loop.close()
輸出結果:

 

 
         

 


動態添加協程
方案是創建一個線程,使事件循環在線程內永久運行

相關函數介紹:

loop.call_soon_threadsafe() :與 call_soon()類似,等待此函數返回后馬上調用回調函數,返回值是一個 asyncio.Handle 對象,此對象內只有一個方法為 cancel()方法,用來取消回調函數。

loop.call_soon() : 與call_soon_threadsafe()類似,call_soon_threadsafe() 是線程安全的

loop.call_later():延遲多少秒后執行回調函數

loop.call_at():在指定時間執行回調函數,這里的時間統一使用 loop.time() 來替代 time.sleep()

asyncio.run_coroutine_threadsafe(): 動態的加入協程,參數為一個回調函數和一個loop對象,返回值為future對象,通過future.result()獲取回調函數返回值

動態添加協程同步方式

通過調用 call_soon_threadsafe()函數,傳入一個回調函數callback和一個位置參數

注意:同步方式,回調函數 thread_example()為普通函數

import asyncio
from threading import Thread
 
def start_thread_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
 
def thread_example(name):
    print('正在執行name:', name)
    return '返回結果:' + name
 
 
new_loop = asyncio.new_event_loop()
t = Thread(target= start_thread_loop, args=(new_loop,))
t.start()
 
handle = new_loop.call_soon_threadsafe(thread_example, 'Zarten1')
handle.cancel()
 
new_loop.call_soon_threadsafe(thread_example, 'Zarten2')
 
print('主線程不會阻塞')
 
new_loop.call_soon_threadsafe(thread_example, 'Zarten3')
 
print('繼續運行中...')
輸出結果:

動態添加協程異步方式

通過調用 asyncio.run_coroutine_threadsafe()函數,傳入一個回調函數callback和一個loop對象

注意:異步方式,回調函數 thread_example()為協程

import asyncio
from threading import Thread
 
def start_thread_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
 
async def thread_example(name):
    print('正在執行name:', name)
    await asyncio.sleep(1)
    return '返回結果:' + name
 
 
new_loop = asyncio.new_event_loop()
t = Thread(target= start_thread_loop, args=(new_loop,))
t.start()
 
future = asyncio.run_coroutine_threadsafe(thread_example('Zarten1'), new_loop)
print(future.result())
 
asyncio.run_coroutine_threadsafe(thread_example('Zarten2'), new_loop)
 
print('主線程不會阻塞')
 
asyncio.run_coroutine_threadsafe(thread_example('Zarten3'), new_loop)
 
print('繼續運行中...')
輸出結果:

 

 
         

 



從上面2個例子中,當主線程運行完成后,由於子線程還沒有退出,故主線程還沒退出,等待子線程退出中。若要主線程退出時子線程也退出,可以設置子線程為守護線程 t.setDaemon(True)

協程中生產-消費模型設計
通過上面的動態添加協程的思想,我們可以設計一個生產-消費的模型,至於中間件(管道)是什么無所謂,下面以內置隊列和redis隊列來舉例說明。

提示:若想主線程退出時,子線程也隨之退出,需要將子線程設置為守護線程,函數 setDaemon(True)

內置雙向隊列模型
使用內置雙向隊列deque

import asyncio
from threading import Thread
from collections import deque
import random
import time
 
def start_thread_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
 
def consumer():
    while True:
        if dq:
            msg = dq.pop()
            if msg:
                asyncio.run_coroutine_threadsafe(thread_example('Zarten'+ msg), new_loop)
 
 
async def thread_example(name):
    print('正在執行name:', name)
    await asyncio.sleep(2)
    return '返回結果:' + name
 
 
 
dq = deque()
 
new_loop = asyncio.new_event_loop()
loop_thread = Thread(target= start_thread_loop, args=(new_loop,))
loop_thread.setDaemon(True)
loop_thread.start()
 
consumer_thread = Thread(target= consumer)
consumer_thread.setDaemon(True)
consumer_thread.start()
 
while True:
    i = random.randint(1, 10)
    dq.appendleft(str(i))
    time.sleep(2)
輸出結果:

 

 
         

 



redis隊列模型
下面代碼的主線程和雙向隊列的主線程有些不同,只是換了一種寫法而已,代碼如下

生產者代碼:

import redis
 
conn_pool = redis.ConnectionPool(host='127.0.0.1')
redis_conn = redis.Redis(connection_pool=conn_pool)
 
redis_conn.lpush('coro_test', '1')
redis_conn.lpush('coro_test', '2')
redis_conn.lpush('coro_test', '3')
redis_conn.lpush('coro_test', '4')
消費者代碼:

import asyncio
from threading import Thread
import redis
 
def get_redis():
    conn_pool = redis.ConnectionPool(host= '127.0.0.1')
    return redis.Redis(connection_pool= conn_pool)
 
def start_thread_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
 
async def thread_example(name):
    print('正在執行name:', name)
    await asyncio.sleep(2)
    return '返回結果:' + name
 
 
redis_conn = get_redis()
 
new_loop = asyncio.new_event_loop()
loop_thread = Thread(target= start_thread_loop, args=(new_loop,))
loop_thread.setDaemon(True)
loop_thread.start()
 
#循環接收redis消息並動態加入協程
while True:
    msg = redis_conn.rpop('coro_test')
    if msg:
        asyncio.run_coroutine_threadsafe(thread_example('Zarten' + bytes.decode(msg, 'utf-8')), new_loop)
輸出結果:

 

 
         

 


asyncio在aiohttp中的應用
aiohttp是一個異步庫,分為客戶端和服務端,下面只是簡單對客戶端做個介紹以及一個經常遇到的異常情況。aiohttp客戶端為異步網絡請求庫

aiohttp客戶端最簡單的例子
import asyncio
import aiohttp
 
count = 0
 
async def get_http(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as res:
            global count
            count += 1
            print(count, res.status)
 
def main():
    loop = asyncio.get_event_loop()
    url = 'https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&ch=&tn=baiduerr&bar=&wd={0}'
    tasks = [get_http(url.format(i)) for i in range(10)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
if __name__ == '__main__':
    main()
aiohttp並發量太大的異常解決方案
在使用aiohttp客戶端進行大量並發請求時,程序會拋出 ValueError: too many file descriptors in select() 的錯誤。

異常代碼示例

說明:測試機器為windows系統

import asyncio
import aiohttp
 
count = 0
 
async def get_http(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as res:
            global count
            count += 1
            print(count, res.status)
 
def main():
    loop = asyncio.get_event_loop()
    url = 'https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&ch=&tn=baiduerr&bar=&wd={0}'
    tasks = [get_http(url.format(i)) for i in range(600)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
if __name__ == '__main__':
    main()

 

 
         

原因分析:使用aiohttp時,python內部會使用select(),操作系統對文件描述符最大數量有限制,linux為1024個,windows為509個。

解決方案:

最常見的解決方案是:限制並發數量(一般500),若並發的量不大可不作限制。其他方案這里不做介紹,如windows下使用loop = asyncio.ProactorEventLoop() 以及使用回調方式等

限制並發數量方法
提示:此方法也可用來作為異步爬蟲的限速方法(反反爬)

使用semaphore = asyncio.Semaphore(500) 以及在協程中使用 async with semaphore: 操作

具體代碼如下:

import asyncio
import aiohttp
 
 
async def get_http(url):
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as res:
                global count
                count += 1
                print(count, res.status)
 
if __name__ == '__main__':
    count = 0
 
    semaphore = asyncio.Semaphore(500)
    loop = asyncio.get_event_loop()
    url = 'https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&ch=&tn=baiduerr&bar=&wd={0}'
    tasks = [get_http(url.format(i)) for i in range(600)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

原文地址,原作Zarten。

https://zhuanlan.zhihu.com/p/59621713
 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM