1 前言
線程池是並發編程中一個重要的概念和技術。大多數異步或並發執行任務都會用到線程池。 線程池,正如其名,它是有一定數量的線程的池子,它會執行被提交過來的任務,執行完一個任務后不會馬上結束,它們會繼續等待或執行新的任務。線程池有兩個重要的概念一個是任務隊列,另一個是工作者線程 。任務隊列是存放任務的容器,工作者線程會依次不斷地到隊列中獲取任務並執行。
線程池有這些優點:
-
① 減少系統資源的消耗。它通過對線程的重用,避免不斷創建新線程導致的系統開銷。任務過多時,通過排隊避免創建過多的線程,減少系統資源的消耗與競爭,確保任務有序完成。
-
②提高響應速度。當任務到達時,任務無需等待線程的創建完成,它得利用已有的線程立即執行任務。
-
③提高線程的可控性。線程是稀缺資源,不能無限制地創建,線程池它對線程能統一分配、調度和銷毀。
線程池直接繼承於抽象類AbstractExecutorService
,AbstractExecutorService
是對ExecutorSerivice
接口的默認實現,而ExecutorService
又擴展了Executor
接口。
2 處理任務的流程
線程池對任務的處理有它自己定義的流程,它對任務的處理流程如下:
①線程池判斷核心線程池里的線程是否都在執行任務。如果不是,則創建一個新的工作線程來執行任務(線程池在開始階段會盡快讓池中的線程數達到設定的核心線程數)。如果核心線程池里的線程都在執行任務,則進入下個流程。
②線程池判斷工作隊列是否已經滿。如果工作隊列沒有滿,則將新提交的任務存儲在這個工作隊列里(盡量先往阻塞隊列中放)。如果工作隊列滿了,則進入下個流程。
③線程池判斷線程池的線程是否都處於工作狀態。如果沒有,則創建一個新的工作線程來執行任務(工作隊列放不下了,只有創建新線程來執行任務)。如果已經滿了,則交給飽和策略來處理這個任務。
其基本理念是:①在線程池啟動階段,盡快讓池中的線程數達到設定的核心線程數,這里主要從能利用已有線程立即執行之后提交的新任務、避免創建線程而等待的角度考慮;② 在核心線程池滿了之后,盡可能向阻塞隊列中放入任務,這里是從減少資源消耗的角度考慮,畢竟線程是稀缺資源、不能無限制地創建;③在阻塞隊列已滿的情況下,已經無法再往隊列中放入任務了,此時只能創建新的線程去執行任務,雖然創建線程會消耗系統資源,但是總不能不執行提交的任務;④而在最壞的情況下,線程池中的線程數也達到了設定的最大線程數,此時已無法直接執行任務了,只能按照指定的飽和策略來拒絕任務。
3 線程池的配置
ThreadPoolExecutor
有4個構造方法,分別需要若干個參數,我們主要通過構造方法參數去配置線程池。我們從其參數個數最多的構造方法看起,其他的構造方法都是直接調用這個構造方法來實現的。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
這個構造方法有7個參數:
1) corePoolSize : 核心線程數. 提交一個任務到線程池時,線程池會創建一個線程來執行任務,即使其他空閑的基本線程能夠執行新任務也會創建線程,等到需要執行的任務數大於線程池基本大小時就不再創建。另外還可以調用setCorePoolSize(int)
方法來設置核心線程數。
默認情況下,核心線程不會從預告創建,只有有任務時才創建;核心線程不會因空閑而終止。但以下幾個API可以改變這種默認方式。
int prestartAllCoreThreads()
方法,線程池會提前創建並啟動所有核心線程。
boolean prestartCoreThread()
方法,創建一個核心線程,若所有核心線程已創建則返回false.
allowCoreThreadTimeOut(boolean)
方法,若參數是true,核心線程會因空閑和終止(和其他非核心線程一樣,使用keepAliveTime參數作為最大空閑存活時間)。
2) maximumPoolSize: 線程池可創建的最大線程數。如果隊列已滿,線程池將創建新的線程執行任務直到達到這個最大線程數。若是使用無界阻塞隊列,隊列永遠也不會滿,它不會創建新線程,它會一直往隊列中放任務,其結果是一些任務長時間等待、難以被執行。另外還可以調用setMaximumPoolSize(int)
方法來設置最大線程數
3) keepAliveTime:空閑線程存活時間,設置此參數的目的是釋放多余的線程資源。它表示線程個數大於corePoolSize時,其他額外空閑線程的存活時間。也就是說,一個非核心線程在空閑等待新任務時,會有一個最長等待時間,若等待時間超過了keepAliveTime,這個線程就會被銷毀。若是將此參數設為0,那么所有的線程將一直不會被銷毀。若任務較多且任務執行時間較短,可適當增大此參數,提高線程的利用率,避免反復創建新線程。 另外調用setKeepAliveTime(long , TimeUnit )
方法也可設置空閑線程存活時間。
4) unit :參數keepAliveTime的時間單位,可以是“DAYS”(天) 、”HOURS“(時)、“MINUTES”(分)、“SECONDS”(秒)、”MILLISECONDS“(毫秒)、”NANOSECONDS“(納秒)等
5) workQueue:工作隊列, 用於保存等待執行的任務的阻塞隊列。之前的文章並發編程中的阻塞隊列概述有對阻塞隊列做過介紹,這里只對進行SynchronousQueue
特別說明。SynchronousQueue
不存儲元素的阻塞隊列,當嘗試排隊時,只有正好有空閑線程正在等待接受任務時才會入隊成功,否則總是創建新線程執行任務,直到線程數達到maximumPoolSize ,其吞吐量通常要高於LinkedBlockingQueue
。
6) threadFactory :線程工廠,主要用於為創建出來的線程設置優先級、取個有意義的名字、是否守護線程等。另外還可調用setThreadFactory(ThreadFactory)
方法設置線程工廠。ThreadFactory
是一個接口,它的定義是:
public interface ThreadFactory { Thread newThread(Runnable r); }
ThreadPoolExecutor
的默認實現是Executors
工具類的靜態內部DefaultThreadFactory
,這個線程工廠主要是創建一個線程,設置一個名稱,設置daemon為false,將優先級設為標准默認優先級,線程名稱的格式:”pool-線程池編號-thread-線程編號“。
7) handler:拒絕策略. 當線程池和隊列都滿了時,表示線程池已經飽和,此時應采取一些特殊的手段來處理這個新任務。反過來說,拒絕策略只有在隊列有界且maximumPoolSize有限大時才會被觸發。若隊列無界,任務一直往隊列中放置,任務一直處於排隊中,難以得到執行。若隊列有有界、maximumPoolSize無限大,則會創建大量的線程,占滿CPU和內存,可能導致程序或系統崩潰。
默認情況下線程池會使用AbortPolicy策略,此策略會直接拋出異常。線程池內置有4種拒絕策略,這4種拒絕策略都是ThreadPoolExecutor
的靜態內部類。
-
CallerRunsPolicy
, 使用任務提交者的所在線程執行任務; -
AbortPolicy
,直接拋出異常,這是默認的拒絕策略; -
DiscardPolicy
, 不執行任務,將任務丟棄; -
DiscardOldestPolicy
,丟棄隊列中最近的任務,然后執行當前任務。
以上4個類都實現了RejectedExecutionHandler
接口,當線程無法接受新任務時,調用拒絕策略的rejectedExecution
方法進行相應處理。
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
拒絕策略除了在構造方法中指定外,還可調用線程池的setRejectedExecutionHandler
方法進行設置。
4 提交任務
1)線程池有兩組提交單任務的方法
execute(Runnable)
用於提交不需要結果的任務,因此無法確定任務是否完成。 而submit系列方法, submit(Runnable, T)
、submit(Callable<T>)
、submit(Runnable)
都用於提交需要結果的任務。線程池會返回一個Future類型的對象,通過這個Future對象可以判斷任務是否執行成功,並且可以通過future的get()方法來獲取返回值,get()
方法會阻塞當前線程直到任務完成(使用“submit(Runnable)
”提交任務,get()
方法最終返回null),而使用get(long,TimeUnit)
方法則會阻塞當前線程一段時間后立即返回,此時返回可能任務未完成。
2)線程池有兩組批量提交任務組的方法 invokeAll(Collection)
方法用於批量提交任務、等待所有任務完成成后,返回Future的List集合 。invokeAll(Collection, long, TimeUnit)
方法是超時版本的 invokeAll(Collection)
,需要指定超時時間,若超時后還有任務還未完成,這些未完成的任務就會被取消。
invokeAny(Collection)
也用於批量提交任務,但只要有一個任務正常完成(沒拋出異常)后,它就返回此任務的結果;在正常返回或異常拋出返回后,其他任務則會被取消(最多只有一個任務能正常執行完成)。invokeAny(Collection, long , TimeUnit )
是超時版本的invokeAny(Collection)
,它對任務的執行耗時做了限制,如果在限定時間內有一任務正常(沒拋出異常)完成,就返回此任務的結果 ,其他將任務會被取消;如果沒有任務能在限時內成功完成返回,就拋出 TimeoutException; 沒有任務正常成功返回(可能是因發生某種異常而返回),將拋出ExecutionException 。
5 關閉線程池
shutdown()
和shutdownNow()
方法都能關閉線程池,它們的處理邏輯是:遍歷線程池中的工作者線程,然后逐個調用線程的interrupt方法來中斷線程,若某些任務不能響應中斷,那么它們就無法終止。但兩者在細節上有一些區別,shutdownNow()
首先將線程池的狀態設置成STOP,然后嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表,而shutdown()
只是將線程池的狀態設置成SHUTDOWN狀態,然后中斷所有沒有正在執行任務的線程。
只要調用了這兩個關閉方法中的任意一個,isShutdown()
方法就會返回true。當所有的任務都已關閉后,才表示線程池關閉成功,這時調用isTerminated()
方法會返回true。至於應該調用哪一種方法來關閉線程池,應該由提交到線程池的任務特性決定,通常調用shutdown()
方法來關閉線程池,如果任務不一定要執行完,則可以調用shutdownNow()
方法。
6 合理配置線程池
要想合理地配置線程池,就必須首先分析任務特性,可以從以下幾個角度來分析。
-
任務的性質:CPU密集型任務、IO密集型任務和混合型任務。
-
任務的優先級:高、中和低。
-
任務的執行時間:長、中和短。
-
任務的依賴性:是否依賴其他系統資源,如數據庫連接
性質不同的任務可以用不同規模的線程池分開處理。CPU密集型任務應配置盡可能小的線程,如配置Ncpu +1個線程的線程池。由於IO密集型任務線程並不是一直在執行任務,則應配置盡可能多的線程,如2*Ncpu 。混合型的任務,如果可以拆分,將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那么分解后執行的吞吐量將高於串行執行的吞吐量。如果這兩個任務執行時間相差太大,則沒必要進行分解。可以通過Runtime.getRuntime().availableProcessors()
方法獲得當前設備的CPU個數。
優先級不同的任務可以使用優先級隊列PriorityBlockingQueue
來處理。它可以讓優先級高的任務先執行(如果一直有優先級高的任務提交到隊列里,那么優先級低的任務可能永遠不能執行)
執行時間不同的任務可以交給不同規模的線程池來處理,或者可以使用優先級隊列,讓執行時間短的任務先執行。依賴數據庫連接池的任務,因為線程提交SQL后需要等待數據庫返回結果,等待的時間越長,則CPU空閑時間就越長,那么線程數應該設置得越大,這樣才能更好地利用CPU
盡可能使用有界隊列 。有界隊列能增加系統的穩定性和預警能力,如果當時我們設置成無界隊列,隊列永不可能滿,那么線程池的隊列就會越來越多,有可能會導致內存溢出、程序崩潰。
7 狀態監控
為了監控線程池,我們可以使用一些方法獲取線程池的狀態信息。
getTaskCount()
: 計划要執行的任務總數
getCompletedTaskCount()
: 線程池已完成的任務數量
getLargestPoolSize()
: 線程池里曾經創建過的最大線程數量。通過這個數據可以知道線程池是否曾經滿過。如該數值等於線程池的最大大小,則表示線程池曾經滿過。
getActiveCount()
:當前活動(正在執行任務)的工作線程數
getPoolSize()
:當前線程池中的工作線程總數
除此之外,線程池還提供了3個空方法,beforeExecute
方法在執行一個任務前被調用,afterExecute
方法在一個任務完成后被調用,terminated()
方法在線程池停止時被調用。
我們可繼承ThreadPoolExecutor
來實現自己的線程池,並以此為基礎重寫這3個方法來實現自己的監控邏輯。
protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { } protected void terminated() { }
afterExecute
方法注釋上寫了一個這樣的使用示例,它能打印導致任務非正常完成的異常信息。
class ExtendedExecutor extends ThreadPoolExecutor { // ... protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t == null && r instanceof Future<?>) { try { Object result = ((Future<?>) r).get(); } catch (CancellationException ce) { t = ce; } catch (ExecutionException ee) { t = ee.getCause(); } catch (InterruptedException ie) { //在捕獲中斷異常后,中斷標志將設為false ,這里調用interrupt恢復中斷狀態 Thread.currentThread().interrupt(); // ignore/reset } } if (t != null) System.out.println(t); } }
參考:《Java並發編程的藝術》、《Java的邏輯》