Python multithreading memory overflow -- ThreadPoolExecutor memory overflow


ThreadPoolExecutor memory overflow

Scenario 1:
When using ThreadPoolExecutor (a thread pool) to process a large amount of data, memory can overflow and the machine can freeze or crash.
Scenario simulation:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from memory_profiler import profile
import queue
from line_profiler import LineProfiler
from functools import wraps


class BoundThreadPoolExecutor(ThreadPoolExecutor):
    """
    Subclass of ThreadPoolExecutor that puts a bound on the work queue,
    so submit() blocks once the queue is full.
    """
    def __init__(self, qsize: int = 0, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # queue.Queue(0) is unbounded (same as the parent); pass qsize > 0 to bound it
        self._work_queue = queue.Queue(qsize)

def timer(func):
    """Line-profile a function with line_profiler.

    Note: the decorated function runs twice -- once for its real return value
    and once under LineProfiler to collect the per-line timings it prints.
    """
    @wraps(func)
    def decorator(*args, **kwargs):
        func_return = func(*args, **kwargs)
        lp = LineProfiler()
        lp_wrap = lp(func)
        lp_wrap(*args, **kwargs)
        lp.print_stats()
        return func_return

    return decorator

def func(num):
    print(f"the {num} run...")
    time.sleep(0.5)
    return num*num

# @timer
@profile
def main():
    # Alternatives kept for comparison:
    # with ThreadPoolExecutor(max_workers=2) as t:
    #     res = [t.submit(func, i) for i in range(100)]
    # pool = BoundThreadPoolExecutor(qsize=2, max_workers=2)
    pool = ThreadPoolExecutor(max_workers=2)    # default: unbounded work queue
    for i in range(100):
        # func(i)
        pool.submit(func, i)                    # never blocks; pending tasks pile up
        print(pool._work_queue.qsize())         # queue length keeps growing
    pool.shutdown()

if __name__ == '__main__':
    main()

When the work queue is not bounded, the process puts every submitted work item into self._work_queue, so memory grows with the number of pending tasks.
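As a minimal check (on recent CPython the pool's internal work queue is an unbounded queue.SimpleQueue; older versions use an unbounded queue.Queue), the private _work_queue has no size limit, so submit() never blocks:

from concurrent.futures import ThreadPoolExecutor

# Inspect the default (private) work queue: it has no maxsize, so every
# submitted task object sits in memory until a worker thread picks it up.
pool = ThreadPoolExecutor(max_workers=2)
print(type(pool._work_queue))     # unbounded queue (SimpleQueue on recent CPython)
print(pool._work_queue.qsize())   # 0 here; grows without bound as tasks are submitted
pool.shutdown()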

Subclassing ThreadPoolExecutor and setting self._work_queue = queue.Queue(qsize) bounds the queue: once qsize items are pending, submit() blocks until a worker frees a slot.
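A minimal usage sketch, reusing func and BoundThreadPoolExecutor from the code above (qsize=2 and max_workers=2 match the commented-out line in main()):

# With a bound of 2, submit() blocks whenever two work items are already queued,
# so the printed queue size never exceeds 2 and memory stays flat.
pool = BoundThreadPoolExecutor(qsize=2, max_workers=2)
for i in range(100):
    pool.submit(func, i)              # blocks until a queue slot frees up
    print(pool._work_queue.qsize())
pool.shutdown()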

Result comparison

Summary

Memory overflow occurs because ThreadPoolExecutor uses an unbounded work queue: submit() puts every task into the queue without checking whether an idle worker thread is available, so pending tasks accumulate and memory consumption keeps growing.
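An alternative sketch, not from the original post: instead of overriding the pool, throttle submissions with a semaphore so that at most a fixed number of tasks (the hypothetical max_pending below) are queued or running at once. It reuses func from the code above.

from concurrent.futures import ThreadPoolExecutor
from threading import BoundedSemaphore

def run_throttled(items, max_workers=2, max_pending=4):
    # Each submit() first acquires the semaphore; the future's done-callback
    # releases it, so at most max_pending tasks are in flight at any time.
    sem = BoundedSemaphore(max_pending)
    futures = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for item in items:
            sem.acquire()
            fut = pool.submit(func, item)
            fut.add_done_callback(lambda _f: sem.release())
            futures.append(fut)
    return [f.result() for f in futures]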

