Springboot定時任務
一、注解
@EnableScheduling和@Scheduled
定時任務Schedule,Spring調度默認則是順序執行的, 使用場景適用於定時任務為固定周期。(如果要改變周期需要重啟項目)
eg:
@Scheduled(cron = "0/5 * * * * ?")
public void test(){
}
二、基於接口
適用場景為任務周期經常變化,cron表達式來自於數據庫獲取。
@Slf4j
@Configuration
public class AsyncAndScheduleConf implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.addTriggerTask(
//1.添加任務內容(Runnable)
() -> System.out.println("執行動態定時任務: " + LocalDateTime.now().toLocalTime()),
//2.設置執行周期(Trigger)
triggerContext -> {
//2.1 從數據庫獲取執行周期
String cron = cronMapper.getCron();
//2.2 合法性校驗.
if (StringUtils.isEmpty(cron)) {
// Omitted Code ..
}
//2.3 返回執行周期(Date)
return new CronTrigger(cron).nextExecutionTime(triggerContext);
}
);
}
}
三、異步任務
@EnableAsync和@Async
默認情況下,Spring 使用SimpleAsyncTaskExecutor去執行這些異步方法(此執行器沒有限制線程數)。此默認值可以從兩個層級進行覆蓋。
如果方法級別或應用級別未配置線程池,在使用SimpleAsyncTaskExecutor因為創建了大量線程極有可能造成OOM,以下貼出部分SimpleAsyncTaskExecutor源碼會說明為何創建無限制的線程。
public void execute(Runnable task) {
this.execute(task, 9223372036854775807L);
}
public void execute(Runnable task, long startTimeout) {
Assert.notNull(task, "Runnable must not be null");
Runnable taskToUse = this.taskDecorator != null ? this.taskDecorator.decorate(task) : task;
//判斷是否開啟限流
if (this.isThrottleActive() && startTimeout > 0L) {
//限流入口
this.concurrencyThrottle.beforeAccess();
this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(taskToUse));
} else {
this.doExecute(taskToUse);
}
}
是否限流判斷isThrottleActive()中屬性concurrencyLimit是否大於0,默認為-1,所以未進行限流。
如果開啟了限流,進入beforeAccess() 方法會判斷線程數是否超過concurrencyLimit,若超過則當前線程wait,其他線程執行完成后當前線程則會notify。
所以,使用@Async從以下幾點入手:
Case1:使用默認的SimpleAsyncTaskExecutor
為了防止創建過多線程,配置線程的限制數
@Configuration
public class ThreadLimitConfig extends AsyncConfigurerSupport {
@Override
public Executor getAsyncExecutor() {
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
//設置允許同時執行的線程數為10
executor.setConcurrencyLimit(10);
return executor;
}
}
Case2:使用指定的線程池
@Async("myThreadPoolConfig")
public void mehtod1(){
//todo
}
Case3:全局配置
@Configuration
@EnableAsync
public class AsyncConfiguration implements AsyncConfigurer {
// 聲明一個線程池(並指定線程池的名字,默認是方法名稱)
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心線程數5:線程池創建時候初始化的線程數
executor.setCorePoolSize(5);
//最大線程數5:線程池最大的線程數,只有在緩沖隊列滿了之后才會申請超過核心線程數的線程
executor.setMaxPoolSize(5);
//緩沖隊列大小:用來緩沖執行任務的隊列
executor.setQueueCapacity(500);
//允許線程的空閑時間60秒:當超過了核心線程出之外的線程在空閑時間到達之后會被銷毀
executor.setKeepAliveSeconds(60);
//線程池名的前綴:設置好了之后可以方便我們定位處理任務所在的線程池
executor.setThreadNamePrefix("線程名-");
//不在新線程中執行任務,而是用調用者所在的線程來執行
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//執行初始化
executor.initialize();
return executor;
}
//異常處理
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new MyAsyncUncaughtExceptionHandler();
}
}
class MyAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
System.out.println("class#method: " + method.getDeclaringClass().getName() + "#" + method.getName());
System.out.println("type : " + ex.getClass().getName());
System.out.println("exception : " + ex.getMessage());
}
}
當異步方法有返回值時,可以捕獲到異常信息,當無返回值時想要捕獲異常需要使用以上配置追蹤異常信息。
@Async使用注意:方法不能使用static修飾符,否則注解失效。
關於@EnableAsync
默認啟動流程:
1 AnnotationAsyncExecutionInterceptor#getDefaultExecutor方法
搜索關聯的線程池定義:上下文中唯一的 TaskExecutor
實例,或一個名為 taskExecutor
的 java.util.concurrent.Executor
實例;
2 如果以上都沒找到,則會使用 SimpleAsyncTaskExecutor
處理異步方法調用
關於springframework提供的類:ThreadPoolTaskScheduler與ThreadPoolTaskExecutor
- ThreadPoolTaskExecutor是一個專門用於執行任務的類。
- ThreadPoolTaskScheduler是一個專門用於調度任務的類。
一個ThreadPoolTaskExecutor通過它的corePoolSize , maxPoolSize , keepAliveSeconds和queueCapacity屬性在線程池中提供細粒度的配置。 諸如ThreadPoolTaskScheduler這樣的調度器不提供這樣的配置。
spring中的線程調度類也是juc包中的間接實現。
因此,在兩者之間進行選擇歸結為以下問題:是否需要執行或計划執行任務?根據不同的用途去選擇就可以
juc包中還提供了ScheduledThreadPoolExecutor使用,內部包含一個無界阻塞隊列,類似這種代碼如果沒有進行控制 一定會導致oom。