Java並發系列[10]----ThreadPoolExecutor源碼分析


在日常的開發調試中,我們經常會直接new一個Thread對象來執行某個任務。這種方式在任務數較少的情況下比較簡單實用,但是在並發量較大的場景中卻有着致命的缺陷。例如在訪問量巨大的網站中,如果每個請求都開啟一個線程來處理的話,即使是再強大的服務器也支撐不住。一台電腦的CPU資源是有限的,在CPU較為空閑的情況下,新增線程可以提高CPU的利用率,達到提升性能的效果。但是在CPU滿載運行的情況下,再繼續增加線程不僅不能提升性能,反而因為線程的競爭加大而導致性能下降,甚至導致服務器宕機。因此,在這種情況下我們可以利用線程池來使線程數保持在合理的范圍內,使得CPU資源被充分的利用,且避免因過載而導致宕機的危險。在Executors中為我們提供了多種靜態工廠方法來創建各種特性的線程池,其中大多數是返回ThreadPoolExecutor對象。因此本篇我們從ThreadPoolExecutor類着手,深入探究線程池的實現機制。

1. 線程池狀態和線程數的表示

 1 //高3位表示線程池狀態, 后29位表示線程個數
 2 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 3 private static final int COUNT_BITS = Integer.SIZE - 3;
 4 private static final int CAPACITY = (1 << COUNT_BITS) - 1;
 5 
 6 //運行狀態  例:11100000000000000000000000000000
 7 private static final int RUNNING = -1 << COUNT_BITS;
 8 
 9 //關閉狀態  例:00000000000000000000000000000000
10 private static final int SHUTDOWN = 0 << COUNT_BITS;
11 
12 //停止狀態  例:00100000000000000000000000000000
13 private static final int STOP = 1 << COUNT_BITS;
14 
15 //整理狀態  例:01000000000000000000000000000000
16 private static final int TIDYING = 2 << COUNT_BITS;
17 
18 //終止狀態  例:01100000000000000000000000000000
19 private static final int TERMINATED = 3 << COUNT_BITS;
20 
21 private static int runStateOf(int c) { return c & ~CAPACITY; }
22 private static int workerCountOf(int c) { return c & CAPACITY; }
23 private static int ctlOf(int rs, int wc) { return rs | wc; }

在繼續接下來的探究之前,我們先來搞清楚ThreadPoolExecutor是怎樣存放狀態信息和線程數信息的。ThreadPoolExecutor利用原子變量ctl來同時存儲運行狀態和線程數的信息,其中高3位表示線程池的運行狀態(runState),后面的29位表示線程池中的線程數(workerCount)。上面代碼中,runStateOf方法是從ctl取出狀態信息,workerCountOf方法是從ctl取出線程數信息,ctlOf方法是將狀態信息和線程數信息糅合進ctl中。具體的計算過程如下圖所示。

2. 線程池各個狀態的具體含義

就像人的生老病死一樣,線程池也有自己的生命周期,從創建到終止,線程池在每個階段所做的事情是不一樣的。新建一個線程池時它的狀態為Running,這時它不斷的從外部接收並處理任務,當處理不過來時它會把任務放到任務隊列中;之后我們可能會調用shutdown()來終止線程池,這時線程池的狀態從Running轉為Shutdown,它開始拒絕接收從外部傳過來的任務,但是會繼續處理完任務隊列中的任務;我們也可能調用shutdownNow()來立刻停止線程池,這時線程池的狀態從Running轉為Stop,然后它會快速排空任務隊列中的任務並轉到Tidying狀態,處於該狀態的線程池需要執行terminated()來做相關的掃尾工作,執行完terminated()之后線程池就轉為Terminated狀態,表示線程池已終止。這些狀態的轉換圖如下所示。

3. 關鍵成員變量的介紹

 1 //任務隊列
 2 private final BlockingQueue<Runnable> workQueue;
 3 
 4 //工作者集合
 5 private final HashSet<Worker> workers = new HashSet<Worker>();
 6 
 7 //線程達到的最大值
 8 private int largestPoolSize;
 9 
10 //已完成任務總數
11 private long completedTaskCount;
12 
13 //線程工廠
14 private volatile ThreadFactory threadFactory;
15 
16 //拒絕策略
17 private volatile RejectedExecutionHandler handler;
18 
19 //閑置線程存活時間
20 private volatile long keepAliveTime;
21 
22 //是否允許核心線程超時
23 private volatile boolean allowCoreThreadTimeOut;
24 
25 //核心線程數量
26 private volatile int corePoolSize;
27 
28 //最大線程數量
29 private volatile int maximumPoolSize;
30 
31 //默認拒絕策略
32 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

在深入探究線程池的實現機制之前,我們有必要了解一下各個成員變量的作用。上面列出了主要的成員變量,除了一些用於統計的變量,例如largestPoolSize和completedTaskCount,其中大部分變量的值都是可以在構造時進行設置的。下面我們就看一下它的核心構造器。

 1 //核心構造器
 2 public ThreadPoolExecutor(int corePoolSize,
 3                           int maximumPoolSize,
 4                           long keepAliveTime,
 5                           TimeUnit unit,
 6                           BlockingQueue<Runnable> workQueue,
 7                           ThreadFactory threadFactory,
 8                           RejectedExecutionHandler handler) {
 9     if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) {
10         throw new IllegalArgumentException();
11     }    
12     if (workQueue == null || threadFactory == null || handler == null) {
13         throw new NullPointerException();
14     }
15     this.corePoolSize = corePoolSize;                  //設置核心線程數量
16     this.maximumPoolSize = maximumPoolSize;            //設置最大線程數量
17     this.workQueue = workQueue;                        //設置任務隊列
18     this.keepAliveTime = unit.toNanos(keepAliveTime);  //設置存活時間
19     this.threadFactory = threadFactory;                //設置線程工廠
20     this.handler = handler;                            //設置拒絕策略
21 }

ThreadPoolExecutor有多個構造器,所有的構造器都會調用上面的核心構造器。通過核心構造器我們可以為線程池設置不同的參數,由此線程池也能表現出不同的特性。因此徹底搞懂這幾個參數的含義能使我們更好的使用線程池,下面我們就來詳細看一下這幾個參數的含義。
corePoolSize:
核心線程數最大值,默認情況下新建線程池時並不創建線程,后續每接收一個任務就新建一個核心線程來處理,直到核心線程數達到corePoolSize。這時后面到來的任務都會被放到任務隊列中等待。
maximumPoolSize:
總線程數最大值,當任務隊列被放滿了之后,將會新建非核心線程來處理后面到來的任務。當總的線程數達到maximumPoolSize后,將不再繼續創建線程,而是對后面的任務執行拒絕策略。
workQueue:
任務隊列,當核心線程數達到corePoolSize后,后面到來的任務都會被放到任務隊列中,該任務隊列是阻塞隊列,工作線程可以通過定時或者阻塞方式從任務隊列中獲取任務。
keepAliveTime:
閑置線程存活時間,該參數默認情況下只在線程數大於corePoolSize時起作用,閑置線程在任務隊列上等待keepAliveTime時間后將會被終止,直到線程數減至corePoolSize。也可以通過設置allowCoreThreadTimeOut變量為true來使得keepAliveTime在任何時候都起作用,這時線程數最后會減至0。

4. execute方法的執行過程

 1 //核心執行方法
 2 public void execute(Runnable command) {
 3     if (command == null) throw new NullPointerException();
 4     int c = ctl.get();
 5     //線程數若小於corePoolSize則新建核心工作者
 6     if (workerCountOf(c) < corePoolSize) {
 7         if (addWorker(command, true)) return;
 8         c = ctl.get();
 9     }
10     //否則將任務放到任務隊列
11     if (isRunning(c) && workQueue.offer(command)) {
12         int recheck = ctl.get();
13         //若不是running狀態則將該任務從隊列中移除
14         if (!isRunning(recheck) && remove(command)) {
15             //成功移除后再執行拒絕策略
16           reject(command);
17         //若線程數為0則新增一個非核心線程
18         }else if (workerCountOf(recheck) == 0) {
19           addWorker(null, false);
20         }
21     //若隊列已滿則新增非核心工作者
22     }else if (!addWorker(command, false)) {
23         //若新建非核心線程失敗則執行拒絕策略
24       reject(command);
25     }
26 }

execute方法是線程池接收任務的入口方法,當創建好一個線程池之后,我們會調用execute方法並傳入一個Runnable交給線程池去執行。從上面代碼中可以看到execute方法首先會去判斷當前線程數是否小於corePoolSize,如果小於則調用addWorker方法新建一個核心線程去處理該任務,否則調用workQueue的offer方法將該任務放入到任務隊列中。通過offer方法添加並不會阻塞線程,如果添加成功會返回true,若隊列已滿則返回false。在成功將任務放入到任務隊列后,還會再次檢查線程池是否是Running狀態,如果不是則將剛剛添加的任務從隊列中移除,然后再執行拒絕策略。如果從隊列中移除任務失敗,則再檢查一下線程數是否為0(有可能剛好全部線程都被終止了),是的話就新建一個非核心線程去處理。如果任務隊列已經滿了,此時offer方法會返回false,接下來會再次調用addWorker方法新增一個非核心線程來處理該任務。如果期間創建線程失敗,則最后會執行拒絕策略。

5. 工作線程的實現

 1 //工作者類
 2 private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
 3     //關聯線程
 4     final Thread thread;
 5     //初始任務
 6     Runnable firstTask;
 7     //完成任務數
 8     volatile long completedTasks;
 9 
10     //構造器
11     Worker(Runnable firstTask) {
12         //抑制中斷直到runWorker
13         setState(-1);
14         //設置初始任務
15         this.firstTask = firstTask;
16         //設置關聯線程
17         this.thread = getThreadFactory().newThread(this);
18     }
19     
20     public void run() {
21         runWorker(this);
22     }
23     
24     //判斷是否占有鎖, 0代表未占用, 1代表已占用
25     protected boolean isHeldExclusively() {
26         return getState() != 0;
27     }
28 
29     //嘗試獲取鎖
30     protected boolean tryAcquire(int unused) {
31         if (compareAndSetState(0, 1)) {
32             setExclusiveOwnerThread(Thread.currentThread());
33             return true;
34         }
35         return false;
36     }
37     
38     //嘗試釋放鎖
39     protected boolean tryRelease(int unused) {
40         setExclusiveOwnerThread(null);
41         setState(0);
42         return true;
43     }
44 
45     public void lock() { acquire(1); }
46     public boolean tryLock() { return tryAcquire(1); }
47     public void unlock() { release(1); }
48     public boolean isLocked() { return isHeldExclusively(); }
49 
50     //中斷關聯線程
51     void interruptIfStarted() {
52         Thread t;
53         //將活動線程和閑置線程都中斷
54         if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
55             try {
56                 t.interrupt();
57             } catch (SecurityException ignore) {
58                 //ignore
59             }
60         }
61     }
62 }

ThreadPoolExecutor內部實現了一個Worker類,用它來表示工作線程。每個Worker對象都持有一個關聯線程和分配給它的初始任務。Worker類繼承自AQS並實現了自己的加鎖解鎖方法,說明每個Worker對象也是一個鎖對象。同時Worker類還實現了Runnable接口,因此每個Worker對象都是可以運行的。Worker類有一個唯一的構造器,需要傳入一個初始任務給它,在構造器里面首先將同步狀態設置為-1,這個操作主要是抑制中斷直到runWorker方法運行,為啥要這樣做?我們繼續看下去,可以看到在設置完初始任務之后,馬上就開始設置關聯線程,關聯線程是通過線程工廠的newThread方法來生成的,這時將Worker對象本身當作任務傳給關聯線程。因此在啟動關聯線程時(調用start方法),會運行Worker對象自身的run方法。而run方法里面緊接着調用runWorker方法,也就是說只有在runWorker方法運行時才表明關聯線程已啟動,這時去中斷關聯線程才有意義,因此前面要通過設置同步狀態為-1來抑制中斷。那么為啥將同步狀態設置為-1就可以抑制中斷?每個Worker對象都是通過調用interruptIfStarted方法來中斷關聯線程的,在interruptIfStarted方法內部會判斷只有同步狀態>=0時才會中斷關聯線程。因此將同步狀態設置為-1能起到抑制中斷的作用。

6. 工作線程的創建

 1 //添加工作線程
 2 private boolean addWorker(Runnable firstTask, boolean core) {
 3     retry:
 4     for (;;) {
 5         int c = ctl.get();
 6         int rs = runStateOf(c);
 7         //只有以下兩種情況會繼續添加線程
 8         //1.狀態為running
 9         //2.狀態為shutdown,首要任務為空,但任務隊列中還有任務
10         if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
11             return false;
12         }
13         for (;;) {
14             int wc = workerCountOf(c);
15             //以下三種情況不繼續添加線程:
16             //1.線程數大於線程池總容量
17             //2.當前線程為核心線程,且核心線程數達到corePoolSize
18             //3.當前線程非核心線程,且總線程達到maximumPoolSize
19             if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) {
20                 return false;
21             }
22             //否則繼續添加線程, 先將線程數加一
23             if (compareAndIncrementWorkerCount(c)) {
24                 //執行成功則跳過外循環
25                 break retry;
26             }
27             //CAS操作失敗再次檢查線程池狀態
28             c = ctl.get();
29             //如果線程池狀態改變則繼續執行外循環
30             if (runStateOf(c) != rs) {
31                 continue retry;
32             }
33             //否則表明CAS操作失敗是workerCount改變, 繼續執行內循環
34         }
35     }
36     boolean workerStarted = false;
37     boolean workerAdded = false;
38     Worker w = null;
39     try {
40         final ReentrantLock mainLock = this.mainLock;
41         w = new Worker(firstTask);
42         final Thread t = w.thread;
43         if (t != null) {
44             mainLock.lock();
45             try {
46                 int c = ctl.get();
47                 int rs = runStateOf(c);
48                 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
49                     //如果線程已經開啟則拋出異常
50                     if (t.isAlive()) throw new IllegalThreadStateException();
51                     //將工作者添加到集合中
52                     workers.add(w);
53                     int s = workers.size();
54                     //記錄線程達到的最大值
55                     if (s > largestPoolSize) {
56                         largestPoolSize = s;
57                     }
58                     workerAdded = true;
59                 }
60             } finally {
61                 mainLock.unlock();
62             }
63             //將工作者添加到集合后則啟動線程
64             if (workerAdded) {
65                 t.start();
66                 workerStarted = true;
67             }
68         }
69     } finally {
70         //如果線程啟動失敗則回滾操作
71         if (!workerStarted) {
72             addWorkerFailed(w);
73         }
74     }
75     return workerStarted;
76 }

上面我們知道在execute方法里面會調用addWorker方法來添加工作線程。通過代碼可以看到進入addWorker方法里面會有兩層自旋循環,在外層循環中獲取線程池當前的狀態,如果線程池狀態不符合就直接return,在內層循環中獲取線程數,如果線程數超過限定值也直接return。只有經過這兩重判斷之后才會使用CAS方式來將線程數加1。成功將線程數加1之后就跳出外層循環去執行后面的邏輯,否則就根據不同條件來進行自旋,如果是線程池狀態改變就執行外層循環,如果是線程數改變就執行內層循環。當線程數成功加1之后,后面就是去新建一個Worker對象,並在構造時傳入初始任務給它。然后將這個Worker對象添加到工作者集合當中,添加成功后就調用start方法來啟動關聯線程。

7. 工作線程的執行

 1 //運行工作者
 2 final void runWorker(Worker w) {
 3     //獲取當前工作線程
 4     Thread wt = Thread.currentThread();
 5     //獲取工作者的初始任務
 6     Runnable task = w.firstTask;
 7     //將工作者的初始任務置空
 8     w.firstTask = null;
 9     //將同步狀態從-1設為0
10     w.unlock();
11     boolean completedAbruptly = true;
12     try {
13         //初始任務不為空則執行初始任務, 否則從隊列獲取任務
14         while (task != null || (task = getTask()) != null) {
15             //確保獲取到任務后才加鎖
16             w.lock(); 
17             //若狀態大於等於stop, 保證當前線程被中斷
18             //若狀態小於stop, 保證當前線程未被中斷
19             //在清理中斷狀態時可能有其他線程在修改, 所以會再檢查一次
20             if ((runStateAtLeast(ctl.get(), STOP) || 
21                 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) {
22                 wt.interrupt();
23             }
24             try {
25                 //任務執行前做些事情
26                 beforeExecute(wt, task);
27                 Throwable thrown = null;
28                 try {
29                     //執行當前任務
30                     task.run();
31                 } catch (RuntimeException x) {
32                     thrown = x; throw x;
33                 } catch (Error x) {
34                     thrown = x; throw x;
35                 } catch (Throwable x) {
36                     thrown = x; throw new Error(x);
37                 } finally {
38                     //任務執行后做一些事情
39                     afterExecute(task, thrown);
40                 }
41             } finally {
42                 //將執行完的任務置空
43                 task = null;
44                 //將完成的任務數加一
45                 w.completedTasks++;
46                 w.unlock();
47             }
48         }
49         //設置該線程為正常完成任務
50         completedAbruptly = false;
51     } finally {
52         //執行完所有任務后將線程刪除
53         processWorkerExit(w, completedAbruptly);
54     }
55 }

上面我們知道,將Worker對象添加到workers集合之后就會去調用關聯線程的start方法,由於傳給關聯線程的Runnable就是Worker對象本身,因此會調用Worker對象實現的run方法,最后會調用到runWorker方法。我們看到上面代碼,進入到runWorker方法里面首先獲取了Worker對象的初始任務,然后調用unlock方法將同步狀態加1,由於在構造Worker對象時將同步狀態置為-1了,所以這里同步狀態變回0,因此在這之后才可以調用interruptIfStarted方法來中斷關聯線程。如果初始任務不為空就先去執行初始任務,否則就調用getTask方法去任務隊列中獲取任務,可以看到這里是一個while循環,也就是說工作線程在執行完自己的任務之后會不斷的從任務隊列中獲取任務,直到getTask方法返回null,然后工作線程退出while循環最后執行processWorkerExit方法來移除自己。如果需要在所有任務執行之前或之后做些處理,可以分別實現beforeExecute方法和afterExecute方法。

8. 任務的獲取

 1 //從任務隊列中獲取任務
 2 private Runnable getTask() {
 3     //上一次獲取任務是否超時
 4     boolean timedOut = false;
 5     retry:
 6     //在for循環里自旋
 7     for (;;) {
 8         int c = ctl.get();
 9         int rs = runStateOf(c);
10         //以下兩種情況會將工作者數減為0並返回null,並直接使該線程終止:
11         //1.狀態為shutdown並且任務隊列為空
12         //2.狀態為stop, tidying或terminated
13         if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
14             decrementWorkerCount();
15             return null;
16         }
17         
18         boolean timed;
19         //判斷是否要剔除當前線程
20         for (;;) {
21             int wc = workerCountOf(c);
22             //以下兩種情況會在限定時間獲取任務:
23             //1.允許核心線程超時
24             //2.線程數大於corePoolSize
25             timed = allowCoreThreadTimeOut || wc > corePoolSize;
26             //以下兩種情況不執行剔除操作:
27             //1.上次任務獲取未超時
28             //2.上次任務獲取超時, 但沒要求在限定時間獲取
29             if (wc <= maximumPoolSize && !(timedOut && timed)) {
30                 break;
31             }
32             //若上次任務獲取超時, 且規定在限定時間獲取, 則將線程數減一
33             if (compareAndDecrementWorkerCount(c)) {
34                 //CAS操作成功后直接返回null
35                 return null;
36             }
37             //CAS操作失敗后再次檢查狀態
38             c = ctl.get();
39             //若狀態改變就從外層循環重試
40             if (runStateOf(c) != rs) {
41                 continue retry;
42             }
43             //否則表明是workerCount改變, 繼續在內循環重試
44         }
45         
46         try {
47             //若timed為true, 則在規定時間內返回
48             //若timed為false, 則阻塞直到獲取成功
49             //注意:閑置線程會一直在這阻塞
50             Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
51             //獲取任務不為空則返回該任務
52             if (r != null) {
53                 return r;
54             }
55             //否則將超時標志設為true
56             timedOut = true;
57         } catch (InterruptedException retry) {
58             timedOut = false;
59         }
60     }
61 }

工作線程在while循環里不斷的通過getTask方法來從任務隊列中獲取任務,我們看一下getTask方法是怎樣獲取任務的。進入第一個for循環之后有一個if判斷,從這里我們可以看到,如果線程池狀態為shutdown,會繼續消費任務隊列里面的任務;如果線程池狀態為stop,則停止消費任務隊列里剩余的任務。進入第二個for循環后會給timed變量賦值,由於allowCoreThreadTimeOut變量默認是false,所以timed的值取決於線程數是否大於corePoolSize,小於為false,大於則為true。從任務隊列里面獲取任務的操作在try塊里面,如果timed為true,則調用poll方法進行定時獲取;如果timed為flase,則調用take方法進行阻塞獲取。也就是說默認情況下,如果線程數小於corePoolSize,則調用take方法進行阻塞獲取,即使任務隊列為空,工作線程也會一直等待;如果線程數大於corePoolSize,則調用poll方法進行定時獲取,在keepAliveTime時間內獲取不到任務則會返回null,對應的工作線程也會被移除,但線程數會保持在corePoolSize上。當然如果設置allowCoreThreadTimeOut為true,則會一直通過調用poll方法來從任務隊列中獲取任務,如果任務隊列長時間為空,則工作線程會減少至0。

9. 工作線程的退出

 1 //刪除工作線程
 2 private void processWorkerExit(Worker w, boolean completedAbruptly) {
 3     //若非正常完成則將線程數減為0
 4     if (completedAbruptly) {
 5         decrementWorkerCount();
 6     }
 7     final ReentrantLock mainLock = this.mainLock;
 8     mainLock.lock();
 9     try {
10         //統計完成的任務總數
11         completedTaskCount += w.completedTasks;
12         //在這將工作線程移除
13         workers.remove(w);
14     } finally {
15         mainLock.unlock();
16     }
17     //嘗試終止線程池
18     tryTerminate();
19     //再次檢查線程池狀態
20     int c = ctl.get();
21     //若狀態為running或shutdown, 則將線程數恢復到最小值
22     if (runStateLessThan(c, STOP)) {
23         //線程正常完成任務被移除
24         if (!completedAbruptly) {
25             //允許核心線程超時最小值為0, 否則最小值為核心線程數
26             int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
27             //如果任務隊列還有任務, 則保證至少有一個線程
28             if (min == 0 && !workQueue.isEmpty()) {
29                 min = 1;
30             }
31             //若線程數大於最小值則不新增了
32             if (workerCountOf(c) >= min) {
33                 return;
34             }
35         }
36         //新增工作線程
37         addWorker(null, false);
38     }
39 }

工作線程如果從getTask方法中獲得null,則會退出while循環並隨后執行processWorkerExit方法,該方法會在這個工作線程終止之前執行一些操作,我們看到它會去統計該工作者完成的任務數,然后將其從workers集合中刪除,每刪除一個工作者之后都會去調用tryTerminate方法嘗試終止線程池,但並不一定會真的終止線程池。從tryTerminate方法返回后再次去檢查一遍線程池的狀態,如果線程池狀態為running或者shutdown,並且線程數小於最小值,則恢復一個工作者。這個最小值是怎樣計算出來的呢?我們來看看。如果allowCoreThreadTimeOut為true則最小值為0,否則最小值為corePoolSize。但還有一個例外情況,就是雖然允許核心線程超時了,但是如果任務隊列不為空的話,那么必須保證有一個線程存在,因此這時最小值設為1。后面就是判斷如果工作線程數大於最小值就不新增線程了,否則就新增一個非核心線程。從這個方法可以看到,每個線程退出時都會去判斷要不要再恢復一個線程,因此線程池中的線程總數也是動態增減的。

10. 線程池的終止

 1 //平緩關閉線程池
 2 public void shutdown() {
 3     final ReentrantLock mainLock = this.mainLock;
 4     mainLock.lock();
 5     try {
 6         //檢查是否有關閉的權限
 7         checkShutdownAccess();
 8         //將線程池狀態設為shutdown
 9         advanceRunState(SHUTDOWN);
10         //中斷閑置的線程
11         interruptIdleWorkers();
12         //對外提供的鈎子
13         onShutdown();
14     } finally {
15         mainLock.unlock();
16     }
17     //嘗試終止線程池
18     tryTerminate();
19 }
20 
21 //立刻關閉線程池
22 public List<Runnable> shutdownNow() {
23     List<Runnable> tasks;
24     final ReentrantLock mainLock = this.mainLock;
25     mainLock.lock();
26     try {
27         //檢查是否有關閉的權限
28         checkShutdownAccess();
29         //將線程池狀態設為stop
30         advanceRunState(STOP);
31         //中斷所有工作線程
32         interruptWorkers();
33         //排干任務隊列
34         tasks = drainQueue();
35     } finally {
36         mainLock.unlock();
37     }
38     //嘗試終止線程池
39     tryTerminate();
40     return tasks;
41 }

可以通過兩個方法來終止線程池,通過調用shutdown方法可以平緩的終止線程池,通過調用shutdownNow方法可以立即終止線程池。調用shutdown()方法后首先會將線程池狀態設置為shutdown,這時線程池會拒絕接收外部傳過來的任務,然后調用interruptIdleWorkers()方法中斷閑置線程,剩余的線程會繼續消費完任務隊列里的任務之后才會終止。調用shutdownNow()方法會將線程池狀態設置為stop,這是線程池也不再接收外界的任務,並且馬上調用interruptWorkers()方法將所有工作線程都中斷了,然后排干任務隊列里面沒有被處理的任務,最后返回未被處理的任務集合。調用shutdown()和shutdownNow()方法后還未真正終止線程池,這兩個方法最后都會調用tryTerminate()方法來終止線程池。我們看看該方法的代碼。

 1 //嘗試終止線程池
 2 final void tryTerminate() {
 3     for (;;) {
 4         int c = ctl.get();
 5         //以下兩種情況終止線程池,其他情況直接返回:
 6         //1.狀態為stop
 7         //2.狀態為shutdown且任務隊列為空
 8         if (isRunning(c) || runStateAtLeast(c, TIDYING) ||
 9             (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) {
10             return;
11         }
12         //若線程不為空則中斷一個閑置線程后直接返回
13         if (workerCountOf(c) != 0) {
14             interruptIdleWorkers(ONLY_ONE);
15             return;
16         }
17         final ReentrantLock mainLock = this.mainLock;
18         mainLock.lock();
19         try {
20             //將狀態設置為tidying
21             if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
22                 try {
23                     //線程池終止后做的事情
24                     terminated();
25                 } finally {
26                     //將狀態設置為終止狀態(TERMINATED)
27                     ctl.set(ctlOf(TERMINATED, 0));
28                     //喚醒條件隊列所有線程
29                     termination.signalAll();
30                 }
31                 return;
32             }
33         } finally {
34             mainLock.unlock();
35         }
36         //若狀態更改失敗則再重試
37     }
38 }

tryTerminate()方法在其他很多地方也被調用過,比如processWorkerExit()和addWorkerFailed()。調用該方法來嘗試終止線程池,在進入for循環后第一個if判斷過濾了不符合條件的終止操作,只有狀態為stop,或者狀態為shutdown且任務隊列為空這兩種情況才能繼續執行。第二個if語句判斷工作者數量是否為0,不為0的話也直接返回。經過這兩重判斷之后才符合終止線程池的條件,於是先通過CAS操作將線程池狀態設置為tidying狀態,在tidying狀態會調用用戶自己實現的terminated()方法來做一些處理。到了這一步,不管terminated()方法是否成功執行最后都會將線程池狀態設置為terminated,也就標志着線程池真正意義上的終止了。最后會喚醒所有等待線程池終止的線程,讓它們繼續執行。

11. 常用線程池參數配置

Executors中有許多靜態工廠方法來創建線程池,在平時使用中我們都是通過Executors的靜態工廠方法來創建線程池的。這其中有幾個使用線程池的典型例子我們來看一下。

 1 //固定線程數的線程池
 2 //注:該線程池將corePoolSize和maximumPoolSize都設置為同一數值,線程池剛創建時線程數為0,
 3 //之后每接收一個任務創建一個線程,直到線程數達到nThreads,此后線程數不再增長。如果其中有某個
 4 //線程因為發生異常而終止,線程池將補充一個新的線程。
 5 public static ExecutorService newFixedThreadPool(int nThreads) {
 6     return new ThreadPoolExecutor(nThreads, nThreads,
 7                                   0L, TimeUnit.MILLISECONDS,
 8                                   new LinkedBlockingQueue<Runnable>());
 9 }
10 
11 //單個線程的線程池
12 //注:該線程池將corePoolSize和maximumPoolSize都設置為1,因此線程池中永遠只有一個線程,
13 //如果該線程因為不可預知的異常而被終止,線程池將會補充一個新的線程。
14 public static ExecutorService newSingleThreadExecutor() {
15     return new FinalizableDelegatedExecutorService
16         (new ThreadPoolExecutor(1, 1,
17                                 0L, TimeUnit.MILLISECONDS,
18                                 new LinkedBlockingQueue<Runnable>()));
19 }
20 
21 //可緩存的線程池
22 //注:該線程池將corePoolSize設置為0,將maximumPoolSize設置為Integer.MAX_VALUE,
23 //空閑線程存活時間設置為60S。也就是說該線程池一開始線程數為0,隨着任務數的增加線程數也相應
24 //增加,線程數的上限為Integer.MAX_VALUE。當任務數減少時線程數也隨之減少,最后會減少至0。
25 public static ExecutorService newCachedThreadPool() {
26     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
27                                   60L, TimeUnit.SECONDS,
28                                   new SynchronousQueue<Runnable>());
29 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM