asyncio時間循環中運行阻塞任務


 場景:

在某個異步循環中 需要執行某個阻塞任務(例如文件讀寫., 保存圖片等)

如果這個時候直接在異步事件循環中直接運行, 那么所有任務都會阻塞在這里, 明顯是不行的

 

解決方案:

https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools

 

import concurrent.futures

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

# 在某個異步事件循環中
async def main():
    loop = asyncio.get_running_loop()  # 獲取當前事件循環

    result = await loop.run_in_executor(None, blocking_io)

    with concurrent.futures.ThreadPoolExecutor() as pool:
          result = await loop.run_in_executor(
            pool, blocking_io)
          print('custom thread pool', result)

asyncio.run(main())

 

loop.run_in_exeutor(pool, func)

pool為線程池 在python里面為concurrent.futures.ThreadPoolExecutor的實例

func為需要運行的同步函數 如果需要傳入參數, 要用functools.partial(func, *arg, **kwargs)  傳入.

 

運行之后, 則會在不阻塞異步時間循環的情況下 ,在新的線程中運行func 結束之后用await接收

 

如果一開始不指定線程池,那么官方文檔中說的是默認線程池

這里的默認線程池可以用

loop.set_default_executor(executor)

來指定

如果一開始也不指定, 那么經過閱讀源碼之后 可知:

    def run_in_executor(self, executor, func, *args):
        self._check_closed()
        if self._debug:
            self._check_callback(func, 'run_in_executor')
        if executor is None:
            executor = self._default_executor
            if executor is None:
                executor = concurrent.futures.ThreadPoolExecutor()
                self._default_executor = executor
        return futures.wrap_future(
            executor.submit(func, *args), loop=self)

他會幫我們自動啟用一個線程池, 丟進去運行.

 

總之異步的時間循環中如果要加入阻塞型代碼,  千萬不能直接阻塞, 用此法可破.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM