前言
在Java的高並發領域,線程池一直是一個繞不開的話題。有些童鞋一直在使用線程池,但是,對於如何創建線程池僅僅停留在使用Executors工具類的方式,那么,創建線程池究竟存在哪幾種方式呢?就讓我們一起從創建線程池的源碼來深入分析究竟有哪些方式可以創建線程池。
使用Executors工具類創建線程池
在創建線程池時,初學者用的最多的就是Executors 這個工具類,而使用這個工具類創建線程池時非常簡單的,不需要關注太多的線程池細節,只需要傳入必要的參數即可。Executors 工具類提供了幾種創建線程池的方法,如下所示。
- Executors.newCachedThreadPool:創建一個可緩存的線程池,如果線程池的大小超過了需要,可以靈活回收空閑線程,如果沒有可回收線程,則新建線程
- Executors.newFixedThreadPool:創建一個定長的線程池,可以控制線程的最大並發數,超出的線程會在隊列中等待
- Executors.newScheduledThreadPool:創建一個定長的線程池,支持定時、周期性的任務執行
- Executors.newSingleThreadExecutor: 創建一個單線程化的線程池,使用一個唯一的工作線程執行任務,保證所有任務按照指定順序(先入先出或者優先級)執行
- Executors.newSingleThreadScheduledExecutor:創建一個單線程化的線程池,支持定時、周期性的任務執行
- Executors.newWorkStealingPool:創建一個具有並行級別的work-stealing線程池
其中,Executors.newWorkStealingPool方法是Java 8中新增的創建線程池的方法,它能夠為線程池設置並行級別,具有更高的並發度和性能。除了此方法外,其他創建線程池的方法本質上調用的是ThreadPoolExecutor類的構造方法。
例如,我們可以使用如下代碼創建線程池。
Executors.newWorkStealingPool();
Executors.newCachedThreadPool();
Executors.newScheduledThreadPool(3);
使用ThreadPoolExecutor類創建線程池
從代碼結構上看ThreadPoolExecutor類繼承自AbstractExecutorService,也就是說,ThreadPoolExecutor類具有AbstractExecutorService類的全部功能。
既然Executors工具類中創建線程池大部分調用的都是ThreadPoolExecutor類的構造方法,所以,我們也可以直接調用ThreadPoolExecutor類的構造方法來創建線程池,而不再使用Executors工具類。接下來,我們一起看下ThreadPoolExecutor類的構造方法。
ThreadPoolExecutor類中的所有構造方法如下所示。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
由ThreadPoolExecutor類的構造方法的源代碼可知,創建線程池最終調用的構造方法如下。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
關於此構造方法中各參數的含義和作用,如下所示。
注意:為了更加深入的分析ThreadPoolExecutor類的構造方法,會適當調整參數的順序進行解析,以便於大家更能深入的理解ThreadPoolExecutor構造方法中每個參數的作用。
上述構造方法接收如下參數進行初始化:
(1)corePoolSize:核心線程數量。
(2)maximumPoolSize:最大線程數。
(3)workQueue:阻塞隊列,存儲等待執行的任務,很重要,會對線程池運行過程產生重大影響。
其中,上述三個參數的關系如下所示:
- 如果運行的線程數小於corePoolSize,直接創建新線程處理任務,即使線程池中的其他線程是空閑的。
- 如果運行的線程數大於等於corePoolSize,並且小於maximumPoolSize,此時,只有當workQueue滿時,才會創建新的線程處理任務。
- 如果設置的corePoolSize與maximumPoolSize相同,那么創建的線程池大小是固定的,此時,如果有新任務提交,並且workQueue沒有滿時,就把請求放入到workQueue中,等待空閑的線程,從workQueue中取出任務進行處理。
- 如果運行的線程數量大於maximumPoolSize,同時,workQueue已經滿了,會通過拒絕策略參數rejectHandler來指定處理策略。
根據上述三個參數的配置,線程池會對任務進行如下處理方式:
當提交一個新的任務到線程池時,線程池會根據當前線程池中正在運行的線程數量來決定該任務的處理方式。處理方式總共有三種:直接切換、使用無限隊列、使用有界隊列。
- 直接切換常用的隊列就是SynchronousQueue。
- 使用無限隊列就是使用基於鏈表的隊列,比如:LinkedBlockingQueue,如果使用這種方式,線程池中創建的最大線程數就是corePoolSize,此時maximumPoolSize不會起作用。當線程池中所有的核心線程都是運行狀態時,提交新任務,就會放入等待隊列中。
- 使用有界隊列使用的是ArrayBlockingQueue,使用這種方式可以將線程池的最大線程數量限制為maximumPoolSize,可以降低資源的消耗。但是,這種方式使得線程池對線程的調度更困難,因為線程池和隊列的容量都是有限的了。
根據上面三個參數,我們可以簡單得出如何降低系統資源消耗的一些措施:
- 如果想降低系統資源的消耗,包括CPU使用率,操作系統資源的消耗,上下文環境切換的開銷等,可以設置一個較大的隊列容量和較小的線程池容量。這樣,會降低線程處理任務的吞吐量。
- 如果提交的任務經常發生阻塞,可以考慮調用設置最大線程數的方法,重新設置線程池最大線程數。如果隊列的容量設置的較小,通常需要將線程池的容量設置的大一些,這樣,CPU的使用率會高些。如果線程池的容量設置的過大,並發量就會增加,則需要考慮線程調度的問題,反而可能會降低處理任務的吞吐量。
接下來,我們繼續看ThreadPoolExecutor的構造方法的參數。
(4)keepAliveTime:線程沒有任務執行時最多保持多久時間終止
當線程池中的線程數量大於corePoolSize時,如果此時沒有新的任務提交,核心線程外的線程不會立即銷毀,需要等待,直到等待的時間超過了keepAliveTime就會終止。
(5)unit:keepAliveTime的時間單位
(6)threadFactory:線程工廠,用來創建線程
默認會提供一個默認的工廠來創建線程,當使用默認的工廠來創建線程時,會使新創建的線程具有相同的優先級,並且是非守護的線程,同時也設置了線程的名稱
(7)rejectHandler:拒絕處理任務時的策略
如果workQueue阻塞隊列滿了,並且沒有空閑的線程池,此時,繼續提交任務,需要采取一種策略來處理這個任務。
線程池總共提供了四種策略:
- 直接拋出異常,這也是默認的策略。實現類為AbortPolicy。
- 用調用者所在的線程來執行任務。實現類為CallerRunsPolicy。
- 丟棄隊列中最靠前的任務並執行當前任務。實現類為DiscardOldestPolicy。
- 直接丟棄當前任務。實現類為DiscardPolicy。
大家可以自行調用ThreadPoolExecutor類的構造方法來創建線程池。例如,我們可以使用如下形式創建線程池。
new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
使用ForkJoinPool類創建線程池
在Java8的Executors工具類中,新增了如下創建線程池的方式。
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
從源代碼可以可以,本質上調用的是ForkJoinPool類的構造方法類創建線程池,而從代碼結構上來看ForkJoinPool類繼承自AbstractExecutorService抽象類。接下來,我們看下ForkJoinPool類的構造方法。
public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism) {
this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
通過查看源代碼得知,ForkJoinPool的構造方法,最終調用的是如下私有構造方法。
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
其中,各參數的含義如下所示。
- parallelism:並發級別。
- factory:創建線程的工廠類對象。
- handler:當線程池中的線程拋出未捕獲的異常時,統一使用UncaughtExceptionHandler對象處理。
- mode:取值為FIFO_QUEUE或者LIFO_QUEUE。
- workerNamePrefix:執行任務的線程名稱的前綴。
當然,私有構造方法雖然是參數最多的一個方法,但是其不會直接對外方法,我們可以使用如下方式創建線程池。
new ForkJoinPool();
new ForkJoinPool(Runtime.getRuntime().availableProcessors());
new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
使用ScheduledThreadPoolExecutor類創建線程池
在Executors工具類中存在如下方法類創建線程池。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
從源碼來看,這幾個方法本質上調用的都是ScheduledThreadPoolExecutor類的構造方法,ScheduledThreadPoolExecutor中存在的構造方法如下所示。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
而從代碼結構上看,ScheduledThreadPoolExecutor類繼承自ThreadPoolExecutor類,本質上還是調用ThreadPoolExecutor類的構造方法,只不過此時傳遞的隊列為DelayedWorkQueue。我們可以直接調用ScheduledThreadPoolExecutor類的構造方法來創建線程池,例如以如下形式創建線程池。
new ScheduledThreadPoolExecutor(3)
最后,需要注意的是:ScheduledThreadPoolExecutor主要用來創建執行定時任務的線程池。