協程語法
在Python 3.5+發布之前,asyncio模塊使用生成器模擬異步調用,因此具有與當前Python 3.5版本不同的語法 ;以下代碼均基於python3.7
從Python 3.5開始引入了異步async及await關鍵字。注意,在await func()調用時無需帶上括號,先感受一下如下代碼:
import asyncio
async def main():
print(await func())
async def func():
# 這里可以寫耗時較長的代碼
return "Hello, world!"
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
事件循環EventLoop
事件循環是asyncio的核心,異步任務的運行、任務完成之后的回調、網絡IO操作、子進程的運行,都是通過事件循環完成的。在python3.7中,我們甚至完全不用管事件循環,只需要使用高層API,即asyncio中的方法,我們很少直接與事件循環打交道,但是為了更加熟悉asyncio的運行原理,最好還是了解EventLoop的設計原理。
1、事件循環的創建、獲取、設置
(1)asyncio.get_running_loop()。python3.7新添加的
(2)asyncio.get_event_loop()
(3)asyncio.set_event_loop(loop)
(4)asyncio.new_event_loop()
2、運行和停止事件循環
(1)loop.run_until_complete(future)。運行事件循環,直到future運行結束
(2)loop.run_forever()。在python3.7中已經取消了,表示事件循環會一直運行,直到遇到stop。
(3)loop.stop()。停止事件循環
(4)loop.is_running()。如果事件循環依然在運行,則返回True
(5)loop.is_closed()。如果事件循環已經close,則返回True
(6)loop.close()。關閉事件循環
3、創建Future和Task
(1)loop.create_future(coroutine) ,返回future對象
(2)loop.create_task(corootine) ,返回task對象
(3)loop.set_task_factory(factory)
(4)loop.get_task_factory()
4、事件循環的時鍾
loop.time()。可以這么理解,事件循環內部也維護着一個時鍾,可以查看事件循環現在運行的時間點是多少,就像普通的time.time()類似,它返回的是一個浮點數值,比如下面的代碼。轉自:https://blog.csdn.net/qq_27825451/article/details/86292513
接下來,在python雲環境上運行下面的代碼,實地體現一下異步程序調用:
import asyncio
async def cor1():
print("cor1 start")
for i in range(10):
await asyncio.sleep(1.5)
print("cor1", i)
async def cor2():
print("cor2 start")
for i in range(15):
await asyncio.sleep(1)
print("cor2", i)
loop = asyncio.get_event_loop()
cors = asyncio.wait([cor1(), cor2()])
loop.run_until_complete(cors)
#
cor2 start
cor1 start
cor2 0
cor1 0
cor2 1
cor1 1
cor2 2
cor2 3
cor1 2
cor2 4
cor1 3
cor2 5
cor2 6
cor1 4
cor2 7
cor1 5
cor2 8
cor2 9
cor1 6
cor2 10
cor1 7
cor2 11
cor2 12
cor1 8
cor2 13
cor1 9
cor2 14
一開始還是通過 asyncio.get_event_loop() 得到事件循環, 之后調用了 run_until_complete( 運行 loop ,等到 future 完成,run_until_complete 即返回 ) ,而調用的參數是一個 async 修飾過的函數的返回值
異步執行
asyncio 支持在異步調用任務中使用 Executor 對象 ,將Executor對象作為參數、可調用函數及其自身參數共同作為參數去調用run_in_executor() 函數來驅動異步執行事件
Executor 調度任務:
import asyncio
from concurrent.futures import ThreadPoolExecutor
def func(a, b):
# 做耗時的事情
return 'test '+a + b
async def main(loop):
executor = ThreadPoolExecutor() #初始化線程池執行器
result = await loop.run_in_executor(executor, func, "Hello", " world!")
print(result)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
# 輸出: test Hello world!
每個事件循環還有一個“默認”執行程序槽,可以分配給執行程序。要分配執行程序並從循環中調度任務,可以使用set default Executor()方法
import asyncio
from concurrent.futures import ThreadPoolExecutor
def func(a, b):
# 做耗時的事情
return 'test,'+a + b
async def main(loop):
# 注意:使用“None”作為第一個參數指定“默認”執行器 .
result = await loop.run_in_executor(None, func, "Hello,", " world!")
print(result)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.set_default_executor(ThreadPoolExecutor())
loop.run_until_complete(main(loop))
#輸出 test,Hello, world!
parallel.futures中有兩種主要的Executor類型,即ThreadPoolExecutor和
ProcessPoolExecutor。 ThreadPoolExecutor包含一個線程池,可以手動將其設置為通過構造函數的特定線程數,或者默認為計算機上的內核數乘以5。
ThreadPoolExecutor使用線程池執行分配給它的任務,並且通常在CPU綁定方面更好操作,而不用受I / O約束。與此形成對比的是ProcessPoolExecutor:
ProcessPoolExecutor只能接受可修改的任務和參數。最常見的非picklable任務的對象的方法。如果必須將對象的方法調度為執行程序中的任務,則必須使用ThreadPoolExecutor
UVLoop
uvloop是異步的一種實現。基於libuv的AbstractEventLoop(由nodejs使用)。它兼容99%的異步特性,並且比傳統的asyncio. eventloop快得多。uvloop目前在Windows上不可用,使用前先在python雲環境上用命令 python -m pip install uvloop安裝它
import asyncio
import uvloop
if __name__ == "__main__":
asyncio.set_event_loop(uvloop.new_event_loop())
# 你的業務代碼...
也可以通過 EventLoopPolicy 在uvloop中設置事件循環工廠 :
import asyncio
import uvloop
if __name__ == "__main__":
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
loop = asyncio.new_event_loop()
Event事件異步驅動模型
概念
可以使用event事件同步多個協程的調度
簡單地說,就像一場賽跑中,一場令下槍聲響起,所有賽跑者根據槍聲離開起跑線,開始並行的工作
舉例:
import asyncio
import functools
# 事件觸發函數
def trigger(event):
print('EVENT SET')
event.set()
# 喚醒協程等待
# 事件消費者
async def consumer_a(event):
consumer_name = 'Consumer A'
print('{} waiting'.format(consumer_name))
await event.wait()
print('{} triggered'.format(consumer_name))
async def consumer_b(event):
consumer_name = 'Consumer B'
print('{} waiting'.format(consumer_name))
await event.wait()
print('{} triggered'.format(consumer_name))
# 事件
event = asyncio.Event()
# 將 coroutines 放入 future
main_future = asyncio.wait([consumer_a(event),
consumer_b(event)])
# 事件循環
event_loop = asyncio.get_event_loop()
event_loop.call_later(3, functools.partial(trigger, event)) #使用偏函數封裝(詳見函數式編程https://blog.csdn.net/oSuiYing12/article/details/106211761)
# 3秒后觸發事件
# 完成主future
done, pending = event_loop.run_until_complete(main_future)
#輸出:
Consumer B waiting
Consumer A waiting
#等待3秒后繼續輸出
EVENT SET
Consumer B triggered
Consumer A triggered
實現簡單的Websocket
在這里,我們使用asyncio創建一個簡單的 websocket。我們定義了協程以連接到服務器,並發送/接收消息。網絡套接字的通信在主協程中運行,該協程由
事件循環event loop驅動:
import asyncio
import aiohttp # 需要先在雲環境中安裝 python -m pip install aiohttp
session = aiohttp.ClientSession()
# 處理上下文管理器
class EchoWebsocket:
async def connect(self):
self.websocket = await session.ws_connect("wss://echo.websocket.org") #建立連接的websocket服務器地址,由於在國外可能訪問不到
async def send(self, message):
self.websocket.send_str(message) #發送消息函數
async def receive(self):
result = (await self.websocket.receive()) #接收消息函數
return result.data
async def main():
echo = EchoWebsocket()
await echo.connect()
await echo.send("Hello World!")
print(await echo.receive()) # 正常情況下會返回打印出"Hello World!"
if __name__ == '__main__':
# The main loop
loop = asyncio.get_event_loop()
loop.run_until_complete(main())