線程與線程池的那些事之線程池篇(萬字長文)


本文關鍵字:

線程線程池單線程多線程線程池的好處線程回收創建方式核心參數底層機制拒絕策略,參數設置,動態監控線程隔離

線程和線程池相關的知識,是Java學習或者面試中一定會遇到的知識點,本篇我們會從線程和進程,並行與並發,單線程和多線程等,一直講解到線程池,線程池的好處,創建方式,重要的核心參數,幾個重要的方法,底層實現,拒絕策略,參數設置,動態調整,線程隔離等等。主要的大綱如下:

線程池的好處

線程池,使用了池化思想來管理線程,池化技術就是為了最大化效益,最小化用戶風險,將資源統一放在一起管理的思想。這種思想在很多地方都有使用到,不僅僅是計算機,比如金融,企業管理,設備管理等。

為什么要線程池?如果在並發的場景,編碼人員根據需求來創建線程池,可能會有以下的問題:

  • 我們很難確定系統有多少線程在運行,如果使用就創建,不使用就銷毀,那么創建和銷毀線程的消耗也是比較大的
  • 假設來了很多請求,可能是爬蟲,瘋狂創建線程,可能把系統資源耗盡。

實現線程池有什么好處呢?

  • 降低資源消耗:池化技術可以重復利用已經創建的線程,降低線程創建和銷毀的損耗。
  • 提高響應速度:利用已經存在的線程進行處理,少去了創建線程的時間
  • 管理線程可控:線程是稀缺資源,不能無限創建,線程池可以做到統一分配和監控
  • 拓展其他功能:比如定時線程池,可以定時執行任務

其實池化技術,用在比較多地方,比如:

  • 數據庫連接池:數據庫連接是稀缺資源,先創建好,提高響應速度,重復利用已有的連接
  • 實例池:先創建好對象放到池子里面,循環利用,減少來回創建和銷毀的消耗

線程池相關的類

下面是與線程池相關的類的繼承關系:

Executor

Executor 是頂級接口,里面只有一個方法execute(Runnable command),定義的是調度線程池來執行任務,它定義了線程池的基本規范,執行任務是它的天職。

ExecutorService

ExecutorService 繼承了Executor,但是它仍然是一個接口,它多了一些方法:

  • void shutdown():關閉線程池,會等待任務執行完。
  • List<Runnable> shutdownNow():立刻關閉線程池,嘗試停止所有正在積極執行的任務,停止等待任務的處理,並返回一個正在等待執行的任務列表(還沒有執行的)
  • boolean isShutdown():判斷線程池是不是已經關閉,但是可能線程還在執行。
  • boolean isTerminated():在執行shutdown/shutdownNow之后,所有的任務已經完成,這個狀態就是true。
  • boolean awaitTermination(long timeout, TimeUnit unit):執行shutdown之后,阻塞等到terminated狀態,除非超時或者被打斷。
  • <T> Future<T> submit(Callable<T> task): 提交一個有返回值的任務,並且返回該任務尚未有結果的Future,調用future.get()方法,可以返回任務完成的時候的結果。
  • <T> Future<T> submit(Runnable task, T result):提交一個任務,傳入返回結果,這個result沒有什么作用,只是指定類型和一個返回的結果。
  • Future<?> submit(Runnable task): 提交任務,返回Future
  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks):批量執行tasks,獲取Future的list,可以批量提交任務。
  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit):批量提交任務,並指定超時時間
  • <T> T invokeAny(Collection<? extends Callable<T>> tasks): 阻塞,獲取第一個完成任務的結果值,
  • <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit):阻塞,獲取第一個完成結果的值,指定超時時間

可能有同學對前面的<T> Future<T> submit(Runnable task, T result)有疑問,這個reuslt有什么作用?

其實它沒有什么作用,只是持有它,任務完成后,還是調用 future.get()返回這個結果,用result new 了一個 ftask,其內部其實是使用了Runnable的包裝類 RunnableAdapter,沒有對result做特殊的處理,調用 call() 方法的時候,直接返回這個結果。(Executors 中具體的實現)

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            // 返回傳入的結果
            return result;
        }
    }

還有一個方法值得一提:invokeAny(): 在 ThreadPoolExecutor中使用ExecutorService 中的方法 invokeAny() 取得第一個完成的任務的結果,當第一個任務執行完成后,會調用 interrupt() 方法將其他任務中斷。

注意,ExecutorService是接口,里面都是定義,並沒有涉及實現,而前面的講解都是基於它的名字(規定的規范)以及它的普遍實現來說的。

可以看到 ExecutorService 定義的是線程池的一些操作,包括關閉,判斷是否關閉,是否停止,提交任務,批量提交任務等等。

AbstractExecutorService

AbstractExecutorService 是一個抽象類,實現了 ExecutorService接口,這是大部分線程池的基本實現,定時的線程池先不關注,主要的方法如下:

不僅實現了submitinvokeAllinvokeAny 等方法,而且提供了一個 newTaskFor 方法用於構建 RunnableFuture 對象,那些能夠獲取到任務返回結果的對象都是通過 newTaskFor 來獲取的。不展開里面所有的源碼的介紹,僅以submit()方法為例:

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        // 封裝任務
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        // 執行任務
        execute(ftask);
        // 返回 RunnableFuture 對象
        return ftask;
    }

但是在 AbstractExecutorService 是沒有對最最重要的方法進行實現的,也就是 execute() 方法。線程池具體是怎么執行的,這個不同的線程池可以有不同的實現,一般都是繼承 AbstractExecutorService (定時任務有其他的接口),我們最最常用的就是ThreadPoolExecutor

ThreadPoolExecutor

重點來了!!! ThreadPoolExecutor 一般就是我們平時常用到的線程池類,所謂創建線程池,如果不是定時線程池,就是使用它。

先看ThreadPoolExecutor的內部結構(屬性):

public class ThreadPoolExecutor extends AbstractExecutorService {
    // 狀態控制,主要用來控制線程池的狀態,是核心的遍歷,使用的是原子類
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  	// 用來表示線程數量的位數(使用的是位運算,一部分表示線程的數量,一部分表示線程池的狀態)
    // SIZE = 32 表示32位,那么COUNT_BITS就是29位
    private static final int COUNT_BITS = Integer.SIZE - 3;
  	// 線程池的容量,也就是27位表示的最大值
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 狀態量,存儲在高位,32位中的前3位
  	// 111(第一位是符號位,1表示負數),線程池運行中
    private static final int RUNNING    = -1 << COUNT_BITS; 
  	// 000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
  	// 001
    private static final int STOP       =  1 << COUNT_BITS;
  	// 010
    private static final int TIDYING    =  2 << COUNT_BITS;
  	// 011
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 取出運行狀態
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
  	// 取出線程數量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
  	// 用運行狀態和線程數獲取ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }
  	
  	// 任務等待隊列
    private final BlockingQueue<Runnable> workQueue;
  	// 可重入主鎖(保證一些操作的線程安全)
    private final ReentrantLock mainLock = new ReentrantLock();
  	// 線程的集合
    private final HashSet<Worker> workers = new HashSet<Worker>();
  
  	// 在Condition中,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll(),
    // 傳統線程的通信方式,Condition都可以實現,Condition和傳統的線程通信沒什么區別,Condition的強大之處在於它可以為多個線程間建立不同的Condition
    private final Condition termination = mainLock.newCondition();
  
  	// 最大線程池大小
    private int largestPoolSize;
  	// 完成的任務數量
    private long completedTaskCount;
  	// 線程工廠
    private volatile ThreadFactory threadFactory;
  	// 任務拒絕處理器
    private volatile RejectedExecutionHandler handler;
 		// 非核心線程的存活時間
    private volatile long keepAliveTime;
  	// 允許核心線程的超時時間
    private volatile boolean allowCoreThreadTimeOut;
 		// 核心線程數
    private volatile int corePoolSize;
		// 工作線程最大容量
    private volatile int maximumPoolSize;
 		// 默認的拒絕處理器(丟棄任務)
  	private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
  	// 運行時關閉許可
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");
  	// 上下文
    private final AccessControlContext acc;
  	// 只有一個線程
    private static final boolean ONLY_ONE = true;
}

線程池狀態

從上面的代碼可以看出,用一個32位的對象保存線程池的狀態以及線程池的容量,高3位是線程池的狀態,而剩下的29位,則是保存線程的數量:

    // 狀態量,存儲在高位,32位中的前3位
  	// 111(第一位是符號位,1表示負數),線程池運行中
    private static final int RUNNING    = -1 << COUNT_BITS; 
  	// 000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
  	// 001
    private static final int STOP       =  1 << COUNT_BITS;
  	// 010
    private static final int TIDYING    =  2 << COUNT_BITS;
  	// 011
    private static final int TERMINATED =  3 << COUNT_BITS;

各種狀態之間是不一樣的,他們的狀態之間變化如下:

  • RUNNING:運行狀態,可以接受任務,也可以處理任務
  • SHUTDOWN:不可以接受任務,但是可以處理任務
  • STOP:不可以接受任務,也不可以處理任務,中斷當前任務
  • TIDYING:所有線程停止
  • TERMINATED:線程池的最后狀態

Worker 實現

線程池,肯定得有池子,並且是放線程的地方,在 ThreadPoolExecutor 中表現為 Worker,這是內部類:

線程池其實就是 Worker (打工人,不斷的領取任務,完成任務)的集合,這里使用的是 HashSet:

private final HashSet<Worker> workers = new HashSet<Worker>();

Worker 怎么實現的呢?

Worker 除了繼承了 AbstractQueuedSynchronizer,也就是 AQSAQS 本質上就是個隊列鎖,一個簡單的互斥鎖,一般是在中斷或者修改 worker 狀態的時候使用。

內部引入AQS,是為了線程安全,線程執行任務的時候,調用的是runWorker(Worker w),這個方法不是worker的方法,而是 ThreadPoolExecutor的方法。從下面的代碼可以看出,每次修改Worker的狀態的時候,都是線程安全的。Worker里面,持有了一個線程Thread,可以理解為是對線程的封裝。

至於runWorker(Worker w)是怎么運行的?先保持這個疑問,后面詳細講解。

    // 實現 Runnable,封裝了線程
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        // 序列化id
        private static final long serialVersionUID = 6138294804551838833L;

        // worker運行的線程
        final Thread thread;
        
        // 初始化任務,有可能是空的,如果任務不為空的時候,其他進來的任務,可以直接運行,不在添加到任務隊列
        Runnable firstTask;
        // 線程任務計數器
        volatile long completedTasks;

        // 指定一個任務讓工人忙碌起來,這個任務可能是空的
        Worker(Runnable firstTask) {
          	// 初始化AQS隊列鎖的狀態
            setState(-1); // 禁止中斷直到 runWorker
            this.firstTask = firstTask;
            // 從線程工廠,取出一個線程初始化
            this.thread = getThreadFactory().newThread(this);
        }

        // 實際上運行調用的是runWorker
        public void run() {
          	// 不斷循環獲取任務進行執行
            runWorker(this);
        }

        // 0表示沒有被鎖
        // 1表示被鎖的狀態
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        // 獨占,嘗試獲取鎖,如果成功返回true,失敗返回false
        protected boolean tryAcquire(int unused) {
            // CAS 樂觀鎖
            if (compareAndSetState(0, 1)) {
                // 成功,當前線程獨占鎖
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        // 獨占方式,嘗試釋放鎖
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        // 上鎖,調用的是AQS的方法
        public void lock()        { acquire(1); }
        // 嘗試上鎖
        public boolean tryLock()  { return tryAcquire(1); }
        // 解鎖
        public void unlock()      { release(1); }
        // 是否鎖住
        public boolean isLocked() { return isHeldExclusively(); }

        // 如果開始可就中斷
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

任務隊列

除了放線程池的地方,要是任務很多,沒有那么多線程,肯定需要一個地方放任務,充當緩沖作用,也就是任務隊列,在代碼中表現為:

private final BlockingQueue<Runnable> workQueue;

拒絕策略和處理器

計算機的內存總是有限的,我們不可能一直往隊列里面增加內容,所以線程池為我們提供了選擇,可以選擇多種隊列。同時當任務實在太多,占滿了線程,並且把任務隊列也占滿的時候,我們需要做出一定的反應,那就是拒絕還是拋出錯誤,丟掉任務?丟掉哪些任務,這些都是可能需要定制的內容。

如何創建線程池

關於如何創建線程池,其實 ThreadPoolExecutor提供了構造方法,主要參數如下,不傳的話會使用默認的:

  • 核心線程數:核心線程數,一般是指常駐的線程,沒有任務的時候通常也不會銷毀
  • 最大線程數:線程池允許創建的最大的線程數量
  • 非核心線程的存活時間:指的是沒有任務的時候,非核心線程能夠存活多久
  • 時間的單位:存活時間的單位
  • 存放任務的隊列:用來存放任務
  • 線程工廠
  • 拒絕處理器:如果添加任務失敗,將由該處理器處理
	// 指定核心線程數,最大線程數,非核心線程沒有任務的存活時間,時間單位,任務隊列    
	public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
	  // 指定核心線程數,最大線程數,非核心線程沒有任務的存活時間,時間單位,任務隊列,線程池工廠    
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
	  // 指定核心線程數,最大線程數,非核心線程沒有任務的存活時間,時間單位,任務隊列,拒絕任務處理器
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
		// 最后其實都是調用了這個方法
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
      ...
    }

其實,除了顯示的指定上面的參數之外,JDK也封裝了一些直接創建線程池的方法給我們,那就是Executors:

		// 固定線程數量的線程池,無界的隊列
		public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
		// 單個線程的線程池,無界的隊列,按照任務提交的順序,串行執行    
		public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
		// 動態調節,沒有核心線程,全部都是普通線程,每個線程存活60s,使用容量為1的阻塞隊列
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
	  // 定時任務線程池
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

但是一般是不推薦使用上面別人封裝的線程池的哈!!!

線程池的底層參數以及核心方法

看完上面的創建參數大家可能會有點懵,但是沒關系,一一為大家道來:

可以看出,當有任務進來的時候,先判斷核心線程池是不是已經滿了,如果還沒有,將會繼續創建線程。注意,如果一個任務進來,創建線程執行,執行完成,線程空閑下來,這時候再來一個任務,是會繼續使用之前的線程,還是重新創建一個線程來執行呢?

答案是重新創建線程,這樣線程池可以快速達到核心線程數的規模大小,以便快速響應后面的任務。

如果線程數量已經到達核心線程數,來了任務,線程池的線程又都不是空閑狀態,那么就會判斷隊列是不是滿的,倘若隊列還有空間,那么就會把任務放進去隊列中,等待線程領取執行。

如果任務隊列已經滿了,放不下任務,那么就會判斷線程數是不是已經到最大線程數了,要是還沒有到達,就會繼續創建線程並執行任務,這個時候創建的是非核心部分線程。

如果已經到達最大線程數,那么就不能繼續創建線程了,只能執行拒絕策略,默認的拒絕策略是丟棄任務,我們可以自定義拒絕策略。

值得注意的是,倘若之前任務比較多,創建出了一些非核心線程,那么任務少了之后,領取不到任務,過了一定時間,非核心線程就會銷毀,只剩下核心線程池的數量的線程。這個時間就是前面說的keepAliveTime

提交任務

提交任務,我們看execute(),會先獲取線程池的狀態和個數,要是線程個數還沒達到核心線程數,會直接添加線程,否則會放到任務隊列,如果任務隊列放不下,會繼續增加線程,但是不是增加核心線程。

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // 獲取狀態和個數
        int c = ctl.get();
      	// 如果個數小於核心線程數
        if (workerCountOf(c) < corePoolSize) {
          	// 直接添加
            if (addWorker(command, true))
                return;
          	// 添加失敗則繼續獲取
            c = ctl.get();
        }
      	// 判斷線程池狀態是不是運行中,任務放到隊列中
        if (isRunning(c) && workQueue.offer(command)) {
          	// 再次檢查
            int recheck = ctl.get();
          	// 判斷線程池是不是還在運行
            if (! isRunning(recheck) && remove(command))
              	// 如果不是,那么就拒絕並移除任務
                reject(command);
            else if (workerCountOf(recheck) == 0)
              	// 如果線程數為0,並且還在運行,那么就直接添加
                addWorker(null, false);
        }else if (!addWorker(command, false))
          	// 添加任務隊列失敗,拒絕
            reject(command);
    }

上面的源碼中,調用了一個重要的方法:addWorker(Runnable firstTask, boolean core),該方法主要是為了增加工作的線程,我們來看看它是如何執行的:

    private boolean addWorker(Runnable firstTask, boolean core) {
      	// 回到當前位置重試
        retry:
        for (;;) {
          	// 獲取狀態
            int c = ctl.get();
            int rs = runStateOf(c);

            // 大於SHUTDOWN說明線程池已經停止
          	// ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) 表示三個條件至少有一個不滿足
          	// 不等於SHUTDOWN說明是大於shutdown
          	// firstTask != null 任務不是空的
          	// workQueue.isEmpty() 隊列是空的
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                // 工作線程數
                int wc = workerCountOf(c);
              	// 是否符合容量
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
              	// 添加成功,跳出循環
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
              	// cas失敗,重新嘗試
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

      	// 前面線程計數增加成功
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
          	// 創建了一個worker,包裝了任務
            w = new Worker(firstTask);
            final Thread t = w.thread;
          	// 線程創建成功
            if (t != null) {
              	// 獲取鎖
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // 再次確認狀態
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                      	// 如果線程已經啟動,失敗
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                      	// 新增線程到集合
                        workers.add(w);
                      	// 獲取大小
                        int s = workers.size();
                      	// 判斷最大線程池數量
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                      	// 已經添加工作線程
                        workerAdded = true;
                    }
                } finally {
                  	// 解鎖
                    mainLock.unlock();
                }
              	// 如果已經添加
                if (workerAdded) {
                  	// 啟動線程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
          	// 如果沒有啟動
            if (! workerStarted)
              	// 失敗處理
                addWorkerFailed(w);
        }
        return workerStarted;
    }

處理任務

前面在介紹Worker這個類的時候,我們講解到其實它的run()方法調用的是外部的runWorker()方法,那么我們來看看runWorkder()方法:

首先,它會直接處理自己的firstTask,這個任務並沒有在任務隊列里面,而是它自己持有的:

final void runWorker(Worker w) {
  			// 當前線程
        Thread wt = Thread.currentThread();
  			// 第一個任務
        Runnable task = w.firstTask;
  			// 重置為null
        w.firstTask = null;
  			// 允許打斷
        w.unlock();
        boolean completedAbruptly = true;
        try {
           // 任務不為空,或者獲取的任務不為空
            while (task != null || (task = getTask()) != null) {
              	// 加鎖
                w.lock();
								//如果線程池停止,確保線程被中斷;
								//如果不是,確保線程沒有被中斷。這
								//在第二種情況下需要復查處理
								// shutdown - now競賽同時清除中斷
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                  	// 執行之前回調方法(可以由我們自己實現)
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                      	// 執行任務
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                      	// 執行之后回調方法
                        afterExecute(task, thrown);
                    }
                } finally {
                  	// 置為null
                    task = null;
                  	// 更新完成任務
                    w.completedTasks++;
                    w.unlock();
                }
            }
          	// 完成
            completedAbruptly = false;
        } finally {
          	// 處理線程退出相關工作
            processWorkerExit(w, completedAbruptly);
        }
    }

上面可以看到如果當前的任務是null,會去獲取一個task,我們看看getTask(),里面涉及到了兩個參數,一個是是不是允許核心線程銷毀,另外一個是線程數是不是大於核心線程數,如果滿足條件,就從隊列中取出任務,如果超時取不到,那就返回空,表示沒有取到任務,沒有取到任務,就不會執行前面的循環,就會觸發線程銷毀processWorkerExit()等工作。

private Runnable getTask() {
  	// 是否超時
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // SHUTDOWN狀態繼續處理隊列中的任務,但是不接收新的任務
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
      	// 線程數
        int wc = workerCountOf(c);

        // 是否允許核心線程超時或者線程數大於核心線程數
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
          	// 減少線程成功,就返回null,后面由processWorkerExit()處理
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
          	// 如果允許核心線程關閉,或者超過了核心線程,就可以在超時的時間內獲取任務,或者直接取出任務
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
          	// 如果能取到任務,那就肯定可以執行
            if (r != null)
                return r;
          	// 否則就獲取不到任務,超時了
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

銷毀線程

前面提到,如果線程當前任務為空,又允許核心線程銷毀,或者線程超過了核心線程數,等待了一定時間,超時了卻沒有從任務隊列獲取到任務的話,就會跳出循環執行到后面的線程銷毀(結束)程序。那銷毀線程的時候怎么做呢?

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
      	// 如果是突然結束的線程,那么之前的線程數是沒有調整的,這里需要調整
        if (completedAbruptly)
            decrementWorkerCount();
      	// 獲取鎖
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
      
        try {
          	// 完成的任務數
            completedTaskCount += w.completedTasks;
            // 移除線程
          	workers.remove(w);
        } finally {
          	// 解鎖
            mainLock.unlock();
        }
      	// 試圖停止
        tryTerminate();
      	// 獲取狀態
        int c = ctl.get();
      	// 比stop小,至少是shutdown
        if (runStateLessThan(c, STOP)) {
          	// 如果不是突然完成
            if (!completedAbruptly) {
              	// 最小值要么是0,要么是核心線程數,要是允許核心線程超時銷毀,那么就是0
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
              	// 如果最小的是0或者隊列不是空的,那么保留一個線程
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
              	// 只要大於等於最小的線程數,就結束當前線程
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
          	// 否則的話,可能還需要新增工作線程
            addWorker(null, false);
        }
    }

如何停止線程池

停止線程池可以使用shutdown()或者shutdownNow()shutdown()可以繼續處理隊列中的任務,而shutdownNow()會立即清理任務,並返回未執行的任務。

    public void shutdown() {
        // 獲取鎖
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
          	// 檢查停止權限
            checkShutdownAccess();
          	// 更新狀態
            advanceRunState(SHUTDOWN);
          	// 中斷所有線程
            interruptIdleWorkers();
          	// 回調鈎子
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
		// 立刻停止
   public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
     		// 獲取鎖
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
          	// 檢查停止權限
            checkShutdownAccess();
          	// 更新狀態到stop
            advanceRunState(STOP);
          	// 中斷所有線程
            interruptWorkers();
            // 清理隊列
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
     		// 返回任務列表(未完成)
        return tasks;
    }

execute()和submit()方法

  • execute() 方法可以提交不需要返回值的任務,無法判斷任務是否被線程池執行是否成功
  • submit()方法用於提交需要返回值的任務。線程池會返回一個future類型的對象,通過這個對象,我們調用get()方法就可以阻塞,直到獲取到線程執行完成的結果,同時我們也可以使用有超時時間的等待方法get(long timeout,TimeUnit unit),這樣不管線程有沒有執行完成,如果到時間,也不會阻塞,直接返回null。返回的是RunnableFuture對象,繼承了Runnable, Future<V>兩個接口:
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

線程池為什么使用阻塞隊列?

阻塞隊列,首先是一個隊列,肯定具有先進先出的屬性。

而阻塞,則是這個模型的演化,一般隊列,可以用在生產消費者模型,也就是數據共享,有人往里面放任務,有人不斷的往里面取出任務,這是一個理想的狀態。

但是倘若不理想,產生任務和消費任務的速度不一樣,要是任務放在隊列里面比較多,消費比較慢,還可以慢慢消費,或者生產者得暫停一下產生任務(阻塞生產者線程)。可以使用 offer(E o, long timeout, TimeUnit unit)設定等待的時間,如果在指定的時間內,還不能往隊列中加入BlockingQueue,則返回失敗,也可以使用put(Object),將對象放到阻塞隊列里面,如果沒有空間,那么這個方法會阻塞到有空間才會放進去。

如果消費速度快,生產者來不及生產,獲取任務的時候,可以使用poll(time),有數據則直接取出來,沒數據則可以等待time時間后,返回null。也可以使用take()取出第一個任務,沒有任務就會一直阻塞到隊列有任務為止。

上面說了阻塞隊列的屬性,那么為啥要用呢?

  • 如果產生任務,來了就往隊列里面放,資源很容易被耗盡。
  • 創建線程需要獲取鎖,這個一個線程池的全局鎖,如果各個線程不斷的獲取鎖,解鎖,線程上下文切換之類的開銷也比較大,不如在隊列為空的時候,然一個線程阻塞等待。

常見的阻塞隊列

  • ArrayBlockingQueue:基於數組實現,內部有一個定長的數組,同時保存着隊列頭和尾部的位置。
  • LinkedBlockingQueue:基於鏈表的阻塞對壘,生產者和消費者使用獨立的鎖,並行能力強,如果不指定容量,默認是無效容量,容易系統內存耗盡。
  • DelayQueue:延遲隊列,沒有大小限制,生產數據不會被阻塞,消費數據會,只有指定的延遲時間到了,才能從隊列中獲取到該元素。
  • PriorityBlockingQueue:基於優先級的阻塞隊列,按照優先級進行消費,內部控制同步的是公平鎖。
  • SynchronousQueue:沒有緩沖,生產者直接把任務交給消費者,少了中間的緩存區。

線程池如何復用線程的?執行完成的線程怎么處理

前面的源碼分析,其實已經講解過這個問題了,線程池的線程調用的run()方法,其實調用的是runWorker(),里面是死循環,除非獲取不到任務,如果沒有了任務firstTask並且從任務隊列中獲取不到任務,超時的時候,會再判斷是不是可以銷毀核心線程,或者超過了核心線程數,滿足條件的時候,才會讓當前的線程結束。

否則,一直都在一個循環中,不會結束。

我們知道start()方法只能調用一次,因此調用到run()方法的時候,調用外面的runWorker(),讓其在runWorker()的時候,不斷的循環,獲取任務。獲取到任務,調用任務的run()方法。

執行完成的線程會調用processWorkerExit(),前面有分析,里面會獲取鎖,把線程數減少,從工作線程從集合中移除,移除掉之后,會判斷線程是不是太少了,如果是,會再加回來,個人以為是一種補救。

如何配置線程池參數?

一般而言,有個公式,如果是計算(CPU)密集型的任務,那么核心線程數設置為處理器核數-1,如果是io密集型(很多網絡請求),那么就可以設置為2*處理器核數。但是這並不是一個銀彈,一切要從實際出發,最好就是在測試環境進行壓測,實踐出真知,並且很多時候一台機器不止一個線程池或者還會有其他的線程,因此參數不可設置得太過飽滿。

一般 8 核的機器,設置 10-12 個核心線程就差不多了,這一切必須按照業務具體值進行計算。設置過多的線程數,上下文切換,競爭激烈,設置過少,沒有辦法充分利用計算機的資源。

計算(CPU)密集型消耗的主要是 CPU 資源,可以將線程數設置為 N(CPU 核心數)+1,比 CPU 核心數多出來的一個線程是為了防止線程偶發的缺頁中斷,或者其它原因導致的任務暫停而帶來的影響。一旦任務暫停,CPU 就會處於空閑狀態,而在這種情況下多出來的一個線程就可以充分利用 CPU 的空閑時間。

io密集型系統會用大部分的時間來處理 I/O 交互,而線程在處理 I/O 的時間段內不會占用 CPU 來處理,這時就可以將 CPU 交出給其它線程使用。因此在 I/O 密集型任務的應用中,我們可以多配置一些線程,具體的計算方法是 2N。

為什么不推薦默認的線程池創建方式?

阿里的編程規范里面,不建議使用默認的方式來創建線程,是因為這樣創建出來的線程很多時候參數都是默認的,可能創建者不太了解,很容易出問題,最好通過new ThreadPoolExecutor()來創建,方便控制參數。默認的方式創建的問題如下:

  • Executors.newFixedThreadPool():無界隊列,內存可能被打爆
  • Executors.newSingleThreadExecutor():單個線程,效率低,串行。
  • Executors.newCachedThreadPool():沒有核心線程,最大線程數可能為無限大,內存可能還會爆掉。

使用具體的參數創建線程池,開發者必須了解每個參數的作用,不會胡亂設置參數,減少內存溢出等問題。

一般體現在幾個問題:

  • 任務隊列怎么設置?
  • 核心線程多少個?
  • 最大線程數多少?
  • 怎么拒絕任務?
  • 創建線程的時候沒有名稱,追溯問題不好找。

線程池的拒絕策略

線程池一般有以下四種拒絕策略,其實我們可以從它的內部類看出來:

  • AbortPolicy: 不執行新的任務,直接拋出異常,提示線程池已滿
  • DisCardPolicy:不執行新的任務,但是也不會拋出異常,默默的
  • DisCardOldSetPolicy:丟棄消息隊列中最老的任務,變成新進來的任務
  • CallerRunsPolicy:直接調用當前的execute來執行任務

一般而言,上面的拒絕策略都不會特別理想,一般要是任務滿了,首先需要做的就是看任務是不是必要的,如果非必要,非核心,可以考慮拒絕掉,並報錯提醒,如果是必須的,必須把它保存起來,不管是使用mq消息,還是其他手段,不能丟任務。在這些過程中,日志是非常必要的。既要保護線程池,也要對業務負責。

線程池監控與動態調整

線程池提供了一些API,可以動態獲取線程池的狀態,並且還可以設置線程池的參數,以及狀態:

查看線程池的狀態:

修改線程池的狀態:

關於這一點,美團的線程池文章講得很清楚,甚至做了一個實時調整線程池參數的平台,可以進行跟蹤監控,線程池活躍度、任務的執行Transaction(頻率、耗時)、Reject異常、線程池內部統計信息等等。這里我就不展開了,原文:https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html ,這是我們可以參考的思路。

線程池隔離

線程隔離,很多同學可能知道,就是不同的任務放在不同的線程里面運行,而線程池隔離,一般是按照業務類型來隔離,比如訂單的處理線程放在一個線程池,會員相關的處理放在一個線程池。

也可以通過核心和非核心來隔離,核心處理流程放在一起,非核心放在一起,兩個使用不一樣的參數,不一樣的拒絕策略,盡量保證多個線程池之間不影響,並且最大可能保住核心線程的運行,非核心線程可以忍受失敗。

Hystrix里面運用到這個技術,Hystrix的線程隔離技術,來防止不同的網絡請求之間的雪崩,即使依賴的一個服務的線程池滿了,也不會影響到應用程序的其他部分。

關於作者

秦懷,公眾號【秦懷雜貨店】作者,技術之路不在一時,山高水長,縱使緩慢,馳而不息。個人寫作方向:Java源碼解析,JDBC,Mybatis,Spring,redis,分布式,劍指Offer,LeetCode等,認真寫好每一篇文章,不喜歡標題黨,不喜歡花里胡哨,大多寫系列文章,不能保證我寫的都完全正確,但是我保證所寫的均經過實踐或者查找資料。遺漏或者錯誤之處,還望指正。

2020年我寫了什么?

開源編程筆記


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM