一:類繼承結構
繼承關系
二:構造函數
構造函數
(1)線程池的大小除了顯示的限制外,還可能由於其他資源上的約束而存在一些隱式限制。比如JDBC連接池。
(2)運行時間較長的任務。
如果任務阻塞的時間過長,即使不出現死鎖,線程池的響應性也會變得糟糕。執行時間較長的任務不僅會造成線程池阻塞,甚至還會增加執行時間。如果線程池中線程的數量遠小於在穩定狀態下執行時間較長任務的數量,那么到最后可能所有的線程都會運行這些執行時間較長的任務,從而影響整體的響應性。
有一項技術可以緩解執行時間較長任務造成的影響,即限定任務等待資源的時間,而不要無限制地等待。在平台類庫的大多數可阻塞方法中,都同時定義了限時版本和無限時版本,例如:Thread.join,BlockingQueue.put,CountDownLatch.await以及Selector.select等。如果等待超時,可以把任務標識為失敗,然后中止任何或者將任務重新放回隊列以便隨后執行。如果在線程池中總是充滿了唄阻塞的任務,那么也可能表示線程池的規模過小。
(3)設置線程池的大小
(3.1)線程池的理想大小取決於被提交任務的類型以及所部署系統的特性。在代碼中不會固定線程池的大小,而應該通過某種配置機制來提供,或者根據Runtime.availableProcessors來動態計算。
(3.2)要設置線程池的大小也並不困難,只需要避免“過大”或“過小”這兩種極端情況。如果設置過大,那么大量的線程將在相對很少的CPU和內存資源上發生競爭,這不僅會導致更高的內存使用量,而且還可能耗盡資源。如果設置過小,那么將導致很多空閑的處理器無法執行工作,從而降低吞吐率。
(3.3)要想正確設置線程池的大小,必須分析計算環境,資源預算和任務的特性。在部署的系統中有多少CPU?多大的內存?計算是計算密集型、I/O密集型還是二者皆可?它們是否需要像JDBC連接這樣的稀缺資源?如果需要執行不同類別的任務,並且它們之間的行為相差很大,那么應該考慮使用多個線程池,從而使每個線程池可以根據自己的工作負載來調整。
(3.4)對於計算密集型的任務,在擁有Ncpu個處理器的系統上,當線程池的大小為Ncpu+1時,通常能實現最優的利用率。(計算當計算密集型的線程偶爾由於頁缺失故障或者其他原因而暫停時,這個“額外”的線程也能確保CPU的時鍾周期不會被浪費)
(3.5)對於包含I/O操作或者其他阻塞操作的任務,你必須估算出任務的等待時間與計算時間的比值。這種估算不需要很精確,並且可以通過一些分析或者監控工具來獲得。你還可以通過另一種方法來調節線程池的大小:在某個基准負載下,分別設置不同大小的線程池來運行應用程序,並觀察CPU利用率的水平。給定如下列定義:
Ncpu = number of CPUs
Ucpu = target CPU utilization,0 <= Ucpu <= 1
W/C = ratio of wait time to compute time
要使處理器達到期望的使用率,線程池的最優大小等於:
Nthreads = Ncpu * Ucpu * (1 + W/C)
可以通過Runtime來獲得CPU的數目:
int N_CPUS = Runtime.getRuntime().availableProcessors();
(4)參數解析
- corePoolSize
線程池的基本大小(線程池的目標大小),默認情況下會一直存活,即使處於閑置狀態也不會受keepAliveTime限制,除非將allowCoreThreadTimeOut設置為true。
- maximumPoolSize
最大線程池大小,表示可同時活動的線程數量的上限。
- keepAliveTime
如果某個線程的空閑時間超過了存活時間,那么將被標記為可回收的,並且當線程池的當前大小超過了基本大小時,這個線程將被終止。
分析:線程池的基本大小(corePoolSize)、最大大小(maximumPoolSize)以及存活時間等因素共同負責線程的創建與銷毀。通過調節線程池的基本大小和存活時間,可以幫助線程池回收空閑線程占用的資源,從而使得這些資源可以用於執行其他工作。
三:基本實現
(1)newCachedThreadPool()
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
線程池的基本大小設置為零,最大大小設置為Integer.MAX_VALUE,線程池可以被無限擴展,需求降低時自動收縮,最大大小設置過大在某些情況下也是缺點。
(2)newFixedThreadPool(int nThreads)
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
缺點是LinkedBlockingQueue是無界隊列,有些情況下排隊的任務會很多。
(3)newScheduledThreadExecutor()
/** * Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @return a newly created scheduled thread pool * @throws IllegalArgumentException if {@code corePoolSize < 0} */ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } /** * Creates a new {@code ScheduledThreadPoolExecutor} with the * given core pool size. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @throws IllegalArgumentException if {@code corePoolSize < 0} */ public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
(4)newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
總結:都由Executors類的靜態方法統一提供,如Executors.newCachedThreadPool(),底層通過ThreadPoolExecutor來實現。ThreadPoolExecutor提供了很多構造函數,它是一個靈活的、穩定的線程池,允許進行各種定制。
四:管理隊列任務
(1)單線程的Executor是一種值得注意的特例:它們能確保不會有任務並發執行,因為它們通過線程封閉來實現線程安全性。
(2)如果無限制地創建線程,將會導致不穩定性。可以通過采用固定大小的線程池(而不是每收到一個請求就創建一個新線程)來解決這個問題。然而,這個方案並不完整。在高負載的情況下,應用程序仍可能耗盡資源,知識問題的概率較小。如果新請求的到達超過了線程池的處理效率,那么新到來的請求將累計起來。在線程池中,這些請求會在一個由Executor管理的Runable隊列中等待,而不會像線程那樣去競爭CPU資源,通過一個Runnable和一個鏈表節點來表現一個等待中的任務,當然比使用線程來表示的開銷低得多,但如果客戶提交給服務器請求的效率超過了服務器的處理效率,那么仍可能會耗盡資源。
(3)ThreadPoolExecutor允許提供一個BlockingQueue來保存等待執行的任務。基本的任務排隊方法有3種:無界隊列、有界隊列和同步移交(Synchronous Handoff)。隊列的選擇與其他的配置參數有關,例如線程池的大小。
(4)newFixedThreadPool和newSingleThreadExecutor在默認的情況下將使用一個無界的LinkedBlockingQueue。如果所有工作者線程都處於忙碌狀態,那么任務將在隊列中等候。如果任務持續快速地到達,並且超過了線程池處理它們的速度,那么隊列將無限制地增加。
(5)一種更穩妥的資源管理策略是使用有界隊列,例如ArrayBlockingQueue、有界的LinkedBlockQueue、PriorityBlockingQueue。有界隊列有助於避免資源耗盡的情況發生,但它又帶來了新的問題:當隊列填滿后,新的任務該怎么辦?(有許多飽和策略可以解決這個問題)在使用有界
的工作隊列時,隊列的大小和線程池的大小必須一起調節。如果線程池較小而隊列較大,那么有助於減少內存使用量,降低CPU的使用率,同時還可以減少上下文切換,但付出的代價是可能會限制吞吐量。
(6)對於非常大的或者無界的線程池,可以通過使用SynchronousQueue來避免任務排隊,以及直接將任務從生產者移交給工作者線程。SynchronousQueue不是一個真正的隊列,而是一種在線程之間進行移交的機制。要將一個元素放入SynchronousQueue中,必須有另一個線程正在等待接受這個元素。
(7)當使用像LinkedBlockingQueue或ArrayBlockingQueue這樣的FIFO(先進先出)隊列時,任務的執行順序與它們的到達順序相同。如果想進一步控制任務執行順序,還可以使用PriorityBlockingQueue,這個隊列將根據優先級來安排任務。
(8)只有當任務相互獨立時,為線程池或工作隊列設置界限才是合理的。如果任務之間存在依賴性,那么有界的線程池或隊列就可能導致線程“飢餓”死鎖問題。此時應該使用無界的線程池,例如newCacheThreadPool。
五:飽和策略
(1)當有界隊列被填滿后,飽和策略開始發揮作用。ThreadPoolExecutor的飽和策略可以通過調用setRejectedExecutionHander來修改。(如果某個任務被提交到一個已被關閉的Executor時,也會用到飽和策略。)JDK提供了幾種不同的RejectedExecutionHandler實現,每種實現都包含不同的飽和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy和DiscardOldestPolicy。
(2)中止(Abort)策略是默認的飽和策略,該策略將拋出未檢查的RejectedExecutionException。調用者可以捕獲這個異常,然后根據需要編寫自己的處理代碼。
(3)當新提交的任務無法保存到隊列中等待執行時,“拋棄(Discard)”策略會悄悄拋棄該任務。
(4)“拋棄最舊的(Discard-Oldest)”策略則會拋棄下一個將被執行的任務,然后嘗試重新提交新的任務。(如果工作隊列是一個優先隊列,那么“拋棄最舊的”策略將導致拋棄優先級最高的任務,因此最好不要將“拋棄最舊的”飽和策略和優先隊列放在一起使用。)
(5)“調用者運行(Caller-Runs)”策略實現了一種調度機制,該策略既不會拋棄任務,也不會拋出異常,而是將某些任務回退到調用者,從而降低新任務的流量。它不會在線程池的某個線程中執行新提交的任務,而是在一個調用了execute的線程中執行該任務。如果采用有界隊列和“調用者運行”飽和策略,當線程池中的所有線程都被占用,並且工作隊列被填滿后,下一個任務會在調用execute時在主線程中執行。由於執行任務需要一定的時間,因此主線程至少在一段時間內不能提交任何任務,從而使得工作者線程有時間來處理完正在執行的任務。在此期間,主線程不會調用accept,因此到達的請求將被保存在TCP層的隊列中而不是在應用程序的隊列中。如果持續過載,那么TCP層將最終發現它的請求隊列被填滿,因此同樣會開始拋棄請求。當服務器過載時,這種過載情況會逐漸向外蔓延開來——從線程池到工作隊列到應用程序再到TCP層,最終達到客戶端,導致服務器在高負載下實現一種平緩的性能降低。
可通過如下方式設置飽和策略:
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
六:線程工廠
(1)每當線程池需要創建一個線程時,都是通過線程工廠方法來完成的。默認的線程工廠方法將創建一個新的、非守護的線程,並且不包含特殊的配置信息。通過指定一個線程工廠方法,可以定制線程池的配置信息。在ThreadFactory中只定義了一個方法newThread,每當線程池需要創建一個新線程時都會調用這個方法。
(2)許多情況下都需要使用定制的線程工廠方法。例如,你希望為線程池中的線程指定一個UncaughtExceptionHandler,或者實例化一個定制的Thread類用於執行調試信息的記錄。你還可能希望修改線程的優先級(這通常並不是一個好主意)或者守護狀態(同樣,這也不是一個好主意)。或許你只是希望給線程取一個更有意義的名稱,用來解釋線程的轉儲信息和錯誤日志。
public interface ThreadFactory { /** * Constructs a new {@code Thread}. Implementations may also initialize * priority, name, daemon status, {@code ThreadGroup}, etc. * * @param r a runnable to be executed by new thread instance * @return constructed thread, or {@code null} if the request to * create a thread is rejected */ Thread newThread(Runnable r); }
通過實現該接口,可以定制自己的線程池工廠方法。
七:調用構造函數后再定制ThreadPoolExecutor
(1)在調用完ThreadPoolExecutor的構造函數后,仍然可以通過設置函數(setter)來修改大多數傳遞給它的構造函數的參數(例如線程池的基本大小、最大大小、存活時間、線程工廠以及拒絕執行處理器)。如果Executor是通過Executors中的某個(newSingleThreadExecutor除外)工廠方法創建的,那么可以將結果的類型轉換為ThreadPoolExecutor以訪問設置器,如下:
ExecutorService exec = Executors.newCachedThreadPool();
if ( exec instanceof ThreadPoolExecutor)
((ThreadPoolExecutor) exec).setCorePoolSize(10);
else
throw new AssertionError("Oops, bad assumption");
八:擴展ThreadPoolExecutor
(1)ThreadPoolExecutor是可擴展的,它提供了幾個可以在子類化中改寫的方法:beforeExecute、afterExecute和 terminated,這些方法可以用於擴展ThreadPoolExecutor的行為。
(2)在執行任務的線程中將調用beforeExecute 和 afterExecute等方法,在這些方法中還可以添加日志、計時、監視或統計信息收集的功能。無論任務是從run中正常返回還是拋出一個異常而返回,afterExecute都會被調用。(如果任何在調用完成后帶有一個Error,那么就不會調用afterExecute。)如果beforeExecute拋出一個RuntimeException,那么任務將不被執行,並且afterExecute也不會被調用。
(3)在線程池完成關閉操作時調用terminated,也就是在所有任務都已經完成並且所有工作者線程也已經關閉后。terminated可以用來釋放Executor在其生命周期里分配的各種資源,此外還可以執行發送通知、記錄日志或者收集finalize統計信息等操作。
