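asyncio Basics
- Python added the asyncio module, and with it library support for coroutines, in Python 3.4. Writing up these notes also gave me a deeper understanding of how the module is used.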
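What is asyncio for?
- asyncio is a standard library module introduced in Python 3.4 that provides built-in support for asynchronous I/O.
- Asynchronous network operations
- Concurrency
- Coroutines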
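Key asyncio terms:
- event_loop: the event loop. The program starts an infinite loop and registers functions on it; when the event a function is waiting for occurs, the loop calls the corresponding coroutine.
- **coroutine**: a coroutine object. Calling a function defined with the async keyword does not run its body immediately; it returns a coroutine object, which must be registered with the event loop and is then driven by the loop.
- **task**: a coroutine object is natively a function that can be suspended; a task wraps a coroutine further and tracks the state of its execution.
- future: represents the result of work that has finished or has not finished yet. Task is a subclass of Future, so the two are used in much the same way.
- async/await: keywords added in Python 3.5 for defining coroutines. async defines a coroutine, and await suspends the current coroutine until the awaited asynchronous call completes.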
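The coroutine entry above can be checked with a minimal sketch: calling an async function only produces a coroutine object, and nothing runs until an event loop drives it. The function name `greet` is made up for illustration, and the sketch assumes Python 3.7+ for `asyncio.run()`, which is newer than the loop-management calls used in the rest of these notes.

```python
import asyncio
import inspect


async def greet(name):
    # Hypothetical coroutine function, used only for this illustration.
    await asyncio.sleep(0)
    return f"hello {name}"


# Calling the function does not execute its body; it returns a coroutine object.
coro = greet("asyncio")
is_coroutine = inspect.iscoroutine(coro)

# The body runs only once an event loop drives the coroutine.
result = asyncio.run(coro)
print(is_coroutine, result)
```

On Python 3.5 and 3.6 the same coroutine would be run with `loop.run_until_complete(coro)`, as in the examples that follow.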
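Python 3.4 asyncio usage
import asyncio
import threading

# Generator-based coroutine, the Python 3.4 style.
# Note: @asyncio.coroutine was deprecated in Python 3.8 and removed in 3.11.
@asyncio.coroutine
def hello():
    print("Hello world!", threading.current_thread())
    # Asynchronous call to asyncio.sleep(2):
    r = yield from asyncio.sleep(2)
    # time.sleep(2) would block the whole thread instead
    print("Hello world!", threading.current_thread())

# Get the event loop:
loop = asyncio.get_event_loop()
# Run the coroutines
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()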
- Python 3.5 added async/await, which directly replace @asyncio.coroutine and yield from
Basic usage:
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print(f"coroutine running: {x}")
    await asyncio.sleep(x)
    return "done after {}".format(x)

def callback(future):
    print("callback got return value: ", future.result())

start = now()
# This is only a coroutine object; do_some_work has not started running yet
coroutine = do_some_work(2)
# print(coroutine)  # <coroutine object do_some_work at 0x000001A2AAA9FCA8>
loop = asyncio.get_event_loop()
# Create a task object
# task = loop.create_task(coroutine)
# Second way: create it through asyncio
task = asyncio.ensure_future(coroutine)
# Bind a callback that runs when the task finishes. The future is passed as the
# callback's last argument, and the coroutine's return value is read from it.
task.add_done_callback(callback)
print("task before running, pending state: ", task)
loop.run_until_complete(task)
print("task after running, finished state: ", task)
print("elapsed: ", now() - start)
- Output
task before running, pending state: <Task pending coro=<do_some_work() running at F:/爬蟲/爬蟲項目使用 pycharm/PyppeteerDemo/asyncioDemo.py:7> cb=[callback() at F:/爬蟲/爬蟲項目使用 pycharm/PyppeteerDemo/asyncioDemo.py:14]>
coroutine running: 2
callback got return value: done after 2
task after running, finished state: <Task finished coro=<do_some_work() done, defined at F:/爬蟲/爬蟲項目使用 pycharm/PyppeteerDemo/asyncioDemo.py:7> result='done after 2'>
elapsed: 2.0016515254974365
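Because the future is always passed as the last argument of a done callback, extra arguments can be bound in front of it. The following is a small sketch of that idea using `functools.partial`; the label `"job-1"` and the `received` list are made up for illustration, and `asyncio.run()` assumes Python 3.7+.

```python
import asyncio
import functools

received = []


def callback(label, future):
    # functools.partial supplies `label`; asyncio appends the future last.
    received.append((label, future.result()))


async def do_some_work(x):
    await asyncio.sleep(x)
    return f"done after {x}"


async def main():
    task = asyncio.ensure_future(do_some_work(0.1))
    task.add_done_callback(functools.partial(callback, "job-1"))
    await task
    # Give the loop one more turn so the callback has certainly run.
    await asyncio.sleep(0)


asyncio.run(main())
print(received)
```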
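Concurrency and parallelism
- Concurrency describes a system that has several activities in progress at once.
Parallelism means using that concurrency to make the system run faster. It can be applied at several levels of abstraction in the operating system.
So concurrency usually means that several tasks need to make progress over the same period, while parallelism means that several tasks are actually executing at the same instant.
The example below uses asyncio to achieve concurrency: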
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("coroutine running", x)
    await asyncio.sleep(x)
    return f"done after {x}"

start = now()
coroutine1 = do_some_work(2)
coroutine2 = do_some_work(3)
coroutine3 = do_some_work(4)
tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3),
]
loop = asyncio.get_event_loop()
# Run all the tasks; asyncio.wait takes a list of tasks
# loop.run_until_complete(asyncio.wait(tasks))
# Second form: asyncio.gather takes the tasks as separate arguments
loop.run_until_complete(asyncio.gather(*tasks))
for item in tasks:
    print(item.result())
print("elapsed: ", now() - start)
- Output
coroutine running 2
coroutine running 3
coroutine running 4
done after 2
done after 3
done after 4
elapsed: 4.00330114364624
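The run above takes about 4 seconds instead of 2 + 3 + 4 because the waits overlap, not because anything runs in parallel. As a rough sketch of that distinction, the following records which thread each step runs on and in what order; the `worker` coroutine and the `events` list are invented for the illustration, and `asyncio.run()` assumes Python 3.7+.

```python
import asyncio
import threading

events = []


async def worker(name):
    for step in range(2):
        events.append((name, step, threading.get_ident()))
        # Yield control to the event loop so the other coroutine can run.
        await asyncio.sleep(0)


async def main():
    await asyncio.gather(worker("a"), worker("b"))


asyncio.run(main())
order = [(name, step) for name, step, _ in events]
thread_ids = {tid for _, _, tid in events}
print(order, len(thread_ids))
```

The two workers interleave their steps, yet every step is recorded on the same thread: this is concurrency without parallelism.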
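Nested coroutines
- To wrap up a longer sequence of I/O operations, one coroutine can await another, chaining them together. That is what nesting coroutines means.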
import asyncio
import time

now = lambda: time.time()
start = now()

async def do_some_work(x):
    print("coroutine running: ", x)
    await asyncio.sleep(x)
    return f"done after {x}"

async def main():
    coroutine1 = do_some_work(2)
    coroutine2 = do_some_work(3)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]
    # Option 1
    # dones: the finished task objects; pendings: the tasks still waiting
    # dones, pendings = await asyncio.wait(tasks)
    # print(pendings)
    # for task in dones:
    #     print(task.result())
    # or return directly
    # return await asyncio.wait(tasks)
    # Option 2
    # asyncio.gather gives the list of results directly
    # results = await asyncio.gather(*tasks)
    # print(results)
    # for result in results:
    #     print(result)
    # or return directly
    # return results
    # Option 3
    # asyncio.as_completed(tasks) is a generator
    # print(asyncio.as_completed(tasks))  # <generator object as_completed at 0x000001F0DB4767D8>
    for task in asyncio.as_completed(tasks):
        result = await task
        print(task)  # <generator object as_completed.<locals>._wait_for_one at 0x000001557DA46830>
        print(result)  # done after 2

loop = asyncio.get_event_loop()
# Return value for option 1
# dones, pendings = loop.run_until_complete(main())
# print(pendings)
# for task in dones:
#     print(task.result())
# Return value for option 2
# results = loop.run_until_complete(main())
# for result in results:
#     print("returned: ", result)
# Option 3
loop.run_until_complete(main())
print("elapsed: ", now() - start)
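One practical difference between the options above is the order in which results arrive. The sketch below is an illustration rather than part of the original example: it uses shorter, made-up delays and `asyncio.run()` (Python 3.7+) to show that `asyncio.gather` returns results in the order the awaitables were passed, while `asyncio.as_completed` yields them in the order they finish.

```python
import asyncio


async def do_some_work(x):
    await asyncio.sleep(x)
    return f"done after {x}"


async def main():
    delays = [0.3, 0.1, 0.2]  # illustrative delays, deliberately out of order

    # gather: results come back in the order the awaitables were passed in.
    gathered = await asyncio.gather(*(do_some_work(d) for d in delays))

    # as_completed: results come back in the order the tasks finish.
    tasks = [asyncio.ensure_future(do_some_work(d)) for d in delays]
    completed = []
    for next_done in asyncio.as_completed(tasks):
        completed.append(await next_done)

    return gathered, completed


gathered, completed = asyncio.run(main())
print(gathered)
print(completed)
```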
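Stopping coroutines
- A future object can be in one of several states:
- Pending
- Running
- Done
- Cancelled
When a future is created, the task is pending. While the event loop is executing it, it is running, and once the call finishes it is done. To stop the event loop, the tasks have to be cancelled first. The tasks belonging to an event loop can be retrieved with asyncio.all_tasks(loop) (spelled asyncio.Task.all_tasks() before Python 3.7).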
import asyncio
import time

now = lambda: time.time()
start = now()

async def do_some_work(x):
    print("coroutine running: {}".format(x))
    await asyncio.sleep(x)
    return "done after {}".format(x)

coroutine1 = do_some_work(2)
coroutine2 = do_some_work(3)
coroutine3 = do_some_work(4)
tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3),
]
loop = asyncio.get_event_loop()
try:
    results = loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as k:
    # print(k)
    # asyncio.all_tasks(loop) needs Python 3.7+. The output below was recorded on
    # Python 3.6 with asyncio.Task.all_tasks(), which was removed in Python 3.9.
    pending = asyncio.all_tasks(loop)
    print(pending)
    for task in pending:
        print(task.cancel())  # loop over the tasks and cancel them one by one
    loop.stop()  # makes the run_forever() below return after a single pass
    loop.run_forever()  # the loop has to run once more after stop()
finally:
    loop.close()  # close last, otherwise an exception is still raised
print(now() - start)
- Output: when the script is run from a terminal, pressing Ctrl + C raises KeyboardInterrupt out of run_until_complete
coroutine running: 2
coroutine running: 3
coroutine running: 4
{<Task pending coro=<do_some_work() running at asyncioDemo.py:158> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000025F84BF3DF8>()]> cb=[_wait.<locals>._on_completion() at c:\python36\Lib\asyncio\tasks.py:380]>, <Task pending coro=<do_some_work() running at asyncioDemo.py:158> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000025F84B64C18>()]> cb=[_wait.<locals>._on_completion() at c:\python36\Lib\asyncio\tasks.py:380]>, <Task pending coro=<wait() running at c:\python36\Lib\asyncio\tasks.py:313> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000025F84BF3F18>()]>>, <Task pending coro=<do_some_work() running at asyncioDemo.py:158> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000025F84BF3D98>()]> cb=[_wait.<locals>._on_completion() at c:\python36\Lib\asyncio\tasks.py:380]>}
True
True
True
True
2.00264573097229
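The Ctrl + C example cancels every task on the loop at once. To see the state change on a single task, here is a minimal sketch, assuming Python 3.7+ for `asyncio.run()`; the `states` list is only there to record what happens.

```python
import asyncio


async def do_some_work(x):
    await asyncio.sleep(x)
    return f"done after {x}"


async def main():
    task = asyncio.ensure_future(do_some_work(10))
    await asyncio.sleep(0)      # let the task start and reach its first await
    states = [task.done()]      # False: the task is still pending
    task.cancel()               # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        states.append("CancelledError raised")
    states.append(task.cancelled())
    return states


states = asyncio.run(main())
print(states)
```

`task.cancel()` only requests cancellation; the task actually ends the next time the loop runs it, which is why the Ctrl + C example has to run the loop once more after cancelling.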
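Event loops in different threads
- The event loop is where coroutines are registered, and some coroutines need to be added to it dynamically while it is running. A simple way to do that is to use several threads: the current thread creates an event loop, then starts a new thread and runs the loop inside it. The current thread is not blocked.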
import asyncio
from threading import Thread
import time

now = lambda: time.time()

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)
    print('Done after {}s'.format(x))

def work(x):
    print("start", x)
    time.sleep(x)
    print("end", x)

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
new_loop.call_soon_threadsafe(work, 6)
new_loop.call_soon_threadsafe(work, 3)
print(now() - start)
"""
start 6
0.002008199691772461
end 6
start 3
end 3
"""
'''
After this code starts, the current thread is not blocked. The new thread runs the work calls registered through call_soon_threadsafe in order. Because time.sleep is synchronous and blocking, finishing both work calls takes roughly 6 + 3 seconds.
'''
#
# asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
# asyncio.run_coroutine_threadsafe(do_some_work(3), new_loop)
# print(now() - start)
"""
Waiting 6
Waiting 3
0.0009968280792236328
Done after 3s
Done after 6s
"""
'''
In this second variant, the main thread creates new_loop and a child thread runs it as an infinite event loop. The main thread registers new coroutine objects through run_coroutine_threadsafe. The child thread's event loop then runs them concurrently while the main thread stays unblocked. The total running time is about 6 seconds.
'''
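The example above never stops the loop in the child thread, so the script keeps running after the work is done. One possible way to collect a result and shut down cleanly is sketched below. It relies on `asyncio.run_coroutine_threadsafe` returning a `concurrent.futures.Future`; the short delay and the 5 second timeout are arbitrary values chosen for the illustration.

```python
import asyncio
from threading import Thread


def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def do_some_work(x):
    await asyncio.sleep(x)
    return f"done after {x}"


new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()

# run_coroutine_threadsafe returns a concurrent.futures.Future;
# result() blocks the calling thread until the coroutine has finished.
future = asyncio.run_coroutine_threadsafe(do_some_work(0.1), new_loop)
result = future.result(timeout=5)

# Ask the loop to stop from outside its thread, then wait for the thread to exit.
new_loop.call_soon_threadsafe(new_loop.stop)
t.join()
new_loop.close()
thread_alive = t.is_alive()
print(result, thread_alive)
```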
- Reference: the tutorial by 廖雪峰 (Liao Xuefeng)
import asyncio

async def wget(host):
    print('wget %s...' % host)
    reader, writer = await asyncio.open_connection(host, 80)
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    await writer.drain()
    while True:
        line = await reader.readline()
        # A blank line ends the headers; b'' means the peer closed the connection
        if line in (b'\r\n', b''):
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()

loop = asyncio.get_event_loop()
# Wrap each coroutine in a task: since Python 3.11 asyncio.wait() rejects bare coroutines
tasks = [asyncio.ensure_future(wget(host)) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
wget www.sina.com.cn...
wget www.sohu.com...
wget www.163.com...
www.sina.com.cn header > HTTP/1.1 302 Moved Temporarily
www.sina.com.cn header > Server: nginx
www.sina.com.cn header > Date: Sat, 27 Apr 2019 14:14:29 GMT
www.sina.com.cn header > Content-Type: text/html
www.sina.com.cn header > Content-Length: 154
www.sina.com.cn header > Connection: close
www.sina.com.cn header > Location: https://www.sina.com.cn/
www.sina.com.cn header > X-Via-CDN: f=edge,s=cmcc.hebei.ha2ts4.140.nb.sinaedge.com,c=183.197.88.253;
www.sina.com.cn header > X-Via-Edge: 1556374469876fd58c5b798403e6f6e7f94d2
www.sohu.com header > HTTP/1.1 200 OK
www.sohu.com header > Content-Type: text/html;charset=UTF-8
www.sohu.com header > Connection: close
www.sohu.com header > Server: nginx
www.sohu.com header > Date: Sat, 27 Apr 2019 14:13:44 GMT
www.sohu.com header > Cache-Control: max-age=60
www.sohu.com header > X-From-Sohu: X-SRC-Cached
www.sohu.com header > Content-Encoding: gzip
www.sohu.com header > FSS-Cache: HIT from 4742539.7953813.5615036
www.sohu.com header > FSS-Proxy: Powered by 3628410.5725572.4500890
www.163.com header > HTTP/1.0 302 Moved Temporarily
www.163.com header > Server: Cdn Cache Server V2.0
www.163.com header > Date: Sat, 27 Apr 2019 14:14:29 GMT
www.163.com header > Content-Length: 0
www.163.com header > Location: http://www.163.com/special/0077jt/error_isp.html
www.163.com header > X-Via: 1.0 xiyidong136:1 (Cdn Cache Server V2.0)
www.163.com header > Connection: close