線程池管理工具類


 

各位讀者,大家好!

   

   我們在項目開發過程中,經常會使用線程池管理,特別是對TPS有一定要求的情況。

   線程池會自動幫助我們管理線程的創建、回收及銷毀等工作,方便我們的開發。特別有異步需求、調用第三方系統且比較耗時的批量任務,都可以使用線程池很好的幫我們節省大量時間,提高用戶的體驗。

 

   比如:小美想從XX系統進行貸款,那么XX系統肯定需要獲取小美的人行征信,還可能需要校驗小美是否從此種同類的系統已經貸款,如果貸款了,那么就不能再次貸款。這里牽扯到2個第三方系統,人行征信系統 及 貸款信息匯總系統,假設從人行征信系統獲取征信需要5s,取貸款信息匯總系統需要3s,那么按照正常的同步流程:先獲取征信,再獲取貸款信息,就一共需要8s;那么如果再加上其他各種各樣的校驗、第三方外調(甚至外調超時),那么加起來時間更長,對客戶的體驗肯定會受影響,可能就會丟失小美這樣大量的客戶。

    所以,我們可以使用線程池來做這件事情,讓A線程去調人行征信,讓B線程去獲取貸款信息,讓C線程去調其他第三方依賴系統....,這樣,一旦任何一個線程得到不符合貸款的校驗結果,就返回並通知終止其他線程。這樣,客戶等待的時間就成倍節約下來,理論上最好情況是等待了花費最長的那個線程的時間。不僅如此,假設所有線程校驗都通過,我們需要等待以上所有線程執行完畢,才可以繼續接下來的工作,這個時候我們就可以用到本次介紹的工具類中awaitTasksFinished方法即可。

    以下是該線程管理工具的完整代碼:

 

  1 package com.cheng2839.utils;
  2 
  3 import org.slf4j.Logger;
  4 import org.slf4j.LoggerFactory;
  5 import org.springframework.util.CollectionUtils;
  6 
  7 import java.util.List;
  8 import java.util.concurrent.BlockingQueue;
  9 import java.util.concurrent.CompletionService;
 10 import java.util.concurrent.ExecutorCompletionService;
 11 import java.util.concurrent.ExecutorService;
 12 import java.util.concurrent.Future;
 13 import java.util.concurrent.LinkedBlockingQueue;
 14 import java.util.concurrent.RejectedExecutionHandler;
 15 import java.util.concurrent.ThreadPoolExecutor;
 16 import java.util.concurrent.TimeUnit;
 17 
 18 /**
 19  * 線程池工具類
 20  *
 21  * @author cheng2839
 22  * @date 2018年11月16日
 23  */
 24 public final class ThreadPoolFactory {
 25 
 26     private static final Logger logger = LoggerFactory.getLogger(ThreadPoolFactory.class);
 27 
 28     private static ThreadPoolFactory factory = new ThreadPoolFactory();
 29 
 30     //線程池默認配置設置
 31     private static final int CORE_POOL_SIZE = 5;
 32     private static final int MAXIMUM_POOL_SIZE = 20;
 33     private static final long KEEP_ALIVE_TIME = 60L;
 34 
 35     private ThreadPoolFactory() {
 36     }
 37 
 38     public static ThreadPoolFactory getFactory() {
 39         if (factory == null) {
 40             factory = new ThreadPoolFactory();
 41         }
 42         return factory;
 43     }
 44 
 45     /**
 46      * 創建一個默認的線程池
 47      *
 48      * @return
 49      * @author cheng2839
 50      * @date 2018年11月16日
 51      */
 52     public ExecutorService getDefaultThreadPool() {
 53         //配置最大隊列容量
 54         BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>(Integer.MAX_VALUE);
 55         return getCustomThreadPool(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, blockingQueue);
 56     }
 57 
 58     /**
 59      * 創建一個簡單的線程池
 60      *
 61      * @return
 62      * @author cheng2839
 63      * @date 2018年11月16日
 64      */
 65     public ExecutorService getSimpleThreadPool(int corePoolSize, int maximumPoolSize) {
 66         return getCustomThreadPool(corePoolSize, maximumPoolSize, KEEP_ALIVE_TIME, new LinkedBlockingQueue<Runnable>());
 67     }
 68 
 69     /**
 70      * 創建一個指定隊列的線程池
 71      *
 72      * @return
 73      * @author cheng2839
 74      * @date 2018年11月16日
 75      */
 76     public ExecutorService getCustomQueueThreadPool(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> blockingQueue) {
 77         return getCustomThreadPool(corePoolSize, maximumPoolSize, KEEP_ALIVE_TIME, blockingQueue);
 78     }
 79 
 80     /**
 81      * 創建可跟蹤任務狀態的執行器
 82      *
 83      * @return
 84      * @author cheng2839
 85      * @date 2018年11月16日
 86      */
 87     public ExecutorCompletionService getCompletionService(ExecutorService executorService) {
 88         return new ExecutorCompletionService(executorService);
 89     }
 90 
 91     /**
 92      * 創建一個定制化的線程池
 93      *
 94      * @return
 95      * @author cheng2839
 96      * @date 2018年11月16日
 97      */
 98     public ExecutorService getCustomThreadPool(int corePoolSize,
 99                                                int maximumPoolSize,
100                                                Long keepAliveTime,
101                                                BlockingQueue<Runnable> blockingQueue) {
102         logger.info("開始初始化線程池[corePoolSize={},maximumPoolSize={},keepAliveTime={}s]...", corePoolSize, maximumPoolSize, keepAliveTime);
103 
104         RejectedExecutionHandler rejectedExecutionHandler = (Runnable r, ThreadPoolExecutor executor) -> {
105             logger.error("線程池已滿,任務被丟棄...........................");
106             return;
107         };
108         ExecutorService executorService = new ThreadPoolExecutor(corePoolSize,
109                 maximumPoolSize,
110                 keepAliveTime,
111                 TimeUnit.SECONDS,
112                 blockingQueue,
113                 rejectedExecutionHandler);
114         logger.info("初始化線程池完成!");
115         return executorService;
116     }
117 
118     /**
119      * 等待任務執行完成 並 釋放連接池
120      *
121      * @param futureList
122      * @param completionService
123      * @param <V>
124      * @author cheng2839
125      * @date 2018年11月16日
126      */
127     public <V> void awaitTasksFinished(List<Future> futureList, CompletionService<V> completionService) {
128         try {
129             if (!CollectionUtils.isEmpty(futureList) && completionService != null) {
130                 logger.info("等待批量任務[{}]執行。。。", futureList.size());
131                 for (int n = 0; n < futureList.size(); n++) {
132                     Future future = completionService.take();
133                     if (future != null)
134                         future.get();
135                 }
136             }
137         } catch (Exception e) {
138             logger.error("多線程獲取結果異常: {}", e);
139         }
140     }
141 
142     /**
143      * 關閉線程池
144      *
145      * @param executorService
146      * @author cheng2839
147      * @date 2018年11月16日
148      */
149     public void shutdown(ExecutorService executorService) {
150         try {
151             if (executorService != null) {
152                 logger.info("關閉線程池:{}", executorService);
153                 executorService.shutdown();
154             }
155         } catch (Exception e) {
156             if (!executorService.isTerminated()) {
157                 executorService.shutdownNow();
158             }
159         } finally {
160             try {
161                 if (executorService != null && !executorService.isShutdown()) {
162                     executorService.shutdown();
163                 }
164             } catch (Exception e) {
165                 logger.error("線程池關閉異常:{}", e);
166             }
167         }
168     }
169 
170 }

 

    運行示例略,大家可以根據自己的需求靈活運用!

 

    源碼歸本人所有,如需轉載,請標注出處。

 

我還是忍不住放上了這張圖片,萬一你想和我說說話呢😄❀

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM