當前用的一個線程池工具呢,感覺不怎么好。缺點如下:
1. 提交任務execute后,無異常直接返回true,表示任務執行成功。但是由於異步執行,真正執行到run方法期間產生的異常雖然有處理邏輯,但是前端無法感知,所以很可能返回的是成功,實際上卻是失敗的。
2. 由於是執行execute方法,是無法得到任務結果的。
3. 沒有考慮拒絕策略。
自己研究了一天,我的思路是:
1. 利用JDK8新特性,CompletableFuture,異步執行任務,重要的是能獲取執行結果。
2. 定義好CompletableFuture的異常處理方法,處理並記錄異常情況。
3. 拒絕策略,這個比較雞肋,僅僅做了統計拒絕的任務數。實際情況肯定要預估容量,防止出現拒絕的情況。
4. 失敗的任務重試機制,這個還沒做,但是這個比較重要。
代碼
package com.example.pool; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * 線程池工具類 */ @Slf4j public class ThreadPoolUtils { /** * 線程池 */ private ThreadPoolExecutor executor; /** * 線程工廠 */ private CustomThreadFactory threadFactory; /** * 異步執行結果 */ private List<CompletableFuture<Void>> completableFutures; /** * 拒絕策略 */ private CustomAbortPolicy abortPolicy; /** * 失敗數量 */ private AtomicInteger failedCount; public ThreadPoolUtils(int corePoolSize, int maximumPoolSize, int queueSize, String poolName) { this.failedCount = new AtomicInteger(0); this.abortPolicy = new CustomAbortPolicy(); this.completableFutures = new ArrayList<>(); this.threadFactory = new CustomThreadFactory(poolName); this.executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize), this.threadFactory, abortPolicy); } /** * 執行任務 * @param runnable */ public void execute(Runnable runnable){ CompletableFuture<Void> future = CompletableFuture.runAsync(runnable, executor); // 設置好異常情況 future.exceptionally(e -> { failedCount.incrementAndGet(); log.error("Task Failed..." + e); e.printStackTrace(); return null; }); // 任務結果列表 completableFutures.add(future); } /** * 執行自定義runnable接口(可省略,只是加了個獲取taskName) * @param runnable */ public void execute(SimpleTask runnable){ CompletableFuture<Void> future = CompletableFuture.runAsync(runnable, executor); // 設置好異常情況 future.exceptionally(e -> { failedCount.incrementAndGet(); log.error("Task ["+ runnable.taskName +"] Failed..." + e); e.printStackTrace(); return null; }); // 任務結果列表 completableFutures.add(future); } /** * 停止線程池 */ public void shutdown(){ executor.shutdown(); log.info("************************停止線程池************************"); log.info("** 活動線程數:"+ executor.getActiveCount() +"\t\t\t\t\t\t\t\t\t\t**"); log.info("** 等待任務數:"+ executor.getQueue().size() +"\t\t\t\t\t\t\t\t\t\t**"); log.info("** 完成任務數:"+ executor.getCompletedTaskCount() +"\t\t\t\t\t\t\t\t\t\t**"); log.info("** 全部任務數:"+ executor.getTaskCount() +"\t\t\t\t\t\t\t\t\t\t**"); log.info("** 拒絕任務數:"+ abortPolicy.getRejectCount() +"\t\t\t\t\t\t\t\t\t\t**"); log.info("** 成功任務數:"+ (executor.getCompletedTaskCount() - failedCount.get()) +"\t\t\t\t\t\t\t\t\t\t**"); log.info("** 異常任務數:"+ failedCount.get() +"\t\t\t\t\t\t\t\t\t\t**"); log.info("**********************************************************"); } /** * 獲取任務執行情況 * 之所以遍歷taskCount數的CompletableFuture,是因為如果有拒絕的任務,相應的CompletableFuture也會放進列表,而這種CompletableFuture調用get方法,是會永遠阻塞的。 * @return */ public boolean getExecuteResult(){ // 任務數,不包含拒絕的任務 long taskCount = executor.getTaskCount(); for (int i = 0; i < taskCount; i++ ){ CompletableFuture<Void> future = completableFutures.get(i); try { // 獲取結果,這個是同步的,目的是獲取真實的任務完成情況 future.get(); } catch (InterruptedException | ExecutionException e) { // log.error("java.util.concurrent.CompletableFuture.get() Failed ..." + e); return false; } // 出現異常,false if (future.isCompletedExceptionally()){ return false; } } return true; } /** * 線程工廠 */ private static class CustomThreadFactory implements ThreadFactory{ private String poolName; private AtomicInteger count; private CustomThreadFactory(String poolName) { this.poolName = poolName; this.count = new AtomicInteger(0); } @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); // 線程名,利於排查 thread.setName(poolName + "-[線程" + count.incrementAndGet() + "]"); return thread; } } /** * 自定義拒絕策略 */ private static class CustomAbortPolicy implements RejectedExecutionHandler { /** * 拒絕的任務數 */ private AtomicInteger rejectCount; private CustomAbortPolicy() { this.rejectCount = new AtomicInteger(0); } private AtomicInteger getRejectCount() { return rejectCount; } /** * 這個方法,如果不拋異常,則執行此任務的線程會一直阻塞 * @param r * @param e */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { log.error("Task " + r.toString() + " rejected from " + e.toString() + " 累計:" + rejectCount.incrementAndGet()); } } /** * 只是加了個taskName,可自行實現更加復雜的邏輯 */ public abstract static class SimpleTask implements Runnable{ /** * 任務名稱 */ private String taskName; public void setTaskName(String taskName) { this.taskName = taskName; } } }
測試
package com.example; import com.example.pool.ThreadPoolUtils; import java.util.Random; public class Main { public static void main(String[] args) throws InterruptedException { test2(); } public static void test1() throws InterruptedException { ThreadPoolUtils pool = new ThreadPoolUtils(5,5,10, "A業務線程池"); // 14個正常任務 for (int i = 0; i < 14; i++){ pool.execute(() -> { try { // 模擬任務耗時 Thread.sleep(600); } catch (InterruptedException e) { e.printStackTrace(); } String taskName = randomName(); System.out.println(Thread.currentThread().getName() + "-執行【" + taskName + "】"); }); } // 1個異常任務 pool.execute(() -> { String taskName = randomName(); throw new RuntimeException("執行【" + taskName + "】" + "異常"); }); // 5個多余用來拒絕的任務 for (int i = 0; i < 5; i++){ pool.execute(() -> { throw new RuntimeException("多余任務"); }); } System.out.println("任務完成情況:" + pool.getExecuteResult()); pool.shutdown(); Thread.sleep(20000); } public static void test2() throws InterruptedException { ThreadPoolUtils pool = new ThreadPoolUtils(5,5,10, "A業務線程池"); // 14個正常任務 for (int i = 0; i < 14; i++){ pool.execute(new ThreadPoolUtils.SimpleTask() { @Override public void run() { try { // 模擬任務耗時 Thread.sleep(600); } catch (InterruptedException e) { e.printStackTrace(); } // 隨機名稱 String taskName = randomName(); super.setTaskName(taskName); System.out.println(Thread.currentThread().getName() + "-執行【" + taskName + "】"); } }); } // 1個異常任務 pool.execute(new ThreadPoolUtils.SimpleTask() { @Override public void run() { // 隨機名稱 String taskName = randomName(); super.setTaskName(taskName); throw new RuntimeException("執行【" + taskName + "】" + "異常"); } }); // 5個多余用來拒絕的任務 for (int i = 0; i < 5; i++){ pool.execute(new ThreadPoolUtils.SimpleTask() { @Override public void run() { // 隨機名稱 String taskName = randomName(); super.setTaskName(taskName); throw new RuntimeException("多余任務"); } }); } System.out.println("任務完成情況:" + pool.getExecuteResult()); pool.shutdown(); Thread.sleep(20000); } private static String randomName(){ return "任務" + (char) (new Random().nextInt(60) + 65); } }
輸出:
15:22:45.165 [main] ERROR com.example.pool.ThreadPoolUtils - Task java.util.concurrent.CompletableFuture$AsyncRun@4361bd48 rejected from java.util.concurrent.ThreadPoolExecutor@53bd815b[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0] 累計:1 15:22:45.173 [main] ERROR com.example.pool.ThreadPoolUtils - Task java.util.concurrent.CompletableFuture$AsyncRun@470e2030 rejected from java.util.concurrent.ThreadPoolExecutor@53bd815b[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0] 累計:2 15:22:45.173 [main] ERROR com.example.pool.ThreadPoolUtils - Task java.util.concurrent.CompletableFuture$AsyncRun@3fb4f649 rejected from java.util.concurrent.ThreadPoolExecutor@53bd815b[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0] 累計:3 15:22:45.173 [main] ERROR com.example.pool.ThreadPoolUtils - Task java.util.concurrent.CompletableFuture$AsyncRun@33833882 rejected from java.util.concurrent.ThreadPoolExecutor@53bd815b[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0] 累計:4 15:22:45.173 [main] ERROR com.example.pool.ThreadPoolUtils - Task java.util.concurrent.CompletableFuture$AsyncRun@200a570f rejected from java.util.concurrent.ThreadPoolExecutor@53bd815b[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0] 累計:5 A業務線程池-[線程1]-執行【任務U】 A業務線程池-[線程4]-執行【任務{】 A業務線程池-[線程5]-執行【任務W】 A業務線程池-[線程3]-執行【任務B】 A業務線程池-[線程2]-執行【任務|】 A業務線程池-[線程1]-執行【任務_】 A業務線程池-[線程4]-執行【任務F】 A業務線程池-[線程5]-執行【任務u】 A業務線程池-[線程2]-執行【任務q】 A業務線程池-[線程3]-執行【任務g】 15:22:46.362 [A業務線程池-[線程3]] ERROR com.example.pool.ThreadPoolUtils - Task Failed...java.util.concurrent.CompletionException: java.lang.RuntimeException: 執行【任務q】異常 java.util.concurrent.CompletionException: java.lang.RuntimeException: 執行【任務q】異常 at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1643) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: 執行【任務q】異常 at com.example.Main.lambda$test1$1(Main.java:33) at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640) ... 3 more A業務線程池-[線程1]-執行【任務V】 A業務線程池-[線程4]-執行【任務h】 A業務線程池-[線程5]-執行【任務_】 A業務線程池-[線程2]-執行【任務`】 任務完成情況:false 15:22:46.963 [main] INFO com.example.pool.ThreadPoolUtils - ************************停止線程池************************ 15:22:46.963 [main] INFO com.example.pool.ThreadPoolUtils - ** 活動線程數:0 ** 15:22:46.963 [main] INFO com.example.pool.ThreadPoolUtils - ** 等待任務數:0 ** 15:22:46.963 [main] INFO com.example.pool.ThreadPoolUtils - ** 完成任務數:15 ** 15:22:46.964 [main] INFO com.example.pool.ThreadPoolUtils - ** 全部任務數:15 ** 15:22:46.964 [main] INFO com.example.pool.ThreadPoolUtils - ** 拒絕任務數:5 ** 15:22:46.964 [main] INFO com.example.pool.ThreadPoolUtils - ** 成功任務數:14 ** 15:22:46.964 [main] INFO com.example.pool.ThreadPoolUtils - ** 異常任務數:1 ** 15:22:46.964 [main] INFO com.example.pool.ThreadPoolUtils - ********************************************************** Process finished with exit code 0
隨時完善