Asyncio 異步編程模塊asyncio 總結


協程語法

在Python 3.5+發布之前,asyncio模塊使用生成器模擬異步調用,因此具有與當前Python 3.5版本不同的語法 ;以下代碼均基於python3.7

從Python 3.5開始引入了異步async及await關鍵字。注意,在await func()調用時無需帶上括號,先感受一下如下代碼:

import asyncio


async def main():

    print(await func())


async def func():


    # 這里可以寫耗時較長的代碼 
    return "Hello, world!"

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

事件循環EventLoop

事件循環是asyncio的核心,異步任務的運行、任務完成之后的回調、網絡IO操作、子進程的運行,都是通過事件循環完成的。在python3.7中,我們甚至完全不用管事件循環,只需要使用高層API,即asyncio中的方法,我們很少直接與事件循環打交道,但是為了更加熟悉asyncio的運行原理,最好還是了解EventLoop的設計原理。

1、事件循環的創建、獲取、設置

(1)asyncio.get_running_loop()。python3.7新添加的

(2)asyncio.get_event_loop()

(3)asyncio.set_event_loop(loop)

(4)asyncio.new_event_loop()

2、運行和停止事件循環

(1)loop.run_until_complete(future)。運行事件循環,直到future運行結束

(2)loop.run_forever()。在python3.7中已經取消了,表示事件循環會一直運行,直到遇到stop。

(3)loop.stop()。停止事件循環

(4)loop.is_running()。如果事件循環依然在運行,則返回True

(5)loop.is_closed()。如果事件循環已經close,則返回True

(6)loop.close()。關閉事件循環

3、創建Future和Task

(1)loop.create_future(coroutine) ,返回future對象

(2)loop.create_task(corootine) ,返回task對象

(3)loop.set_task_factory(factory)

(4)loop.get_task_factory()

4、事件循環的時鍾

loop.time()。可以這么理解,事件循環內部也維護着一個時鍾,可以查看事件循環現在運行的時間點是多少,就像普通的time.time()類似,它返回的是一個浮點數值,比如下面的代碼。轉自:

接下來,在python雲環境上運行下面的代碼,實地體現一下異步程序調用:

import asyncio


async def cor1():


    print("cor1 start")

    for i in range(10):
        await asyncio.sleep(1.5)
        print("cor1", i)


async def cor2():


    print("cor2 start")

    for i in range(15):
        await asyncio.sleep(1)
        print("cor2", i)

loop = asyncio.get_event_loop()
cors = asyncio.wait([cor1(), cor2()])
loop.run_until_complete(cors)

# 
 cor2 start
 cor1 start
 cor2 0
 cor1 0
 cor2 1
 cor1 1
 cor2 2
 cor2 3
 cor1 2
 cor2 4
 cor1 3
 cor2 5
 cor2 6
 cor1 4
 cor2 7
 cor1 5
 cor2 8
 cor2 9
 cor1 6
 cor2 10
 cor1 7
 cor2 11
 cor2 12
 cor1 8
 cor2 13
 cor1 9
 cor2 14

一開始還是通過 asyncio.get_event_loop() 得到事件循環, 之后調用了 run_until_complete( 運行 loop ,等到 future 完成,run_until_complete 即返回 ) ,而調用的參數是一個 async 修飾過的函數的返回值

異步執行

asyncio 支持在異步調用任務中使用 Executor 對象 ,將Executor對象作為參數、可調用函數及其自身參數共同作為參數去調用run_in_executor() 函數來驅動異步執行事件

Executor 調度任務:

import asyncio
from concurrent.futures import ThreadPoolExecutor


def func(a, b):
    #  做耗時的事情 
    return 'test '+a + b


async def main(loop):
    executor = ThreadPoolExecutor() #初始化線程池執行器
    result = await loop.run_in_executor(executor, func, "Hello", " world!")
    print(result)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))

# 輸出: test Hello world!

每個事件循環還有一個“默認”執行程序槽,可以分配給執行程序。要分配執行程序並從循環中調度任務,可以使用set default Executor()方法

import asyncio
from concurrent.futures import ThreadPoolExecutor


def func(a, b):
    #  做耗時的事情 
    return 'test,'+a + b


async def main(loop):
    #  注意:使用“None”作為第一個參數指定“默認”執行器 .
    result = await loop.run_in_executor(None, func, "Hello,", " world!")
    print(result)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.set_default_executor(ThreadPoolExecutor())
    loop.run_until_complete(main(loop))

#輸出 test,Hello, world!

parallel.futures中有兩種主要的Executor類型,即ThreadPoolExecutor和
ProcessPoolExecutor。 ThreadPoolExecutor包含一個線程池,可以手動將其設置為通過構造函數的特定線程數,或者默認為計算機上的內核數乘以5。


ThreadPoolExecutor使用線程池執行分配給它的任務,並且通常在CPU綁定方面更好操作,而不用受I / O約束。與此形成對比的是ProcessPoolExecutor:


ProcessPoolExecutor只能接受可修改的任務和參數。最常見的非picklable任務的對象的方法。如果必須將對象的方法調度為執行程序中的任務,則必須使用ThreadPoolExecutor

UVLoop

uvloop是異步的一種實現。基於libuv的AbstractEventLoop(由nodejs使用)。它兼容99%的異步特性,並且比傳統的asyncio. eventloop快得多。uvloop目前在Windows上不可用,使用前先在python雲環境上用命令 python -m pip install uvloop安裝它

import asyncio
import uvloop

if __name__ == "__main__":
    asyncio.set_event_loop(uvloop.new_event_loop())

    # 你的業務代碼...

    

也可以通過 EventLoopPolicy 在uvloop中設置事件循環工廠 :

import asyncio
import uvloop

if __name__ == "__main__":
 asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
 loop = asyncio.new_event_loop()

Event事件異步驅動模型

概念

可以使用event事件同步多個協程的調度

簡單地說,就像一場賽跑中,一場令下槍聲響起,所有賽跑者根據槍聲離開起跑線,開始並行的工作

舉例:

import asyncio
import functools


# 事件觸發函數
def trigger(event):
    print('EVENT SET')
    event.set()


# 喚醒協程等待
# 事件消費者
async def consumer_a(event):
    consumer_name = 'Consumer A'
    print('{} waiting'.format(consumer_name))
    await event.wait()
    print('{} triggered'.format(consumer_name))

async def consumer_b(event):
    consumer_name = 'Consumer B'
    print('{} waiting'.format(consumer_name))
    await event.wait()
    print('{} triggered'.format(consumer_name))

# 事件
event = asyncio.Event()
# 將 coroutines 放入 future
main_future = asyncio.wait([consumer_a(event),
                            consumer_b(event)])
# 事件循環
event_loop = asyncio.get_event_loop()
event_loop.call_later(3, functools.partial(trigger, event)) #使用偏函數封裝(詳見函數式編程https://blog.csdn.net/oSuiYing12/article/details/106211761)

# 3秒后觸發事件
# 完成主future
done, pending = event_loop.run_until_complete(main_future)

#輸出:

Consumer B waiting
Consumer A waiting
#等待3秒后繼續輸出
EVENT SET
Consumer B triggered
Consumer A triggered

實現簡單的Websocket

在這里,我們使用asyncio創建一個簡單的 websocket。我們定義了協程以連接到服務器,並發送/接收消息。網絡套接字的通信在主協程中運行,該協程由
事件循環event loop驅動:

import asyncio
import aiohttp  # 需要先在雲環境中安裝 python -m pip install aiohttp

session = aiohttp.ClientSession()

#  處理上下文管理器
class EchoWebsocket:

    async def connect(self):
        self.websocket = await session.ws_connect("wss://echo.websocket.org")  #建立連接的websocket服務器地址,由於在國外可能訪問不到

    async def send(self, message):
        self.websocket.send_str(message) #發送消息函數


    async def receive(self):
        result = (await self.websocket.receive()) #接收消息函數
        return result.data


async def main():
    echo = EchoWebsocket()
    await echo.connect()
    await echo.send("Hello World!")
    print(await echo.receive()) # 正常情況下會返回打印出"Hello World!"

if __name__ == '__main__':
    # The main loop
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM