0003-spring 中線程池配置
引用alibaba編碼規范中的話
【強制】線程池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這樣
的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。
說明:Executors 返回的線程池對象的弊端如下:
1)FixedThreadPool 和 SingleThreadPool:
允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool:
允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM。
1. spring 中自己已經提供了線程池策略 ThreadPoolTaskExecutor
// 部分源碼
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
private final Object poolSizeMonitor = new Object();
private int corePoolSize = 1;
// 最大線程數 Integer.MAX_VALUE;
private int maxPoolSize = Integer.MAX_VALUE;
private int keepAliveSeconds = 60;
private int queueCapacity = Integer.MAX_VALUE;
private boolean allowCoreThreadTimeOut = false;
上面的代碼可知,spring默認的線程池最大線程數跟等待隊列都是 Integer.MAX_VALUE
直接使用的話也會存在OOM問題。
2. 所以我們在使用時候會自定義我們自己的線程池策略
2.1 通過配置文件yml配置
spring:
task:
execution:
pool:
core-size: 8 # 核心線程數
max-size: 20 # 最大線程數
queue-capacity: 100 # 等待隊列
allow-core-thread-timeout: true # 是否允許超時回收
keep-alive: 60s # 線程存活時間
thread-name-prefix: my_task_
使用:
只需要給方法頭加上 @Async,這個方法在調用的時候就是個異步方法
@Async
@Override
public void testThread2() {
log.info("testThread2 正在執行,線程{}",Thread.currentThread().getName());
}
2.2 如果上面不能滿足我們的需求,我們還可以自己新增線程池
package com.zhoust.threadbatch.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.*;
/**
* @author zhoust
*/
@EnableAsync
@Configuration
@Slf4j
public class ThreadPoolConfig {
/**
* 核心線程數(默認線程數)
*/
@Value("${my-task.execution.pool.corePoolSize}")
private int corePoolSize;
/**
* 最大線程數
*/
@Value("${my-task.execution.pool.maxPoolSize}")
private int maxPoolSize;
/**
* 允許線程空閑時間(單位:默認為秒)
*/
@Value("${my-task.execution.pool.keepAliveTime}")
private int keepAliveTime;
/**
* 緩沖隊列大小
*/
@Value("${my-task.execution.pool.queueCapacity}")
private int queueCapacity;
/**
* 線程池名前綴
*/
@Value("${my-task.execution.threadNamePrefix}")
private String threadNamePrefix;
/**
* 拒絕策略 1. AbortPolicy 丟棄任務並拋異常,2. DiscardPolicy 丟棄任務不拋異常 3.DiscardOldestPolicy 丟棄隊列最前面的線程(棄老)4.CallerRunsPolicy:由調用線程處理該任務
*/
@Value("${my-task.execution.rejectedExecutionHandler}")
private String rejectedExecutionHandler;
/**
* 是否回收核心線程
*/
@Value("${my-task.execution.allowCoreThreadTimeOut}")
private boolean allowCoreThreadTimeOut;
/**
* 執行完畢是否關閉線程池
*/
@Value("${my-task.execution.waitForTasksToCompleteOnShutdown}")
private boolean waitForTasksToCompleteOnShutdown;
@Bean("asyncServiceThread")
public Executor getThreadPool(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心線程數
executor.setCorePoolSize(getCorePoolSize(corePoolSize));
// 隊列允許等待的線程數
executor.setQueueCapacity(queueCapacity);
// 最大線程數,包含核心線程數
executor.setMaxPoolSize(maxPoolSize);
// 線程回收時間
executor.setKeepAliveSeconds(keepAliveTime);
// 4種拒絕策略 1. AbortPolicy 丟棄任務並拋異常,2. DiscardPolicy 丟棄任務不拋異常 3.DiscardOldestPolicy 丟棄隊列最前面的線程(棄老)4.CallerRunsPolicy:由調用線程處理該任務
executor.setRejectedExecutionHandler(getRejectedExecutionHandler(rejectedExecutionHandler));
// 是否允許超時回收線程
executor.setAllowCoreThreadTimeOut(allowCoreThreadTimeOut);
// 線程執行完畢后,關閉線程池
executor.setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown);
// 線程名前綴
executor.setThreadNamePrefix(threadNamePrefix);
// 初始化
executor.initialize();
return executor;
}
private int getCorePoolSize(int corePoolSize){
// 如果是-1 去機器核心數的2倍
if(-1 == corePoolSize){
int processors = Runtime.getRuntime().availableProcessors();
log.info("機器核心數:{},核心線程數:{}",processors,processors * 2);
return processors * 2;
}
log.info("核心線程數:{}",corePoolSize);
return corePoolSize;
}
private RejectedExecutionHandler getRejectedExecutionHandler(String rejectedExecutionHandler){
RejectedExecutionHandler reh = null;
try{
log.info("配置的線程拒絕策略:{}",rejectedExecutionHandler);
reh = (RejectedExecutionHandler) Class.forName("java.util.concurrent.ThreadPoolExecutor$" + rejectedExecutionHandler).newInstance();
}catch (Exception e){
log.warn("獲取拒絕策略失敗,默認使用 CallerRunsPolicy");
reh = new ThreadPoolExecutor.CallerRunsPolicy();
}
return reh;
}
}
yml:
my-task:
execution:
pool:
corePoolSize: 8 # 核心線程數
maxPoolSize: 20 # 最大線程數
queueCapacity: 100 # 等待隊列
allow-core-thread-timeout: true # 是否允許超時回收
keepAliveTime: 60 # 線程存活時間
threadNamePrefix: spring_task_
rejectedExecutionHandler: CallerRunsPolicy # 拒絕策略 1. AbortPolicy 丟棄任務並拋異常,2. DiscardPolicy 丟棄任務不拋異常 3.DiscardOldestPolicy 丟棄隊列最前面的線程(棄老)4.CallerRunsPolicy:由調用線程處理該任務
allowCoreThreadTimeOut: true # 是否回收核心線程
waitForTasksToCompleteOnShutdown: true # 執行完畢是否關閉線程池
使用:
指定使用哪個線程池
@Async(value = "asyncServiceThread")
@Override
public void testThread1() {
log.info("testThread1 正在執行,線程{}",Thread.currentThread().getName());
}
調用:
@GetMapping("/testThread")
public String testThread(){
for (int i = 0; i < 100; i++) {
taskServer.testThread1();
taskServer.testThread2();
}
return "succeed";
}
補充:拒絕策略
拒絕策略就是當線程池已經滿了,還有新的線程過來,此時我們怎么處理這些線程。
1.AbortPolicy(默認線程策略):丟棄任務並拋出RejectedExecutionException異常
如果是比較關鍵的業務,推薦使用此拒絕策略,這樣子在系統不能承載更大的並發量的時候,能夠及時的通過異常發現。
2.DiscardPolicy:丟棄任務,但是不拋出異常
無關緊要的業務可以這樣做。
3.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新提交被拒絕的任務。(棄老策略)
4.CallerRunsPolicy:由調用線程處理該任務