Python中協程異步IO(asyncio)詳解


介紹

異步IO:就是發起一個IO操作(如:網絡請求,文件讀寫等),這些操作一般是比較耗時的,不用等待它結束,可以繼續做其他事情,結束時會發來通知。

協程:又稱為微線程,在一個線程中執行,執行函數時可以隨時中斷,由程序(用戶)自身控制,執行效率極高,與多線程比較,沒有切換線程的開銷和多線程鎖機制。

python中異步IO操作是通過asyncio來實現的。

為了更加詳細說明asyncio,我們先從協程的最基礎開始講解。

協程基礎

從語句上看,協程和生成器類似,都是包含了yield關鍵字,不同之處在於協程中yield關鍵詞通常出現在=右邊,可以產出值a(y = yield a)或不產出值時為None(y = yield)。調用方可以用send函數發送值給協程。

激活協程時在yield處暫停,等待調用方發送數據,下次繼續在yield暫停。從根本上看,yield是流程控制的工具,可以實現協作式多任務,這也是后面講解異步IO的基礎。

最簡單的協程示例

使用協程時需要預激活(next函數)后才能使用send發送值。(a = yield b),next時會產出yield右邊的值b,send時接收值的是yield左邊的值a

def coroutine_example(name): print('start coroutine...name:', name) x = yield name #調用next()時,產出yield右邊的值后暫停;調用send()時,產出值賦給x,並往下運行 print('send值:', x) coro = coroutine_example('Zarten') print('next的返回值:', next(coro)) print('send的返回值:', coro.send(6))

輸出結果:

必須先調用next()函數預激活協程,不然send()函數無法使用。

調用next()時,產出yield右邊的值后暫停不再往yield的下一行執行(一般不需要next產出值),等待send的到來,調用send()時,產出值賦給x(可對x作進一步處理),並往下運行。

協程結束時會跟生成器一樣拋出StopIteration的異常給調用方,調用方可以捕獲它后處理。

讓協程返回值以及yield from說明

獲取協程的返回值

當結束協程時,會返回返回值,調用方會拋出StopIteration異常,返回值就在異常對象的value屬性中

def coroutine_example(name): print('start coroutine...name:', name) while True: x = yield name #調用next()時,產出yield右邊的值后暫停;調用send()時,產出值賦給x,並往下運行 if x is None: return 'zhihuID: Zarten' print('send值:', x) coro = coroutine_example('Zarten') next(coro) print('send的返回值:', coro.send(6)) try: coro.send(None) except StopIteration as e: print('返回值:', e.value) 

yield from 說明

yield from跟for循環很相似,但功能更多一些,不信你看下面代碼

def for_test(): for i in range(3): yield i print(list(for_test())) def yield_from_test(): yield from range(3) print(list(yield_from_test())) 

下面是輸出結果:

其實yield from內部會自動捕獲StopIteration異常,並把異常對象的value屬性變成yield from表達式的值。

yield from x 表達式內部首先是調用iter(x),然后再調用next(),因此x是任何的可迭代對象。yield from 的主要功能就是打開雙向通道,把最外層的調用方和最內層的子生成器連接起來。

下面代碼展示:調用方發送的值在yield from表達式處直接傳遞給子生成器,並在yield from處等待子生成器的返回

def coroutine_example(name): print('start coroutine...name:', name) x = yield name #調用next()時,產出yield右邊的值后暫停;調用send()時,產出值賦給x,並往下運行 print('send值:', x) return 'zhihuID: Zarten' def grouper2(): result2 = yield from coroutine_example('Zarten') #在此處暫停,等待子生成器的返回后繼續往下執行 print('result2的值:', result2) return result2 def grouper(): result = yield from grouper2() #在此處暫停,等待子生成器的返回后繼續往下執行 print('result的值:', result) return result def main(): g = grouper() next(g) try: g.send(10) except StopIteration as e: print('返回值:', e.value) if __name__ == '__main__': main()

輸出結果:

從上面也可看到yield from起到一個雙向通道的作用,同時子生成器也可使用yield from調用另一個子生成器,一直嵌套下去直到遇到yield表達式結束鏈式。

yield from一般用於asyncio模塊做異步IO

 

異步IO(asyncio)

從上面我們知道了協程的基礎,異步IO的asyncio庫使用事件循環驅動的協程實現並發。用戶可主動控制程序,在認為耗時IO處添加await(yield from)。在asyncio庫中,協程使用@asyncio.coroutine裝飾,使用yield from來驅動,在python3.5中作了如下更改:

@asyncio.coroutine -> async

yield from -> await

asyncio中幾個重要概念

1.事件循環

管理所有的事件,在整個程序運行過程中不斷循環執行並追蹤事件發生的順序將它們放在隊列中,空閑時調用相應的事件處理者來處理這些事件。

2.Future

Future對象表示尚未完成的計算,還未完成的結果

3.Task

是Future的子類,作用是在運行某個任務的同時可以並發的運行多個任務。

asyncio.Task用於實現協作式多任務的庫,且Task對象不能用戶手動實例化,通過下面2個函數創建:

asyncio.async()

loop.create_task() 或 asyncio.ensure_future()

最簡單的異步IO示例

run_until_complete():

阻塞調用,直到協程運行結束才返回。參數是future,傳入協程對象時內部會自動變為future

asyncio.sleep():

模擬IO操作,這樣的休眠不會阻塞事件循環,前面加上await后會把控制權交給主事件循環,在休眠(IO操作)結束后恢復這個協程。

提示:若在協程中需要有延時操作,應該使用 await asyncio.sleep(),而不是使用time.sleep(),因為使用time.sleep()后會釋放GIL,阻塞整個主線程,從而阻塞整個事件循環。

import asyncio async def coroutine_example(): await asyncio.sleep(1) print('zhihu ID: Zarten') coro = coroutine_example() loop = asyncio.get_event_loop() loop.run_until_complete(coro) loop.close()

上面輸出:會暫停1秒,等待 asyncio.sleep(1) 返回后打印

創建Task

loop.create_task():

接收一個協程,返回一個asyncio.Task的實例,也是asyncio.Future的實例,畢竟Task是Future的子類。返回值可直接傳入run_until_complete()

返回的Task對象可以看到協程的運行情況

import asyncio async def coroutine_example(): await asyncio.sleep(1) print('zhihu ID: Zarten') coro = coroutine_example() loop = asyncio.get_event_loop() task = loop.create_task(coro) print('運行情況:', task) loop.run_until_complete(task) print('再看下運行情況:', task) loop.close()

輸出結果:

從下圖可看到,當task為finished狀態時,有個result()的方法,我們可以通過這個方法來獲取協程的返回值

 

獲取協程返回值

有2種方案可以獲取返回值。

  • 第1種方案:通過task.result()

可通過調用 task.result() 方法來獲取協程的返回值,但是只有運行完畢后才能獲取,若沒有運行完畢,result()方法不會阻塞去等待結果,而是拋出 asyncio.InvalidStateError 錯誤

import asyncio async def coroutine_example(): await asyncio.sleep(1) return 'zhihu ID: Zarten' coro = coroutine_example() loop = asyncio.get_event_loop() task = loop.create_task(coro) print('運行情況:', task) try: print('返回值:', task.result()) except asyncio.InvalidStateError: print('task狀態未完成,捕獲了 InvalidStateError 異常') loop.run_until_complete(task) print('再看下運行情況:', task) print('返回值:', task.result()) loop.close()

運行結果可以看到:只有task狀態運行完成時才能捕獲返回值

  • 第2種方案:通過add_done_callback()回調
import asyncio def my_callback(future): print('返回值:', future.result()) async def coroutine_example(): await asyncio.sleep(1) return 'zhihu ID: Zarten' coro = coroutine_example() loop = asyncio.get_event_loop() task = loop.create_task(coro) task.add_done_callback(my_callback) loop.run_until_complete(task) loop.close()

控制任務

通過asyncio.wait()可以控制多任務

asyncio.wait()是一個協程,不會阻塞,立即返回,返回的是協程對象。傳入的參數是future或協程構成的可迭代對象。最后將返回值傳給run_until_complete()加入事件循環

  • 最簡單控制多任務

下面代碼asyncio.wait()中,參數傳入的是由協程構成的可迭代對象

import asyncio async def coroutine_example(name): print('正在執行name:', name) await asyncio.sleep(1) print('執行完畢name:', name) loop = asyncio.get_event_loop() tasks = [coroutine_example('Zarten_' + str(i)) for i in range(3)] wait_coro = asyncio.wait(tasks) loop.run_until_complete(wait_coro) loop.close()

輸出結果:

 

  • 多任務中獲取返回值

方案1:需要通過loop.create_task()創建task對象,以便后面來獲取返回值

下面代碼asyncio.wait()中,參數傳入的是由future(task)對象構成的可迭代對象

import asyncio async def coroutine_example(name): print('正在執行name:', name) await asyncio.sleep(1) print('執行完畢name:', name) return '返回值:' + name loop = asyncio.get_event_loop() tasks = [loop.create_task(coroutine_example('Zarten_' + str(i))) for i in range(3)] wait_coro = asyncio.wait(tasks) loop.run_until_complete(wait_coro) for task in tasks: print(task.result()) loop.close()

 

方案2:通過回調add_done_callback()來獲取返回值

import asyncio def my_callback(future): print('返回值:', future.result()) async def coroutine_example(name): print('正在執行name:', name) await asyncio.sleep(1) print('執行完畢name:', name) return '返回值:' + name loop = asyncio.get_event_loop() tasks = [] for i in range(3): task = loop.create_task(coroutine_example('Zarten_' + str(i))) task.add_done_callback(my_callback) tasks.append(task) wait_coro = asyncio.wait(tasks) loop.run_until_complete(wait_coro) loop.close()

輸出結果:

 

動態添加協程

方案是創建一個線程,使事件循環在線程內永久運行

相關函數介紹:

loop.call_soon_threadsafe() :與 call_soon()類似,等待此函數返回后馬上調用回調函數,返回值是一個 asyncio.Handle 對象,此對象內只有一個方法為 cancel()方法,用來取消回調函數。

loop.call_soon() : 與call_soon_threadsafe()類似,call_soon_threadsafe() 是線程安全的

loop.call_later():延遲多少秒后執行回調函數

loop.call_at():在指定時間執行回調函數,這里的時間統一使用 loop.time() 來替代 time.sleep()

asyncio.run_coroutine_threadsafe(): 動態的加入協程,參數為一個回調函數和一個loop對象,返回值為future對象,通過future.result()獲取回調函數返回值

  • 動態添加協程同步方式

通過調用 call_soon_threadsafe()函數,傳入一個回調函數callback和一個位置參數

注意:同步方式,回調函數 thread_example()為普通函數

import asyncio from threading import Thread def start_thread_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() def thread_example(name): print('正在執行name:', name) return '返回結果:' + name new_loop = asyncio.new_event_loop() t = Thread(target= start_thread_loop, args=(new_loop,)) t.start() handle = new_loop.call_soon_threadsafe(thread_example, 'Zarten1') handle.cancel() new_loop.call_soon_threadsafe(thread_example, 'Zarten2') print('主線程不會阻塞') new_loop.call_soon_threadsafe(thread_example, 'Zarten3') print('繼續運行中...')

輸出結果:

 

  • 動態添加協程異步方式

通過調用 asyncio.run_coroutine_threadsafe()函數,傳入一個回調函數callback和一個loop對象

注意:異步方式,回調函數 thread_example()為協程

import asyncio from threading import Thread def start_thread_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() async def thread_example(name): print('正在執行name:', name) await asyncio.sleep(1) return '返回結果:' + name new_loop = asyncio.new_event_loop() t = Thread(target= start_thread_loop, args=(new_loop,)) t.start() future = asyncio.run_coroutine_threadsafe(thread_example('Zarten1'), new_loop) print(future.result()) asyncio.run_coroutine_threadsafe(thread_example('Zarten2'), new_loop) print('主線程不會阻塞') asyncio.run_coroutine_threadsafe(thread_example('Zarten3'), new_loop) print('繼續運行中...')

輸出結果:

從上面2個例子中,當主線程運行完成后,由於子線程還沒有退出,故主線程還沒退出,等待子線程退出中。若要主線程退出時子線程也退出,可以設置子線程為守護線程 t.setDaemon(True)

 

協程中生產-消費模型設計

通過上面的動態添加協程的思想,我們可以設計一個生產-消費的模型,至於中間件(管道)是什么無所謂,下面以內置隊列和redis隊列來舉例說明。

提示:若想主線程退出時,子線程也隨之退出,需要將子線程設置為守護線程,函數 setDaemon(True)

內置雙向隊列模型

使用內置雙向隊列deque

import asyncio from threading import Thread from collections import deque import random import time def start_thread_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() def consumer(): while True: if dq: msg = dq.pop() if msg: asyncio.run_coroutine_threadsafe(thread_example('Zarten'+ msg), new_loop) async def thread_example(name): print('正在執行name:', name) await asyncio.sleep(2) return '返回結果:' + name dq = deque() new_loop = asyncio.new_event_loop() loop_thread = Thread(target= start_thread_loop, args=(new_loop,)) loop_thread.setDaemon(True) loop_thread.start() consumer_thread = Thread(target= consumer) consumer_thread.setDaemon(True) consumer_thread.start() while True: i = random.randint(1, 10) dq.appendleft(str(i)) time.sleep(2)

輸出結果:

 

redis隊列模型

下面代碼的主線程和雙向隊列的主線程有些不同,只是換了一種寫法而已,代碼如下

生產者代碼:

import redis conn_pool = redis.ConnectionPool(host='127.0.0.1') redis_conn = redis.Redis(connection_pool=conn_pool) redis_conn.lpush('coro_test', '1') redis_conn.lpush('coro_test', '2') redis_conn.lpush('coro_test', '3') redis_conn.lpush('coro_test', '4') 

消費者代碼:

import asyncio from threading import Thread import redis def get_redis(): conn_pool = redis.ConnectionPool(host= '127.0.0.1') return redis.Redis(connection_pool= conn_pool) def start_thread_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() async def thread_example(name): print('正在執行name:', name) await asyncio.sleep(2) return '返回結果:' + name redis_conn = get_redis() new_loop = asyncio.new_event_loop() loop_thread = Thread(target= start_thread_loop, args=(new_loop,)) loop_thread.setDaemon(True) loop_thread.start() #循環接收redis消息並動態加入協程 while True: msg = redis_conn.rpop('coro_test') if msg: asyncio.run_coroutine_threadsafe(thread_example('Zarten' + bytes.decode(msg, 'utf-8')), new_loop) 

輸出結果:

 

asyncio在aiohttp中的應用

aiohttp是一個異步庫,分為客戶端和服務端,下面只是簡單對客戶端做個介紹以及一個經常遇到的異常情況。aiohttp客戶端為異步網絡請求庫

aiohttp客戶端最簡單的例子

import asyncio import aiohttp count = 0 async def get_http(url): async with aiohttp.ClientSession() as session: async with session.get(url) as res: global count count += 1 print(count, res.status) def main(): loop = asyncio.get_event_loop() url = 'https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&ch=&tn=baiduerr&bar=&wd={0}' tasks = [get_http(url.format(i)) for i in range(10)] loop.run_until_complete(asyncio.wait(tasks)) loop.close() if __name__ == '__main__': main()

 

aiohttp並發量太大的異常解決方案

在使用aiohttp客戶端進行大量並發請求時,程序會拋出 ValueError: too many file descriptors in select() 的錯誤。

異常代碼示例

說明:測試機器為windows系統

import asyncio import aiohttp count = 0 async def get_http(url): async with aiohttp.ClientSession() as session: async with session.get(url) as res: global count count += 1 print(count, res.status) def main(): loop = asyncio.get_event_loop() url = 'https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&ch=&tn=baiduerr&bar=&wd={0}' tasks = [get_http(url.format(i)) for i in range(600)] loop.run_until_complete(asyncio.wait(tasks)) loop.close() if __name__ == '__main__': main()

原因分析:使用aiohttp時,python內部會使用select(),操作系統對文件描述符最大數量有限制,linux為1024個,windows為509個。

解決方案:

最常見的解決方案是:限制並發數量(一般500),若並發的量不大可不作限制。其他方案這里不做介紹,如windows下使用loop = asyncio.ProactorEventLoop() 以及使用回調方式等

限制並發數量方法

提示:此方法也可用來作為異步爬蟲的限速方法(反反爬)

使用semaphore = asyncio.Semaphore(500) 以及在協程中使用 async with semaphore: 操作

具體代碼如下:

import asyncio import aiohttp async def get_http(url): async with semaphore: async with aiohttp.ClientSession() as session: async with session.get(url) as res: global count count += 1 print(count, res.status) if __name__ == '__main__': count = 0 semaphore = asyncio.Semaphore(500) loop = asyncio.get_event_loop() url = 'https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&ch=&tn=baiduerr&bar=&wd={0}' tasks = [get_http(url.format(i)) for i in range(600)] loop.run_until_complete(asyncio.wait(tasks))


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM