線程池配置模板
springboot給我們提供了一個線程池的實現,它的底層是由線程池ThreadPoolTaskExecutor來實現的。相較與JDK提供的線程池進行了一些功能的增強,比如對線程狀態的監聽,在我們在使用的時候更加的方便。在這里給各位同學一個配置模板,簡單的講解下Spring線程池的底層原理(在最后的源碼章節)。
基礎的注解解釋
@Configuration:這是 Spring 3.0 添加的一個注解,用來代替 applicationContext.xml 配置文件,所有這個配置文件里面能做到的事情都可以通過這個注解所在類來進行注冊。
@Bean:用來代替 XML 配置文件里面的 <bean ...> 配置。
常用配置參數
- corePoolSize :線程池的核心池大小,在創建線程池之后,線程池默認沒有任何線程。
線程池創建之后,線程池中的線程數為0,當任務過來就會創建一個線程去執行,直到線程數達到corePoolSize 之后,就會被到達的任務放在隊列中。換句更精煉的話:corePoolSize 表示允許線程池中允許同時運行的最大線程數。
如果執行了線程池的prestartAllCoreThreads()方法,線程池會提前創建並啟動所有核心線程。
- maximumPoolSize :線程池允許的最大線程數,他表示最大能創建多少個線程。maximumPoolSize肯定是大於等於corePoolSize。
- keepAliveTime:表示線程沒有任務時最多保持多久然后停止。默認情況下,只有線程池中線程數大於corePoolSize 時,keepAliveTime 才會起作用。換句話說,當線程池中的線程數大於corePoolSize,並且一個線程空閑時間達到了keepAliveTime,那么就是shutdown。如果配置了 allowCoreThreadTimeOut=true,那么核心線程池也會參與到超時的計時中。
- Unit:keepAliveTime 的單位。
- workQueue :一個阻塞隊列,用來存儲等待執行的任務,當線程池中的線程數超過它的corePoolSize的時候,線程會進入阻塞隊列進行阻塞等待。通過workQueue,線程池實現了阻塞功能
- threadFactory :線程工廠,用來創建線程。
- handler :表示當拒絕處理任務時的策略。
- AbortPolicy:丟棄任務並拋出RejectedExecutionException
- CallerRunsPolicy:只要線程池未關閉,該策略直接在調用者線程中,運行當前被丟棄的任務。顯然這樣做不會真的丟棄任務,但是,任務提交線程的性能極有可能會急劇下降。
- DiscardOldestPolicy:丟棄隊列中最老的一個請求,也就是即將被執行的一個任務,並嘗試再次提交當前任務。
- DiscardPolicy:丟棄任務,不做任何處理。
- allowCoreThreadTimeOut:設置為true則線程池會回收核心線程池的線程,false則只會回收超過核心線程池的線程。默認為false。
spring線程池會對上述的參數進行包裝,可能你看到的真正配置時的名稱不一樣,但實際的作用是一樣的。
配置類設計
這是博主自己寫的一個關於Springboot線程池的配置類,參考了一些文章的規范,可以直接使用。
@EnableAsync
@Configuration
public class LogThreadPoolConfig {
@Bean(name = "logThreadPool")
public ThreadPoolTaskExecutor LogThreadPoolTask() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
LogThreadPoolProperties properties = this.logThreadPoolProperties();
executor.setCorePoolSize(properties.getCorePoolSize());
executor.setMaxPoolSize(properties.getMaxPoolSize());
executor.setQueueCapacity(properties.getQueueCapacity());
executor.setKeepAliveSeconds(properties.getKeepAliveSeconds());
executor.setThreadNamePrefix(properties.getThreadName());
switch (properties.getRejectedExecutionHandler()) {
case "abortPolicy":
executor.setRejectedExecutionHandler(new AbortPolicy());
break;
case "callerRunsPolicy":
executor.setRejectedExecutionHandler(new CallerRunsPolicy());
break;
case "discardOldestPolicy":
executor.setRejectedExecutionHandler(new DiscardOldestPolicy());
break;
case "discardPolicy":
executor.setRejectedExecutionHandler(new DiscardOldestPolicy());
break;
default:
executor.setRejectedExecutionHandler(new CallerRunsPolicy());
break;
}
executor.initialize();
return executor;
}
@Bean
@ConfigurationProperties(prefix = "threadpool.log")
public LogThreadPoolProperties logThreadPoolProperties() {
return new LogThreadPoolProperties();
}
//@Getter lombok提供的getset方法生成注解
//@Setter
@Configuration
public static class LogThreadPoolProperties {
/**
* 線程前綴名
*/
private String threadName;
/**
* 核心線程池大小
*/
private int corePoolSize;
/**
* 最大線程數
*/
private int maxPoolSize;
/**
* 隊列大小
*/
private int queueCapacity;
/**
* 線程池維護空閑線程存在時間
*/
private int keepAliveSeconds;
/**
* 拒絕策略
*/
private String rejectedExecutionHandler;
}
}
這樣就可以在yml文件中配置參數了:
threadpool:
log:
threadName: ThreadPool-log- # 線程池前綴名
corePoolSize: 8 # 核心線程池數:IO型推薦設置為cpu核心數*2;cpu型推薦設置為cpu數+1
maxPoolSize: 16 # 最大線程池數
queueCapacity: 1000 # 線程池阻塞隊列容量
keepAliveSeconds: 60 # 允許線程空閑時間
# 拒絕策略 abortPolicy callerRunsPolicy discardOldestPolicy discardPolicy
rejectedExecutionHandler: callerRunsPolicy
線程池使用
Spring提供了注解方式來方便我們使用線程池,只需要在要異步處理的方法上加 @Async("你配置的線程池名字")就可以了,注意這個類需要被spring掃描並納入管理,所以要加@Service、@Component等注解。
@Service
public class ServiceImpl implements Service {
@Override
@Async("logThreadPool")
public void addOperationLog(BaseLog baseLog) {
//你要異步執行的邏輯
}
}
具體的異步效果可以自測一下
ThreadPoolTaskExecutor源碼
springboot給我們提供了一個線程池的實現,它的底層是由我們傳統線程池ThreadPoolExecutor來實現的。
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
private final Object poolSizeMonitor = new Object();
private int corePoolSize = 1;
private int maxPoolSize = 2147483647;
private int keepAliveSeconds = 60;
private int queueCapacity = 2147483647;
private boolean allowCoreThreadTimeOut = false;
private TaskDecorator taskDecorator;
/**
* 在這可以看到,其底層封裝了我們熟悉的threadPoolExecutor,這是JDK提供給我們的線程池實現
*/
private ThreadPoolExecutor threadPoolExecutor;
public ThreadPoolTaskExecutor() {
}
/**
* 這些都是些get/set
*/
public void setCorePoolSize(int corePoolSize) {
Object var2 = this.poolSizeMonitor;
synchronized(this.poolSizeMonitor) {
this.corePoolSize = corePoolSize;
if (this.threadPoolExecutor != null) {
this.threadPoolExecutor.setCorePoolSize(corePoolSize);
}
}
}
public int getCorePoolSize() {
Object var1 = this.poolSizeMonitor;
synchronized(this.poolSizeMonitor) {
return this.corePoolSize;
}
}
public void setMaxPoolSize(int maxPoolSize) {
Object var2 = this.poolSizeMonitor;
synchronized(this.poolSizeMonitor) {
this.maxPoolSize = maxPoolSize;
if (this.threadPoolExecutor != null) {
this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
}
}
}
public int getMaxPoolSize() {
Object var1 = this.poolSizeMonitor;
synchronized(this.poolSizeMonitor) {
return this.maxPoolSize;
}
}
public void setKeepAliveSeconds(int keepAliveSeconds) {
Object var2 = this.poolSizeMonitor;
synchronized(this.poolSizeMonitor) {
this.keepAliveSeconds = keepAliveSeconds;
if (this.threadPoolExecutor != null) {
this.threadPoolExecutor.setKeepAliveTime((long)keepAliveSeconds, TimeUnit.SECONDS);
}
}
}
public int getKeepAliveSeconds() {
Object var1 = this.poolSizeMonitor;
synchronized(this.poolSizeMonitor) {
return this.keepAliveSeconds;
}
}
public void setQueueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
}
public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
}
public void setTaskDecorator(TaskDecorator taskDecorator) {
this.taskDecorator = taskDecorator;
}
/**
* 這是初始化方法,可以在這里把JDK提供的ThreadPoolExecutor初始化了
*/
protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
BlockingQueue<Runnable> queue = this.createQueue(this.queueCapacity);
ThreadPoolExecutor executor;
if (this.taskDecorator != null) {
executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) {
public void execute(Runnable command) {
super.execute(ThreadPoolTaskExecutor.this.taskDecorator.decorate(command));
}
};
} else {
executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
}
if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}
this.threadPoolExecutor = executor;
return executor;
}
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
return (BlockingQueue)(queueCapacity > 0 ? new LinkedBlockingQueue(queueCapacity) : new SynchronousQueue());
}
public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized");
return this.threadPoolExecutor;
}
public int getPoolSize() {
return this.threadPoolExecutor == null ? this.corePoolSize : this.threadPoolExecutor.getPoolSize();
}
public int getActiveCount() {
return this.threadPoolExecutor == null ? 0 : this.threadPoolExecutor.getActiveCount();
}
public void execute(Runnable task) {
ThreadPoolExecutor executor = this.getThreadPoolExecutor();
try {
executor.execute(task);
} catch (RejectedExecutionException var4) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4);
}
}
public void execute(Runnable task, long startTimeout) {
this.execute(task);
}
public Future<?> submit(Runnable task) {
ThreadPoolExecutor executor = this.getThreadPoolExecutor();
try {
return executor.submit(task);
} catch (RejectedExecutionException var4) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4);
}
}
public <T> Future<T> submit(Callable<T> task) {
ThreadPoolExecutor executor = this.getThreadPoolExecutor();
try {
return executor.submit(task);
} catch (RejectedExecutionException var4) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4);
}
}
//這些都是些Spring對線程池的功能增強,一般用不到
public ListenableFuture<?> submitListenable(Runnable task) {
ThreadPoolExecutor executor = this.getThreadPoolExecutor();
try {
ListenableFutureTask<Object> future = new ListenableFutureTask(task, (Object)null);
executor.execute(future);
return future;
} catch (RejectedExecutionException var4) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4);
}
}
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
ThreadPoolExecutor executor = this.getThreadPoolExecutor();
try {
ListenableFutureTask<T> future = new ListenableFutureTask(task);
executor.execute(future);
return future;
} catch (RejectedExecutionException var4) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4);
}
}
public boolean prefersShortLivedTasks() {
return true;
}
}
ThreadPoolTaskExecutor 繼承了 ExecutorConfigurationSupport,其實它主要是完成線程池的初始化的:
public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory implements BeanNameAware, InitializingBean, DisposableBean {
protected final Log logger = LogFactory.getLog(this.getClass());
private ThreadFactory threadFactory = this;
private boolean threadNamePrefixSet = false;
private RejectedExecutionHandler rejectedExecutionHandler = new AbortPolicy();
private boolean waitForTasksToCompleteOnShutdown = false;
private int awaitTerminationSeconds = 0;
private String beanName;
private ExecutorService executor;
public ExecutorConfigurationSupport() {
}
public void setThreadFactory(ThreadFactory threadFactory) {
this.threadFactory = (ThreadFactory)(threadFactory != null ? threadFactory : this);
}
public void setThreadNamePrefix(String threadNamePrefix) {
super.setThreadNamePrefix(threadNamePrefix);
this.threadNamePrefixSet = true;
}
public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) {
this.rejectedExecutionHandler = (RejectedExecutionHandler)(rejectedExecutionHandler != null ? rejectedExecutionHandler : new AbortPolicy());
}
public void setWaitForTasksToCompleteOnShutdown(boolean waitForJobsToCompleteOnShutdown) {
this.waitForTasksToCompleteOnShutdown = waitForJobsToCompleteOnShutdown;
}
public void setAwaitTerminationSeconds(int awaitTerminationSeconds) {
this.awaitTerminationSeconds = awaitTerminationSeconds;
}
public void setBeanName(String name) {
this.beanName = name;
}
/**
* 這里就是在bean初始化完后調用線程池的初始化方法生成線程池實例
* 並被Spring容器管理
*/
public void afterPropertiesSet() {
this.initialize();
}
public void initialize() {
if (this.logger.isInfoEnabled()) {
this.logger.info("Initializing ExecutorService " + (this.beanName != null ? " '" + this.beanName + "'" : ""));
}
if (!this.threadNamePrefixSet && this.beanName != null) {
this.setThreadNamePrefix(this.beanName + "-");
}
this.executor = this.initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}
protected abstract ExecutorService initializeExecutor(ThreadFactory var1, RejectedExecutionHandler var2);
public void destroy() {
this.shutdown();
}
public void shutdown() {
if (this.logger.isInfoEnabled()) {
this.logger.info("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
}
if (this.waitForTasksToCompleteOnShutdown) {
this.executor.shutdown();
} else {
this.executor.shutdownNow();
}
this.awaitTerminationIfNecessary();
}
private void awaitTerminationIfNecessary() {
if (this.awaitTerminationSeconds > 0) {
try {
if (!this.executor.awaitTermination((long)this.awaitTerminationSeconds, TimeUnit.SECONDS) && this.logger.isWarnEnabled()) {
this.logger.warn("Timed out while waiting for executor" + (this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate");
}
} catch (InterruptedException var2) {
if (this.logger.isWarnEnabled()) {
this.logger.warn("Interrupted while waiting for executor" + (this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate");
}
Thread.currentThread().interrupt();
}
}
}
}
上述好多的參數其實都是JDK線程池需要的,具體他們的功能可以看線程池源碼來了解它的作用。線程池源碼解析