本文首發於:行者AI
在近期的編碼工作過程中遇到了async和await裝飾的函數,查詢資料后了解到這種函數是基於協程的異步函數。這類編程方式稱為異步編程,常用在IO較頻繁的系統中,如:Tornado web框架、文件下載、網絡爬蟲等應用。協程能夠在IO等待時間就去切換執行其他任務,當IO操作結束后再自動回調,那么就會大大節省資源並提供性能。接下來便簡單的講解一下異步編程相關概念以及案例演示。
1. 協程簡介
1.1 協程的含義及實現方法
協程(Coroutine),也可以被稱為微線程,是一種用戶態內的上下文切換技術。簡而言之,其實就是通過一個線程實現代碼塊相互切換執行。例如:
def func1():
print(1)
... # 協程介入
print(2)
def func2():
print(3)
... # 協程介入
print(4)
func1()
func2()
上述代碼是普通的函數定義和執行,按流程分別執行兩個函數中的代碼,並先后會輸出:1、2、3、4。但如果介入協程技術那么就可以實現函數見代碼切換執行,最終輸入:1、3、2、4 。
在Python中有多種方式可以實現協程,例如:
-
greenlet,是一個第三方模塊,用於實現協程代碼(Gevent協程就是基於greenlet實現);
-
yield,生成器,借助生成器的特點也可以實現協程代碼;
-
asyncio,在Python3.4中引入的模塊用於編寫協程代碼;
-
async & awiat,在Python3.5中引入的兩個關鍵字,結合asyncio模塊可以更方便的編寫協程代碼。
前兩種實現方式較為老舊,所以重點關注后面的方式
標准庫實現方法
asyncio是Python 3.4版本引入的標准庫,直接內置了對異步IO的支持。
import asyncio
@asyncio.coroutine
def func1():
print(1)
yield from asyncio.sleep(2) # 遇到IO耗時操作,自動化切換到tasks中的其他任務
print(2)
@asyncio.coroutine
def func2():
print(3)
yield from asyncio.sleep(2) # 遇到IO耗時操作,自動化切換到tasks中的其他任務
print(4)
tasks = [
asyncio.ensure_future( func1() ),
asyncio.ensure_future( func2() )
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
關鍵字實現方法
async & await 關鍵字在Python3.5版本中正式引入,代替了asyncio.coroutine 裝飾器,基於他編寫的協程代碼其實就是上一示例的加強版,讓代碼可以更加簡便可讀。
import asyncio
async def func1():
print(1)
await asyncio.sleep(2) # 耗時操作
print(2)
async def func2():
print(3)
await asyncio.sleep(2) # 耗時操作
print(4)
tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
1.2 案例演示
例如:用代碼實現下載 url_list 中的圖片。
- 方式一:同步編程實現
# requests庫僅支持同步的http網絡請求
import requests
def download_image(url):
print("開始下載:",url)
# 發送網絡請求,下載圖片
response = requests.get(url)
# 圖片保存到本地文件
file_name = url.rsplit('_')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(response.content)
print("下載完成")
if __name__ == '__main__':
url_list = [
'https://www.1.jpg',
'https://www.2.jpg',
'https://www.3.jpg'
]
for item in url_list:
download_image(item)
輸出:按順序發送請求,請求一次下載一張圖片,假如每次下載花費1s,完成任務需要3s 以上。
- 方式二:基於協程的程實現
# aiohttp 為支持異步編程的http請求庫
import aiohttp
import asyncio
async def fetch(session, url):
print("發送請求:", url)
async with session.get(url, verify_ssl=False) as response:
content = await response.content.read()
file_name = url.rsplit('_')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(content)
async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'https://www.1.jpg',
'https://www.2.jpg',
'https://www.3.jpg'
]
tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
await asyncio.wait(tasks)
if __name__ == '__main__':
asyncio.run(main())
輸出:一次發送三個下載請求,同時下載,假如每次下載花費1s,完成任務僅需要1s 左右,第一種方法的耗時為第二種的三倍。
1.3 小結
協程可以讓原來要使用異步+回調方式寫的非人類代碼,用看似同步的方式寫出來。
2. 異步編程簡介
2.1 同步和異步的區別
同步 :循序漸進執行操作、請求
異步:無需等待上一步操作、請求完成,就開始下一步(每個操作仍然有先后順序)

目前python異步相關的主流技術是通過包含關鍵字async&await的async模塊實現。
2.2 異步編程-事件循環
事件循環,可以把他當做是一個while循環,這個while循環在周期性的運行並執行一些任務,在特定條件下終止循環。
# 偽代碼
任務列表 = [ 任務1, 任務2, 任務3,... ]
while True:
可執行的任務列表,已完成的任務列表 = 去任務列表中檢查所有的任務,將'可執行'和'已完成'的任務返回
for 就緒任務 in 已准備就緒的任務列表:
執行已就緒的任務
for 已完成的任務 in 已完成的任務列表:
在任務列表中移除 已完成的任
如果 任務列表 中的任務都已完成,則終止循環
在編寫程序時候可以通過如下代碼來獲取和創建事件循環。
# 方式一:
import asyncio
# 生成或獲取一個事件循環
loop = asyncio.get_event_loop()
# 將任務添加到事件循環中
loop.run_until_complete(任務)
# 方式二(python3.7及以上版本支持):
asyncio.run( 任務 )
2.3 異步編程-快速上手
async 關鍵字
- 協程函數:定義函數時候由async關鍵字裝飾的函數
async def 函數名 - 協程對象:執行協程函數得到的協程對象。
# 協程函數
async def func():
pass
# 協程對象
result = func()
注意:執行協程函數只會創建協程對象,函數內部代碼不會執行。如果想要運行協程函數內部代碼,必須要將協程對象交給事件循環來處理。
import asyncio
async def func():
print("執行協程函數內部代碼!")
result = func()
# 調用方法1:
# loop = asyncio.get_event_loop()
# loop.run_until_complete( result )
# 調用方法2:
asyncio.run( result )
await 關鍵字
await + 可等待的對象(協程對象、Future、Task對象 -> IO等待),遇到IO操作掛起當前協程(任務),等IO操作完成之后再繼續往下執行。當前協程掛起時,事件循環可以去執行其他協程(任務)。
import asyncio
async def others():
print("start")
await asyncio.sleep(2)
print('end')
return '返回值'
async def func():
print("執行協程函數內部代碼")
# await等待對象的值得到結果之后再繼續向下走
response = await others()
print("IO請求結束,結果為:", response)
asyncio.run( func() )
Task 對象
Task對象的作用是在事件循環中添加多個任務,用於並發調度協程,通過asyncio.create_task(協程對象)的方式創建Task對象,這樣可以讓協程加入事件循環中等待被調度執行。
async def module_a():
print("start module_a")
await asyncio.sleep(2) # 模擬 module_a 的io操作
print('end module_a')
return 'module_a 完成'
async def module_b():
print("start module_b")
await asyncio.sleep(1) # 模擬 module_a 的io操作
print('end module_b')
return 'module_b 完成'
task_list = [
module_a(),
module_b(),
]
done,pending = asyncio.run( asyncio.wait(task_list) )
print(done)
2.4 案例演示
例如:用代碼實現連接並查詢數據庫的同時,下載一個APK文件到本地。
import asyncio
import aiomysql
import os
import aiofiles as aiofiles
from aiohttp import ClientSession
async def get_app():
url = "http://www.123.apk"
async with ClientSession() as session:
# 網絡IO請求,獲取響應
async with session.get(url)as res:
if res.status == 200:
print("下載成功", res)
# 磁盤IO請求,讀取響應數據
apk = await res.content.read()
async with aiofiles.open("demo2.apk", "wb") as f:
# 磁盤IO請求,數據寫入本地磁盤
await f.write(apk)
else:
print("下載失敗")
async def excute_sql(sql):
# 網絡IO操作:連接MySQL
conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='123', db='mysql', )
# 網絡IO操作:創建CURSOR
cur = await conn.cursor()
# 網絡IO操作:執行SQL
await cur.execute(sql)
# 網絡IO操作:獲取SQL結果
result = await cur.fetchall()
print(result)
# 網絡IO操作:關閉鏈接
await cur.close()
conn.close()
task_list = [get_app(), execute_sql(sql="SELECT Host,User FROM user")]
asyncio.run(asyncio.wait(task_list))
代碼邏輯分析:
【step1】asyncio.run()創建了事件循環。wait()方法將task任務列表加入到當前的事件循環中;(注意:必須先創建事件循環,后加入任務列表,否則會報錯)
【step2】事件循環監聽事件狀態,開始執行代碼,先執行列表中的get_app()方法,當代碼執行到async with session.get(url)as res:時,遇到await關鍵字表示有IO耗時操作,線程會將該任務掛起在后台執行,並切換到另外一個異步函數excute_sql();
【step3】當代碼執行到excute_sql()的第一個IO耗時操作后,線程會重復先前的操作,將該任務掛起,去執行其他可執行代碼。假如此時事件循環監聽到get_app()中的第一IO耗時操作已經執行完成,那么線程會切換到該方法第一個IO操作后的代碼,並按順序執行直到遇上下一個await裝飾的IO操作;假如事件循環監聽到excute_sql()中的第一個IO操作先於get_app()的第一個IO操作完成,那么線程會繼續執行excute_sql的后續代碼;
【step4】線程會重復進行上述第3點中的步驟,直到代碼全部執行完成,事件循環也會隨之停止。
2.5 小結
一般來說CPU的耗時運算方式有:
計算密集型的操作:計算密集型任務的特點是要進行大量的計算、邏輯判斷,消耗CPU資源,比如計算圓周率、對視頻進行高清解碼等等。
IO密集型的操作:涉及到網絡、磁盤IO的任務都是IO密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待IO操作完成(因為IO的速度遠遠低於CPU和內存的速度)。
異步編程基於協程實現,如果利用協程實現計算密集型操作,因為線程在上下文之間的來回切換總會經歷類似於”計算“-->”保存“-->”創建新環境“ 的一系列操作,導致系統的整體性能反而會下降。所以異步編程並不適用於計算密集型的程序。然而在IO密集型操作匯總,協程在IO等待時間就去切換執行其他任務,當IO操作結束后再自動回調,那么就會大大節省資源並提供性能。
PS:更多技術干貨,快關注【公眾號 | xingzhe_ai】,與行者一起討論吧!
