Springboot定時任務@Scheduled,異步任務@Async


Springboot定時任務

一、注解

@EnableScheduling和@Scheduled

定時任務Schedule,Spring調度默認則是順序執行的, 使用場景適用於定時任務為固定周期。(如果要改變周期需要重啟項目)

eg:

@Scheduled(cron = "0/5 * * * * ?")
public void test(){
    
}

二、基於接口

適用場景為任務周期經常變化,cron表達式來自於數據庫獲取。

@Slf4j
@Configuration
public class AsyncAndScheduleConf implements SchedulingConfigurer {
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.addTriggerTask(
                //1.添加任務內容(Runnable)
                () -> System.out.println("執行動態定時任務: " + LocalDateTime.now().toLocalTime()),
                //2.設置執行周期(Trigger)
                triggerContext -> {
                    //2.1 從數據庫獲取執行周期
                    String cron = cronMapper.getCron();
                    //2.2 合法性校驗.
                    if (StringUtils.isEmpty(cron)) {
                        // Omitted Code ..
                    }
                    //2.3 返回執行周期(Date)
                    return new CronTrigger(cron).nextExecutionTime(triggerContext);
                }
        );
    }
}

三、異步任務

@EnableAsync和@Async
默認情況下,Spring 使用SimpleAsyncTaskExecutor去執行這些異步方法(此執行器沒有限制線程數)。此默認值可以從兩個層級進行覆蓋。

如果方法級別或應用級別未配置線程池,在使用SimpleAsyncTaskExecutor因為創建了大量線程極有可能造成OOM,以下貼出部分SimpleAsyncTaskExecutor源碼會說明為何創建無限制的線程。

public void execute(Runnable task) {
        this.execute(task, 9223372036854775807L);
}

public void execute(Runnable task, long startTimeout) {
        Assert.notNull(task, "Runnable must not be null");
        Runnable taskToUse = this.taskDecorator != null ? this.taskDecorator.decorate(task) : task;
    //判斷是否開啟限流
        if (this.isThrottleActive() && startTimeout > 0L) {
            //限流入口
            this.concurrencyThrottle.beforeAccess();
            this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(taskToUse));
        } else {
            this.doExecute(taskToUse);
        }

    }

是否限流判斷isThrottleActive()中屬性concurrencyLimit是否大於0,默認為-1,所以未進行限流。

如果開啟了限流,進入beforeAccess() 方法會判斷線程數是否超過concurrencyLimit,若超過則當前線程wait,其他線程執行完成后當前線程則會notify。

所以,使用@Async從以下幾點入手:

Case1:使用默認的SimpleAsyncTaskExecutor

為了防止創建過多線程,配置線程的限制數

@Configuration
public class ThreadLimitConfig extends AsyncConfigurerSupport {
    @Override
    public Executor getAsyncExecutor() {
        SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
        //設置允許同時執行的線程數為10
 executor.setConcurrencyLimit(10);
        return executor;
    }
}
Case2:使用指定的線程池
@Async("myThreadPoolConfig")
public void mehtod1(){
    //todo
}
Case3:全局配置

@Configuration
@EnableAsync
public class AsyncConfiguration implements AsyncConfigurer {
    // 聲明一個線程池(並指定線程池的名字,默認是方法名稱)
    @Bean("taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心線程數5:線程池創建時候初始化的線程數
        executor.setCorePoolSize(5);
        //最大線程數5:線程池最大的線程數,只有在緩沖隊列滿了之后才會申請超過核心線程數的線程
        executor.setMaxPoolSize(5);
        //緩沖隊列大小:用來緩沖執行任務的隊列
        executor.setQueueCapacity(500);
        //允許線程的空閑時間60秒:當超過了核心線程出之外的線程在空閑時間到達之后會被銷毀
        executor.setKeepAliveSeconds(60);
        //線程池名的前綴:設置好了之后可以方便我們定位處理任務所在的線程池
        executor.setThreadNamePrefix("線程名-");
 
          //不在新線程中執行任務,而是用調用者所在的線程來執行
        // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //執行初始化
        executor.initialize();
 
        return executor;
 
    }
    //異常處理
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new MyAsyncUncaughtExceptionHandler();
    }
}
class MyAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        System.out.println("class#method: " + method.getDeclaringClass().getName() + "#" + method.getName());
        System.out.println("type        : " + ex.getClass().getName());
        System.out.println("exception   : " + ex.getMessage());
 
    }
}

當異步方法有返回值時,可以捕獲到異常信息,當無返回值時想要捕獲異常需要使用以上配置追蹤異常信息。

@Async使用注意:方法不能使用static修飾符,否則注解失效。

關於@EnableAsync

默認啟動流程:
1 AnnotationAsyncExecutionInterceptor#getDefaultExecutor方法

搜索關聯的線程池定義:上下文中唯一的 TaskExecutor 實例,或一個名為 taskExecutorjava.util.concurrent.Executor 實例;
2 如果以上都沒找到,則會使用 SimpleAsyncTaskExecutor 處理異步方法調用

關於springframework提供的類:ThreadPoolTaskScheduler與ThreadPoolTaskExecutor

一個ThreadPoolTaskExecutor通過它的corePoolSize , maxPoolSize , keepAliveSeconds和queueCapacity屬性在線程池中提供細粒度的配置。 諸如ThreadPoolTaskScheduler這樣的調度器不提供這樣的配置。
spring中的線程調度類也是juc包中的間接實現。
因此,在兩者之間進行選擇歸結為以下問題:是否需要執行或計划執行任務?根據不同的用途去選擇就可以

參考:https://stackoverflow.com/questions/33453722/spring-threadpooltaskscheduler-vs-threadpooltaskexecutor

juc包中還提供了ScheduledThreadPoolExecutor使用,內部包含一個無界阻塞隊列,類似這種代碼如果沒有進行控制 一定會導致oom。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM