線程池源碼也是面試經常被提問到的點,我會將全局源碼做一分析,然后告訴你面試考啥,怎么答。
為什么要用線程池?
簡潔的答兩點就行。
-
降低系統資源消耗。
-
提高線程可控性。
如何創建使用線程池?
JDK8提供了五種創建線程池的方法:
1.創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。
1 public static ExecutorService newFixedThreadPool(int nThreads) { 2 return new ThreadPoolExecutor(nThreads, nThreads, 3 0L, TimeUnit.MILLISECONDS, 4 new LinkedBlockingQueue<Runnable>()); 5 }
2.(JDK8新增)會根據所需的並發數來動態創建和關閉線程。能夠合理的使用CPU進行對任務進行並發操作,所以適合使用在很耗時的任務。
注意返回的是ForkJoinPool對象。
1 public static ExecutorService newWorkStealingPool(int parallelism) { 2 return new ForkJoinPool 3 (parallelism, 4 ForkJoinPool.defaultForkJoinWorkerThreadFactory, 5 null, true); 6 }
什么是ForkJoinPool:
1 public ForkJoinPool(int parallelism, 2 ForkJoinWorkerThreadFactory factory, 3 UncaughtExceptionHandler handler, 4 boolean asyncMode) { 5 this(checkParallelism(parallelism), 6 checkFactory(factory), 7 handler, 8 asyncMode ? FIFO_QUEUE : LIFO_QUEUE, 9 "ForkJoinPool-" + nextPoolId() + "-worker-"); 10 checkPermission(); 11 }
使用一個無限隊列來保存需要執行的任務,可以傳入線程的數量;不傳入,則默認使用當前計算機中可用的cpu數量;使用分治法來解決問題,使用fork()和join()來進行調用。
3.創建一個可緩存的線程池,可靈活回收空閑線程,若無可回收,則新建線程。
1 public static ExecutorService newCachedThreadPool() { 2 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3 60L, TimeUnit.SECONDS, 4 new SynchronousQueue<Runnable>()); 5 }
4.創建一個單線程的線程池。
1 public static ExecutorService newSingleThreadExecutor() { 2 return new FinalizableDelegatedExecutorService 3 (new ThreadPoolExecutor(1, 1, 4 0L, TimeUnit.MILLISECONDS, 5 new LinkedBlockingQueue<Runnable>())); 6 }
5.創建一個定長線程池,支持定時及周期性任務執行。
1 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { 2 return new ScheduledThreadPoolExecutor(corePoolSize); 3 }
上層源碼結構分析
Executor結構:
Executor
一個運行新任務的簡單接口
1 public interface Executor { 2 3 void execute(Runnable command); 4 5 }
ExecutorService
擴展了Executor接口。添加了一些用來管理執行器生命周期和任務生命周期的方法
AbstractExecutorService
對ExecutorService接口的抽象類實現。不是我們分析的重點。
ThreadPoolExecutor
Java線程池的核心實現。
ThreadPoolExecutor源碼分析
屬性解釋
1 // AtomicInteger是原子類 ctlOf()返回值為RUNNING; 2 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 3 // 高3位表示線程狀態 4 private static final int COUNT_BITS = Integer.SIZE - 3; 5 // 低29位表示workerCount容量 6 private static final int CAPACITY = (1 << COUNT_BITS) - 1; 7 8 // runState is stored in the high-order bits 9 // 能接收任務且能處理阻塞隊列中的任務 10 private static final int RUNNING = -1 << COUNT_BITS; 11 // 不能接收新任務,但可以處理隊列中的任務。 12 private static final int SHUTDOWN = 0 << COUNT_BITS; 13 // 不接收新任務,不處理隊列任務。 14 private static final int STOP = 1 << COUNT_BITS; 15 // 所有任務都終止 16 private static final int TIDYING = 2 << COUNT_BITS; 17 // 什么都不做 18 private static final int TERMINATED = 3 << COUNT_BITS; 19 20 // 存放任務的阻塞隊列 21 private final BlockingQueue<Runnable> workQueue;
值的注意的是狀態值越大線程越不活躍。
線程池狀態的轉換模型:
構造器
1 public ThreadPoolExecutor(int corePoolSize,//線程池初始啟動時線程的數量 2 int maximumPoolSize,//最大線程數量 3 long keepAliveTime,//空閑線程多久關閉? 4 TimeUnit unit,// 計時單位 5 BlockingQueue<Runnable> workQueue,//放任務的阻塞隊列 6 ThreadFactory threadFactory,//線程工廠 7 RejectedExecutionHandler handler// 拒絕策略) { 8 if (corePoolSize < 0 || 9 maximumPoolSize <= 0 || 10 maximumPoolSize < corePoolSize || 11 keepAliveTime < 0) 12 throw new IllegalArgumentException(); 13 if (workQueue == null || threadFactory == null || handler == null) 14 throw new NullPointerException(); 15 this.acc = System.getSecurityManager() == null ? 16 null : 17 AccessController.getContext(); 18 this.corePoolSize = corePoolSize; 19 this.maximumPoolSize = maximumPoolSize; 20 this.workQueue = workQueue; 21 this.keepAliveTime = unit.toNanos(keepAliveTime); 22 this.threadFactory = threadFactory; 23 this.handler = handler; 24 }
在向線程池提交任務時,會通過兩個方法:execute和submit。
本文着重講解execute方法。submit方法放在下次和Future、Callable一起分析。
execute方法:
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 // clt記錄着runState和workerCount 5 int c = ctl.get(); 6 //workerCountOf方法取出低29位的值,表示當前活動的線程數 7 //然后拿線程數和 核心線程數做比較 8 if (workerCountOf(c) < corePoolSize) { 9 // 如果活動線程數<核心線程數 10 // 添加到 11 //addWorker中的第二個參數表示限制添加線程的數量是根據corePoolSize來判斷還是maximumPoolSize來判斷 12 if (addWorker(command, true)) 13 // 如果成功則返回 14 return; 15 // 如果失敗則重新獲取 runState和 workerCount 16 c = ctl.get(); 17 } 18 // 如果當前線程池是運行狀態並且任務添加到隊列成功 19 if (isRunning(c) && workQueue.offer(command)) { 20 // 重新獲取 runState和 workerCount 21 int recheck = ctl.get(); 22 // 如果不是運行狀態並且 23 if (! isRunning(recheck) && remove(command)) 24 reject(command); 25 else if (workerCountOf(recheck) == 0) 26 //第一個參數為null,表示在線程池中創建一個線程,但不去啟動 27 // 第二個參數為false,將線程池的有限線程數量的上限設置為maximumPoolSize 28 addWorker(null, false); 29 } 30 //再次調用addWorker方法,但第二個參數傳入為false,將線程池的有限線程數量的上限設置為maximumPoolSize 31 else if (!addWorker(command, false)) 32 //如果失敗則拒絕該任務 33 reject(command); 34 }
總結一下它的工作流程:
-
當
workerCount < corePoolSize
,創建線程執行任務。 -
當
workerCount >= corePoolSize
&&阻塞隊列workQueue
未滿,把新的任務放入阻塞隊列。 -
當
workQueue
已滿,並且workerCount >= corePoolSize
,並且workerCount < maximumPoolSize
,創建線程執行任務。 -
當workQueue已滿,
workerCount >= maximumPoolSize
,采取拒絕策略,默認拒絕策略是直接拋異常。
通過上面的execute方法可以看到,最主要的邏輯還是在addWorker方法中實現的,那我們就看下這個方法:
addWorker方法
主要工作是在線程池中創建一個新的線程並執行
參數定義:
-
firstTask
the task the new thread should run first (or null if none). (指定新增線程執行的第一個任務或者不執行任務) -
core
if true use corePoolSize as bound, else maximumPoolSize.(core如果為true則使用corePoolSize綁定,否則為maximumPoolSize。 (此處使用布爾指示符而不是值,以確保在檢查其他狀態后讀取新值)。)
1 private boolean addWorker(Runnable firstTask, boolean core) { 2 retry: 3 for (;;) { 4 5 int c = ctl.get(); 6 // 獲取運行狀態 7 int rs = runStateOf(c); 8 9 // Check if queue empty only if necessary. 10 // 如果狀態值 >= SHUTDOWN (不接新任務&不處理隊列任務) 11 // 並且 如果 !(rs為SHUTDOWN 且 firsTask為空 且 阻塞隊列不為空) 12 if (rs >= SHUTDOWN && 13 ! (rs == SHUTDOWN && 14 firstTask == null && 15 ! workQueue.isEmpty())) 16 // 返回false 17 return false; 18 19 for (;;) { 20 //獲取線程數wc 21 int wc = workerCountOf(c); 22 // 如果wc大與容量 || core如果為true表示根據corePoolSize來比較,否則為maximumPoolSize 23 if (wc >= CAPACITY || 24 wc >= (core ? corePoolSize : maximumPoolSize)) 25 return false; 26 // 增加workerCount(原子操作) 27 if (compareAndIncrementWorkerCount(c)) 28 // 如果增加成功,則跳出 29 break retry; 30 // wc增加失敗,則再次獲取runState 31 c = ctl.get(); // Re-read ctl 32 // 如果當前的運行狀態不等於rs,說明狀態已被改變,返回重新執行 33 if (runStateOf(c) != rs) 34 continue retry; 35 // else CAS failed due to workerCount change; retry inner loop 36 } 37 } 38 39 boolean workerStarted = false; 40 boolean workerAdded = false; 41 Worker w = null; 42 try { 43 // 根據firstTask來創建Worker對象 44 w = new Worker(firstTask); 45 // 根據worker創建一個線程 46 final Thread t = w.thread; 47 if (t != null) { 48 // new一個鎖 49 final ReentrantLock mainLock = this.mainLock; 50 // 加鎖 51 mainLock.lock(); 52 try { 53 // Recheck while holding lock. 54 // Back out on ThreadFactory failure or if 55 // shut down before lock acquired. 56 // 獲取runState 57 int rs = runStateOf(ctl.get()); 58 // 如果rs小於SHUTDOWN(處於運行)或者(rs=SHUTDOWN && firstTask == null) 59 // firstTask == null證明只新建線程而不執行任務 60 if (rs < SHUTDOWN || 61 (rs == SHUTDOWN && firstTask == null)) { 62 // 如果t活着就拋異常 63 if (t.isAlive()) // precheck that t is startable 64 throw new IllegalThreadStateException(); 65 // 否則加入worker(HashSet) 66 //workers包含池中的所有工作線程。僅在持有mainLock時訪問。 67 workers.add(w); 68 // 獲取工作線程數量 69 int s = workers.size(); 70 //largestPoolSize記錄着線程池中出現過的最大線程數量 71 if (s > largestPoolSize) 72 // 如果 s比它還要大,則將s賦值給它 73 largestPoolSize = s; 74 // worker的添加工作狀態改為true 75 workerAdded = true; 76 } 77 } finally { 78 mainLock.unlock(); 79 } 80 // 如果worker的添加工作完成 81 if (workerAdded) { 82 // 啟動線程 83 t.start(); 84 // 修改線程啟動狀態 85 workerStarted = true; 86 } 87 } 88 } finally { 89 if (! workerStarted) 90 addWorkerFailed(w); 91 } 92 // 返回線啟動狀態 93 return workerStarted;
為什么需要持有mainLock?
因為workers是HashSet類型的,不能保證線程安全。
那w = new Worker(firstTask);
如何理解呢
Worker.java
1 private final class Worker 2 extends AbstractQueuedSynchronizer 3 implements Runnable
可以看到它繼承了AQS並發框架還實現了Runnable。證明它還是一個線程任務類。那我們調用t.start()事實上就是調用了該類重寫的run方法。
Worker為什么不使用ReentrantLock來實現呢?
tryAcquire方法它是不允許重入的,而ReentrantLock是允許重入的。對於線程來說,如果線程正在執行是不允許其它鎖重入進來的。
線程只需要兩個狀態,一個是獨占鎖,表明正在執行任務;一個是不加鎖,表明是空閑狀態。
1 public void run() { 2 runWorker(this); 3 }
run方法又調用了runWorker方法:
1 final void runWorker(Worker w) { 2 // 拿到當前線程 3 Thread wt = Thread.currentThread(); 4 // 拿到當前任務 5 Runnable task = w.firstTask; 6 // 將Worker.firstTask置空 並且釋放鎖 7 w.firstTask = null; 8 w.unlock(); // allow interrupts 9 boolean completedAbruptly = true; 10 try { 11 // 如果task或者getTask不為空,則一直循環 12 while (task != null || (task = getTask()) != null) { 13 // 加鎖 14 w.lock(); 15 // If pool is stopping, ensure thread is interrupted; 16 // if not, ensure thread is not interrupted. This 17 // requires a recheck in second case to deal with 18 // shutdownNow race while clearing interrupt 19 // return ctl.get() >= stop 20 // 如果線程池狀態>=STOP 或者 (線程中斷且線程池狀態>=STOP)且當前線程沒有中斷 21 // 其實就是保證兩點: 22 // 1. 線程池沒有停止 23 // 2. 保證線程沒有中斷 24 if ((runStateAtLeast(ctl.get(), STOP) || 25 (Thread.interrupted() && 26 runStateAtLeast(ctl.get(), STOP))) && 27 !wt.isInterrupted()) 28 // 中斷當前線程 29 wt.interrupt(); 30 try { 31 // 空方法 32 beforeExecute(wt, task); 33 Throwable thrown = null; 34 try { 35 // 執行run方法(Runable對象) 36 task.run(); 37 } catch (RuntimeException x) { 38 thrown = x; throw x; 39 } catch (Error x) { 40 thrown = x; throw x; 41 } catch (Throwable x) { 42 thrown = x; throw new Error(x); 43 } finally { 44 afterExecute(task, thrown); 45 } 46 } finally { 47 // 執行完后, 將task置空, 完成任務++, 釋放鎖 48 task = null; 49 w.completedTasks++; 50 w.unlock(); 51 } 52 } 53 completedAbruptly = false; 54 } finally { 55 // 退出工作 56 processWorkerExit(w, completedAbruptly); 57 }
總結一下runWorker方法的執行過程:
-
while循環中,不斷地通過getTask()方法從workerQueue中獲取任務
-
如果線程池正在停止,則中斷線程。否則調用3.
-
調用task.run()執行任務;
-
如果task為null則跳出循環,執行processWorkerExit()方法,銷毀線程
workers.remove(w);
這個流程圖非常經典:
除此之外,ThreadPoolExector
還提供了tryAcquire
、tryRelease
、shutdown
、shutdownNow
、tryTerminate
、等涉及的一系列線程狀態更改的方法有興趣可以自己研究。大體思路是一樣的,這里不做介紹。
Worker為什么不使用ReentrantLock來實現呢?
tryAcquire方法它是不允許重入的,而ReentrantLock是允許重入的。對於線程來說,如果線程正在執行是不允許其它鎖重入進來的。
線程只需要兩個狀態,一個是獨占鎖,表明正在執行任務;一個是不加鎖,表明是空閑狀態。
在runWorker方法中,為什么要在執行任務的時候對每個工作線程都加鎖呢?
shutdown方法與getTask方法存在競態條件.(這里不做深入,建議自己深入研究,對它比較熟悉的面試官一般會問)
高頻考點
-
創建線程池的五個方法。
-
線程池的五個狀態
-
execute執行過程。
-
runWorker執行過程。(把兩個流程圖記下,理解后說個大該就行。)
-
比較深入的問題就是我在文中插入的問題。
-
…期望大家能在評論區補充。
聲明:圖片來源於網絡,侵刪。