ThreadPoolExecutor 線程池執行並行任務


前言

在jdk中Executors類中提供了諸如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等創建線程的方法,但是都具有一定的局限性,不靈活,且內部還是通過ThreadPoolExecutor來創建的,使用ThreadPoolExecutor有助於大家明確線程池的運行規則,創建符合自己的業務場景需要的線程池,盡量規避由於不合理使用線程池而引發的風險。

使用場景

  1. 單批次的for循環去執行任務,在執行某些耗時操作的時候其效率很低,比如說定時任務。
  2. 並發場景下的某些操作,單線程執行效率理想的效果。

其他依賴

引用這個依賴主要是為了使用ThreadFactoryBuilder構造器給線程池命名,當然我們也可以自己去實現ThreadFactory。

<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.2.5</version>
</dependency>

代碼


import cn.hutool.core.thread.ThreadFactoryBuilder;

import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.*;

/**
 * @author ming
 * @version 1.0.0
 * @date 2020/8/12 15:01
 **/

public class ThreadPoolTest {

    private final static SimpleDateFormat SDF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    final static long START_TIME = System.currentTimeMillis();
    /**
     * 根據系統核心數來設置線程數: private static int corePoolSize = Runtime.getRuntime().availableProcessors();
     * 核心線程數
     */
    private static int corePoolSize = 100;
    /**
     * 最大線程數
     */
    private static int maximumPoolSize = corePoolSize * 2;
    /**
     * 線程存活時間
     */
    private static long keepAliveTime = 101L;
    /**
     * 阻塞隊列大小
     */
    private static final int CAPACITY = 1000;
    /**
     * 線程池命名
     */
    private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNamePrefix("my-work-pool-").build();
    private static Random random = new Random();
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
            keepAliveTime, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(CAPACITY),
            namedThreadFactory,
            (r, executor) -> userDefinedRejectionPolicy(r));

    /**
     * 拒絕策略;當任務太多來不及處理時,如何拒絕任務;
     *
     * @param r Runnable
     */
    private static void userDefinedRejectionPolicy(Runnable r) {
        //一般我們創建線程池時,為防止資源被耗盡,任務隊列都會選擇創建有界任務隊列,但種模式下如果出現任務隊列已滿且線程池創建的線程數達到你設置的最大線程數時,
        //這時就需要你指定ThreadPoolExecutor的RejectedExecutionHandler參數即合理的拒絕策略,來處理線程池"超載"的情況。
        //1、AbortPolicy策略:該策略會直接拋出異常,阻止系統正常工作;
        //
        //2、CallerRunsPolicy策略:如果線程池的線程數量達到上限,該策略會把任務隊列中的任務放在調用者線程當中運行;
        //
        //3、DiscardOldestPolicy策略:該策略會丟棄任務隊列中最老的一個任務,也就是當前任務隊列中最先被添加進去的,馬上要被執行的那個任務,並嘗試再次提交;
        //
        //4、DiscardPolicy策略:該策略會默默丟棄無法處理的任務,不予任何處理。當然使用此策略,業務場景中需允許任務的丟失;
        System.out.println(r.toString() + "執行了拒絕策略");
        // TODO: 2020/8/19 這里寫拒絕策略的相關邏輯 ,在某些場景下我們應該盡量避免丟棄任務
    }


    public static void main(String[] args) throws InterruptedException {
        System.out.println("--->> 准備開始執行任務 <<---");
        // 初始30個任務(並行),如果處理器核心線程有這么多,那么久可以說的是並行的。
        CountDownLatch latch = new CountDownLatch(30);
        long start = System.currentTimeMillis();

        // mission
        Map<String, List<String>> map = new HashMap<>();
        for (int i = 0; i < latch.getCount(); i++) {
            int taskNum = 100;
            List<String> list = new ArrayList<>();
            for (int j = 0; j < taskNum; j++) {
                list.add("sub_task-" + j);
            }
            map.put("mission_" + i, list);
        }

        // 執行任務
        for (Map.Entry<String, List<String>> entry : map.entrySet()) {
            executor.execute(new Worker(latch, "執行自定義任務", 1600 + random.nextInt(100), entry.getKey(), entry.getValue()));
        }
        executor.shutdown();
        // 等待所有任務結束
        latch.await();
        long end = System.currentTimeMillis();
        System.out.println("任務執行累計耗時:" + figureOutTimeInterval(end - start));
    }

    private static String figureOutTimeInterval(long interval) {
        //初始化Formatter的轉換格式。
        SimpleDateFormat formatter = new SimpleDateFormat("HH:mm:ss");
        formatter.setTimeZone(TimeZone.getTimeZone("GMT+00:00"));
        String hms = formatter.format(interval);
        System.out.println("耗時 == " + hms);
        return hms;
    }

    /**
     * 具體的任務邏輯
     */
    static class Worker implements Runnable {
        private CountDownLatch latch;
        private String name;
        private Integer runTime;
        private List<String> list;
        private String task;

        Worker(CountDownLatch latch, String name, Integer runTime, String task, List<String> list) {
            this.latch = latch;
            this.name = name;
            this.runTime = runTime;
            this.task = task;
            this.list = list;
        }

        @Override
        public void run() {
            try {
                System.out.println(name + " do Working begin at " + SDF.format(new Date(START_TIME)));
                for (String item : list) {
                    System.out.println(name + " -- " + task + " and " + item);
                    //模擬耗時任務
                    Thread.sleep(runTime);
                }
                long end = System.currentTimeMillis();
                figureOutTimeInterval(end - START_TIME);
                System.out.println(name + " do Working complete at " + SDF.format(new Date(end)));
            } catch (InterruptedException e) {
                e.printStackTrace();
                // TODO: 2020/8/19 這里做失敗處理
            } finally {
                //單次任務結束,計數器減一
                latch.countDown();
            }
        }
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM