Java 中 Executors.newSingleThreadExecutor() 與Executors.newFixedThreadPool(1)有什么區別


在研究Executors提供的線程池時自然會想到標題這個問題,既然已經有了newFixedThreadPool,為什么還要存在newSingleThreadExecutor這個方法。難道newFixedThreadPool(1)不是只有一個線程(Single Thread)的?本文將通過分析JDK中的相關源碼回答這個問題。

源碼分析

寫JDK代碼的大佬們早就預料到了我們會有此疑問,在newSingleThreadExecutor給我們解釋了一下:Unlike the otherwise equivalent newFixedThreadPool(1) the returned executor is guaranteed not to be reconfigurable to use additional threads.

這個解釋說明newSingleThreadExecutor和newFixedThreadPool(1)確實是有區別的,區別在於newSingleThreadExecutor返回的線程池保證不能被重新配置(重新調整線程池大小等)。這又引出了新的問題,難 newFixedThreadPool(1) 創建的線程池是可配置的?它不是線程池數量固定的么?為什么newSingleThreadExecutor是不可重新配置的?

帶着這些問題,找到了這兩個方法的源碼:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue < Runnable > ());
}
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new gc(1, 1,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue < Runnable > ()));
}

從源碼可以看出,兩者及其相似,newFixedThreadPool返回了一個ThreadPoolExecutor對象,newSingleThreadExecutor返回了一個被FinalizableDelegatedExecutorService包裝過的ThreadPoolExecutor對象,連ThreadPoolExecutor對象的參數值都一樣的。問題就在了包裝上,一層層的查看代碼,發現最里面的一層是DelegatedExecutorService。可以知道的是,ThreadPoolExecutor和包裝ThreadPoolExecutor對象的類都直接或間接實現了ThreadPoolExecutor接口。為了方便分析,我們先生成新相關類的類圖。

 從類圖中可以看出,ThreadPoolExecutor和DeletagedExecutorService之間是並列關系,並非繼承關系。再查看二者的方法,會發現DeletagedExecutorService只有一個構造方法,構造方法可以傳入ExecutorService的引用。其它方法都僅僅是調用構造方法傳入對象中對應的方法。而ThreadPoolExecutor中有很多的其它具體實現的方法。

    /**
     * A wrapper class that exposes only the ExecutorService methods
     * of an ExecutorService implementation.
     */
    static class DelegatedExecutorService extends AbstractExecutorService {
        private final ExecutorService e;
        DelegatedExecutorService(ExecutorService executor) { e = executor; }
        public void execute(Runnable command) { e.execute(command); }
        public void shutdown() { e.shutdown(); }
        public List<Runnable> shutdownNow() { return e.shutdownNow(); }
        public boolean isShutdown() { return e.isShutdown(); }
        public boolean isTerminated() { return e.isTerminated(); }
        public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.awaitTermination(timeout, unit);
        }
        public Future<?> submit(Runnable task) {
            return e.submit(task);
        }
        public <T> Future<T> submit(Callable<T> task) {
            return e.submit(task);
        }
        public <T> Future<T> submit(Runnable task, T result) {
            return e.submit(task, result);
        }
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
            return e.invokeAll(tasks);
        }
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.invokeAll(tasks, timeout, unit);
        }
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
            return e.invokeAny(tasks);
        }
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny(tasks, timeout, unit);
        }
    }
DelegatedExecutorService 類源碼
    /**
     * Sets the core number of threads.  This overrides any value set
     * in the constructor.  If the new value is smaller than the
     * current value, excess existing threads will be terminated when
     * they next become idle.  If larger, new threads will, if needed,
     * be started to execute any queued tasks.
     *
     * @param corePoolSize the new core size
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @see #getCorePoolSize
     */
    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        if (workerCountOf(ctl.get()) > corePoolSize)
            interruptIdleWorkers();
        else if (delta > 0) {
            // We don't really know how many new threads are "needed".
            // As a heuristic, prestart enough new workers (up to new
            // core size) to handle the current number of tasks in
            // queue, but stop if queue becomes empty while doing so.
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null, true)) {
                if (workQueue.isEmpty())
                    break;
            }
        }
    }
ThreadPoolExecutor類中的setCorePoolSize方法

推論與實驗驗證

由此可以得知:

DelegatedExecutorService 其實是專門對實現了ExecutorService接口的類的對象進行包裝的,包裝之后的對象僅僅暴露ExecutorService接口中的方法。上面的 newSingleThreadExecutor() 方法中,FinalizableDelegatedExecutorService(繼承DelegatedExecutorService)對ThreadPoolExecutor對象進行了包裝,把諸如setCorePoolSize等方法給拿掉了,因此也就不具備ThreadPoolExecutor設置線程池屬性的功能,所以說 newSingleThreadExecutor() 返回的線程池能夠保證不能被重新配置。

// 獲取一個 Single Thread Executor
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
System.out.println(singleThreadExecutor instanceof ThreadPoolExecutor);//輸出:false

那為什么 newFixedThreadPool(1) 返回的線程池是可以重新配置呢?這個問題很簡單,newFixedThreadPool返回的是一個ThreadPoolExecutor對象,ExecutorService引用指向該對象。因此可以通過強轉的方式將它專為ThreadPoolExecutor的引用,然后通過該引用來對線程池重新進行配置。

// 獲取一個容量為 1 的 FixedThreadPool
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);
// 定義任務組 tasks1
List<Runnable> tasks1 = Arrays.asList(
        ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task1");},
        ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task2");});
// 往 FixedThreadPool 中提交 tasks1。此時因為線程池的容量為1,所以兩個任務由1個線程執行。
tasks1.forEach(fixedThreadPool::submit);
// 等待前面兩個任務結束
Thread.sleep(1000L);
// 定義任務組 tasks2
List<Runnable> tasks2 = Arrays.asList(
        ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task3");},
        ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task4");});
System.out.println(fixedThreadPool instanceof ThreadPoolExecutor); // 輸出 true
// 將 ExecutorService 強轉為 ThreadPoolExecutor
ThreadPoolExecutor configurableFixedThreadPool = (ThreadPoolExecutor) fixedThreadPool;
// 改變容量
configurableFixedThreadPool.setCorePoolSize(2);
// 提交任務組 tasks2。此時由於線程池的容量變成了2,所以tasks2中的兩個任務將分別由不同的線程執行(極端情況下也可能由一個線程執行,但此時線程池容量切切實實變成了2)。
tasks2.forEach(fixedThreadPool::submit);
// 關閉線程池
fixedThreadPool.shutdown();
// 等待任務執行結束
fixedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS);

輸出:

Thread ID:14---> Task1
Thread ID:14---> Task2
true
Thread ID:14---> Task4
Thread ID:15---> Task3

上面的兩個實驗可以回答前面的問題。容量為1的FixedThreadPool的屬性(容量等)可以通過將其強轉為ThreadPoolExecutor而被重新進行配置;而SingleThreadPool實際是一個FinalizableDelegatedExecutorService類的對象,由於該類沒有繼承任何可以配置線程池的類,因此可以保證它不能被再次配置。

小結

雖然SingleThreadPool與容量為1的FixedThreadPool的區別只在於一個可重新配置,一個不可重新配置;但是在按照需求寫代碼的時候:如果確實要用到容量為1的線程池,應該使用SingleThreadPool而不是用容量為1的FixedThreadPool。后者有一個隱患,如果開始設置的任務數是1,任務與任務之間本質是串行執行的,也就是說,一個任務得等到前面一個任務執行結束之后再執行。而如果后面有人寫代碼的時候擴大了容量為1的FixedThreadPool,那么修改之前,已經提交的但還未被執行的任務,可能被分到其它線程中去執行。這樣,原本應該串行執行任務變成了並行執行,如果任務之間沒有依賴還好,一旦有依賴,邏輯就錯亂了。

上面的第2個實驗中,將“等待前面兩個任務執行結束”的那行sleep代碼注釋掉就可以驗證這個問題。task1 和 task2本來是要在1個線程中執行的,而后面由於修改了容量,這兩個任務也有一定幾率在不同的線程中執行。

// 獲取一個容量為 1 的 FixedThreadPool
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);
// 定義任務組 tasks1
List<Runnable> tasks1 = Arrays.asList(
        ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task1");},
        ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task2");});
// 往 FixedThreadPool 中提交 tasks1。此時因為線程池的容量為1,所以兩個任務由1個線程執行。
tasks1.forEach(fixedThreadPool::submit);
// 等待前面兩個任務結束
//Thread.sleep(1000L);
// 定義任務組 tasks2
List<Runnable> tasks2 = Arrays.asList(
        ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task3");},
        ()->{System.out.println("Thread ID:" + Thread.currentThread().getId() + "---> Task4");});
System.out.println(fixedThreadPool instanceof ThreadPoolExecutor); // 輸出 true
// 將 ExecutorService 強轉為 ThreadPoolExecutor
ThreadPoolExecutor configurableFixedThreadPool = (ThreadPoolExecutor) fixedThreadPool;
// 改變容量
configurableFixedThreadPool.setCorePoolSize(2);
// 提交任務組 tasks2。此時由於線程池的容量變成了2,所以tasks2中的兩個任務將分別由不同的線程執行(極端情況下也可能由一個線程執行,但此時線程池容量切切實實變成了2)。
tasks2.forEach(fixedThreadPool::submit);
// 關閉線程池
fixedThreadPool.shutdown();
// 等待任務執行結束
fixedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS);

一種輸出:

true
Thread ID:14---> Task1
Thread ID:16---> Task3
Thread ID:16---> Task2
Thread ID:16---> Task4


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM