Python協程 & 異步編程(asyncio) 入門介紹


本文首發於:行者AI

在近期的編碼工作過程中遇到了async和await裝飾的函數,查詢資料后了解到這種函數是基於協程的異步函數。這類編程方式稱為異步編程,常用在IO較頻繁的系統中,如:Tornado web框架、文件下載、網絡爬蟲等應用。協程能夠在IO等待時間就去切換執行其他任務,當IO操作結束后再自動回調,那么就會大大節省資源並提供性能。接下來便簡單的講解一下異步編程相關概念以及案例演示。

1. 協程簡介

1.1 協程的含義及實現方法

協程(Coroutine),也可以被稱為微線程,是一種用戶態內的上下文切換技術。簡而言之,其實就是通過一個線程實現代碼塊相互切換執行。例如:

def func1():
	print(1)
    ...   # 協程介入
	print(2)
	
def func2():
	print(3)
    ...   # 協程介入
	print(4)

func1()
func2()

上述代碼是普通的函數定義和執行,按流程分別執行兩個函數中的代碼,並先后會輸出:1、2、3、4。但如果介入協程技術那么就可以實現函數見代碼切換執行,最終輸入:1、3、2、4

在Python中有多種方式可以實現協程,例如:

  • greenlet,是一個第三方模塊,用於實現協程代碼(Gevent協程就是基於greenlet實現);

  • yield,生成器,借助生成器的特點也可以實現協程代碼;

  • asyncio,在Python3.4中引入的模塊用於編寫協程代碼;

  • async & awiat,在Python3.5中引入的兩個關鍵字,結合asyncio模塊可以更方便的編寫協程代碼。

前兩種實現方式較為老舊,所以重點關注后面的方式

標准庫實現方法

asyncio是Python 3.4版本引入的標准庫,直接內置了對異步IO的支持。

import asyncio

@asyncio.coroutine
def func1():
    print(1)
    yield from asyncio.sleep(2)  # 遇到IO耗時操作,自動化切換到tasks中的其他任務
    print(2)

@asyncio.coroutine
def func2():
    print(3)
    yield from asyncio.sleep(2) # 遇到IO耗時操作,自動化切換到tasks中的其他任務
    print(4)

tasks = [
    asyncio.ensure_future( func1() ),
    asyncio.ensure_future( func2() )
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

關鍵字實現方法

async & await 關鍵字在Python3.5版本中正式引入,代替了asyncio.coroutine 裝飾器,基於他編寫的協程代碼其實就是上一示例的加強版,讓代碼可以更加簡便可讀。


import asyncio

async def func1():
    print(1)
    await asyncio.sleep(2)  # 耗時操作  
    print(2)

async def func2():
    print(3)
    await asyncio.sleep(2)   # 耗時操作
    print(4)

tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

1.2 案例演示

例如:用代碼實現下載 url_list 中的圖片。

  • 方式一:同步編程實現
# requests庫僅支持同步的http網絡請求
import requests

def download_image(url):
  print("開始下載:",url)
  # 發送網絡請求,下載圖片
  response = requests.get(url)
  # 圖片保存到本地文件
  file_name = url.rsplit('_')[-1]
  with open(file_name, mode='wb') as file_object:
      file_object.write(response.content)
print("下載完成")


if __name__ == '__main__':
  url_list = [
      'https://www.1.jpg',
      'https://www.2.jpg',
      'https://www.3.jpg'
  ]
  for item in url_list:
      download_image(item)

輸出:按順序發送請求,請求一次下載一張圖片,假如每次下載花費1s,完成任務需要3s 以上。

  • 方式二:基於協程的程實現
# aiohttp 為支持異步編程的http請求庫
import aiohttp
import asyncio

async def fetch(session, url):
  print("發送請求:", url)
  async with session.get(url, verify_ssl=False) as response:
      content = await response.content.read()
      file_name = url.rsplit('_')[-1]
      with open(file_name, mode='wb') as file_object:
          file_object.write(content)

async def main():
  async with aiohttp.ClientSession() as session:
      url_list = [
          'https://www.1.jpg',
          'https://www.2.jpg',
          'https://www.3.jpg'
      ]
      tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
      await asyncio.wait(tasks)


if __name__ == '__main__':
  asyncio.run(main())

輸出:一次發送三個下載請求,同時下載,假如每次下載花費1s,完成任務僅需要1s 左右,第一種方法的耗時為第二種的三倍。

1.3 小結

協程可以讓原來要使用異步+回調方式寫的非人類代碼,用看似同步的方式寫出來。

2. 異步編程簡介

2.1 同步和異步的區別

同步 :循序漸進執行操作、請求
異步:無需等待上一步操作、請求完成,就開始下一步(每個操作仍然有先后順序)


目前python異步相關的主流技術是通過包含關鍵字async&await的async模塊實現。

2.2 異步編程-事件循環

事件循環,可以把他當做是一個while循環,這個while循環在周期性的運行並執行一些任務,在特定條件下終止循環。

# 偽代碼
任務列表 = [ 任務1, 任務2, 任務3,... ]
while True:
    可執行的任務列表,已完成的任務列表 = 去任務列表中檢查所有的任務,將'可執行'和'已完成'的任務返回
    for 就緒任務 in 已准備就緒的任務列表:
        執行已就緒的任務
        
    for 已完成的任務 in 已完成的任務列表:
        在任務列表中移除 已完成的任
        
	如果 任務列表 中的任務都已完成,則終止循環

在編寫程序時候可以通過如下代碼來獲取和創建事件循環。

# 方式一:
import asyncio
# 生成或獲取一個事件循環
loop = asyncio.get_event_loop()
# 將任務添加到事件循環中
loop.run_until_complete(任務)

# 方式二(python3.7及以上版本支持):
asyncio.run( 任務 )

2.3 異步編程-快速上手

async 關鍵字

  • 協程函數:定義函數時候由async關鍵字裝飾的函數 async def 函數名
  • 協程對象:執行協程函數得到的協程對象。
# 協程函數
async def func():
    pass
# 協程對象
result = func()

注意:執行協程函數只會創建協程對象,函數內部代碼不會執行。如果想要運行協程函數內部代碼,必須要將協程對象交給事件循環來處理。

import asyncio 
async def func():
    print("執行協程函數內部代碼!")
result = func()

# 調用方法1:
# loop = asyncio.get_event_loop()
# loop.run_until_complete( result )

# 調用方法2:
asyncio.run( result )

await 關鍵字

await + 可等待的對象(協程對象、Future、Task對象 -> IO等待),遇到IO操作掛起當前協程(任務),等IO操作完成之后再繼續往下執行。當前協程掛起時,事件循環可以去執行其他協程(任務)。

import asyncio

async def others():
    print("start")
    await asyncio.sleep(2)
    print('end')
    return '返回值'

async def func():
    print("執行協程函數內部代碼")
    # await等待對象的值得到結果之后再繼續向下走
    response = await others()
    print("IO請求結束,結果為:", response)

asyncio.run( func() )

Task 對象

Task對象的作用是在事件循環中添加多個任務,用於並發調度協程,通過asyncio.create_task(協程對象)的方式創建Task對象,這樣可以讓協程加入事件循環中等待被調度執行。

async def module_a():
    print("start module_a")
    await asyncio.sleep(2) # 模擬 module_a 的io操作
    print('end module_a')
    return 'module_a 完成'

async def module_b():
    print("start module_b")
    await asyncio.sleep(1) # 模擬 module_a 的io操作
    print('end module_b')
    return 'module_b 完成'  

task_list = [
    module_a(),
	module_b(), 
]

done,pending = asyncio.run( asyncio.wait(task_list) )
print(done)

2.4 案例演示

例如:用代碼實現連接並查詢數據庫的同時,下載一個APK文件到本地。

import asyncio
import aiomysql
import os
import aiofiles as aiofiles
from aiohttp import ClientSession

async def get_app():

    url = "http://www.123.apk"
    async with ClientSession() as session:
        # 網絡IO請求,獲取響應
        async with session.get(url)as res:
            if res.status == 200:
                print("下載成功", res)
                # 磁盤IO請求,讀取響應數據
                apk = await res.content.read()
                async  with  aiofiles.open("demo2.apk", "wb") as f:
                    # 磁盤IO請求,數據寫入本地磁盤
                    await f.write(apk)
            else:
                print("下載失敗")

async def excute_sql(sql):
    # 網絡IO操作:連接MySQL
    conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='123', db='mysql', )
    # 網絡IO操作:創建CURSOR
    cur = await conn.cursor()
    # 網絡IO操作:執行SQL
    await cur.execute(sql)
    # 網絡IO操作:獲取SQL結果
    result = await cur.fetchall()
    print(result)
    # 網絡IO操作:關閉鏈接
    await cur.close()
    conn.close()

task_list = [get_app(), execute_sql(sql="SELECT Host,User FROM user")]
asyncio.run(asyncio.wait(task_list))

代碼邏輯分析:

【step1】asyncio.run()創建了事件循環。wait()方法將task任務列表加入到當前的事件循環中;(注意:必須先創建事件循環,后加入任務列表,否則會報錯)

【step2】事件循環監聽事件狀態,開始執行代碼,先執行列表中的get_app()方法,當代碼執行到async with session.get(url)as res:時,遇到await關鍵字表示有IO耗時操作,線程會將該任務掛起在后台執行,並切換到另外一個異步函數excute_sql()

【step3】當代碼執行到excute_sql()的第一個IO耗時操作后,線程會重復先前的操作,將該任務掛起,去執行其他可執行代碼。假如此時事件循環監聽到get_app()中的第一IO耗時操作已經執行完成,那么線程會切換到該方法第一個IO操作后的代碼,並按順序執行直到遇上下一個await裝飾的IO操作;假如事件循環監聽到excute_sql()中的第一個IO操作先於get_app()的第一個IO操作完成,那么線程會繼續執行excute_sql的后續代碼;

【step4】線程會重復進行上述第3點中的步驟,直到代碼全部執行完成,事件循環也會隨之停止。

2.5 小結

一般來說CPU的耗時運算方式有:

計算密集型的操作:計算密集型任務的特點是要進行大量的計算、邏輯判斷,消耗CPU資源,比如計算圓周率、對視頻進行高清解碼等等。

IO密集型的操作:涉及到網絡、磁盤IO的任務都是IO密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待IO操作完成(因為IO的速度遠遠低於CPU和內存的速度)。

異步編程基於協程實現,如果利用協程實現計算密集型操作,因為線程在上下文之間的來回切換總會經歷類似於”計算“-->”保存“-->”創建新環境“ 的一系列操作,導致系統的整體性能反而會下降。所以異步編程並不適用於計算密集型的程序。然而在IO密集型操作匯總,協程在IO等待時間就去切換執行其他任務,當IO操作結束后再自動回調,那么就會大大節省資源並提供性能。


PS:更多技術干貨,快關注【公眾號 | xingzhe_ai】,與行者一起討論吧!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM