python3.7中asyncio的具體實現


異步IO 協程 是寫爬蟲目前來說最好的方式.

比多線程和多進程都好. 開辟新的線程和進程是非常耗時的

 

講講我在使用python異步IO語法時踩過的坑

簡單介紹異步IO的原理

以及利用最新語法糖實現異步IO的步驟,

然后給出實現異步的不同例子

 

 

網上找了很多python的asyncio示例.很多都是用

# 獲取EventLoop:
loop = asyncio.get_event_loop()
# 執行coroutine
loop.run_until_complete(hello())
loop.close()

 

通過create_future向里面添加task的方法來進行異步IO調用.

這種方法顯然不是很好理解,在python3.7中 asyncio引入了新的語法糖

asyncio.run()
asyncio.create_task()
asyncio.gather()

 

下面通過實例具體分析asyncio異步的原理和使用方法

 

假設有一個異步操作, 它可以是爬蟲的請求等待網頁響應, 數據庫的操作, 或者是定時任務. 不管如何, 我們都可以抽象成下面這個函數來表示

 

async def foo():
    print('----start foo')
    await asyncio.sleep(1)
    print('----end foo')

/*
預期想要的結果
----start foo
(等待一秒)
----end foo
*/

 

async是舊版本裝飾器的語法糖

await是舊版本yield from 的語法糖

這個函數表示,先打印start foo 然后等待一秒, 然后再打印end foo

 

這個函數不能直接被執行. 它必須放在一個異步環境中才能執行. 這個異步環境獨立在整個程序之外,可以把所有的異步環境打包成一個箱子, 看成是一個同步事件.

(異步環境是我自己創造的為了理解異步操作發明的詞匯)

把這個函數裝在這個異步環境里 異步環境的長度取決於環境里需要執行事件最長的那個函數

開啟這個異步環境的標志是

asyncio.run(foo())

這條命令執行了之后,異步環境就被開啟了. 需要主要的事, 同一線程同一時間只能開啟一個異步環境. 換句話說, 在run函數里面的函數(本例中為bar())里面不能再包含run函數.

因此, 上例需要執行的話:

 

async def foo():
    print('start foo')
    await asyncio.sleep(1)
    print('----end foo')

if __name__ == '__main__':
    asyncio.run(foo())

執行以下之后發現結果沒問題

 

異步是為了處理IO密集型事件的.一個讀取操作需要1秒, 另一個需要2秒, 如果並發執行,需要3秒,

def foo2():
    print('----start foo')
    time.sleep(1)
    print('----end foo')

def bar2():
    print('----start bar')
    time.sleep(2)
    print('----end bar')

if __name__ == '__main__':
    foo2()
    bar2()

/*
預期輸出:
----start foo
(等待1秒)
----end foo
----start bar
(等待2秒)
----end bar
*/

 

把上面的函數改寫成異步之后

 

async def foo():
    print('----start foo')
    await asyncio.sleep(1)
    print('----end foo')

async def bar():
    print('****start bar')
    await asyncio.sleep(2)
    print('****end bar')

async def main():
    await foo()
    await bar()

if __name__ == '__main__':
    asyncio.run(main())

 

我們想要的結果是

----start foo

****start bar

(等待1秒)

----end foo

(等待1秒)

****end bar

但是運行上面的程序 結果卻是

----start foo

(等待1秒)

----end foo

****start bar

(等待2秒)

****end bar

這是為什么呢

 

await表示 等待后面的異步函數操作完了之后, 執行下面的語句.

所以在在本例中,await foo 等待foo函數完全結束了之后, 再去執行

 

那么如何一起執行呢

基本的有兩種方法

1.采用函數gather

官方文檔中的解釋是

awaitable asyncio.gather(*awsloop=Nonereturn_exceptions=False)

並發 運行 aws 序列中的 可等待對象

如果 aws 中的某個可等待對象為協程,它將自動作為一個任務加入日程。

如果所有可等待對象都成功完成,結果將是一個由所有返回值聚合而成的列表。結果值的順序與 aws 中可等待對象的順序一致。

如果 return_exceptions 為 False (默認),所引發的首個異常會立即傳播給等待 gather() 的任務。aws 序列中的其他可等待對象 不會被取消 並將繼續運行。

如果 return_exceptions 為 True,異常會和成功的結果一樣處理,並聚合至結果列表。

如果 gather() 被取消,所有被提交 (尚未完成) 的可等待對象也會 被取消

如果 aws 序列中的任一 Task 或 Future 對象 被取消,它將被當作引發了 CancelledError 一樣處理 -- 在此情況下 gather() 調用 不會 被取消。這是為了防止一個已提交的 Task/Future 被取消導致其他 Tasks/Future 也被取消。

 

因此代碼就有了

async def foo():
    print('----start foo')
    await asyncio.sleep(1)
    print('----end foo')

async def bar():
    print('****start bar')
    await asyncio.sleep(2)
    print('****end bar')

async def main():
    res = await asyncio.gather(foo(), bar())
    print(res)

if __name__ == '__main__':
    asyncio.run(main())

返回值為函數的返回值列表 本例中為[None, None]

 

第二種方法 創建task

asyncio.create_task(coro)

將 coro 協程 打包為一個 Task 排入日程准備執行。返回 Task 對象。

該任務會在 get_running_loop() 返回的loop中執行,如果當前線程沒有在運行的loop則會引發 RuntimeError

此函數 在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低層級的 asyncio.ensure_future() 函數。

 

async def foo():
    print('----start foo')
    await asyncio.sleep(1)
    print('----end foo')

async def bar():
    print('****start bar')
    await asyncio.sleep(2)
    print('****end bar')

async def main():
    asyncio.create_task(foo())
    asyncio.create_task(bar())

if __name__ == '__main__':
    asyncio.run(main())

 

但是運行一下就會發現, 只輸出了

----start foo
****start bar

這是因為,create_task函數只是把任務打包放進了隊列,至於它們有沒有運行完.  不管.

因此需要等待它們執行完畢.

最后的代碼為

async def foo():
    print('----start foo')
    await asyncio.sleep(1)
    print('----end foo')

async def bar():
    print('****start bar')
    await asyncio.sleep(2)
    print('****end bar')

async def main():
    task1 = asyncio.create_task(foo())
    task2 = asyncio.create_task(bar())

    await task1
    await task2

if __name__ == '__main__':
    asyncio.run(main())

 

如果有多個請求

 

async def foo():
    print('----start foo')
    await asyncio.sleep(1)
    print('----end foo')

async def main():
    tasks = []
    for i in range(10):
        tasks.append(asyncio.create_task(foo()))
    await asyncio.wait(tasks)

if __name__ == '__main__':
    asyncio.run(main())

 

async def foo():
    print('----start foo')
    await asyncio.sleep(1)
    print('----end foo')

async def bar():
    print('****start bar')
    await asyncio.sleep(2)
    print('****end bar')

async def main():
    tasks = []
    for i in range(10):
        tasks.append(asyncio.create_task(foo()))
    for j in range(10):
        tasks.append(asyncio.create_task(bar()))
    await asyncio.wait(tasks)

if __name__ == '__main__':
    asyncio.run(main())

 

異步嵌套

async def foo():
    print('----start foo')
    await asyncio.sleep(1)
    print('----end foo')

async def bar():
    print('****start bar')
    await asyncio.sleep(2)
    print('****end bar')

async def foos():
    print('----------------------')
    tasks = []
    for i in range(3):
        tasks.append(asyncio.create_task(foo()))
    await asyncio.wait(tasks)

async def main():
    tasks = []
    for i in range(3):
        tasks.append(asyncio.create_task(foos()))
    for j in range(3):
        tasks.append(asyncio.create_task(bar()))
    await asyncio.wait(tasks)

if __name__ == '__main__':
    asyncio.run(main())

 

把每一個create_task當成新增了一條線. 這條線如果遇到IO操作了(即遇到了await) 那么就先等待在這里, 先執行別的線上的操作(如果已經有了結果)

create了線才可以跳來跳去, 如果不create, 是不會跳走的

async def foo():
    print('----start foo')
    await asyncio.sleep(1)
    print('----end foo')

async def foos():
    print('----------------------')
    tasks = []
    await foo()
    await foo()
    await foo()

async def main():
    tasks = []
    for i in range(3):
        tasks.append(asyncio.create_task(foos()))
    await asyncio.wait(tasks)

if __name__ == '__main__':
    asyncio.run(main())

這個例子里面 只創造了3條線, 因此只能3個3個執行, 其實應該9個一起等, 但是因為沒有create_task所以並不會一起執行.

 

 

import asyncio
import aiohttp

async def fetch(session, url, sem):
    timeout = aiohttp.ClientTimeout(total=2)
    try:
        async with sem:
            print(f'start get: {url}')
            async with session.get(url, timeout=timeout) as response:
                res = await response.text()
                print(f'get {url} successfully')
    except:
        print('timeout')


async def main():
    url_list = [
        # 'https://www.google.com.hk/',
        'https://www.cnblogs.com/DjangoBlog/p/5783125.html',
        'http://www.360doc.com/content/18/0614/19/3175779_762447601.shtml',
        'https://www.baidu.com/',
    ]
    url_list2 = ['http://es6.ruanyifeng.com/#docs/decorator' for _ in range(100)]
    url_list3 = ['https://www.baidu.com' for _ in range(100)]

    # async with aiohttp.ClientSession() as session:
    #     tasks = []
    #     sem = asyncio.Semaphore(20)
    #     for url in url_list3:
    #         tasks.append(fetch(session, url, sem))
    #     await asyncio.gather(*tasks)

    async with aiohttp.ClientSession() as session:
        sem = asyncio.Semaphore(20)
        url = 'https://www.baidu.com'
        task_list = [fetch(session, url, sem) for _ in range(100)]
        await asyncio.gather(*task_list)

if __name__ == '__main__':
    asyncio.run(main())

被注釋掉的代碼和下面的代碼干的是同樣的事

Semaphore是一個計數器 超過容量的時候會阻塞. 可以限制並發數量


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM