python:Asyncio模塊處理“事件循環”中的異步進程和並發執行任務


python模塊Asynico提供了管理事件、攜程、任務和線程的功能已經編寫並發代碼的同步原語。

組成模塊:

事件循,Asyncio 每個進程都有一個事件循環。

協程,子例程概念的泛化,可以暫停任務,等待哇愛不處理程序完成再從暫停之處返回。

Futures:定義了futures對象。

任務tasks:是Asyncio的一個子類,用於封裝並管理並行模式下的協程。

 

管理事件循環的方法:

loop = asyncio.get_event_loop()  獲得當前上下文事件循環
loop.call_later(time_delay,caallback,argument) 在給定時間time_delay秒后,調用某個回調對象。
loop.call_soon(callback,argument) 該方法馬上安排一個被調用的回調對象。
loop.time() 以浮點數形式返回根據事件循環的內部時鍾確定的當前時間。
asyncio.set_event_loop() 將當前上下文的時間循環設置為3給定循環。
asyncio.new_event_loop() 根據此函數的規則創建並返回一個新的時間循環對象。
loop.run_forever() 一直執行知道調用stop()為止。

異步調用子例程

import asyncio

def fun_1(end_time,loop):
    print("fun1__callback")

    if (loop.time() + 1.0) < end_time:
        loop.call_later(1,fun_2,end_time,loop)
        print("fun1print")
    else:
        loop.stop()


def fun_2(end_time, loop):
    print("fun2__callback")

    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, fun_3, end_time, loop)

    else:
        loop.stop()


def fun_3(end_time, loop):
    print("fun3__callback")

    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, fun_1, end_time, loop)

    else:
        loop.stop()


loop = asyncio.get_event_loop()

end_loop = loop.time() + 9.0

loop.call_soon(fun_1,end_loop,loop)

loop.run_forever()

loop.close()



結果:
fun1__callback
fun1print
fun2__callback
fun3__callback
fun1__callback
fun1print
fun2__callback
fun3__callback
fun1__callback
fun1print
fun2__callback
fun3__callback

從輸出結果上我們可以看到這個任務調用是完全異步的,開始loop.call_soon(fun_1,end_loop,loop) 立刻調用fun_1 當if條件成立時延遲一秒執行fun_2 但是fun_1的下一句print依然直接輸出。但是我后來又測試他實際上還是要等fun_1里的其他語句執行完才會切換到fun_2。

總結:只是在fun_1 1S后調用fun_2期間他會執行fun_1中的其他語句,但是如果需要的時間過長就會等待fun_1所有語句執行完畢才會切換到fun_2不僅僅等一秒。

 

使用Asyncio處理協程

與子例程相似但是不存在用於協調結果的主程序,協程之間可以相互連接形成一個管道,不需要任何監督函數來按順序調用協程。謝承忠可以暫停執行,同時保存干預時的本地狀態,便於后續繼續執行任務。有了協程池,協程計算就能夠相互交錯。

特點:

協程支持多個進入點,可以多次生成。

協程能夠將執行轉移至任意其他協程。

下方實現了一個有限狀態機。

如圖:系統有5個狀態 ,start 、s1、s2、s3、end

這里s1-s3是自動循環切換。開始通過start進入狀態機從end退出狀態機。

實現代碼如下

 

import asyncio
import time
from random import randint

@asyncio.coroutine
def StartState():
    print("start state call \n")
    input_value = randint(0,1)
    time.sleep(1)
    if (input_value == 0):
        result = yield from start1(input_value)
    else:
        result = yield from start2(input_value)
    print("start + " + result)
    return result
@asyncio.coroutine
def start1(value):
    v = str(value)
    input_value = randint(0,1)
    if input_value == 0:
        result = yield from start2(input_value)
    else:
        result = yield from start3(input_value)

    print("1 end +" + result)
    return v

@asyncio.coroutine
def start2(value):
    v = str(value)
    input_value = randint(0, 1)
    if input_value == 0:
        result = yield from start1(input_value)
    else:
        result = yield from start3(input_value)

    print("2 end +"  + result)
    return v

@asyncio.coroutine
def start3(value):
    v = str(value)
    input_value = randint(0, 1)
    if input_value == 0:
        result = yield from endy(input_value)
    else:
        result = yield from start1(input_value)

    print("3 end +"+ result)
    return v

@asyncio.coroutine
def endy(value):
    v = str(value)
    print("end +" +v )
    return v
if __name__ == "__main__":

    print("開始")
    loop = asyncio.get_event_loop()
    loop.run_until_complete(StartState())

 是否切換下一個狀態由input_value決定,而他是由python的random模塊中的randint(0,1)函數定義的。該函數隨機返回值0或1.通過這種方法,可以隨機決定有限狀態機被傳遞哪個狀態。

 

利用Asyncio 並發協程
Asyncio提供了一個處理任務計算的方法,asynico.Task(coroutine).該方法用於調度協程的執行,任務負責執行時間循環中的協程對象。一個事件循環只執行一個任務,也就是添加進Task中的每個任務都通過線程並發處理。
import asyncio


@asyncio.coroutine
def task1(number):
    f = 0
    for i in range(number):
        f += i
        print("task1 + %d" % i)
        yield from asyncio.sleep(1)
    print("task1 the end number =%d" % f)

@asyncio.coroutine
def task2(number):
    f = 0
    for i in  range(number):
        f *= i
        print("task2 * %d" % i)
        yield from asyncio.sleep(1)

    print("task2 the end number = %d" % f)

@asyncio.coroutine
def task3(number):
    f = 0
    for i in range(number):
        f -= i
        print("task2 - %d" % i)
        yield from asyncio.sleep(1)

    print("task2 the end number = %d" % f)

if __name__ == "__main__":
    tasks = [asyncio.Task(task1(10)),
             asyncio.Task(task2(10)),
             asyncio.Task(task3(10))
             ]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))  #wait 等協程結束后返回
    loop.close()


輸出結果:
task1 + 0
task2 * 0
task2 - 0
task1 + 1
task2 * 1
task2 - 1
task1 + 2
task2 * 2
task2 - 2
task1 + 3
task2 * 3
task2 - 3
task1 + 4
task2 * 4
task2 - 4
task1 + 5
task2 * 5
task2 - 5
task1 + 6
task2 * 6
task2 - 6
task1 + 7
task2 * 7
task2 - 7
task1 + 8
task2 * 8
task2 - 8
task1 + 9
task2 * 9
task2 - 9
task1 the end number =45
task2 the end number = 0
task2 the end number = -45

 

使用Asyncio和Futures

future = asyncio.Future()
future.cancel() 取消future,並安排回調函數。
future.result() 返回future鎖代表的結果
future.exception() 返回fture上設置的異常
future.add_done_callback() 添加一個在future執行時運行的回調函數
future.remove_done_callback() 從借宿后調用列表中溢出一個回調對象的所有實例
future.set_result() 將future標記為已完成並設置其結果
future.set_exception() 將future標記為已完成,並設置一個異常

import asyncio


@asyncio.coroutine
def firest_coroutine(future):
    count = 0
    for i in range(10):
        count += i

    yield from asyncio.sleep(4)
    future.set_result("first corountine sum %d" % count)

@asyncio.coroutine
def second_coroutine(future):
    count = 1
    for i in range(1,10):
        count *= i

    yield from asyncio.sleep(3)
    future.set_result("second corountine sum %d" % count)

def callback_result(future):
    print(future.result())

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    future1 = asyncio.Future()
    future2 = asyncio.Future()

    tasks = [
        firest_coroutine(future1),
        second_coroutine(future2)
    ]

    future1.add_done_callback(callback_result)
    future2.add_done_callback(callback_result)

    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

總結一下:主要就是通過線程和協程 實現的事件編程,通過不同事件不同狀態的調用,最后這段代碼主要是添加了事件中可回調對象的操作。



 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM