Java多線程之ThreadPoolTaskExecutor用法


一、簡介

ThreadPoolTaskExecutor線程是Spring的線程池,其底層是依據JDK線程池ThreadPoolExecutor來實現的。

 

二、參數介紹

corePoolSize:線程池維護線程最小的數量,默認為1
maxPoolSize:線程池維護線程最大數量,默認為Integer.MAX_VALUE
keepAliveSeconds:(maxPoolSize-corePoolSize)部分線程空閑最大存活時間,默認存活時間是60s
queueCapacity:阻塞任務隊列的大小,默認為Integer.MAX_VALUE,默認使用LinkedBlockingQueue
allowCoreThreadTimeOut:設置為true的話,keepAliveSeconds參數設置的有效時間對corePoolSize線程也有效,默認是flase
threadFactory::用於設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程設置更有意義的名字。使用開源框架guava提供的ThreadFactoryBuilder可以快速給線程池里的線程設置有意義的名字
rejectedExecutionHandler:拒絕策略,當隊列workQueue和線程池maxPoolSize都滿了,說明線程池處於飽和狀態,那么必須采取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常。

 

三、拒絕策略

拒絕策略有以下四種,默認情況下是AbortPolicy,也可根據實際業務需求類實現RejectedExecutionHandler接口實現自己的處理策略
1.AbortPolicy:丟棄任務,並且拋出RejectedExecutionException異常;
2.DiscardPolicy:丟棄任務,不處理,不拋出異常;
3.CallerRunsPolicy:直接在execute方法的調用線程中運行被拒絕的任務;
4.DiscardOldestPolicy:丟棄隊列中最前面的任務,然后重新嘗試執行任務。

 

四、處理流程

1.查看核心線程池是否已滿,不滿就創建一條線程執行任務,否則執行第二步。
2.查看任務隊列是否已滿,不滿就將任務存儲在任務隊列中,否則執行第三步。
3.查看線程池是否已滿,即就是是否達到最大線程池數,不滿就創建一條線程執行任務,否則就按照策略處理無法執行的任務。

 

四、代碼示例

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
    /**
     * ThreadPoolTaskExecutor用法:
     * corePoolSize:線程池維護線程最小的數量,默認為1
     * maxPoolSize:線程池維護線程最大數量,默認為Integer.MAX_VALUE
     * keepAliveSeconds:(maxPoolSize-corePoolSize)部分線程空閑最大存活時間,默認存活時間是60s
     * queueCapacity:阻塞任務隊列的大小,默認為Integer.MAX_VALUE,默認使用LinkedBlockingQueue
     * allowCoreThreadTimeOut:設置為true的話,keepAliveSeconds參數設置的有效時間對corePoolSize線程也有效,默認是flase
     * threadFactory::用於設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程設置更有意義的名字。使用開源框架guava提供的ThreadFactoryBuilder可以快速給線程池里的線程設置有意義的名字
     * rejectedExecutionHandler:拒絕策略,當隊列workQueue和線程池maxPoolSize都滿了,說明線程池處於飽和狀態,那么必須采取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常,有以下四種策略,當然也可以根據實際業務需求類實現RejectedExecutionHandler接口實現自己的處理策略
     * 1.AbortPolicy:丟棄任務,並且拋出RejectedExecutionException異常
     * 2.DiscardPolicy:丟棄任務,不處理,不拋出異常
     * 3.CallerRunsPolicy:直接在execute方法的調用線程中運行被拒絕的任務
     * 4.DiscardOldestPolicy:丟棄隊列中最前面的任務,然后重新嘗試執行任務
     */
    @Test()
    public void _03_test() throws Exception {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3); //核心池大小
        executor.setMaxPoolSize(10); //最大線程數
        executor.setQueueCapacity(10); //隊列程度
        executor.setKeepAliveSeconds(60); //線程空閑時間
        executor.setThreadNamePrefix("子線程-");//線程前綴名稱
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); //配置拒絕策略
        executor.initialize(); //初始化

        Runnable task = new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + " 工作開始!");
                    Thread.sleep((long) (Math.random() * 2000));
//                    Thread.sleep(3000);
                    System.out.println(Thread.currentThread().getName() + " 工作結束!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        for (int i = 0; i < 5; i++) {
            executor.execute(task);
        }
        int n = Runtime.getRuntime().availableProcessors();//獲取到服務器的cpu內核
        log.info("服務器的cpu內核:{}", n);

        Thread.sleep(5000);
        System.out.println("主線程工作結束!");
        executor.shutdown();
    }

 運行結果:

03-02 14:39:00 Initializing ExecutorService 
子線程-1 工作開始!
子線程-3 工作開始!
子線程-2 工作開始!
03-02 14:39:00 服務器的cpu內核:8 
子線程-3 工作結束!
子線程-3 工作開始!
子線程-2 工作結束!
子線程-2 工作開始!
子線程-2 工作結束!
子線程-1 工作結束!
子線程-3 工作結束!
主線程工作結束!
03-02 14:39:05 Shutting down ExecutorService 

 

五、多線程同步工具用法

/**
     * 多線程同步工具用法(單獨方法)
     * ThreadPoolTaskExecutor和CountDownLatch結合用法
     */
    @Test
    public void _05_test() throws Exception {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3); //核心池大小
        executor.setMaxPoolSize(10); //最大線程數
        executor.setQueueCapacity(10); //隊列程度
        executor.setThreadNamePrefix("子線程-");//線程前綴名稱
        executor.initialize(); //初始化

        int count = 5; // 任務數量
        CountDownLatch countDownLatch = new CountDownLatch(count); // 同步工具
        for (int i = 0; i < count; i++) {
            executor.execute(() -> task(countDownLatch));
        }
        System.out.println("等待子線程完成...");
        countDownLatch.await();
        System.out.println("主線程工作結束!");
        executor.shutdown();
    }

    private void task(CountDownLatch countDownLatch) {
        try {
            System.out.println(Thread.currentThread().getName() + " 工作開始!");
            Thread.sleep((long) (Math.random() * 2000));
            System.out.println(Thread.currentThread().getName() + " 工作結束!");
            countDownLatch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

運行結果:

03-02 14:59:41 Initializing ExecutorService 
子線程-1 工作開始!
子線程-2 工作開始!
子線程-3 工作開始!
等待子線程完成
子線程-2 工作結束!
子線程-2 工作開始!
子線程-3 工作結束!
子線程-3 工作開始!
子線程-2 工作結束!
子線程-3 工作結束!
子線程-1 工作結束!
主線程工作結束!
03-02 14:59:42 Shutting down ExecutorService 

 

丟棄隊列中最前面的任務,然后重新嘗試執行任務


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM