場景:
在某個異步循環中 需要執行某個阻塞任務(例如文件讀寫., 保存圖片等)
如果這個時候直接在異步事件循環中直接運行, 那么所有任務都會阻塞在這里, 明顯是不行的
解決方案:
https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools
import concurrent.futures def blocking_io(): # File operations (such as logging) can block the # event loop: run them in a thread pool. with open('/dev/urandom', 'rb') as f: return f.read(100) # 在某個異步事件循環中 async def main(): loop = asyncio.get_running_loop() # 獲取當前事件循環 result = await loop.run_in_executor(None, blocking_io) with concurrent.futures.ThreadPoolExecutor() as pool: result = await loop.run_in_executor( pool, blocking_io) print('custom thread pool', result) asyncio.run(main())
loop.run_in_exeutor(pool, func)
pool為線程池 在python里面為concurrent.futures.ThreadPoolExecutor的實例
func為需要運行的同步函數 如果需要傳入參數, 要用functools.partial(func, *arg, **kwargs) 傳入.
運行之后, 則會在不阻塞異步時間循環的情況下 ,在新的線程中運行func 結束之后用await接收
如果一開始不指定線程池,那么官方文檔中說的是默認線程池
這里的默認線程池可以用
loop.set_default_executor(executor)
來指定
如果一開始也不指定, 那么經過閱讀源碼之后 可知:
def run_in_executor(self, executor, func, *args): self._check_closed() if self._debug: self._check_callback(func, 'run_in_executor') if executor is None: executor = self._default_executor if executor is None: executor = concurrent.futures.ThreadPoolExecutor() self._default_executor = executor return futures.wrap_future( executor.submit(func, *args), loop=self)
他會幫我們自動啟用一個線程池, 丟進去運行.
總之異步的時間循環中如果要加入阻塞型代碼, 千萬不能直接阻塞, 用此法可破.