pip install sync2asyncio
python 快速萬能同步轉異步語法。
使任意同步庫快速變asyncio異步語法的方式 ,simple_run_in_executor
這是一個異步對象Asyncio的Future了,可以被await和異步任務編排。
await simple_run_in_executor(requests.get, url='http://www.baidu.com') # 等效 await aiohttp.request('get',url)
await simple_run_in_executor(time.sleep, 5) # 等效 await asyncio.sleep(5)
2 代碼實現和測試
from functools import partial import threading import asyncio from threadpool_executor_shrink_able import ThreadPoolExecutorShrinkAble # 沒有使用內置的 concurrent.futures里面的,這個是優化4點功能的。 global_threadpool_executor = ThreadPoolExecutorShrinkAble(200) # 這個是智能線程池,不是官方的concurrent.futures.threadpoolexecutor async def simple_run_in_executor(f, *args, async_loop=None, threadpool_executor=None, **kwargs): """ 函數的目的是轉化任何同步方法或函數為異步鏈路語法中的一環。例如你寫一個功能,要調用10個包,其中有9個有對應的異步庫,有一個還沒有對應的異步庫, 因為一旦異步需要處處異步,不能因為某一個功能沒有對應的異步庫就前功盡棄。本函數就能夠做到一個異步的鏈路里面調用同步庫但不阻塞整個asyncio的loop循環。 這個函數看起來很簡單,主要是調用官方的 run_in_executor 。 第1個特點是由官方的 方法改成了現在的函數 (方法是類里面面的,函數是模塊下面的,我一般這么划分python方法和函數) 第2個特點是直接內置了線程池,用戶可以無需傳參了。並且這個線程池功能比官方的線程池要好,可以設置一個很大的值,他會自適應自動擴大縮小。 第3個特點是最重要的提高了易用性的地方。使用了整體偏函數把所有入參和函數生成一個偏函數,進而解決了官方只支持位置入參,不支持關鍵字入參的, 當函數入參達到幾十個時候,例如requests.get 如果你想設置timeout參數,如果不支持關鍵字入參,你需要把timeout參數之前的其他不重要參數全都傳遞一遍使用默認None來占位。 函數入參個數比較多的情況下,不支持關鍵字入參就會很容易導致傳參錯誤。 :param f: 任意同步阻塞函數,是非 async def的函數 :param args: 同步函數的入參 :param async_loop: loop :param threadpool_executor: 在項城市里面運行。 :param kwargs: 同步函數的入參 :return: 同步函數的結果 """ loopx = async_loop or asyncio.get_event_loop() # print(id(loopx)) executor = threadpool_executor or global_threadpool_executor result = await loopx.run_in_executor(executor, partial(f, *args, **kwargs)) return result if __name__ == '__main__': import time import requests # 這是同步阻塞函數之一 def block_fun(x): # 這是自定義的第二個同步阻塞函數 time.sleep(5) print(x) return x * 10 async def enter_fun(xx): # 入口函數,因為為一旦異步,必須處處異步。不能直接調用block_fun,否則阻塞其他任務。 await asyncio.sleep(1) # # 如果你這么寫 time.sleep(1) 那就完了個蛋,程序運行完成需要更長的時間。 await simple_run_in_executor(time.sleep,1) # 如果世上沒有asyncio.sleep異步函數,那么可以這么做。 r = await simple_run_in_executor(block_fun, xx) # # 如果你這么寫 r = block_fun(xx) 那就完了個蛋,程序運行完成需要更長的時間。 print(r) resp = await simple_run_in_executor(requests.get, url='http://www.baidu.com', timeout=5) # 如果你這么寫 resp = requests.get( url='http://www.baidu.com') 那就完了個蛋,如果網站每次響應時間很大會發生嚴重影響,程序運行完成需要更長的時間。 # 這個是調用了同步requests請求庫,如果同步庫請求一個網站需要10秒響應,asyncio中如果直接使用了同步庫,會發生滅頂之災,整個loop就成了廢物。如果網站每次響應是1毫秒,那么異步中調用同步庫還可以勉強接受的。 # 但用 simple_run_in_executor來運行requests 即使網站響應時間很長,也不會對asyncio的loop產生嚴重阻塞影響了,這就是 simple_run_in_executor 要達到的目的。 print(resp) loopy = asyncio.get_event_loop() print(id(loopy)) tasks = [] tasks.append(simple_run_in_executor(requests.get, url='http://www.baidu.com')) tasks.append(simple_run_in_executor(block_fun, 1)) tasks.append(simple_run_in_executor(block_fun, 2)) tasks.append(simple_run_in_executor(block_fun, 3)) for i in range(100, 120): tasks.append(enter_fun(i)) print('開始') loopy.run_until_complete(asyncio.wait(tasks)) # 通過以上可以觀察到,所有的block_fun的print都是同一時間打印出來的,而不是每隔5秒一個接一個打印的。 print('結束')
雖然block_fun同步函數里面需要阻塞5秒,但使用了 simple_run_in_executor 來執行這個同步阻塞函數,
雖然運行了十幾個協程任務,不會造成整體整體運行時間延長。
如果不使用 simple_run_in_executor 來運行 block_fun,運行完這十幾個協程任務最起碼需要1分鍾以上。
4 比較asyncio.run_coroutine_threadsafe 和 run_in_executor區別
asyncio並發真的太難了,比線程池用法難很多,里面的概念太難了,例如介紹這兩個概念。
asyncio.run_coroutine_threadsafe 和 run_in_executor 是一對反義詞。
asyncio.run_coroutine_threadsafe 是在非異步的上下文環境(也就是正常的同步語法的函數里面)下調用異步函數對象(協程),
因為當前函數定義沒有被async修飾,就不能在函數里面使用await,必須使用這。這個是將asyncio包的future對象轉化返回一個concurrent.futures包的future對象。
run_in_executor 是在異步環境(被async修飾的異步函數)里面,調用同步函數,將函數放到線程池運行防止阻塞整個事件循環的其他任務。
這個是將 一個concurrent.futures包的future對象 轉化為 asyncio包的future對象,
asyncio包的future對象是一個asyncio包的awaitable對象,所以可以被await,concurrent.futures.Future對象不能被await。
