asyncio Basic Usage



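  • Python 3.4 introduced coroutine support through the asyncio module. These notes were put together to get a deeper understanding of how to use that module.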

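What is asyncio for?

  • asyncio is a standard library module introduced in Python 3.4 that provides built-in support for asynchronous I/O.

  • Asynchronous network operations

  • Concurrency

  • Coroutines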

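Key asyncio terms:

  • event_loop (event loop): the program starts an infinite loop and registers functions on it; when the event a function is waiting for occurs, the corresponding coroutine function is called.
  • coroutine: a coroutine object. It comes from a function defined with the async keyword; calling that function does not run its body immediately but returns a coroutine object. The coroutine object has to be registered with the event loop, which then drives it.
  • task: a coroutine object is a function that can natively be suspended; a task wraps a coroutine further and tracks its various states.
  • future: represents the result of a task that has run or has not run yet. It is not essentially different from a task (Task is a subclass of Future).
  • async/await: keywords added in Python 3.5 for defining coroutines. async defines a coroutine, and await suspends the coroutine at a blocking asynchronous call.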
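The relationship between these terms can be checked directly. The following is a minimal sketch, assuming Python 3.7 or later for asyncio.run; the coroutine name compute is made up for illustration. It shows that calling an async function only builds a coroutine object, that wrapping it creates a task, and that a task is itself a future.

```python
import asyncio


async def compute(x):
    # Hypothetical coroutine used only for this illustration.
    await asyncio.sleep(0.01)
    return x * 2


async def main():
    coro = compute(21)                      # nothing has run yet
    is_coroutine = asyncio.iscoroutine(coro)

    task = asyncio.ensure_future(coro)      # wrap the coroutine in a Task
    is_future = isinstance(task, asyncio.Future)
    done_before = task.done()               # the task has not been run yet

    result = await task                     # let the event loop run it
    return is_coroutine, is_future, done_before, task.done(), result


summary = asyncio.run(main())
print(summary)
```

The task only makes progress once main() gives control back to the event loop at the await.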

asyncio usage in Python 3.4

import asyncio
import threading

# Note: @asyncio.coroutine was deprecated in Python 3.8 and removed in 3.11,
# so this generator-based style only runs on older interpreters.
@asyncio.coroutine
def hello():
    print("Hello world!", threading.current_thread())
    # Asynchronous call to asyncio.sleep(2):
    r = yield from asyncio.sleep(2)
    # time.sleep(2)
    print("Hello world!", threading.current_thread())


# Get the EventLoop:
loop = asyncio.get_event_loop()
# Run the coroutines
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

  • Python 3.5 introduced async/await as a direct replacement for @asyncio.coroutine and yield from.
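As a rough illustration of that replacement, here is the hello example rewritten with async/await. It is a sketch rather than the original author's code: the tag parameter, the shortened sleep, and the use of asyncio.run (Python 3.7+) are assumptions made for this example.

```python
import asyncio
import threading


async def hello(tag):
    # `tag` is an illustrative parameter, not part of the original example.
    before = threading.current_thread().name
    await asyncio.sleep(0.1)            # replaces `yield from asyncio.sleep(...)`
    after = threading.current_thread().name
    return tag, before == after


async def main():
    # Run both coroutines concurrently and collect their return values in order.
    return await asyncio.gather(hello("a"), hello("b"))


results = asyncio.run(main())
print(results)
```

Both coroutines report the same thread before and after the sleep, which matches what the threading output in the 3.4 version is meant to show.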

Basic usage:

import asyncio
import time

now = lambda: time.time()


async def do_some_work(x):
    print(f"coroutine running: {x}")
    await asyncio.sleep(x)

    return "done after {}".format(x)

def callback(future):
    print("callback got return value: ", future.result())


start = now()
# This is only a coroutine object; do_some_work has not run at this point
coroutine = do_some_work(2)
# print(coroutine)  # <coroutine object do_some_work at 0x000001A2AAA9FCA8>
# Create the loop explicitly: recent Python versions no longer create one
# implicitly in asyncio.get_event_loop()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Create a task object
# task = loop.create_task(coroutine)
# Second way: create it through asyncio
task = asyncio.ensure_future(coroutine)

# Bind a callback that runs when the task finishes. The last argument passed to
# the callback is the future object, which gives access to the coroutine's return value.
task.add_done_callback(callback)
print("task before running, pending state: ", task)
loop.run_until_complete(task)
print("task after running, finished state: ", task)

print("elapsed: ", now() - start)

  • Output
task before running, pending state:  <Task pending coro=<do_some_work() running at F:/爬蟲/爬蟲項目使用 pycharm/PyppeteerDemo/asyncioDemo.py:7> cb=[callback() at F:/爬蟲/爬蟲項目使用 pycharm/PyppeteerDemo/asyncioDemo.py:14]>

coroutine running: 2

callback got return value:  done after 2

task after running, finished state:  <Task finished coro=<do_some_work() done, defined at F:/爬蟲/爬蟲項目使用 pycharm/PyppeteerDemo/asyncioDemo.py:7> result='done after 2'>

elapsed:  2.0016515254974365

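Concurrency and parallelism

  • Concurrency refers to a system that has several activities in progress at once.

    Parallelism means using concurrency to make a system run faster. It can be applied at several levels of abstraction in the operating system.

    So concurrency usually means that several tasks need to make progress over the same period, while parallelism means that several tasks are executing at the same instant.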
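The distinction can be seen in a small experiment. The sketch below is an illustration under assumed names (worker, events): two coroutines take turns on a single thread, so they are concurrent but never run at the same instant.

```python
import asyncio
import threading

events = []  # (worker name, step, thread name), in the order the steps ran


async def worker(name, steps):
    for step in range(steps):
        events.append((name, step, threading.current_thread().name))
        await asyncio.sleep(0)  # give the event loop a chance to switch tasks


async def main():
    await asyncio.gather(worker("A", 2), worker("B", 2))


asyncio.run(main())
order = [(name, step) for name, step, _ in events]
threads = {thread for _, _, thread in events}
print(order)
print(threads)
```

The steps of A and B alternate, yet every step is recorded on the same thread. Running the work on several CPU cores at once would need threads or processes instead.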

Concurrency with asyncio

import asyncio
import time

now = lambda: time.time()


async def do_some_work(x):
    print("coroutine running", x)
    await asyncio.sleep(x)
    return f"done after {x}"


start = now()

# Create and register the loop first so that ensure_future() can find it
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

coroutine1 = do_some_work(2)
coroutine2 = do_some_work(3)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3),
]

# Run all the tasks: asyncio.wait takes a list of tasks
# loop.run_until_complete(asyncio.wait(tasks))
# Second form: asyncio.gather takes the tasks as separate arguments
loop.run_until_complete(asyncio.gather(*tasks))

for item in tasks:
    print(item.result())
print("elapsed: ", now() - start)

  • Output
coroutine running 2
coroutine running 3
coroutine running 4
done after 2
done after 3
done after 4
elapsed:  4.00330114364624
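The elapsed time in the output above is close to the longest delay (4 s), not the sum (9 s). The same effect can be reproduced quickly with shorter delays. This is a sketch that assumes asyncio.run (Python 3.7+); the delay values are chosen for the example.

```python
import asyncio
import time


async def do_some_work(x):
    await asyncio.sleep(x)
    return f"done after {x}"


async def main(delays):
    # gather() schedules every coroutine at once and preserves argument order.
    return await asyncio.gather(*(do_some_work(d) for d in delays))


delays = [0.1, 0.2, 0.3]   # shortened delays, chosen for this sketch
start = time.perf_counter()
results = asyncio.run(main(delays))
elapsed = time.perf_counter() - start
print(results, round(elapsed, 2))
```

Because the three sleeps overlap, the total time tracks the slowest coroutine rather than the sum of all delays.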

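Nested coroutines

  • To wrap up more of an I/O workflow, one coroutine can await another coroutine, chaining them together. This is what nesting coroutines means.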

import asyncio
import time

now = lambda: time.time()

start = now()


async def do_some_work(x):
    print("coroutine running: ", x)
    await asyncio.sleep(x)
    return f"done after  {x}"


async def main():
    coroutine1 = do_some_work(2)
    coroutine2 = do_some_work(3)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]
    # Option 1
    # dones: the finished task objects; pendings: the tasks still waiting
    # dones, pendings = await asyncio.wait(tasks)
    # print(pendings)
    # for task in dones:
    #     print(task.result())
    # Or return directly
    # return await asyncio.wait(tasks)

    # Option 2
    # Use asyncio.gather to get the list of results directly
    # results = await asyncio.gather(*tasks)
    # print(results)
    # for result in results:
    #     print(result)
    # Or return directly
    # return results

    # Option 3
    # asyncio.as_completed(tasks) returns an iterator (a generator on Python 3.6)
    # print( asyncio.as_completed(tasks))  # <generator object as_completed at 0x000001F0DB4767D8>

    for task in asyncio.as_completed(tasks):
        result = await task
        print(task)  # <generator object as_completed.<locals>._wait_for_one at 0x000001557DA46830>
        print(result)  # done after  2


# Create the loop explicitly instead of relying on asyncio.get_event_loop()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Option 1: return value
# dones, pendings = loop.run_until_complete(main())
# print(pendings)
# for task in dones:
#     print(task.result())

# Option 2: return value
# results = loop.run_until_complete(main())
# for result in results:
#     print("returned: ", result)

# Option 3
loop.run_until_complete(main())

print("elapsed: ", now() - start)
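The three options differ mainly in the shape and ordering of what they return. The following sketch puts them side by side. The delays are deliberately unsorted and shortened for the example, and asyncio.run (Python 3.7+) is assumed.

```python
import asyncio


async def do_some_work(x):
    await asyncio.sleep(x)
    return f"done after {x}"


async def main():
    delays = [0.3, 0.1, 0.2]  # deliberately not sorted

    # gather(): results come back in the order the awaitables were passed in.
    gathered = await asyncio.gather(*(do_some_work(d) for d in delays))

    # wait(): returns two sets of Task objects, (done, pending).
    tasks = [asyncio.ensure_future(do_some_work(d)) for d in delays]
    done, pending = await asyncio.wait(tasks)
    waited = sorted(t.result() for t in done)

    # as_completed(): yields awaitables in the order they finish.
    tasks = [asyncio.ensure_future(do_some_work(d)) for d in delays]
    completed = [await fut for fut in asyncio.as_completed(tasks)]

    return gathered, waited, len(pending), completed


gathered, waited, n_pending, completed = asyncio.run(main())
print(gathered)
print(waited)
print(completed)
```

gather() is usually the most convenient when results must line up with inputs, while as_completed() suits cases where each result should be handled as soon as it is available.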

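Stopping coroutines

  • A future object has several states:

    • Pending
    • Running
    • Done
    • Cancelled

    When a future is created, the task is pending. While the event loop is executing it, it is running, and once it has finished it is done. To stop the event loop, the tasks have to be cancelled first. The tasks of an event loop can be obtained with asyncio.Task.all_tasks() (asyncio.all_tasks() since Python 3.7; the Task.all_tasks() classmethod was removed in Python 3.9).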

import asyncio
import time

now = lambda: time.time()

start = now()


async def do_some_work(x):
    print("coroutine running: {}".format(x))
    await asyncio.sleep(x)
    return "done after  {}".format(x)


# Create and register the loop before wrapping the coroutines in tasks
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

coroutine1 = do_some_work(2)
coroutine2 = do_some_work(3)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3),
]

try:
    results = loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as k:
    # print(k)

    # asyncio.Task.all_tasks() was removed in Python 3.9; use asyncio.all_tasks(loop)
    print(asyncio.all_tasks(loop))
    for task in asyncio.all_tasks(loop):
        print(task.cancel())  # loop over the tasks and cancel them one by one
    loop.stop()  # after stop() the event loop has to be started once more
    loop.run_forever()

finally:
    loop.close()  # close last, otherwise an exception is still raised

print(now() - start)

  • Result: when the script is run from a command window, pressing Ctrl + C raises KeyboardInterrupt out of run_until_complete
coroutine running: 2
coroutine running: 3
coroutine running: 4

{<Task pending coro=<do_some_work() running at asyncioDemo.py:158> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000025F84BF3DF8>()]> cb=[_wait.<locals>._on_completion() at c:\python36\Lib\asyncio\tasks.py:380]>, <Task pending coro=<do_some_work() running at asyncioDemo.py:158> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000025F84B64C18>()]> cb=[_wait.<locals>._on_completion() at c:\python36\Lib\asyncio\tasks.py:380]>, <Task pending coro=<wait() running at c:\python36\Lib\asyncio\tasks.py:313> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000025F84BF3F18>()]>>, <Task pending coro=<do_some_work() running at asyncioDemo.py:158> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000025F84BF3D98>()]> cb=[_wait.<locals>._on_completion() at c:\python36\Lib\asyncio\tasks.py:380]>}

True
True
True
True

2.00264573097229
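Cancellation can also be observed without pressing Ctrl + C. The sketch below assumes asyncio.run (Python 3.7+) and uses example delays. It cancels a single task from another coroutine and then inspects the task's state.

```python
import asyncio


async def do_some_work(x):
    try:
        await asyncio.sleep(x)
        return f"done after {x}"
    except asyncio.CancelledError:
        # Clean-up would go here; re-raise so the task ends up cancelled.
        raise


async def main():
    task = asyncio.ensure_future(do_some_work(10))
    await asyncio.sleep(0.1)          # let the task start and reach its sleep
    requested = task.cancel()         # True if cancellation was requested
    try:
        await task
    except asyncio.CancelledError:
        pass
    return requested, task.cancelled(), task.done()


state = asyncio.run(main())
print(state)
```

cancel() only requests cancellation. The task is actually cancelled the next time the event loop resumes it, which is why the example awaits the task before checking cancelled().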

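Event loops in different threads

  • The event loop is where coroutines are registered, and some coroutines need to be added to it dynamically. A simple way to do this is with multiple threads: the current thread creates an event loop, then starts a new thread and runs the event loop inside that thread. The current thread is not blocked.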

import asyncio
from threading import Thread
import time

now = lambda: time.time()


def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)
    print('Done after {}s'.format(x))


def work(x):
    print("start", x)
    time.sleep(x)
    print("end", x)


start = now()

new_loop = asyncio.new_event_loop()

t = Thread(target=start_loop, args=(new_loop,))
t.start()

new_loop.call_soon_threadsafe(work, 6)
new_loop.call_soon_threadsafe(work, 3)
print(now() - start)
"""
start 6
0.002008199691772461
end 6
start 3
end 3

"""
'''
After this code starts, the current thread is not blocked. The new thread runs the work calls registered through call_soon_threadsafe in order. Because time.sleep is a synchronous, blocking call, finishing both calls to work takes roughly 6 + 3 seconds.
'''
#
# asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
# asyncio.run_coroutine_threadsafe(do_some_work(3), new_loop)
# print(now() - start)
"""
Waiting 6
Waiting 3
0.0009968280792236328
Done after 3s
Done after 6s

"""
'''
In the example above, the main thread creates new_loop, and a separate child thread then runs it as an endless event loop. The main thread registers new coroutine objects through run_coroutine_threadsafe. The child thread can therefore run them concurrently on its event loop while the main thread stays unblocked. The total running time is about 6 s.
'''
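The example above leaves the loop thread running forever. A possible way to collect the coroutine results and shut the loop down is sketched below. The shortened delays and the shutdown sequence are assumptions added for this example, not part of the original.

```python
import asyncio
import threading


def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def do_some_work(x):
    await asyncio.sleep(x)
    return f"done after {x}"


new_loop = asyncio.new_event_loop()
t = threading.Thread(target=start_loop, args=(new_loop,))
t.start()

# run_coroutine_threadsafe() returns a concurrent.futures.Future.
fut_a = asyncio.run_coroutine_threadsafe(do_some_work(0.2), new_loop)
fut_b = asyncio.run_coroutine_threadsafe(do_some_work(0.1), new_loop)

# result() blocks the calling thread until the coroutine has finished.
results = [fut_a.result(timeout=5), fut_b.result(timeout=5)]

# Ask the loop to stop from inside its own thread, then clean up.
new_loop.call_soon_threadsafe(new_loop.stop)
t.join()
new_loop.close()
print(results)
```

loop.stop() is scheduled with call_soon_threadsafe because event loop methods are not thread-safe and should be invoked from the thread that runs the loop.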
  • Reference: 廖雪峰 (Liao Xuefeng)'s tutorial
import asyncio


async def wget(host):
    print('wget %s...' % host)

    reader, writer = await asyncio.open_connection(host, 80)

    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    await writer.drain()
    while True:
        line = await reader.readline()
        # A blank line ends the headers; b'' means the peer closed the connection
        if line in (b'\r\n', b''):
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()


loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# asyncio.wait() needs Task objects; passing bare coroutines fails on Python 3.11+
tasks = [loop.create_task(wget(host)) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
  • Output
wget www.sina.com.cn...
wget www.sohu.com...
wget www.163.com...
www.sina.com.cn header > HTTP/1.1 302 Moved Temporarily
www.sina.com.cn header > Server: nginx
www.sina.com.cn header > Date: Sat, 27 Apr 2019 14:14:29 GMT
www.sina.com.cn header > Content-Type: text/html
www.sina.com.cn header > Content-Length: 154
www.sina.com.cn header > Connection: close
www.sina.com.cn header > Location: https://www.sina.com.cn/
www.sina.com.cn header > X-Via-CDN: f=edge,s=cmcc.hebei.ha2ts4.140.nb.sinaedge.com,c=183.197.88.253;
www.sina.com.cn header > X-Via-Edge: 1556374469876fd58c5b798403e6f6e7f94d2
www.sohu.com header > HTTP/1.1 200 OK
www.sohu.com header > Content-Type: text/html;charset=UTF-8
www.sohu.com header > Connection: close
www.sohu.com header > Server: nginx
www.sohu.com header > Date: Sat, 27 Apr 2019 14:13:44 GMT
www.sohu.com header > Cache-Control: max-age=60
www.sohu.com header > X-From-Sohu: X-SRC-Cached
www.sohu.com header > Content-Encoding: gzip
www.sohu.com header > FSS-Cache: HIT from 4742539.7953813.5615036
www.sohu.com header > FSS-Proxy: Powered by 3628410.5725572.4500890
www.163.com header > HTTP/1.0 302 Moved Temporarily
www.163.com header > Server: Cdn Cache Server V2.0
www.163.com header > Date: Sat, 27 Apr 2019 14:14:29 GMT
www.163.com header > Content-Length: 0
www.163.com header > Location: http://www.163.com/special/0077jt/error_isp.html
www.163.com header > X-Via: 1.0 xiyidong136:1 (Cdn Cache Server V2.0)
www.163.com header > Connection: close
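The wget example depends on external sites being reachable. To try the same stream API offline, a throwaway local server can stand in for the remote host. This is a sketch under assumptions: the handle and fetch_headers functions, the fixed response, and the use of port 0 are all invented for illustration.

```python
import asyncio


async def handle(reader, writer):
    # Minimal stand-in server: read the request head, send a fixed response.
    while True:
        line = await reader.readline()
        if line in (b"\r\n", b""):
            break
    writer.write(b"HTTP/1.0 200 OK\r\nServer: sketch\r\n\r\n")
    await writer.drain()
    writer.close()


async def fetch_headers(host, port):
    reader, writer = await asyncio.open_connection(host, port)
    writer.write(b"GET / HTTP/1.0\r\nHost: localhost\r\n\r\n")
    await writer.drain()
    headers = []
    while True:
        line = await reader.readline()
        if line in (b"\r\n", b""):   # blank line ends the headers; b"" is EOF
            break
        headers.append(line.decode("utf-8").rstrip())
    writer.close()
    return headers


async def main():
    # Port 0 asks the operating system for any free port.
    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    try:
        return await fetch_headers("127.0.0.1", port)
    finally:
        server.close()


headers = asyncio.run(main())
print(headers)
```

The client side mirrors wget: open a connection, write the request, then read lines until the blank line that ends the headers.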

