各位讀者,大家好!
我們在項目開發過程中,經常會使用線程池管理,特別是對TPS有一定要求的情況。
線程池會自動幫助我們管理線程的創建、回收及銷毀等工作,方便我們的開發。特別有異步需求、調用第三方系統且比較耗時的批量任務,都可以使用線程池很好的幫我們節省大量時間,提高用戶的體驗。
比如:小美想從XX系統進行貸款,那么XX系統肯定需要獲取小美的人行征信,還可能需要校驗小美是否從此種同類的系統已經貸款,如果貸款了,那么就不能再次貸款。這里牽扯到2個第三方系統,人行征信系統 及 貸款信息匯總系統,假設從人行征信系統獲取征信需要5s,取貸款信息匯總系統需要3s,那么按照正常的同步流程:先獲取征信,再獲取貸款信息,就一共需要8s;那么如果再加上其他各種各樣的校驗、第三方外調(甚至外調超時),那么加起來時間更長,對客戶的體驗肯定會受影響,可能就會丟失小美這樣大量的客戶。
所以,我們可以使用線程池來做這件事情,讓A線程去調人行征信,讓B線程去獲取貸款信息,讓C線程去調其他第三方依賴系統....,這樣,一旦任何一個線程得到不符合貸款的校驗結果,就返回並通知終止其他線程。這樣,客戶等待的時間就成倍節約下來,理論上最好情況是等待了花費最長的那個線程的時間。不僅如此,假設所有線程校驗都通過,我們需要等待以上所有線程執行完畢,才可以繼續接下來的工作,這個時候我們就可以用到本次介紹的工具類中awaitTasksFinished方法即可。
以下是該線程管理工具的完整代碼:
1 package com.cheng2839.utils; 2 3 import org.slf4j.Logger; 4 import org.slf4j.LoggerFactory; 5 import org.springframework.util.CollectionUtils; 6 7 import java.util.List; 8 import java.util.concurrent.BlockingQueue; 9 import java.util.concurrent.CompletionService; 10 import java.util.concurrent.ExecutorCompletionService; 11 import java.util.concurrent.ExecutorService; 12 import java.util.concurrent.Future; 13 import java.util.concurrent.LinkedBlockingQueue; 14 import java.util.concurrent.RejectedExecutionHandler; 15 import java.util.concurrent.ThreadPoolExecutor; 16 import java.util.concurrent.TimeUnit; 17 18 /** 19 * 線程池工具類 20 * 21 * @author cheng2839 22 * @date 2018年11月16日 23 */ 24 public final class ThreadPoolFactory { 25 26 private static final Logger logger = LoggerFactory.getLogger(ThreadPoolFactory.class); 27 28 private static ThreadPoolFactory factory = new ThreadPoolFactory(); 29 30 //線程池默認配置設置 31 private static final int CORE_POOL_SIZE = 5; 32 private static final int MAXIMUM_POOL_SIZE = 20; 33 private static final long KEEP_ALIVE_TIME = 60L; 34 35 private ThreadPoolFactory() { 36 } 37 38 public static ThreadPoolFactory getFactory() { 39 if (factory == null) { 40 factory = new ThreadPoolFactory(); 41 } 42 return factory; 43 } 44 45 /** 46 * 創建一個默認的線程池 47 * 48 * @return 49 * @author cheng2839 50 * @date 2018年11月16日 51 */ 52 public ExecutorService getDefaultThreadPool() { 53 //配置最大隊列容量 54 BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>(Integer.MAX_VALUE); 55 return getCustomThreadPool(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, blockingQueue); 56 } 57 58 /** 59 * 創建一個簡單的線程池 60 * 61 * @return 62 * @author cheng2839 63 * @date 2018年11月16日 64 */ 65 public ExecutorService getSimpleThreadPool(int corePoolSize, int maximumPoolSize) { 66 return getCustomThreadPool(corePoolSize, maximumPoolSize, KEEP_ALIVE_TIME, new LinkedBlockingQueue<Runnable>()); 67 } 68 69 /** 70 * 創建一個指定隊列的線程池 71 * 72 * @return 73 * @author cheng2839 74 * @date 2018年11月16日 75 */ 76 public ExecutorService getCustomQueueThreadPool(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> blockingQueue) { 77 return getCustomThreadPool(corePoolSize, maximumPoolSize, KEEP_ALIVE_TIME, blockingQueue); 78 } 79 80 /** 81 * 創建可跟蹤任務狀態的執行器 82 * 83 * @return 84 * @author cheng2839 85 * @date 2018年11月16日 86 */ 87 public ExecutorCompletionService getCompletionService(ExecutorService executorService) { 88 return new ExecutorCompletionService(executorService); 89 } 90 91 /** 92 * 創建一個定制化的線程池 93 * 94 * @return 95 * @author cheng2839 96 * @date 2018年11月16日 97 */ 98 public ExecutorService getCustomThreadPool(int corePoolSize, 99 int maximumPoolSize, 100 Long keepAliveTime, 101 BlockingQueue<Runnable> blockingQueue) { 102 logger.info("開始初始化線程池[corePoolSize={},maximumPoolSize={},keepAliveTime={}s]...", corePoolSize, maximumPoolSize, keepAliveTime); 103 104 RejectedExecutionHandler rejectedExecutionHandler = (Runnable r, ThreadPoolExecutor executor) -> { 105 logger.error("線程池已滿,任務被丟棄..........................."); 106 return; 107 }; 108 ExecutorService executorService = new ThreadPoolExecutor(corePoolSize, 109 maximumPoolSize, 110 keepAliveTime, 111 TimeUnit.SECONDS, 112 blockingQueue, 113 rejectedExecutionHandler); 114 logger.info("初始化線程池完成!"); 115 return executorService; 116 } 117 118 /** 119 * 等待任務執行完成 並 釋放連接池 120 * 121 * @param futureList 122 * @param completionService 123 * @param <V> 124 * @author cheng2839 125 * @date 2018年11月16日 126 */ 127 public <V> void awaitTasksFinished(List<Future> futureList, CompletionService<V> completionService) { 128 try { 129 if (!CollectionUtils.isEmpty(futureList) && completionService != null) { 130 logger.info("等待批量任務[{}]執行。。。", futureList.size()); 131 for (int n = 0; n < futureList.size(); n++) { 132 Future future = completionService.take(); 133 if (future != null) 134 future.get(); 135 } 136 } 137 } catch (Exception e) { 138 logger.error("多線程獲取結果異常: {}", e); 139 } 140 } 141 142 /** 143 * 關閉線程池 144 * 145 * @param executorService 146 * @author cheng2839 147 * @date 2018年11月16日 148 */ 149 public void shutdown(ExecutorService executorService) { 150 try { 151 if (executorService != null) { 152 logger.info("關閉線程池:{}", executorService); 153 executorService.shutdown(); 154 } 155 } catch (Exception e) { 156 if (!executorService.isTerminated()) { 157 executorService.shutdownNow(); 158 } 159 } finally { 160 try { 161 if (executorService != null && !executorService.isShutdown()) { 162 executorService.shutdown(); 163 } 164 } catch (Exception e) { 165 logger.error("線程池關閉異常:{}", e); 166 } 167 } 168 } 169 170 }
運行示例略,大家可以根據自己的需求靈活運用!
源碼歸本人所有,如需轉載,請標注出處。
我還是忍不住放上了這張圖片,萬一你想和我說說話呢😄❀