協程實現了在單線程下的並發,每個協程共享線程的幾乎所有的資源,除了協程自己私有的上下文棧;協程的切換屬於程序級別的切換,對於操作系統來說是無感知的,因此切換速度更快、開銷更小、效率更高,在有多IO操作的業務中能極大提高效率。
系列文章
asyncio模塊創建協程
asyncio在python3.4后被內置在python中,使得python的協程創建變得更加方便。
import asyncio
import os
# async 關鍵字定義一個協程
async def target_func1():
print('the func start')
print(os.getpid())
print('the func end')
def run():
# 創建一個協程對象
coroutine = target_func1()
# 創建一個事件循環
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine) # 將協程對象添加到事件循環,運行直到結束
print(os.getpid())
loop.close() # 關閉事件循環
def run1():
# 創建一個事件循環
loop = asyncio.get_event_loop()
# 創建一個協程對象
coroutine = target_func1(loop)
loop.create_task(coroutine) # 創建一個任務並添加到事件循環中
loop.run_forever() # 開啟無限循環,需要在異步函數中調用stop()使停止
loop.close()
if __name__ == '__main__':
run()
# 結果
the func start
4876
the func end
4876
以上可知,所有的代碼段都是在一個進程的單線程中執行。
asyncio模塊分析
- coroutine
被async修飾的函數調用后會生成協程函數,可以通過send喚醒執行。
async def target_func1():
print('the func start')
print(os.getpid())
print('the func end')
coroutine = target_func1()
try:
coroutine.send(None) # 喚醒協程
except StopIteration:
print('xx')
coroutine.close() # 關閉
- async
async關鍵字可以定義一個協程對象,被async修飾的函數變成了一個協程對象而不是一個普通的函數。
async def target_func1():
pass
coroutine = target_func1()
print(coroutine)
- await
await用於控制事件的執行順序,它只能在異步函數中使用,即被async關鍵字定義的協程函數,否則報錯。當執行到await時,當前協程掛起,轉而去執行await后面的協程,完畢后再回到當前協程繼續往下。
# async 關鍵字定義一個協程
async def target_func1():
print('the func start')
x = await target_func2() # 當前協程掛起
print(x)
print('the func end')
return 1
async def target_func2():
"""
目標函數2
:return:
"""
time.sleep(2)
print('the func end2')
return 0
- 主要方法
asyncio.get_event_loop():創建一個事件循環,所有的異步函數都需要在事件循環中運行;
asyncio.ensure_future():創建一個任務
asyncio.gather(*fs):添加並行任務
asyncio.wait(fs):添加並行任務,可以是列表
loop.run_until_complete(func):添加協程函數同時啟動阻塞直到結束
loop.run_forever():運行事件無限循環,直到stop被調用
loop.create_task():創建一個任務並添加到循環
loop.close():關閉循環
loop.time():循環開始后到當下的時間
loop.stop():停止循環
loop.is_closed() # 判斷循環是否關閉
loop.create_future():創建一個future對象,推薦使用這個函數而不要直接創建future實例
loop.call_soon() # 設置回調函數,不能接受返回的參數,需要用到future對象,立即回調
loop.call_soon_threadsafe() # 線程安全的對象
loop.call_later() # 異步返回后開始算起,延遲回調
loop.call_at() # 循環開始多少s回調
loop.call_exception_handler() # 錯誤處理
- 主要的類
Future:主要用來保存任務的狀態;
Task:Future的子類,擴展了Future的功能;
# Future
from asyncio import Future
# future = Future()
# future.result() # 獲取任務的結果
# future.remove_done_callback(fn) # 刪除所有的回調函數並返回個數
# future.set_result('result') # 設置任務的結果,必須在result()之前執行,否則報錯
# future.exception() # 獲取任務的錯誤信息
# future.set_exception('bad') # 設置任務的錯誤信息
# future.add_done_callback('fn') # 添加回調函數
# Task
current_task():返回循環當前的任務,類方法
all_tasks():返回事件循環所有的任務
get_stack():獲取其他協程的堆棧列表
print_stack:輸出其他協程的堆棧列表
cancel:取消任務
實例
- 添加多個任務到事件循環
async def target_func3(name):
"""
:return:
"""
await asyncio.sleep(1)
print(name)
return 0
def run1():
# 創建一個事件循環
loop = asyncio.get_event_loop()
x = loop.run_until_complete(asyncio.gather(target_func3('A'),target_func3('B'),target_func3('C'),))
print(x) # 等待返回結果,一個列表,按照事件添加的順序,但是計算的順序是不定的
loop.close()
if __name__ == '__main__':
run1()
- 使用run_forever啟動循環獲取異步計算結果
run_forever()不能直接得到異步函數的返回結果,需要使用Future類來作為第三方保存結果,同時設置回調函數;
from asyncio import Future
from functools import partial
async def target_func0(name, future):
"""
目標函數2
:return:
"""
time.sleep(1)
print(name)
future.set_result(name) # 設置返回結果
def got_result(loop, future):
print(future.result()) # 處理結果
loop.stop() # 循環停止
def run():
loop = asyncio.get_event_loop()
future = Future(loop=loop)
res = asyncio.ensure_future(target_func0('A', future)) # 生成一個Task任務
print(res)
future.add_done_callback(partial(got_result, loop)) # 回調函數默認只能有一個參數future,必須使用偏函數
# print(future.result()) # future上下文必須先調用future.set_result。
loop.run_forever()
loop.close()
if __name__ == '__main__':
run()
- 鏈協程
協程里調用等待另外的協程完成后才能返回。
import asyncio
import time
# async 關鍵字定義一個協程
async def target_func1():
print('the func start')
x = await target_func2() # 等待協程完成,控制執行順序
print(x)
print('the func end')
return 1
async def target_func2():
"""
目標函數2
:return:
"""
time.sleep(2)
print('the func end2')
return 0
def run1():
# 創建一個事件循環
loop = asyncio.get_event_loop()
x = loop.run_until_complete(target_func1())
print(x)
loop.close()
if __name__ == '__main__':
run()
- 普通回調實例
import asyncio
import time
from functools import partial
# async 關鍵字定義一個協程
async def target_func1():
print('the func end')
return 1
def get_res(loop):
print('xxxx')
loop.stop()
def run1():
# 創建一個事件循環
loop = asyncio.get_event_loop()
loop.create_task(target_func1())
# loop.call_soon(partial(get_res, loop)) # 設置回調函數,不能接受返回的參數,需要用到future對象
# loop.call_soon_threadsafe() # 線程安全的對象
# loop.call_later(delay=5, callback=partial(get_res, loop)) # 異步返回后開始算起,延遲5秒回調
# loop.call_at(when=8000,callback=partial(get_res, loop)) # 循環開始第8秒回調
# loop.call_exception_handler() # 錯誤處理
loop.run_forever()
loop.close()
if __name__ == '__main__':
run1()
本地IO和網絡IO的異步使用
使用協程的目的是在系統發生io阻塞的時候,可以交出CUP的控制權,讓其去執行其他的任務。實際使用時一般的場景有本地IO和網絡IO。
- 網絡IO
# 使用asyncio+aiohttp,如果想異步化,網絡請求需要拋棄requests包
import asyncio
import time
from aiohttp import ClientSession
async def target2():
print('start2')
async with ClientSession() as session:
async with session.get(url='http://www.baidu.com') as rsp:
data = await rsp.read()
print('end2')
return data
def run1():
# 創建一個事件循環
loop = asyncio.get_event_loop()
tasks = [target2() for i in range(100)]
ts = asyncio.gather(*tasks)
t = time.time()
loop.run_until_complete(ts)
print(time.time()-t)
loop.close()
if __name__ == '__main__':
run1()
- 本地io
核心思想:將文件讀寫的while循環換成事件循環。
可參考:https://github.com/lyyyuna/script_collection/blob/master/aysncfile/asyncfile.py
協程Queue
asyncio模塊也有自己的queue實現生產消費模式,只要有三種隊列:Queue(先進先出),PriorityQueue(優先級隊列),LifoQueue(棧),但是Queue不是線程安全的類,也就是說在多進程或多線程的情況下不要使用這個隊列。
import asyncio
import time
from asyncio import Queue
# async 關鍵字定義一個協程
async def target_func1(q:Queue):
for i in range(100):
await q.put(i)
async def target_func2(q:Queue):
for i in range(100):
x = await q.get()
print(x)
def run1():
# 創建一個事件循環
loop = asyncio.get_event_loop()
q = Queue(100)
task = asyncio.gather(target_func1(q), target_func2(q))
loop.run_until_complete(task)
loop.close()
if __name__ == '__main__':
run1()
Queue的get(),join(),put()方法返回的都是協程,需要使用await關鍵字。