python模塊Asynico提供了管理事件、攜程、任務和線程的功能已經編寫並發代碼的同步原語。
組成模塊:
事件循,Asyncio 每個進程都有一個事件循環。
協程,子例程概念的泛化,可以暫停任務,等待哇愛不處理程序完成再從暫停之處返回。
Futures:定義了futures對象。
任務tasks:是Asyncio的一個子類,用於封裝並管理並行模式下的協程。
管理事件循環的方法:
loop = asyncio.get_event_loop() 獲得當前上下文事件循環
loop.call_later(time_delay,caallback,argument) 在給定時間time_delay秒后,調用某個回調對象。
loop.call_soon(callback,argument) 該方法馬上安排一個被調用的回調對象。
loop.time() 以浮點數形式返回根據事件循環的內部時鍾確定的當前時間。
asyncio.set_event_loop() 將當前上下文的時間循環設置為3給定循環。
asyncio.new_event_loop() 根據此函數的規則創建並返回一個新的時間循環對象。
loop.run_forever() 一直執行知道調用stop()為止。
異步調用子例程
import asyncio def fun_1(end_time,loop): print("fun1__callback") if (loop.time() + 1.0) < end_time: loop.call_later(1,fun_2,end_time,loop) print("fun1print") else: loop.stop() def fun_2(end_time, loop): print("fun2__callback") if (loop.time() + 1.0) < end_time: loop.call_later(1, fun_3, end_time, loop) else: loop.stop() def fun_3(end_time, loop): print("fun3__callback") if (loop.time() + 1.0) < end_time: loop.call_later(1, fun_1, end_time, loop) else: loop.stop() loop = asyncio.get_event_loop() end_loop = loop.time() + 9.0 loop.call_soon(fun_1,end_loop,loop) loop.run_forever() loop.close() 結果: fun1__callback fun1print fun2__callback fun3__callback fun1__callback fun1print fun2__callback fun3__callback fun1__callback fun1print fun2__callback fun3__callback
從輸出結果上我們可以看到這個任務調用是完全異步的,開始loop.call_soon(fun_1,end_loop,loop) 立刻調用fun_1 當if條件成立時延遲一秒執行fun_2 但是fun_1的下一句print依然直接輸出。但是我后來又測試他實際上還是要等fun_1里的其他語句執行完才會切換到fun_2。
總結:只是在fun_1 1S后調用fun_2期間他會執行fun_1中的其他語句,但是如果需要的時間過長就會等待fun_1所有語句執行完畢才會切換到fun_2不僅僅等一秒。
使用Asyncio處理協程
與子例程相似但是不存在用於協調結果的主程序,協程之間可以相互連接形成一個管道,不需要任何監督函數來按順序調用協程。謝承忠可以暫停執行,同時保存干預時的本地狀態,便於后續繼續執行任務。有了協程池,協程計算就能夠相互交錯。
特點:
協程支持多個進入點,可以多次生成。
協程能夠將執行轉移至任意其他協程。
下方實現了一個有限狀態機。
如圖:系統有5個狀態 ,start 、s1、s2、s3、end
這里s1-s3是自動循環切換。開始通過start進入狀態機從end退出狀態機。
實現代碼如下
import asyncio import time from random import randint @asyncio.coroutine def StartState(): print("start state call \n") input_value = randint(0,1) time.sleep(1) if (input_value == 0): result = yield from start1(input_value) else: result = yield from start2(input_value) print("start + " + result) return result @asyncio.coroutine def start1(value): v = str(value) input_value = randint(0,1) if input_value == 0: result = yield from start2(input_value) else: result = yield from start3(input_value) print("1 end +" + result) return v @asyncio.coroutine def start2(value): v = str(value) input_value = randint(0, 1) if input_value == 0: result = yield from start1(input_value) else: result = yield from start3(input_value) print("2 end +" + result) return v @asyncio.coroutine def start3(value): v = str(value) input_value = randint(0, 1) if input_value == 0: result = yield from endy(input_value) else: result = yield from start1(input_value) print("3 end +"+ result) return v @asyncio.coroutine def endy(value): v = str(value) print("end +" +v ) return v if __name__ == "__main__": print("開始") loop = asyncio.get_event_loop() loop.run_until_complete(StartState())
是否切換下一個狀態由input_value決定,而他是由python的random模塊中的randint(0,1)函數定義的。該函數隨機返回值0或1.通過這種方法,可以隨機決定有限狀態機被傳遞哪個狀態。
利用Asyncio 並發協程
Asyncio提供了一個處理任務計算的方法,asynico.Task(coroutine).該方法用於調度協程的執行,任務負責執行時間循環中的協程對象。一個事件循環只執行一個任務,也就是添加進Task中的每個任務都通過線程並發處理。
import asyncio @asyncio.coroutine def task1(number): f = 0 for i in range(number): f += i print("task1 + %d" % i) yield from asyncio.sleep(1) print("task1 the end number =%d" % f) @asyncio.coroutine def task2(number): f = 0 for i in range(number): f *= i print("task2 * %d" % i) yield from asyncio.sleep(1) print("task2 the end number = %d" % f) @asyncio.coroutine def task3(number): f = 0 for i in range(number): f -= i print("task2 - %d" % i) yield from asyncio.sleep(1) print("task2 the end number = %d" % f) if __name__ == "__main__": tasks = [asyncio.Task(task1(10)), asyncio.Task(task2(10)), asyncio.Task(task3(10)) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) #wait 等協程結束后返回 loop.close() 輸出結果: task1 + 0 task2 * 0 task2 - 0 task1 + 1 task2 * 1 task2 - 1 task1 + 2 task2 * 2 task2 - 2 task1 + 3 task2 * 3 task2 - 3 task1 + 4 task2 * 4 task2 - 4 task1 + 5 task2 * 5 task2 - 5 task1 + 6 task2 * 6 task2 - 6 task1 + 7 task2 * 7 task2 - 7 task1 + 8 task2 * 8 task2 - 8 task1 + 9 task2 * 9 task2 - 9 task1 the end number =45 task2 the end number = 0 task2 the end number = -45
使用Asyncio和Futures
future = asyncio.Future()
future.cancel() 取消future,並安排回調函數。
future.result() 返回future鎖代表的結果
future.exception() 返回fture上設置的異常
future.add_done_callback() 添加一個在future執行時運行的回調函數
future.remove_done_callback() 從借宿后調用列表中溢出一個回調對象的所有實例
future.set_result() 將future標記為已完成並設置其結果
future.set_exception() 將future標記為已完成,並設置一個異常
import asyncio @asyncio.coroutine def firest_coroutine(future): count = 0 for i in range(10): count += i yield from asyncio.sleep(4) future.set_result("first corountine sum %d" % count) @asyncio.coroutine def second_coroutine(future): count = 1 for i in range(1,10): count *= i yield from asyncio.sleep(3) future.set_result("second corountine sum %d" % count) def callback_result(future): print(future.result()) if __name__ == "__main__": loop = asyncio.get_event_loop() future1 = asyncio.Future() future2 = asyncio.Future() tasks = [ firest_coroutine(future1), second_coroutine(future2) ] future1.add_done_callback(callback_result) future2.add_done_callback(callback_result) loop.run_until_complete(asyncio.wait(tasks)) loop.close()
總結一下:主要就是通過線程和協程 實現的事件編程,通過不同事件不同狀態的調用,最后這段代碼主要是添加了事件中可回調對象的操作。