ThreadPoolTaskExecutor thread pool parameters


Notes from an incident where a full thread pool caused failures

The values in our system's async configuration had never been examined closely; they had always worked without errors. Then a user bulk-imported data, the asynchronous tasks it triggered filled the thread pool up, and submissions started failing. This post works through what each parameter actually means.

The async configuration code is as follows:

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
    
    @Bean
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("XmlTask-");
        executor.initialize();
        return executor;
    }
}
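For reference, the Spring bean above boils down to roughly the following plain-JDK construction (a sketch: the 60-second keep-alive is ThreadPoolTaskExecutor's default, and the naming ThreadFactory merely imitates what setThreadNamePrefix does, it is not Spring's actual factory class):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class EquivalentPool {

    // Roughly what ThreadPoolTaskExecutor.initialize() builds from the config above
    public static ThreadPoolExecutor build() {
        ThreadFactory namedFactory = new ThreadFactory() {
            private final AtomicInteger seq = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {
                // imitates setThreadNamePrefix("XmlTask-")
                return new Thread(r, "XmlTask-" + seq.getAndIncrement());
            }
        };
        return new ThreadPoolExecutor(
                10, 10,                          // corePoolSize, maxPoolSize
                60L, TimeUnit.SECONDS,           // keepAliveSeconds (Spring's default is 60)
                new LinkedBlockingQueue<>(100),  // queueCapacity -> bounded queue
                namedFactory,
                new ThreadPoolExecutor.AbortPolicy()); // default rejection policy
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = build();
        System.out.println(pool.getCorePoolSize() + " " + pool.getMaximumPoolSize()
                + " " + pool.getQueue().remainingCapacity()); // 10 10 100
        pool.shutdown();
    }
}
```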

Thread pools must not be created via the Executors factory methods; use the ThreadPoolExecutor constructor instead. Spelling out the constructor makes the pool's runtime behavior explicit to whoever writes the code and avoids the risk of resource exhaustion. The pools returned by Executors have the following drawbacks:
1) FixedThreadPool and SingleThreadExecutor:
  the request queue is allowed to grow to Integer.MAX_VALUE entries, so requests can pile up until an OOM occurs.
2) CachedThreadPool:
  the number of threads is allowed to grow to Integer.MAX_VALUE, so a flood of threads can cause an OOM.
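The unbounded-queue problem is visible in the queue bounds themselves; a quick check using only java.util.concurrent:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueBounds {
    public static void main(String[] args) {
        // newFixedThreadPool wraps an unbounded LinkedBlockingQueue
        ThreadPoolExecutor fixed = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        System.out.println(fixed.getQueue().remainingCapacity()); // 2147483647

        // Explicit construction forces the queue bound into the open
        ThreadPoolExecutor bounded = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100));
        System.out.println(bounded.getQueue().remainingCapacity()); // 100

        fixed.shutdown();
        bounded.shutdown();
    }
}
```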

ThreadPoolExecutor lives in the JDK's java.util.concurrent (JUC) package; ThreadPoolTaskExecutor comes from Spring and is a wrapper around ThreadPoolExecutor.

public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
    private final Object poolSizeMonitor = new Object();
    private int corePoolSize = 1;
    private int maxPoolSize = 2147483647;
    private int keepAliveSeconds = 60;
    private int queueCapacity = 2147483647;
    private boolean allowCoreThreadTimeOut = false;
    @Nullable
    private TaskDecorator taskDecorator;
    @Nullable
    private ThreadPoolExecutor threadPoolExecutor;    // the wrapped JDK ThreadPoolExecutor
    private final Map<Runnable, Object> decoratedTaskMap;


    ...
    
    protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
        BlockingQueue<Runnable> queue = this.createQueue(this.queueCapacity);  // queueCapacity sets the capacity of the workQueue built here
        ThreadPoolExecutor executor;
        if (this.taskDecorator != null) {
            executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) {
                public void execute(Runnable command) {
                    Runnable decorated = ThreadPoolTaskExecutor.this.taskDecorator.decorate(command);
                    if (decorated != command) {
                        ThreadPoolTaskExecutor.this.decoratedTaskMap.put(decorated, command);
                    }

                    super.execute(decorated);
                }
            };
        } else {
            executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
        }

        if (this.allowCoreThreadTimeOut) {
            executor.allowCoreThreadTimeOut(true);
        }

        this.threadPoolExecutor = executor;
        return executor;
    }
    
}

 

The relevant fields and the constructor in ThreadPoolExecutor:

public class ThreadPoolExecutor extends AbstractExecutorService {
    ...

    /**
     * The queue used for holding tasks and handing off to worker
     * threads.  We do not require that workQueue.poll() returning
     * null necessarily means that workQueue.isEmpty(), so rely
     * solely on isEmpty to see if the queue is empty (which we must
     * do for example when deciding whether to transition from
     * SHUTDOWN to TIDYING).  This accommodates special-purpose
     * queues such as DelayQueues for which poll() is allowed to
     * return null even if it may later return non-null when delays
     * expire.
     */
    private final BlockingQueue<Runnable> workQueue;

    /**
     * Lock held on access to workers set and related bookkeeping.
     * While we could use a concurrent set of some sort, it turns out
     * to be generally preferable to use a lock. Among the reasons is
     * that this serializes interruptIdleWorkers, which avoids
     * unnecessary interrupt storms, especially during shutdown.
     * Otherwise exiting threads would concurrently interrupt those
     * that have not yet interrupted. It also simplifies some of the
     * associated statistics bookkeeping of largestPoolSize etc. We
     * also hold mainLock on shutdown and shutdownNow, for the sake of
     * ensuring workers set is stable while separately checking
     * permission to interrupt and actually interrupting.
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /**
     * Wait condition to support awaitTermination
     */
    private final Condition termination = mainLock.newCondition();

    /**
     * Tracks largest attained pool size. Accessed only under
     * mainLock.
     */
    private int largestPoolSize;

    /**
     * Counter for completed tasks. Updated only on termination of
     * worker threads. Accessed only under mainLock.
     */
    private long completedTaskCount;

    /*
     * All user control parameters are declared as volatiles so that
     * ongoing actions are based on freshest values, but without need
     * for locking, since no internal invariants depend on them
     * changing synchronously with respect to other actions.
     */

    /**
     * Factory for new threads. All threads are created using this
     * factory (via method addWorker).  All callers must be prepared
     * for addWorker to fail, which may reflect a system or user's
     * policy limiting the number of threads.  Even though it is not
     * treated as an error, failure to create threads may result in
     * new tasks being rejected or existing ones remaining stuck in
     * the queue.
     *
     * We go further and preserve pool invariants even in the face of
     * errors such as OutOfMemoryError, that might be thrown while
     * trying to create threads.  Such errors are rather common due to
     * the need to allocate a native stack in Thread.start, and users
     * will want to perform clean pool shutdown to clean up.  There
     * will likely be enough memory available for the cleanup code to
     * complete without encountering yet another OutOfMemoryError.
     */
    private volatile ThreadFactory threadFactory;

    /**
     * Handler called when saturated or shutdown in execute.
     */
    private volatile RejectedExecutionHandler handler;

    /**
     * Timeout in nanoseconds for idle threads waiting for work.
     * Threads use this timeout when there are more than corePoolSize
     * present or if allowCoreThreadTimeOut. Otherwise they wait
     * forever for new work.
     */
    private volatile long keepAliveTime;

    /**
     * If false (default), core threads stay alive even when idle.
     * If true, core threads use keepAliveTime to time out waiting
     * for work.
     */
    private volatile boolean allowCoreThreadTimeOut;

    /**
     * Core pool size is the minimum number of workers to keep alive
     * (and not allow to time out etc) unless allowCoreThreadTimeOut
     * is set, in which case the minimum is zero.
     */
    private volatile int corePoolSize;

    /**
     * Maximum pool size. Note that the actual maximum is internally
     * bounded by CAPACITY.
     */
    private volatile int maximumPoolSize;
    
    ...

     public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    ...
}

int corePoolSize: the minimum number of threads the pool keeps alive.

int maximumPoolSize: the maximum number of threads the pool may hold.

long keepAliveTime: the idle-thread lifetime: if a thread is idle and the current thread count exceeds corePoolSize, the idle thread is terminated after this interval.

TimeUnit unit: the unit of keepAliveTime; TimeUnit is an enum ranging from NANOSECONDS up to DAYS.

BlockingQueue<Runnable> workQueue: holds the tasks waiting to execute. In ThreadPoolTaskExecutor its capacity comes from the queueCapacity property. The JDK provides four commonly used work queues:

  ①ArrayBlockingQueue

  An array-backed bounded blocking queue with FIFO ordering. New tasks are appended at the tail, and the fixed bound guards against resource exhaustion. Once the pool holds corePoolSize threads, new tasks go to the tail of this queue to wait for scheduling. If the queue is full, a new thread is created; if the thread count has already reached maxPoolSize, the rejection policy runs.

  ②LinkedBlockingQueue

  A linked-list-backed blocking queue, effectively unbounded (its default capacity is Integer.MAX_VALUE), with FIFO ordering. Because the queue is practically unbounded, once the pool holds corePoolSize threads every new task is simply queued, and threads are never created up to maxPoolSize; with this queue the maxPoolSize parameter therefore has no effect.

  ③SynchronousQueue

  A blocking queue that buffers nothing: a producer handing in a task must rendezvous with a consumer taking it. New tasks are thus never queued but dispatched straight to a thread; if no thread is free, a new one is created, and once the thread count reaches maxPoolSize the rejection policy runs.

  ④PriorityBlockingQueue

  An unbounded blocking queue with priority ordering, defined by a supplied Comparator.
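The practical difference between a bounded buffer and a handoff queue shows up directly in offer(), which is exactly the call execute() uses when enqueuing a task:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class QueueBehavior {
    public static void main(String[] args) {
        // ArrayBlockingQueue buffers elements up to its fixed capacity
        BlockingQueue<String> bounded = new ArrayBlockingQueue<>(2);
        System.out.println(bounded.offer("a")); // true
        System.out.println(bounded.offer("b")); // true
        System.out.println(bounded.offer("c")); // false: full

        // SynchronousQueue buffers nothing: offer succeeds only if a consumer
        // is already waiting, which is why a pool built on it creates a new
        // thread for every task that cannot be handed off immediately
        BlockingQueue<String> handoff = new SynchronousQueue<>();
        System.out.println(handoff.offer("a")); // false: nobody waiting to take
    }
}
```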

ThreadFactory threadFactory: the factory used to create new threads; useful for setting the thread name, daemon status, and so on.

RejectedExecutionHandler handler: invoked to reject a task; this happens in two situations.

  One: execute cannot place the task anywhere because the pool is saturated, i.e. the work queue is full and the thread count has already reached maximumPoolSize.

  Two: the executor is no longer in the RUNNING state (it has been shut down) when the task is submitted. (The internal methods addIfUnderMaximumPoolSize and ensureQueuedTaskHandled cited in older write-ups belong to an older JDK implementation of execute; the modern implementation reaches reject() by a different path, but the two triggering conditions are the same.)
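A custom handler is just a small class. The counting handler below is purely illustrative: with core=1, max=1 and a one-slot queue, the third submission while the first task is still running must be rejected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CountingRejections {

    // A handler that counts and drops rejected tasks instead of throwing
    static class CountingHandler implements RejectedExecutionHandler {
        final AtomicInteger rejected = new AtomicInteger();
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
            rejected.incrementAndGet();
        }
    }

    // Saturates a 1-thread pool with a 1-slot queue; the third task must be rejected
    public static int runDemo() {
        CountingHandler handler = new CountingHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> { try { release.await(); } catch (InterruptedException ignored) {} });
            pool.execute(() -> {});  // fills the single queue slot
            pool.execute(() -> {});  // thread busy + queue full -> handler invoked
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return handler.rejected.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // 1
    }
}
```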

ThreadPoolExecutor handles submitted tasks as follows:

1) While the pool holds fewer than corePoolSize threads, each new task gets a newly created thread.

2) Once the pool holds corePoolSize threads, new tasks are placed in workQueue; idle pool threads take tasks from workQueue and process them.

3) When workQueue cannot accept another task, a new thread is created to run it; once the pool has grown to maximumPoolSize, further tasks are handed to the RejectedExecutionHandler.

4) Additionally, threads in excess of corePoolSize that stay idle for keepAliveTime are terminated.
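A tiny pool (core 1, max 2, queue capacity 1) makes rules 1) to 3) directly observable; the latch keeps every task blocked so the fill order is deterministic. This is a demonstration sketch, not production code:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FillOrder {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> { try { release.await(); } catch (InterruptedException ignored) {} };

        pool.execute(blocker);                      // rule 1: below corePoolSize -> new thread
        System.out.println(pool.getPoolSize());     // 1
        pool.execute(blocker);                      // rule 2: core full -> task waits in queue
        System.out.println(pool.getQueue().size()); // 1
        pool.execute(blocker);                      // rule 3: queue full -> second thread created
        System.out.println(pool.getPoolSize());     // 2

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```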

In short, the pool first creates up to corePoolSize threads, then queues further tasks; only when both the core threads and the queue are full does it create additional threads, and once the count reaches maxPoolSize a further submission fails, which Spring's ThreadPoolTaskExecutor surfaces as org.springframework.core.task.TaskRejectedException (wrapping the JDK's RejectedExecutionException).

Separately, if maxPoolSize exceeds the number of native threads the system can actually create, thread creation fails with java.lang.OutOfMemoryError: unable to create new native thread.

Four rejection policies are predefined:

(1) ThreadPoolExecutor.AbortPolicy: the default; a rejected task causes a runtime RejectedExecutionException to be thrown.

(2) ThreadPoolExecutor.CallerRunsPolicy: the submitting thread runs the task itself; if the executor has been shut down, the task is discarded.

(3) ThreadPoolExecutor.DiscardPolicy: the task that cannot be executed is silently discarded.

(4) ThreadPoolExecutor.DiscardOldestPolicy: if the executor is not shut down, the task at the head of the work queue is dropped and the submission is retried (repeating the process if it fails again).
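CallerRunsPolicy deserves a demonstration, because its throttling effect is easy to miss: the overflow task runs on the submitting thread itself, which naturally slows the producer down. A sketch with a deliberately saturated pool:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallerRunsDemo {

    // Saturate a 1-thread, 1-slot pool and record which thread runs the overflow task
    public static boolean overflowRunsOnCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();

        pool.execute(() -> { try { release.await(); } catch (InterruptedException ignored) {} });
        pool.execute(() -> {});  // fills the single queue slot
        // pool and queue full: CallerRunsPolicy executes this task synchronously, right here
        pool.execute(() -> ranOn.set(Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        return Thread.currentThread().getName().equals(ranOn.get());
    }

    public static void main(String[] args) {
        System.out.println(overflowRunsOnCaller()); // true
    }
}
```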

 

The following rules of thumb help in choosing the parameter values.

corePoolSize: a common approach is to set corePoolSize to the number of threads needed per second.
If each task takes tasktime seconds to process, one thread handles 1/tasktime tasks per second; with tasks tasks arriving per second, the pool needs tasks / (1/tasktime) = tasks * tasktime threads.
Suppose the system sees 100 to 1000 tasks per second and each task takes 0.1 s: the pool needs between 100 * 0.1 and 1000 * 0.1 threads, i.e. 10 to 100. corePoolSize should therefore exceed 10; pick the exact value using the 80/20 rule, sizing for the load seen 80% of the time. If the system handles fewer than 200 tasks per second 80% of the time, peaking at 1000, corePoolSize can be set to 20.

queueCapacity = (corePoolSize / taskCost) * responseTime

Here responseTime is the time a task may acceptably wait. Continuing the example above, with a response time of 2 s the queue length comes out as:
(corePoolSize / tasktime) * responseTime = (20 / 0.1) * 2 = 400

An oversized queue makes task response times too long. In particular, avoid the following:
LinkedBlockingQueue queue = new LinkedBlockingQueue();
This sets the queue length to Integer.MAX_VALUE, so the thread count never grows beyond corePoolSize, and when the task rate spikes, response times spike with it.

maxPoolSize = (max(tasks) - queueCapacity) / (1 / taskCost)

When the system is at peak load, the core threads can no longer keep up and extra threads are needed. If 200 tasks per second need 20 threads, then at 1000 tasks per second the pool needs (1000 - 400) * (20 / 200) = 60 threads, so maxPoolSize can be set to 60.
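Plugging the article's numbers into the three formulas (taskCost = 0.1 s, 80th-percentile load 200 tasks/s, peak 1000 tasks/s, responseTime 2 s):

```java
public class PoolSizing {
    public static void main(String[] args) {
        double taskCost = 0.1;       // seconds of work per task
        double typicalTasks = 200;   // tasks per second, 80th percentile
        double peakTasks = 1000;     // tasks per second at peak
        double responseTime = 2;     // acceptable seconds spent waiting in the queue

        double corePoolSize  = typicalTasks * taskCost;                      // 200 * 0.1  = 20
        double queueCapacity = (corePoolSize / taskCost) * responseTime;     // (20/0.1)*2 = 400
        double maxPoolSize   = (peakTasks - queueCapacity) / (1 / taskCost); // 600 / 10   = 60

        System.out.println(Math.round(corePoolSize) + " "
                + Math.round(queueCapacity) + " " + Math.round(maxPoolSize)); // 20 400 60
    }
}
```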

 

 


