深入理解Java線程池原理


微信公眾號:大黃奔跑
關注我,可了解更多有趣的面試相關問題。

1. 線程池介紹

在web開發中,服務器需要接受並處理請求,所以會為一個請求來分配一個線程來進行處理。如果每次請求都新創建一個線程的話實現起來非常簡便,但是存在一個問題:

如果並發的請求數量非常多,但每個線程執行的時間很短,這樣就會頻繁的創建和銷毀線程,如此一來會大大降低系統的效率。可能出現服務器在為每個請求創建新線程和銷毀線程上花費的時間和消耗的系統資源要比處理實際的用戶請求的時間和資源更多。

那么有沒有一種辦法使執行完一個任務,並不被銷毀,而是可以繼續執行其他的任務呢?

這就是線程池的目的了。線程池為線程生命周期的開銷和資源不足問題提供了解決方案。通過對多個任務重用線程,線程創建的開銷被分攤到了多個任務上。

什么時候使用線程池?

  • 單個任務處理時間比較短
  • 需要處理的任務數量很大

使用線程池的好處:

  1. 降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。
  2. 提高響應速度。當任務到達時,任務可以不需要的等到線程創建就能立即執行。
  3. 提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控。

Java中的線程池是用ThreadPoolExecutor類來實現的. 本文就結合JDK 1.8對該類的源碼來分析一下這個類內部對於線程的創建, 管理以及后台任務的調度等方面的執行原理。

先看一下線程池的類圖:

在這里插入圖片描述
在這里插入圖片描述

2. 源碼分析

1. Executor框架接口

Executor框架是一個根據一組執行策略調用,調度,執行和控制的異步任務的框架,目的是提供一種將”任務提交”與”任務如何運行”分離開來的機制。

J.U.C中有三個Executor接口:

  • Executor:一個運行新任務的簡單接口;
  • ExecutorService:擴展了Executor接口。添加了一些用來管理執行器生命周期和任務生命周期的方法;
  • ScheduledExecutorService:擴展了ExecutorService。支持Future和定期執行任務。

1. Executo接口

public interface Executor {
    void execute(Runnable command);
}

Executor接口只有一個execute方法,用來替代通常創建或啟動線程的方法。例如,使用Thread來創建並啟動線程的代碼如下:

Thread t = new Thread();
t.start();

使用Executor來啟動線程執行任務的代碼如下:

Thread t = new Thread();
executor.execute(t);

對於不同的Executor實現,execute()方法可能是創建一個新線程並立即啟動,也有可能是使用已有的工作線程來運行傳入的任務,也可能是根據設置線程池的容量或者阻塞隊列的容量來決定是否要將傳入的線程放入阻塞隊列中或者拒絕接收傳入的線程。

2. ExecutorService

ExecutorService該接口繼承與Executor接口。

提供了管理終止的方法,以及可為跟蹤一個或多個異步任務執行狀況而生成 Future 的方法。增加了shutDown(),shutDownNow(),invokeAll(),invokeAny()和submit()等方法。如果需要支持即時關閉,也就是shutDownNow()方法,則任務需要正確處理中斷。
其主要的方法如下:

ExecutorService方法
ExecutorService方法

3. AbstractExecutorService

該抽象方法實現於ExecutorService
該方法主要重寫了ExecutorService中的一些方法比如submit()、invokeAll()等。
同時增加了幾個自己的方法:

在這里插入圖片描述
在這里插入圖片描述

4. ThreadPoolExecutor

接下來重點看一下線程池的重要的實現類,該類繼承與AbstractExecutorService

1. 幾個重要的字段
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

下面分別解釋一下每個字段的含義:

  1. ctl是對線程池的運行狀態和線程池中有效線程的數量進行控制的一個字段, 它包含兩部分的信息: 線程池的運行狀態 (runState) 和線程池內有效線程的數量 (workerCount)
  2. 使用了Integer類型來保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位減1(29個1),這個常量表示workerCount的上限值,大約是5億。
  3. RUNNING:能接受新提交的任務,並且也能處理阻塞隊列中的任務;
  4. SHUTDOWN:關閉狀態,不再接受新提交的任務,但卻可以繼續處理阻塞隊列中已保存的任務。在線程池處於 RUNNING 狀態時,調用 shutdown()方法會使線程池進入到該狀態。
  5. STOP: 不能接受新任務,也不處理隊列中的任務,會中斷正在處理任務的線程。在線程池處於 RUNNING 或 SHUTDOWN 狀態時,調用 shutdownNow() 方法會使線程池進入到該狀態;
  6. TIDYING:如果所有的任務都已終止了,workerCount (有效線程數) 為0,線程池進入該狀態后會調用 terminated() 方法進入TERMINATED 狀態。
  7. TERMINATED:在terminated() 方法執行完后進入該狀態,默認terminated()方法中什么也沒有做。

進入TERMINATED的條件如下:

  • 線程池不是RUNNING狀態;
  • 線程池狀態不是TIDYING狀態或TERMINATED狀態;
  • 如果線程池狀態是SHUTDOWN並且workerQueue為空;
  • workerCount為0;
  • 設置TIDYING狀態成功。

下面是線程池狀態的流轉:

在這里插入圖片描述
在這里插入圖片描述

ctl相關方法
這里還有幾個對ctl進行計算的方法:

private static int runStateOf(int c)     return c & ~CAPACITY; }
private static int workerCountOf(int c)  return c & CAPACITY; }
private static int ctlOf(int rs, int wc) return rs | wc; }

runStateOf:獲取運行狀態;
workerCountOf:獲取活動線程數;
ctlOf:獲取運行狀態和活動線程數的值。

2. ThreadPoolExecutor構造方法
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler
{
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

構造方法中的字段含義如下:

  • corePoolSize:核心線程數量,當有新任務在execute()方法提交時,會執行以下判斷:
  1. 如果運行的線程少於 corePoolSize,則創建新線程來處理任務,即使線程池中的其他線程是空閑的;
  2. 如果線程池中的線程數量大於等於 corePoolSize 且小於 maximumPoolSize,任務來了首先放在隊列中,只有當workQueue滿時才創建新的線程去處理任務;
  3. 如果設置的corePoolSize 和 maximumPoolSize相同,則創建的線程池的大小是固定的,這時如果有新任務提交,若workQueue未滿,則將請求放入workQueue中,等待有空閑的線程去從workQueue中取任務並處理;
  4. 如果運行的線程數量大於等於maximumPoolSize,這時如果workQueue已經滿了,則通過handler所指定的策略來處理任務;
    所以,任務提交時,判斷的順序為 corePoolSize --> workQueue --> maximumPoolSize。
  • maximumPoolSize:最大線程數量;
  • workQueue:等待隊列,當任務提交時,如果線程池中的線程數量大於等於corePoolSize的時候,把該任務封裝成一個Worker對象放入等待隊列;
  • workQueue:保存等待執行的任務的阻塞隊列,當提交一個新的任務到線程池以后, 線程池會根據當前線程池中正在運行着的線程的數量來決定對該任務的處理方式,主要有以下幾種處理方式:
  1. 直接切換:這種方式常用的隊列是SynchronousQueue
  2. 使用無界隊列:一般使用基於鏈表的阻塞隊列LinkedBlockingQueue。如果使用這種方式,那么線程池中能夠創建的最大線程數就是corePoolSize,而maximumPoolSize就不會起作用了(后面也會說到)。當線程池中所有的核心線程都是RUNNING狀態時,這時一個新的任務提交就會放入等待隊列中。
  3. 使用有界隊列:一般使用ArrayBlockingQueue。使用該方式可以將線程池的最大線程數量限制為maximumPoolSize,這樣能夠降低資源的消耗,但同時這種方式也使得線程池對線程的調度變得更困難,因為線程池和隊列的容量都是有限的值,所以要想使線程池處理任務的吞吐率達到一個相對合理的范圍,又想使線程調度相對簡單,並且還要盡可能的降低線程池對資源的消耗,就需要合理的設置這兩個數量。
  • 如果要想降低系統資源的消耗(包括CPU的使用率,操作系統資源的消耗,上下文環境切換的開銷等), 可以設置較大的隊列容量和較小的線程池容量, 但這樣也會降低線程處理任務的吞吐量。
  • 如果提交的任務經常發生阻塞,那么可以考慮通過調用 setMaximumPoolSize() 方法來重新設定線程池的容量。
  • 如果隊列的容量設置的較小,通常需要將線程池的容量設置大一點,這樣CPU的使用率會相對的高一些。但如果線程池的容量設置的過大,則在提交的任務數量太多的情況下,並發量會增加,那么線程之間的調度就是一個要考慮的問題,因為這樣反而有可能降低處理任務的吞吐量。
  • keepAliveTime:線程池維護線程所允許的空閑時間。當線程池中的線程數量大於corePoolSize的時候,如果這時沒有新的任務提交,核心線程外的線程不會立即銷毀,而是會等待,直到等待的時間超過了keepAliveTime;
  • threadFactory:它是ThreadFactory類型的變量,用來創建新線程。默認使用Executors.defaultThreadFactory() 來創建線程。使用默認的ThreadFactory來創建線程時,會使新創建的線程具有相同的NORM_PRIORITY優先級並且是非守護線程,同時也設置了線程的名稱。
  • handler:它是RejectedExecutionHandler類型的變量,表示線程池的飽和策略。如果阻塞隊列滿了並且沒有空閑的線程,這時如果繼續提交任務,就需要采取一種策略處理該任務。線程池提供了4種策略:
  1. AbortPolicy:直接拋出異常,這是默認策略;
  2. CallerRunsPolicy:用調用者所在的線程來執行任務;
  3. DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務;
  4. DiscardPolicy:直接丟棄任務;
3. execute()

execute()方法用來提交任務,源代碼如下:

/**
  * 主要分三種情況處理
  * @param command
  */

 public void execute(Runnable command{
     if (command == null)
         throw new NullPointerException();

     //clt記錄着runState和workerCount
     int c = ctl.get();

     /**
      * 第一種情況:workerCountOf方法取出低29位的值,表示當前活動的線程數;
      * 如果當前活動線程數小於corePoolSize,則新建一個線程放入線程池中;並且把任務放到線程池中
      * 如果添加失敗了,則重新獲取ctl值
      */

     if (workerCountOf(c) < corePoolSize) {
         /**
          * addWorker中的第二個參數表示限制添加線程的數量是根據corePoolSize來判斷還是maximumPoolSize來判斷;
          * 如果為true,根據corePoolSize來判斷;
          * 如果為false,則根據maximumPoolSize來判斷
          */

         if (addWorker(command, true))
             return;
         c = ctl.get();
     }

     /**
      * 第二種情況:如果當前線程池是運行狀態並且任務可以添加到隊列
      * 重新獲取ctl值,判斷狀態,因為有可能在上次檢查之后線程再次改變狀態
      *
      */

     if (isRunning(c) && workQueue.offer(command)) {
         int recheck = ctl.get();
         /**
          * 再次判斷線程池的運行狀態,如果不是運行狀態,由於之前已經把command添加到workQueue中了,這時需要移除該command
          * 執行過后通過handler使用拒絕策略對該任務進行處理,整個方法返回
          */

         if (! isRunning(recheck) && remove(command))
             reject(command);
         /**
          * 獲取線程池中的有效線程數,如果數量是0,則執行addWorker方法
          * 這里傳入的參數表示:
          * 1. 第一個參數為null,表示在線程池中創建一個線程,但不去啟動;
          * 2. 第二個參數為false,將線程池的有限線程數量的上限設置為maximumPoolSize,添加線程時根據maximumPoolSize來判斷;
          * 如果判斷workerCount大於0,則直接返回,在workQueue中新增的command會在將來的某個時刻被執行。
          */

         else if (workerCountOf(recheck) == 0)
             addWorker(nullfalse);
     }

     /**
      * 第三種情況:需要執行拒絕策略
      * 如果執行到這里,有兩種情況:
      * 1. 線程池已經不是RUNNING狀態;
      * 2. 線程池是RUNNING狀態,但workerCount >= corePoolSize並且workQueue已滿。
      * 再次調用addWorker方法,但第二個參數傳入為false,將線程池的有限線程數量的上限設置為maximumPoolSize;
      * 如果失敗則拒絕該任務
      */

     else if (!addWorker(command, false))
         reject(command);
 }

簡單來說,在執行execute()方法時如果狀態一直是RUNNING時,的執行過程如下:

  1. 如果workerCount &lt; corePoolSize,則創建並啟動一個線程來執行新提交的任務;
  2. 如果workerCount &gt;= corePoolSize,且線程池內的阻塞隊列未滿,則將任務添加到該阻塞隊列中;
  3. 如果workerCount &gt;= corePoolSize &amp;&amp; workerCount &lt; maximumPoolSize,且線程池內的阻塞隊列已滿,則創建並啟動一個線程來執行新提交的任務;
  4. 如果workerCount &gt;= maximumPoolSize,並且線程池內的阻塞隊列已滿, 則根據拒絕策略來處理該任務, 默認的處理方式是直接拋異常。

這里要注意一下addWorker(null, false),也就是創建一個線程,但並沒有傳入任務,因為任務已經被添加到workQueue中了,所以worker在執行的時候,會直接從workQueue中獲取任務。所以,在workerCountOf(recheck) == 0時執行addWorker(null, false);也是為了保證線程池在RUNNING狀態下必須要有一個線程來執行任務。
execute方法執行流程如下:

在這里插入圖片描述
在這里插入圖片描述
4. addWorker()

在提交線程中,可以看到線程添加到線程池中是通過addWorker()方法進行的,因此接着分析一下該方法:

/**
  * @param firstTask
  * @param core   如果是依據corePoolSize判斷則為true,maximumPoolSize判斷為false
  * @return
  */

 private boolean addWorker(Runnable firstTask, boolean core{
     retry:

     //第一層循環
     for (;;) {
         //獲取線程的狀態
         int c = ctl.get();
         int rs = runStateOf(c);

         /**
          * 該判斷條件:
          * 1. 如果rs >= SHUTDOWN,則表示此時不再接收新任務;
          * 2. 接着判斷以下3個條件,只要有1個不滿足,則返回false:
          *      1. 狀態為SHUTDOWN,這時表示關閉狀態,不再接受新提交的任務,但卻可以繼續處理阻塞隊列中已保存的任務
          *      2. firsTask為空
          *      3. 阻塞隊列不為空
          * 首先考慮rs == SHUTDOWN的情況
          * 這種情況下不會接受新提交的任務,所以在firstTask不為空的時候會返回false;
          * 然后,如果firstTask為空,並且workQueue也為空,則返回false,
          * 因為隊列中已經沒有任務了,不需要再添加線程了
          */

         if (rs >= SHUTDOWN &&
             ! (rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()))
             return false;

         // 第二層循環
         for (;;) {
             // 獲取線程數
             int wc = workerCountOf(c);
             /**
              * 如果wc超過CAPACITY,也就是ctl的低29位的最大值(二進制是29個1),方法直接返回false;
              * 這里的core是addWorker方法的第二個參數,如果為true表示根據corePoolSize來比較,如果為false則根據maximumPoolSize來比較。
              */

             if (wc >= CAPACITY ||
                 wc >= (core ? corePoolSize : maximumPoolSize))
                 return false;

             //嘗試增加workerCount,如果成功,則跳出第一個for循環
             if (compareAndIncrementWorkerCount(c))
                 break retry;
             // 如果增加workerCount失敗,則重新獲取ctl的值
             c = ctl.get();
             // 如果當前的運行狀態不等於rs,說明狀態已被改變,返回第一個for循環繼續執行
             if (runStateOf(c) != rs)
                 continue retry;
         }
     }

     boolean workerStarted = false;
     boolean workerAdded = false;
     Worker w = null;
     try {
         // 根據firstTask來創建Worker對象,每一個Worker對象都會創建一個線程
         w = new Worker(firstTask);
         final Thread t = w.thread;
         /**
          * 如果過線程不為空,則試着將線程加入工作隊列中
          */

         if (t != null) {
             //獲取重入鎖
             final ReentrantLock mainLock = this.mainLock;
             mainLock.lock();
             try {
                 // 重新獲取線程的狀態
                 int rs = runStateOf(ctl.get());

                 /**
                  * rs < SHUTDOWN表示是RUNNING狀態;
                  * 如果rs是RUNNING狀態或者rs是SHUTDOWN狀態並且firstTask為null,向線程池中添加線程。
                  * 因為在SHUTDOWN時不會在添加新的任務,但還是會執行workQueue中的任務
                  */

                 if (rs < SHUTDOWN ||
                     (rs == SHUTDOWN && firstTask == null)) {
                     if (t.isAlive())
                         throw new IllegalThreadStateException();
                     //workers是一個HashSet,將該worker對象添加其中
                     workers.add(w);
                     int s = workers.size();
                     // largestPoolSize記錄着線程池中出現過的最大線程數量
                     if (s > largestPoolSize)
                         largestPoolSize = s;
                     // 更新狀態
                     workerAdded = true;
                 }
             } finally {
                 mainLock.unlock();
             }
             /**
              * 如果添加成功,則啟動線程
              */

             if (workerAdded) {
                 t.start();
                 workerStarted = true;
             }
         }
     } finally {
         if (! workerStarted)
             addWorkerFailed(w);
     }
     return workerStarted;
 }

注意一下這里的t.start()這個語句,啟動時會調用Worker類中的run()方法,Worker本身實現了Runnable接口,所以一個Worker類型的對象也是一個線程。

5. Worker

線程池中的每一個線程被封裝成一個Worker對象,ThreadPool維護的其實就是一組Worker對象,看一下Worker的定義:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */

    private static final long serialVersionUID = 6138294804551838833L;

    /**
     * 在調用構造方法時通過ThreadFactory來創建的線程,是用來處理任務的線程。
     */

    final Thread thread;
    /** 保存傳入的任務 */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * 在調用構造方法時,需要把任務傳入,這里通過getThreadFactory().newThread(this);來新建一個線程,
     * newThread方法傳入的參數是this,因為Worker本身繼承了Runnable接口,
     * 也就是一個線程,所以一個Worker對象在啟動的時候會調用Worker類中的run方法。
     * @param firstTask the first task (null if none)
     */

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** 利用runWorker()方法來執行任務  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(01)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

Worker繼承了AQS,使用AQS來實現獨占鎖的功能。為什么不使用ReentrantLock來實現呢?可以看到tryAcquire方法,它是不允許重入的,而ReentrantLock是允許重入的:

  1. lock方法一旦獲取了獨占鎖,表示當前線程正在執行任務中;
  2. 如果正在執行任務,則不應該中斷線程;
  3. 如果該線程現在不是獨占鎖的狀態,也就是空閑的狀態,說明它沒有在處理任務,這時可以對該線程進行中斷;
  4. 線程池在執行shutdown方法或tryTerminate方法時會調用interruptIdleWorkers方法來中斷空閑的線程,interruptIdleWorkers方法會使用tryLock方法來判斷線程池中的線程是否是空閑狀態;
  5. 之所以設置為不可重入,是因為我們不希望任務在調用像setCorePoolSize這樣的線程池控制方法時重新獲取鎖。如果使用ReentrantLock,它是可重入的,這樣如果在任務中調用了如setCorePoolSize這類線程池控制的方法,會中斷正在運行的線程。

所以,Worker繼承自AQS,用於判斷線程是否空閑以及是否可以被中斷。

此外,在構造方法中執行了setState(-1);,把state變量設置為-1,為什么這么做呢?是因為AQS中默認的state是0,如果剛創建了一個Worker對象,還沒有執行任務時,這時就不應該被中斷,看一下tryAquire方法:

/**
  * 該方法不允許重入
  * @param unused
  * @return
  */

 protected boolean tryAcquire(int unused) {
     if (compareAndSetState(01)) {
         setExclusiveOwnerThread(Thread.currentThread());
         return true;
     }
     return false;
 }

tryAcquire方法是根據state是否是0來判斷的,所以,setState(-1);將state設置為-1是為了禁止在執行任務前對線程進行中斷。

正因為如此,在runWorker方法中會先調用Worker對象的unlock方法將state設置為0.

6. runWorker

在Worker類中的run方法調用了runWorker方法來執行任務,runWorker方法的代碼如下:

final void runWorker(Worker w{
     Thread wt = Thread.currentThread();
     //獲取第一個任務
     Runnable task = w.firstTask;
     w.firstTask = null;
     //將該線程設置為允許中斷
     w.unlock();
     // 是否因為異常退出循環
     boolean completedAbruptly = true;
     try {
         // 如果task為空,則通過getTask來獲取任務
         while (task != null || (task = getTask()) != null) {
             w.lock();
             /**
              * 如果線程池正在停止,那么要保證當前線程是中斷狀態;
              * 如果不是的話,則要保證當前線程不是中斷狀態;
              * 這里要考慮在執行該if語句期間可能也執行了shutdownNow方法,shutdownNow方法會把狀態設置為STOP
              *
              * STOP狀態要中斷線程池中的所有線程,而這里使用Thread.interrupted()來判斷是否中斷是為了
              * 確保在RUNNING或者SHUTDOWN狀態時線程是非中斷狀態的,因為Thread.interrupted()方法會復位中斷的狀態。
              */

             if ((runStateAtLeast(ctl.get(), STOP) ||
                  (Thread.interrupted() &&
                   runStateAtLeast(ctl.get(), STOP))) &&
                 !wt.isInterrupted())
                 wt.interrupt();
             try {
                 beforeExecute(wt, task);
                 Throwable thrown = null;
                 // 如果不是中斷狀態,則調用task.run()執行任務
                 try {
                     task.run();
                 } catch (RuntimeException x) {
                     thrown = x; throw x;
                 } catch (Error x) {
                     thrown = x; throw x;
                 } catch (Throwable x) {
                     thrown = x; throw new Error(x);
                 } finally {
                     afterExecute(task, thrown);
                 }
             } finally {
                 task = null;
                 w.completedTasks++;
                 w.unlock();
             }
         }
         completedAbruptly = false;
     } finally {
         processWorkerExit(w, completedAbruptly);
     }
 }

總結一下runWorker方法的執行過程:

  1. while循環不斷地通過getTask()方法獲取任務;
  2. getTask()方法從阻塞隊列中取任務;
  3. 如果線程池正在停止,那么要保證當前線程是中斷狀態,否則要保證當前線程不是中斷狀態;調用task.run()執行任務;
  4. 如果task為null則跳出循環,執行processWorkerExit()方法;
  5. runWorker方法執行完畢,也代表着Worker中的run方法執行完畢,銷毀線程。

completedAbruptly變量來表示在執行任務過程中是否出現了異常,在processWorkerExit方法中會對該變量的值進行判斷。

7. getTask()

getTask方法用來從阻塞隊列中取任務,代碼如下:

/**
  * 從阻塞隊列中取任務
  * @return
  */

 private Runnable getTask({
     // timeOut變量的值表示上次從阻塞隊列中取任務時是否超時
     boolean timedOut = false// Did the last poll() time out?

     for (;;) {
         int c = ctl.get();
         int rs = runStateOf(c);

         /**
          * 如果線程池狀態rs >= SHUTDOWN,也就是非RUNNING狀態,再進行以下判斷:
          * 1. rs >= STOP,線程池是否正在stop;
          * 2. 阻塞隊列是否為空。
          * 如果以上條件滿足,則將workerCount減1並返回null。
          *
          * 因為如果當前線程池狀態的值是SHUTDOWN或以上時,不允許再向阻塞隊列中添加任務。
          */

         if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
             decrementWorkerCount();
             return null;
         }

         //重新獲取線程的數量
         int wc = workerCountOf(c);

         // timed變量用於判斷是否需要進行超時控制。
         // allowCoreThreadTimeOut默認是false,也就是核心線程不允許進行超時;
         // wc > corePoolSize,表示當前線程池中的線程數量大於核心線程數量;對於超過核心線程數量的這些線程,需要進行超時控制
         boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

         /**
          * wc > maximumPoolSize的情況是因為可能在此方法執行階段同時執行了setMaximumPoolSize方法;
          * timed && timedOut 如果為true,表示當前操作需要進行超時控制,並且上次從阻塞隊列中獲取任務發生了超時
          * 接下來判斷,如果有效線程數量大於1,或者阻塞隊列是空的,那么嘗試將workerCount減1;
          * 如果減1失敗,則返回重試。
          * 如果wc == 1時,也就說明當前線程是線程池中唯一的一個線程了。
          */

         if ((wc > maximumPoolSize || (timed && timedOut))
             && (wc > 1 || workQueue.isEmpty())) {
             if (compareAndDecrementWorkerCount(c))
                 return null;
             continue;
         }

         /**
          * 根據timed來判斷,如果為true,則通過阻塞隊列的poll方法進行超時控制,如果在keepAliveTime時間內沒有獲取到任務,則返回null;
          * 否則通過take方法,如果這時隊列為空,則take方法會阻塞直到隊列不為空。
          */


         try {
             Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
             if (r != null)
                 return r;
             // 如果 r == null,說明已經超時,timedOut設置為true
             timedOut = true;
         } catch (InterruptedException retry) {
             // 如果獲取任務時當前線程發生了中斷,則設置timedOut為false並返回循環重試
             timedOut = false;
         }
     }
 }
8. processWorkerExit方法
/**
  * 主要用於線程的清理工作
  * @param w
  * @param completedAbruptly
  */

  private void processWorkerExit(Worker w, boolean completedAbruptly{

      // 如果completedAbruptly值為true,則說明線程執行時出現了異常,需要將workerCount減1;
      // 如果線程執行時沒有出現異常,說明在getTask()方法中已經已經對workerCount進行了減1操作,這里就不必再減了。
      if (completedAbruptly)
          decrementWorkerCount();

      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
          //統計完成的任務數
          completedTaskCount += w.completedTasks;
          // 從workers中移除,也就表示着從線程池中移除了一個工作線程
          workers.remove(w);
      } finally {
          mainLock.unlock();
      }

      // 根據線程池狀態進行判斷是否結束線程池
      tryTerminate();

      int c = ctl.get();

      /**
       * 當線程池是RUNNING或SHUTDOWN狀態時,如果worker是異常結束,那么會直接addWorker;
       * 如果allowCoreThreadTimeOut=true,並且等待隊列有任務,至少保留一個worker;
       * 如果allowCoreThreadTimeOut=false,workerCount不少於corePoolSize。
       */

      if (runStateLessThan(c, STOP)) {
          if (!completedAbruptly) {
              int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
              if (min == 0 && ! workQueue.isEmpty())
                  min = 1;
              if (workerCountOf(c) >= min)
                  return// replacement not needed
          }
          addWorker(nullfalse);
      }
  }

至此,processWorkerExit執行完之后,工作線程被銷毀,以上就是整個工作線程的生命周期。

execute方法開始,Worker使用ThreadFactory創建新的工作線程,runWorker通過getTask獲取任務,然后執行任務,如果getTask返回null,進入processWorkerExit方法,整個線程結束,如圖所示:

在這里插入圖片描述
在這里插入圖片描述
10. tryTerminate方法

tryTerminate方法根據線程池狀態進行判斷是否結束線程池,代碼如下:

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        /**
         * 當前線程池的狀態為以下幾種情況時,直接返回:
         * 1. RUNNING,因為還在運行中,不能停止;
         * 2. TIDYING或TERMINATED,因為線程池中已經沒有正在運行的線程了;
         * 3. SHUTDOWN並且等待隊列非空,這時要執行完workQueue中的task;
         */

        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        // 如果線程數量不為0,則中斷一個空閑的工作線程,並返回
        if (workerCountOf(c) != 0) {
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        // 這里嘗試設置狀態為TIDYING,如果設置成功,則調用terminated方法
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    // 設置狀態為TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
11:shutdown()

shutdown方法要將線程池切換到SHUTDOWN狀態,並調用 interruptIdleWorkers方法請求中斷所有空閑的worker,最后調用tryTerminate嘗試結束線程池。

public void shutdown({
   final ReentrantLock mainLock = this.mainLock;
   mainLock.lock();
   try {
       // 安全策略判斷
       checkShutdownAccess();
       // 切換狀態為SHUTDOWN
       advanceRunState(SHUTDOWN);
       // 中斷空閑線程
       interruptIdleWorkers();
       onShutdown(); // hook for ScheduledThreadPoolExecutor
   } finally {
       mainLock.unlock();
   }
   // 嘗試結束線程池
   tryTerminate();
}

這里思考一個問題:在runWorker方法中,執行任務時對Worker對象w進行了lock操作,為什么要在執行任務的時候對每個工作線程都加鎖呢?

下面仔細分析一下:

  1. getTask方法中,如果這時線程池的狀態是SHUTDOWN並且workQueue為空,那么就應該返回null來結束這個工作線程,而使線程池進入SHUTDOWN狀態需要調用shutdown方法;

  2. shutdown方法會調用interruptIdleWorkers來中斷空閑的線程,interruptIdleWorkers持有mainLock,會遍歷workers來逐個判斷工作線程是否空閑。但getTask方法中沒有mainLock

  3. getTask中,如果判斷當前線程池狀態是RUNNING,並且阻塞隊列為空,那么會調用workQueue.take()進行阻塞;

  4. 如果在判斷當前線程池狀態是RUNNING后,這時調用了shutdown方法把狀態改為了SHUTDOWN,這時如果不進行中斷,那么當前的工作線程在調用了workQueue.take()后會一直阻塞而不會被銷毀,因為在SHUTDOWN狀態下不允許再有新的任務添加到workQueue中,這樣一來線程池永遠都關閉不了了;

  5. 由上可知,shutdown方法與getTask方法(從隊列中獲取任務時)存在競態條件;

  6. 解決這一問題就需要用到線程的中斷,也就是為什么要用interruptIdleWorkers方法。在調用workQueue.take()時,如果發現當前線程在執行之前或者執行期間是中斷狀態,則會拋出InterruptedException,解除阻塞的狀態;

  7. 但是要中斷工作線程,還要判斷工作線程是否是空閑的,如果工作線程正在處理任務,就不應該發生中斷;

  8. 所以Worker繼承自AQS,在工作線程處理任務時會進行lock,interruptIdleWorkers在進行中斷時會使用tryLock來判斷該工作線程是否正在處理任務,如果tryLock返回true,說明該工作線程當前未執行任務,這時才可以被中斷。

11 interruptIdleWorkers方法

interruptIdleWorkers遍歷workers中所有的工作線程,若線程沒有被中斷tryLock成功,就中斷該線程。

為什么需要持有mainLock?因為workers是HashSet類型的,不能保證線程安全。

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
12 hutdownNow方法

shutdownNow方法與shutdown方法類似,不同的地方在於:

  1. 設置狀態為STOP;
  2. 中斷所有工作線程,無論是否是空閑的;
  3. 取出阻塞隊列中沒有被執行的任務並返回。

shutdownNow方法執行完之后調用tryTerminate方法,該方法在上文已經分析過了,目的就是使線程池的狀態設置為TERMINATED。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        // 中斷所有工作線程,無論是否空閑
        interruptWorkers();
        // 取出隊列中沒有被執行的任務
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

3. 線程池的監控

通過線程池提供的參數進行監控。線程池里有一些屬性在監控線程池的時候可以使用

  1. getTaskCount:線程池已經執行的和未執行的任務總數;
  2. getCompletedTaskCount:線程池已完成的任務數量,該值小於等於taskCount;
  3. getLargestPoolSize:線程池曾經創建過的最大線程數量。通過這個數據可以知道線程池是否滿過,也就是達到了maximumPoolSize;
  4. getPoolSize:線程池當前的線程數量;
  5. getActiveCount:當前線程池中正在執行任務的線程數量。

通過這些方法,可以對線程池進行監控,在ThreadPoolExecutor類中提供了幾個空方法,如
beforeExecute方法,afterExecute方法和terminated方法,可以擴展這些方法在執行前或執行后增加一些新的操作,例如統計線程池的執行任務的時間等,可以繼承自ThreadPoolExecutor來進行擴展。

4. 總結

本文比較詳細的分析了線程池的工作流程,總體來說有如下幾個內容:

  1. 分析了線程的創建,任務的提交,狀態的轉換以及線程池的關閉;
  2. 這里通過execute方法來展開線程池的工作流程,execute方法通過corePoolSize,maximumPoolSize以及阻塞隊列的大小來判斷決定傳入的任務應該被立即執行,還是應該添加到阻塞隊列中,還是應該拒絕任務。
  3. 介紹了線程池關閉時的過程,也分析了shutdown方法與getTask方法存在競態條件;
    在獲取任務時,要通過線程池的狀態來判斷應該結束工作線程還是阻塞線程等待新的任務,也5. 解釋了為什么關閉線程池時要中斷工作線程以及為什么每一個worker都需要lock。
    在向線程池提交任務時,除了execute方法,還有一個submit方法,submit方法會返回一個

Future對象用於獲取返回值,有關Future和Callable請自行了解一下相關的文章,這里就不介紹了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM