在研究Executors提供的線程池時自然會想到標題這個問題,既然已經有了newFixedThreadPool,為什么還要存在newSingleThreadExecutor這個方法。難道newFixedThreadPool(1)不是只有一個線程(Single Thread)的?本文將通過分析JDK中的相關源碼回答這個問題。
源碼分析
寫JDK代碼的大佬們早就預料到了我們會有此疑問,在newSingleThreadExecutor給我們解釋了一下:Unlike the otherwise equivalent newFixedThreadPool(1) the returned executor is guaranteed not to be reconfigurable to use additional threads.
這個解釋說明newSingleThreadExecutor和newFixedThreadPool(1)確實是有區別的,區別在於newSingleThreadExecutor返回的線程池保證不能被重新配置(重新調整線程池大小等)。這又引出了新的問題,難 newFixedThreadPool(1) 創建的線程池是可配置的?它不是線程池數量固定的么?為什么newSingleThreadExecutor是不可重新配置的?
帶着這些問題,找到了這兩個方法的源碼:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue < Runnable > ()); }
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new gc(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue < Runnable > ())); }
從源碼可以看出,兩者及其相似,newFixedThreadPool返回了一個ThreadPoolExecutor對象,newSingleThreadExecutor返回了一個被FinalizableDelegatedExecutorService包裝過的ThreadPoolExecutor對象,連ThreadPoolExecutor對象的參數值都一樣的。問題就在了包裝上,一層層的查看代碼,發現最里面的一層是DelegatedExecutorService。可以知道的是,ThreadPoolExecutor和包裝ThreadPoolExecutor對象的類都直接或間接實現了ThreadPoolExecutor接口。為了方便分析,我們先生成新相關類的類圖。
從類圖中可以看出,ThreadPoolExecutor和DeletagedExecutorService之間是並列關系,並非繼承關系。再查看二者的方法,會發現DeletagedExecutorService只有一個構造方法,構造方法可以傳入ExecutorService的引用。其它方法都僅僅是調用構造方法傳入對象中對應的方法。而ThreadPoolExecutor中有很多的其它具體實現的方法。

/** * A wrapper class that exposes only the ExecutorService methods * of an ExecutorService implementation. */ static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List<Runnable> shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return e.awaitTermination(timeout, unit); } public Future<?> submit(Runnable task) { return e.submit(task); } public <T> Future<T> submit(Callable<T> task) { return e.submit(task); } public <T> Future<T> submit(Runnable task, T result) { return e.submit(task, result); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { return e.invokeAll(tasks); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { return e.invokeAll(tasks, timeout, unit); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { return e.invokeAny(tasks); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return e.invokeAny(tasks, timeout, unit); } }

/** * Sets the core number of threads. This overrides any value set * in the constructor. If the new value is smaller than the * current value, excess existing threads will be terminated when * they next become idle. If larger, new threads will, if needed, * be started to execute any queued tasks. * * @param corePoolSize the new core size * @throws IllegalArgumentException if {@code corePoolSize < 0} * @see #getCorePoolSize */ public void setCorePoolSize(int corePoolSize) { if (corePoolSize < 0) throw new IllegalArgumentException(); int delta = corePoolSize - this.corePoolSize; this.corePoolSize = corePoolSize; if (workerCountOf(ctl.get()) > corePoolSize) interruptIdleWorkers(); else if (delta > 0) { // We don't really know how many new threads are "needed". // As a heuristic, prestart enough new workers (up to new // core size) to handle the current number of tasks in // queue, but stop if queue becomes empty while doing so. int k = Math.min(delta, workQueue.size()); while (k-- > 0 && addWorker(null, true)) { if (workQueue.isEmpty()) break; } } }
推論與實驗驗證
由此可以得知:
DelegatedExecutorService 其實是專門對實現了ExecutorService接口的類的對象進行包裝的,包裝之后的對象僅僅暴露ExecutorService接口中的方法。上面的 newSingleThreadExecutor() 方法中,FinalizableDelegatedExecutorService(繼承DelegatedExecutorService)對ThreadPoolExecutor對象進行了包裝,把諸如setCorePoolSize等方法給拿掉了,因此也就不具備ThreadPoolExecutor設置線程池屬性的功能,所以說 newSingleThreadExecutor() 返回的線程池能夠保證不能被重新配置。
// 獲取一個 Single Thread Executor ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); System.out.println(singleThreadExecutor instanceof ThreadPoolExecutor);//輸出:false
那為什么 newFixedThreadPool(1) 返回的線程池是可以重新配置呢?這個問題很簡單,newFixedThreadPool返回的是一個ThreadPoolExecutor對象,ExecutorService引用指向該對象。因此可以通過強轉的方式將它專為ThreadPoolExecutor的引用,然后通過該引用來對線程池重新進行配置。
// 獲取一個容量為 1 的 FixedThreadPool ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1); // 定義任務組 tasks1 List<Runnable> tasks1 = Arrays.asList( ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task1");}, ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task2");}); // 往 FixedThreadPool 中提交 tasks1。此時因為線程池的容量為1,所以兩個任務由1個線程執行。 tasks1.forEach(fixedThreadPool::submit); // 等待前面兩個任務結束 Thread.sleep(1000L); // 定義任務組 tasks2 List<Runnable> tasks2 = Arrays.asList( ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task3");}, ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task4");}); System.out.println(fixedThreadPool instanceof ThreadPoolExecutor); // 輸出 true // 將 ExecutorService 強轉為 ThreadPoolExecutor ThreadPoolExecutor configurableFixedThreadPool = (ThreadPoolExecutor) fixedThreadPool; // 改變容量 configurableFixedThreadPool.setCorePoolSize(2); // 提交任務組 tasks2。此時由於線程池的容量變成了2,所以tasks2中的兩個任務將分別由不同的線程執行(極端情況下也可能由一個線程執行,但此時線程池容量切切實實變成了2)。 tasks2.forEach(fixedThreadPool::submit); // 關閉線程池 fixedThreadPool.shutdown(); // 等待任務執行結束 fixedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS);
輸出:
Thread ID:14---> Task1 Thread ID:14---> Task2 true Thread ID:14---> Task4 Thread ID:15---> Task3
上面的兩個實驗可以回答前面的問題。容量為1的FixedThreadPool的屬性(容量等)可以通過將其強轉為ThreadPoolExecutor而被重新進行配置;而SingleThreadPool實際是一個FinalizableDelegatedExecutorService類的對象,由於該類沒有繼承任何可以配置線程池的類,因此可以保證它不能被再次配置。
小結
雖然SingleThreadPool與容量為1的FixedThreadPool的區別只在於一個可重新配置,一個不可重新配置;但是在按照需求寫代碼的時候:如果確實要用到容量為1的線程池,應該使用SingleThreadPool而不是用容量為1的FixedThreadPool。后者有一個隱患,如果開始設置的任務數是1,任務與任務之間本質是串行執行的,也就是說,一個任務得等到前面一個任務執行結束之后再執行。而如果后面有人寫代碼的時候擴大了容量為1的FixedThreadPool,那么修改之前,已經提交的但還未被執行的任務,可能被分到其它線程中去執行。這樣,原本應該串行執行任務變成了並行執行,如果任務之間沒有依賴還好,一旦有依賴,邏輯就錯亂了。
上面的第2個實驗中,將“等待前面兩個任務執行結束”的那行sleep代碼注釋掉就可以驗證這個問題。task1 和 task2本來是要在1個線程中執行的,而后面由於修改了容量,這兩個任務也有一定幾率在不同的線程中執行。
// 獲取一個容量為 1 的 FixedThreadPool ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1); // 定義任務組 tasks1 List<Runnable> tasks1 = Arrays.asList( ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task1");}, ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task2");}); // 往 FixedThreadPool 中提交 tasks1。此時因為線程池的容量為1,所以兩個任務由1個線程執行。 tasks1.forEach(fixedThreadPool::submit); // 等待前面兩個任務結束 //Thread.sleep(1000L); // 定義任務組 tasks2 List<Runnable> tasks2 = Arrays.asList( ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task3");}, ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task4");}); System.out.println(fixedThreadPool instanceof ThreadPoolExecutor); // 輸出 true // 將 ExecutorService 強轉為 ThreadPoolExecutor ThreadPoolExecutor configurableFixedThreadPool = (ThreadPoolExecutor) fixedThreadPool; // 改變容量 configurableFixedThreadPool.setCorePoolSize(2); // 提交任務組 tasks2。此時由於線程池的容量變成了2,所以tasks2中的兩個任務將分別由不同的線程執行(極端情況下也可能由一個線程執行,但此時線程池容量切切實實變成了2)。 tasks2.forEach(fixedThreadPool::submit); // 關閉線程池 fixedThreadPool.shutdown(); // 等待任務執行結束 fixedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS);
一種輸出:
true Thread ID:14---> Task1 Thread ID:16---> Task3 Thread ID:16---> Task2 Thread ID:16---> Task4