asyncio模塊作用:構建協程並發應用的工具
python並發的三大內置模塊,簡單認識:
1、multiprocessing:多進程並發處理 2、threading模塊:多線程並發處理 3、asyncio模塊:協程並發處理
1、啟動一個協程,任務無返回值,需要注意:async的使用

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio # 開頭定義async,表示要在協程運行,不定義的話,循環監聽增加不了 async def coroutine(): print('協程運行...') # 定義一個事件循環監聽 event_loop = asyncio.get_event_loop() try: print('協程開始...') coroutine_obj = coroutine() print('進入事件循環監聽...') event_loop.run_until_complete(coroutine()) # run_until_complete翻譯成中文:一直運行到完成為止 finally: print('關閉事件循環監聽..') event_loop.close()
運行效果
[root@ mnt]# python3 asyncio_coroutine.py 協程開始... 進入事件循環監聽... 協程運行... 關閉事件循環監聽.. sys:1: RuntimeWarning: coroutine 'coroutine' was never awaited
2、啟動一個協程,任務有返回值,需要注意:async的使用

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio # 開頭定義async,表示要在協程運行,不定義的話,循環監聽增加不了 async def coroutine(): print('協程運行...') return 'ok' # 定義一個事件循環監聽 event_loop = asyncio.get_event_loop() try: coroutine_obj = coroutine() return_value = event_loop.run_until_complete(coroutine()) # run_until_complete翻譯成中文:一直運行到完成為止 print('coroutine()返回值:', return_value) finally: event_loop.close()
運行效果
[root@ mnt]# python3 asyncio_coroutine_return.py 協程運行... coroutine()返回值: ok sys:1: RuntimeWarning: coroutine 'coroutine' was never awaited
3、啟動一個協程,任務調用其它任務運行,需要注意:await 的使用

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio # 開頭定義async,表示要在協程運行,不定義的話,循環監聽增加不了 async def coroutine(): print('coroutine內部運行') print('等待task_1運行結果') task1 = await task_1() #await作用:控制運行流程,按順序執行,即等待該函數運行完成,再繼續往后執行 print('等待task_2運行結果') task2 = await task_2(task1) return (task1, task2) async def task_1(): print('task_1內部運行') return 'task_1 ok' async def task_2(arg): print('task_2內部運行') return 'task_2 arg:{}'.format(arg) # 定義一個事件循環監聽 event_loop = asyncio.get_event_loop() try: coroutine_obj = coroutine() return_value = event_loop.run_until_complete(coroutine()) # run_until_complete翻譯成中文:一直運行到完成為止 print('coroutine()返回值:', return_value) finally: event_loop.close()
運行效果
[root@ mnt]# python3 asyncio_coroutine_chain.py coroutine內部運行 等待task_1運行結果 task_1內部運行 等待task_2運行結果 task_2內部運行 coroutine()返回值: ('task_1 ok', 'task_2 arg:task_1 ok') sys:1: RuntimeWarning: coroutine 'coroutine' was never awaited
4、生成器而不是協程
Python3早期版本的語法如下
@asyncio.coroutine 替換為 async
yield from 替換為 await

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio # 開頭定義async,表示要在協程運行,不定義的話,循環監聽增加不了 @asyncio.coroutine def coroutine(): print('coroutine內部運行') print('等待task_1運行結果') task1 = yield from task_1() #await作用:控制運行流程,按順序執行,即等待該函數運行完成,再繼續往后執行 print('等待task_2運行結果') task2 = yield from task_2(task1) return (task1, task2) @asyncio.coroutine async def task_1(): print('task_1內部運行') return 'task_1 ok' @asyncio.coroutine async def task_2(arg): print('task_2內部運行') return 'task_2 arg:{}'.format(arg) # 定義一個事件循環監聽 event_loop = asyncio.get_event_loop() try: coroutine_obj = coroutine() return_value = event_loop.run_until_complete(coroutine()) # run_until_complete翻譯成中文:一直運行到完成為止 print('coroutine()返回值:', return_value) finally: event_loop.close()
運行效果
[root@ mnt]# python3 asyncio_generator.py coroutine內部運行 等待task_1運行結果 task_1內部運行 等待task_2運行結果 task_2內部運行 coroutine()返回值: ('task_1 ok', 'task_2 arg:task_1 ok')
5、協程回調函數調用,此示例:訊速回調

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import functools def callback(arg, *, kwarg='default'): print('回調函數arg={},kwarg={}'.format(arg, kwarg)) async def main(loop): print('注冊回調函數') loop.call_soon(callback, 1) # 執行回調函數,傳入參數1 wrapped = functools.partial(callback, kwarg='not default') # 利用偏函數,給kwarg傳默認值 loop.call_soon(wrapped, 2) # 執行回調函數,傳入參數2 await asyncio.sleep(0.5) event_loop = asyncio.get_event_loop() try: print('進入事件循環監聽') event_loop.run_until_complete(main(event_loop)) # 將事件循環對象傳入main函數中 finally: print('關閉事件循環監聽') event_loop.close()
運行效果
[root@ mnt]# python3 asyncio_call_soon.py 進入事件循環監聽 注冊回調函數 回調函數arg=1,kwarg=default 回調函數arg=2,kwarg=not default 關閉事件循環監聽
6、協程回調函數調用,此示例:延時回調

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio def callback(arg): print('回調函數arg={}'.format(arg)) async def main(loop): print('注冊回調函數') loop.call_later(1,callback,'延時1秒回調參數1') loop.call_later(1,callback,'延時1秒回調參數2') loop.call_soon(callback,'訊速的回調參數') await asyncio.sleep(3) event_loop = asyncio.get_event_loop() try: print('進入事件循環監聽') event_loop.run_until_complete(main(event_loop)) # 將事件循環對象傳入main函數中 finally: print('關閉事件循環監聽') event_loop.close()
運行效果
[root@ mnt]# python3 asyncio_call_delay.py 進入事件循環監聽 注冊回調函數 回調函數arg=訊速的回調參數 回調函數arg=延時1秒回調參數1 回調函數arg=延時1秒回調參數2 關閉事件循環監聽
7、協程回調函數調用,此示例:指定時間回調

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import time def callback(arg, loop): print('回調函數arg={} 回調的時間time={}'.format(arg, loop.time())) async def main(loop): now = loop.time() print('時鍾時間:{}'.format(time.time())) print('時事件循環時間:{}'.format(loop.time())) print('注冊回調函數') loop.call_at(now + 1, callback, '參數1', loop) loop.call_at(now + 2, callback, '參數2', loop) loop.call_soon(callback, '訊速的回調參數', loop) await asyncio.sleep(4) event_loop = asyncio.get_event_loop() try: print('進入事件循環監聽') event_loop.run_until_complete(main(event_loop)) # 將事件循環對象傳入main函數中 finally: print('關閉事件循環監聽') event_loop.close()
運行結果
[root@ mnt]# python3 asyncio_call_at.py 進入事件循環監聽 時鍾時間:1576030580.730174 時事件循環時間:233762.828430848 注冊回調函數 回調函數arg=訊速的回調參數 回調的時間time=233762.828485111 回調函數arg=參數1 回調的時間time=233763.829784903 回調函數arg=參數2 回調的時間time=233764.831077136 關閉事件循環監聽
8、基於Future實現異步返回任務執行結果

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio def mark_done(future, result): """標記完成的函數""" print('設置 Future 返回結果 {}'.format(result)) future.set_result(result) event_loop = asyncio.get_event_loop() try: all_done = asyncio.Future() print('調度標記完成的函數') event_loop.call_soon(mark_done, all_done, '這個是調度傳入的數據') result = event_loop.run_until_complete(all_done) print('運行返回的結果:{}'.format(result)) finally: print('關閉事件循環監聽') event_loop.close() print('Future 返回的結果: {}'.format(all_done.result())) """ 結論: 返回結果可以從兩個地方獲取: 1、result = event_loop.run_until_complete(all_done) 2、Future.result() """
運行效果
[root@ mnt]# python3 asyncio_future_event_loop.py
調度標記完成的函數
設置 Future 返回結果 這個是調度傳入的數據
運行返回的結果:這個是調度傳入的數據
關閉事件循環監聽
Future 返回的結果: 這個是調度傳入的數據
9、基於Future+await類現異步返回任務執行結果

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio def mark_done(future, result): """標記完成的函數""" print('設置 Future 返回結果 {}'.format(result)) future.set_result(result) async def main(loop): all_done = asyncio.Future() print('調度標記完成的函數') loop.call_soon(mark_done, all_done, '這個是調度傳入的數據') result = await all_done # await作用:等all_done返回結果,再往下運行 print('mark_done()執行完成,返回值 : {}'.format(result)) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: print('關閉事件循環監聽') event_loop.close()
運行效果
[root@ mnt]# python3 asyncio_future_await.py
調度標記完成的函數
設置 Future 返回結果 這個是調度傳入的數據
mark_done()執行完成,返回值 : 這個是調度傳入的數據
關閉事件循環監聽
10、基於Future的回調

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import functools def callback(future, n): print('{}: future 完成: {}'.format(n, future.result())) async def register_callbacks(future_obj): print('將回調函數注冊到Future中') # 這里需要注意的是add_done_callback函數,還為把當前實例對象作為參數,傳給函數,所以回調函數多一個callback(future, n) future_obj.add_done_callback(functools.partial(callback, n=1)) future_obj.add_done_callback(functools.partial(callback, n=2)) async def main(future_obj): # 注冊future的回調函數 await register_callbacks(future_obj) print('設置Future返回結果') future_obj.set_result('the result') event_loop = asyncio.get_event_loop() try: # 創建一個future實例 future_obj = asyncio.Future() # 增future實例傳給main函數處理 event_loop.run_until_complete(main(future_obj)) finally: event_loop.close()
運行效果
[root@ mnt]# python3 asyncio_future_callback.py 將回調函數注冊到Future中 設置Future返回結果 1: future 完成: the result 2: future 完成: the result
11、asyncio創建任務運行

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio async def task_func(): print('task_func 執行完成') return 'task_func返回值ok' async def main(loop): print('創建任務') task = loop.create_task(task_func()) print('等待task的結果 {}'.format(task)) return_value = await task #直到遇到await,上面的task開始運行 print('已完成任務{}'.format(task)) #經過上面的運行,task里面已經有result執行結果 print('return value: {}'.format(return_value)) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
運行結果
[root@ mnt]# python3 asyncio_future_create_task.py 創建任務 等待task的結果 <Task pending coro=<task_func() running at asyncio_future_create_task.py:11>> task_func 執行完成 已完成任務<Task finished coro=<task_func() done, defined at asyncio_future_create_task.py:11> result='task_func返回值ok'> return value: task_func返回值ok
12、asyncio取消任務運行

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio async def task_func(): print('task_func 執行完成') return 'task_func返回值ok' async def main(loop): print('創建任務') task = loop.create_task(task_func()) print('取消任務') task.cancel() print('取消任務結果 {}'.format(task)) try: await task #直到遇到await,上面的task開始運行 except asyncio.CancelledError: print('從已取消的任務中捕獲錯誤') else: print('任務執行結果 {}'.format(task)) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
運行效果
[root@python-mysql mnt]# python3 asyncio_future_create_cancel_task.py 創建任務 取消任務 取消任務結果 <Task cancelling coro=<task_func() running at asyncio_future_create_cancel_task.py:11>> 從已取消的任務中捕獲錯誤
13、利用回調取消任務執行

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio async def task_func(): print('task_func睡眠') try: await asyncio.sleep(1) except asyncio.CancelledError: print('task_func 任務取消') raise return 'task_func返回值ok' def task_canceller(task_obj): print('task_canceller運行') task_obj.cancel() print('取消task_obj任務') async def main(loop): print('創建任務') task = loop.create_task(task_func()) loop.call_soon(task_canceller, task) try: await task # 直到遇到await,上面的task開始運行 except asyncio.CancelledError: print('從已取消的任務中捕獲錯誤') else: print('任務執行結果 {}'.format(task)) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
運行效果
[root@ mnt]# python3 asyncio_future_create_callback_cancel_task.py
創建任務
task_func睡眠
task_canceller運行
取消task_obj任務
task_func 任務取消
從已取消的任務中捕獲錯誤
14、asyncio.ensure_future(),增加函數,直到await才運行

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio async def wrapped(): print('wrapped 運行') return 'wrapped result' async def inner(task): print('inner: 開始運行') print('inner: task {!r}'.format(task)) result = await task print('inner: task 返回值 {!r}'.format(result)) async def start_task(): print('開始創建task') task = asyncio.ensure_future(wrapped()) print('等待inner運行') await inner(task) print('starter: inner returned') event_loop = asyncio.get_event_loop() try: print('進程事件循環') result = event_loop.run_until_complete(start_task()) finally: event_loop.close()
運行效果
[root@ mnt]# python3 asyncio_ensure_future.py 進程事件循環 開始創建task 等待inner運行 inner: 開始運行 inner: task <Task pending coro=<wrapped() running at asyncio_ensure_future.py:11>> wrapped 運行 inner: task 返回值 'wrapped result'
15、asyncio.wait()批量等待多個協程直到運行完成,包裝多個返回顯示結果

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio async def phase(i): print('phase形參傳入值{}'.format(i)) await asyncio.sleep(0.1 * i) print('完成phase的次數{}'.format(i)) return 'phase {} result'.format(i) async def main(num_phase): print('main函數開始') phases = [ phase(i) for i in range(num_phase) ] print('等待phases里面的多個函數執行完成') completed, pending = await asyncio.wait(phases) # completed : 運行完成存在這里 ,pending : 沒有運行完成存在這里 for_completed_results = [t.result() for t in completed] print('for_completed_results : {}'.format(for_completed_results)) event_loop = asyncio.get_event_loop() try: print('進程事件循環') result = event_loop.run_until_complete(main(3)) finally: event_loop.close()
運行效果
[root@ mnt]# python3 asyncio_wait.py 進程事件循環 main函數開始 等待phases里面的多個函數執行完成 phase形參傳入值2 phase形參傳入值0 phase形參傳入值1 完成phase的次數0 完成phase的次數1 完成phase的次數2 for_completed_results : ['phase 2 result', 'phase 0 result', 'phase 1 result']
16、asyncio.wait()批量等待多個協程設置超時時間並且取消未完成的任務,包裝多個返回顯示結果

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio async def phase(i): print('phase形參傳入值{}'.format(i)) try: await asyncio.sleep(0.1 * i) except asyncio.CancelledError: print('phase {} 取消'.format(i)) else: print('完成phase的次數{}'.format(i)) return 'phase {} result'.format(i) async def main(num_phase): print('main函數開始') phases = [ phase(i) for i in range(num_phase) ] print('等待phases里面的多個函數執行完成') completed, pending = await asyncio.wait(phases, timeout=0.1) # completed : 運行完成存在這里 ,pending : 沒有運行完成存在這里 print('completed長度:{},pending長度 :{}'.format(len(completed), len(pending))) if pending: print('取消未完成的任務') for t in pending: t.cancel() print('main函數執行完成') event_loop = asyncio.get_event_loop() try: print('進程事件循環') result = event_loop.run_until_complete(main(3)) finally: event_loop.close()
運行效果
[root@ mnt]# python3 asyncio_wait_timeout.py 進程事件循環 main函數開始 等待phases里面的多個函數執行完成 phase形參傳入值1 phase形參傳入值2 phase形參傳入值0 完成phase的次數0 completed長度:1,pending長度 :2 取消未完成的任務 main函數執行完成 phase 1 取消 phase 2 取消
17、asyncio.gather()多個協程運行,函數返回值接收

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio async def phase1(): print('phase1運行中') await asyncio.sleep(2) print('phase1運行完成') return 'phase1 result' async def phase2(): print('phase2運行中') await asyncio.sleep(1) print('phase2運行完成') return 'phase2 result' async def main(): print('main函數開始') results = await asyncio.gather( phase1(), phase2() ) print('results : {}'.format(results)) event_loop = asyncio.get_event_loop() try: print('進程事件循環') result = event_loop.run_until_complete(main()) finally: event_loop.close()
運行效果
[root@ mnt]# python3 asyncio_gather.py 進程事件循環 main函數開始 phase1運行中 phase2運行中 phase2運行完成 phase1運行完成 results : ['phase1 result', 'phase2 result']
18、asyncio.as_completed()多個協程運行,函數返回值不是有序的接收

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio async def phase(i): print('phase {} 運行中'.format(i)) await asyncio.sleep(0.5 - (0.1 * i)) print('phase {} 運行完成'.format(i)) return 'phase {} result'.format(i) async def main(num_phases): print('main函數開始') phases = [ phase(i) for i in range(num_phases) ] print('等待phases運行完成') results = [] for next_to_complete in asyncio.as_completed(phases): task_result = await next_to_complete print('接到到task_result : {}'.format(task_result)) results.append(task_result) print('results : {}'.format(results)) return results event_loop = asyncio.get_event_loop() try: print('進程事件循環') event_loop.run_until_complete(main(3)) finally: event_loop.close()
運行效果
[root@ mnt]# python3 asyncio_as_completed.py 進程事件循環 main函數開始 等待phases運行完成 phase 2 運行中 phase 1 運行中 phase 0 運行中 phase 2 運行完成 接到到task_result : phase 2 result phase 1 運行完成 接到到task_result : phase 1 result phase 0 運行完成 接到到task_result : phase 0 result results : ['phase 2 result', 'phase 1 result', 'phase 0 result']
19、asyncio.Lock() 協程鎖的打開與關閉

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import functools def unlock(lock): print('回調釋放鎖') lock.release() async def coro1(lock): """with方式獲得鎖""" async with lock: print('coro1 打開鎖運算') print('coro1 鎖已釋放') async def coro2(lock): """傳統方式獲取鎖""" await lock.acquire() try: print('coro2 打開鎖運算') finally: print('coro2 鎖已釋放') lock.release() async def main(loop): lock = asyncio.Lock() print('啟動協程之前獲取鎖') await lock.acquire() print('獲得鎖 {}'.format(lock.locked())) # 運行完成,回調解鎖 loop.call_later(0.1, functools.partial(unlock, lock)) print('等待協程運行') await asyncio.wait([coro1(lock), coro2(lock)]) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
運行效果
[root@ mnt]# python3 asyncio_lock.py
啟動協程之前獲取鎖
獲得鎖 True
等待協程運行
回調釋放鎖
coro2 打開鎖運算
coro2 鎖已釋放
coro1 打開鎖運算
coro1 鎖已釋放
20、asyncio.Event() 事件的查看與設置

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import functools def set_event(event): print('回調設置event') event.set() async def coro1(event): print('coro1 等待事件') await event.wait() print('coro1 觸發運行') async def coro2(event): print('coro2 等待事件') await event.wait() print('coro2 觸發運行') async def main(loop): event = asyncio.Event() print('event開始之前狀態:{}'.format(event.is_set())) loop.call_later(0.1, functools.partial(set_event, event)) # 延時0.1秒后,回調set_event函數 await asyncio.wait([coro1(event), coro2(event)]) print('event開始之后狀態:{}'.format(event.is_set())) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
運行效果
[root@ mnt]# python3 asyncio_event.py
event開始之前狀態:False
coro1 等待事件
coro2 等待事件
回調設置event
coro1 觸發運行
coro2 觸發運行
event開始之后狀態:True
21、asyncio.Condition(),對事件狀態單獨通知執行

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio async def consumer(condition_obj, i): async with condition_obj: print('消費者 {} 等待中'.format(i)) await condition_obj.wait() print('消費者 {} 觸發'.format(i)) print('消費者 {} 消費結束'.format(i)) async def manipulate_condition(condition_obj): print('開始操作condition') await asyncio.sleep(0.1) for i in range(3): async with condition_obj: print('通知消費者 {}'.format(i)) condition_obj.notify(i) await asyncio.sleep(0.1) async with condition_obj: print('通知其它所有的消費者') condition_obj.notify_all() print('操作condition結束') async def main(loop): # 創建一個操作狀態的對象 condition_obj = asyncio.Condition() # 運5個消費者函數 consumers = [ consumer(condition_obj, i) for i in range(5) ] # 創建一個操作狀態的任務 loop.create_task(manipulate_condition(condition_obj)) # 等待consumers所有的函數執行完成 await asyncio.wait(consumers) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop)) finally: event_loop.close()
運行效果
[root@ mnt]# python3 asyncio_condition.py 開始操作condition 消費者 2 等待中 消費者 3 等待中 消費者 0 等待中 消費者 4 等待中 消費者 1 等待中 通知消費者 0 通知消費者 1 消費者 2 觸發 消費者 2 消費結束 通知消費者 2 消費者 3 觸發 消費者 3 消費結束 消費者 0 觸發 消費者 0 消費結束 通知其它所有的消費者 操作condition結束 消費者 4 觸發 消費者 4 消費結束 消費者 1 觸發 消費者 1 消費結束
22、協程隊列Queue,生產者與消費者的示例

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio async def consumer(n, q): print('消費者 {} 開始'.format(n)) while True: print('消費費 {} 等待消費'.format(n)) item = await q.get() print('消費者 {} 消費了 {}'.format(n, item)) if item is None: q.task_done() break else: await asyncio.sleep(0.01 * item) q.task_done() print('消費者 {} 結束'.format(n)) async def producer(q, num_worker): print('生產者 開始') for i in range(num_worker * 3): await q.put(i) print('生產者 增加數據 {} 到隊列中'.format(i)) print('生產者 增加停止信號到隊列中') for i in range(num_worker): await q.put(None) print('生產者 等待隊列清空') await q.join() print('生產者 結束') async def main(loop, num_consumers): # 創建一個隊列,最大的長度是num_consumers q = asyncio.Queue(maxsize=num_consumers) consumers = [ loop.create_task(consumer(i, q)) for i in range(num_consumers) ] producer_task = loop.create_task(producer(q, num_consumers)) await asyncio.wait(consumers + [producer_task]) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main(event_loop, 2)) finally: event_loop.close()
運行效果
[root@ mnt]# python3 asyncio_queue.py 消費者 0 開始 消費費 0 等待消費 消費者 1 開始 消費費 1 等待消費 生產者 開始 生產者 增加數據 0 到隊列中 生產者 增加數據 1 到隊列中 消費者 0 消費了 0 消費者 1 消費了 1 生產者 增加數據 2 到隊列中 生產者 增加數據 3 到隊列中 消費費 0 等待消費 消費者 0 消費了 2 生產者 增加數據 4 到隊列中 消費費 1 等待消費 消費者 1 消費了 3 生產者 增加數據 5 到隊列中 生產者 增加停止信號到隊列中 消費費 0 等待消費 消費者 0 消費了 4 消費費 1 等待消費 消費者 1 消費了 5 生產者 等待隊列清空 消費費 0 等待消費 消費者 0 消費了 None 消費者 0 結束 消費費 1 等待消費 消費者 1 消費了 None 消費者 1 結束 生產者 結束
23、利用 asyncio.Protocol 實現服務端和客戶端數據相互傳送

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import logging import sys SERVER_ADDRESS = ['localhost', 8000] class EchoServer(asyncio.Protocol): def connection_made(self, transport): self.transport = transport self.address = transport.get_extra_info('peername') self.log = logging.getLogger( 'EchoServer_{}_{}'.format(*self.address) ) self.log.debug('接收連接') def data_received(self, data): self.log.debug('接收數據 {}'.format(data)) self.transport.write(data) self.log.debug('發送數據 {}'.format(data)) def eof_received(self): self.log.debug('接收數據 EOF') if self.transport.can_write_eof(): self.transport.write_eof() def connection_lost(self, exc): if exc: self.log.error('錯誤 {}'.format(exc)) else: self.log.debug('服務關閉') super(EchoServer, self).connection_lost(exc) logging.basicConfig( level=logging.DEBUG, format='%(name)s : %(message)s', stream=sys.stderr ) log = logging.getLogger('main') event_loop = asyncio.get_event_loop() factory = event_loop.create_server(EchoServer, *SERVER_ADDRESS) server = event_loop.run_until_complete(factory) log.debug('開始運行 IP:{} Port:{}'.format(*SERVER_ADDRESS)) try: event_loop.run_forever() finally: log.debug('關閉服務') server.close() event_loop.run_until_complete(server.wait_closed()) log.debug('關閉事件循環') event_loop.close()

#!/usr/bin/env python3 # encoding: utf-8 import asyncio import functools import logging import sys MESSAGES = [ b'This is the message. ', b'It will be sent ', b'in parts.', ] SERVER_ADDRESS = ('localhost', 8000) class EchoClient(asyncio.Protocol): def __init__(self, messages, future): super().__init__() self.messages = messages self.log = logging.getLogger('EchoClient') self.f = future def connection_made(self, transport): self.transport = transport self.address = transport.get_extra_info('peername') self.log.debug( '連接服務器IP:{} port :{}'.format(*self.address) ) for msg in self.messages: transport.write(msg) self.log.debug('發送數據 {!r}'.format(msg)) if transport.can_write_eof(): transport.write_eof() def data_received(self, data): self.log.debug('received {!r}'.format(data)) def eof_received(self): self.log.debug('接收到 EOF') self.transport.close() if not self.f.done(): self.f.set_result(True) def connection_lost(self, exc): self.log.debug('服務器關閉連接') self.transport.close() if not self.f.done(): self.f.set_result(True) super().connection_lost(exc) #設置日志級別 logging.basicConfig( level=logging.DEBUG, format='%(name)s: %(message)s', stream=sys.stderr, ) #打印日志標題 log = logging.getLogger('main') #創建一個事件循環 event_loop = asyncio.get_event_loop() #創建客戶端的Future client_completed = asyncio.Future() #利用偏函數自動傳參給EchoClient實例化類 client_factory = functools.partial( EchoClient, messages=MESSAGES, future=client_completed, ) #創建一個事件循環連接 factory_coroutine = event_loop.create_connection( client_factory, *SERVER_ADDRESS, ) log.debug('等待客戶端運行完成') try: event_loop.run_until_complete(factory_coroutine) event_loop.run_until_complete(client_completed) finally: log.debug('關閉事件循環') event_loop.close()
運行效果
服務端
[root@ mnt]# python3 asyncio_echo_server_protocol.py asyncio : Using selector: EpollSelector main : 開始運行 IP:localhost Port:8000 EchoServer_::1_54082 : 接收連接 EchoServer_::1_54082 : 接收數據 b'This is the message. It will be sent in parts.' EchoServer_::1_54082 : 發送數據 b'This is the message. It will be sent in parts.' EchoServer_::1_54082 : 接收數據 EOF EchoServer_::1_54082 : 服務關閉
客戶端
[root@ mnt]# python3 asyncio_echo_client_protocol.py asyncio: Using selector: EpollSelector main: 等待客戶端運行完成 EchoClient: 連接服務器IP:::1 port :8000 EchoClient: 發送數據 b'This is the message. ' EchoClient: 發送數據 b'It will be sent ' EchoClient: 發送數據 b'in parts.' EchoClient: received b'This is the message. It will be sent in parts.' EchoClient: 接收到 EOF EchoClient: 服務器關閉連接 main: 關閉事件循環
24、基於Coroutine 實現服務端和客戶端數據相互傳送,與22點示例功能一樣)

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import logging import sys SERVER_ADDRESS = ('localhost', 8000) async def echo(reader, writer): address = writer.get_extra_info('peername') log = logging.getLogger('echo_{}_{}'.format(*address)) log.debug('開始接受連接') while True: data = await reader.read(128) if data: log.debug('接受的數據 : {}'.format(data)) writer.write(data) await writer.drain() log.debug('發送數據:{}'.format(data)) else: log.debug('關閉連接') writer.close() return logging.basicConfig( level=logging.DEBUG, format='%(name)s : %(message)s', stream=sys.stderr ) log = logging.getLogger('main') event_loop = asyncio.get_event_loop() factory = asyncio.start_server(echo, *SERVER_ADDRESS) server = event_loop.run_until_complete(factory) log.debug('開始啟動服務 IP:{},Port:{}'.format(*SERVER_ADDRESS)) try: event_loop.run_forever() except KeyboardInterrupt: pass finally: log.debug('關閉服務端') server.close() event_loop.run_until_complete(server.wait_closed()) log.debug('關閉事件循環') event_loop.close()

#!/usr/bin/env python3 # encoding: utf-8 import asyncio import logging import sys MESSAGES = [ b'This is the message. ', b'It will be sent ', b'in parts.', ] SERVER_ADDRESS = ('localhost', 8000) async def echo_client(address, messages): log = logging.getLogger('echo_client') log.debug('連接服務器 to {} port {}'.format(*address)) # 創建與服務端連接 reader, writer = await asyncio.open_connection(*address) for msg in messages: writer.write(msg) log.debug('發送數據: {}'.format(msg)) # 判斷是否發送結束標記 if writer.can_write_eof(): writer.write_eof() # 等待所有發送完成 await writer.drain() log.debug('等待服務器響應') while True: data = await reader.read(128) if data: log.debug('接收服務器數據 :{}'.format(data)) else: log.debug('關閉與服務器的連接') writer.close() return logging.basicConfig( level=logging.DEBUG, format='%(name)s: %(message)s', stream=sys.stderr, ) log = logging.getLogger('main') event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete( echo_client(SERVER_ADDRESS, MESSAGES) ) finally: log.debug('closing event loop') event_loop.close()
運行效果
服務端
[root@ mnt]# python3 asyncio_echo_server_coroutine.py asyncio : Using selector: EpollSelector main : 開始啟動服務 IP:localhost,Port:8000 echo_::1_54084 : 開始接受連接 echo_::1_54084 : 接受的數據 : b'This is the message. It will be sent in parts.' echo_::1_54084 : 發送數據:b'This is the message. It will be sent in parts.' echo_::1_54084 : 關閉連接 #這里使用Ctrl + C,運行后面的功能 ^Cmain : 關閉服務端 main : 關閉事件循環
客戶端
[root@ mnt]# python3 asyncio_echo_client_coroutine.py asyncio: Using selector: EpollSelector echo_client: 連接服務器 to localhost port 8000 echo_client: 發送數據: b'This is the message. ' echo_client: 發送數據: b'It will be sent ' echo_client: 發送數據: b'in parts.' echo_client: 等待服務器響應 echo_client: 接收服務器數據 :b'This is the message. It will be sent in parts.' echo_client: 關閉與服務器的連接 main: closing event loop
25、基於Coroutine ,實現SSL的Socket通訊
#創建ssl證書
openssl req -newkey rsa:2048 -nodes -keyout test_ssl.key -x509 -days 800 -out test_ssl.crt

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import logging import sys import ssl SERVER_ADDRESS = ('localhost', 8000) async def echo(reader, writer): address = writer.get_extra_info('peername') log = logging.getLogger('echo_{}_{}'.format(*address)) log.debug('開始接受連接') while True: data = await reader.read(128) #因為ssl不支持EOF結束,所以需要用'\x00'結束 terminate = data.endswith(b'\x00') data = data.rstrip(b'\x00') if data: log.debug('接受的數據 : {}'.format(data)) writer.write(data) await writer.drain() log.debug('發送數據:{}'.format(data)) if not data or terminate: log.debug('關閉連接') writer.close() return logging.basicConfig( level=logging.DEBUG, format='%(name)s : %(message)s', stream=sys.stderr ) log = logging.getLogger('main') event_loop = asyncio.get_event_loop() # 創建SSL所需要的對象 ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) ssl_context.check_hostname = False ssl_context.load_cert_chain('test_ssl.crt', 'test_ssl.key') factory = asyncio.start_server(echo, *SERVER_ADDRESS, ssl=ssl_context) server = event_loop.run_until_complete(factory) log.debug('開始啟動服務 IP:{},Port:{}'.format(*SERVER_ADDRESS)) try: event_loop.run_forever() except KeyboardInterrupt: pass finally: log.debug('關閉服務端') server.close() event_loop.run_until_complete(server.wait_closed()) log.debug('關閉事件循環') event_loop.close()

#!/usr/bin/env python3 # encoding: utf-8 import asyncio import logging import sys import ssl MESSAGES = [ b'This is the message. ', b'It will be sent ', b'in parts.', ] SERVER_ADDRESS = ('localhost', 8000) async def echo_client(address, messages): log = logging.getLogger('echo_client') # 客戶端ssl所需要帶證書訪問 ssl_context = ssl.create_default_context( ssl.Purpose.SERVER_AUTH ) ssl_context.check_hostname = False ssl_context.load_verify_locations('test_ssl.crt') log.debug('連接服務器 to {} port {}'.format(*address)) # 創建與服務端連接 reader, writer = await asyncio.open_connection(*address, ssl=ssl_context) for msg in messages: writer.write(msg) log.debug('發送數據: {}'.format(msg)) # 判斷是否發送結束標記 # 非ssl # if writer.can_write_eof(): # writer.write_eof() # ssl writer.write(b'\x00') # 等待所有發送完成 await writer.drain() log.debug('等待服務器響應') while True: data = await reader.read(128) if data: log.debug('接收服務器數據 :{}'.format(data)) else: log.debug('關閉與服務器的連接') writer.close() return logging.basicConfig( level=logging.DEBUG, format='%(name)s: %(message)s', stream=sys.stderr, ) log = logging.getLogger('main') event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete( echo_client(SERVER_ADDRESS, MESSAGES) ) finally: log.debug('closing event loop') event_loop.close()
運行效果
服務端
[root@ mnt]# python3 asyncio_echo_server_ssl.py asyncio : Using selector: EpollSelector main : 開始啟動服務 IP:localhost,Port:8000 echo_::1_54094 : 開始接受連接 echo_::1_54094 : 接受的數據 : b'This is the message. It will be sent in parts.' echo_::1_54094 : 發送數據:b'This is the message. It will be sent in parts.' echo_::1_54094 : 關閉連接 ^Cmain : 關閉服務端 main : 關閉事件循環
客戶端
[root@ mnt]# python3 asyncio_echo_client_ssl.py asyncio: Using selector: EpollSelector echo_client: 連接服務器 to localhost port 8000 echo_client: 發送數據: b'This is the message. ' echo_client: 發送數據: b'It will be sent ' echo_client: 發送數據: b'in parts.' echo_client: 等待服務器響應 echo_client: 接收服務器數據 :b'This is the message. It will be sent in parts.' echo_client: 關閉與服務器的連接 main: closing event loop
26、利用asyncio.SubprocessProtocol類繼承的方式實現子進程的調用

#!/usr/bin/env python3 # encoding: utf-8 #end_pymotw_header import asyncio import functools class DFProtocol(asyncio.SubprocessProtocol): FD_NAMES = ['stdin', 'stdout', 'stderr'] def __init__(self, done_future): self.done = done_future self.buffer = bytearray() super().__init__() def connection_made(self, transport): print('process started {}'.format(transport.get_pid())) self.transport = transport def pipe_data_received(self, fd, data): print('read {} bytes from {}'.format(len(data), self.FD_NAMES[fd])) if fd == 1: self.buffer.extend(data) def process_exited(self): print('process exited') return_code = self.transport.get_returncode() print('return code {}'.format(return_code)) if not return_code: cmd_output = bytes(self.buffer).decode() results = self._parse_results(cmd_output) else: results = [] self.done.set_result((return_code, results)) def _parse_results(self, output): print('parsing results') if not output: return [] lines = output.splitlines() headers = lines[0].split() devices = lines[1:] results = [ dict(zip(headers, line.split())) for line in devices ] return results async def run_df(loop): print('in run_df') cmd_done = asyncio.Future(loop=loop) factory = functools.partial(DFProtocol, cmd_done) proc = loop.subprocess_exec( factory, 'df', '-hl', stdin=None, stderr=None, ) try: print('launching process') transport, protocol = await proc print('waiting for process to complete') await cmd_done finally: transport.close() return cmd_done.result() event_loop = asyncio.get_event_loop() try: return_code, results = event_loop.run_until_complete( run_df(event_loop) ) finally: event_loop.close() if return_code: print('error exit {}'.format(return_code)) else: print('\nFree space:') for r in results: print('{Mounted:25}: {Avail}'.format(**r))
運行效果
由於我使用的Python版本是3.6.6,調用的優先級是 1、調用 def connection_made(self, transport) 函數 2、調用 def process_exited(self)函數 3、調用 def pipe_data_received(self, fd, data)函數這里是輸出結果,所以結束進程的時候process_exited解析是空,導致結果出不來,這里待pyhton版本驗證
27、利用協程子進程的調用

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio def _parse_results(output): print('解析結果') if not output: return [] lines = output.splitlines() headers = lines[0].split() devices = lines[1:] results = [ dict(zip(headers, line.split())) for line in devices ] return results async def run_df(): print('run_df函數運行') buffer = bytearray() create = asyncio.create_subprocess_exec( 'df', '-h', stdout=asyncio.subprocess.PIPE ) print('df -h開始運行') proc = await create print('進程開始 {}'.format(proc.pid)) while True: line = await proc.stdout.readline() print('讀取 : {}'.format(line)) if not line: print('命令不再輸出') break buffer.extend(line) print('等待進程運行完成') await proc.wait() return_code = proc.returncode print('運行返回碼:{}'.format(return_code)) if not return_code: cmd_output = bytes(buffer).decode() results = _parse_results(cmd_output) else: results = [] return (return_code, results) event_loop = asyncio.get_event_loop() try: return_code, results = event_loop.run_until_complete( run_df() ) finally: event_loop.close() if return_code: print('錯誤退出,錯誤信息:{}'.format(return_code)) else: print('運行結果:') for r in results: print('{Mounted:25}:{Avail}'.format(**r))
運行效果
[root@ mnt]# python3 asyncio_subprocess_coroutine.py run_df函數運行 df -h開始運行 進程開始 44244 讀取 : b'Filesystem Size Used Avail Use% Mounted on\n' 讀取 : b'/dev/mapper/centos-root 17G 7.9G 9.2G 47% /\n' 讀取 : b'devtmpfs 478M 0 478M 0% /dev\n' 讀取 : b'tmpfs 489M 0 489M 0% /dev/shm\n' 讀取 : b'tmpfs 489M 56M 433M 12% /run\n' 讀取 : b'tmpfs 489M 0 489M 0% /sys/fs/cgroup\n' 讀取 : b'/dev/sda1 1014M 125M 890M 13% /boot\n' 讀取 : b'tmpfs 98M 0 98M 0% /run/user/0\n' 讀取 : b'' 命令不再輸出 等待進程運行完成 運行返回碼:0 解析結果 運行結果: / :9.2G /dev :478M /dev/shm :489M /run :433M /sys/fs/cgroup :489M /boot :890M /run/user/0 :98M
28、利用協程管道傳數據給子進程的調用處理

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio async def to_upper(input): print('進程轉大寫的to_upper函數') create = asyncio.create_subprocess_exec( 'tr', '[:lower:]', '[:upper:]', stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE, ) print('等待子進程運行完成') proc = await create print('子進程PID {}'.format(proc.pid)) print('查看子進程運行的標准輸出和錯誤') stdout, stderr = await proc.communicate(input.encode()) print('等待子進程完成') await proc.wait() return_code = proc.returncode print('return code {}'.format(return_code)) if not return_code: results = bytes(stdout).decode() else: results = '' return (return_code, results) MESSAGE = """ This message will be converted to all caps. """ event_loop = asyncio.get_event_loop() try: return_code, results = event_loop.run_until_complete( to_upper(MESSAGE) ) finally: event_loop.close() if return_code: print('錯誤時,退出的返回狀態碼 {}'.format(return_code)) else: print('源數據: {!r}'.format(MESSAGE)) print('處理過的數據 : {!r}'.format(results))
運行效果
[root@ mnt]# python3 asyncio_subprocess_coroutine_write.py 進程轉大寫的to_upper函數 等待子進程運行完成 子進程PID 78254 查看子進程運行的標准輸出和錯誤 等待子進程完成 return code 0 源數據: '\nThis message will be converted\nto all caps.\n' 處理過的數據 : '\nTHIS MESSAGE WILL BE CONVERTED\nTO ALL CAPS.\n'
29、協程之信號的注冊處理

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import functools import os import signal def signal_handler(name): print('正在處理信號 : {}'.format(name)) event_loop = asyncio.get_event_loop() # 給信號綁定處理的事件 event_loop.add_signal_handler( signal.SIGHUP, functools.partial(signal_handler, name='SIGHUP'), ) event_loop.add_signal_handler( signal.SIGUSR1, functools.partial(signal_handler, name='SIGUSR1'), ) event_loop.add_signal_handler( signal.SIGINT, functools.partial(signal_handler, name='SIGINT'), ) async def send_signals(): pid = os.getpid() print('開始發送信號給PID:{}'.format(pid)) for name in ['SIGHUP', 'SIGHUP', 'SIGUSR1', 'SIGINT']: print('發送信號名字:{}'.format(name)) # 跟linux 命令kill一樣,利用pid結束進程 os.kill(pid, getattr(signal, name)) print('放棄控制') await asyncio.sleep(0.01) return try: event_loop.run_until_complete(send_signals()) finally: event_loop.close()
運行效果
[root@ mnt]# python3 asyncio_signal.py 開始發送信號給PID:78320 發送信號名字:SIGHUP 放棄控制 正在處理信號 : SIGHUP 發送信號名字:SIGHUP 放棄控制 正在處理信號 : SIGHUP 發送信號名字:SIGUSR1 放棄控制 正在處理信號 : SIGUSR1 發送信號名字:SIGINT 放棄控制 正在處理信號 : SIGINT
29、協程與線程結合(ThreadPoolExecutor)

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import logging import sys import concurrent.futures import time def blocks(n): log = logging.getLogger('blocks({})'.format(n)) log.info('運行') time.sleep(0.1) log.info('done') return n ** 2 async def run_blocking_tasks(executor): """運行阻塞的任務""" log = logging.getLogger('run_blocking_tasks') log.info('開始運行') log.info('創建執行任務') loop = asyncio.get_event_loop() blocking_tasks = [ loop.run_in_executor(executor, blocks, i) for i in range(6) ] log.info('等待執行任務') completed, pending = await asyncio.wait(blocking_tasks) results = [t.result() for t in completed] log.info('運行結果: {!r}'.format(results)) log.info('exitrun_blocking_tasks 退出') if __name__ == '__main__': logging.basicConfig( level=logging.INFO, format='%(threadName)10s %(name)18s: %(message)s', stream=sys.stderr ) # 創建一個線程池執行器,最大開啟3個工作線程 executor = concurrent.futures.ThreadPoolExecutor( max_workers=3 ) # 創建一個事件循環 event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(run_blocking_tasks(executor)) finally: event_loop.close()
運行效果
[root@ mnt]# python3 asyncio_ThreadPoolExecutor.py MainThread run_blocking_tasks: 開始運行 MainThread run_blocking_tasks: 創建執行任務 ThreadPoolExecutor-0_0 blocks(0): 運行 ThreadPoolExecutor-0_1 blocks(1): 運行 ThreadPoolExecutor-0_2 blocks(2): 運行 MainThread run_blocking_tasks: 等待執行任務 ThreadPoolExecutor-0_0 blocks(0): done ThreadPoolExecutor-0_0 blocks(3): 運行 ThreadPoolExecutor-0_1 blocks(1): done ThreadPoolExecutor-0_1 blocks(4): 運行 ThreadPoolExecutor-0_2 blocks(2): done ThreadPoolExecutor-0_2 blocks(5): 運行 ThreadPoolExecutor-0_1 blocks(4): done ThreadPoolExecutor-0_0 blocks(3): done ThreadPoolExecutor-0_2 blocks(5): done MainThread run_blocking_tasks: 運行結果: [0, 9, 16, 25, 1, 4] MainThread run_blocking_tasks: exitrun_blocking_tasks 退出
30、協程與進程結合(ProcessPoolExecutor)

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import logging import sys import concurrent.futures import time def blocks(n): log = logging.getLogger('blocks({})'.format(n)) log.info('運行') time.sleep(0.1) log.info('done') return n ** 2 async def run_blocking_tasks(executor): """運行阻塞的任務""" log = logging.getLogger('run_blocking_tasks') log.info('開始運行') log.info('創建執行任務') loop = asyncio.get_event_loop() blocking_tasks = [ loop.run_in_executor(executor, blocks, i) for i in range(6) ] log.info('等待執行任務') completed, pending = await asyncio.wait(blocking_tasks) results = [t.result() for t in completed] log.info('運行結果: {!r}'.format(results)) log.info('exitrun_blocking_tasks 退出') if __name__ == '__main__': logging.basicConfig( level=logging.INFO, format='PID %(process)5s %(name)18s: %(message)s', stream=sys.stderr ) # 創建一個線程池執行器,最大開啟3個工作線程 executor = concurrent.futures.ProcessPoolExecutor( max_workers=3 ) # 創建一個事件循環 event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(run_blocking_tasks(executor)) finally: event_loop.close()
運行效果
[root@mnt]# python3 asyncio_ProcessPoolExecutor.py PID 91883 run_blocking_tasks: 開始運行 PID 91883 run_blocking_tasks: 創建執行任務 PID 91883 run_blocking_tasks: 等待執行任務 PID 91884 blocks(0): 運行 PID 91885 blocks(1): 運行 PID 91886 blocks(2): 運行 PID 91884 blocks(0): done PID 91884 blocks(3): 運行 PID 91886 blocks(2): done PID 91885 blocks(1): done PID 91886 blocks(4): 運行 PID 91885 blocks(5): 運行 PID 91884 blocks(3): done PID 91886 blocks(4): done PID 91885 blocks(5): done PID 91883 run_blocking_tasks: 運行結果: [25, 1, 4, 9, 0, 16] PID 91883 run_blocking_tasks: exitrun_blocking_tasks 退出
31、asyncio調試模式的開啟

#!/usr/bin/env python # -*- coding: utf-8 -*- import argparse import asyncio import logging import sys import time import warnings #接收命令行 parser = argparse.ArgumentParser('debugging asyncio') parser.add_argument( '-v', dest='verbose', default=False, action='store_true', ) args = parser.parse_args() #設置日志級別 logging.basicConfig( level=logging.DEBUG, format='%(levelname)7s: %(message)s', stream=sys.stderr, ) LOG = logging.getLogger('') async def inner(): LOG.info('inner 函數開始') time.sleep(0.1) LOG.info('inner 運行完成') async def outer(loop): LOG.info('outer 函數開始') #ensure_future,直到await才運行 await asyncio.ensure_future(loop.create_task(inner())) LOG.info('outer 運行完成') event_loop = asyncio.get_event_loop() if args.verbose: LOG.info('開啟DEBUG模式') event_loop.set_debug(True) # 使“慢”任務的閾值非常小以便於說明。默認值為0.1,即100毫秒。 event_loop.slow_callback_duration = 0.001 # 在警告過濾器列表中插入一個簡單的條目(在前面)。 warnings.simplefilter('always', ResourceWarning) LOG.info('entering event loop') event_loop.run_until_complete(outer(event_loop))
運行效果
#開啟Debug [root@ mnt]# python3 asyncio_debug.py DEBUG: Using selector: EpollSelector INFO: entering event loop INFO: outer 函數開始 INFO: inner 函數開始 INFO: inner 運行完成 INFO: outer 運行完成 [root@python-mysql mnt]# python3 asyncio_debug.py -v DEBUG: Using selector: EpollSelector INFO: 開啟DEBUG模式 INFO: entering event loop INFO: outer 函數開始 WARNING: Executing <Task pending coro=<outer() running at asyncio_debug.py:42> wait_for=<Task pending coro=<inner() running at asyncio_debug.py:33> cb=[<TaskWakeupMethWrapper object at 0x7f882e06c0d8>()] created at asyncio_debug.py:42> cb=[_run_until_complete_cb() at /usr/local/Python-3.6.6/lib/python3.6/asyncio/base_events.py:177] created at /usr/local/Python-3.6.6/lib/python3.6/asyncio/base_events.py:447> took 0.003 seconds INFO: inner 函數開始 INFO: inner 運行完成 WARNING: Executing <Task finished coro=<inner() done, defined at asyncio_debug.py:33> result=None created at asyncio_debug.py:42> took 0.101 seconds INFO: outer 運行完成 #正常運行 [root@ mnt]# python3 asyncio_debug.py DEBUG: Using selector: EpollSelector INFO: entering event loop INFO: outer 函數開始 INFO: inner 函數開始 INFO: inner 運行完成 INFO: outer 運行完成 You have new mail in /var/spool/mail/root
32、利用生成器的方式,創建協程socket監聽

#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio import logging import sys @asyncio.coroutine def echo(reader, writer): address = writer.get_extra_info('peername') log = logging.getLogger('echo_{}_{}'.format(*address)) log.debug('connection accepted') while True: data = yield from reader.read(128) if data: log.debug('received {!r}'.format(data)) writer.write(data) yield from writer.drain() log.debug('sent {!r}'.format(data)) else: log.debug('closing') writer.close() return #開啟Debug模式 logging.basicConfig( level=logging.DEBUG, format='%(name)s: %(message)s', stream=sys.stderr, ) #設置日志的title log = logging.getLogger('main') #設置開啟服務的IP+端口 server_address = ('localhost', 8888) #獲取事件循環 event_loop = asyncio.get_event_loop() # 創建服務器,讓循環在之前完成協同工作。並且啟動實際事件循環 coroutine = asyncio.start_server(echo, *server_address,loop=event_loop) server = event_loop.run_until_complete(coroutine) log.debug('starting up on {} port {}'.format(*server_address)) try: #開啟一直循環處理任務 event_loop.run_forever() finally: #結束后清理的工作 log.debug('closing server') server.close() event_loop.run_until_complete(server.wait_closed()) log.debug('closing event loop') event_loop.close()
運行效果
[root@ mnt]# python3 asyncio_echo_server_generator asyncio: Using selector: EpollSelector main: starting up on localhost port 8888
33、協程的關閉示例

import asyncio import logging import sys logging.basicConfig( level=logging.DEBUG, format='%(name)s: %(message)s', stream=sys.stderr, ) LOG = logging.getLogger('main') async def stopper(loop): LOG.debug('stopper invoked') loop.stop() event_loop = asyncio.get_event_loop() event_loop.create_task(stopper(event_loop)) try: LOG.debug('entering event loop') event_loop.run_forever() finally: LOG.debug('closing event loop') event_loop.close()
運行效果
[root@ mnt]# python3 asyncio_stop.py
asyncio: Using selector: EpollSelector
main: entering event loop
main: stopper invoked
main: closing event loop