摘要:從創建線程池的源碼來深入分析究竟有哪些方式可以創建線程池。
本文分享自華為雲社區《【高並發】從源碼角度分析創建線程池究竟有哪些方式》,作者:冰 河 。
在Java的高並發領域,線程池一直是一個繞不開的話題。有些童鞋一直在使用線程池,但是,對於如何創建線程池僅僅停留在使用Executors工具類的方式,那么,創建線程池究竟存在哪幾種方式呢?就讓我們一起從創建線程池的源碼來深入分析究竟有哪些方式可以創建線程池。
使用Executors工具類創建線程池
在創建線程池時,初學者用的最多的就是Executors 這個工具類,而使用這個工具類創建線程池時非常簡單的,不需要關注太多的線程池細節,只需要傳入必要的參數即可。Executors 工具類提供了幾種創建線程池的方法,如下所示。
- Executors.newCachedThreadPool:創建一個可緩存的線程池,如果線程池的大小超過了需要,可以靈活回收空閑線程,如果沒有可回收線程,則新建線程
- Executors.newFixedThreadPool:創建一個定長的線程池,可以控制線程的最大並發數,超出的線程會在隊列中等待
- Executors.newScheduledThreadPool:創建一個定長的線程池,支持定時、周期性的任務執行
- Executors.newSingleThreadExecutor: 創建一個單線程化的線程池,使用一個唯一的工作線程執行任務,保證所有任務按照指定順序(先入先出或者優先級)執行
- Executors.newSingleThreadScheduledExecutor:創建一個單線程化的線程池,支持定時、周期性的任務執行
- Executors.newWorkStealingPool:創建一個具有並行級別的work-stealing線程池
其中,Executors.newWorkStealingPool方法是Java 8中新增的創建線程池的方法,它能夠為線程池設置並行級別,具有更高的並發度和性能。除了此方法外,其他創建線程池的方法本質上調用的是ThreadPoolExecutor類的構造方法。
例如,我們可以使用如下代碼創建線程池。
Executors.newWorkStealingPool(); Executors.newCachedThreadPool(); Executors.newScheduledThreadPool(3);
使用ThreadPoolExecutor類創建線程池
從代碼結構上看ThreadPoolExecutor類繼承自AbstractExecutorService,也就是說,ThreadPoolExecutor類具有AbstractExecutorService類的全部功能。
既然Executors工具類中創建線程池大部分調用的都是ThreadPoolExecutor類的構造方法,所以,我們也可以直接調用ThreadPoolExecutor類的構造方法來創建線程池,而不再使用Executors工具類。接下來,我們一起看下ThreadPoolExecutor類的構造方法。
ThreadPoolExecutor類中的所有構造方法如下所示。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
由ThreadPoolExecutor類的構造方法的源代碼可知,創建線程池最終調用的構造方法如下。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
關於此構造方法中各參數的含義和作用,各位可以移步《高並發之——不得不說的線程池與ThreadPoolExecutor類淺析》進行查閱。
大家可以自行調用ThreadPoolExecutor類的構造方法來創建線程池。例如,我們可以使用如下形式創建線程池。
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
使用ForkJoinPool類創建線程池
在Java8的Executors工具類中,新增了如下創建線程池的方式。
public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
從源代碼可以可以,本質上調用的是ForkJoinPool類的構造方法類創建線程池,而從代碼結構上來看ForkJoinPool類繼承自AbstractExecutorService抽象類。接下來,我們看下ForkJoinPool類的構造方法。
public ForkJoinPool() { this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), defaultForkJoinWorkerThreadFactory, null, false); } public ForkJoinPool(int parallelism) { this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); } public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { this(checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); } private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; this.config = (parallelism & SMASK) | mode; long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); }
通過查看源代碼得知,ForkJoinPool的構造方法,最終調用的是如下私有構造方法。
private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; this.config = (parallelism & SMASK) | mode; long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); }
其中,各參數的含義如下所示。
- parallelism:並發級別。
- factory:創建線程的工廠類對象。
- handler:當線程池中的線程拋出未捕獲的異常時,統一使用UncaughtExceptionHandler對象處理。
- mode:取值為FIFO_QUEUE或者LIFO_QUEUE。
- workerNamePrefix:執行任務的線程名稱的前綴。
當然,私有構造方法雖然是參數最多的一個方法,但是其不會直接對外方法,我們可以使用如下方式創建線程池。
new ForkJoinPool(); new ForkJoinPool(Runtime.getRuntime().availableProcessors()); new ForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
使用ScheduledThreadPoolExecutor類創建線程池
在Executors工具類中存在如下方法類創建線程池。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); } public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1, threadFactory)); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }
從源碼來看,這幾個方法本質上調用的都是ScheduledThreadPoolExecutor類的構造方法,ScheduledThreadPoolExecutor中存在的構造方法如下所示。
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); } public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }
而從代碼結構上看,ScheduledThreadPoolExecutor類繼承自ThreadPoolExecutor類,本質上還是調用ThreadPoolExecutor類的構造方法,只不過此時傳遞的隊列為DelayedWorkQueue。我們可以直接調用ScheduledThreadPoolExecutor類的構造方法來創建線程池,例如以如下形式創建線程池。
new ScheduledThreadPoolExecutor(3)