多線程編程學習十一(ThreadPoolExecutor 詳解).


一、ThreadPoolExecutor 參數說明

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • corePoolSize:核心線程池的大小。當提交一個任務到線程池時,核心線程池會創建一個核心線程來執行任務,即使其他核心線程能夠執行新任務也會創建線程,等到需要執行的任務數大於核心線程池基本大小時就不再創建。如果調用了線程池的 prestartAllCoreThreads() 方法,核心線程池會提前創建並啟動所有核心線程。

  • workQueue:任務隊列。當核心線程池中沒有線程時,所提交的任務會被暫存在隊列中。Java 提供了多種阻塞隊列

  • maximumPoolSize:線程池允許創建的最大線程數。如果隊列也滿了,並且已創建的線程數小於最大線程數,則線程池會再創建新的空閑線程執行任務。值得注意的是,如果使用了無界的任務隊列則這個參數不起作用。

  • keepAliveTime:當線程池中的線程數大於 corePoolSize 時,keepAliveTime 為多余的空閑線程等待新任務的最長時間,超過這個時間后多余的線程將被終止。所以,如果任務很多,並且每個任務執行的時間比較短,可以調大時間,提高線程的利用率。值得注意的是,如果使用了無界的任務隊列則這個參數不起作用。

  • TimeUnit:線程活動保持時間的單位。

  • threadFactory:創建線程的工廠。可以通過線程工廠給每個創建出來的線程設置符合業務的名字。

    // 依賴 guava
    new ThreadFactoryBuilder().setNameFormat("xx-task-%d").build();
    
  • handler:飽和策略。當隊列和線程池都滿了,說明線程池處於飽和狀態,那么必須采取一種策略處理提交的新任務。Java 提供了以下4種策略:

    • AbortPolicy:默認。直接拋出異常。
    • CallerRunsPolicy:只用調用者所在線程來運行任務。
    • DiscardOldestPolicy:丟棄隊列里最近的一個任務,並執行當前任務。
    • DiscardPolicy:不處理,丟棄掉。

tips: 一般我們稱核心線程池中的線程為核心線程,這部分線程不會被回收;超過任務隊列后,創建的線程為空閑線程,這部分線程會被回收(回收時間即 keepAliveTime)

二、常見的 ThreadPoolExecutor 介紹

Executors 是創建 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 的工廠類。

Java 提供了多種類型的 ThreadPoolExecutor,比較常見的有 FixedThreadPool、SingleThreadExecutor、CachedThreadPool等。

FixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

FixedThreadPool 被稱為可重用固定線程數的線程池。可以看到 corePoolSize 和 maximumPoolSize 都被設置成了 nThreads;keepAliveTime設置為0L,意味着多余的空閑線程會被立即終止;使用了阻塞隊列 LinkedBlockingQueue 作為線程的工作隊列(隊列的容量為 Integer.MAX_VALUE)。

FixedThreadPool 所存在的問題是,由於隊列的容量為 Integer.MAX_VALUE,基本可以認為是無界的,所以 maximumPoolSize 和 keepAliveTime 參數都不會生效,飽和拒絕策略也不會執行,會造成任務大量堆積在阻塞隊列中。

FixedThreadPool 適用於為了滿足資源管理的需求,而需要限制線程數量的應用場景。

SingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

SingleThreadExecutor 是使用單個線程的線程池。可以看到 corePoolSize 和 maximumPoolSize 被設置為1,其他參數與 FixedThreadPool 相同,所以所帶來的風險也和 FixedThreadPool 一致,就不贅述了。

SingleThreadExecutor 適用於需要保證順序的執行各個任務。

CachedThreadPool

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

CachedThreadPool 是一個會根據需要創建新線程的線程池。可以看到 corePoolSize 被設置為 0,所以創建的線程都為空閑線程;maximumPoolSize 被設置為 Integer.MAX_VALUE(基本可認為無界),意味着可以創建無限數量的空閑線程;keepAliveTime 設置為60L,意味着空閑線程等待新任務的最長時間為60秒;使用沒有容量的 SynchronousQueue 作為線程池的工作隊列。

CachedThreadPool 所存在的問題是, 如果主線程提交任務的速度高於maximumPool 中線程處理任務的速度時,CachedThreadPool 會不斷創建新線程。極端情況下,CachedThreadPool會因為創建過多線程而耗盡CPU和內存資源。

CachedThreadPool 適用於執行很多的短期異步任務的小程序,或者是負載較輕的服務器。

三、自建 ThreadPoolExecutor 線程池

鑒於上面提到的風險,我們更提倡使用 ThreadPoolExecutor 去創建線程池,而不用 Executors 工廠去創建。

以下是一個 ThreadPoolExecutor 創建線程池的 Demo 實例:

public class Pool {

    static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("pool-task-%d").build();
    static ExecutorService executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2,
            200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024),
            threadFactory, new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. 無返回值的任務執行 -> Runnable
        executor.execute(() -> System.out.println("Hello World"));
        // 2. 有返回值的任務執行 -> Callable
        Future<String> future = executor.submit(() -> "Hello World");
        // get 方法會阻塞線程執行等待返回結果
        String result = future.get();
        System.out.println(result);

        // 3. 監控線程池
        monitor();

        // 4. 關閉線程池
        shutdownAndAwaitTermination();

        monitor();
    }

    private static void monitor() {
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
        System.out.println("【線程池任務】線程池中曾經創建過的最大線程數:" + threadPoolExecutor.getLargestPoolSize());
        System.out.println("【線程池任務】線程池中線程數:" + threadPoolExecutor.getPoolSize());
        System.out.println("【線程池任務】線程池中活動的線程數:" + threadPoolExecutor.getActiveCount());
        System.out.println("【線程池任務】隊列中等待執行的任務數:" + threadPoolExecutor.getQueue().size());
        System.out.println("【線程池任務】線程池已執行完任務數:" + threadPoolExecutor.getCompletedTaskCount());
    }

    /**
     * 關閉線程池
     * 1. shutdown、shutdownNow 的原理都是遍歷線程池中的工作線程,然后逐個調用線程的 interrupt 方法來中斷線程。
     * 2. shutdownNow:將線程池的狀態設置成 STOP,然后嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表。
     * 3. shutdown:將線程池的狀態設置成 SHUTDOWN 狀態,然后中斷所有沒有正在執行任務的線程。
     */
    private static void shutdownAndAwaitTermination() {
        // 禁止提交新任務
        executor.shutdown();
        try {
            // 等待現有任務終止
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                // 取消當前正在執行的任務
                executor.shutdownNow();
                // 等待一段時間讓任務響應被取消
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            // 如果當前線程也中斷,則取消
            executor.shutdownNow();
            // 保留中斷狀態
            Thread.currentThread().interrupt();
        }
    }
}

創建線程池需要注意以下幾點:

  1. CPU 密集型任務應配置盡可能小的線程,如配置 Ncpu+1 個線程。
  2. IO 密集型任務(數據庫讀寫等)應配置盡可能多的線程,如配置 Ncpu*2 個線程。
  3. 優先級不同的任務可以使用優先級隊列 PriorityBlockingQueue 來處理。
  4. 建議使用有界隊列。可以避免創建數量非常多的線程,甚至拖垮系統。有界隊列能增加系統的穩定性和預警能力,可以根據需要設大一點兒,比如幾千。

四、ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 繼承自 ThreadPoolExecutor。它主要用來在給定的延遲之后運行任務,或者定期執行任務。

    public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);
    }

ScheduledThreadPoolExecutor 的功能與 Timer 類似,但功能更強大、更靈活。Timer 對應的是單個后台線程,而ScheduledThreadPoolExecutor 可以在構造函數中指定多個對應的后台線程數。

Java 提供了多種類型的 ScheduledThreadPoolExecutor ,可以通過 Executors 創建,比較常見的有 ScheduledThreadPool、SingleThreadScheduledExecutor 等。適用於需要多個后台線程執行周期任務,同時為了滿足資源管理的需求而需要限制后台線程數量的應用場景。

public class ScheduleTaskTest {

    static ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").build();
    static ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5, threadFactory);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. 延遲 3 秒后執行 Runnable 方法
        scheduledExecutorService.schedule(() -> System.out.println("Hello World"), 3000, TimeUnit.MILLISECONDS);

        // 2. 延遲 3 秒后執行 Callable 方法
        ScheduledFuture<String> scheduledFuture = scheduledExecutorService.schedule(() -> "Hello ScheduledFuture", 3000, TimeUnit.MILLISECONDS);
        System.out.println(scheduledFuture.get());

        // 3. 延遲 1 秒后開始每隔 3 秒周期執行。
        //    如果中間任務遇到異常,則禁止后續執行。
        //    固定的頻率來執行某項任務,它不受任務執行時間的影響。到時間,就執行。
        scheduledExecutorService.scheduleAtFixedRate(() -> System.out.println("Hello ScheduleAtFixedRate"), 1, 3000, TimeUnit.MILLISECONDS);

        // 4. 延遲 1 秒后,每個任務結束延遲 3 秒后再執行下個任務。
        //    如果中間任務遇到異常,則禁止后續執行。
        //    受任務執行時間的影響,等待任務執行結束后才開始計算延遲。
        scheduledExecutorService.scheduleWithFixedDelay(() -> System.out.println("Hello ScheduleWithFixedDelay"), 1, 3000, TimeUnit.MILLISECONDS);
    }
}

ScheduledThreadPoolExecutor 的執行步驟大抵如下:

  1. 當調用 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate() 方法或者 scheduleWithFixedDelay()方 法時,會向 DelayedWorkQueue 隊列添加 ScheduledFutureTask 任務。
  2. 線程池中的線程從 DelayedWorkQueue隊列中獲取執行時間已到達的 ScheduledFutureTask,然后執行任務。
  3. 線程修改 ScheduledFutureTask 任務的執行時間為下次將要被執行的時間。
  4. 線程把修改后的 ScheduledFutureTask 重新放回隊列。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM