Python 原生協程------asyncio


協程

  在python3.5以前,寫成的實現都是通過生成器的yield from原理實現的, 這樣實現的缺點是代碼看起來會很亂,於是3.5版本之后python實現了原生的協程,並且引入了async和await兩個關鍵字用於支持協程。於是在用async定義的協程與python的生成器徹底分開。

async def downloader(url):
    return 'bobby'

async def download_url(url):
    html = await downloader(url)
    return html
if __name__ == '__main__':
    coro = download_url('http://www/imooc.com')
    coro.send(None)

輸出結果:
Traceback (most recent call last):
  File "D:/MyCode/Cuiqingcai/Flask/test01.py", line 67, in <module>
    coro.send(None)
StopIteration: bobby

可以看到結果中可以將downloader(url)的結果返回。需要注意的是在原生協程里面不能用next()來預激協程。

async def downloader(url):
    return 'bobby'

async def download_url(url):
    html = await downloader(url)
    return html
if __name__ == '__main__':
    coro = download_url('http://www/imooc.com')
    coro.next()

結果:
AttributeError: 'coroutine' object has no attribute 'next'
sys:1: RuntimeWarning: coroutine 'download_url' was never awaited

 

原生協程async代碼中間是不能在使用yield生成器的,這樣就為了更好的將原生協程與生成器嚴格區分開來。並且await只能和async語句搭配,不能和生成器搭配。因為要調用await需要調用對象實現__await__()這個魔法方法。所以在定義協程時候注意不要混用。但是理解的時候還是可以將原生的協程中 await可以對比生成器的yield from。

 asyncio

高並發的核心模塊,3.4之后引入,最具野性的模塊,web服務器,爬蟲都可以勝任。它是一個模塊也可以看做一個框架。

協程編碼模式的三個要點:

  1. 事件循環
  2. 回調(驅動生成器(協程))
  3. epoll(IO多路復用)

  asyncio的簡單實用:

  這里需要注意的是:同步阻塞的接口不能使用在協程里面。因為協程是單線程的,只要有一個地方阻塞了,那么所有的協程都需要等待阻塞結束之后才可以向下運行,於是在協程函數中等待一定不能用time.sleep() 如果用time.sleep()就失去了協程的意義了(即程序運行的時間將會是每個協程數乘以time.sleep()的時間數。)同時用asyncio.sleep() 之前需要加上 await

import time
async def download_url(url): print('start get %s' % url) await asyncio.sleep(2) print('get %s finished.' % url) if __name__ == '__main__': start_time = time.time() loop = asyncio.get_event_loop() loop.run_until_complete(download_url('https:www.baidu.com')) # 阻塞等事件完成之后再向下運行。相當於進程線程的join()方法,或者進線程池的wait() print('一共用時:%s' % (time.time() - start_time)) start get https:www.baidu.com get https:www.baidu.com finished. 一共用時:2.0011143684387207

  一次執行多個任務。(用時和一個任務一樣!)下面任務如果換成time.sleep(2)則函數需要至少20秒才能執行完畢。

import asyncio
import time async def download_url(url): print('start get %s.' % url) await asyncio.sleep(2) print('get %s end' % url) if __name__ == '__main__': start_time = time.time() loop = asyncio.get_event_loop() url_list = [] for i in range(10): url_list.append('https:www.baidu.com.index{}'.format(i)) tasks = [download_url(url) for url in url_list] loop.run_until_complete(asyncio.wait(tasks)) print('用時 %s' %(time.time() - start_time)) ... ... 用時 2.0031144618988037

  獲取協程的返回值:

  可以用兩種方式先得到一個future對象。然后將該future對象放入loop.run_until_complete()中。(該函數即可以接受future對象也可以接受協程對象)然后future對象跟進程池和線程池中的future對象的方法是一樣的。於是可以用.result()方法的到函數的返回結果。

async def download_url(url):
    # print('start get %s' % url)
    await asyncio.sleep(2) # print('get %s finished.' % url) return 'frank is a good man.' if __name__ == '__main__': start_time = time.time() loop = asyncio.get_event_loop() # get_future = asyncio.ensure_future(download_url('https:www.baidu.com')) get_task = loop.create_task(download_url('https:www.baidu.com')) loop.run_until_complete(download_url(get_task)) print('一共用時:%s' % (time.time() - start_time)) print(get_task.result()) 一共用時:2.0021142959594727 frank is a good man.

  future完成之后的回調函數

  設置協程完成之后的回調函數:future對象的.add_done_callback(),注意在調用add_done_callback的時候會默認將future對象傳遞給回調函數,因此回調函數必須至少接受一個參數,同時add_done_callback必須在run_until_complete()前,(協程函數創建之后調用)因為如果在run_until_complete()之后的話,協程都應結束了。就不會起作用了。

async def download_url(url):
    # print('start get %s' % url)
    await asyncio.sleep(2) # print('get %s finished.' % url) return 'frank is a good man.' def send_emai(future): print('網頁下載完畢') print(future.result()) if __name__ == '__main__': start_time = time.time() loop = asyncio.get_event_loop() # get_future = asyncio.ensure_future(download_url('https:www.baidu.com')) get_task = loop.create_task(download_url('https:www.baidu.com')) get_task.add_done_callback(send_emai) loop.run_until_complete(download_url(get_task)) print('一共用時:%s' % (time.time() - start_time)) print(get_task.result()) 網頁下載完畢 frank is a good man. 一共用時:2.0021145343780518 frank is a good man.

 

問題來了。tasks.add_done_callback方法只能接受函數名,如果回調的方法也需要參數怎么辦?這就需要用到偏函數from functools import partial (偏函數可以將函數包裝成為另外一個函數)

import  asyncio
import time
from functools import partial
async def get_html(url):
    print('start get url')
    await asyncio.sleep(2)
    return 'bobby'

def callback_method(url, future):# 此處因為是future對象即(tasks)調用的,所以有一個默認的參數。同時要注意:回調函數的future只能放到最后,其它的函數實參放前面。
    print('download ended %s' % url)

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = asyncio.ensure_future(get_html('http://www.baidu.com'))
    # tasks = loop.create_task(get_html('http://www.baidu.com'))
    tasks.add_done_callback(partial(callback_method, 'http://www.baidu.com')) # 參數為回調方法
    loop.run_until_complete(tasks)
    print(tasks.result())


start get url
download ended http://www.baidu.com
bobby

 

gather 與 wait的區別。都是可以等待程序運行之后往下運行。但是gather比wait更加高級一點

gather可以將任務分組:

import  asyncio
import time

async def get_html(url):
    print('start get %s' % url)
    await asyncio.sleep(2)
    print('end get %s' % url)
if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    group1 = [get_html('http://goup1.com') for i in range(2)]
    group2 = [get_html('http://group2.com') for i in range(2)]
    group1 = asyncio.gather(*group1)
    group2 = asyncio.gather(*group2)
    loop.run_until_complete(asyncio.gather(group1, group2))
    # 上面三行代碼也可以合一起loop.run_until_complete(asyncio.gather(*group1, *group2)) 注意此參數前面要加*
    print(time.time()-start_time)


start get http://goup1.com
start get http://group2.com
start get http://group2.com
start get http://goup1.com
end get http://goup1.com
end get http://group2.com
end get http://group2.com
end get http://goup1.com
2.0021145343780518

loop.run_forever()協程的任務完成之后不會停止,而是會一直運行。老師吐槽:python中間loop和future的關系有點亂,loop會被放到future中間同時future又可以放到loop中間,造成一個循環。

如何取消future(task)

async def get_html(sleep_time):
    print('waiting')
    await asyncio.sleep(sleep_time)
    print('end after %s S' % sleep_time)

if __name__ == '__main__':
    task1 = get_html(2)
    task2 = get_html(3)
    task3 = get_html(4)
    tasks = [task1, task2, task3]
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:
        all_tasks = asyncio.Task.all_tasks()
        for task in all_tasks:
            print('cancel task')
            print(task.cancel())# 打印是否取消 是返回Ture,否False
        loop.stop()
        loop.run_forever() # 注意此處一定要加上loop.run_forever()不然會報異常 finally:
        loop.close()

waiting
waiting
waiting
cancel task
True
cancel task
True
cancel task
True
cancel task
True

 

將此代碼進入cmd中運行,然后再中間按ctrl + C鍵,主動生成一個 KeyboardInterrupt 異常,然后異常被捕捉之后做出處理(即停止協程的運行)

 

協程的嵌套

import asyncio
async def compute(x, y):
    print('Computer %s + %s...') % (x, y))
    await asyncio.sleep(1)
    return x + y

async def print_sum(x, y):
    result = await computer(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

 

代碼分析圖:

 

  1. loop首先會為print_sum()創建一個task
  2. event_loop()驅動task運行,使task進入(pending狀態)
  3. task去執行print_sum
  4. print_sum中首先進入子協程的調度(await相當於yiel from)所以轉向執行computer,print_sum自身暫停。
  5. compute中存在await 於是也被迫進入暫停狀態,然后可以直接返回給task(await == yield from 而yield from可以在調度方與子生成器之間掠過委托方建立雙向通道)
  6. task返回給event_loop()
  7. 等待1秒鍾之后,task喚醒compute,compute繼續執行下一行代碼(即return x + y)完成之后compute就是一個done狀態,同時拋出一個stopiterationError異常,此異常將激活print_sum()(委托方),並且將異常將被await(對應之前的yield from)捕捉並提取出return的值。
  8. print_sum()被激活之后執行print然后變成done狀態,也會拋出一個stopiterationError異常,然后被task接受並處理了。

 asyncio中的其他函數(以下三個為底層函數,多數條件下用得不多)

call_soon()

  在協程運行時候,可以傳遞一些函數去執行,注意是函數不是協程。

import asyncio
def callback(sleep_times):
    print('sleep {} success'.format(sleep_times))

if __name__ =='__main__':
    loop = asyncio.get_event_loop()
    loop.call_soon(callback, 2) #第一個參數為調用的函數名,第二個單數為被調用函數的參數
    loop.run_forever()

sleep 2 success

 

  注意此處調動函數的運行需要用到loop.run_forever()而不是loop.run_until_complete()因為oop.call_soon()只是調用函數而不是loop注冊的協程。

同時loop.run_forever()會導致程序一直在運行,不會自動結束。於是需要添加以下方法使程序關閉。

import asyncio
def callback(sleep_times):
    print('sleep {} success'.format(sleep_times))

def stop_loop(loop):
    loop.stop()

if __name__ =='__main__':
    loop = asyncio.get_event_loop()
    loop.call_soon(callback, 2)
    loop.call_soon(stop_loop, loop)
    loop.run_forever()

 

call_later()

  功能,講一個callback函數在一個指定的時候運行。

import asyncio
def callback(sleep_times):
    print('sleep {} success'.format(sleep_times))

def stop_loop(loop):
    loop.stop()

if __name__ =='__main__':
    loop = asyncio.get_event_loop()
    loop.call_later(2, callback, 2) # 參數含義:第一個參數為延遲時間,延遲越少越先運行。
    loop.call_later(1, callback, 1)
    loop.call_later(3, callback, 3)
    loop.call_soon(callback, 4)
    loop.run_forever()


sleep 4 success
sleep 1 success
sleep 2 success
sleep 3 success

 

同時存在call_soon()和call_later()時,call_soon()會在那個call_later()前面調用。

call_at()

  也是調用函數在指定時間內調用,但是它的指定時間,是指的loop內的時間,而不是自己傳遞的時間。可以用loop.time()來獲取loop內時間。

import asyncio
def callback(sleep_times):
    print('sleep {} success'.format(sleep_times))

if __name__ =='__main__':
    loop = asyncio.get_event_loop()
    now = loop.time()
    loop.call_at(now+2, callback, 2) # 注意第一個參數是相對於loop系統時間而來的,不是自定義的幾秒鍾之后運行。
    loop.call_at(now+1, callback, 1)
    loop.call_at(now+3, callback, 3)
    loop.call_soon(callback, 4)
    loop.run_forever()

sleep 4 success
sleep 1 success
sleep 2 success
sleep 3 success

 

協程中線程的實現

協程不提供阻塞的方式,但是有時候有些庫,和有些接口只能提供阻塞方式連接。於是就可以在協程中繼承阻塞io

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM