ThreadPoolExecutor內存溢出
情景一:
在數據處理中,使用 ThreadPoolExecutor(線程池)處理大量數據時,可能導致內存溢出,機器卡死甚至掛掉。
場景模擬:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from memory_profiler import profile
import queue
from line_profiler import LineProfiler
from functools import wraps
class BoundThreadPoolExecutor(ThreadPoolExecutor):
    """ThreadPoolExecutor whose pending-work queue has an upper bound.

    The executor's internal ``_work_queue`` is replaced with a
    ``queue.Queue(qsize)``. Once ``qsize`` items are waiting, ``put()`` on
    that queue blocks, so ``submit()`` stalls until a worker takes an item
    instead of letting the backlog grow without limit.

    Args:
        qsize: Maximum number of queued work items. ``None`` (the default)
            or a value ``<= 0`` leaves the queue unbounded.
        *args: Positional arguments forwarded to ``ThreadPoolExecutor``.
        **kwargs: Keyword arguments forwarded to ``ThreadPoolExecutor``
            (e.g. ``max_workers``).
    """

    def __init__(self, qsize: int = None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # queue.Queue compares maxsize against 0 on every put(); handing it
        # None would raise TypeError on the first submit(), so map None to 0,
        # which queue.Queue treats as "no limit".
        self._work_queue = queue.Queue(0 if qsize is None else qsize)
def timer(func):
    """Decorator that line-profiles each call to *func* and prints the stats.

    The wrapped function is executed exactly once per call, under a fresh
    ``LineProfiler``; its return value is passed back to the caller after
    the profiling report has been printed.

    Args:
        func: The callable to profile.

    Returns:
        A wrapper with the same metadata as *func* (via ``functools.wraps``).
    """
    @wraps(func)
    def decorator(*args, **kwargs):
        lp = LineProfiler()
        lp_wrap = lp(func)
        # Run the function a single time, through the profiler, and keep its
        # result. Calling func separately as well would double every side
        # effect and the total runtime.
        func_return = lp_wrap(*args, **kwargs)
        lp.print_stats()
        return func_return
    return decorator
def func(num):
    """Simulated slow task: announce the run, wait half a second, square *num*."""
    print(f"the {num} run...")
    # The sleep stands in for real work so queued tasks pile up behind it.
    time.sleep(0.5)
    squared = num * num
    return squared
# To line-profile instead of memory-profile, swap the decorator for @timer.
@profile
def main():
    """Submit 100 tasks to a two-worker pool and print the backlog size.

    After every ``submit`` the current length of the executor's work queue
    is printed, which shows how many tasks are waiting for a worker.
    """
    # For the bounded comparison, build the pool with
    # BoundThreadPoolExecutor(qsize=2, max_workers=2) instead.
    # The with-block guarantees shutdown() runs (waiting for the workers)
    # even if a submit raises, so worker threads are never left behind.
    with ThreadPoolExecutor(max_workers=2) as pool:
        for i in range(100):
            pool.submit(func, i)
            # _work_queue is a private attribute; it is read here only to
            # observe the backlog for this demonstration.
            print(pool._work_queue.qsize())
# Run the demonstration only when this file is executed as a script.
if __name__ == "__main__":
    main()
未對線程隊列加以限制時,進程會將所有任務對象都添加到 self._work_queue 中,隊列長度隨提交數量不斷增長。
重寫 ThreadPoolExecutor,以 self._work_queue = queue.Queue(qsize) 限制隊列大小;隊列已滿時,submit 會阻塞,直到工作線程取走任務。
結果對比
總結
存在內存溢出的情況,原因是 ThreadPoolExecutor 線程池使用的是無邊界隊列:進程向隊列中添加對象時,不會判斷是否有空閑線程,所有任務對象都直接入隊,導致內存消耗過多。給隊列設置上限(如上文的 BoundThreadPoolExecutor)即可避免。