python異步(Async)編程
異步和同步的概念
同步:一個一個步驟的往下執行。只有在上一步完成后,程序才會進入下一個步驟。例子:批處理程序、命令行程序
異步:不用於同步的是,系統不會等待執行步驟完成后再繼續執行下一個步驟。
異步的優勢
同步網絡服務器,在需要處理大量的訪問請求的時候,服務器有多少個處理單元就只能處理多少個請求,且這些請求處理會涉及到網絡速度、文件IO速度、數據庫處理速度等不需要處理單元過多干涉的操作。結果會導致同步網絡服務器陷入低效率的工作狀態。而異步編程技術允許您的程序通過釋放 CPU 來做其他工作來利用相對較慢的 IO 進程。
使用例子
同步程序
import queue
def task(name, work_queue):
if work_queue.empty():
print(f"Task {name} nothing to do")
else:
while not work_queue.empty():
count = work_queue.get()
total = 0
print(f"Task {name} running")
for x in range(count):
total += 1
print(f"Task {name} total: {total}")
def main():
"""
This is the main entry point for the program
"""
# Create the queue of work
work_queue = queue.Queue()
# Put some work in the queue
for work in [15, 10, 5, 2]:
work_queue.put(work)
# Create some synchronous tasks
tasks = [(task, "One", work_queue), (task, "Two", work_queue)]
# Run the tasks
for t, n, q in tasks:
t(n, q)
if __name__ == "__main__":
main()
非阻塞調用(異步友好)的協作並發程序
import asyncio
from codetiming import Timer
# async定義為異步函數
async def task(name, work_queue):
timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
while not work_queue.empty():
delay = await work_queue.get()
print(f"Task {name} running")
timer.start()
await asyncio.sleep(delay) # 創建一個非阻塞延遲,執行上下文切換回調用者main()
timer.stop()
async def main():
"""
This is the main entry point for the program
"""
# Create the queue of work
work_queue = asyncio.Queue()
# Put some work in the queue
for work in [15, 10, 5, 2]:
await work_queue.put(work) # 到達await關鍵字,發生上下文切換
# Run the tasks
with Timer(text="\nTotal elapsed time: {:.1f}"):
await asyncio.gather(
asyncio.create_task(task("One", work_queue)),
asyncio.create_task(task("Two", work_queue)),
)
if __name__ == "__main__":
asyncio.run(main())
異步(非阻塞)HTTP調用
import asyncio
import aiohttp
from codetiming import Timer
async def task(name, work_queue):
timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
async with aiohttp.ClientSession() as session:
while not work_queue.empty():
url = await work_queue.get()
print(f"Task {name} getting URL: {url}")
timer.start()
async with session.get(url) as response:
await response.text()
timer.stop()
async def main():
"""
This is the main entry point for the program
"""
# Create the queue of work
work_queue = asyncio.Queue()
# Put some work in the queue
for url in [
"http://google.com",
"http://yahoo.com",
"http://linkedin.com",
"http://apple.com",
"http://microsoft.com",
"http://facebook.com",
"http://twitter.com",
]:
await work_queue.put(url)
# Run the tasks
with Timer(text="\nTotal elapsed time: {:.1f}"):
await asyncio.gather(
asyncio.create_task(task("One", work_queue)),
asyncio.create_task(task("Two", work_queue)),
)
if __name__ == "__main__":
asyncio.run(main())
拓展概念
可等待的對象氛圍三種類型:協程、任務和Future.
當遇到可等待對象,進程會發生上下文切換(任務調度)
協程
async def main():
print('runing...')
await asyncio.sleep(2)
注意,直接調用協程並不會使其調度執行。
有三種方式運行一個協程:1. asyncio.run(main()) 2. await main() 3. asyncio.create_task(main())
任務
用來“並行的”調度協程。
當一個協程通過 asyncio.create_task()
等函數被封裝為一個任務,該協程會被自動調度執行。
task = asyncio.create_task(main())
await task
Future
通常情況下 沒有必要 在應用層級的代碼中創建 Future 對象。這是一種特殊的 低層級 可等待對象。(有什么應用場景,我目前還不清楚)
API
創建任務
asyncio.create_task(main())
休眠:掛起當前任務,以允許其他任務運行。
await asyncio.sleep(1)
“並發”運行任務
await asyncio.gather(
asyncio.create_task(task("One", work_queue)),
asyncio.create_task(task("Two", work_queue)),
)
總結
異步編程經常用在存在很多IO操作的場景。我們要記住用asyncio來並發運行任務,仍然是單進程程序,與同步單進程不同的是,有任務調度的概念,且可等待對象協程被作為一個任務調度。
參考
[1] Getting Started With Async Features in Python. LINK
[2] 協程與任務高層級API介紹