線程池參數:核心線程數設置,根據生產環境平時QPS,任務處理能力決定,但也不能絕對參照這一算法。也與服務器整體處理能力,配置有關。
如:QPS是10,處理任務時間2S,核心線程數至少應該設置為20。也就是,10個任務需要總時長20S完成。那至少需要20個線程同時處理,粗略算法,其他因素影響需要留出冗余。
還有一種核心線程數 設置公式參見:https://www.cnblogs.com/warehouse/p/10810338.html
其結論:
IO密集型 = 2Ncpu(可以測試后自己控制大小,2Ncpu一般沒問題)(常出現於線程中:數據庫數據交互、文件上傳下載、網絡數據傳輸等等)
計算密集型 = Ncpu(常出現於線程中:復雜算法)+ 1
一 線程池工具類
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @Description 線程池工具類
*/
public class ThreadPoolUtil {
/**
* 核心線程數,會一直存活,即使沒有任務,線程池也會維護線程的最少數量
*/
private static final int SIZE_CORE_POOL = 5;
/**
* 線程池維護線程的最大數量
*/
private static final int SIZE_MAX_POOL = 10;
/**
* 線程池維護線程所允許的空閑時間
*/
private static final long ALIVE_TIME = 2000;
/**
* 線程緩沖隊列
*/
private static BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(100);
private static ThreadPoolExecutor pool = new ThreadPoolExecutor(SIZE_CORE_POOL, SIZE_MAX_POOL, ALIVE_TIME, TimeUnit.MILLISECONDS, bqueue, new ThreadPoolExecutor.CallerRunsPolicy());
static {
pool.prestartAllCoreThreads();
}
public static ThreadPoolExecutor getPool() {
return pool;
}
}
測試類
import com.dashuai.cloud.consulconsumer.util.ThreadPoolUtil;
public class TestUtil {
public static void main(String[] args) {
ThreadPoolUtil.getPool().execute(new Runnable() {
@Override
public void run() {
System.out.println("線程池調用");
}
});
}
}
二 線程池支持多線程返回結果
import org.springframework.stereotype.Service;
import org.springframework.beans.factory.DisposableBean;
/**
* ClassName:CommenThreadPoolUtil <br/>
* Function:線程池公共入口處理類. <br/>
*
*/
@Service
public class CommonThreadPoolUtil implements DisposableBean{
// 核心線程數(默認初始化為10)
private int cacheCorePoolSize = 8;
// 核心線程控制的最大數目
private int maxCorePoolSize = 160;
// 隊列等待線程數閾值
private int blockingQueueWaitSize = 16;
// 核心線程數自動調整的增量幅度
private int incrementCorePoolSize = 4;
// 初始化線程對象ThreadLocal,重寫initialValue(),保證ThreadLocal首次執行get方法時不會null異常
private ThreadLocal<List<Future<?>>> threadlocal = new ThreadLocal<List<Future<?>>>() {
protected List<Future<?>> initialValue() {
return new ArrayList<Future<?>>();
}
};
// 初始化線程池
private MyselfThreadPoolExecutor ThreadPool = new MyselfThreadPoolExecutor(cacheCorePoolSize, cacheCorePoolSize, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>());
/**
*
* dealTask:(線程池執行操作-包含每個進程返回結果). <br/>
* 1、運用場景:例如,需要同時校驗很多不同的邏輯,依賴於獲取校驗結果響應給用戶; 2、具體實現java類:implements
* 的Callable接口,重寫call方法即可,支持返回值
*
* @author
* @param callable
* @return
*/
public Map<String, Object> dealTask(Callable<?> callable) {
try {
// 動態更改核心線程數大小
dynamicTuningPoolSize();
// 執行線程業務邏輯及獲取返回結果
Future<?> result = ThreadPool.submit(callable);
// 獲取當前進程的局部變量
List<Future<?>> threadLocalResult = threadlocal.get();
// 疊加主進程對應的多個進程處理結果
threadLocalResult.add(result);
// 設置最新的threadLocal變量到當前主進程
threadlocal.set(threadLocalResult);
} catch (Exception e) {
e.printStackTrace();
return errorResp("線程池發生異常-Future", null);
}
return successResp(null);
}
/**
*
* dealTask:(線程池執行操作-不包含每個進程返回結果). <br/>
* 1、運用場景:例如,不依賴於響應給用戶執行結果的業務邏輯 ; 2、具體實現java類:implements
* 的Runnable接口,重寫run方法,沒有返回值
*
* @author
* @param runnable
* @return
*/
public Map<String, Object> dealTask(Runnable runnable) {
try {
// 動態更改核心線程數大小
dynamicTuningPoolSize();
// 執行線程業務邏輯
ThreadPool.execute(runnable);
} catch (Exception e) {
e.printStackTrace();
return errorResp("線程池發生異常", null);
}
return successResp(null);
}
/**
* obtainTaskFuture:(獲取線程池執行結果:此為阻塞線程,即所有線程都執行完成才能獲取結果,故應將執行時間稍長的業務邏輯先執行,
* 減少等待時間). <br/>
* 此方法只能調用一次,即調用之后清除ThreadLocal變量,以便於同一進程再次調用線程池獲取最新的執行結果以及釋放內存, 防止內存泄露
*
* @author
* @return
*/
public Map<String, Object> obtainTaskFuture() {
List<Future<?>> threadLocalResult = null;
try {
// 獲取當前進程變量
threadLocalResult = threadlocal.get();
if (threadLocalResult == null || threadLocalResult.size() == 0) {
return errorResp("獲取線程池執行結果為空", null);
} else {
return successResp(threadLocalResult);
}
} catch (Exception e) {
return errorResp("獲取線程池執行結果發生異常:" + e.getMessage(), null);
} finally {
// 1、釋放內存;2、防止主進程再次調用線程池方法時對結果互有影響。
threadlocal.remove();
}
}
/**
*
* dynamicTuningPoolSize:(動態改變核心線程數). <br/>
*
* @author
* @return
*/
private void dynamicTuningPoolSize() {
// 隊列等待任務數(此為近似值,故采用>=判斷)
int queueSize = ThreadPool.getQueueSize();
// 動態更改核心線程數大小
if (queueSize >= blockingQueueWaitSize) {
// 核心線程數小於設定的最大線程數才會自動擴展線程數
if (cacheCorePoolSize <= maxCorePoolSize) {
// 原有核心線程數
int corePoolSize = ThreadPool.getCorePoolSize();
// 將要累積的核心線程數
int currentcorePoolSize = corePoolSize + incrementCorePoolSize;
ThreadPool.setCorePoolSize(currentcorePoolSize);
ThreadPool.setMaximumPoolSize(currentcorePoolSize);
cacheCorePoolSize = currentcorePoolSize;
System.out.println("動態改變線程池大小====原核心線程池數目為:" + corePoolSize + ";現累加為:" + currentcorePoolSize);
} else {
System.out.println("動態改變線程池大小====核心線程池數目已累加為:" + cacheCorePoolSize + ";不會繼續無限增加");
}
} else {
// 縮容
if (queueSize == 0 && cacheCorePoolSize >= CORE_POOL_SIZE) {
// 原有核心線程數
int corePoolSize = ThreadPool.getCorePoolSize();
// 將要累積的核心線程數
int currentcorePoolSize = corePoolSize - incrementCorePoolSize;
if (currentcorePoolSize <= CORE_POOL_SIZE) {
currentcorePoolSize = CORE_POOL_SIZE;
}
ThreadPool.setCorePoolSize(currentcorePoolSize);
ThreadPool.setMaximumPoolSize(currentcorePoolSize);
cacheCorePoolSize = currentcorePoolSize;
}
}
}
/**
* 獲取核心線程數 getCacheCorePoolSize:(). <br/>
*
* @author
* @return
*/
public int getCacheCorePoolSize() {
return ThreadPool.getCorePoolSize();
}
/**
* 設置核心線程數 setCacheCorePoolSize:(). <br/>
*
* @author
* @param cacheCorePoolSize
*/
public void setCacheCorePoolSize(int cacheCorePoolSize) {
ThreadPool.setCorePoolSize(cacheCorePoolSize);
ThreadPool.setMaximumPoolSize(cacheCorePoolSize);
this.cacheCorePoolSize = cacheCorePoolSize;
}
/**
*
* successResp:(正確響應信息). <br/>
*
* @author
* @param data
* @return
*/
private Map<String, Object> successResp(Object data) {
Map<String, Object> result = new HashMap<String, Object>();
result.put("status", "0");
result.put("data", data);
return result;
}
/**
*
* errorResp:(錯誤響應信息). <br/>
*
* @author
* @param errorMsg
* @param data
* @return
*/
public Map<String, Object> errorResp(String errorMsg, Object data) {
Map<String, Object> result = new HashMap<String, Object>();
result.put("status", "1");
result.put("msg", errorMsg);
result.put("data", data);
return result;
}
@Override
public void destroy() throws Exception {
ThreadPool.shutdown();
logger.info("線程池銷毀");
}
}
創建線程池類
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MyselfThreadPoolExecutor extends ThreadPoolExecutor {
// 初始化父類構造函數及startTime
public MyselfThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
// 按過去執行已提交任務的順序發起一個有序的關閉,但是不接受新任務(已執行的任務不會停止)
@Override
public void shutdown() {
super.shutdown();
}
// 嘗試停止所有的活動執行任務、暫停等待任務的處理,並返回等待執行的任務列表。在從此方法返回的任務隊列中排空(移除)這些任務。並不保證能夠停止正在處理的活動執行任務,但是會盡力嘗試。
@Override
public List<Runnable> shutdownNow() {
return super.shutdownNow();
}
// 在執行給定線程中的給定 Runnable 之前調用的方法.可用於重新初始化ThreadLocals或者執行日志記錄。
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
}
// 基於完成執行給定 Runnable 所調用的方法
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
try {
// Future<?> result = (Future<?>) r;
// "任務結果:" result.get();
} catch (Exception e) {
}
}
/**
*
* getQueueSize:(已執行的任務數). <br/>
*
* @author
* @return
*/
@Override
public long getCompletedTaskCount() {
return super.getCompletedTaskCount();
}
/**
*
* getQueueSize:(正在運行的任務數). <br/>
*
* @author
* @return
*/
@Override
public int getActiveCount() {
return super.getActiveCount();
}
/**
*
* getQueueSize:(隊列等待任務數). <br/>
*
* @author
* @return
*/
public int getQueueSize() {
return getQueue().size();
}
}
測試類
public class TestUtil {
public static void main(String[] args) {
CommonThreadPoolUtil poolUtil = new CommonThreadPoolUtil();
poolUtil.dealTask(new Runnable() {
@Override
public void run() {
System.out.println("線程池調用");
}
});
poolUtil.dealTask(new Callable<HashMap<String, Object>>() {
@Override
public HashMap<String, Object> call() {
System.out.println("線程池調用");
return new HashMap<String, Object>();
}
}
Map<String, Object> result = poolUtil.obtainTaskFuture();
List<Future<HashMap<String, Object>>> list = (List<Future<HashMap<String, Object>>>) result.get("data");
for(int j = 0; j < list.size(); j++){
Future<HashMap<String, Object>> future = list.get(j);
}
}
}
三 jdk1.5之后提供工具類 Executors
工具類Executors面提供了一些靜態工廠方法,生成一些常用的線程池,如下所示:
-
newCachedThreadPool:創建一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,那么就會回收部分空閑(60秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制(Interger. MAX_VALUE),線程池大小完全依賴於操作系統(或者說JVM)能夠創建的最大線程大小。
-
newFixedThreadPool:創建固定大小的線程池。每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。
-
newSingleThreadExecutor:創建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當於單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的提交順序執行。
-
newScheduledThreadPool:創建一個大小無限的線程池。此線程池支持定時以及周期性執行任務的需求。
總結:除了newScheduledThreadPool的內部實現特殊一點之外,其它線程池內部都是基於 ThreadPoolExecutor 類(Executor的子類)實現的。
實現:
public class TestUtil {
public static void main(String[] args) {
ScheduledExecutorService scheduExec = Executors.newScheduledThreadPool(10);
scheduExec.schedule(new Runnable() {
@SuppressWarnings("static-access")
@Override
public void run() {
System.out.println("20秒后處理");
}
}, 20, TimeUnit.SECONDS);
}
}
周期性定時任務20秒后執行
四、線程池組
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Service("dispatchExecutorsPool")
public class DispatchExecutorsPool implements DisposableBean {
private int default_corePoolSize = 4;
private int default_maximumPoolSize = 500;
private int default_keepAliveTime = 300 * 1000;
private static final Logger logger = LoggerFactory.getLogger(DispatchExecutorsPool.class);
//初始化線程池
private ThreadPoolExecutor orderThreadPool = null;
private ThreadPoolExecutor productThreadPool = null;
@Value("${executor_core_size}")
private String executor_core_size;
@Value("${executor_max_size}")
private String executor_max_size;
@Value("${executor_alive_time}")
private String executor_alive_time;
/**
* 線程池集合
*/
List<ThreadPoolExecutor> executorList = new ArrayList<>();
@PostConstruct
public void init() {
orderThreadPool = generateOrderExecutor(executor_core_size, executor_max_size, executor_alive_time);
productThreadPool = generatePorductExecutor(executor_core_size, executor_max_size, executor_alive_time);
}
/**
* @Description: 訂單拉取線程池
* @Date: 2021/8/9 14:23
* @Param: [coolSize, maxSize, keepAliveTime]
* @Return: java.util.concurrent.ThreadPoolExecutor
*/
private ThreadPoolExecutor generateOrderExecutor(String coolSize, String maxSize, String keepAliveTime) {
int configPoolSize = Integer.valueOf(coolSize);
int configMaxSize = Integer.valueOf(maxSize);
int configKeepAliveTime = Integer.valueOf(keepAliveTime);
ThreadPoolExecutor executor = new ThreadPoolExecutor((configPoolSize == 0 ? default_corePoolSize : configPoolSize),
(configMaxSize == 0 ? default_maximumPoolSize : configMaxSize),
(configKeepAliveTime == 0 ? default_keepAliveTime : configKeepAliveTime),
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
//使用hook在spring close的同時停止線程池shutdown之后的new submit task會走threadPool.setRejectedExecutionHandler
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
try {
logger.error("DispatchExecutorsPool.rejectedExecution:" + executor.toString());
executor.awaitTermination(15, TimeUnit.SECONDS);
} catch (InterruptedException e) {
executor.getQueue().size();
logger.error("executor_drg.awaitTermination.error", e);
}
}
}
});
executorList.add(executor);
return executor;
}
/**
* @Description: 獲取線程池
* @Date: 2021/8/9 14:24
* @Param: [coolSize, maxSize, keepAliveTime]
* @Return: java.util.concurrent.ThreadPoolExecutor
*/
private ThreadPoolExecutor generatePorductExecutor(String coolSize, String maxSize, String keepAliveTime) {
int configPoolSize = Integer.valueOf(coolSize);
int configMaxSize = Integer.valueOf(maxSize);
int configKeepAliveTime = Integer.valueOf(keepAliveTime);
ThreadPoolExecutor executor = new ThreadPoolExecutor((configPoolSize == 0 ? default_corePoolSize : configPoolSize),
(configMaxSize == 0 ? default_maximumPoolSize : configMaxSize),
(configKeepAliveTime == 0 ? default_keepAliveTime : configKeepAliveTime),
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
//使用hook在spring close的同時停止線程池shutdown之后的new submit task會走threadPool.setRejectedExecutionHandler
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
try {
logger.error("DispatchExecutorsPool.rejectedExecution:" + executor.toString());
executor.awaitTermination(15, TimeUnit.SECONDS);
} catch (InterruptedException e) {
executor.getQueue().size();
logger.error("executor_drg.awaitTermination.error", e);
}
}
}
});
executorList.add(executor);
return executor;
}
@Override
public void destroy() throws Exception {
for (ThreadPoolExecutor threadPoolExecutor : executorList) {
threadPoolExecutor.shutdown();
}
}
/**
* @Description: 獲取訂單線程池
* @Date: 2021/8/9 14:25
* @Param:
* @Return:
*/
public ThreadPoolExecutor getOrderThreadPool() {
return orderThreadPool;
}
/**
* @Description: 產品線程池
* @Date: 2021/8/9 14:26
* @Param: []
* @Return: java.util.concurrent.ThreadPoolExecutor
*/
public ThreadPoolExecutor getProductThreadPool() {
return productThreadPool;
}
}