SpringBoot 動態多線程並發定時任務


一、簡介

實現定時任務有多種方式:

  • Timer:jdk 中自帶的一個定時調度類,可以簡單的實現按某一頻度進行任務執行。提供的功能比較單一,無法實現復雜的調度任務。
  • ScheduledExecutorService:也是 jdk 自帶的一個基於線程池設計的定時任務類。其每個調度任務都會分配到線程池中的一個線程執行,所以其任務是並發執行的,互不影響。
  • Spring Task:Spring 提供的一個任務調度工具,支持注解和配置文件形式,支持 Cron 表達式,使用簡單但功能強大。
  • Quartz:一款功能強大的任務調度器,可以實現較為復雜的調度功能,如每月一號執行、每天凌晨執行、每周五執行等等,還支持分布式調度,就是配置稍顯復雜。

使用 spring 自帶的,繼承 SchedulingConfigurer 的方式。

源碼地址:

Gitee: https://gitee.com/typ1805/tansci

GitHub: https://github.com/typ1805/tansci

二、編碼實現

啟動類添加 @EnableScheduling 注解

@EnableScheduling
@SpringBootApplication
public class TansciApplication {

    public static void main(String[] args) {
        SpringApplication.run(TansciApplication.class, args);
    }

}

定時任務類

添加注解 @Component 注冊到 spring 容器中。

/**
 * @ClassName: ScheduledTask.java
 * @ClassPath: com.tansci.common.task.ScheduledTask.java
 * @Description: 定時任務
 * @Author: tanyp
 * @Date: 2022/2/25 9:30
 **/
@Slf4j
@Component
public class ScheduledTask implements SchedulingConfigurer {

    private volatile ScheduledTaskRegistrar registrar;

    private final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();

    private final ConcurrentHashMap<String, CronTask> cronTasks = new ConcurrentHashMap<>();

    @Autowired
    private TaskContextService taskContextService;

    @Override
    public void configureTasks(ScheduledTaskRegistrar registrar) {
        registrar.setScheduler(Executors.newScheduledThreadPool(Constants.DEFAULT_THREAD_POOL));
        this.registrar = registrar;
    }

    @PreDestroy
    public void destroy() {
        this.registrar.destroy();
    }

    /**
     * @MonthName: refreshTask
     * @Description: 初始化任務
     * 1、從數據庫獲取執行任務的集合【TaskConfig】
     * 2、通過調用 【refresh】 方法刷新任務列表
     * 3、每次數據庫中的任務發生變化后重新執行【1、2】
     * @Author: tanyp
     * @Date: 2022/2/25 9:42
     * @Param: [tasks]
     * @return: void
     **/
    public void refreshTask(List<TaskConfig> tasks) {
        // 刪除已經取消任務
        scheduledFutures.keySet().forEach(key -> {
            if (Objects.isNull(tasks) || tasks.size() == 0) {
                scheduledFutures.get(key).cancel(false);
                scheduledFutures.remove(key);
                cronTasks.remove(key);
                return;
            }
            tasks.forEach(task -> {
                if (!Objects.equals(key, task.getTaskId())) {
                    scheduledFutures.get(key).cancel(false);
                    scheduledFutures.remove(key);
                    cronTasks.remove(key);
                    return;
                }
            });
        });

        // 添加新任務、更改執行規則任務
        tasks.forEach(item -> {
            String expression = item.getExpression();
            // 任務表達式為空則跳過
            if (StringUtils.isEmpty(expression)) {
                return;
            }

            // 任務已存在並且表達式未發生變化則跳過
            if (scheduledFutures.containsKey(item.getTaskId()) && cronTasks.get(item.getTaskId()).getExpression().equals(expression)) {
                return;
            }

            // 任務執行時間發生了變化,則刪除該任務
            if (scheduledFutures.containsKey(item.getTaskId())) {
                scheduledFutures.get(item.getTaskId()).cancel(false);
                scheduledFutures.remove(item.getTaskId());
                cronTasks.remove(item.getTaskId());
            }

            CronTask task = new CronTask(new Runnable() {
                @Override
                public void run() {
                    // 執行業務邏輯
                    try {
                        log.info("====執行單個任務,任務ID【{}】執行規則【{}】=======", item.getTaskId(), item.getExpression());
                        taskContextService.execute(item.getCode());
                    } catch (Exception e) {
                        log.error("執行任務異常,異常信息:{}", e);
                    }
                }
            }, expression);
            ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
            cronTasks.put(item.getTaskId(), task);
            scheduledFutures.put(item.getTaskId(), future);
        });
    }

}

任務自啟動配置

啟動項目是讀取任務配置表中的信息,初始化任務執行列表。

/**
 * @ClassName: TaskApplicationRunner.java
 * @ClassPath: com.tansci.common.task.TaskApplicationRunner.java
 * @Description: 任務自啟動配置
 * @Author: tanyp
 * @Date: 2022/2/25 9:43
 **/
@Slf4j
@Component
public class TaskApplicationRunner implements ApplicationRunner {

    @Autowired
    private ScheduledTask scheduledTask;

    @Autowired
    private TaskConfigService taskConfigService;

    @Override
    public void run(ApplicationArguments args) {
        try {
            log.info("================項目啟動初始化定時任務====開始===========");
            List<TaskConfig> tasks = taskConfigService.list(Wrappers.<TaskConfig>lambdaQuery().eq(TaskConfig::getStatus, 1));
            log.info("========初始化定時任務數為:{}=========", tasks.size());
            scheduledTask.refreshTask(tasks);
            log.info("================項目啟動初始化定時任務====完成==========");
        } catch (Exception e) {
            log.error("================項目啟動初始化定時任務====異常:{}", e);
        }
    }

}

任務配置相關類

  • TaskConfig: 任務配置實體
  • TaskConfigService: 接口
  • TaskConfigServiceImpl: 接口實現類
  • TaskConfigMapper: Mapper 接口
/**
 * @ClassName: TaskConfig.java
 * @ClassPath: com.tansci.domain.system.TaskConfig.java
 * @Description: 任務配置
 * @Author: tanyp
 * @Date: 2022/2/25 9:35
 **/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@TableName(value = "task_config")
@ApiModel(value = "任務配置")
public class TaskConfig {

    @ApiModelProperty(value = "主鍵id")
    @TableId(type = IdType.ASSIGN_UUID)
    private String id;

    @ApiModelProperty(value = "任務服務名稱")
    private String code;

    @ApiModelProperty(value = "任務編碼")
    private String taskId;

    @ApiModelProperty(value = "任務執行規則時間:cron表達式")
    private String expression;

    @ApiModelProperty(value = "任務名稱")
    private String name;

    @ApiModelProperty(value = "狀態:0、未啟動,1、正常")
    private Integer status;

    @ApiModelProperty(value = "狀態")
    @TableField(exist = false)
    private String statusName;

    @ApiModelProperty(value = "創建人")
    private String creater;

    @ApiModelProperty(value = "更新時間")
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", locale = "zh", timezone = "GMT+8")
    private LocalDateTime updateTime;

    @ApiModelProperty(value = "創建時間")
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", locale = "zh", timezone = "GMT+8")
    private LocalDateTime createTime;

    @ApiModelProperty(value = "描述")
    private String remarks;

}

動態調用任務配置

通過 task_config 配置的 code 字段信息來調用業務實現 spring bean。

/**
 * @ClassName: TaskContextServiceImpl.java
 * @ClassPath: com.tansci.service.impl.system.TaskContextServiceImpl.java
 * @Description: 動態調用任務配置信息
 * @Author: tanyp
 * @Date: 2022/2/25 10:12
 **/
@Slf4j
@Service
public class TaskContextServiceImpl implements TaskContextService {

    /**
     * 任務注冊器
     */
    @Autowired
    private Map<String, TaskRegisterService> componentServices;

    /**
     * @MonthName: execute
     * @Description: 解析器
     * @Author: tanyp
     * @Date: 2022/2/25 10:13
     * @Param: [taskServerName]
     * @return: void
     **/
    @Override
    public void execute(String taskServerName) {
        componentServices.get(taskServerName).register();
    }

}

任務注冊器

/**
 * @ClassName: TaskRegisterService.java
 * @ClassPath: com.tansci.service.system.TaskRegisterService.java
 * @Description: 任務注冊器
 * @Author: tanyp
 * @Date: 2022/2/25 10:05
 **/
public interface TaskRegisterService {

    void register();

}

創建業務實現時,只需實現 TaskRegisterService 接口即可。

注意:
@Service("taskTest1Service") 是唯一的,對應 task_config 表中的 code 字段;
expression 的配置為 cron 表達式。

創建兩個任務測試類:

@Slf4j
@Service("taskTest1Service")
public class TaskTest1ServiceImpl implements TaskRegisterService {

    @Override
    public void register() {
        log.info("===========自定義任務測試【TaskTest1ServiceImpl】====【1】=========");
    }
}
@Slf4j
@Service("taskTest2Service")
public class TaskTest2ServiceImpl implements TaskRegisterService {

    @Override
    public void register() {
        log.info("===========自定義任務測試【TaskTest2ServiceImpl】====【3】=========");
    }
}

三、測試

在界面配置 taskTest1ServicetaskTest2Service 如下:

添加配置

任務配置列表

啟動項目,執行結果如下:

2022-02-25 12:59:00,007  [pool-2-thread-1] INFO  com.tansci.common.task.ScheduledTask.run 107 - ====執行單個任務,任務ID【T1000214524DFS】執行規則【*/20 * * * * ?】=======
2022-02-25 12:59:00,007  [pool-2-thread-1] INFO  com.tansci.service.impl.system.task.TaskTest1ServiceImpl.register 20 - ===========自定義任務測試【TaskTest1ServiceImpl】====【1】=========
2022-02-25 12:59:20,015  [pool-2-thread-3] INFO  com.tansci.common.task.ScheduledTask.run 107 - ====執行單個任務,任務ID【T1000214524DFS】執行規則【*/20 * * * * ?】=======
2022-02-25 12:59:20,015  [pool-2-thread-3] INFO  com.tansci.service.impl.system.task.TaskTest1ServiceImpl.register 20 - ===========自定義任務測試【TaskTest1ServiceImpl】====【1】=========
2022-02-25 12:59:40,004  [pool-2-thread-3] INFO  com.tansci.common.task.ScheduledTask.run 107 - ====執行單個任務,任務ID【T1000214524DFS】執行規則【*/20 * * * * ?】=======
2022-02-25 12:59:40,004  [pool-2-thread-3] INFO  com.tansci.service.impl.system.task.TaskTest1ServiceImpl.register 20 - ===========自定義任務測試【TaskTest1ServiceImpl】====【1】=========

可以看到初始化的任務都在執行,並且是多線程在執行。

四、cron 表達式

corn 從左到右(用空格隔開):秒 分 小時 月份中的日期 月份 星期中的日期 年份

字段 允許值 允許的特殊字符
秒(Seconds) 0~59 的整數 , - * /
分(Minutes) 0~59 的整數 , - * /
小時(Hours) 0~23 的整數 , - * /
日期(DayofMonth) 1~31 的整數 ,- * ? / L W C
月份(Month) 1~12 的整數或者 JAN-DEC , - * /
星期(DayofWeek) 1~7 的整數或者 SUN-SAT (1=SUN) , - * ? / L C #
年(可選,留空)(Year) 1970~2099 , - * /
  • *:表示匹配該域的任意值。假如在 Minutes 域使用*, 即表示每分鍾都會觸發事件。
  • ?:只能用在 DayofMonth 和 DayofWeek 兩個域。
  • -:表示范圍。例如在 Minutes 域使用 5-20,表示從 5 分到 20 分鍾每分鍾觸發一次
  • /:表示起始時間開始觸發,然后每隔固定時間觸發一次。
  • ,:表示列出枚舉值。例如:在 Minutes 域使用 5,20,則意味着在 5 和 20 分每分鍾觸發一次。
  • L:表示最后,只能出現在 DayofWeek 和 DayofMonth 域。
  • W:表示有效工作日(周一到周五),只能出現在 DayofMonth 域,系統將在離指定日期的最近的有效工作日觸發事件。
  • LW:這兩個字符可以連用,表示在某個月最后一個工作日,即最后一個星期五。
  • #:用於確定每個月第幾個星期幾,只能出現在 DayofMonth 域。例如在 4#2,表示某月的第二個星期三。

常用表達式例子

  • 0 0 2 1 * ? * 表示在每月的 1 日的凌晨 2 點調整任務
  • 0 15 10 ? * MON-FRI 表示周一到周五每天上午 10:15 執行作業
  • 0 15 10 ? 6L 2002-2006 表示 2002-2006 年的每個月的最后一個星期五上午 10:15 執行作
  • 0 0 10,14,16 * * ? 每天上午 10 點,下午 2 點,4 點
  • 0 0/30 9-17 * * ? 朝九晚五工作時間內每半小時
  • 0 0 12 ? * WED 表示每個星期三中午 12 點
  • 0 0 12 * * ? 每天中午 12 點觸發
  • 0 15 10 ? * * 每天上午 10:15 觸發
  • 0 15 10 * * ? 每天上午 10:15 觸發
  • 0 15 10 * * ? * 每天上午 10:15 觸發
  • 0 15 10 * * ? 2005 2005 年的每天上午 10:15 觸發
  • 0 * 14 * * ? 在每天下午 2 點到下午 2:59 期間的每 1 分鍾觸發
  • 0 0/5 14 * * ? 在每天下午 2 點到下午 2:55 期間的每 5 分鍾觸發
  • 0 0/5 14,18 * * ? 在每天下午 2 點到 2:55 期間和下午 6 點到 6:55 期間的每 5 分鍾觸發
  • 0 0-5 14 * * ? 在每天下午 2 點到下午 2:05 期間的每 1 分鍾觸發
  • 0 10,44 14 ? 3 WED 每年三月的星期三的下午 2:10 和 2:44 觸發
  • 0 15 10 ? * MON-FRI 周一至周五的上午 10:15 觸發
  • 0 15 10 15 * ? 每月 15 日上午 10:15 觸發
  • 0 15 10 L * ? 每月最后一日的上午 10:15 觸發
  • 0 15 10 ? * 6L 每月的最后一個星期五上午 10:15 觸發
  • 0 15 10 ? * 6L 2002-2005 2002 年至 2005 年的每月的最后一個星期五上午 10:15 觸發
  • 0 15 10 ? * 6#3 每月的第三個星期五上午 10:1 觸發

五、動態使用方式

1、啟動方式有兩種

  • 啟動項目后,手動調用 ScheduledTask.refreshTask(List<MyTask> tasks),並初始化任務列表;
  • 使用我測試中的方式,配置項目啟動完成后自動調用初始任務的方法,並初始化任務列表。

2、數據初始化

只需要給 List<MyTask> 集合賦值並調用 refreshTask() 方法即可:

  • 根據業務需求修改 TaskConfig 實體類;
  • 這里的初始化數據可以從數據庫讀取數據賦值給集合;

例如:從 mysql 讀取任務配置表的數據,調用 refreshTask() 方法。

3、如何動態

  • 修改:修改某一項正在執行的任務規則;
  • 添加:添加一項新的任務;
  • 刪除:停止某一項正在執行的任務。

例如:我們有一張任務配置表,此時進行分別新增一條或多條數據、刪除一條或多條數據、改一條數據,只需要完成以上任何一項操作后,重新調用一下 refreshTask() 方法即可。

怎么重新調用 refreshTask() 方法:可以另外啟一個任務實時監控任務表的數據變化。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM