Python之asyncio模塊的使用


asyncio模塊作用:構建協程並發應用的工具

python並發的三大內置模塊,簡單認識:

1、multiprocessing:多進程並發處理
2、threading模塊:多線程並發處理
3、asyncio模塊:協程並發處理

 1、啟動一個協程,任務無返回值,需要注意:async的使用

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

# 開頭定義async,表示要在協程運行,不定義的話,循環監聽增加不了
async def coroutine():
    print('協程運行...')

# 定義一個事件循環監聽
event_loop = asyncio.get_event_loop()

try:
    print('協程開始...')
    coroutine_obj = coroutine()
    print('進入事件循環監聽...')
    event_loop.run_until_complete(coroutine())  # run_until_complete翻譯成中文:一直運行到完成為止
finally:
    print('關閉事件循環監聽..')
    event_loop.close()
asyncio_coroutine.py

運行效果

[root@ mnt]# python3 asyncio_coroutine.py 
協程開始...
進入事件循環監聽...
協程運行...
關閉事件循環監聽..
sys:1: RuntimeWarning: coroutine 'coroutine' was never awaited

 2、啟動一個協程,任務有返回值,需要注意:async的使用

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

# 開頭定義async,表示要在協程運行,不定義的話,循環監聽增加不了
async def coroutine():
    print('協程運行...')
    return 'ok'

# 定義一個事件循環監聽
event_loop = asyncio.get_event_loop()

try:
    coroutine_obj = coroutine()
    return_value = event_loop.run_until_complete(coroutine())  # run_until_complete翻譯成中文:一直運行到完成為止
    print('coroutine()返回值:', return_value)
finally:
    event_loop.close()
asyncio_coroutine_return.py

運行效果

[root@ mnt]# python3 asyncio_coroutine_return.py 
協程運行...
coroutine()返回值: ok
sys:1: RuntimeWarning: coroutine 'coroutine' was never awaited

 3、啟動一個協程,任務調用其它任務運行,需要注意:await 的使用

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

# 開頭定義async,表示要在協程運行,不定義的話,循環監聽增加不了
async def coroutine():
    print('coroutine內部運行')

    print('等待task_1運行結果')
    task1 = await task_1() #await作用:控制運行流程,按順序執行,即等待該函數運行完成,再繼續往后執行

    print('等待task_2運行結果')
    task2 = await task_2(task1)

    return (task1, task2)

async def task_1():
    print('task_1內部運行')
    return 'task_1 ok'


async def task_2(arg):
    print('task_2內部運行')
    return 'task_2 arg:{}'.format(arg)

# 定義一個事件循環監聽
event_loop = asyncio.get_event_loop()

try:
    coroutine_obj = coroutine()
    return_value = event_loop.run_until_complete(coroutine())  # run_until_complete翻譯成中文:一直運行到完成為止
    print('coroutine()返回值:', return_value)
finally:
    event_loop.close()
asyncio_coroutine_chain.py

運行效果

[root@ mnt]# python3 asyncio_coroutine_chain.py 
coroutine內部運行
等待task_1運行結果
task_1內部運行
等待task_2運行結果
task_2內部運行
coroutine()返回值: ('task_1 ok', 'task_2 arg:task_1 ok')
sys:1: RuntimeWarning: coroutine 'coroutine' was never awaited

 4、生成器而不是協程

Python3早期版本的語法如下

@asyncio.coroutine 替換為 async
yield from 替換為 await
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

# 開頭定義async,表示要在協程運行,不定義的話,循環監聽增加不了
@asyncio.coroutine
def coroutine():
    print('coroutine內部運行')

    print('等待task_1運行結果')
    task1 = yield from task_1() #await作用:控制運行流程,按順序執行,即等待該函數運行完成,再繼續往后執行

    print('等待task_2運行結果')
    task2 = yield from task_2(task1)

    return (task1, task2)

@asyncio.coroutine
async def task_1():
    print('task_1內部運行')
    return 'task_1 ok'

@asyncio.coroutine
async def task_2(arg):
    print('task_2內部運行')
    return 'task_2 arg:{}'.format(arg)

# 定義一個事件循環監聽
event_loop = asyncio.get_event_loop()

try:
    coroutine_obj = coroutine()
    return_value = event_loop.run_until_complete(coroutine())  # run_until_complete翻譯成中文:一直運行到完成為止
    print('coroutine()返回值:', return_value)
finally:
    event_loop.close()
asyncio_generator.py

運行效果

[root@ mnt]# python3 asyncio_generator.py 
coroutine內部運行
等待task_1運行結果
task_1內部運行
等待task_2運行結果
task_2內部運行
coroutine()返回值: ('task_1 ok', 'task_2 arg:task_1 ok')

 5、協程回調函數調用,此示例:訊速回調

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import functools

def callback(arg, *, kwarg='default'):
    print('回調函數arg={},kwarg={}'.format(arg, kwarg))

async def main(loop):
    print('注冊回調函數')
    loop.call_soon(callback, 1)  # 執行回調函數,傳入參數1
    wrapped = functools.partial(callback, kwarg='not default')  # 利用偏函數,給kwarg傳默認值
    loop.call_soon(wrapped, 2)  # 執行回調函數,傳入參數2
    await asyncio.sleep(0.5)

event_loop = asyncio.get_event_loop()
try:
    print('進入事件循環監聽')
    event_loop.run_until_complete(main(event_loop))  # 將事件循環對象傳入main函數中
finally:
    print('關閉事件循環監聽')
    event_loop.close()
asyncio_call_soon.py

運行效果

[root@ mnt]# python3 asyncio_call_soon.py 
進入事件循環監聽
注冊回調函數
回調函數arg=1,kwarg=default
回調函數arg=2,kwarg=not default
關閉事件循環監聽

  6、協程回調函數調用,此示例:延時回調

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

def callback(arg):
    print('回調函數arg={}'.format(arg))

async def main(loop):
    print('注冊回調函數')
    loop.call_later(1,callback,'延時1秒回調參數1')
    loop.call_later(1,callback,'延時1秒回調參數2')
    loop.call_soon(callback,'訊速的回調參數')
    await asyncio.sleep(3)

event_loop = asyncio.get_event_loop()
try:
    print('進入事件循環監聽')
    event_loop.run_until_complete(main(event_loop))  # 將事件循環對象傳入main函數中
finally:
    print('關閉事件循環監聽')
    event_loop.close()
asyncio_call_delay.py

運行效果

[root@ mnt]# python3 asyncio_call_delay.py 
進入事件循環監聽
注冊回調函數
回調函數arg=訊速的回調參數
回調函數arg=延時1秒回調參數1
回調函數arg=延時1秒回調參數2
關閉事件循環監聽

  7、協程回調函數調用,此示例:指定時間回調

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import time

def callback(arg, loop):
    print('回調函數arg={} 回調的時間time={}'.format(arg, loop.time()))

async def main(loop):
    now = loop.time()
    print('時鍾時間:{}'.format(time.time()))
    print('時事件循環時間:{}'.format(loop.time()))
    print('注冊回調函數')
    loop.call_at(now + 1, callback, '參數1', loop)
    loop.call_at(now + 2, callback, '參數2', loop)
    loop.call_soon(callback, '訊速的回調參數', loop)
    await asyncio.sleep(4)

event_loop = asyncio.get_event_loop()
try:
    print('進入事件循環監聽')
    event_loop.run_until_complete(main(event_loop))  # 將事件循環對象傳入main函數中
finally:
    print('關閉事件循環監聽')
    event_loop.close()
asyncio_call_at.py

運行結果

[root@ mnt]# python3 asyncio_call_at.py 
進入事件循環監聽
時鍾時間:1576030580.730174
時事件循環時間:233762.828430848
注冊回調函數
回調函數arg=訊速的回調參數 回調的時間time=233762.828485111
回調函數arg=參數1 回調的時間time=233763.829784903
回調函數arg=參數2 回調的時間time=233764.831077136
關閉事件循環監聽

 8、基於Future實現異步返回任務執行結果

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

def mark_done(future, result):
    """標記完成的函數"""
    print('設置 Future 返回結果 {}'.format(result))
    future.set_result(result)

event_loop = asyncio.get_event_loop()
try:
    all_done = asyncio.Future()
    print('調度標記完成的函數')
    event_loop.call_soon(mark_done, all_done, '這個是調度傳入的數據')
    result = event_loop.run_until_complete(all_done)
    print('運行返回的結果:{}'.format(result))
finally:
    print('關閉事件循環監聽')
    event_loop.close()

print('Future 返回的結果: {}'.format(all_done.result()))
"""
結論:
    返回結果可以從兩個地方獲取:
    1、result = event_loop.run_until_complete(all_done)
    2、Future.result()
"""
asyncio_future_event_loop.py

運行效果

[root@ mnt]# python3 asyncio_future_event_loop.py 
調度標記完成的函數
設置 Future 返回結果 這個是調度傳入的數據
運行返回的結果:這個是調度傳入的數據
關閉事件循環監聽
Future 返回的結果: 這個是調度傳入的數據

 9、基於Future+await類現異步返回任務執行結果

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

def mark_done(future, result):
    """標記完成的函數"""
    print('設置 Future 返回結果 {}'.format(result))
    future.set_result(result)

async def main(loop):
    all_done = asyncio.Future()
    print('調度標記完成的函數')
    loop.call_soon(mark_done, all_done, '這個是調度傳入的數據')
    result = await all_done  # await作用:等all_done返回結果,再往下運行
    print('mark_done()執行完成,返回值 : {}'.format(result))

event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    print('關閉事件循環監聽')
    event_loop.close()
asyncio_future_await.py

運行效果

[root@ mnt]# python3 asyncio_future_await.py 
調度標記完成的函數
設置 Future 返回結果 這個是調度傳入的數據
mark_done()執行完成,返回值 : 這個是調度傳入的數據
關閉事件循環監聽

 10、基於Future的回調

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import functools

def callback(future, n):
    print('{}: future 完成: {}'.format(n, future.result()))


async def register_callbacks(future_obj):
    print('將回調函數注冊到Future中')

    # 這里需要注意的是add_done_callback函數,還為把當前實例對象作為參數,傳給函數,所以回調函數多一個callback(future, n)
    future_obj.add_done_callback(functools.partial(callback, n=1))
    future_obj.add_done_callback(functools.partial(callback, n=2))

async def main(future_obj):
    # 注冊future的回調函數
    await register_callbacks(future_obj)
    print('設置Future返回結果')
    future_obj.set_result('the result')

event_loop = asyncio.get_event_loop()
try:
    # 創建一個future實例
    future_obj = asyncio.Future()

    # 增future實例傳給main函數處理
    event_loop.run_until_complete(main(future_obj))
finally:
    event_loop.close()
asyncio_future_callback.py

運行效果

[root@ mnt]# python3 asyncio_future_callback.py 
將回調函數注冊到Future中
設置Future返回結果
1: future 完成: the result
2: future 完成: the result

 11、asyncio創建任務運行

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

async def task_func():
    print('task_func 執行完成')
    return 'task_func返回值ok'

async def main(loop):
    print('創建任務')
    task = loop.create_task(task_func())
    print('等待task的結果 {}'.format(task))
    return_value = await task #直到遇到await,上面的task開始運行
    print('已完成任務{}'.format(task)) #經過上面的運行,task里面已經有result執行結果
    print('return value: {}'.format(return_value))

event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()
asyncio_future_create_task.py

運行結果

[root@ mnt]# python3 asyncio_future_create_task.py 
創建任務
等待task的結果 <Task pending coro=<task_func() running at asyncio_future_create_task.py:11>>
task_func 執行完成
已完成任務<Task finished coro=<task_func() done, defined at asyncio_future_create_task.py:11> result='task_func返回值ok'>
return value: task_func返回值ok

 12、asyncio取消任務運行

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

async def task_func():
    print('task_func 執行完成')
    return 'task_func返回值ok'

async def main(loop):
    print('創建任務')
    task = loop.create_task(task_func())
    print('取消任務')
    task.cancel()
    print('取消任務結果 {}'.format(task))
    try:
        await task #直到遇到await,上面的task開始運行
    except asyncio.CancelledError:
        print('從已取消的任務中捕獲錯誤')
    else:
        print('任務執行結果 {}'.format(task))

event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()
asyncio_future_create_cancel_task.py

 運行效果

[root@python-mysql mnt]# python3 asyncio_future_create_cancel_task.py 
創建任務
取消任務
取消任務結果 <Task cancelling coro=<task_func() running at asyncio_future_create_cancel_task.py:11>>
從已取消的任務中捕獲錯誤

 13、利用回調取消任務執行

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

async def task_func():
    print('task_func睡眠')
    try:
        await asyncio.sleep(1)
    except asyncio.CancelledError:
        print('task_func 任務取消')
        raise
    return 'task_func返回值ok'

def task_canceller(task_obj):
    print('task_canceller運行')
    task_obj.cancel()
    print('取消task_obj任務')

async def main(loop):
    print('創建任務')
    task = loop.create_task(task_func())
    loop.call_soon(task_canceller, task)
    try:
        await task  # 直到遇到await,上面的task開始運行
    except asyncio.CancelledError:
        print('從已取消的任務中捕獲錯誤')
    else:
        print('任務執行結果 {}'.format(task))

event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()
asyncio_future_create_callback_cancel_task.py

 運行效果

[root@ mnt]# python3 asyncio_future_create_callback_cancel_task.py 
創建任務
task_func睡眠
task_canceller運行
取消task_obj任務
task_func 任務取消
從已取消的任務中捕獲錯誤

 14、asyncio.ensure_future(),增加函數,直到await才運行

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

async def wrapped():
    print('wrapped 運行')
    return 'wrapped result'

async def inner(task):
    print('inner: 開始運行')
    print('inner: task {!r}'.format(task))
    result = await task
    print('inner: task 返回值 {!r}'.format(result))

async def start_task():
    print('開始創建task')
    task = asyncio.ensure_future(wrapped())
    print('等待inner運行')
    await inner(task)
    print('starter: inner returned')

event_loop = asyncio.get_event_loop()
try:
    print('進程事件循環')
    result = event_loop.run_until_complete(start_task())
finally:
    event_loop.close()
asyncio_ensure_future.py

運行效果

[root@ mnt]# python3 asyncio_ensure_future.py 
進程事件循環
開始創建task
等待inner運行
inner: 開始運行
inner: task <Task pending coro=<wrapped() running at asyncio_ensure_future.py:11>>
wrapped 運行
inner: task 返回值 'wrapped result'

 15、asyncio.wait()批量等待多個協程直到運行完成,包裝多個返回顯示結果

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

async def phase(i):
    print('phase形參傳入值{}'.format(i))
    await asyncio.sleep(0.1 * i)
    print('完成phase的次數{}'.format(i))
    return 'phase {} result'.format(i)

async def main(num_phase):
    print('main函數開始')
    phases = [
        phase(i) for i in range(num_phase)
    ]
    print('等待phases里面的多個函數執行完成')
    completed, pending = await asyncio.wait(phases)  # completed : 運行完成存在這里 ,pending : 沒有運行完成存在這里
    for_completed_results = [t.result() for t in completed]
    print('for_completed_results : {}'.format(for_completed_results))

event_loop = asyncio.get_event_loop()
try:
    print('進程事件循環')
    result = event_loop.run_until_complete(main(3))
finally:
    event_loop.close()
asyncio_wait.py

運行效果

[root@ mnt]# python3 asyncio_wait.py 
進程事件循環
main函數開始
等待phases里面的多個函數執行完成
phase形參傳入值2
phase形參傳入值0
phase形參傳入值1
完成phase的次數0
完成phase的次數1
完成phase的次數2
for_completed_results : ['phase 2 result', 'phase 0 result', 'phase 1 result']

  16、asyncio.wait()批量等待多個協程設置超時時間並且取消未完成的任務,包裝多個返回顯示結果

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

async def phase(i):
    print('phase形參傳入值{}'.format(i))
    try:
        await asyncio.sleep(0.1 * i)
    except asyncio.CancelledError:
        print('phase {} 取消'.format(i))
    else:
        print('完成phase的次數{}'.format(i))
        return 'phase {} result'.format(i)

async def main(num_phase):
    print('main函數開始')
    phases = [
        phase(i) for i in range(num_phase)
    ]
    print('等待phases里面的多個函數執行完成')
    completed, pending = await asyncio.wait(phases, timeout=0.1)  # completed : 運行完成存在這里 ,pending : 沒有運行完成存在這里

    print('completed長度:{},pending長度 :{}'.format(len(completed), len(pending)))

    if pending:
        print('取消未完成的任務')
        for t in pending:
            t.cancel()
    print('main函數執行完成')


event_loop = asyncio.get_event_loop()
try:
    print('進程事件循環')
    result = event_loop.run_until_complete(main(3))
finally:
    event_loop.close()
asyncio_wait_timeout.py

運行效果

[root@ mnt]# python3 asyncio_wait_timeout.py 
進程事件循環
main函數開始
等待phases里面的多個函數執行完成
phase形參傳入值1
phase形參傳入值2
phase形參傳入值0
完成phase的次數0
completed長度:1,pending長度 :2
取消未完成的任務
main函數執行完成
phase 1 取消
phase 2 取消

 17、asyncio.gather()多個協程運行,函數返回值接收

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

async def phase1():
    print('phase1運行中')
    await asyncio.sleep(2)
    print('phase1運行完成')
    return 'phase1 result'

async def phase2():
    print('phase2運行中')
    await asyncio.sleep(1)
    print('phase2運行完成')
    return 'phase2 result'

async def main():
    print('main函數開始')
    results = await asyncio.gather(
        phase1(),
        phase2()
    )
    print('results : {}'.format(results))

event_loop = asyncio.get_event_loop()
try:
    print('進程事件循環')
    result = event_loop.run_until_complete(main())
finally:
    event_loop.close()
asyncio_gather.py

運行效果

[root@ mnt]# python3 asyncio_gather.py 
進程事件循環
main函數開始
phase1運行中
phase2運行中
phase2運行完成
phase1運行完成
results : ['phase1 result', 'phase2 result']

  18、asyncio.as_completed()多個協程運行,函數返回值不是有序的接收

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

async def phase(i):
    print('phase {} 運行中'.format(i))
    await asyncio.sleep(0.5 - (0.1 * i))
    print('phase {} 運行完成'.format(i))
    return 'phase {} result'.format(i)

async def main(num_phases):
    print('main函數開始')
    phases = [
        phase(i) for i in range(num_phases)
    ]
    print('等待phases運行完成')
    results = []
    for next_to_complete in asyncio.as_completed(phases):
        task_result = await next_to_complete
        print('接到到task_result : {}'.format(task_result))
        results.append(task_result)
    print('results : {}'.format(results))
    return results

event_loop = asyncio.get_event_loop()
try:
    print('進程事件循環')
    event_loop.run_until_complete(main(3))
finally:
    event_loop.close()
asyncio_as_completed.py

運行效果

[root@ mnt]# python3 asyncio_as_completed.py 
進程事件循環
main函數開始
等待phases運行完成
phase 2 運行中
phase 1 運行中
phase 0 運行中
phase 2 運行完成
接到到task_result : phase 2 result
phase 1 運行完成
接到到task_result : phase 1 result
phase 0 運行完成
接到到task_result : phase 0 result
results : ['phase 2 result', 'phase 1 result', 'phase 0 result']

   19、asyncio.Lock() 協程鎖的打開與關閉

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import functools

def unlock(lock):
    print('回調釋放鎖')
    lock.release()

async def coro1(lock):
    """with方式獲得鎖"""
    async with lock:
        print('coro1 打開鎖運算')
    print('coro1 鎖已釋放')

async def coro2(lock):
    """傳統方式獲取鎖"""
    await lock.acquire()
    try:
        print('coro2 打開鎖運算')
    finally:
        print('coro2 鎖已釋放')
        lock.release()

async def main(loop):
    lock = asyncio.Lock()
    print('啟動協程之前獲取鎖')
    await lock.acquire()
    print('獲得鎖 {}'.format(lock.locked()))

    # 運行完成,回調解鎖
    loop.call_later(0.1, functools.partial(unlock, lock))

    print('等待協程運行')
    await asyncio.wait([coro1(lock), coro2(lock)])


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()
asyncio_lock.py

運行效果

[root@ mnt]# python3 asyncio_lock.py 
啟動協程之前獲取鎖
獲得鎖 True
等待協程運行
回調釋放鎖
coro2 打開鎖運算
coro2 鎖已釋放
coro1 打開鎖運算
coro1 鎖已釋放

 20、asyncio.Event() 事件的查看與設置

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import functools

def set_event(event):
    print('回調設置event')
    event.set()

async def coro1(event):
    print('coro1 等待事件')
    await event.wait()
    print('coro1 觸發運行')

async def coro2(event):
    print('coro2 等待事件')
    await event.wait()
    print('coro2 觸發運行')

async def main(loop):
    event = asyncio.Event()
    print('event開始之前狀態:{}'.format(event.is_set()))
    loop.call_later(0.1, functools.partial(set_event, event))  # 延時0.1秒后,回調set_event函數
    await asyncio.wait([coro1(event), coro2(event)])
    print('event開始之后狀態:{}'.format(event.is_set()))

event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()
asyncio_event

運行效果

[root@ mnt]# python3 asyncio_event.py 
event開始之前狀態:False
coro1 等待事件
coro2 等待事件
回調設置event
coro1 觸發運行
coro2 觸發運行
event開始之后狀態:True

 21、asyncio.Condition(),對事件狀態單獨通知執行

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

async def consumer(condition_obj, i):
    async with condition_obj:
        print('消費者 {} 等待中'.format(i))
        await condition_obj.wait()
        print('消費者 {} 觸發'.format(i))
    print('消費者 {} 消費結束'.format(i))

async def manipulate_condition(condition_obj):
    print('開始操作condition')
    await asyncio.sleep(0.1)
    for i in range(3):
        async with condition_obj:
            print('通知消費者 {}'.format(i))
            condition_obj.notify(i)
        await asyncio.sleep(0.1)

    async with condition_obj:
        print('通知其它所有的消費者')
        condition_obj.notify_all()
    print('操作condition結束')

async def main(loop):
    # 創建一個操作狀態的對象
    condition_obj = asyncio.Condition()

    # 運5個消費者函數
    consumers = [
        consumer(condition_obj, i) for i in range(5)
    ]

    # 創建一個操作狀態的任務
    loop.create_task(manipulate_condition(condition_obj))

    # 等待consumers所有的函數執行完成
    await asyncio.wait(consumers)

event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()
asyncio_condition.py

運行效果

[root@ mnt]# python3 asyncio_condition.py 
開始操作condition
消費者 2 等待中
消費者 3 等待中
消費者 0 等待中
消費者 4 等待中
消費者 1 等待中
通知消費者 0
通知消費者 1
消費者 2 觸發
消費者 2 消費結束
通知消費者 2
消費者 3 觸發
消費者 3 消費結束
消費者 0 觸發
消費者 0 消費結束
通知其它所有的消費者
操作condition結束
消費者 4 觸發
消費者 4 消費結束
消費者 1 觸發
消費者 1 消費結束

  22、協程隊列Queue,生產者與消費者的示例

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

async def consumer(n, q):
    print('消費者 {} 開始'.format(n))
    while True:
        print('消費費 {} 等待消費'.format(n))
        item = await q.get()
        print('消費者 {} 消費了 {}'.format(n, item))
        if item is None:
            q.task_done()
            break
        else:
            await asyncio.sleep(0.01 * item)
            q.task_done()
    print('消費者 {} 結束'.format(n))

async def producer(q, num_worker):
    print('生產者 開始')

    for i in range(num_worker * 3):
        await q.put(i)
        print('生產者 增加數據 {} 到隊列中'.format(i))

    print('生產者 增加停止信號到隊列中')
    for i in range(num_worker):
        await q.put(None)
    print('生產者 等待隊列清空')
    await q.join()
    print('生產者 結束')

async def main(loop, num_consumers):
    # 創建一個隊列,最大的長度是num_consumers
    q = asyncio.Queue(maxsize=num_consumers)

    consumers = [
        loop.create_task(consumer(i, q)) for i in range(num_consumers)
    ]

    producer_task = loop.create_task(producer(q, num_consumers))

    await asyncio.wait(consumers + [producer_task])

event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop, 2))
finally:
    event_loop.close()
asyncio_queue.py

運行效果

[root@ mnt]# python3 asyncio_queue.py 
消費者 0 開始
消費費 0 等待消費
消費者 1 開始
消費費 1 等待消費
生產者 開始
生產者 增加數據 0 到隊列中
生產者 增加數據 1 到隊列中
消費者 0 消費了 0
消費者 1 消費了 1
生產者 增加數據 2 到隊列中
生產者 增加數據 3 到隊列中
消費費 0 等待消費
消費者 0 消費了 2
生產者 增加數據 4 到隊列中
消費費 1 等待消費
消費者 1 消費了 3
生產者 增加數據 5 到隊列中
生產者 增加停止信號到隊列中
消費費 0 等待消費
消費者 0 消費了 4
消費費 1 等待消費
消費者 1 消費了 5
生產者 等待隊列清空
消費費 0 等待消費
消費者 0 消費了 None
消費者 0 結束
消費費 1 等待消費
消費者 1 消費了 None
消費者 1 結束
生產者 結束

 23、利用 asyncio.Protocol 實現服務端和客戶端數據相互傳送

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import logging
import sys

SERVER_ADDRESS = ['localhost', 8000]

class EchoServer(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport
        self.address = transport.get_extra_info('peername')
        self.log = logging.getLogger(
            'EchoServer_{}_{}'.format(*self.address)
        )
        self.log.debug('接收連接')

    def data_received(self, data):
        self.log.debug('接收數據 {}'.format(data))
        self.transport.write(data)
        self.log.debug('發送數據 {}'.format(data))

    def eof_received(self):
        self.log.debug('接收數據 EOF')
        if self.transport.can_write_eof():
            self.transport.write_eof()

    def connection_lost(self, exc):
        if exc:
            self.log.error('錯誤 {}'.format(exc))
        else:
            self.log.debug('服務關閉')
        super(EchoServer, self).connection_lost(exc)

logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s : %(message)s',
    stream=sys.stderr
)

log = logging.getLogger('main')

event_loop = asyncio.get_event_loop()

factory = event_loop.create_server(EchoServer, *SERVER_ADDRESS)

server = event_loop.run_until_complete(factory)

log.debug('開始運行 IP:{} Port:{}'.format(*SERVER_ADDRESS))

try:
    event_loop.run_forever()
finally:
    log.debug('關閉服務')
    server.close()
    event_loop.run_until_complete(server.wait_closed())
    log.debug('關閉事件循環')
    event_loop.close()
asyncio_echo_server_protocol.py
#!/usr/bin/env python3
# encoding: utf-8

import asyncio
import functools
import logging
import sys

MESSAGES = [
    b'This is the message. ',
    b'It will be sent ',
    b'in parts.',
]
SERVER_ADDRESS = ('localhost', 8000)

class EchoClient(asyncio.Protocol):

    def __init__(self, messages, future):
        super().__init__()
        self.messages = messages
        self.log = logging.getLogger('EchoClient')
        self.f = future

    def connection_made(self, transport):
        self.transport = transport
        self.address = transport.get_extra_info('peername')
        self.log.debug(
            '連接服務器IP:{} port :{}'.format(*self.address)
        )

        for msg in self.messages:
            transport.write(msg)
            self.log.debug('發送數據 {!r}'.format(msg))
        if transport.can_write_eof():
            transport.write_eof()

    def data_received(self, data):
        self.log.debug('received {!r}'.format(data))

    def eof_received(self):
        self.log.debug('接收到 EOF')
        self.transport.close()
        if not self.f.done():
            self.f.set_result(True)

    def connection_lost(self, exc):
        self.log.debug('服務器關閉連接')
        self.transport.close()
        if not self.f.done():
            self.f.set_result(True)
        super().connection_lost(exc)

#設置日志級別
logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s: %(message)s',
    stream=sys.stderr,
)

#打印日志標題
log = logging.getLogger('main')

#創建一個事件循環
event_loop = asyncio.get_event_loop()

#創建客戶端的Future
client_completed = asyncio.Future()

#利用偏函數自動傳參給EchoClient實例化類
client_factory = functools.partial(
    EchoClient,
    messages=MESSAGES,
    future=client_completed,
)

#創建一個事件循環連接
factory_coroutine = event_loop.create_connection(
    client_factory,
    *SERVER_ADDRESS,
)

log.debug('等待客戶端運行完成')
try:
    event_loop.run_until_complete(factory_coroutine)
    event_loop.run_until_complete(client_completed)
finally:
    log.debug('關閉事件循環')
    event_loop.close()
asyncio_echo_client_protocol.py

運行效果

服務端

[root@ mnt]# python3 asyncio_echo_server_protocol.py 
asyncio : Using selector: EpollSelector
main : 開始運行 IP:localhost Port:8000
EchoServer_::1_54082 : 接收連接
EchoServer_::1_54082 : 接收數據 b'This is the message. It will be sent in parts.'
EchoServer_::1_54082 : 發送數據 b'This is the message. It will be sent in parts.'
EchoServer_::1_54082 : 接收數據 EOF
EchoServer_::1_54082 : 服務關閉

客戶端

[root@ mnt]# python3 asyncio_echo_client_protocol.py 
asyncio: Using selector: EpollSelector
main: 等待客戶端運行完成
EchoClient: 連接服務器IP:::1 port :8000
EchoClient: 發送數據 b'This is the message. '
EchoClient: 發送數據 b'It will be sent '
EchoClient: 發送數據 b'in parts.'
EchoClient: received b'This is the message. It will be sent in parts.'
EchoClient: 接收到 EOF
EchoClient: 服務器關閉連接
main: 關閉事件循環

  24、基於Coroutine 實現服務端和客戶端數據相互傳送,與22點示例功能一樣)

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import logging
import sys

SERVER_ADDRESS = ('localhost', 8000)

async def echo(reader, writer):
    address = writer.get_extra_info('peername')
    log = logging.getLogger('echo_{}_{}'.format(*address))
    log.debug('開始接受連接')
    while True:
        data = await reader.read(128)
        if data:
            log.debug('接受的數據 : {}'.format(data))
            writer.write(data)
            await writer.drain()
            log.debug('發送數據:{}'.format(data))
        else:
            log.debug('關閉連接')
            writer.close()
            return

logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s : %(message)s',
    stream=sys.stderr
)

log = logging.getLogger('main')

event_loop = asyncio.get_event_loop()

factory = asyncio.start_server(echo, *SERVER_ADDRESS)
server = event_loop.run_until_complete(factory)
log.debug('開始啟動服務 IP:{},Port:{}'.format(*SERVER_ADDRESS))

try:
    event_loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    log.debug('關閉服務端')
    server.close()
    event_loop.run_until_complete(server.wait_closed())
    log.debug('關閉事件循環')
    event_loop.close()
asyncio_echo_server_coroutine.py
#!/usr/bin/env python3
# encoding: utf-8

import asyncio
import logging
import sys

MESSAGES = [
    b'This is the message. ',
    b'It will be sent ',
    b'in parts.',
]
SERVER_ADDRESS = ('localhost', 8000)

async def echo_client(address, messages):
    log = logging.getLogger('echo_client')

    log.debug('連接服務器 to {} port {}'.format(*address))

    # 創建與服務端連接
    reader, writer = await asyncio.open_connection(*address)

    for msg in messages:
        writer.write(msg)
        log.debug('發送數據: {}'.format(msg))

    # 判斷是否發送結束標記
    if writer.can_write_eof():
        writer.write_eof()
    # 等待所有發送完成
    await writer.drain()

    log.debug('等待服務器響應')
    while True:
        data = await reader.read(128)
        if data:
            log.debug('接收服務器數據 :{}'.format(data))
        else:
            log.debug('關閉與服務器的連接')
            writer.close()
            return

logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s: %(message)s',
    stream=sys.stderr,
)
log = logging.getLogger('main')

event_loop = asyncio.get_event_loop()

try:
    event_loop.run_until_complete(
        echo_client(SERVER_ADDRESS, MESSAGES)
    )
finally:
    log.debug('closing event loop')
    event_loop.close()
asyncio_echo_client_coroutine.py

運行效果

服務端

[root@ mnt]# python3 asyncio_echo_server_coroutine.py 
asyncio : Using selector: EpollSelector
main : 開始啟動服務 IP:localhost,Port:8000
echo_::1_54084 : 開始接受連接
echo_::1_54084 : 接受的數據 : b'This is the message. It will be sent in parts.'
echo_::1_54084 : 發送數據:b'This is the message. It will be sent in parts.'
echo_::1_54084 : 關閉連接
#這里使用Ctrl + C,運行后面的功能
^Cmain : 關閉服務端
main : 關閉事件循環

客戶端

[root@ mnt]# python3 asyncio_echo_client_coroutine.py 
asyncio: Using selector: EpollSelector
echo_client: 連接服務器 to localhost port 8000
echo_client: 發送數據: b'This is the message. '
echo_client: 發送數據: b'It will be sent '
echo_client: 發送數據: b'in parts.'
echo_client: 等待服務器響應
echo_client: 接收服務器數據 :b'This is the message. It will be sent in parts.'
echo_client: 關閉與服務器的連接
main: closing event loop

 25、基於Coroutine ,實現SSL的Socket通訊

#創建ssl證書

openssl req -newkey rsa:2048 -nodes -keyout test_ssl.key -x509 -days 800 -out test_ssl.crt
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import logging
import sys
import ssl

SERVER_ADDRESS = ('localhost', 8000)

async def echo(reader, writer):
    address = writer.get_extra_info('peername')
    log = logging.getLogger('echo_{}_{}'.format(*address))
    log.debug('開始接受連接')
    while True:
        data = await reader.read(128)

        #因為ssl不支持EOF結束,所以需要用'\x00'結束
        terminate = data.endswith(b'\x00')
        data = data.rstrip(b'\x00')
        if data:
            log.debug('接受的數據 : {}'.format(data))
            writer.write(data)
            await writer.drain()
            log.debug('發送數據:{}'.format(data))
        if not data or terminate:
            log.debug('關閉連接')
            writer.close()
            return

logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s : %(message)s',
    stream=sys.stderr
)

log = logging.getLogger('main')

event_loop = asyncio.get_event_loop()

# 創建SSL所需要的對象
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.check_hostname = False
ssl_context.load_cert_chain('test_ssl.crt', 'test_ssl.key')

factory = asyncio.start_server(echo, *SERVER_ADDRESS, ssl=ssl_context)
server = event_loop.run_until_complete(factory)
log.debug('開始啟動服務 IP:{},Port:{}'.format(*SERVER_ADDRESS))

try:
    event_loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    log.debug('關閉服務端')
    server.close()
    event_loop.run_until_complete(server.wait_closed())
    log.debug('關閉事件循環')
    event_loop.close()
asyncio_echo_server_ssl.py
#!/usr/bin/env python3
# encoding: utf-8

import asyncio
import logging
import sys
import ssl

MESSAGES = [
    b'This is the message. ',
    b'It will be sent ',
    b'in parts.',
]
SERVER_ADDRESS = ('localhost', 8000)

async def echo_client(address, messages):
    log = logging.getLogger('echo_client')

    # 客戶端ssl所需要帶證書訪問
    ssl_context = ssl.create_default_context(
        ssl.Purpose.SERVER_AUTH
    )
    ssl_context.check_hostname = False
    ssl_context.load_verify_locations('test_ssl.crt')

    log.debug('連接服務器 to {} port {}'.format(*address))

    # 創建與服務端連接
    reader, writer = await asyncio.open_connection(*address, ssl=ssl_context)

    for msg in messages:
        writer.write(msg)
        log.debug('發送數據: {}'.format(msg))

    # 判斷是否發送結束標記
    # 非ssl
    # if writer.can_write_eof():
    #     writer.write_eof()

    # ssl
    writer.write(b'\x00')

    # 等待所有發送完成
    await writer.drain()

    log.debug('等待服務器響應')
    while True:
        data = await reader.read(128)
        if data:
            log.debug('接收服務器數據 :{}'.format(data))
        else:
            log.debug('關閉與服務器的連接')
            writer.close()
            return

logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s: %(message)s',
    stream=sys.stderr,
)
log = logging.getLogger('main')

event_loop = asyncio.get_event_loop()

try:
    event_loop.run_until_complete(
        echo_client(SERVER_ADDRESS, MESSAGES)
    )
finally:
    log.debug('closing event loop')
    event_loop.close()
asyncio_echo_client_ssl.py

運行效果

服務端

[root@ mnt]# python3 asyncio_echo_server_ssl.py 
asyncio : Using selector: EpollSelector
main : 開始啟動服務 IP:localhost,Port:8000
echo_::1_54094 : 開始接受連接
echo_::1_54094 : 接受的數據 : b'This is the message. It will be sent in parts.'
echo_::1_54094 : 發送數據:b'This is the message. It will be sent in parts.'
echo_::1_54094 : 關閉連接
^Cmain : 關閉服務端
main : 關閉事件循環

客戶端

[root@ mnt]# python3 asyncio_echo_client_ssl.py 
asyncio: Using selector: EpollSelector
echo_client: 連接服務器 to localhost port 8000
echo_client: 發送數據: b'This is the message. '
echo_client: 發送數據: b'It will be sent '
echo_client: 發送數據: b'in parts.'
echo_client: 等待服務器響應
echo_client: 接收服務器數據 :b'This is the message. It will be sent in parts.'
echo_client: 關閉與服務器的連接
main: closing event loop

 26、利用asyncio.SubprocessProtocol類繼承的方式實現子進程的調用

#!/usr/bin/env python3
# encoding: utf-8

#end_pymotw_header
import asyncio
import functools

class DFProtocol(asyncio.SubprocessProtocol):

    FD_NAMES = ['stdin', 'stdout', 'stderr']

    def __init__(self, done_future):
        self.done = done_future
        self.buffer = bytearray()
        super().__init__()

    def connection_made(self, transport):
        print('process started {}'.format(transport.get_pid()))
        self.transport = transport

    def pipe_data_received(self, fd, data):
        print('read {} bytes from {}'.format(len(data),
                                             self.FD_NAMES[fd]))
        if fd == 1:
            self.buffer.extend(data)

    def process_exited(self):
        print('process exited')
        return_code = self.transport.get_returncode()
        print('return code {}'.format(return_code))
        if not return_code:
            cmd_output = bytes(self.buffer).decode()
            results = self._parse_results(cmd_output)
        else:
            results = []
        self.done.set_result((return_code, results))

    def _parse_results(self, output):
        print('parsing results')
        if not output:
            return []
        lines = output.splitlines()
        headers = lines[0].split()
        devices = lines[1:]
        results = [
            dict(zip(headers, line.split()))
            for line in devices
        ]
        return results


async def run_df(loop):
    print('in run_df')

    cmd_done = asyncio.Future(loop=loop)
    factory = functools.partial(DFProtocol, cmd_done)
    proc = loop.subprocess_exec(
        factory,
        'df', '-hl',
        stdin=None,
        stderr=None,
    )
    try:
        print('launching process')
        transport, protocol = await proc
        print('waiting for process to complete')
        await cmd_done
    finally:
        transport.close()

    return cmd_done.result()


event_loop = asyncio.get_event_loop()
try:
    return_code, results = event_loop.run_until_complete(
        run_df(event_loop)
    )
finally:
    event_loop.close()

if return_code:
    print('error exit {}'.format(return_code))
else:
    print('\nFree space:')
    for r in results:
        print('{Mounted:25}: {Avail}'.format(**r))
asyncio_subprocess_protocol.py

運行效果

由於我使用的Python版本是3.6.6,調用的優先級是
1、調用    def connection_made(self, transport) 函數
2、調用    def process_exited(self)函數
3、調用   def pipe_data_received(self, fd, data)函數這里是輸出結果,所以結束進程的時候process_exited解析是空,導致結果出不來,這里待pyhton版本驗證

 27、利用協程子進程的調用

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

def _parse_results(output):
    print('解析結果')
    if not output:
        return []
    lines = output.splitlines()
    headers = lines[0].split()
    devices = lines[1:]
    results = [
        dict(zip(headers, line.split())) for line in devices
    ]
    return results

async def run_df():
    print('run_df函數運行')
    buffer = bytearray()
    create = asyncio.create_subprocess_exec(
        'df', '-h',
        stdout=asyncio.subprocess.PIPE
    )

    print('df -h開始運行')
    proc = await create
    print('進程開始 {}'.format(proc.pid))

    while True:
        line = await proc.stdout.readline()
        print('讀取 : {}'.format(line))
        if not line:
            print('命令不再輸出')
            break
        buffer.extend(line)
    print('等待進程運行完成')
    await proc.wait()
    return_code = proc.returncode
    print('運行返回碼:{}'.format(return_code))
    if not return_code:
        cmd_output = bytes(buffer).decode()
        results = _parse_results(cmd_output)
    else:
        results = []

    return (return_code, results)

event_loop = asyncio.get_event_loop()
try:
    return_code, results = event_loop.run_until_complete(
        run_df()
    )
finally:
    event_loop.close()

if return_code:
    print('錯誤退出,錯誤信息:{}'.format(return_code))
else:
    print('運行結果:')
    for r in results:
        print('{Mounted:25}:{Avail}'.format(**r))
asyncio_subprocess_coroutine.py

運行效果

[root@ mnt]# python3 asyncio_subprocess_coroutine.py 
run_df函數運行
df -h開始運行
進程開始 44244
讀取 : b'Filesystem               Size  Used Avail Use% Mounted on\n'
讀取 : b'/dev/mapper/centos-root   17G  7.9G  9.2G  47% /\n'
讀取 : b'devtmpfs                 478M     0  478M   0% /dev\n'
讀取 : b'tmpfs                    489M     0  489M   0% /dev/shm\n'
讀取 : b'tmpfs                    489M   56M  433M  12% /run\n'
讀取 : b'tmpfs                    489M     0  489M   0% /sys/fs/cgroup\n'
讀取 : b'/dev/sda1               1014M  125M  890M  13% /boot\n'
讀取 : b'tmpfs                     98M     0   98M   0% /run/user/0\n'
讀取 : b''
命令不再輸出
等待進程運行完成
運行返回碼:0
解析結果
運行結果:
/                        :9.2G
/dev                     :478M
/dev/shm                 :489M
/run                     :433M
/sys/fs/cgroup           :489M
/boot                    :890M
/run/user/0              :98M

  28、利用協程管道傳數據給子進程的調用處理

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

async def to_upper(input):
    print('進程轉大寫的to_upper函數')

    create = asyncio.create_subprocess_exec(
        'tr', '[:lower:]', '[:upper:]',
        stdout=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )
    print('等待子進程運行完成')
    proc = await create
    print('子進程PID {}'.format(proc.pid))

    print('查看子進程運行的標准輸出和錯誤')
    stdout, stderr = await proc.communicate(input.encode())

    print('等待子進程完成')
    await proc.wait()

    return_code = proc.returncode
    print('return code {}'.format(return_code))
    if not return_code:
        results = bytes(stdout).decode()
    else:
        results = ''
    return (return_code, results)

MESSAGE = """
This message will be converted
to all caps.
"""

event_loop = asyncio.get_event_loop()
try:
    return_code, results = event_loop.run_until_complete(
        to_upper(MESSAGE)
    )
finally:
    event_loop.close()

if return_code:
    print('錯誤時,退出的返回狀態碼 {}'.format(return_code))
else:
    print('源數據: {!r}'.format(MESSAGE))
    print('處理過的數據 : {!r}'.format(results))
asyncio_subprocess_coroutine_write.py

運行效果

[root@ mnt]# python3 asyncio_subprocess_coroutine_write.py 
進程轉大寫的to_upper函數
等待子進程運行完成
子進程PID 78254
查看子進程運行的標准輸出和錯誤
等待子進程完成
return code 0
源數據: '\nThis message will be converted\nto all caps.\n'
處理過的數據 : '\nTHIS MESSAGE WILL BE CONVERTED\nTO ALL CAPS.\n'

   29、協程之信號的注冊處理

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import functools
import os
import signal

def signal_handler(name):
    print('正在處理信號 : {}'.format(name))

event_loop = asyncio.get_event_loop()

# 給信號綁定處理的事件
event_loop.add_signal_handler(
    signal.SIGHUP,
    functools.partial(signal_handler, name='SIGHUP'),
)
event_loop.add_signal_handler(
    signal.SIGUSR1,
    functools.partial(signal_handler, name='SIGUSR1'),
)
event_loop.add_signal_handler(
    signal.SIGINT,
    functools.partial(signal_handler, name='SIGINT'),
)

async def send_signals():
    pid = os.getpid()
    print('開始發送信號給PID:{}'.format(pid))

    for name in ['SIGHUP', 'SIGHUP', 'SIGUSR1', 'SIGINT']:
        print('發送信號名字:{}'.format(name))
        # 跟linux 命令kill一樣,利用pid結束進程
        os.kill(pid, getattr(signal, name))
        print('放棄控制')
        await asyncio.sleep(0.01)
    return

try:
    event_loop.run_until_complete(send_signals())
finally:
    event_loop.close()
asyncio_signal.py

運行效果

[root@ mnt]# python3 asyncio_signal.py 
開始發送信號給PID:78320
發送信號名字:SIGHUP
放棄控制
正在處理信號 : SIGHUP
發送信號名字:SIGHUP
放棄控制
正在處理信號 : SIGHUP
發送信號名字:SIGUSR1
放棄控制
正在處理信號 : SIGUSR1
發送信號名字:SIGINT
放棄控制
正在處理信號 : SIGINT

    29、協程與線程結合(ThreadPoolExecutor

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import logging
import sys
import concurrent.futures
import time

def blocks(n):
    log = logging.getLogger('blocks({})'.format(n))
    log.info('運行')
    time.sleep(0.1)
    log.info('done')
    return n ** 2

async def run_blocking_tasks(executor):
    """運行阻塞的任務"""
    log = logging.getLogger('run_blocking_tasks')
    log.info('開始運行')
    log.info('創建執行任務')
    loop = asyncio.get_event_loop()
    blocking_tasks = [
        loop.run_in_executor(executor, blocks, i) for i in range(6)
    ]
    log.info('等待執行任務')
    completed, pending = await asyncio.wait(blocking_tasks)
    results = [t.result() for t in completed]
    log.info('運行結果: {!r}'.format(results))

    log.info('exitrun_blocking_tasks 退出')

if __name__ == '__main__':
    logging.basicConfig(
        level=logging.INFO,
        format='%(threadName)10s %(name)18s: %(message)s',
        stream=sys.stderr
    )

    # 創建一個線程池執行器,最大開啟3個工作線程
    executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=3
    )

    # 創建一個事件循環
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(run_blocking_tasks(executor))
    finally:
        event_loop.close()
asyncio_ThreadPoolExecutor.py

運行效果

[root@ mnt]# python3 asyncio_ThreadPoolExecutor.py 
MainThread run_blocking_tasks: 開始運行
MainThread run_blocking_tasks: 創建執行任務
ThreadPoolExecutor-0_0          blocks(0): 運行
ThreadPoolExecutor-0_1          blocks(1): 運行
ThreadPoolExecutor-0_2          blocks(2): 運行
MainThread run_blocking_tasks: 等待執行任務
ThreadPoolExecutor-0_0          blocks(0): done
ThreadPoolExecutor-0_0          blocks(3): 運行
ThreadPoolExecutor-0_1          blocks(1): done
ThreadPoolExecutor-0_1          blocks(4): 運行
ThreadPoolExecutor-0_2          blocks(2): done
ThreadPoolExecutor-0_2          blocks(5): 運行
ThreadPoolExecutor-0_1          blocks(4): done
ThreadPoolExecutor-0_0          blocks(3): done
ThreadPoolExecutor-0_2          blocks(5): done
MainThread run_blocking_tasks: 運行結果: [0, 9, 16, 25, 1, 4]
MainThread run_blocking_tasks: exitrun_blocking_tasks 退出

     30、協程與進程結合(ProcessPoolExecutor)

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import logging
import sys
import concurrent.futures
import time

def blocks(n):
    log = logging.getLogger('blocks({})'.format(n))
    log.info('運行')
    time.sleep(0.1)
    log.info('done')
    return n ** 2

async def run_blocking_tasks(executor):
    """運行阻塞的任務"""
    log = logging.getLogger('run_blocking_tasks')
    log.info('開始運行')
    log.info('創建執行任務')
    loop = asyncio.get_event_loop()
    blocking_tasks = [
        loop.run_in_executor(executor, blocks, i) for i in range(6)
    ]
    log.info('等待執行任務')
    completed, pending = await asyncio.wait(blocking_tasks)
    results = [t.result() for t in completed]
    log.info('運行結果: {!r}'.format(results))

    log.info('exitrun_blocking_tasks 退出')

if __name__ == '__main__':
    logging.basicConfig(
        level=logging.INFO,
        format='PID %(process)5s %(name)18s: %(message)s',
        stream=sys.stderr
    )

    # 創建一個線程池執行器,最大開啟3個工作線程
    executor = concurrent.futures.ProcessPoolExecutor(
        max_workers=3
    )

    # 創建一個事件循環
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(run_blocking_tasks(executor))
    finally:
        event_loop.close()
asyncio_ProcessPoolExecutor.py

運行效果

[root@mnt]# python3 asyncio_ProcessPoolExecutor.py 
PID 91883 run_blocking_tasks: 開始運行
PID 91883 run_blocking_tasks: 創建執行任務
PID 91883 run_blocking_tasks: 等待執行任務
PID 91884          blocks(0): 運行
PID 91885          blocks(1): 運行
PID 91886          blocks(2): 運行
PID 91884          blocks(0): done
PID 91884          blocks(3): 運行
PID 91886          blocks(2): done
PID 91885          blocks(1): done
PID 91886          blocks(4): 運行
PID 91885          blocks(5): 運行
PID 91884          blocks(3): done
PID 91886          blocks(4): done
PID 91885          blocks(5): done
PID 91883 run_blocking_tasks: 運行結果: [25, 1, 4, 9, 0, 16]
PID 91883 run_blocking_tasks: exitrun_blocking_tasks 退出

 31、asyncio調試模式的開啟

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import argparse
import asyncio
import logging
import sys
import time
import warnings

#接收命令行
parser = argparse.ArgumentParser('debugging asyncio')
parser.add_argument(
    '-v',
    dest='verbose',
    default=False,
    action='store_true',
)
args = parser.parse_args()

#設置日志級別
logging.basicConfig(
    level=logging.DEBUG,
    format='%(levelname)7s: %(message)s',
    stream=sys.stderr,
)
LOG = logging.getLogger('')

async def inner():
    LOG.info('inner 函數開始')
    time.sleep(0.1)
    LOG.info('inner 運行完成')

async def outer(loop):
    LOG.info('outer 函數開始')
    #ensure_future,直到await才運行
    await asyncio.ensure_future(loop.create_task(inner()))
    LOG.info('outer 運行完成')

event_loop = asyncio.get_event_loop()
if args.verbose:
    LOG.info('開啟DEBUG模式')
    event_loop.set_debug(True)

    # 使“慢”任務的閾值非常小以便於說明。默認值為0.1,即100毫秒。
    event_loop.slow_callback_duration = 0.001

    # 在警告過濾器列表中插入一個簡單的條目(在前面)。
    warnings.simplefilter('always', ResourceWarning)

LOG.info('entering event loop')
event_loop.run_until_complete(outer(event_loop))
asyncio_debug.py

運行效果

#開啟Debug
[root@ mnt]# python3 asyncio_debug.py 
  DEBUG: Using selector: EpollSelector
   INFO: entering event loop
   INFO: outer 函數開始
   INFO: inner 函數開始
   INFO: inner 運行完成
   INFO: outer 運行完成
[root@python-mysql mnt]# python3 asyncio_debug.py -v
  DEBUG: Using selector: EpollSelector
   INFO: 開啟DEBUG模式
   INFO: entering event loop
   INFO: outer 函數開始
WARNING: Executing <Task pending coro=<outer() running at asyncio_debug.py:42> wait_for=<Task pending coro=<inner() running at asyncio_debug.py:33> cb=[<TaskWakeupMethWrapper object at 0x7f882e06c0d8>()] created at asyncio_debug.py:42> cb=[_run_until_complete_cb() at /usr/local/Python-3.6.6/lib/python3.6/asyncio/base_events.py:177] created at /usr/local/Python-3.6.6/lib/python3.6/asyncio/base_events.py:447> took 0.003 seconds
   INFO: inner 函數開始
   INFO: inner 運行完成
WARNING: Executing <Task finished coro=<inner() done, defined at asyncio_debug.py:33> result=None created at asyncio_debug.py:42> took 0.101 seconds
   INFO: outer 運行完成

#正常運行
[root@ mnt]# python3 asyncio_debug.py 
  DEBUG: Using selector: EpollSelector
   INFO: entering event loop
   INFO: outer 函數開始
   INFO: inner 函數開始
   INFO: inner 運行完成
   INFO: outer 運行完成
You have new mail in /var/spool/mail/root

 32、利用生成器的方式,創建協程socket監聽

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import logging
import sys

@asyncio.coroutine
def echo(reader, writer):
    address = writer.get_extra_info('peername')
    log = logging.getLogger('echo_{}_{}'.format(*address))
    log.debug('connection accepted')
    while True:
        data = yield from reader.read(128)
        if data:
            log.debug('received {!r}'.format(data))
            writer.write(data)
            yield from writer.drain()
            log.debug('sent {!r}'.format(data))
        else:
            log.debug('closing')
            writer.close()
            return

#開啟Debug模式
logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s: %(message)s',
    stream=sys.stderr,
)
#設置日志的title
log = logging.getLogger('main')

#設置開啟服務的IP+端口
server_address = ('localhost', 8888)

#獲取事件循環
event_loop = asyncio.get_event_loop()

# 創建服務器,讓循環在之前完成協同工作。並且啟動實際事件循環
coroutine = asyncio.start_server(echo, *server_address,loop=event_loop)
server = event_loop.run_until_complete(coroutine)
log.debug('starting up on {} port {}'.format(*server_address))

try:
    #開啟一直循環處理任務
    event_loop.run_forever()
finally:
    #結束后清理的工作
    log.debug('closing server')
    server.close()
    event_loop.run_until_complete(server.wait_closed())
    log.debug('closing event loop')
    event_loop.close()
asyncio_echo_server_generator

運行效果

[root@ mnt]# python3 asyncio_echo_server_generator 
asyncio: Using selector: EpollSelector
main: starting up on localhost port 8888

 33、協程的關閉示例

import asyncio
import logging
import sys

logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s: %(message)s',
    stream=sys.stderr,
)
LOG = logging.getLogger('main')


async def stopper(loop):
    LOG.debug('stopper invoked')
    loop.stop()


event_loop = asyncio.get_event_loop()

event_loop.create_task(stopper(event_loop))

try:
    LOG.debug('entering event loop')
    event_loop.run_forever()
finally:
    LOG.debug('closing event loop')
    event_loop.close()
asyncio_stop.py

運行效果

[root@ mnt]# python3 asyncio_stop.py 
asyncio: Using selector: EpollSelector
main: entering event loop
main: stopper invoked
main: closing event loop


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM