摘要:對於線程池的核心類ThreadPoolExecutor來說,有哪些重要的屬性和內部類為線程池的正確運行提供重要的保障呢?
本文分享自華為雲社區《【高並發】通過源碼深度解析ThreadPoolExecutor類是如何保證線程池正確運行的》,作者: 冰 河 。
對於線程池的核心類ThreadPoolExecutor來說,有哪些重要的屬性和內部類為線程池的正確運行提供重要的保障呢?今天我們就一起來深入探討下這些問題!!
ThreadPoolExecutor類中的重要屬性
在ThreadPoolExecutor類中,存在幾個非常重要的屬性和方法,接下來,我們就介紹下這些重要的屬性和方法。
ctl相關的屬性
AtomicInteger類型的常量ctl是貫穿線程池整個生命周期的重要屬性,它是一個原子類對象,主要用來保存線程的數量和線程池的狀態,我們看下與這個屬性相關的代碼如下所示。
//主要用來保存線程數量和線程池的狀態,高3位保存線程狀態,低29位保存線程數量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //線程池中線程的數量的位數(32-3) private static final int COUNT_BITS = Integer.SIZE - 3; //表示線程池中的最大線程數量 //將數字1的二進制值向右移29位,再減去1 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //線程池的運行狀態 private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; //獲取線程狀態 private static int runStateOf(int c) { return c & ~CAPACITY; } //獲取線程數量 private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } private static boolean runStateLessThan(int c, int s) { return c < s; } private static boolean runStateAtLeast(int c, int s) { return c >= s; } private static boolean isRunning(int c) { return c < SHUTDOWN; } private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); } private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); } private void decrementWorkerCount() { do {} while (! compareAndDecrementWorkerCount(ctl.get())); }
對於線程池的各狀態說明如下所示。
- RUNNING:運行狀態,能接收新提交的任務,並且也能處理阻塞隊列中的任務
- SHUTDOWN: 關閉狀態,不能再接收新提交的任務,但是可以處理阻塞隊列中已經保存的任務,當線程池處於RUNNING狀態時,調用shutdown()方法會使線程池進入該狀態
- STOP: 不能接收新任務,也不能處理阻塞隊列中已經保存的任務,會中斷正在處理任務的線程,如果線程池處於RUNNING或SHUTDOWN狀態,調用shutdownNow()方法,會使線程池進入該狀態
- TIDYING: 如果所有的任務都已經終止,有效線程數為0(阻塞隊列為空,線程池中的工作線程數量為0),線程池就會進入該狀態。
- TERMINATED: 處於TIDYING狀態的線程池調用terminated ()方法,會使用線程池進入該狀態
也可以按照ThreadPoolExecutor類的,將線程池的各狀態之間的轉化總結成如下圖所示。
- RUNNING -> SHUTDOWN:顯式調用shutdown()方法, 或者隱式調用了finalize()方法
- (RUNNING or SHUTDOWN) -> STOP:顯式調用shutdownNow()方法
- SHUTDOWN -> TIDYING:當線程池和任務隊列都為空的時候
- STOP -> TIDYING:當線程池為空的時候
- TIDYING -> TERMINATED:當 terminated() hook 方法執行完成時候
其他重要屬性
除了ctl相關的屬性外,ThreadPoolExecutor類中其他一些重要的屬性如下所示。
//用於存放任務的阻塞隊列 private final BlockingQueue<Runnable> workQueue; //可重入鎖 private final ReentrantLock mainLock = new ReentrantLock(); //存放線程池中線程的集合,訪問這個集合時,必須獲得mainLock鎖 private final HashSet<Worker> workers = new HashSet<Worker>(); //在鎖內部阻塞等待條件完成 private final Condition termination = mainLock.newCondition(); //線程工廠,以此來創建新線程 private volatile ThreadFactory threadFactory; //拒絕策略 private volatile RejectedExecutionHandler handler; //默認的拒絕策略 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
ThreadPoolExecutor類中的重要內部類
在ThreadPoolExecutor類中存在對於線程池的執行至關重要的內部類,Worker內部類和拒絕策略內部類。接下來,我們分別看這些內部類。
Worker內部類
Worker類從源代碼上來看,實現了Runnable接口,說明其本質上是一個用來執行任務的線程,接下來,我們看下Worker類的源代碼,如下所示。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ private static final long serialVersionUID = 6138294804551838833L; //真正執行任務的線程 final Thread thread; //第一個Runnable任務,如果在創建線程時指定了需要執行的第一個任務 //則第一個任務會存放在此變量中,此變量也可以為null //如果為null,則線程啟動后,通過getTask方法到BlockingQueue隊列中獲取任務 Runnable firstTask; //用於存放此線程完全的任務數,注意:使用了volatile關鍵字 volatile long completedTasks; //Worker類唯一的構造放大,傳遞的firstTask可以為null Worker(Runnable firstTask) { //防止在調用runWorker之前被中斷 setState(-1); this.firstTask = firstTask; //使用ThreadFactory 來創建一個新的執行任務的線程 this.thread = getThreadFactory().newThread(this); } //調用外部ThreadPoolExecutor類的runWorker方法執行任務 public void run() { runWorker(this); } //是否獲取到鎖 //state=0表示鎖未被獲取 //state=1表示鎖被獲取 protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
在Worker類的構造方法中,可以看出,首先將同步狀態state設置為-1,設置為-1是為了防止runWorker方法運行之前被中斷。這是因為如果其他線程調用線程池的shutdownNow()方法時,如果Worker類中的state狀態的值大於0,則會中斷線程,如果state狀態的值為-1,則不會中斷線程。
Worker類實現了Runnable接口,需要重寫run方法,而Worker的run方法本質上調用的是ThreadPoolExecutor類的runWorker方法,在runWorker方法中,會首先調用unlock方法,該方法會將state置為0,所以這個時候調用shutDownNow方法就會中斷當前線程,而這個時候已經進入了runWork方法,就不會在還沒有執行runWorker方法的時候就中斷線程。
注意:大家需要重點理解Worker類的實現。
拒絕策略內部類
在線程池中,如果workQueue阻塞隊列滿了,並且沒有空閑的線程池,此時,繼續提交任務,需要采取一種策略來處理這個任務。而線程池總共提供了四種策略,如下所示。
- 直接拋出異常,這也是默認的策略。實現類為AbortPolicy。
- 用調用者所在的線程來執行任務。實現類為CallerRunsPolicy。
- 丟棄隊列中最靠前的任務並執行當前任務。實現類為DiscardOldestPolicy。
- 直接丟棄當前任務。實現類為DiscardPolicy。
在ThreadPoolExecutor類中提供了4個內部類來默認實現對應的策略,如下所示。
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
我們也可以通過實現RejectedExecutionHandler接口,並重寫RejectedExecutionHandler接口的rejectedExecution方法來自定義拒絕策略,在創建線程池時,調用ThreadPoolExecutor的構造方法,傳入我們自己寫的拒絕策略。
例如,自定義的拒絕策略如下所示。
public class CustomPolicy implements RejectedExecutionHandler { public CustomPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { System.out.println("使用調用者所在的線程來執行任務") r.run(); } } }
使用自定義拒絕策略創建線程池。
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory(),