当前用的一个线程池工具呢,感觉不怎么好。缺点如下:
1. 提交任务execute后,无异常直接返回true,表示任务执行成功。但是由于异步执行,真正执行到run方法期间产生的异常虽然有处理逻辑,但是前端无法感知,所以很可能返回的是成功,实际上却是失败的。
2. 由于是执行execute方法,是无法得到任务结果的。
3. 没有考虑拒绝策略。
自己研究了一天,我的思路是:
1. 利用JDK8新特性,CompletableFuture,异步执行任务,重要的是能获取执行结果。
2. 定义好CompletableFuture的异常处理方法,处理并记录异常情况。
3. 拒绝策略,这个比较鸡肋,仅仅做了统计拒绝的任务数。实际情况肯定要预估容量,防止出现拒绝的情况。
4. 失败的任务重试机制,这个还没做,但是这个比较重要。
代码
package com.example.pool; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * 线程池工具类 */ @Slf4j public class ThreadPoolUtils { /** * 线程池 */ private ThreadPoolExecutor executor; /** * 线程工厂 */ private CustomThreadFactory threadFactory; /** * 异步执行结果 */ private List<CompletableFuture<Void>> completableFutures; /** * 拒绝策略 */ private CustomAbortPolicy abortPolicy; /** * 失败数量 */ private AtomicInteger failedCount; public ThreadPoolUtils(int corePoolSize, int maximumPoolSize, int queueSize, String poolName) { this.failedCount = new AtomicInteger(0); this.abortPolicy = new CustomAbortPolicy(); this.completableFutures = new ArrayList<>(); this.threadFactory = new CustomThreadFactory(poolName); this.executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize), this.threadFactory, abortPolicy); } /** * 执行任务 * @param runnable */ public void execute(Runnable runnable){ CompletableFuture<Void> future = CompletableFuture.runAsync(runnable, executor); // 设置好异常情况 future.exceptionally(e -> { failedCount.incrementAndGet(); log.error("Task Failed..." + e); e.printStackTrace(); return null; }); // 任务结果列表 completableFutures.add(future); } /** * 执行自定义runnable接口(可省略,只是加了个获取taskName) * @param runnable */ public void execute(SimpleTask runnable){ CompletableFuture<Void> future = CompletableFuture.runAsync(runnable, executor); // 设置好异常情况 future.exceptionally(e -> { failedCount.incrementAndGet(); log.error("Task ["+ runnable.taskName +"] Failed..." + e); e.printStackTrace(); return null; }); // 任务结果列表 completableFutures.add(future); } /** * 停止线程池 */ public void shutdown(){ executor.shutdown(); log.info("************************停止线程池************************"); log.info("** 活动线程数:"+ executor.getActiveCount() +"\t\t\t\t\t\t\t\t\t\t**"); log.info("** 等待任务数:"+ executor.getQueue().size() +"\t\t\t\t\t\t\t\t\t\t**"); log.info("** 完成任务数:"+ executor.getCompletedTaskCount() +"\t\t\t\t\t\t\t\t\t\t**"); log.info("** 全部任务数:"+ executor.getTaskCount() +"\t\t\t\t\t\t\t\t\t\t**"); log.info("** 拒绝任务数:"+ abortPolicy.getRejectCount() +"\t\t\t\t\t\t\t\t\t\t**"); log.info("** 成功任务数:"+ (executor.getCompletedTaskCount() - failedCount.get()) +"\t\t\t\t\t\t\t\t\t\t**"); log.info("** 异常任务数:"+ failedCount.get() +"\t\t\t\t\t\t\t\t\t\t**"); log.info("**********************************************************"); } /** * 获取任务执行情况 * 之所以遍历taskCount数的CompletableFuture,是因为如果有拒绝的任务,相应的CompletableFuture也会放进列表,而这种CompletableFuture调用get方法,是会永远阻塞的。 * @return */ public boolean getExecuteResult(){ // 任务数,不包含拒绝的任务 long taskCount = executor.getTaskCount(); for (int i = 0; i < taskCount; i++ ){ CompletableFuture<Void> future = completableFutures.get(i); try { // 获取结果,这个是同步的,目的是获取真实的任务完成情况 future.get(); } catch (InterruptedException | ExecutionException e) { // log.error("java.util.concurrent.CompletableFuture.get() Failed ..." + e); return false; } // 出现异常,false if (future.isCompletedExceptionally()){ return false; } } return true; } /** * 线程工厂 */ private static class CustomThreadFactory implements ThreadFactory{ private String poolName; private AtomicInteger count; private CustomThreadFactory(String poolName) { this.poolName = poolName; this.count = new AtomicInteger(0); } @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); // 线程名,利于排查 thread.setName(poolName + "-[线程" + count.incrementAndGet() + "]"); return thread; } } /** * 自定义拒绝策略 */ private static class CustomAbortPolicy implements RejectedExecutionHandler { /** * 拒绝的任务数 */ private AtomicInteger rejectCount; private CustomAbortPolicy() { this.rejectCount = new AtomicInteger(0); } private AtomicInteger getRejectCount() { return rejectCount; } /** * 这个方法,如果不抛异常,则执行此任务的线程会一直阻塞 * @param r * @param e */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { log.error("Task " + r.toString() + " rejected from " + e.toString() + " 累计:" + rejectCount.incrementAndGet()); } } /** * 只是加了个taskName,可自行实现更加复杂的逻辑 */ public abstract static class SimpleTask implements Runnable{ /** * 任务名称 */ private String taskName; public void setTaskName(String taskName) { this.taskName = taskName; } } }
测试
package com.example; import com.example.pool.ThreadPoolUtils; import java.util.Random; public class Main { public static void main(String[] args) throws InterruptedException { test2(); } public static void test1() throws InterruptedException { ThreadPoolUtils pool = new ThreadPoolUtils(5,5,10, "A业务线程池"); // 14个正常任务 for (int i = 0; i < 14; i++){ pool.execute(() -> { try { // 模拟任务耗时 Thread.sleep(600); } catch (InterruptedException e) { e.printStackTrace(); } String taskName = randomName(); System.out.println(Thread.currentThread().getName() + "-执行【" + taskName + "】"); }); } // 1个异常任务 pool.execute(() -> { String taskName = randomName(); throw new RuntimeException("执行【" + taskName + "】" + "异常"); }); // 5个多余用来拒绝的任务 for (int i = 0; i < 5; i++){ pool.execute(() -> { throw new RuntimeException("多余任务"); }); } System.out.println("任务完成情况:" + pool.getExecuteResult()); pool.shutdown(); Thread.sleep(20000); } public static void test2() throws InterruptedException { ThreadPoolUtils pool = new ThreadPoolUtils(5,5,10, "A业务线程池"); // 14个正常任务 for (int i = 0; i < 14; i++){ pool.execute(new ThreadPoolUtils.SimpleTask() { @Override public void run() { try { // 模拟任务耗时 Thread.sleep(600); } catch (InterruptedException e) { e.printStackTrace(); } // 随机名称 String taskName = randomName(); super.setTaskName(taskName); System.out.println(Thread.currentThread().getName() + "-执行【" + taskName + "】"); } }); } // 1个异常任务 pool.execute(new ThreadPoolUtils.SimpleTask() { @Override public void run() { // 随机名称 String taskName = randomName(); super.setTaskName(taskName); throw new RuntimeException("执行【" + taskName + "】" + "异常"); } }); // 5个多余用来拒绝的任务 for (int i = 0; i < 5; i++){ pool.execute(new ThreadPoolUtils.SimpleTask() { @Override public void run() { // 随机名称 String taskName = randomName(); super.setTaskName(taskName); throw new RuntimeException("多余任务"); } }); } System.out.println("任务完成情况:" + pool.getExecuteResult()); pool.shutdown(); Thread.sleep(20000); } private static String randomName(){ return "任务" + (char) (new Random().nextInt(60) + 65); } }
输出:
15:22:45.165 [main] ERROR com.example.pool.ThreadPoolUtils - Task java.util.concurrent.CompletableFuture$AsyncRun@4361bd48 rejected from java.util.concurrent.ThreadPoolExecutor@53bd815b[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0] 累计:1 15:22:45.173 [main] ERROR com.example.pool.ThreadPoolUtils - Task java.util.concurrent.CompletableFuture$AsyncRun@470e2030 rejected from java.util.concurrent.ThreadPoolExecutor@53bd815b[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0] 累计:2 15:22:45.173 [main] ERROR com.example.pool.ThreadPoolUtils - Task java.util.concurrent.CompletableFuture$AsyncRun@3fb4f649 rejected from java.util.concurrent.ThreadPoolExecutor@53bd815b[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0] 累计:3 15:22:45.173 [main] ERROR com.example.pool.ThreadPoolUtils - Task java.util.concurrent.CompletableFuture$AsyncRun@33833882 rejected from java.util.concurrent.ThreadPoolExecutor@53bd815b[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0] 累计:4 15:22:45.173 [main] ERROR com.example.pool.ThreadPoolUtils - Task java.util.concurrent.CompletableFuture$AsyncRun@200a570f rejected from java.util.concurrent.ThreadPoolExecutor@53bd815b[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0] 累计:5 A业务线程池-[线程1]-执行【任务U】 A业务线程池-[线程4]-执行【任务{】 A业务线程池-[线程5]-执行【任务W】 A业务线程池-[线程3]-执行【任务B】 A业务线程池-[线程2]-执行【任务|】 A业务线程池-[线程1]-执行【任务_】 A业务线程池-[线程4]-执行【任务F】 A业务线程池-[线程5]-执行【任务u】 A业务线程池-[线程2]-执行【任务q】 A业务线程池-[线程3]-执行【任务g】 15:22:46.362 [A业务线程池-[线程3]] ERROR com.example.pool.ThreadPoolUtils - Task Failed...java.util.concurrent.CompletionException: java.lang.RuntimeException: 执行【任务q】异常 java.util.concurrent.CompletionException: java.lang.RuntimeException: 执行【任务q】异常 at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1643) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: 执行【任务q】异常 at com.example.Main.lambda$test1$1(Main.java:33) at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640) ... 3 more A业务线程池-[线程1]-执行【任务V】 A业务线程池-[线程4]-执行【任务h】 A业务线程池-[线程5]-执行【任务_】 A业务线程池-[线程2]-执行【任务`】 任务完成情况:false 15:22:46.963 [main] INFO com.example.pool.ThreadPoolUtils - ************************停止线程池************************ 15:22:46.963 [main] INFO com.example.pool.ThreadPoolUtils - ** 活动线程数:0 ** 15:22:46.963 [main] INFO com.example.pool.ThreadPoolUtils - ** 等待任务数:0 ** 15:22:46.963 [main] INFO com.example.pool.ThreadPoolUtils - ** 完成任务数:15 ** 15:22:46.964 [main] INFO com.example.pool.ThreadPoolUtils - ** 全部任务数:15 ** 15:22:46.964 [main] INFO com.example.pool.ThreadPoolUtils - ** 拒绝任务数:5 ** 15:22:46.964 [main] INFO com.example.pool.ThreadPoolUtils - ** 成功任务数:14 ** 15:22:46.964 [main] INFO com.example.pool.ThreadPoolUtils - ** 异常任务数:1 ** 15:22:46.964 [main] INFO com.example.pool.ThreadPoolUtils - ********************************************************** Process finished with exit code 0
随时完善