Java並發包中線程池ThreadPoolExecutor原理探究


一、線程池簡介

  線程池的使用主要是解決兩個問題:①當執行大量異步任務的時候線程池能夠提供更好的性能,在不使用線程池時候,每當需要執行異步任務的時候直接new一個線程來運行的話,線程的創建和銷毀都是需要開銷的。而線程池中的線程是可復用的,不需要每次執行異步任務的時候重新創建和銷毀線程;②線程池提供一種資源限制和管理的手段,比如可以限制線程的個數,動態的新增線程等等。

  在下面的分析中,我們可以看到,線程池使用一個Integer的原子類型變量來記錄線程池狀態和線程池中的線程數量,通過線程池狀態來控制任務的執行,每個工作線程Worker線程可以處理多個任務。

二、ThreadPoolExecutor類

1、我們先簡單看一下關於ThreadPoolExecutor的一些成員變量以及其所表示的含義

   ThreadPoolExecutor繼承了AbstractExecutorService,其中的成員變量ctl是一個Integer類型的原子變量,用來記錄線程池的狀態和線程池中的線程的個數,類似於前面講到的讀寫鎖中使用一個變量保存兩種信息。這里(Integer看做32位)ctl高三位表示線程池的狀態,后面的29位表示線程池中的線程個數。如下所示是ThreadPoolExecutor源碼中的成員變量

 1 //(高3位)表示線程池狀態,(低29位)表示線程池中線程的個數;
 2 // 默認狀態是RUNNING,線程池中線程個數為0
 3 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 4 
 5 //表示具體平台下Integer的二進制位數-3后的剩余位數表示的數才是線程的個數;
 6 //其中Integer.SIZE=32,-3之后的低29位表示的就是線程的個數了
 7 private static final int COUNT_BITS = Integer.SIZE - 3;
 8 
 9 //線程最大個數(低29位)00011111111111111111111111111111(1<<29-1)
10 private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
11 
12 //線程池狀態(高3位表示線程池狀態)
13 //111 00000000000000000000000000000
14 private static final int RUNNING    = -1 << COUNT_BITS;
15 
16 //000 00000000000000000000000000000
17 private static final int SHUTDOWN   =  0 << COUNT_BITS;
18 
19 //001 00000000000000000000000000000
20 private static final int STOP       =  1 << COUNT_BITS;
21 
22 //010 00000000000000000000000000000
23 private static final int TIDYING    =  2 << COUNT_BITS;
24 
25 //011 00000000000000000000000000000
26 private static final int TERMINATED =  3 << COUNT_BITS;
27 
28 //獲取高3位(運行狀態)==> c & 11100000000000000000000000000000
29 private static int runStateOf(int c)     { return c & ~CAPACITY; }
30 
31 //獲取低29位(線程個數)==> c &  00011111111111111111111111111111
32 private static int workerCountOf(int c)  { return c & CAPACITY; }
33 
34 //計算原子變量ctl新值(運行狀態和線程個數)
35 private static int ctlOf(int rs, int wc) { return rs | wc; }

  下面我們簡單解釋一下上面的線程狀態的含義:

  ①RUNNING:接受新任務並處理阻塞隊列中的任務

  ②SHUTDOWN:拒絕新任務但是處理阻塞隊列中的任務

  ③STOP:拒絕新任務並拋棄阻塞隊列中的任務,同時會中斷當前正在執行的任務

  ④TIDYING:所有任務執行完之后(包含阻塞隊列中的任務)當前線程池中活躍的線程數量為0,將要調用terminated方法

  ⑥TERMINATED:終止狀態。terminated方法調用之后的狀態

2、下面初步了解一下ThreadPoolExecutor的參數以及實現原理

  ①corePoolSize:線程池核心現車個數

  ②workQueue:用於保存等待任務執行的任務的阻塞隊列(比如基於數組的有界阻塞隊列ArrayBlockingQueue、基於鏈表的無界阻塞隊列LinkedBlockingQueue等等)

  ③maximumPoolSize:線程池最大線程數量

  ④ThreadFactory:創建線程的工廠

  ⑤RejectedExecutionHandler:拒絕策略,表示當隊列已滿並且線程數量達到線程池最大線程數量的時候對新提交的任務所采取的策略,主要有四種策略:AbortPolicy(拋出異常)、CallerRunsPolicy(只用調用者所在線程來運行該任務)、DiscardOldestPolicy(丟掉阻塞隊列中最近的一個任務來處理當前提交的任務)、DiscardPolicy(不做處理,直接丟棄掉)

  ⑥keepAliveTime:存活時間,如果當前線程池中的數量比核心線程數量多,並且當前線程是閑置狀態,該變量就是這些線程的最大生存時間

  ⑦TimeUnit:存活時間的時間單位。

  根據上面的參數介紹,簡單了解一下線程池的實現原理,以提交一個新任務為開始點,分析線程池的主要處理流程

3、關於一些線程池的使用類型

  ①newFixedThreadPool:創建一個核心線程個數和最大線程個數均為nThreads的線程池,並且阻塞隊列長度為Integer.MAX_VALUE,keepAliveTime=0說明只要線程個數比核心線程個數多並且當前空閑即回收。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

  ②newSingleThreadExecutor:創建一個核心線程個數和最大線程個數都為1 的線程池,並且阻塞隊列長度為Integer.MAX_VALUE,keepAliveTime=0說明只要線程個數比核心線程個數多並且當前線程空閑即回收該線程。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

   ③newCachedThreadPoolExecutor:創建一個按需創建線程的線程池,初始線程個數為0,最多線程個數為Integer.MAX_VALUE,並且阻塞隊列為同步隊列(最多只有一個元素),keepAliveTime=60說明只要當前線程在60s內空閑則回收。這個類型的線程池的特點就是:加入同步隊列的任務會被馬上執行,同步隊列中最多只有一個任務

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

   4、ThreadPoolExecutor中的其他成員

  其中的ReentrantLock可參考前面寫到的Java中的鎖——Lock和synchronized,其中降到了ReentrantLock的具體實現原理;

  關於AQS部分可參考前面說到的Java中的隊列同步器AQS,也講到了關於AQS的具體實現原理分析;

  關於條件隊列的相關知識可參考前面寫的Java中的線程協作之Condition,里面說到了關於Java中線程協作Condition的實現原理;

//獨占鎖,用來控制新增工作線程Worker操作的原子性
private final ReentrantLock mainLock = new ReentrantLock();

//工作線程集合,Worker繼承了AQS接口和Runnable接口,是具體處理任務的線程對象
//Worker實現AQS,並自己實現了簡單不可重入獨占鎖,其中state=0表示當前鎖未被獲取狀態,state=1表示鎖被獲取,
//state=-1表示Work創建時候的默認狀態,創建時候設置state=-1是為了防止runWorker方法運行前被中斷
private final HashSet<Worker> workers = new HashSet<Worker>();

//termination是該鎖對應的條件隊列,在線程調用awaitTermination時候用來存放阻塞的線程
private final Condition termination = mainLock.newCondition();

三、execute(Runnable command)方法實現

  executor方法的作用是提交任務command到線程池執行,可以簡單的按照下面的圖進行理解,ThreadPoolExecutor的實現類似於一個生產者消費者模型,當用戶添加任務到線程池中相當於生產者生產元素,workers工作線程則直接執行任務或者從任務隊列中獲取任務,相當於消費之消費元素。

 1 public void execute(Runnable command) {
 2     //(1)首先檢查任務是否為null,為null拋出異常,否則進行下面的步驟
 3     if (command == null)
 4         throw new NullPointerException();
 5     //(2)ctl值中包含了當前線程池的狀態和線程池中的線程數量
 6     int c = ctl.get();
 7     //(3)workerCountOf方法是獲取低29位,即獲取當前線程池中的線程個數,如果小於corePoolSize,就開啟新的線程運行
 8     if (workerCountOf(c) < corePoolSize) {
 9         if (addWorker(command, true))
10             return;
11         c = ctl.get();
12     }
13     //(4)如果線程池處理RUNNING狀態,就添加任務到阻塞隊列中
14     if (isRunning(c) && workQueue.offer(command)) {
15         //(4-1)二次檢查,獲取ctl值
16         int recheck = ctl.get();
17         //(4-2)如果當前線程池不是出於RUNNING狀態,就從隊列中刪除任務,並執行拒絕策略
18         if (! isRunning(recheck) && remove(command))
19             reject(command);
20         //(4-3)否則,如果線程池為空,就添加一個線程
21         else if (workerCountOf(recheck) == 0)
22             addWorker(null, false);
23     }
24     //(5)如果隊列滿,則新增線程,如果新增線程失敗,就執行拒絕策略
25     else if (!addWorker(command, false))
26         reject(command);
27 }

  我們在看一下上面代碼的執行流程,按照標記的數字進行分析:

  ①步驟(3)判斷當前線程池中的線程個數是否小於corePoolSize,如果小於核心線程數,會向workers里面新增一個核心線程執行任務。

  ②如果當前線程池中的線程數量大於核心線程數,就執行(4)。(4)首先判斷當前線程池是否處於RUNNING狀態,如果處於該狀態,就添加任務到任務隊列中,這里需要判斷線程池的狀態是因為線程池可能已經處於非RUNNING狀態,而在非RUNNING狀態下是需要拋棄新任務的。

  ③如果想任務隊列中添加任務成功,需要進行二次校驗,因為在添加任務到任務隊列后,可能線程池的狀態發生了變化,所以這里需要進行二次校驗,如果當前線程池已經不是RUNNING狀態了,需要將任務從任務隊列中移除,然后執行拒絕策略;如果二次校驗通過,則執行4-3代碼重新判斷當前線程池是否為空,如果線程池為空沒有線程,那么就需要新創建一個線程。

  ④如果上面的步驟(4)創建添加任務失敗,說明隊列已滿,那么(5)會嘗試再開啟新的線程執行任務(類比上圖中的thread3和thread4,即不是核心線程的那些線程),如果當前線程池中的線程個數已經大於最大線程數maximumPoolSize,表示不能開啟新的線程。這就屬於線程池滿並且任務隊列滿,就需要執行拒絕策略了。

  下面我們在看看addWorker方法的實現

 1 private boolean addWorker(Runnable firstTask, boolean core) {
 2     retry:
 3     for (;;) {
 4         int c = ctl.get();
 5         int rs = runStateOf(c);
 6 
 7         //(6)檢查隊列是否只在必要時候為空
 8         if (rs >= SHUTDOWN &&
 9             ! (rs == SHUTDOWN &&
10                firstTask == null &&
11                ! workQueue.isEmpty()))
12             return false;
13 
14         //(7)使用CAS增加線程個數
15         for (;;) {
16             //根據ctl值獲得當前線程池中的線程數量
17             int wc = workerCountOf(c);
18             //(7-1)如果線程數量超出限制,返回false
19             if (wc >= CAPACITY ||
20                 wc >= (core ? corePoolSize : maximumPoolSize))
21                 return false;
22             //(7-2)CAS增加線程數量,同時只有一個線程可以成功
23             if (compareAndIncrementWorkerCount(c))
24                 break retry;
25             c = ctl.get();  // 重新讀取ctl值
26             //(7-3)CAS失敗了,需要查看當前線程池狀態是否發生變化,如果發生變化需要跳轉到外層循環嘗試重新獲取線程池狀態,否則內層循環重新進行CAS增加線程數量
27             if (runStateOf(c) != rs)
28                 continue retry;
29         }
30     }
31 
32     //(8)執行到這里說明CAS增加新線程個數成功了,我們需要開始創建新的工作線程Worker
33     boolean workerStarted = false;
34     boolean workerAdded = false;
35     Worker w = null;
36     try {
37         //(8-1)創建新的worker
38         w = new Worker(firstTask);
39         final Thread t = w.thread;
40         if (t != null) {
41             final ReentrantLock mainLock = this.mainLock;
42             //(8-2)加獨占鎖,保證workers的同步,可能線程池中的多個線程調用了線程池的execute方法
43             mainLock.lock();
44             try {
45                 // (8-3)重新檢查線程池狀態,以免在獲取鎖之前調用shutdown方法改變線程池狀態
46                 int rs = runStateOf(ctl.get());
47 
48                 if (rs < SHUTDOWN ||
49                     (rs == SHUTDOWN && firstTask == null)) {
50                     if (t.isAlive()) // precheck that t is startable
51                         throw new IllegalThreadStateException();
52                     //(8-4)添加新任務
53                     workers.add(w);
54                     int s = workers.size();
55                     if (s > largestPoolSize)
56                         largestPoolSize = s;
57                     workerAdded = true;
58                 }
59             } finally {
60                 mainLock.unlock();
61             }
62             //(8-6)添加新任務成功之后,啟動任務
63             if (workerAdded) {
64                 t.start();
65                 workerStarted = true;
66             }
67         }
68     } finally {
69         if (! workerStarted)
70             addWorkerFailed(w);
71     }
72     return workerStarted;
73 }

  簡單再分析說明一下上面的代碼,addWorker方法主要分為兩部分,第一部分是使用CAS線程安全的添加線程數量,第二部分則是創建新的線程並且將任務並發安全的添加到新的workers之中,然后啟動線程執行。

  ①代碼(6)中檢查隊列是否只在必要時候為空,只有線程池狀態符合條件才能夠進行下面的步驟,從(6)中的判斷條件來看,下面的集中情況addWorker會直接返回false

  ( I )當前線程池狀態為STOP,TIDYING或者TERMINATED ; (I I)當前線程池狀態為SHUTDOWN並且已經有了第一個任務;   (I I I)當前線程池狀態為SHUTDOWN並且任務隊列為空

  ②外層循環中判斷條件通過之后,在內層循環中使用CAS增加線程數,當CAS成功就退出雙重循環進行(8)步驟代碼的執行,如果失敗需要查看當前線程池的狀態是否發生變化,如果發生變化需要進行外層循環重新判斷線程池狀態然后在進入內層循環重新進行CAS增加線程數,如果線程池狀態沒有發生變化但是上一次CAS失敗就繼續進行CAS嘗試。

  ③執行到(8)代碼處,表明當前已經成功增加 了線程數,但是還沒有線程執行任務。ThreadPoolExecutor中使用全局獨占鎖mainLock來控制將新增的工作線程Worker線程安全的添加到工作者線程集合workers中。

  ④(8-2)獲取了獨占鎖,但是在獲取到鎖之后,還需要進行重新檢查線程池的狀態,這是為了避免在獲取全局獨占鎖之前其他線程調用了shutDown方法關閉了線程池。如果線程池已經關閉需要釋放鎖。否則將新增的線程添加到工作集合中,釋放鎖啟動線程執行任務。

  上面的addWorker方法最后幾行中,會判斷添加工作線程是否成功,如果失敗,會執行addWorkerFailed方法,將任務從workers中移除,並且workerCount做-1操作。

 1 private void addWorkerFailed(Worker w) {
 2     final ReentrantLock mainLock = this.mainLock;
 3     //獲取鎖
 4     mainLock.lock();
 5     try {
 6       //如果worker不為null
 7       if (w != null)
 8           //workers移除worker
 9           workers.remove(w);
10       //通過CAS操作,workerCount-1
11       decrementWorkerCount();
12       tryTerminate();
13     } finally {
14       //釋放鎖
15       mainLock.unlock();
16     }
17 }

四、工作線程Worker的執行

(1)工作線程Worker類源碼分析

  上面查看addWorker方法在CAS更新線程數成功之后,下面就是創建新的Worker線程執行任務,所以我們這里先查看Worker類,下面是Worker類的源碼,我們可以看出,Worker類繼承了AQS並實現了Runnable接口,所以他既是一個自定義的同步組件,也是一個執行任務的線程類。下面我們分析Worker類的執行

 1 private final class Worker
 2     extends AbstractQueuedSynchronizer
 3     implements Runnable
 4 {
 5 
 6     /** 使用線程工廠創建的線程,執行任務 */
 7     final Thread thread;
 8     /** 初始化執行任務 */
 9     Runnable firstTask;
10     /** 計數 */
11     volatile long completedTasks;
12 
13     /**
14      * 給出初始firstTask,線程創建工廠創建新的線程
15      */
16     Worker(Runnable firstTask) {
17         setState(-1); // 防止在調用runWorker之前被中斷
18         this.firstTask = firstTask;
19         this.thread = getThreadFactory().newThread(this); //使用threadFactory創建線程
20     }
21 
22     /** run方法實際上執行的是runWorker方法  */
23     public void run() {
24         runWorker(this);
25     }
26 
27     // 關於同步狀態(鎖)
28     //
29     // 同步狀態state=0表示鎖未被獲取
30     // 同步狀態state=1表示鎖被獲取
31 
32     protected boolean isHeldExclusively() {
33         return getState() != 0;
34     }
35 
36     //下面都是重寫AQS的方法,Worker為自定義的同步組件
37     protected boolean tryAcquire(int unused) {
38         if (compareAndSetState(0, 1)) {
39             setExclusiveOwnerThread(Thread.currentThread());
40             return true;
41         }
42         return false;
43     }
44 
45     protected boolean tryRelease(int unused) {
46         setExclusiveOwnerThread(null);
47         setState(0);
48         return true;
49     }
50 
51     public void lock()        { acquire(1); }
52     public boolean tryLock()  { return tryAcquire(1); }
53     public void unlock()      { release(1); }
54     public boolean isLocked() { return isHeldExclusively(); }
55 
56     void interruptIfStarted() {
57         Thread t;
58         if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
59             try {
60                 t.interrupt();
61             } catch (SecurityException ignore) {
62             }
63         }
64     }
65 }

  在構造函數中我們可以看出,首先將同步狀態state置為-1,而Worker這個同步組件的state有三個值,其中state=-1表示Work創建時候的默認狀態,創建時候設置state=-1是為了防止runWorker方法運行前被中斷 前面說到過這個結論,這里置為-1是為了避免當前Worker在調用runWorker方法之前被中斷(當其他線程調用線程池的shutDownNow時候,如果Worker的state>=0則會中斷線程),設置為-1就不會被中斷了。而Worker實現Runnable接口,那么需要重寫run方法,在run方法中,我們可以看到,實際上執行的是runWorker方法,在runWorker方法中,會首先調用unlock方法,該方法會將state置為0,所以這個時候調用shutDownNow方法就會中斷當前線程,而這個時候已經進入了runWork方法了,就不會在還沒有執行runWorker方法的時候就中斷線程。

(2)runWorker方法的源碼分析

 1 final void runWorker(Worker w) {
 2     Thread wt = Thread.currentThread();
 3     Runnable task = w.firstTask;
 4     w.firstTask = null;
 5     w.unlock(); // 這個時候調用unlock方法,將state置為0,就可以被中斷了
 6     boolean completedAbruptly = true;
 7     try {
 8         //(10)如果當前任務為null,或者從任務隊列中獲取到的任務為null,就跳轉到(11)處執行清理工作
 9         while (task != null || (task = getTask()) != null) {
10             //task不為null,就需要線程執行任務,這個時候,需要獲取工作線程內部持有的獨占鎖
11             w.lock();
12             /**如果線程池已被停止(STOP)(至少大於STOP狀態),要確保線程都被中斷
13              * 如果狀態不對,檢查當前線程是否中斷並清除中斷狀態,並且再次檢查線程池狀態是否大於STOP
14              * 如果上述滿足,檢查該對象是否處於中斷狀態,不清除中斷標記
15              */
16             if ((runStateAtLeast(ctl.get(), STOP) ||
17                  (Thread.interrupted() &&
18                   runStateAtLeast(ctl.get(), STOP))) &&
19                 !wt.isInterrupted())
20                 //中斷該對象
21                 wt.interrupt();
22             try {
23                 //執行任務之前要做的事情
24                 beforeExecute(wt, task);
25                 Throwable thrown = null;
26                 try {
27                     task.run(); //執行任務
28                 } catch (RuntimeException x) {
29                     thrown = x; throw x;
30                 } catch (Error x) {
31                     thrown = x; throw x;
32                 } catch (Throwable x) {
33                     thrown = x; throw new Error(x);
34                 } finally {
35                     //執行任務之后的方法
36                     afterExecute(task, thrown);
37                 }
38             } finally {
39                 task = null;
40                 //更新當前已完成任務數量
41                 w.completedTasks++;
42                 //釋放鎖
43                 w.unlock();
44             }
45         }
46         completedAbruptly = false;
47     } finally {
48         //執行清理工作:處理並退出當前worker
49         processWorkerExit(w, completedAbruptly);
50     }
51 }

  我們梳理一下runWorker方法的執行流程

  ①首先先執行unlock方法,將Worker的state置為0,這樣工作線程就可以被中斷了(后續的操作如果線程池關閉就需要線程被中斷)

  ②首先判斷判斷當前的任務(當前工作線程中的task,或者從任務隊列中取出的task)是否為null,如果不為null就往下執行,為null就執行processWorkerExit方法。

  ③獲取工作線程內部持有的獨占鎖(避免在執行任務期間,其他線程調用shutdown后正在執行的任務被中斷,shutdown只會中斷當前被阻塞掛起的沒有執行任務的線程)

  ④然后執行beforeExecute()方法,該方法為擴展接口代碼,表示在具體執行任務之前所做的一些事情,然后執行task.run()方法執行具體任務,執行完之后會調用afterExecute()方法,用以處理任務執行完畢之后的工作,也是一個擴展接口代碼。

  ⑤更新當前線程池完成的任務數,並釋放鎖

(3)執行清理工作的方法processWorkerExit

  下面是方法processWorkerExit的源碼,在下面的代碼中

  ①首先(1-1)處統計線程池完成的任務個數,並且在此之前獲取全局鎖,然后更新當前的全局計數器,然后從工作線程集合中移除當前工作線程,完成清理工作。

  ②代碼(1-2)調用了tryTerminate 方法,在該方法中,判斷了當前線程池狀態是SHUTDOWN並且隊列不為空或者當前線程池狀態為STOP並且當前線程池中沒有活動線程,則置線程池狀態為TERMINATED。如果設置稱為了TERMINATED狀態,還需要調用全局條件變量termination的signalAll方法喚醒所有因為調用線程池的awaitTermination方法而被阻塞住的線程,使得線程池中的所有線程都停止,從而使得線程池為TERMINATED狀態。

  ③代碼(1-3)處判斷當前線程池中的線程個數是否小於核心線程數,如果是,需要新增一個線程保證有足夠的線程可以執行任務隊列中的任務或者提交的任務。

 1 private void processWorkerExit(Worker w, boolean completedAbruptly) {
 2     /*
 3     *completedAbruptly:是由runWorker傳過來的參數,表示是否突然完成的意思
 4     *當在就是在執行任務過程當中出現異常,就會突然完成,傳true
 5     *
 6     *如果是突然完成,需要通過CAS操作,更新workerCount(-1操作)
 7     *不是突然完成,則不需要-1,因為getTask方法當中已經-1(getTask方法中執行了decrementWorkerCount()方法)
 8     */
 9     if (completedAbruptly)
10         decrementWorkerCount();
11     //(1-1)在統計完成任務個數之前加上全局鎖,然后統計線程池中完成的任務個數並更新全局計數器,並從工作集中刪除當前worker
12     final ReentrantLock mainLock = this.mainLock;
13     mainLock.lock(); //獲得全局鎖
14     try {
15         completedTaskCount += w.completedTasks; //更新已完成的任務數量
16         workers.remove(w); //將完成該任務的線程worker從工作線程集合中移除
17     } finally {
18         mainLock.unlock(); //釋放鎖
19     }
20     /**(1-2)
21      * 這一個方法調用完成了下面的事情:
22      * 判斷如果當前線程池狀態是SHUTDOWN並且工作隊列為空,
23      * 或者當前線程池狀態是STOP並且當前線程池里面沒有活動線程,
24      * 則設置當前線程池狀態為TERMINATED,如果設置成了TERMINATED狀態,
25      * 還需要調用條件變量termination的signAll方法激活所有因為調用線程池的awaitTermination方法而被阻塞的線程
26      */
27     tryTerminate();
28 
29     //(1-3)如果當前線程池中線程數小於核心線程,則增加核心線程數
30     int c = ctl.get();
31     //判斷當前線程池的狀態是否小於STOP(RUNNING或者SHUTDOWN)
32     if (runStateLessThan(c, STOP)) {
33         //如果任務忽然完成,執行后續的代碼
34         if (!completedAbruptly) {
35             //allowCoreThreadTimeOut表示是否允許核心線程超時,默認為false
36             //min這里當默認為allowCoreThreadTimeOut默認為false的時候,min置為coorPoolSize
37             int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
38             //這里說明:如果允許核心線程超時,那么allowCoreThreadTimeOut可為true,那么min值為0,不需要維護核心線程了
39             //如果min為0並且任務隊列不為空
40             if (min == 0 && ! workQueue.isEmpty())
41                 min = 1; //這里表示如果min為0,且隊列不為空,那么至少需要一個核心線程存活來保證任務的執行
42             //如果工作線程數大於min,表示當前線程數滿足,直接返回
43             if (workerCountOf(c) >= min)
44                 return; // replacement not needed
45         }
46         addWorker(null, false);
47     }
48 }

   在tryTerminate 方法中,我們簡單說明了該方法的作用,下面是該方法的源碼,可以看出源碼實現上和上面所總結的功能是差不多的

 1 final void tryTerminate() {
 2     for (;;) {
 3         //獲取線程池狀態
 4         int c = ctl.get();
 5         //如果線程池狀態為RUNNING
 6         //或者狀態大於TIDYING
 7         //或者狀態==SHUTDOWN並未任務隊列不為空
 8         //直接返回,不能調用terminated方法
 9         if (isRunning(c) ||
10             runStateAtLeast(c, TIDYING) ||
11             (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
12             return;
13         //如果線程池中工作線程數不為0,需要中斷線程
14         if (workerCountOf(c) != 0) { // Eligible to terminate
15             interruptIdleWorkers(ONLY_ONE);
16             return;
17         }
18         //獲得線程池的全局鎖
19         final ReentrantLock mainLock = this.mainLock;
20         mainLock.lock();
21         try {
22             //通過CAS操作,將線程池狀態設置為TIDYING
23             if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //private static int ctlOf(int rs, int wc) { return rs | wc; }
24                 try {
25                     //調用terminated方法
26                     terminated();
27                 } finally {
28                     //最終將線程狀態設置為TERMINATED
29                     ctl.set(ctlOf(TERMINATED, 0));
30                     //調用條件變量termination的signaAll方法喚醒所有因為
31                     //調用線程池的awaitTermination方法而被阻塞的線程
32                     //private final Condition termination = mainLock.newCondition();
33                     termination.signalAll();
34                 }
35                 return;
36             }
37         } finally {
38             mainLock.unlock();
39         }
40         // else retry on failed CAS
41     }
42 }

 五、補充(shutdown、shutdownNow、awaitTermination方法)

(1)shutdown操作

  我們在使用線程池的時候知道,調用shutdown方法之后線程池就不會再接受新的任務了,但是任務隊列中的任務還是需要執行完的。調用該方法會立刻返回,並不是等到線程池的任務隊列中的所有任務執行完畢在返回的。

 1 public void shutdown() {
 2     //獲得線程池的全局鎖
 3     final ReentrantLock mainLock = this.mainLock;
 4     mainLock.lock();
 5     try {
 6         //進行權限檢查
 7         checkShutdownAccess();
 8         
 9         //設置當前線程池的狀態的SHUTDOWN,如果線程池狀態已經是該狀態就會直接返回,下面我們會分析這個方法的源碼
10         advanceRunState(SHUTDOWN);
11         
12         //設置中斷 標志
13         interruptIdleWorkers();
14         onShutdown(); // hook for ScheduledThreadPoolExecutor
15     } finally {
16         mainLock.unlock();
17     }
18     //嘗試將狀態變為TERMINATED,上面已經分析過該方法的源碼
19     tryTerminate();
20 }

  該方法的源碼比較簡短,首先檢查了安全管理器,是查看當前調用shutdown命令的線程是否有關閉線程的權限,如果有權限還需要看調用線程是否有中斷工作線程的權限,如果沒有權限將會拋出SecurityException異常或者空指針異常。下面我們查看一下advanceRunState   方法的源碼。

 1 private void advanceRunState(int targetState) {
 2     for (;;) {
 3         //下面的方法執行的就是:
 4         //首先獲取線程的ctl值,然后判斷當前線程池的狀態如果已經是SHUTDOWN,那么if條件第一個為真就直接返回
 5         //如果不是SHUTDOWN狀態,就需要CAS的設置當前狀態為SHUTDOWN
 6         int c = ctl.get();
 7         if (runStateAtLeast(c, targetState) ||
 8             //private static int ctlOf(int rs, int wc) { return rs | wc; }
 9             ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
10             break;
11     }
12 }

  我們可以看出advanceRunState   方法實際上就是判斷當前線程池的狀態是否為SHUTDWON,如果是那么就返回,否則就需要設置當前狀態為SHUTDOWN。

  我們再來看看shutdown方法中調用線程中斷的方法interruptIdleWorkers源碼

 1 private void interruptIdleWorkers() {
 2     interruptIdleWorkers(false);
 3 }
 4 private void interruptIdleWorkers(boolean onlyOne) {
 5     final ReentrantLock mainLock = this.mainLock;
 6     mainLock.lock();
 7     try {
 8         for (Worker w : workers) {
 9             Thread t = w.thread;
10             //如果工作線程沒有被中斷,並且沒有正在運行設置中斷標志
11             if (!t.isInterrupted() && w.tryLock()) {
12                 try {
13                     //需要中斷當前線程
14                     t.interrupt();
15                 } catch (SecurityException ignore) {
16                 } finally {
17                     w.unlock();
18                 }
19             }
20             if (onlyOne)
21                 break;
22         }
23     } finally {
24         mainLock.unlock();
25     }
26 }

  上面的代碼中,需要設置所有空閑線程的中斷標志。首先獲取線程池的全局鎖,同時只有一個線程可以調用shutdown方法設置中斷標志。然后嘗試獲取工作線程Worker自己的鎖,獲取成功則可以設置中斷標志(這是由於正在執行任務的線程需要獲取自己的鎖,並且不可重入,所以正在執行的任務沒有被中斷),這里要中斷的那些線程是阻塞到getTask()方法並嘗試從任務隊列中獲取任務的線程即空閑線程。

(2)shutdownNow操作

   在使用線程池的時候,如果我們調用了shutdownNow方法,線程池不僅不會再接受新的任務,還會將任務隊列中的任務丟棄,正在執行的任務也會被中斷,然后立刻返回該方法,不會等待激活的任務完成,返回值為當前任務隊列中被丟棄的任務列表

 1 public List<Runnable> shutdownNow() {
 2     List<Runnable> tasks;
 3     final ReentrantLock mainLock = this.mainLock;
 4     mainLock.lock();
 5     try {
 6         checkShutdownAccess(); //還是進行權限檢查
 7         advanceRunState(STOP); //設置線程池狀態台STOP
 8         interruptWorkers(); //中斷所有線程
 9         tasks = drainQueue(); //將任務隊列中的任務移動到task中
10     } finally {
11         mainLock.unlock();
12     }
13     tryTerminate();
14     return tasks; //返回tasks
15 }

   從上面的代碼中,我們可以可以發現,shutdownNow方法也是首先需要檢查調用該方法的線程的權限,之后不同於shutdown方法之處在於需要即刻設置當前線程池狀態為STOP,然后中斷所有線程(空閑線程+正在執行任務的線程),移除任務隊列中的任務

 1 private void interruptWorkers() {
 2     final ReentrantLock mainLock = this.mainLock;
 3     mainLock.lock();
 4     try {
 5         for (Worker w : workers) //不需要判斷當前線程是否在執行任務(即不需要調用w.tryLock方法),中斷所有線程
 6             w.interruptIfStarted();
 7     } finally {
 8         mainLock.unlock();
 9     }
10 }

(3)awaitTermination操作

   當線程調用該方法之后,會阻塞調用者線程,直到線程池狀態為TERMINATED狀態才會返回,或者等到超時時間到之后會返回,下面是該方法的源碼。

 1 //調用該方法之后,會阻塞調用者線程,直到線程池狀態為TERMINATED狀態才會返回,或者等到超時時間到之后會返回
 2 public boolean awaitTermination(long timeout, TimeUnit unit)
 3     throws InterruptedException {
 4     long nanos = unit.toNanos(timeout);
 5     final ReentrantLock mainLock = this.mainLock;
 6     mainLock.lock();
 7     try {
 8         //阻塞當前線程,(獲取了Worker自己的鎖),那么當前線程就不會再執行任務(因為獲取不到鎖)
 9         for (;;) {
10             //當前線程池狀態為TERMINATED狀態,會返回true
11             if (runStateAtLeast(ctl.get(), TERMINATED)) 
12                 return true;
13             //超時時間到返回false
14             if (nanos <= 0) 
15                 return false;
16             nanos = termination.awaitNanos(nanos);
17         }
18     } finally {
19         mainLock.unlock();
20     }
21 }

  在上面的代碼中,調用者線程需要首先獲取線程Worker 自己的獨占鎖,然后在循環判斷當前線程池是否已經是TERMINATED狀態,如果是則直接返回,否則說明當前線程池中還有線程正在執行任務,這時候需要查看當前設置的超時時間是否小於0,小於0說明不需要等待就直接返回,如果大於0就需要調用條件變量termination的awaitNanos方法等待設置的時間,並在這段時間之內等待線程池的狀態變為TERMINATED。

  我們在前面說到清理線程池的方法processWorkerExit的時候,需要調用tryTerminated方法,在該方法中會查看當前線程池狀態是否為TERMINATED,如果是該狀態也會調用termination.signalAll()方法喚醒所有線程池中因調用awaitTermination而被阻塞住的線程。

  如果是設置了超時時間,那么termination的awaitNanos方法也會返回,這時候需要重新檢查線程池狀態是否為TERMINATED,如果是則返回,不是就繼續阻塞自己。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM