python 多進程和異步io的有機結合 Error in atexit._run_exitfuncs


  眾所周知,python的多線程開發在GIL(全局器解釋鎖)下飽受詬病,在單核模式下搞多線程對效率的提升相當有限。於是大家的共識就是搞io密集的程序,建議采用多線程,計算密集型的程序就搞多進程。近期的一些開發經歷,讓我大量嘗試采用多進程和異步io的方式來提高效率。
  一.采用多進程。
  1.用過multiprocessing.process和queue及pool,但是一直有報錯,位置在multiprocessing.spawn的_main(fd,parent_sentinel)中reduction.pickle.load(from_parent)。網上提供了各種諸如修改權限,修改reduction協議類型的方案,始終未能解決。最后竟然是更新了python版本后不葯而愈,但最后放棄了這種方式。
  2.使用concurrent.futures.ProcessPoolExecutor,官方文檔很詳細。程序運行也很不錯,但是在腳本執行完畢后退出時拋出異常。Error in atexit._run_exitfuncs。大致的內容是handle is closed,通過追蹤大致可以判斷出腳本執行完畢時,會有futures.process的_python_exit()執行,此時ProcessPoolExecutor執行完畢后釋放,沒有線程可以被wakeup,所以報錯。引起這個問題的是官方推薦的with as寫法,with as 執行完畢后會將對象釋放,結果在退出的時候引發異常。
  解決方案也很反常,不使用with as的自動釋放,也不使用shutdown手動釋放,而是不釋放,在整個腳本執行完畢的時候,由_python_exit()進行釋放。
  有意思的是concurrent.futures.process文檔里有這么一段注釋,可以自行研究。
# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
# - The workers would still be running during interpreter shutdown,
# meaning that they would fail in unpredictable ways.
# - The workers could be killed while evaluating a work item, which could
# be bad if the callable being evaluated has external side-effects e.g.
# writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.
  二.采用異步io。
  python在3.4中有了asyncio。官方給的示例可以看出來這是個消息循環,eventloop是在當前的上下文中提供的,而且可以和協程一起執行。最新的文檔里可以看到eventloop已經不再只是同協程一起提供異步了。
  我們可以將多個耗時的任務進行封裝,丟進eventloop中,當線程執行到需要等待的操作如io,主線程不會等待,而是切換執行到下一個可執行任務,因此可以實現並發執行。
  三.多進程和異步io結合。
  經過嘗試多線程中沒有找到使用異步io的方式,因為eventloop是從當前上下文中提供的,也就是主線程。於是換了個思路,使用多進程,讓每個進程的‘主線程’通過異步io來實現並發,經多次嘗試后這種方案是可行的。
  因為同時運行的進程數量上限是受cpu內核數量的上限影響的,一般的建議是不超過內核數量,但是在一些場景的限制下為了提高效率,單純受限的多進程不能滿足要求的時候,不妨將多進程的‘主進程’結合並發來提高效率。
  由於沒有實際測試性能,不能斷言究竟哪種方式效率更高,畢竟也與任務內容有關。這只是一種思路,留待日后驗證。
  代碼如下:
import asyncio
import aiohttp
from concurrent.futures import ProcessPoolExecutor, as_completed ,ThreadPoolExecutor
import time


async def post_http():
    # 示例
    url = ''
    data = ''
    async  with aiohttp.ClientSession() as session:
        async with session.post(url=url, data=data, headers={}, timeout=60) as resp:
            r_json = await resp.json()
            return r_json


async def t_handler(data, t_flag, p_flag, semaphore):
    async with semaphore:
        for d in data:
            print(f'pid:{p_flag} tid:{t_flag} data:{d}')
            await asyncio.sleep(1)  # 處理費時的io操作,比如httprequest
    return


def p_handler(datas, p_flag):
    # 線程並發數需要有限制  linux打開文件最大默認為1024 win為509 待確認
    ts = time.time()
    num = 10  # 最大並發數
    count = len(datas)
    block = int(count / num) + 1
    tar_datas = [datas[i * block: (i + 1) * block if (i + 1) * block < count else count] for i in range(num)]
    semaphore = asyncio.Semaphore(num)
    tasks = [t_handler(d, i, p_flag, semaphore) for i, d in enumerate(tar_datas)]

    loop = asyncio.get_event_loop()  # 基於當前線程 ,故在多線程中無法使用 只能在多進程中使用
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

    return f'\033[0;32mprocess {p_flag} :cost {time.time() - ts}\033[0m'




if __name__ == '__main__':
    ts = time.time()
    datas = [i for i in range(1000)]
    datas = [datas[i * 100:(i + 1) * 100] for i in range(10)]  # 每個進程要處理的數據

    # 啟動異步io 主線程調用 event_loop 在當前線程下啟動異步io 實現並發
    # res = p_handler(datas,1)
    # print(res)

    p_num = 10
    block_len = 100

    datas = [datas[i * 100:(i + 1) * 100] for i in range(p_num)]  # 每個進程要處理的數據
    # ProcessPoolExecutor 可能與運行環境有關 官方的 with as 會主動釋放線程 導致主線程退出時找不到進程池內進程已經被釋放 導致Error in atexit._run_exitfuncs異常
    executor = ProcessPoolExecutor(p_num)
    futures = [executor.submit(p_handler, d, p_flag) for p_flag, d in enumerate(datas)]
    for f in as_completed(futures):
        if f.done():
                res = f.result()
                print(res)

    print(f'Exit!! cost:{time.time() - ts}')

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM