相關文章目錄:
Java線程池ThreadPoolExecutor使用和分析(一)
Java線程池ThreadPoolExecutor使用和分析(二) - execute()原理
Java線程池ThreadPoolExecutor使用和分析(三) - 終止線程池原理
線程池是可以控制線程創建、釋放,並通過某種策略嘗試復用線程去執行任務的一種管理框架,從而實現線程資源與任務之間的一種平衡。
以下分析基於 JDK1.7
以下是本文的目錄大綱:
若有不正之處請多多諒解,歡迎批評指正、互相討論。
請尊重作者勞動成果,轉載請標明原文鏈接:
http://www.cnblogs.com/trust-freedom/p/6594270.html
一、線程池架構
概括一下:
Executor是最基礎的執行接口;
ExecutorService接口繼承了Executor,在其上做了一些shutdown()、submit()的擴展,可以說是真正的線程池接口;
AbstractExecutorService抽象類實現了ExecutorService接口中的大部分方法;
TheadPoolExecutor繼承了AbstractExecutorService,是線程池的具體實現;
ScheduledExecutorService接口繼承了ExecutorService接口,提供了帶"周期執行"功能ExecutorService;
ScheduledThreadPoolExecutor既繼承了TheadPoolExecutor線程池,也實現了ScheduledExecutorService接口,是帶"周期執行"功能的線程池;
Executors是線程池的靜態工廠,其提供了快捷創建線程池的靜態方法。
“執行者”接口,只提供了一個方法:
void execute(Runnable command);
可以用來執行已經提交的Runnable任務對象,這個接口提供了一種將“任務提交”與“任務執行”解耦的方法。
“執行者服務”接口,可以說是真正的線程池接口,在Executor接口的基礎上做了一些擴展,主要是
(A) 管理任務如何終止的 shutdown相關方法
/** * 啟動一次有序的關閉,之前提交的任務執行,但不接受新任務 * 這個方法不會等待之前提交的任務執行完畢 */ void shutdown(); /** * 試圖停止所有正在執行的任務,暫停處理正在等待的任務,返回一個等待執行的任務列表 * 這個方法不會等待正在執行的任務終止 */ List<Runnable> shutdownNow(); /** * 如果已經被shutdown,返回true */ boolean isShutdown(); /** * 如果所有任務都已經被終止,返回true * 是否為終止狀態 */ boolean isTerminated(); /** * 在一個shutdown請求后,阻塞的等待所有任務執行完畢 * 或者到達超時時間,或者當前線程被中斷 */ boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
(B) 可以生成用於追蹤一個或多個異步任務執行結果的Future對象的 submit()相關方法
/** * 提交一個可執行的任務,返回一個Future代表這個任務 * 等到任務成功執行,Future#get()方法會返回null */ Future<?> submit(Runnable task); /** * 提交一個可以執行的任務,返回一個Future代表這個任務 * 等到任務執行結束,Future#get()方法會返回這個給定的result */ <T> Future<T> submit(Runnable task, T result); /** * 提交一個有返回值的任務,並返回一個Future代表等待的任務執行的結果 * 等到任務成功執行,Future#get()方法會返回任務執行的結果 */ <T> Future<T> submit(Callable<T> task);
/**
* 在給定延時后,創建並執行一個一次性的Runnable任務
* 任務執行完畢后,ScheduledFuture#get()方法會返回null
*/
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
/**
* 在給定延時后,創建並執行一個ScheduledFutureTask
* ScheduledFuture 可以獲取結果或取消任務
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable, ong delay, TimeUnit unit);
/**
* 創建並執行一個在給定初始延遲后首次啟用的定期操作,后續操作具有給定的周期
* 也就是將在 initialDelay 后開始執行,然后在 initialDelay+period 后執行,接着在 initialDelay + 2 * period 后執行,依此類推
* 如果執行任務發生異常,隨后的任務將被禁止,否則任務只會在被取消或者Executor被終止后停止
* 如果任何執行的任務超過了周期,隨后的執行會延時,不會並發執行
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
/**
* 創建並執行一個在給定初始延遲后首次啟用的定期操作,隨后,在每一次執行終止和下一次執行開始之間都存在給定的延遲
* 如果執行任務發生異常,隨后的任務將被禁止,否則任務只會在被取消或者Executor被終止后停止
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
二、ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize
線程池中的核心線程數,當提交一個任務時,線程池創建一個新線程執行任務,直到當前線程數等於corePoolSize;
如果當前線程數為corePoolSize,繼續提交的任務被保存到阻塞隊列中,等待被執行;
如果執行了線程池的prestartAllCoreThreads()方法,線程池會提前創建並啟動所有核心線程。
maximumPoolSize
線程池中允許的最大線程數。如果當前阻塞隊列滿了,且繼續提交任務,則創建新的線程執行任務,前提是當前線程數小於maximumPoolSize
keepAliveTime
線程空閑時的存活時間,即當線程沒有任務執行時,繼續存活的時間。默認情況下,該參數只在線程數大於corePoolSize時才有用
workQueue
workQueue必須是BlockingQueue阻塞隊列。當線程池中的線程數超過它的corePoolSize的時候,線程會進入阻塞隊列進行阻塞等待。通過workQueue,線程池實現了阻塞功能
幾種排隊的策略:
(1)不排隊,直接提交
將任務直接交給線程處理而不保持它們,可使用SynchronousQueue
如果不存在可用於立即運行任務的線程(即線程池中的線程都在工作),則試圖把任務加入緩沖隊列將會失敗,因此會構造一個新的線程來處理新添加的任務,並將其加入到線程池中(corePoolSize-->maximumPoolSize擴容)
Executors.newCachedThreadPool()采用的便是這種策略
(2)無界隊列
可以使用LinkedBlockingQueue(基於鏈表的有界隊列,FIFO),理論上是該隊列可以對無限多的任務排隊
將導致在所有corePoolSize線程都工作的情況下將新任務加入到隊列中。這樣,創建的線程就不會超過corePoolSize,也因此,maximumPoolSize的值也就無效了
(3)有界隊列
可以使用ArrayBlockingQueue(基於數組結構的有界隊列,FIFO),並指定隊列的最大長度
使用有界隊列可以防止資源耗盡,但也會造成超過隊列大小和maximumPoolSize后,提交的任務被拒絕的問題,比較難調整和控制。
threadFactory
創建線程的工廠,通過自定義的線程工廠可以給每個新建的線程設置一個具有識別度的線程名
/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
Executors靜態工廠里默認的threadFactory,線程的命名規則是“pool-數字-thread-數字”
RejectedExecutionHandler(飽和策略)
線程池的飽和策略,當阻塞隊列滿了,且沒有空閑的工作線程,如果繼續提交任務,必須采取一種策略處理該任務,線程池提供了4種策略:
(1)AbortPolicy:直接拋出異常,默認策略;
(2)CallerRunsPolicy:用調用者所在的線程來執行任務;
(3)DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務;
(4)DiscardPolicy:直接丟棄任務;
當然也可以根據應用場景實現RejectedExecutionHandler接口,自定義飽和策略,如記錄日志或持久化存儲不能處理的任務。
根據ThreadPoolExecutor源碼前面大段的注釋,我們可以看出,當試圖通過execute方法將一個Runnable任務添加到線程池中時,按照如下順序來處理:
(1)如果線程池中的線程數量少於corePoolSize,就創建新的線程來執行新添加的任務;
(2)如果線程池中的線程數量大於等於corePoolSize,但隊列workQueue未滿,則將新添加的任務放到workQueue中,按照FIFO的原則依次等待執行(線程池中有線程空閑出來后依次將隊列中的任務交付給空閑的線程執行);
(3)如果線程池中的線程數量大於等於corePoolSize,且隊列workQueue已滿,但線程池中的線程數量小於maximumPoolSize,則會創建新的線程來處理被添加的任務;
(4)如果線程池中的線程數量等於了maximumPoolSize,就用RejectedExecutionHandler來做拒絕處理
總結,當有新的任務要處理時,先看線程池中的線程數量是否大於corePoolSize,再看緩沖隊列workQueue是否滿,最后看線程池中的線程數量是否大於maximumPoolSize
另外,當線程池中的線程數量大於corePoolSize時,如果里面有線程的空閑時間超過了keepAliveTime,就將其移除線程池
最后,通過下面的圖來看看線程池中的任務調度策略:
圖1
圖2
說明:
在圖1中,線程池中有N個任務。"任務1", "任務2", "任務3"這3個任務在執行,而"任務3"到"任務N"在阻塞隊列中等待。正在執行的任務,在workers集合中,workers集合包含3個Worker,每一個Worker對應一個Thread線程,Thread線程每次處理一個任務。
當workers集合中處理完某一個任務之后,會從阻塞隊列中取出一個任務來繼續執行,如圖2所示。圖2表示"任務1"處理完畢之后,線程池將"任務4"從阻塞隊列中取出,放到workers中進行處理。
三、Executors靜態工廠創建幾種常用線程池
Exectors工廠類提供了線程池的初始化接口,主要有如下幾種:
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
創建一個指定工作線程數的線程池,其中參數 corePoolSize 和 maximumPoolSize 相等,阻塞隊列基於LinkedBlockingQueue
它是一個典型且優秀的線程池,它具有線程池提高程序效率和節省創建線程時所耗的開銷的優點。但是在線程池空閑時,即線程池中沒有可運行任務時,它也不會釋放工作線程,還會占用一定的系統資源
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
初始化的線程池中只有一個線程,如果該線程異常結束,會重新創建一個新的線程繼續執行任務,唯一的線程可以保證所提交任務的順序執行,內部使用LinkedBlockingQueue作為阻塞隊列
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
創建一個可緩存工作線程的線程池,默認存活時間60秒,線程池的線程數可達到Integer.MAX_VALUE,即2147483647,內部使用SynchronousQueue作為阻塞隊列;
在沒有任務執行時,當線程的空閑時間超過keepAliveTime,則工作線程將會終止,當提交新任務時,如果沒有空閑線程,則創建新線程執行任務,會導致一定的系統開銷
newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
初始化的線程池可以在指定的時間內周期性的執行所提交的任務,在實際的業務場景中可以使用該線程池定期的同步數據
注意:
ScheduledExecutorService#scheduleAtFixedRate() 指的是“以固定的頻率”執行,period(周期)指的是兩次成功執行之間的時間
比如,scheduleAtFixedRate(command, 5, 2, second),第一次開始執行是5s后,假如執行耗時1s,那么下次開始執行是7s后,再下次開始執行是9s后
而ScheduledExecutorService#scheduleWithFixedDelay() 指的是“以固定的延時”執行,delay(延時)指的是一次執行終止和下一次執行開始之間的延遲
還是上例,scheduleWithFixedDelay(command, 5, 2, second),第一次開始執行是5s后,假如執行耗時1s,執行完成時間是6s后,那么下次開始執行是8s后,再下次開始執行是11s后
參考資料:



