線程池ThreadPoolExcutor詳解


Java線程池概述

線程池技術在並發時經常會使用到,java中的線程池的使用是通過調用ThreadPoolExecutor來實現的。

ThreadPoolExecutor提供了四個構造函數,最后都會歸結於下面這個構造方法:

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

這些參數的意義如下:

corePoolSize:該線程池中核心線程數最大值

maximumPoolSize: 該線程池中線程總數最大值

keepAliveTime:該線程池中非核心線程閑置超時時長

unit:keepAliveTime的單位

workQueue:阻塞隊列BlockingQueue,維護着等待執行的Runnable對象

threadFactory:創建線程的接口,需要實現他的Thread newThread(Runnable r)方法。

RejectedExecutionHandler:飽和策略,最大線程和工作隊列容量且已經飽和時execute方法都將調用RejectedExecutionHandler 

ThreadPoolExecutor工作流程

 

 

大致過程陳述為:

1. 向線程池中添加任務,當任務數量少於corePoolSize時,會自動創建thead來處理這些任務;

2. 當添加任務數大於corePoolSize且少於maximmPoolSize時,不在創建線程,而是將這些任務放到阻塞隊列中,等待被執行;

3. 接上面2的條件,且當阻塞隊列滿了之后,繼續創建thread,從而加速處理阻塞隊列;

4. 當添加任務大於maximmPoolSize時,根據飽和策略決定是否容許繼續向線程池中添加任務,默認的飽和策略是AbortPolicy(直接丟棄)。

線程池中使用的阻塞隊列

ArrayBlockingQueue:基於數組結構的有界阻塞隊列,構造函數一定要傳大小,FIFO(先進先出);

LinkedBlockingQueue:無界,默認大小65536(Integer.MAX_VALUE),當大量請求任務時,容易造成內存耗盡。

SynchronousQueue:同步隊列,是一個特殊的BlockingQueue,它沒有容量(這是因為在SynchronousQueue中,插入將等待另一個線程的刪除操作,反之亦然)。具體可以參考:《Java SynchronousQueue Examples(譯)》

PriorityBlockingQueue: 優先隊列,無界。DelayedWorkQueue:這個隊列接收到任務時,首先先入隊,只有達到了指定的延時時間,才會執行任務

阻塞隊列常見的方法如下表所示:

 

 

常見四種線程池

newCachedThreadPool

newFixedThreadPool

newSingleThreadExecutor

newScheduledThreadPool

 

 

它們通過Executors以靜態方法的方式直接調用,實質上是它們最終調用的是ThreadPoolExecutor的構造方法,也就是本文最前面那段代碼。

注:KeepAliveTime=0的話,表示不等待

 

摘自阿里巴巴開發手冊:

【強制】線程池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這樣 的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。

說明:Executors 返回的線程池對象的弊端如下:

1)FixedThreadPool 和 SingleThreadPool: 允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。

2)CachedThreadPool 和 ScheduledThreadPool: 允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM。

應用樣例,更多請參看我的github

package multiThread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorTest {
    public static void main(String[] args) {

        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                1, 2, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.DiscardPolicy());

        System.out.println("getQueue:" + threadPool.getQueue().size());
        System.out.println("remainingCapacity:" + threadPool.getQueue().remainingCapacity());

        threadPool.execute(() -> {
            try {
                int count = 0;
                Thread.currentThread().setName("aa");
                while (count <= 10) {
                    System.out.println(Thread.currentThread().getName() + "getQueue:" + threadPool.getQueue().size());
                    System.out.println(Thread.currentThread().getName() + System.currentTimeMillis());
                    Thread.sleep(1000);
                    count++;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        threadPool.execute(() -> {
            try {
                int count = 0;
                Thread.currentThread().setName("bbb");
                while (count <= 100) {
                    System.out.println(Thread.currentThread().getName() + "getQueue:" + threadPool.getQueue().size());
                    System.out.println(Thread.currentThread().getName() + System.currentTimeMillis());
                    Thread.sleep(1000);
                    count++;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM