面試官:你分析過線程池源碼嗎?


線程池源碼也是面試經常被提問到的點,我會將全局源碼做一分析,然后告訴你面試考啥,怎么答。

為什么要用線程池?

簡潔的答兩點就行。

  1. 降低系統資源消耗。

  2. 提高線程可控性。

如何創建使用線程池?

JDK8提供了五種創建線程池的方法:

1.創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。

1 public static ExecutorService newFixedThreadPool(int nThreads) {
2     return new ThreadPoolExecutor(nThreads, nThreads,
3                                   0L, TimeUnit.MILLISECONDS,
4                                   new LinkedBlockingQueue<Runnable>());
5 }

 

2.(JDK8新增)會根據所需的並發數來動態創建和關閉線程。能夠合理的使用CPU進行對任務進行並發操作,所以適合使用在很耗時的任務。

注意返回的是ForkJoinPool對象。

 

1 public static ExecutorService newWorkStealingPool(int parallelism) {
2     return new ForkJoinPool
3         (parallelism,
4          ForkJoinPool.defaultForkJoinWorkerThreadFactory,
5          null, true);
6 }

 

什么是ForkJoinPool:

 1 public ForkJoinPool(int parallelism,
 2                         ForkJoinWorkerThreadFactory factory,
 3                         UncaughtExceptionHandler handler,
 4                         boolean asyncMode) {
 5         this(checkParallelism(parallelism),
 6              checkFactory(factory),
 7              handler,
 8              asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
 9              "ForkJoinPool-" + nextPoolId() + "-worker-");
10         checkPermission();
11     }

 

使用一個無限隊列來保存需要執行的任務,可以傳入線程的數量;不傳入,則默認使用當前計算機中可用的cpu數量;使用分治法來解決問題,使用fork()和join()來進行調用。

3.創建一個可緩存的線程池,可靈活回收空閑線程,若無可回收,則新建線程。

1 public static ExecutorService newCachedThreadPool() {
2     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
3                                   60L, TimeUnit.SECONDS,
4                                   new SynchronousQueue<Runnable>());
5 }

 

4.創建一個單線程的線程池。

1 public static ExecutorService newSingleThreadExecutor() {
2     return new FinalizableDelegatedExecutorService
3         (new ThreadPoolExecutor(1, 1,
4                                 0L, TimeUnit.MILLISECONDS,
5                                 new LinkedBlockingQueue<Runnable>()));
6 }

 

5.創建一個定長線程池,支持定時及周期性任務執行。

1 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
2     return new ScheduledThreadPoolExecutor(corePoolSize);
3 }

 

上層源碼結構分析

Executor結構:

面試官:你分析過線程池源碼嗎?

Executor

一個運行新任務的簡單接口

 1 public interface Executor { 2 3 void execute(Runnable command); 4 5 } 

ExecutorService

擴展了Executor接口。添加了一些用來管理執行器生命周期和任務生命周期的方法

面試官:你分析過線程池源碼嗎?

AbstractExecutorService

對ExecutorService接口的抽象類實現。不是我們分析的重點。

ThreadPoolExecutor

Java線程池的核心實現。

ThreadPoolExecutor源碼分析

屬性解釋

 1 // AtomicInteger是原子類  ctlOf()返回值為RUNNING;
 2 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 3 // 高3位表示線程狀態
 4 private static final int COUNT_BITS = Integer.SIZE - 3;
 5 // 低29位表示workerCount容量
 6 private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 7 
 8 // runState is stored in the high-order bits
 9 // 能接收任務且能處理阻塞隊列中的任務
10 private static final int RUNNING    = -1 << COUNT_BITS;
11 // 不能接收新任務,但可以處理隊列中的任務。
12 private static final int SHUTDOWN   =  0 << COUNT_BITS;
13 // 不接收新任務,不處理隊列任務。
14 private static final int STOP       =  1 << COUNT_BITS;
15 // 所有任務都終止
16 private static final int TIDYING    =  2 << COUNT_BITS;
17 // 什么都不做
18 private static final int TERMINATED =  3 << COUNT_BITS;
19 
20 // 存放任務的阻塞隊列
21 private final BlockingQueue<Runnable> workQueue;

 

值的注意的是狀態值越大線程越不活躍。

線程池狀態的轉換模型:

面試官:你分析過線程池源碼嗎?

構造器

 1 public ThreadPoolExecutor(int corePoolSize,//線程池初始啟動時線程的數量
 2                           int maximumPoolSize,//最大線程數量
 3                           long keepAliveTime,//空閑線程多久關閉?
 4                           TimeUnit unit,// 計時單位
 5                           BlockingQueue<Runnable> workQueue,//放任務的阻塞隊列
 6                           ThreadFactory threadFactory,//線程工廠
 7                           RejectedExecutionHandler handler// 拒絕策略) {
 8     if (corePoolSize < 0 ||
 9         maximumPoolSize <= 0 ||
10         maximumPoolSize < corePoolSize ||
11         keepAliveTime < 0)
12         throw new IllegalArgumentException();
13     if (workQueue == null || threadFactory == null || handler == null)
14         throw new NullPointerException();
15     this.acc = System.getSecurityManager() == null ?
16             null :
17             AccessController.getContext();
18     this.corePoolSize = corePoolSize;
19     this.maximumPoolSize = maximumPoolSize;
20     this.workQueue = workQueue;
21     this.keepAliveTime = unit.toNanos(keepAliveTime);
22     this.threadFactory = threadFactory;
23     this.handler = handler;
24 }

 

在向線程池提交任務時,會通過兩個方法:execute和submit。

本文着重講解execute方法。submit方法放在下次和Future、Callable一起分析。

execute方法:

 1 public void execute(Runnable command) {
 2     if (command == null)
 3         throw new NullPointerException();
 4     // clt記錄着runState和workerCount
 5     int c = ctl.get();
 6     //workerCountOf方法取出低29位的值,表示當前活動的線程數
 7     //然后拿線程數和 核心線程數做比較
 8     if (workerCountOf(c) < corePoolSize) {
 9         // 如果活動線程數<核心線程數
10         // 添加到
11         //addWorker中的第二個參數表示限制添加線程的數量是根據corePoolSize來判斷還是maximumPoolSize來判斷
12         if (addWorker(command, true))
13             // 如果成功則返回
14             return;
15         // 如果失敗則重新獲取 runState和 workerCount
16         c = ctl.get();
17     }
18     // 如果當前線程池是運行狀態並且任務添加到隊列成功
19     if (isRunning(c) && workQueue.offer(command)) {
20         // 重新獲取 runState和 workerCount
21         int recheck = ctl.get();
22         // 如果不是運行狀態並且 
23         if (! isRunning(recheck) && remove(command))
24             reject(command);
25         else if (workerCountOf(recheck) == 0)
26             //第一個參數為null,表示在線程池中創建一個線程,但不去啟動
27             // 第二個參數為false,將線程池的有限線程數量的上限設置為maximumPoolSize
28             addWorker(null, false);
29     }
30     //再次調用addWorker方法,但第二個參數傳入為false,將線程池的有限線程數量的上限設置為maximumPoolSize
31     else if (!addWorker(command, false))
32         //如果失敗則拒絕該任務
33         reject(command);
34 }

 

總結一下它的工作流程:

  1. workerCount &lt; corePoolSize,創建線程執行任務。

  2. workerCount &gt;= corePoolSize&&阻塞隊列workQueue未滿,把新的任務放入阻塞隊列。

  3. workQueue已滿,並且workerCount &gt;= corePoolSize,並且workerCount &lt; maximumPoolSize,創建線程執行任務。

  4. 當workQueue已滿,workerCount &gt;= maximumPoolSize,采取拒絕策略,默認拒絕策略是直接拋異常。

 

面試官:你分析過線程池源碼嗎?

通過上面的execute方法可以看到,最主要的邏輯還是在addWorker方法中實現的,那我們就看下這個方法:

addWorker方法

主要工作是在線程池中創建一個新的線程並執行

參數定義:

  • firstTask the task the new thread should run first (or null if none). (指定新增線程執行的第一個任務或者不執行任務)

  • core if true use corePoolSize as bound, else maximumPoolSize.(core如果為true則使用corePoolSize綁定,否則為maximumPoolSize。 (此處使用布爾指示符而不是值,以確保在檢查其他狀態后讀取新值)。)

 

 1 private boolean addWorker(Runnable firstTask, boolean core) {
 2     retry:
 3     for (;;) {
 4 
 5         int c = ctl.get();
 6         //  獲取運行狀態
 7         int rs = runStateOf(c);
 8 
 9         // Check if queue empty only if necessary.
10         // 如果狀態值 >= SHUTDOWN (不接新任務&不處理隊列任務)
11         // 並且 如果 !(rs為SHUTDOWN 且 firsTask為空 且 阻塞隊列不為空)
12         if (rs >= SHUTDOWN &&
13             ! (rs == SHUTDOWN &&
14                firstTask == null &&
15                ! workQueue.isEmpty()))
16             // 返回false
17             return false;
18 
19         for (;;) {
20             //獲取線程數wc
21             int wc = workerCountOf(c);
22             // 如果wc大與容量 || core如果為true表示根據corePoolSize來比較,否則為maximumPoolSize
23             if (wc >= CAPACITY ||
24                 wc >= (core ? corePoolSize : maximumPoolSize))
25                 return false;
26             // 增加workerCount(原子操作)
27             if (compareAndIncrementWorkerCount(c))
28                 // 如果增加成功,則跳出
29                 break retry;
30             // wc增加失敗,則再次獲取runState
31             c = ctl.get();  // Re-read ctl
32             // 如果當前的運行狀態不等於rs,說明狀態已被改變,返回重新執行
33             if (runStateOf(c) != rs)
34                 continue retry;
35             // else CAS failed due to workerCount change; retry inner loop
36         }
37     }
38 
39     boolean workerStarted = false;
40     boolean workerAdded = false;
41     Worker w = null;
42     try {
43         // 根據firstTask來創建Worker對象
44         w = new Worker(firstTask);
45         // 根據worker創建一個線程
46         final Thread t = w.thread;
47         if (t != null) {
48             // new一個鎖
49             final ReentrantLock mainLock = this.mainLock;
50             // 加鎖
51             mainLock.lock();
52             try {
53                 // Recheck while holding lock.
54                 // Back out on ThreadFactory failure or if
55                 // shut down before lock acquired.
56                 // 獲取runState
57                 int rs = runStateOf(ctl.get());
58                 // 如果rs小於SHUTDOWN(處於運行)或者(rs=SHUTDOWN && firstTask == null)
59                 // firstTask == null證明只新建線程而不執行任務
60                 if (rs < SHUTDOWN ||
61                     (rs == SHUTDOWN && firstTask == null)) {
62                     // 如果t活着就拋異常
63                     if (t.isAlive()) // precheck that t is startable
64                         throw new IllegalThreadStateException();
65                     // 否則加入worker(HashSet)
66                     //workers包含池中的所有工作線程。僅在持有mainLock時訪問。
67                     workers.add(w);
68                     // 獲取工作線程數量
69                     int s = workers.size();
70                     //largestPoolSize記錄着線程池中出現過的最大線程數量
71                     if (s > largestPoolSize)
72                         // 如果 s比它還要大,則將s賦值給它
73                         largestPoolSize = s;
74                     // worker的添加工作狀態改為true    
75                     workerAdded = true;
76                 }
77             } finally {
78                 mainLock.unlock();
79             }
80             // 如果worker的添加工作完成
81             if (workerAdded) {
82                 // 啟動線程
83                 t.start();
84                 // 修改線程啟動狀態
85                 workerStarted = true;
86             }
87         }
88     } finally {
89         if (! workerStarted)
90             addWorkerFailed(w);
91     }
92     // 返回線啟動狀態
93     return workerStarted;

 

為什么需要持有mainLock?

因為workers是HashSet類型的,不能保證線程安全。

w = new Worker(firstTask);如何理解呢

Worker.java

1 private final class Worker
2     extends AbstractQueuedSynchronizer
3     implements Runnable

 

可以看到它繼承了AQS並發框架還實現了Runnable。證明它還是一個線程任務類。那我們調用t.start()事實上就是調用了該類重寫的run方法。

Worker為什么不使用ReentrantLock來實現呢?

tryAcquire方法它是不允許重入的,而ReentrantLock是允許重入的。對於線程來說,如果線程正在執行是不允許其它鎖重入進來的。

線程只需要兩個狀態,一個是獨占鎖,表明正在執行任務;一個是不加鎖,表明是空閑狀態。

 1 public void run() { 2 runWorker(this); 3 } 

run方法又調用了runWorker方法:

 1 final void runWorker(Worker w) {
 2     // 拿到當前線程
 3     Thread wt = Thread.currentThread();
 4     // 拿到當前任務
 5     Runnable task = w.firstTask;
 6     // 將Worker.firstTask置空 並且釋放鎖
 7     w.firstTask = null;
 8     w.unlock(); // allow interrupts
 9     boolean completedAbruptly = true;
10     try {
11         // 如果task或者getTask不為空,則一直循環
12         while (task != null || (task = getTask()) != null) {
13             // 加鎖
14             w.lock();
15             // If pool is stopping, ensure thread is interrupted;
16             // if not, ensure thread is not interrupted.  This
17             // requires a recheck in second case to deal with
18             // shutdownNow race while clearing interrupt
19             //  return ctl.get() >= stop 
20             // 如果線程池狀態>=STOP 或者 (線程中斷且線程池狀態>=STOP)且當前線程沒有中斷
21             // 其實就是保證兩點:
22             // 1. 線程池沒有停止
23             // 2. 保證線程沒有中斷
24             if ((runStateAtLeast(ctl.get(), STOP) ||
25                  (Thread.interrupted() &&
26                   runStateAtLeast(ctl.get(), STOP))) &&
27                 !wt.isInterrupted())
28                 // 中斷當前線程
29                 wt.interrupt();
30             try {
31                 // 空方法
32                 beforeExecute(wt, task);
33                 Throwable thrown = null;
34                 try {
35                     // 執行run方法(Runable對象)
36                     task.run();
37                 } catch (RuntimeException x) {
38                     thrown = x; throw x;
39                 } catch (Error x) {
40                     thrown = x; throw x;
41                 } catch (Throwable x) {
42                     thrown = x; throw new Error(x);
43                 } finally {
44                     afterExecute(task, thrown);
45                 }
46             } finally {
47                 // 執行完后, 將task置空, 完成任務++, 釋放鎖
48                 task = null;
49                 w.completedTasks++;
50                 w.unlock();
51             }
52         }
53         completedAbruptly = false;
54     } finally {
55         // 退出工作
56         processWorkerExit(w, completedAbruptly);
57     }

 

總結一下runWorker方法的執行過程:

  1. while循環中,不斷地通過getTask()方法從workerQueue中獲取任務

  2. 如果線程池正在停止,則中斷線程。否則調用3.

  3. 調用task.run()執行任務;

  4. 如果task為null則跳出循環,執行processWorkerExit()方法,銷毀線程workers.remove(w);

這個流程圖非常經典:

面試官:你分析過線程池源碼嗎?

除此之外,ThreadPoolExector還提供了tryAcquiretryReleaseshutdownshutdownNowtryTerminate、等涉及的一系列線程狀態更改的方法有興趣可以自己研究。大體思路是一樣的,這里不做介紹。

Worker為什么不使用ReentrantLock來實現呢?

tryAcquire方法它是不允許重入的,而ReentrantLock是允許重入的。對於線程來說,如果線程正在執行是不允許其它鎖重入進來的。

線程只需要兩個狀態,一個是獨占鎖,表明正在執行任務;一個是不加鎖,表明是空閑狀態。

在runWorker方法中,為什么要在執行任務的時候對每個工作線程都加鎖呢?

shutdown方法與getTask方法存在競態條件.(這里不做深入,建議自己深入研究,對它比較熟悉的面試官一般會問)

高頻考點

  1. 創建線程池的五個方法。

  2. 線程池的五個狀態

  3. execute執行過程。

  4. runWorker執行過程。(把兩個流程圖記下,理解后說個大該就行。)

  5. 比較深入的問題就是我在文中插入的問題。

  6. …期望大家能在評論區補充。

聲明:圖片來源於網絡,侵刪。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM