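Notes on an Incident Caused by a Saturated Thread Pool
Some of the values set in our system's configuration had never been examined closely; they had always worked without errors. This time a user ran a bulk data import, which triggers asynchronous tasks, and the thread pool filled up. These notes work through what each parameter means.
The async configuration is as follows: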
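    import java.util.concurrent.Executor;

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.AsyncConfigurer;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

    @Configuration
    @EnableAsync
    public class AsyncConfig implements AsyncConfigurer {

        @Bean
        public Executor asyncExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(10);    // threads kept alive even when idle
            executor.setMaxPoolSize(10);     // same as the core size, so the pool never grows
            executor.setQueueCapacity(100);  // at most 100 tasks wait in the queue
            executor.setThreadNamePrefix("XmlTask-");
            executor.initialize();
            return executor;
        }
    }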
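Thread pools should not be created with Executors. Construct them through ThreadPoolExecutor instead, so that whoever writes the code is explicit about how the pool behaves and avoids the risk of resource exhaustion. Note: the thread pools returned by Executors have the following drawbacks:
1) FixedThreadPool and SingleThreadExecutor (Executors.newFixedThreadPool and Executors.newSingleThreadExecutor):
The request queue may hold up to Integer.MAX_VALUE entries, so a large backlog of requests can pile up and cause an OutOfMemoryError (OOM).
2) CachedThreadPool (Executors.newCachedThreadPool):
The pool may create up to Integer.MAX_VALUE threads, so a large number of threads can be created and cause an OOM.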
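The limits listed above can be read back from the pools themselves. The following is a minimal sketch, not a recommendation: the class and method names are made up for illustration, the values in `boundedPool` (10, 20, 100) are arbitrary, and the cast to `ThreadPoolExecutor` relies on the assumption that `newFixedThreadPool` and `newCachedThreadPool` return a plain `ThreadPoolExecutor`, which holds in OpenJDK but is not promised by the API.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecutorsDefaults {

    /** Queue capacity of a pool created by Executors.newFixedThreadPool. */
    static int fixedPoolQueueCapacity() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Assumption: the returned object is a plain ThreadPoolExecutor.
            return ((ThreadPoolExecutor) pool).getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    /** Thread limit of a pool created by Executors.newCachedThreadPool. */
    static int cachedPoolMaxThreads() {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            // Same assumption as above about the concrete type.
            return ((ThreadPoolExecutor) pool).getMaximumPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    /** Explicit construction: every limit is visible at the call site. */
    static ThreadPoolExecutor boundedPool() {
        return new ThreadPoolExecutor(
                10, 20,                          // core and maximum threads (illustrative)
                60, TimeUnit.SECONDS,            // idle timeout for surplus threads
                new LinkedBlockingQueue<>(100)); // bounded queue (illustrative)
    }

    public static void main(String[] args) {
        System.out.println(fixedPoolQueueCapacity() == Integer.MAX_VALUE);
        System.out.println(cachedPoolMaxThreads() == Integer.MAX_VALUE);
        System.out.println(boundedPool().getQueue().remainingCapacity());
    }
}
```

With explicit construction, the queue bound and the thread limit are both choices the author has to make, which is the point of the guideline.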
ThreadPoolExecutor belongs to the JDK's JUC package (java.util.concurrent), while ThreadPoolTaskExecutor comes from Spring. ThreadPoolTaskExecutor wraps a ThreadPoolExecutor:
    public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
            implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {

        private final Object poolSizeMonitor = new Object();
        private int corePoolSize = 1;
        private int maxPoolSize = 2147483647;
        private int keepAliveSeconds = 60;
        private int queueCapacity = 2147483647;
        private boolean allowCoreThreadTimeOut = false;
        @Nullable
        private TaskDecorator taskDecorator;
        @Nullable
        private ThreadPoolExecutor threadPoolExecutor; // the JDK ThreadPoolExecutor is used here
        private final Map<Runnable, Object> decoratedTaskMap;

        ...

        protected ExecutorService initializeExecutor(ThreadFactory threadFactory,
                RejectedExecutionHandler rejectedExecutionHandler) {
            // queueCapacity is used here; it becomes the length of the work queue
            BlockingQueue<Runnable> queue = this.createQueue(this.queueCapacity);
            ThreadPoolExecutor executor;
            if (this.taskDecorator != null) {
                executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize,
                        (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue,
                        threadFactory, rejectedExecutionHandler) {
                    public void execute(Runnable command) {
                        Runnable decorated = ThreadPoolTaskExecutor.this.taskDecorator.decorate(command);
                        if (decorated != command) {
                            ThreadPoolTaskExecutor.this.decoratedTaskMap.put(decorated, command);
                        }
                        super.execute(decorated);
                    }
                };
            } else {
                executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize,
                        (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue,
                        threadFactory, rejectedExecutionHandler);
            }
            if (this.allowCoreThreadTimeOut) {
                executor.allowCoreThreadTimeOut(true);
            }
            this.threadPoolExecutor = executor;
            return executor;
        }
    }
The fields and constructor of ThreadPoolExecutor:
    public class ThreadPoolExecutor extends AbstractExecutorService {
        ...
        /**
         * The queue used for holding tasks and handing off to worker
         * threads.  We do not require that workQueue.poll() returning
         * null necessarily means that workQueue.isEmpty(), so rely
         * solely on isEmpty to see if the queue is empty (which we must
         * do for example when deciding whether to transition from
         * SHUTDOWN to TIDYING).  This accommodates special-purpose
         * queues such as DelayQueues for which poll() is allowed to
         * return null even if it may later return non-null when delays
         * expire.
         */
        private final BlockingQueue<Runnable> workQueue;

        /**
         * Lock held on access to workers set and related bookkeeping.
         * While we could use a concurrent set of some sort, it turns out
         * to be generally preferable to use a lock. Among the reasons is
         * that this serializes interruptIdleWorkers, which avoids
         * unnecessary interrupt storms, especially during shutdown.
         * Otherwise exiting threads would concurrently interrupt those
         * that have not yet interrupted. It also simplifies some of the
         * associated statistics bookkeeping of largestPoolSize etc. We
         * also hold mainLock on shutdown and shutdownNow, for the sake of
         * ensuring workers set is stable while separately checking
         * permission to interrupt and actually interrupting.
         */
        private final ReentrantLock mainLock = new ReentrantLock();

        /**
         * Set containing all worker threads in pool. Accessed only when
         * holding mainLock.
         */
        private final HashSet<Worker> workers = new HashSet<Worker>();

        /**
         * Wait condition to support awaitTermination
         */
        private final Condition termination = mainLock.newCondition();

        /**
         * Tracks largest attained pool size. Accessed only under
         * mainLock.
         */
        private int largestPoolSize;

        /**
         * Counter for completed tasks. Updated only on termination of
         * worker threads. Accessed only under mainLock.
         */
        private long completedTaskCount;

        /*
         * All user control parameters are declared as volatiles so that
         * ongoing actions are based on freshest values, but without need
         * for locking, since no internal invariants depend on them
         * changing synchronously with respect to other actions.
         */

        /**
         * Factory for new threads. All threads are created using this
         * factory (via method addWorker).  All callers must be prepared
         * for addWorker to fail, which may reflect a system or user's
         * policy limiting the number of threads.  Even though it is not
         * treated as an error, failure to create threads may result in
         * new tasks being rejected or existing ones remaining stuck in
         * the queue.
         *
         * We go further and preserve pool invariants even in the face of
         * errors such as OutOfMemoryError, that might be thrown while
         * trying to create threads.  Such errors are rather common due to
         * the need to allocate a native stack in Thread.start, and users
         * will want to perform clean pool shutdown to clean up.  There
         * will likely be enough memory available for the cleanup code to
         * complete without encountering yet another OutOfMemoryError.
         */
        private volatile ThreadFactory threadFactory;

        /**
         * Handler called when saturated or shutdown in execute.
         */
        private volatile RejectedExecutionHandler handler;

        /**
         * Timeout in nanoseconds for idle threads waiting for work.
         * Threads use this timeout when there are more than corePoolSize
         * present or if allowCoreThreadTimeOut. Otherwise they wait
         * forever for new work.
         */
        private volatile long keepAliveTime;

        /**
         * If false (default), core threads stay alive even when idle.
         * If true, core threads use keepAliveTime to time out waiting
         * for work.
         */
        private volatile boolean allowCoreThreadTimeOut;

        /**
         * Core pool size is the minimum number of workers to keep alive
         * (and not allow to time out etc) unless allowCoreThreadTimeOut
         * is set, in which case the minimum is zero.
         */
        private volatile int corePoolSize;

        /**
         * Maximum pool size. Note that the actual maximum is internally
         * bounded by CAPACITY.
         */
        private volatile int maximumPoolSize;

        ...

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }

        ...
    }
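int corePoolSize: the minimum number of threads the pool keeps alive.
int maximumPoolSize: the maximum number of threads the pool may hold.
long keepAliveTime: how long an idle thread survives. If a thread is idle and the current thread count exceeds corePoolSize, the idle thread is destroyed after this time.
TimeUnit unit: the unit of keepAliveTime. The enum covers nanoseconds, microseconds, milliseconds, seconds, minutes, hours and days.
BlockingQueue<Runnable> workQueue: the queue holding tasks that wait to be executed. ThreadPoolTaskExecutor configures it through its queueCapacity property: createQueue builds a LinkedBlockingQueue of that capacity when queueCapacity is positive and a SynchronousQueue otherwise. The JDK provides four commonly used work queues: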
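① ArrayBlockingQueue
A bounded blocking queue backed by an array, ordered FIFO. The fixed bound guards against resource exhaustion. Once the pool has corePoolSize threads, each further task is placed at the tail of the queue to wait for scheduling. If the queue is full, a new thread is created; if the thread count has already reached maxPoolSize, the rejection policy runs.
② LinkedBlockingQueue
A blocking queue backed by a linked list, ordered FIFO. It is optionally bounded: without an explicit capacity it defaults to Integer.MAX_VALUE, which makes it unbounded in practice. In that case, once the pool has corePoolSize threads, every further task is queued and no thread beyond corePoolSize is ever created, so maxPoolSize has no effect.
③ SynchronousQueue
A blocking queue that buffers nothing: a producer putting a task must wait for a consumer to take it. A new task is therefore never buffered but handed directly to a thread. If no thread is available, a new one is created; if the thread count has reached maxPoolSize, the rejection policy runs.
④ PriorityBlockingQueue
An unbounded blocking queue with priorities. The order follows the elements' natural ordering or a Comparator passed to the constructor.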
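How the queue choice changes pool growth can be checked directly. The sketch below is only an illustration under assumed values: a pool with corePoolSize 2 and maximumPoolSize 4, tasks that block until released, and DiscardPolicy so that surplus tasks are dropped quietly. The class and method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueChoiceDemo {

    /** Submits `tasks` blocking tasks to a core=2, max=4 pool and returns the pool size reached. */
    static int poolSizeAfter(BlockingQueue<Runnable> queue, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, queue,
                new ThreadPoolExecutor.DiscardPolicy()); // drop tasks that do not fit
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Bounded queue: grows past core only once the 2 queue slots are taken.
        System.out.println(poolSizeAfter(new ArrayBlockingQueue<>(2), 4)); // 2
        System.out.println(poolSizeAfter(new ArrayBlockingQueue<>(2), 6)); // 4
        // Default-capacity LinkedBlockingQueue: never grows past core.
        System.out.println(poolSizeAfter(new LinkedBlockingQueue<>(), 6)); // 2
        // SynchronousQueue: nothing is buffered, so the pool grows immediately.
        System.out.println(poolSizeAfter(new SynchronousQueue<>(), 4));    // 4
    }
}
```

PriorityBlockingQueue is left out because tasks passed to execute would need to be comparable, which plain lambdas are not.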
ThreadFactory threadFactory: the factory used whenever a new thread is created. It can set the thread name, whether the thread is a daemon thread, and so on.
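As an illustration of what such a factory might look like, here is a minimal sketch. `NamedThreadFactory` is a hypothetical class written for this note, not something from the JDK or Spring; it numbers threads under a prefix and sets the daemon flag.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactory implements ThreadFactory {

    private final String prefix;
    private final boolean daemon;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix, boolean daemon) {
        this.prefix = prefix;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Name threads prefix1, prefix2, ... so they are easy to spot in thread dumps.
        Thread t = new Thread(r, prefix + counter.getAndIncrement());
        t.setDaemon(daemon);
        return t;
    }

    public static void main(String[] args) {
        Thread t = new NamedThreadFactory("XmlTask-", true).newThread(() -> { });
        System.out.println(t.getName() + " daemon=" + t.isDaemon());
    }
}
```

An instance can be passed as the threadFactory argument of the ThreadPoolExecutor constructor shown earlier.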
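RejectedExecutionHandler handler: invoked when a task cannot be accepted. This happens in two cases.
First, execute finds the pool saturated: the queue refuses the task and no further thread can be added because the pool is already at maximumPoolSize. In the JDK 6 implementation this is addIfUnderMaximumPoolSize(command) returning false; in JDK 7 and later it is addWorker(command, false) returning false.
Second, execute finds that the pool is no longer running, that is, it has been shut down. JDK 6 checks runState != RUNNING || poolSize == 0 and calls ensureQueuedTaskHandled(Runnable command), which may in turn call reject; JDK 7 and later re-check the state after queueing the task and call reject if the task can still be removed from the queue.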
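ThreadPoolExecutor handles a submitted task as follows:
1) While the pool has fewer than corePoolSize threads, it creates a new thread to handle the request.
2) Once the pool has corePoolSize threads, it puts the request into workQueue; idle threads in the pool take tasks from workQueue and process them.
3) When workQueue cannot accept the new task, it creates a new thread to handle the request; if the pool has already grown to maximumPoolSize, the RejectedExecutionHandler handles the rejection.
4) In addition, while the pool has more than corePoolSize threads, each surplus thread waits up to keepAliveTime for work and terminates if none arrives.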
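In short, the pool first creates threads up to corePoolSize. Further tasks go into the queue. When both the core threads and the queue are full, the pool creates additional threads, and once the thread count has reached maxPoolSize the next task is rejected: under the default AbortPolicy, ThreadPoolTaskExecutor throws org.springframework.core.task.TaskRejectedException. With the configuration above (corePoolSize = maxPoolSize = 10, queueCapacity = 100), that happens as soon as more than 110 tasks are pending.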
Also, if maxPoolSize allows more threads than the system can support, creating them can fail with java.lang.OutOfMemoryError: unable to create new native thread.
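The growth sequence described above can be watched on a plain ThreadPoolExecutor. This is a sketch rather than the Spring setup itself: it uses the JDK class directly with a bounded LinkedBlockingQueue, which is what ThreadPoolTaskExecutor creates for a positive queueCapacity, and the tasks simply block to stand in for slow import jobs. `PoolGrowthDemo` and `poolSizes` are names made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {

    /**
     * Submits blocking tasks until one is rejected and returns the pool size
     * observed after each accepted task.
     */
    static List<Integer> poolSizes(int core, int max, int queueCapacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity)); // default handler: AbortPolicy
        CountDownLatch release = new CountDownLatch(1);
        List<Integer> sizes = new ArrayList<>();
        try {
            while (true) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the thread busy, like a slow import task
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                sizes.add(pool.getPoolSize());
            }
        } catch (RejectedExecutionException e) {
            return sizes; // the task that triggered this was not accepted
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // core threads first, then the queue, then extra threads, then rejection
        System.out.println(poolSizes(2, 4, 2));            // [1, 2, 2, 2, 3, 4]
        // the values from the configuration in this post: 10 running + 100 queued
        System.out.println(poolSizes(10, 10, 100).size()); // 110
    }
}
```

Because corePoolSize equals maxPoolSize in the configuration from this incident, the pool has no room to grow once the queue fills, so the 111th pending task is rejected outright.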
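There are four predefined rejection policies:
(1) ThreadPoolExecutor.AbortPolicy: the default policy. A rejected task causes a runtime RejectedExecutionException to be thrown.
(2) ThreadPoolExecutor.CallerRunsPolicy: the thread that called execute runs the task itself; if the executor has been shut down, the task is discarded.
(3) ThreadPoolExecutor.DiscardPolicy: the task that cannot be executed is silently discarded.
(4) ThreadPoolExecutor.DiscardOldestPolicy: if the executor has not been shut down, the task at the head of the work queue is removed and execution is retried (repeating the process if it fails again).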
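To make the differences between the policies concrete, the sketch below saturates a deliberately tiny pool (one thread, one queue slot, both assumed values) and then submits one more task under a given handler. The class name and the result strings are invented for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class RejectionPolicyDemo {

    /** Saturates a 1-thread, 1-slot pool, submits one more task, and reports what happened to it. */
    static String outcome(RejectedExecutionHandler handler) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> ranOn = new AtomicReference<>("not run when execute() returned");
        try {
            pool.execute(blocker); // occupies the only thread
            pool.execute(blocker); // fills the only queue slot
            pool.execute(() -> ranOn.set(Thread.currentThread().getName())); // does not fit
            return ranOn.get();
        } catch (RejectedExecutionException e) {
            return "rejected with exception";
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println(outcome(new ThreadPoolExecutor.CallerRunsPolicy())); // caller's thread name
        System.out.println(outcome(new ThreadPoolExecutor.DiscardPolicy()));
    }
}
```

CallerRunsPolicy slows the submitting thread down instead of losing work, which can act as simple back-pressure for a bulk import; whether that trade-off is acceptable depends on what the calling thread is supposed to be doing.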
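The parameter values can be chosen according to the following rules.
corePoolSize: a common approach is to set corePoolSize to the number of threads needed to keep up with the tasks arriving each second.
If a task takes tasktime seconds on average, one thread can run 1/tasktime tasks per second. If the system must handle tasks tasks per second, the number of threads needed is tasks / (1/tasktime), that is, tasks * tasktime.
Suppose the system receives 100 to 1000 tasks per second and each task takes 0.1 seconds. It then needs 100 * 0.1 to 1000 * 0.1, that is, 10 to 100 threads, so corePoolSize should be at least 10. The exact figure is best chosen by the 80/20 rule, from the load seen 80% of the time: if the system receives fewer than 200 tasks per second 80% of the time and at most 1000, corePoolSize can be set to 200 * 0.1 = 20.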
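queueCapacity = (corePoolSize / tasktime) * responsetime
Here responsetime is the response time the system allows for a task, and tasktime is the average time per task as above. Continuing the example with a response time of 2 seconds, the queue length can be set to:
(corePoolSize / tasktime) * responsetime = (20 / 0.1) * 2 = 400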
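A queue that is too long makes task response times too long. Avoid writing:
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
This sets the queue length to Integer.MAX_VALUE, so the thread count stays at corePoolSize forever and never grows; when the task rate spikes, response time spikes with it.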
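maxPoolSize = (max(tasks) - queueCapacity) / (1 / tasktime)
When the load reaches its peak, the core threads can no longer finish all tasks in time, and more threads are needed. 200 tasks per second need 20 threads, so at 1000 tasks per second the pool needs (1000 - queueCapacity) * (20 / 200) = (1000 - 400) * 0.1 = 60 threads, and maxPoolSize can be set to 60.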
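The three sizing rules can be collected into a small calculator. This is a sketch of the rules of thumb from this section, not a tuning tool: the class and parameter names are hypothetical, times are given in milliseconds to keep the arithmetic in integers, and results are rounded up where a fraction of a thread would otherwise be needed.

```java
class PoolSizing {

    /** tasks * tasktime, rounded up. */
    static int corePoolSize(int typicalTasksPerSecond, int taskMillis) {
        return (typicalTasksPerSecond * taskMillis + 999) / 1000;
    }

    /** (corePoolSize / tasktime) * responsetime. */
    static int queueCapacity(int corePoolSize, int taskMillis, int responseMillis) {
        return corePoolSize * responseMillis / taskMillis;
    }

    /** (max(tasks) - queueCapacity) * tasktime, rounded up and never below corePoolSize. */
    static int maxPoolSize(int peakTasksPerSecond, int queueCapacity, int taskMillis, int corePoolSize) {
        int needed = ((peakTasksPerSecond - queueCapacity) * taskMillis + 999) / 1000;
        return Math.max(corePoolSize, needed);
    }

    public static void main(String[] args) {
        int core = corePoolSize(200, 100);         // 200 tasks/s at 0.1 s each
        int queue = queueCapacity(core, 100, 2000); // 2 s allowed response time
        int max = maxPoolSize(1000, queue, 100, core);
        System.out.println(core + " " + queue + " " + max); // 20 400 60
    }
}
```

The clamp to corePoolSize is an addition for safety: the ThreadPoolExecutor constructor throws IllegalArgumentException when maximumPoolSize is smaller than corePoolSize.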
References: https://www.cnblogs.com/geekliu/p/11641494.html
https://blog.csdn.net/u012495579/article/details/105183245/
https://blog.csdn.net/ye17186/article/details/89467919