前言:在最新的阿里規范中強制使用ThreadPoolExecutor方式創建線程池,不允許使用Executors,因此有必要對ThreadPoolExecutor進行進一步了解。
1.ThreadPoolExecutor介紹
線程池類,直接看其入參最多的構造函數:
參數意義:
corePoolSize
核心線程數的大小。默認情況下,在創建了線程池之后,線程池中的線程數為0,當有任務到來后,如果線程池中存活的線程數小於corePoolSize,則創建一個線程。
maximumPoolSize
線程池中允許的最大線程數,這個參數表示了線程池中最多能創建的線程數量。當任務數量比corePoolSize大時,任務添加到workQueue,當workQueue滿了,將繼續創建線程以處理任務。maximumPoolSize表示當wordQueue滿了,線程池中最多可以創建的線程數量。
keepAliveTime、unit
當線程池處於空閑狀態時,超過keepAliveTime時間之后,空閑的線程會被終止。只有當線程池中的線程數大於corePoolSize時,這個參數才會起作用,但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;當線程數大於corePoolSize時,如果一個線程的空閑時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。
workQueue
阻塞隊列,存儲提交的等待任務。
threadFactory
線程工廠,指定創建線程的工廠
handler
當任務超出線程池范圍和隊列容量時,采取何種拒絕策略。
對於上述參數,源碼注釋中有很詳細的解釋。這里筆者挑出認為重要的幾段:
這里表明了corePoolSize、maximumPoolSize和workQueue的關系(上述注釋說的非常的清楚,這里稍微翻譯下):
#1.默認情況下,線程池初始化的時候,線程數為0。當接收到新任務時,如果線程池中存活的線程數小於corePoolSize,則新建一個線程。
#2.當運行的線程數超出核心線程數時,執行器更多的選擇是將任務放入隊列中,而不是新建一個線程。
#3.當隊列滿后,任務不能提交到隊列,在不超過maximumPoolSize(最大線程數)的情況下,會創建一個新線程去執行任務,當超過maximumPoolSize時,任務將被拒絕(這里就關聯到接下來說要介紹的內容,在任務操作maximumPoolSize時,線程池所使用拒絕策略)。
當執行器關閉、線程池滿了、隊列滿了,則新任務會被拒絕。使用的拒絕策略有以下幾種:
注釋解釋的非常清楚,線程池采用的拒絕策略共有4種:
#1.AbortPolicy : 默認策略,當任務被拒絕時直接拋出異常RejectedExecutionException。
#2.CallerRunsPolicy : 讓調用者所在的線程來執行任務,這種策略並不會丟棄任務,但是會降低執行器處理任務的速率。
#3.DiscardPolicy : 直接丟棄新任務。
#4.DiscardOldestPolicy : 如果執行器未關閉,刪除隊列中第一個任務,再次執行任務。如果失敗會重試(repeated)。
接下來看線程池的排隊策略。
線程池提供了3種排隊的策略:
#1.直接提交(SynchronousQueue):直接提交任務,不保存任務。直接提交策略無容量限制,但是當任務數量過速增長有可能撐爆“JVM”。在生產中一般不采用此策略。
#2.無界隊列(LinkedBlockingQueue):當所有核心線程都在忙時,用一個無界隊列存放提交的任務。最大線程數設置了也無效。使用無界隊列會保存核心線程處理不了的任務,隊列無上限,因此最大線程數設置了也無效,無界隊列需謹慎使用。
#3.有界隊列(ArrayBlockingQueue):用一個有界隊列幫助防止資源被耗盡,不過調整和控制比較難。因為隊列容量小了,任務不能立即執行,當然需要配合拒絕策略;隊列容量太大,又比較耗費資源。當然在生產環境中一般使用有界隊列的排隊策略,因為使用有界隊列可以保存超過核心線程的任務,並且隊列有上限,超過上限,新建線程拋錯,可以更好的保護資源,防止崩潰。
通過以上分析,可以發現corePoolSize、maximumPoolSize和排隊策略是相互影響的,maximumPoolSize的值並不一定有效。
接下來看看線程池的存活機制
當創建的線程超過核心線程數時,線程池會讓該線程保持存活keepAliveTime時間,超過該時間后會銷毀該線程。默認情況下該值對非核心線程有效,如果想讓核心線程也適用於該機制,可以調用allowCoreThreadTimeOut()方法,但是這樣的話就不存在核心線程的概念了。
綜合以上,線程池在多次執行任務后,會一直維持部分線程存活,即使它是閑置的。目的是為了減少線程銷毀創建的開銷,下次有任務需要執行,直接從池子里拿線程就能用了。但核心線程不能維護太多,因為也需要一定開銷。最大的線程數保護了整個系統的穩定性,避免並發量大的時候,把線程擠滿。工作隊列則是保證了任務順序和暫存,系統的可靠性。線程存活規則的目的和維護核心線程的目的類似,但降低了它的存活的時間。
2.線程狀態控制
ctl變量是整個線程池的核心控制狀態,它是一個AtomicInteger類型的原子對象,它記錄了線程池中生效線程數和線程池的運行狀態。
- workerCount,生效的線程數,基本上可以理解為存活的線程數。
- runState,線程池運行狀態。
ctl總共32位,其中低29位代表workerCount,所以最大線程數為(2^29)-1。高3位代表runState。
runState有5個值:
各值對應的值如下:
RUNNING -- 對應的高3位值是111。
SHUTDOWN -- 對應的高3位值是000。
STOP -- 對應的高3位值是001。
TIDYING -- 對應的高3位值是010。
TERMINATED -- 對應的高3位值是011。
- RUNNING,接收新任務處理隊列任務。
- SHUTDOWN,不接收新任務,但處理隊列任務。
- STOP,不接收新任務,也不處理隊列任務,並且中斷所有處理中的任務。
- TIDYING,所有任務都被終結,有效線程為0,並觸發terminated()方法。
- TERMINATED,當terminated()方法執行結束。
線程池各個狀態之間的切換:
當調用了shutdown(),狀態會從RUNNING變成SHUTDOWN,不再接收新任務,此時會處理完隊列里面的任務。
如果調用的是shutdownNow(),狀態會直接變成STOP。
當線程或者隊列都是空的時候,狀態就會變成TIDYING。
當terminated()執行完的時候,就會變成TERMINATED。
3.關鍵函數解析
execute(Runnable)
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 /* 5 * Proceed in 3 steps: 6 * 7 * 1. If fewer than corePoolSize threads are running, try to 8 * start a new thread with the given command as its first 9 * task. The call to addWorker atomically checks runState and 10 * workerCount, and so prevents false alarms that would add 11 * threads when it shouldn't, by returning false. 12 * 13 * 2. If a task can be successfully queued, then we still need 14 * to double-check whether we should have added a thread 15 * (because existing ones died since last checking) or that 16 * the pool shut down since entry into this method. So we 17 * recheck state and if necessary roll back the enqueuing if 18 * stopped, or start a new thread if there are none. 19 * 20 * 3. If we cannot queue task, then we try to add a new 21 * thread. If it fails, we know we are shut down or saturated 22 * and so reject the task. 23 */ 24 int c = ctl.get(); 25 if (workerCountOf(c) < corePoolSize) { 26 if (addWorker(command, true)) 27 return; 28 c = ctl.get(); 29 } 30 if (isRunning(c) && workQueue.offer(command)) { 31 int recheck = ctl.get(); 32 if (! isRunning(recheck) && remove(command)) 33 reject(command); 34 else if (workerCountOf(recheck) == 0) 35 addWorker(null, false); 36 } 37 else if (!addWorker(command, false)) 38 reject(command); 39 }
execute函數的主要流程源碼中的注釋已經講得非常清楚了。
- 如果少於核心線程在運行,則嘗試創建一個新的線程。
- 如果任務成功入隊,需再次檢查線程池狀態看是否需要入隊,因為在入隊過程中,有可能狀態發生變化;如果確認入隊但沒有存活線程,則新建一個空線程。
- 如果不能入隊,則嘗試新創建一個線程,如果失敗,則拒絕任務。
- 注意在第二步最后會新建一個線程,這里會有一個輪詢機制讓下個task出隊,然后直接利用這個空閑線程。
在execute中我們主要關注addWorker()函數。
首先看下該函數的整體注釋了解其大致流程。
- 該函數會檢查當前線程池是否可以創建worker(線程)。
- 當線程池stop或者shut down,又或者線程工廠創建線程失敗時都會返回false。
- 在線程創建失敗時,會進行回滾。
- 注意core參數:true表示以corePoolSize作為參照,false表示以maximumPoolSize為參照。
接下來分析addWorker源碼:
1 private boolean addWorker(Runnable firstTask, boolean core) { 2 retry: // 標記,表示跳出循環時,從哪里開始執行,類似於goto 3 for (;;) { 4 int c = ctl.get(); // 獲取ctl對應的值,“生效線程數”和“線程池狀態” 5 int rs = runStateOf(c); // 獲取線程池狀態 6 7 // Check if queue empty only if necessary. 8 // 如果該if判斷想要返回false,隊列為空為必要條件,因為addWorker()不只是在接收新任務會調用到,處理隊列的任務也會調用到。在線程池狀態為SHUTDOWN時還會處理隊列中的任務,所以隊列不為空會繼續向下執行 9 if (rs >= SHUTDOWN && 10 ! (rs == SHUTDOWN && 11 firstTask == null && 12 ! workQueue.isEmpty())) 13 return false; 14 /* 內循環意義:判斷worker是否符合corePoolSize和maximumPoolSize定義,不滿足則返回false;
然后利用CAS自增workerCount,如果CAS成功則退出循環;
如果CAS失敗會繼續自旋,在自旋過程中會檢查線程池狀態,如果發生變化,則回退到外層循環,重新執行。 15 因此內循環的主要作用就是讓workerCount在符合條件下自增。 16 */ 17 for (;;) { 18 int wc = workerCountOf(c); 19 if (wc >= CAPACITY || 20 wc >= (core ? corePoolSize : maximumPoolSize)) 21 return false; 22 if (compareAndIncrementWorkerCount(c)) 23 break retry; 24 c = ctl.get(); // Re-read ctl 25 if (runStateOf(c) != rs) 26 continue retry; 27 // else CAS failed due to workerCount change; retry inner loop 28 } 29 } 30 31 boolean workerStarted = false; 32 boolean workerAdded = false; 33 Worker w = null; 34 // 這段代碼的主要功能:添加任務到線程池,並啟動任務所在的線程 35 try { 36 // 創建一個Worker對象,包含一個由線程工廠創建的線程和一個需執行的任務 37 w = new Worker(firstTask); 38 final Thread t = w.thread; 39 if (t != null) { 40 // 線程創建成功 獲取一個可重入鎖,把Worker對象放入worker成員變量中 41 final ReentrantLock mainLock = this.mainLock; 42 mainLock.lock(); 43 try { 44 // Recheck while holding lock. 45 // Back out on ThreadFactory failure or if 46 // shut down before lock acquired. 47 int rs = runStateOf(ctl.get()); 48 // 檢查線程池狀態和線程狀態 49 if (rs < SHUTDOWN || 50 (rs == SHUTDOWN && firstTask == null)) { 51 if (t.isAlive()) // precheck that t is startable 52 throw new IllegalThreadStateException(); 53 workers.add(w); // 將Worker變量加入workers中(集合) 54 // 更新largestPoolSize 55 int s = workers.size(); 56 if (s > largestPoolSize) 57 largestPoolSize = s; 58 workerAdded = true; 59 } 60 } finally { 61 mainLock.unlock(); 62 } 63 // 如果任務添加成功,則啟動任務所在的線程 64 if (workerAdded) { 65 t.start(); 66 workerStarted = true; 67 } 68 } 69 } finally { 70 // 如果任務添加失敗則執行addWorkerFailed進行回滾 71 if (! workerStarted) 72 addWorkerFailed(w); 73 } 74 return workerStarted; 75 }
addWorkerFailed(Worker),任務添加失敗回滾函數:
1 private void addWorkerFailed(Worker w) { 2 final ReentrantLock mainLock = this.mainLock; 3 // 加鎖回滾 4 mainLock.lock(); 5 try { 6 if (w != null) 7 workers.remove(w); // 回滾workers 8 decrementWorkerCount();// 回滾workerCount 9 tryTerminate();// 判斷線程池狀態,是否需要終結線程池 10 } finally { 11 mainLock.unlock(); 12 } 13 }
4.總結
ThreadPoolExecutor我們主要關注其addWorker方法,對於其他方法,可翻看源碼,比較好理解。
核心要點:
- 當核心線程忙碌時,線程池更傾向於把任務放進隊列,而不是新建線程。
- 三種不同的排隊策略,根據選擇隊列的不同,maximumPoolSize不一定有用的。
- ctl是線程池的核心控制狀態,包含的runState線程池運行狀態和workCount有效線程數。
- retry:是一種標記循環的語法,retry可以是任何變量命名合法字符。
by Shawn Chen,2019.02.16,下午。