一、線程池的作用
線程池類似於數據庫鏈接池、Redis鏈接池等池化技術。池化技術的優點如下:
1. 統一管理資源,線程是操作系統一個重要監控管理指標,過多的線程會導致占用內存、上下文切換頻繁等問題,所以需要管理起來線程,而每處都用new Thread()方法來創建線程,那線程資源散落在應用程序各地,沒法管理。
2. 不需要每次要用到線程時都再次創建一個新的線程,可以做到線程重用。線程池默認初始化時是沒有創建線程的(也可以在創建線程池時自動創建好核心線程),線程池里的線程的初始化與其他線程一樣,但是在完成任務以后,該線程不會自行銷毀,而是以掛起的狀態返回到線程池。直到應用程序再次向線程池發出請求時,線程池里掛起的線程就會再度激活執行任務。這樣既節省了建立線程所造成的性能損耗,也可以讓多個任務反復重用同一線程,從而在應用程序生存期內節約大量開銷。
二、Java中提供的創建線程池的API
先來解釋一下每個參數的作用,稍后我們在分析源碼的過程中再來詳細了解參數的意義。
public ThreadPoolExecutor(int corePoolSize, // 核心線程數 int maximumPoolSize,// 最大線程數 long keepAliveTime,// 非核心線程數空閑時,回收時長 TimeUnit unit,// 回收時長單位 BlockingQueue<Runnable> workQueue,// 阻塞隊列 RejectedExecutionHandler handler/** 拒絕策略*/) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
1. 線程數少於核心線程數(也就是設置的線程數)時,新建線程執行任務;
2. 線程數等於核心線程數后,將任務加入阻塞隊列;
3. 由於隊列容量非常大,所以可以一直添加;
4. 執行完任務的線程反復去隊列中取任務執行;
用途:FixedThreadPool 用於負載比較大的服務器,為了資源的合理利用,需要限制當前線程數量
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, // 不會有非核心線程,所以回收時間間隔為0 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); }
2. CachedThreadPool:核心線程數為0,然后任務進入SynchronousQueue阻塞隊列,最后在由非核心線程來處理其余的任務(在60秒內非核心線程處理完后可以繼續服用)。(先來的先做,后來的全部找外包干,來多少活就找多少外包,反正老子有的是錢,結果最后老板跑路,發生了著名的OOM事件)
最大線程數和非核心線程數是Integer.MAX_VALUE,不會有拒絕策略。所有線程執行完任務的線程有 60 秒生存時間,如果在這個時間內可以接到新任務,就可以繼續活下去,否則就被回收。
它的執行流程如下:
1. 沒有核心線程,直接向 SynchronousQueue 中提交任務
2. 如果有空閑的非核心線程,就去取出任務執行;如果沒有空閑的非核心線程,就新建一個
3. 執行完任務的非核心線程有 60 秒生存時間,如果在這個時間內可以接到新任務,就可以繼續活下去,否則就被回收
/* * 核心線程數為0(沒有核心線程,直接向 SynchronousQueue 中提交任務), * 最大線程數是Integer.MAX_VALUE,不會有拒絕策略,導致大量線程的創建出現 CPU 使用過高或者 OOM 的問題 * 執行完任務的線程有 60 秒生存時間,如果在這個時間內可以接到新任務,就可以繼續活下去,否則就被回收 */ public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()); }
3. newSingleThreadExecutor():核心線程數=最大線程數=1,使用無界阻塞隊列。(老子就一個人,慢慢做,先來先做,后來的全部去排隊)
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
三、線程池的實現原理分析:
源碼分析:
ThreadPoolExecutor的execute()方法:
/**
* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
* 線程池初始化時是沒有創建線程的,線程池里的線程的初始化與其他線程一樣,但是在完成任務以后,該線程不會自行銷毀,
* 而是以掛起的狀態返回到線程池。直到應用程序再次向線程池發出請求時,線程池里掛起的線程就會再度激活執行任務。
* 這樣既節省了建立線程所造成的性能損耗,也可以讓多個任務反復重用同一線程,從而在應用程序生存期內節約大量開銷
*
* 默認情況下,創建線程池之后,線程池中是沒有線程的,需要提交任務之后才會創建線程。
* 在實際中如果需要線程池創建之后立即創建線程,可以通過以下兩個方法辦到:
* prestartCoreThread():初始化一個核心線程
* prestartAllCoreThreads():初始化所有核心線程
* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1.當前池中線程比核心數少,新建一個線程執行任務
if (workerCountOf(c) < corePoolSize) {
// 創建新的線程並執行任務,如果成功就返回
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.核心池已滿但任務隊列未滿,將任務添加到隊列中
if (isRunning(c) && workQueue.offer(command)) {
//重新獲取ctl
int recheck = ctl.get();
//任務成功添加到隊列以后,再次檢查是否需要添加新的線程,因為已存在的線程可能被銷毀了
//如果線程池處於非運行狀態,並且把當前的任務從任務隊列中移除成功,則拒絕該任務
if (!isRunning(recheck) && remove(command))
reject(command);
//如果之前的線程已被銷毀完,新建一個線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
/*
* 如果執行到這里,有兩種情況:
* 1. 線程池已經不是RUNNING狀態;
* 2. 線程池是RUNNING狀態,但workerCount >= corePoolSize並且workQueue已滿。
*/
} else if (!addWorker(command, false))
// 如果線程池是非RUNNING狀態或者加入阻塞隊列失敗,則嘗試創建新非核心線程(外包)直到maxPoolSize
// 創建非核心線程失敗,則啟動拒絕策略
reject(command);
}
其中ctl就是一個AutomicInteger的變量,用於存儲線程數量(低29位)和線程池的狀態(高3位)。
線程池狀態如下:
/** * 即高3位為111,該狀態的線程池會接收新任務,並處理阻塞隊列中的任務; * 111 0 0000 0000 0000 0000 0000 0000 0000 * -1 原碼:0000 ... 0001 反碼:1111 ... 1110 補碼:1111 ... 1111 * 左移操作:后面補 0 * 111 0 0000 0000 0000 0000 0000 0000 0000 */ private static final int RUNNING = -1 << COUNT_BITS; /** * 即高3位為000,該狀態的線程池不會接收新任務,但會處理阻塞隊列中的任務; * 000 0 0000 0000 0000 0000 0000 0000 0000 */ private static final int SHUTDOWN = 0 << COUNT_BITS; /** * 即高3位為001,該狀態的線程不會接收新任務,也不會處理阻塞隊列中的任務,而且會中斷正在運行的任務; * 001 0 0000 0000 0000 0000 0000 0000 0000 */ private static final int STOP = 1 << COUNT_BITS; /** * 即高3位為010,所有任務都已終止,workerCount為零,過渡到狀態TIDYING的線程將運行terminated()鈎子方法; * 010 0 0000 0000 0000 0000 0000 0000 0000 */ private static final int TIDYING = 2 << COUNT_BITS; /** * 即高3位為011,terminated()方法執行完畢; * 011 0 0000 0000 0000 0000 0000 0000 0000 */ private static final int TERMINATED = 3 << COUNT_BITS;
/** * firstTask參數用於表示怎么獲取線程處理的任務,true為傳入的任務,false表示從阻塞隊列獲取任務 * core參數為true表示在新增線程時會判斷當前活動線程數是否少於corePoolSize, * false表示新增線程前需要判斷當前活動線程數是否少於maximumPoolSize */ private boolean addWorker(Runnable firstTask, boolean core) { // 內嵌循環,通過CAS worker + 1 retry: for (; ; ) { // 獲取當前線程池狀態與線程數 int c = ctl.get(); // 獲取當前線程狀態 int rs = runStateOf(c); // Check if queue empty only if necessary. /** * 這個if判斷 * 如果線程池處於SHUTDOWN,STOP,TIDYING,TERMINATED的時候,則表示此時不再接收新任務; * 接着判斷以下3個條件,只要有1個不滿足,則返回false: * 1. rs == SHUTDOWN,這時表示關閉狀態,不再接受新提交的任務,但卻可以繼續處理阻塞隊列中已保存的任務 * 2. firsTask為空 * 3. 阻塞隊列不為空 * * 首先考慮rs == SHUTDOWN的情況 * 這種情況下不會接受新提交的任務,所以在firstTask不為空的時候會返回false; * 然后,如果firstTask為空,並且workQueue也為空,則返回false, * 因為隊列中已經沒有任務了,不需要再添加線程了 */ if (rs >= SHUTDOWN && !(rs == SHUTDOWN && // 不在接受新的任務 firstTask == null && // 隊列中已經沒有任務了,不需要再添加線程了 !workQueue.isEmpty())) return false; // 增加工作線程數 for (; ; ) { // 線程數量 int wc = workerCountOf(c); // 如果當前線程數大於線程最大上限CAPACITY return false // 創建核心線程則與 corePoolSize 比較,否則與 maximumPoolSize 比較 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 嘗試增加workerCount,如果成功,則跳出第一個for循環 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 創建一個新的線程並執行 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; // 新建線程,將線程封裝成Worker w = new Worker(firstTask); // 每一個Worker對象都會創建一個線程 final Thread t = w.thread; if (t != null) { // 將任務添加到workers Queue中 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); // 線程池狀態 int rs = runStateOf(c); // rs < SHUTDOWN表示是RUNNING狀態; if (rs < SHUTDOWN || // rs是SHUTDOWN狀態並且firstTask為null,向線程池中添加線程。 // 因為在SHUTDOWN時不會在添加新的任務,但還是會執行workQueue中的任務 (rs == SHUTDOWN && firstTask == null)) { // 當前線程已經啟動,拋出異常 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // workers是一個HashSet<Worker> workers.add(w); // 設置最大的池大小largestPoolSize,workerAdded設置為true int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 啟動線程 if (workerAdded) { // 啟動時會調用Worker類中的run方法,Worker本身實現了Runnable接口,所以一個Worker類型的對象也是一個線程 t.start(); workerStarted = true; } } } finally { // 線程啟動失敗 if (!workerStarted) addWorkerFailed(w); } return workerStarted; }
Worker 類繼承了 AQS並實現了 Runnable 接口,注意其中的 firstTask 和 thread 屬性:
firstTask 用它來保存傳入的任務;thread 是在調用構造方法時通過 ThreadFactory 來創建的線程,是用來處理任務的線程。
在調用構造方法時,需要傳入任務,這里通過 getThreadFactory().newThread(this) 來新建一個線程,newThread 方法傳入的參數是 this,因為 Worker 本身繼承了 Runnable 接口,所以一個 Worker 對象在啟動的時候會調用 Worker 類中的 run 方法。
/** * 1. 如果 task 不為空,則開始執行 task * 2. 如果 task 為空則通過 getTask()再去取任務,並賦值給 task;如果取到的 Runnable 不為空則執行該任務 * 3. 執行完畢后,通過 while 循環繼續 getTask()取任務 * 4. 如果 getTask()取到的任務依然是空,那么整個 runWorker()方法執行完畢 */ final void runWorker(Worker w) { // 獲取當前線程 Thread wt = Thread.currentThread(); // 獲取第一個任務 Runnable task = w.firstTask; w.firstTask = null; // 釋放鎖,運行中斷 w.unlock(); // allow interrupts //是否突然完成,如果是由於異常導致的進入finally,那么completedAbruptly==true就是突然完成的 boolean completedAbruptly = true; try { // 調用getTask()方法從阻塞隊列中獲取新任務,如果阻塞隊列為空則根據是否超時來判斷是否需要阻塞 while (task != null || (task = getTask()) != null) { /** * !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! * 上鎖,不是為了防止並發執行任務,為了在shutdown()時不終止正在運行的 worker * !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! */ w.lock(); /** * 如果線程池正在停止,那么要保證當前線程是中斷狀態; * 如果不是的話,則要保證當前線程不是中斷狀態; */ if ((runStateAtLeast(ctl.get(), STOP) || /** * 線程池為 stop 狀態時不接受新任務,不執行已經加入任務隊列的任務,還中斷正在執 * 行的任務,所以對於 stop 狀態以上是要中斷線程的 */ (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) // 設置中斷標志 wt.interrupt(); try { //自定義實現 beforeExecute(wt, task); Throwable thrown = null; try { // 執行任務 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //自定義實現 afterExecute(task, thrown); } } finally { task = null; // 完成任務數 + 1 w.completedTasks++; // 釋放鎖 w.unlock(); } } completedAbruptly = false; } finally { // 獲取不到任務時,主動回收自己 // 線程回收的工作是在processWorkerExit方法完成的 processWorkerExit(w, completedAbruptly); } }