ThreadPoolExecutor線程池工具類,異步執行 + 結果獲取


當前用的一個線程池工具呢,感覺不怎么好。缺點如下:

1. 提交任務execute后,無異常直接返回true,表示任務執行成功。但是由於異步執行,真正執行到run方法期間產生的異常雖然有處理邏輯,但是前端無法感知,所以很可能返回的是成功,實際上卻是失敗的。

2. 由於是執行execute方法,是無法得到任務結果的。

3. 沒有考慮拒絕策略。

自己研究了一天,我的思路是:

1. 利用JDK8新特性,CompletableFuture,異步執行任務,重要的是能獲取執行結果。

2. 定義好CompletableFuture的異常處理方法,處理並記錄異常情況。

3. 拒絕策略,這個比較雞肋,僅僅做了統計拒絕的任務數。實際情況肯定要預估容量,防止出現拒絕的情況。

4. 失敗的任務重試機制,這個還沒做,但是這個比較重要。

代碼

package com.example.pool;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;


/**
 * 線程池工具類
 */
@Slf4j
public class ThreadPoolUtils {

    /**
     * 線程池
     */
    private ThreadPoolExecutor executor;
    /**
     * 線程工廠
     */
    private CustomThreadFactory threadFactory;
    /**
     * 異步執行結果
     */
    private List<CompletableFuture<Void>> completableFutures;
    /**
     * 拒絕策略
     */
    private CustomAbortPolicy abortPolicy;
    /**
     * 失敗數量
     */
    private AtomicInteger failedCount;

    public ThreadPoolUtils(int corePoolSize,
                           int maximumPoolSize,
                           int queueSize,
                           String poolName) {
        this.failedCount = new AtomicInteger(0);
        this.abortPolicy = new CustomAbortPolicy();
        this.completableFutures = new ArrayList<>();
        this.threadFactory = new CustomThreadFactory(poolName);
        this.executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize), this.threadFactory, abortPolicy);
    }

    /**
     * 執行任務
     * @param runnable
     */
    public void execute(Runnable runnable){
        CompletableFuture<Void> future = CompletableFuture.runAsync(runnable, executor);
        // 設置好異常情況
        future.exceptionally(e -> {
            failedCount.incrementAndGet();
            log.error("Task Failed..." + e);
            e.printStackTrace();
            return null;
        });
        // 任務結果列表
        completableFutures.add(future);
    }

    /**
     * 執行自定義runnable接口(可省略,只是加了個獲取taskName)
     * @param runnable
     */
    public void execute(SimpleTask runnable){
        CompletableFuture<Void> future = CompletableFuture.runAsync(runnable, executor);
        // 設置好異常情況
        future.exceptionally(e -> {
            failedCount.incrementAndGet();
            log.error("Task ["+ runnable.taskName +"] Failed..." + e);
            e.printStackTrace();
            return null;
        });
        // 任務結果列表
        completableFutures.add(future);
    }

    /**
     * 停止線程池
     */
    public void shutdown(){
        executor.shutdown();
        log.info("************************停止線程池************************");
        log.info("** 活動線程數:"+ executor.getActiveCount() +"\t\t\t\t\t\t\t\t\t\t**");
        log.info("** 等待任務數:"+ executor.getQueue().size() +"\t\t\t\t\t\t\t\t\t\t**");
        log.info("** 完成任務數:"+ executor.getCompletedTaskCount() +"\t\t\t\t\t\t\t\t\t\t**");
        log.info("** 全部任務數:"+ executor.getTaskCount() +"\t\t\t\t\t\t\t\t\t\t**");
        log.info("** 拒絕任務數:"+ abortPolicy.getRejectCount() +"\t\t\t\t\t\t\t\t\t\t**");
        log.info("** 成功任務數:"+ (executor.getCompletedTaskCount() - failedCount.get()) +"\t\t\t\t\t\t\t\t\t\t**");
        log.info("** 異常任務數:"+ failedCount.get() +"\t\t\t\t\t\t\t\t\t\t**");
        log.info("**********************************************************");
    }

    /**
     * 獲取任務執行情況
     * 之所以遍歷taskCount數的CompletableFuture,是因為如果有拒絕的任務,相應的CompletableFuture也會放進列表,而這種CompletableFuture調用get方法,是會永遠阻塞的。
     * @return
     */
    public boolean getExecuteResult(){
        // 任務數,不包含拒絕的任務
        long taskCount = executor.getTaskCount();
        for (int i = 0; i < taskCount; i++ ){
            CompletableFuture<Void> future = completableFutures.get(i);
            try {
                // 獲取結果,這個是同步的,目的是獲取真實的任務完成情況
                future.get();
            } catch (InterruptedException | ExecutionException e) {
                // log.error("java.util.concurrent.CompletableFuture.get() Failed ..." + e);
                return false;
            }
            // 出現異常,false
            if (future.isCompletedExceptionally()){
                return false;
            }
        }
        return true;
    }

    /**
     * 線程工廠
     */
    private static class CustomThreadFactory implements ThreadFactory{
        private String poolName;
        private AtomicInteger count;

        private CustomThreadFactory(String poolName) {
            this.poolName = poolName;
            this.count = new AtomicInteger(0);
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            // 線程名,利於排查
            thread.setName(poolName + "-[線程" + count.incrementAndGet() + "]");
            return thread;
        }
    }

    /**
     * 自定義拒絕策略
     */
    private static class CustomAbortPolicy implements RejectedExecutionHandler {
        /**
         * 拒絕的任務數
         */
        private AtomicInteger rejectCount;

        private CustomAbortPolicy() {
            this.rejectCount = new AtomicInteger(0);
        }

        private AtomicInteger getRejectCount() {
            return rejectCount;
        }

        /**
         * 這個方法,如果不拋異常,則執行此任務的線程會一直阻塞
         * @param r
         * @param e
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            log.error("Task " + r.toString() +
                    " rejected from " +
                    e.toString() + " 累計:" + rejectCount.incrementAndGet());
        }
    }

    /**
     * 只是加了個taskName,可自行實現更加復雜的邏輯
     */
    public abstract static class SimpleTask implements Runnable{
        /**
         * 任務名稱
         */
        private String taskName;

        public void setTaskName(String taskName) {
            this.taskName = taskName;
        }
    }

}

測試

package com.example;


import com.example.pool.ThreadPoolUtils;

import java.util.Random;

public class Main {


    public static void main(String[] args) throws InterruptedException {
        test2();
    }

    public static void test1() throws InterruptedException {
        ThreadPoolUtils pool = new ThreadPoolUtils(5,5,10, "A業務線程池");
        // 14個正常任務
        for (int i = 0; i < 14; i++){
            pool.execute(() -> {
                try {
                    // 模擬任務耗時
                    Thread.sleep(600);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String taskName = randomName();
                System.out.println(Thread.currentThread().getName() + "-執行【" + taskName + "】");
            });
        }
        // 1個異常任務
        pool.execute(() -> {
            String taskName = randomName();
            throw new RuntimeException("執行【" + taskName + "】" + "異常");
        });
        // 5個多余用來拒絕的任務
        for (int i = 0; i < 5; i++){
            pool.execute(() -> {
                throw new RuntimeException("多余任務");
            });
        }
        System.out.println("任務完成情況:" + pool.getExecuteResult());

        pool.shutdown();

        Thread.sleep(20000);
    }
    public static void test2() throws InterruptedException {
        ThreadPoolUtils pool = new ThreadPoolUtils(5,5,10, "A業務線程池");
        // 14個正常任務
        for (int i = 0; i < 14; i++){
            pool.execute(new ThreadPoolUtils.SimpleTask() {
                @Override
                public void run() {
                    try {
                        // 模擬任務耗時
                        Thread.sleep(600);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 隨機名稱
                    String taskName = randomName();
                    super.setTaskName(taskName);
                    System.out.println(Thread.currentThread().getName() + "-執行【" + taskName + "】");
                }
            });
        }
        // 1個異常任務
        pool.execute(new ThreadPoolUtils.SimpleTask() {
            @Override
            public void run() {
                // 隨機名稱
                String taskName = randomName();
                super.setTaskName(taskName);
                throw new RuntimeException("執行【" + taskName + "】" + "異常");
            }
        });
        // 5個多余用來拒絕的任務
        for (int i = 0; i < 5; i++){
            pool.execute(new ThreadPoolUtils.SimpleTask() {
                @Override
                public void run() {
                    // 隨機名稱
                    String taskName = randomName();
                    super.setTaskName(taskName);
                    throw new RuntimeException("多余任務");
                }
            });
        }
        System.out.println("任務完成情況:" + pool.getExecuteResult());

        pool.shutdown();

        Thread.sleep(20000);
    }
    
    private static String randomName(){
        return "任務" + (char) (new Random().nextInt(60) + 65);
    }

}

輸出:

15:22:45.165 [main] ERROR com.example.pool.ThreadPoolUtils - Task java.util.concurrent.CompletableFuture$AsyncRun@4361bd48 rejected from java.util.concurrent.ThreadPoolExecutor@53bd815b[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0] 累計:1
15:22:45.173 [main] ERROR com.example.pool.ThreadPoolUtils - Task java.util.concurrent.CompletableFuture$AsyncRun@470e2030 rejected from java.util.concurrent.ThreadPoolExecutor@53bd815b[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0] 累計:2
15:22:45.173 [main] ERROR com.example.pool.ThreadPoolUtils - Task java.util.concurrent.CompletableFuture$AsyncRun@3fb4f649 rejected from java.util.concurrent.ThreadPoolExecutor@53bd815b[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0] 累計:3
15:22:45.173 [main] ERROR com.example.pool.ThreadPoolUtils - Task java.util.concurrent.CompletableFuture$AsyncRun@33833882 rejected from java.util.concurrent.ThreadPoolExecutor@53bd815b[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0] 累計:4
15:22:45.173 [main] ERROR com.example.pool.ThreadPoolUtils - Task java.util.concurrent.CompletableFuture$AsyncRun@200a570f rejected from java.util.concurrent.ThreadPoolExecutor@53bd815b[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0] 累計:5
A業務線程池-[線程1]-執行【任務U】
A業務線程池-[線程4]-執行【任務{】
A業務線程池-[線程5]-執行【任務W】
A業務線程池-[線程3]-執行【任務B】
A業務線程池-[線程2]-執行【任務|】
A業務線程池-[線程1]-執行【任務_】
A業務線程池-[線程4]-執行【任務F】
A業務線程池-[線程5]-執行【任務u】
A業務線程池-[線程2]-執行【任務q】
A業務線程池-[線程3]-執行【任務g】
15:22:46.362 [A業務線程池-[線程3]] ERROR com.example.pool.ThreadPoolUtils - Task Failed...java.util.concurrent.CompletionException: java.lang.RuntimeException: 執行【任務q】異常
java.util.concurrent.CompletionException: java.lang.RuntimeException: 執行【任務q】異常 at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1643)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: 執行【任務q】異常
    at com.example.Main.lambda$test1$1(Main.java:33)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
    ... 3 more
A業務線程池-[線程1]-執行【任務V】
A業務線程池-[線程4]-執行【任務h】
A業務線程池-[線程5]-執行【任務_】
A業務線程池-[線程2]-執行【任務`】
任務完成情況:false
15:22:46.963 [main] INFO com.example.pool.ThreadPoolUtils - ************************停止線程池************************
15:22:46.963 [main] INFO com.example.pool.ThreadPoolUtils - ** 活動線程數:0                                        **
15:22:46.963 [main] INFO com.example.pool.ThreadPoolUtils - ** 等待任務數:0                                        **
15:22:46.963 [main] INFO com.example.pool.ThreadPoolUtils - ** 完成任務數:15                                        **
15:22:46.964 [main] INFO com.example.pool.ThreadPoolUtils - ** 全部任務數:15                                        **
15:22:46.964 [main] INFO com.example.pool.ThreadPoolUtils - ** 拒絕任務數:5                                        **
15:22:46.964 [main] INFO com.example.pool.ThreadPoolUtils - ** 成功任務數:14                                        **
15:22:46.964 [main] INFO com.example.pool.ThreadPoolUtils - ** 異常任務數:1                                        **
15:22:46.964 [main] INFO com.example.pool.ThreadPoolUtils - **********************************************************

Process finished with exit code 0

 

隨時完善

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM