python---異步IO(asyncio)協程


簡單了解

在py3中內置了asyncio模塊。其編程模型就是一個消息循環。

模塊查看:

from .base_events import *
from .coroutines import *  #協程模塊,可以將函數裝飾為協程
from .events import *  #事件模塊,事件循環和任務調度都將使用到他
from .futures import *   #異步並發模塊,該模塊對task封裝了許多方法,代表將來執行或沒有執行的任務的結果。它和task上沒有本質上的區別
from .locks import *  #異步保證資源同步
from .protocols import *
from .queues import *
from .streams import *
from .subprocess import *
from .tasks import *  #創建任務,是對協程的封裝,可以查看協程的狀態。可以將任務集合
from .transports import *

調用步驟:

1.當我們給一個函數添加了async關鍵字,或者使用asyncio.coroutine裝飾器裝飾,就會把它變成一個異步函數。 
2.每個線程有一個事件循環,主線程調用asyncio.get_event_loop時會創建事件循環

3.將任務封裝為集合asyncio.gather(*args)
,之后一起傳入事件循環中

4.要把異步的任務丟給這個循環的run_until_complete方法,事件循環會安排協同程序的執行。和方法名字一樣,該方法會等待異步的任務完全執行才會結束。

簡單使用:

import asyncio,time

@asyncio.coroutine  #設為異步函數
def func1(num):
print(num,'before---func1----')
yield from asyncio.sleep(5)
print(num,'after---func1----')

task = [func1(1),func1(2)]

if __name__ == "__main__":
begin = time.time()
loop = asyncio.get_event_loop()  #進入事件循環
loop.run_until_complete(asyncio.gather(*task))  #將協同程序注冊到事件循環中
loop.close()
end = time.time()
print(end-begin)
1 before---func1----
2 before---func1----
1 after---func1----
2 after---func1----
5.00528621673584
輸出結果

定義一個協程(不同於上面的實例)

import asyncio,time

async def func1(num):  #使用async關鍵字定義一個協程,協程也是一種對象,不能直接運行,需要加入事件循環中,才能被調用。
    print(num,'before---func1----')

if __name__ == "__main__":
    begin = time.time()

    coroutine = func1(2)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(coroutine)
    loop.close()
    end = time.time()
    print(end-begin)
    func1(2)  #由於使用async異步關鍵字,所以不能直接運行

    D:/MyPython/day25/mq/multhread.py:15: RuntimeWarning: coroutine 'func1' was never awaited
    func1(2)

    print(type(func1),type(coroutine))  #<class 'function'> <class 'coroutine'>

同:python---await/async關鍵字

我們可以使用send(None)調用協程(這里不這么使用),這里是將協程放入事件循環中進行處理

    coroutine = func1(2)
    try:
        coroutine.send(None)
    except StopIteration:
        pass

創建一個任務(對協程進一步封裝,可以查看狀態等)

協程對象不能直接運行在注冊事件循環的時候其實是run_until_complete方法將協程包裝成為了一個任務(task)對象.

task對象是Future類的子類,保存了協程運行后的狀態,用於未來獲取協程的結果

run_until_complete方法查看:

class BaseEventLoop(events.AbstractEventLoop):

   def run_until_complete(self, future):
        """Run until the Future is done.

        If the argument is a coroutine, it is wrapped in a Task.

        WARNING: It would be disastrous to call run_until_complete()
        with the same coroutine twice -- it would wrap it in two
        different Tasks and that can't be good.

        Return the Future's result, or raise its exception.
        """
        self._check_closed()

        new_task = not futures.isfuture(future)
        future = tasks.ensure_future(future, loop=self) if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False

        future.add_done_callback(_run_until_complete_cb)
        try:
            self.run_forever()
        except:
            if new_task and future.done() and not future.cancelled():
                # The coroutine raised a BaseException. Consume the exception
                # to not log a warning, the caller doesn't have access to the
                # local task.
                future.exception()
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()

由源碼可以知道,在協程注冊后會被自動封裝為task任務。所以我們不是必須傳入task。但是去創建一個task對象,有利於我們理解協程的狀態。

import asyncio,time

async def func1(num):
    print(num,'before---func1----')

if __name__ == "__main__":
    begin = time.time()

    coroutine = func1(2)

    loop = asyncio.get_event_loop()

    task = loop.create_task(coroutine)  #創建了任務
    print(task) #pending

    loop.run_until_complete(task)
    loop.close()
    print(task) #finished
    end = time.time()
    print(end-begin)

對於協程的4種狀態:python---協程理解

    print(task) #pending
    print(getcoroutinestate(coroutine))

    loop.run_until_complete(task)
    loop.close()
    print(task) #finished
    print(getcoroutinestate(coroutine))
CORO_CREATED
2 before---func1----
<Task finished coro=<func1() done, defined at D:/MyPython/day25/mq/multhread.py:4> result=None>
CORO_CLOSED

 深入了解:關於Task,create_task(),ensure_future都可以用來創建任務,那么應該使用哪個?

條件使用ensure_future,他是最外層函數,其中調用了create_task()方法,功能全面,而Task官方不推薦直接使用

asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以創建一個task,run_until_complete的參數是一個futrue對象。當傳入一個協程,其內部會自動封裝成task,task是Future的子類。 isinstance(task, asyncio.Future)將會輸出True。

綁定回調add_done_callback

async def func1(num):
print(num,'before---func1----')
return "recv num %s"%num

def callback(future):
print(future.result())

if __name__ == "__main__":
begin = time.time()

coroutine1 = func1(1)
loop = asyncio.get_event_loop()
task1=asyncio.ensure_future(coroutine1)
task1.add_done_callback(callback)
loop.run_until_complete(task1)
loop.close()
end = time.time()
print(end-begin)

1 before---func1----
recv num 1
0.004000186920166016

可以看到,coroutine執行結束時候會調用回調函數。並通過參數future獲取協程執行的結果。我們創建的task和回調里的future對象,實際上是同一個對象。

我也可以不使用回調函數,單純獲取返回值

當task狀態為finished時候,我們可以直接使用result方法(在future模塊)獲取返回值

async def func1(num):
    print(num,'before---func1----')
    return "recv num %s"%num


if __name__ == "__main__":
    begin = time.time()

    coroutine1 = func1(1)
    loop = asyncio.get_event_loop()
    task1=asyncio.ensure_future(coroutine1)
    loop.run_until_complete(task1)
    print(task1)
    print(task1.result())
    loop.close()
    end = time.time()
    print(end-begin)
1 before---func1----
<Task finished coro=<func1() done, defined at D:/MyPython/day25/mq/multhread.py:6> result='recv num 1'>
recv num 1
0.0030002593994140625

阻塞和await

使用async關鍵字定義的協程對象,使用await可以針對耗時的操作進行掛起(是生成器中的yield的替代,但是本地協程函數不允許使用),讓出當前控制權。協程遇到await,事件循環將會掛起該協程,執行別的協程,直到其他協程也掛起,或者執行完畢在進行下一個協程的執行

使用asyncio.sleep模擬阻塞操作。

import asyncio,time


async def func1(num):
    print(num,'before---func1----')
    await asyncio.sleep(num) return "recv num %s"%num


if __name__ == "__main__":
    begin = time.time()

    coroutine1 = func1(5)
    coroutine2 = func1(3)
    loop = asyncio.get_event_loop()
    task1=asyncio.ensure_future(coroutine1)
    task2=asyncio.ensure_future(coroutine2)
    tasks = asyncio.gather(*[task1,task2])    #gather可以實現同時注冊多個任務,實現並發操作。wait方法使用一致
    loop.run_until_complete(tasks)
    loop.close()
    end = time.time()
    print(end-begin)

 

並發:使用gather或者wait可以同時注冊多個任務,實現並發

gather:Return a future aggregating results from the given coroutines or futures.  返回結果

    task1=asyncio.ensure_future(coroutine1)
    task2=asyncio.ensure_future(coroutine2)
    tasks = asyncio.gather(*[task1,task2])
    loop.run_until_complete(tasks)

wait:Returns two sets of Future: (done, pending).   #返回dones是已經完成的任務,pending是未完成的任務,都是集合類型

    task1=asyncio.ensure_future(coroutine1)
    task2=asyncio.ensure_future(coroutine2)
    tasks = asyncio.wait([task1,task2])
    loop.run_until_complete(tasks)

 

Usage:

done, pending = yield from asyncio.wait(fs)

wait是接收一個列表,而后gather是接收一堆任務數據。

兩者的返回值也是不同的

協程嵌套,將多個協程封裝到一個主協程中

import asyncio,aiohttp

async def fetch_async(url):
    print(url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print(resp.status)
            print(await resp.text())

tasks = [fetch_async('http://www.baidu.com/'), fetch_async('http://www.cnblogs.com/ssyfj/')]

event_loop = asyncio.get_event_loop()
results = event_loop.run_until_complete(asyncio.gather(*tasks))
event_loop.close()
關於aiohttp模塊的協程嵌套,嵌套更加明顯
import asyncio,time

async def func1(num):
    print(num,'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s"%num

async def main():
    coroutine1 = func1(5)
    coroutine2 = func1(3)
    coroutine3 = func1(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]

    dones, pendings = await asyncio.wait(tasks) for task in dones:  #對已完成的任務集合進行操作
        print("Task ret: ",task.result())

if __name__ == "__main__":
    begin = time.time()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
    end = time.time()
    print(end-begin)
5 before---func1----
3 before---func1----
4 before---func1----
Task ret:  recv num 4
Task ret:  recv num 5
Task ret:  recv num 3
5.000285863876343

也可以直接使用gather直接獲取值

    results = await asyncio.gather(*tasks)
    for result in results:
        print("Task ret: ",result)

我們也可以不在main中處理結果,而是返回到主調用方進行處理

async def main():
    coroutine1 = func1(5)
    coroutine2 = func1(3)
    coroutine3 = func1(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]

    return await asyncio.gather(*tasks) if __name__ == "__main__":
    begin = time.time()

    loop = asyncio.get_event_loop()
 results = loop.run_until_complete(main()) for result in results: print("Task ret: ",result)
    loop.close()
    end = time.time()
    print(end-begin)

或者使用wait掛起

    return await asyncio.wait(tasks)

----------------------------------------------------
    dones,pendings = loop.run_until_complete(main())
    for task in dones:
        print("Task ret: ",task.result())

或者使用asyncio中的as_completed方法

Return an iterator whose values are coroutines.  #返回一個可迭代的協程函數值

    When waiting for the yielded coroutines you'll get the results (or
    exceptions!) of the original Futures (or coroutines), in the order
    in which and as soon as they complete.
import asyncio,time

async def func1(num):
    print(num,'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s"%num

async def main():
    coroutine1 = func1(5)
    coroutine2 = func1(3)
    coroutine3 = func1(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]

    for task in asyncio.as_completed(tasks): result = await task print("Task ret: ",result) if __name__ == "__main__":
    begin = time.time()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    loop.close()
    end = time.time()
    print(end-begin)

協程停止

future對象有幾個狀態:

  • Pending
  • Running
  • Done
  • Cacelled

 創建future的時候,task為pending,

事件循環調用執行的時候當然就是running,

調用完畢自然就是done,

如果需要停止事件循環,就需要先把task取消。

可以使用asyncio.Task獲取事件循環的task

import asyncio,time

async def func1(num):
    print(num,'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s"%num

if __name__ == "__main__":
    begin = time.time()

    coroutine1 = func1(5)
    coroutine2 = func1(3)
    coroutine3 = func1(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]


    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:
        print(asyncio.Task.all_tasks())
        for task in asyncio.Task.all_tasks():  #獲取所有任務
            print(task.cancel())  #單個任務取消
        loop.stop()    #需要先stop循環
        loop.run_forever()  #需要在開啟事件循環 finally:
        loop.close()  #統一關閉
    end = time.time()
    print(end-begin)
5 before---func1----
3 before---func1----
4 before---func1----
{<Task pending coro=<func1() running at multhread.py:5> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<loc
als>._on_completion() at C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\asyncio\tasks.py:428]>, <
Task pending coro=<wait() running at C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\asyncio\tasks
.py:361> wait_for=<Future pending cb=[Task._wakeup()]>>, <Task pending coro=<func1() running at multhread.py:5> wait
_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at C:\Users\Administrator\AppData\Loca
l\Programs\Python\Python35\lib\asyncio\tasks.py:428]>, <Task pending coro=<func1() running at multhread.py:5> wait_f
or=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at C:\Users\Administrator\AppData\Local\
Programs\Python\Python35\lib\asyncio\tasks.py:428]>}  #未處理,剛剛掛起為pending狀態
True  #返回True,表示cancel取消成功
True
True
True
3.014172315597534

True表示cannel成功,loop stop之后還需要再次開啟事件循環,最后在close,不然還會拋出異常:

Task was destroyed but it is pending!

因為cancel后task的狀態依舊是pending

        for task in asyncio.Task.all_tasks():
            print(task)
            print(task.cancel())
            print(task)
<Task pending coro=<func1() running at multhread.py:5> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<loca
ls>._on_completion() at C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\asyncio\tasks.py:428]> True <Task pending coro=<func1() running at multhread.py:5> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion
() at C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\asyncio\tasks.py:428]>

或者使用協程嵌套,main協程相當於最外層task,處理main函數即可

import asyncio,time

async def func1(num):
    print(num,'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s"%num

async def main():
    coroutine1 = func1(5)
    coroutine2 = func1(3)
    coroutine3 = func1(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]

    dones,pendings=await asyncio.wait(tasks)

    for task in dones:
        print("Task ret: ",task.result())


if __name__ == "__main__":
    begin = time.time()

    loop = asyncio.get_event_loop()
    task = asyncio.ensure_future(main())
    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt as e:
        print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())  #我們只是把上面的單個寫成了所有任務集合取消,和協程嵌套關系不大。上面也可以這樣寫。不過協程嵌套可以簡化代碼
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()
    end = time.time()
    print(end-begin)
5 before---func1----
3 before---func1----
4 before---func1----
<class 'asyncio.tasks._GatheringFuture'>
True
3.008172035217285

感覺有點多余...

import asyncio,time

async def func1(num):
    print(num,'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s"%num

if __name__ == "__main__":
    begin = time.time()

    coroutine1 = func1(5)
    coroutine2 = func1(3)
    coroutine3 = func1(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]


    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:
        print(asyncio.gather(*tasks).cancel())  
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()
    end = time.time()
    print(end-begin)
5 before---func1----
3 before---func1----
4 before---func1----
True
3.008171796798706

上面討論的都是在同一線程下的事件循環,下面來談談不同線程的事件循環

在當前線程中創建一個事件循環(不啟用,單純獲取標識),開啟一個新的線程,在新的線程中啟動事件循環。在當前線程依據事件循環標識,可以向事件中添加協程對象。當前線程不會由於事件循環而阻塞了。

上面在一個線程中執行的事件循環,只有我們主動關閉事件close,事件循環才會結束,會阻塞。

同一線程:

import asyncio,time

async def func1(num):
    print(num,'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s"%num

if __name__ == "__main__":
    begin = time.time()

    coroutine1 = func1(5)
    coroutine2 = func1(3)
    coroutine3 = func1(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]


 loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) loop.run_forever()
    end = time.time()
    print(end-begin)

不同線程事件循環(不涉及協程):

import asyncio,time,threading

def func1(num):
    print(num,'before---func1----')
    time.sleep(num)
    return "recv num %s"%num

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

if __name__ == "__main__":
    begin = time.time()

    new_loop = asyncio.new_event_loop() #在當前線程下創建時間循環,(未啟用)
    t = threading.Thread(target=start_loop,args=(new_loop,))    #開啟新的線程去啟動事件循環
    t.start()

    new_loop.call_soon_threadsafe(func1,3)
    new_loop.call_soon_threadsafe(func1,2)
    new_loop.call_soon_threadsafe(func1,6)

    end = time.time()
    print(end-begin)    #當前線程未阻塞,耗時0.02800154685974121
3 before---func1----
0.02800154685974121
2 before---func1----
6 before---func1----

新線程協程:

import asyncio,time,threading

async def func1(num):
    print(num,'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s"%num

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

if __name__ == "__main__":
    begin = time.time()

    coroutine1 = func1(5)
    coroutine2 = func1(3)
    coroutine3 = func1(4)

    new_loop = asyncio.new_event_loop() #在當前線程下創建時間循環,(未啟用)
    t = threading.Thread(target=start_loop,args=(new_loop,))    #開啟新的線程去啟動事件循環
    t.start()

    asyncio.run_coroutine_threadsafe(coroutine1,new_loop)  #傳參必須是協程對象
    asyncio.run_coroutine_threadsafe(coroutine2,new_loop)
    asyncio.run_coroutine_threadsafe(coroutine3,new_loop)

    end = time.time()
    print(end-begin)    #當前線程未阻塞,耗時0.010000467300415039
5 before---func1----
3 before---func1----
4 before---func1----
0.010000467300415039

主線程通過run_coroutine_threadsafe新注冊協程對象。這樣就能在子線程中進行事件循環的並發操作,同時主線程又不會被block。

推文:Python黑魔法 --- 異步IO( asyncio) 協程

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM