該功能主要是基於 TaskScheduler 和 CronTask兩個類來實現。
直接上代碼。
1. 創建一個springboot 工程,依賴如下
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the job-demo Spring Boot application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>qinfeng.zheng</groupId>
    <artifactId>job-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>job-demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Web starter: used by the REST controller that manages jobs. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Lombok: supplies the @Data and @Slf4j annotations used in the sample classes. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- Test starter, with the JUnit vintage engine excluded. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- MyBatis starter: backs the annotation-based SysJobMapper. -->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.3.2</version>
        </dependency>
        <!-- MySQL JDBC driver (5.1.x line, matching the com.mysql.jdbc.Driver class name in the datasource config). -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2. 數據庫配置
# Spring datasource (JDBC) settings: connects to the `test` schema of a MySQL server.
# NOTE(review): host, user name and password are hard-coded in plain text here;
# consider supplying them through environment-specific configuration instead.
spring.datasource.driver-class-name = com.mysql.jdbc.Driver
spring.datasource.url= jdbc:mysql://120.79.34.98:3306/test?useUnicode=yes&characterEncoding=UTF-8&useSSL=false
spring.datasource.username = root
spring.datasource.password = 123456
3. 代碼
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @Configuration public class SchedulingConfig { @Bean public TaskScheduler taskScheduler() { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); // 定時任務執行線程池核心線程數 taskScheduler.setPoolSize(4); taskScheduler.setRemoveOnCancelPolicy(true); taskScheduler.setThreadNamePrefix("TaskSchedulerThreadPool-"); return taskScheduler; } }
import java.util.concurrent.ScheduledFuture;

/**
 * Handle for one scheduled job: wraps the {@link ScheduledFuture} obtained when
 * the job was scheduled so that the job can be cancelled later.
 */
public final class ScheduledTask {

    /**
     * Future of the scheduled job; {@code null} until a job has been scheduled.
     * Package-private because it is assigned by the registrar in the same package,
     * and {@code volatile} so the assignment is visible to a thread calling
     * {@link #cancel()}.
     */
    volatile ScheduledFuture<?> future;

    /**
     * Cancels the scheduled job, interrupting it if it is currently running.
     * Does nothing when no future has been assigned.
     */
    public void cancel() {
        // Read the volatile field once so the null check and the call see the same value.
        ScheduledFuture<?> current = this.future;
        if (current != null) {
            current.cancel(true);
        }
    }
}
import lombok.extern.slf4j.Slf4j; import org.springframework.util.ReflectionUtils; import org.springframework.util.StringUtils; import java.lang.reflect.Method; import java.util.Objects; /** * 通過反射的方式 調用真定需要執行的方法 */ @Slf4j public class SchedulingRunnable implements Runnable { private String beanName; private String methodName; private String params; public SchedulingRunnable(String beanName, String methodName, String params) { this.beanName = beanName; this.methodName = methodName; this.params = params; } @Override public void run() { log.info("定時任務開始執行 - bean:{},方法:{},參數:{}", beanName, methodName, params); long startTime = System.currentTimeMillis(); try { Object target = SpringContextUtils.getBean(beanName); Method method = null; if (!StringUtils.isEmpty(params)) { method = target.getClass().getDeclaredMethod(methodName, String.class); } else { method = target.getClass().getDeclaredMethod(methodName); } ReflectionUtils.makeAccessible(method); if (!StringUtils.isEmpty(params)) { method.invoke(target, params); } else { method.invoke(target); } } catch (Exception ex) { log.error("定時任務執行異常 - bean:{},方法:{},參數:{} ", beanName, methodName, params, ex); } long times = System.currentTimeMillis() - startTime; log.info("定時任務執行結束 - bean:{},方法:{},參數:{},耗時:{} 毫秒", beanName, methodName, params, times); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; SchedulingRunnable that = (SchedulingRunnable) o; if (params == null) { return beanName.equals(that.beanName) && methodName.equals(that.methodName) && that.params == null; } return beanName.equals(that.beanName) && methodName.equals(that.methodName) && params.equals(that.params); } @Override public int hashCode() { if (params == null) { return Objects.hash(beanName, methodName); } return Objects.hash(beanName, methodName, params); } }
import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; @Component public class SpringContextUtils implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringContextUtils.applicationContext = applicationContext; } public static Object getBean(String name) { return applicationContext.getBean(name); } }
定時任務注冊類
import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.config.CronTask; import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.stereotype.Component; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * 可參考spring 提供的定時任務注冊類 {@link ScheduledTaskRegistrar} */ @Component public class CronTaskRegistrar implements DisposableBean { /** * 緩存 */ private final Map<Runnable, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>(16); @Autowired private TaskScheduler taskScheduler; /** * 添加一個定時任務 * 其核心就是靠spring提供的 CronTask 類來實現 * * @param task * @param cronExpression */ public void addCronTask(Runnable task, String cronExpression) { addCronTask(new CronTask(task, cronExpression)); } public void addCronTask(CronTask cronTask) { if (cronTask != null) { Runnable task = cronTask.getRunnable(); if (this.scheduledTasks.containsKey(task)) { removeCronTask(task); } this.scheduledTasks.put(task, scheduleCronTask(cronTask)); } } public void removeCronTask(Runnable task) { ScheduledTask scheduledTask = this.scheduledTasks.remove(task); if (scheduledTask != null) scheduledTask.cancel(); } public ScheduledTask scheduleCronTask(CronTask cronTask) { ScheduledTask scheduledTask = new ScheduledTask(); scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger()); return scheduledTask; } @Override public void destroy() { for (ScheduledTask task : this.scheduledTasks.values()) { task.cancel(); } this.scheduledTasks.clear(); } }
下面是將job信息記錄到數據庫,以便於啟動項目時,就可以運行定時任務
-- Table that persists job definitions so they can be reloaded at application start-up.
CREATE TABLE `sys_job` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,                          -- surrogate primary key
  `jobId` bigint(20) DEFAULT NULL,                                  -- job identifier used by the mapper's lookup/update/delete queries
  `beanName` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,     -- name of the Spring bean that owns the job method
  `methodParams` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL, -- string argument passed to the method, if any
  `cronExpression` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,
  `jobStatus` char(1) COLLATE utf8_unicode_ci DEFAULT NULL,         -- 1 = normal, 0 = paused
  `remark` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,
  `createTime` date DEFAULT NULL,
  `updateTime` date DEFAULT NULL,
  `methodName` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,   -- name of the method to invoke on the bean
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
import lombok.Data;

import java.util.Date;

/**
 * Persistent object describing one scheduled job (one row of {@code sys_job}).
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class SysJobPO {
    /**
     * Job ID.
     */
    private Integer jobId;
    /**
     * Bean name, e.g. {@code demoTask} in the example.
     */
    private String beanName;
    /**
     * Method name.
     */
    private String methodName;
    /**
     * Method parameters.
     */
    private String methodParams;
    /**
     * Cron expression.
     */
    private String cronExpression;
    /**
     * Status (1 = normal, 0 = paused).
     */
    private Integer jobStatus;
    /**
     * Remark.
     */
    private String remark;
    /**
     * Creation time.
     */
    private Date createTime;
    /**
     * Last update time.
     */
    private Date updateTime;
}
import org.apache.ibatis.annotations.*;
import qinfeng.zheng.jobdemo.SysJobPO;

import java.util.List;

/**
 * MyBatis mapper for the {@code sys_job} table, defined entirely with SQL annotations.
 * NOTE(review): the boolean results presumably mean "at least one row affected" — confirm
 * against the MyBatis version in use.
 */
@Mapper
public interface SysJobMapper {

    /**
     * Inserts a new job row from the given object's properties.
     *
     * @param sysJob job to insert
     * @return result of the insert statement
     */
    @Insert("INSERT INTO sys_job(jobId,beanName,methodParams,cronExpression,jobStatus,remark,createTime,updateTime,methodName) values (#{jobId}," + "#{beanName},#{methodParams},#{cronExpression},#{jobStatus},#{remark},#{createTime},#{updateTime},#{methodName})")
    boolean addSysJob(SysJobPO sysJob);

    /**
     * Selects all jobs with the given status.
     *
     * @param jobStatus status value to match
     * @return matching jobs
     */
    @Select("select * from sys_job where jobStatus = #{jobStatus}")
    List<SysJobPO> getSysJobListByStatus(Integer jobStatus);

    /**
     * Selects the job with the given job ID.
     *
     * @param jobId job ID to match
     * @return the matching job
     */
    @Select("select * from sys_job where jobId = #{jobId}")
    SysJobPO findTaskJobByJobId(Integer jobId);

    /**
     * Deletes the job with the given job ID.
     *
     * @param jobId job ID to match
     * @return result of the delete statement
     */
    @Delete("delete from sys_job where jobId = #{jobId}")
    boolean deleteTaskJobByJobId(Integer jobId);

    /**
     * Updates only the cron expression and the status of a job. Intended for testing.
     *
     * @param sysJobPO carries the job ID to match plus the new cron expression and status
     * @return result of the update statement
     */
    @Update("update sys_job set cronExpression = #{cronExpression} , jobStatus = #{jobStatus} where jobId= #{jobId}")
    boolean updateTaskJob(SysJobPO sysJobPO);
}
/**
 * Status of a job: whether it should be running or is paused.
 */
public enum SysJobStatus {

    /** The job is active; stored as {@code 1}. */
    NORMAL("正常", 1),

    /** The job is paused; stored as {@code 0}. */
    SUSPEND("暫停", 0);

    // Enum constants are shared singletons, so their state is kept immutable.
    private final String desc;
    private final Integer index;

    private SysJobStatus(String desc, Integer index) {
        this.desc = desc;
        this.index = index;
    }

    /**
     * @return human-readable description of the status
     */
    public String desc() {
        return this.desc;
    }

    /**
     * @return numeric code of the status
     */
    public Integer index() {
        return this.index;
    }
}
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import qinfeng.zheng.jobdemo.CronTaskRegistrar; import qinfeng.zheng.jobdemo.SchedulingRunnable; import qinfeng.zheng.jobdemo.SysJobPO; import qinfeng.zheng.jobdemo.SysJobStatus; import qinfeng.zheng.jobdemo.mapper.SysJobMapper; import java.util.List; @Service public class SysJobRunner implements CommandLineRunner { private static final Logger logger = LoggerFactory.getLogger(SysJobRunner.class); @Autowired private SysJobMapper sysJobMapper; @Autowired private CronTaskRegistrar cronTaskRegistrar; @Override public void run(String... args) { // 初始加載數據庫里狀態為正常的定時任務 List<SysJobPO> jobList = sysJobMapper.getSysJobListByStatus(SysJobStatus.NORMAL.index()); if (!CollectionUtils.isEmpty(jobList)) { for (SysJobPO job : jobList) { SchedulingRunnable task = new SchedulingRunnable(job.getBeanName(), job.getMethodName(), job.getMethodParams()); cronTaskRegistrar.addCronTask(task, job.getCronExpression()); } logger.info("定時任務已加載完畢..."); } } }
定時job測試類
import org.springframework.stereotype.Component;

/**
 * Sample bean, registered as {@code demoTask}, whose methods serve as targets
 * for testing the scheduled jobs.
 */
@Component("demoTask")
public class DemoTask {

    /**
     * Sample job without arguments; prints a fixed message.
     */
    public void taskNoParams() {
        final String message = "執行無參示例任務";
        System.out.println(message);
    }

    /**
     * Sample job with one string argument; prints the argument after a fixed prefix.
     *
     * @param params value to print
     */
    public void taskWithParams(String params) {
        final String message = "執行有參示例任務:" + params;
        System.out.println(message);
    }
}
啟動springboot項目時,就會通過 SysJobRunner 加載數據庫表中job信息,從而啟動定時任務。
下面再寫一個controller控制器,實現與前端交互,從而實現定時job的增、刪、改,切換
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import qinfeng.zheng.jobdemo.CronTaskRegistrar; import qinfeng.zheng.jobdemo.SchedulingRunnable; import qinfeng.zheng.jobdemo.SysJobPO; import qinfeng.zheng.jobdemo.SysJobStatus; import qinfeng.zheng.jobdemo.mapper.SysJobMapper; @RestController public class TaskController { @Autowired private SysJobMapper sysJobMapper; @Autowired private CronTaskRegistrar cronTaskRegistrar; @PostMapping("/addTask") public String addTask(SysJobPO sysJob) { boolean success = sysJobMapper.addSysJob(sysJob); if (!success) return "新增失敗"; else { if (sysJob.getJobStatus().equals(SysJobStatus.NORMAL.index())) { SchedulingRunnable task = new SchedulingRunnable(sysJob.getBeanName(), sysJob.getMethodName(), sysJob.getMethodParams()); // 注冊定時任務 cronTaskRegistrar.addCronTask(task, sysJob.getCronExpression()); } } return "SUCCESS"; } @DeleteMapping("/deleteTask/{jobId}") public String deleteTask(@PathVariable Integer jobId) { SysJobPO existJob = sysJobMapper.findTaskJobByJobId(jobId); boolean success = sysJobMapper.deleteTaskJobByJobId(jobId); if (!success) return "刪除失敗"; else { if (existJob.getJobStatus().equals(SysJobStatus.NORMAL.index())) { SchedulingRunnable task = new SchedulingRunnable(existJob.getBeanName(), existJob.getMethodName(), existJob.getMethodParams()); // 刪除定時任務 cronTaskRegistrar.removeCronTask(task); } } return "SUCCESS"; } /** * 修改定時任務 * * @param sysJob * @return */ @PostMapping("/updateTask") public String updateTaskJob(SysJobPO sysJob) { SysJobPO existJob = sysJobMapper.findTaskJobByJobId(sysJob.getJobId()); boolean success = sysJobMapper.updateTaskJob(sysJob); if (!success) return "修改成功"; else { // 1. 先刪除原來的定時任務(Map緩存) if (existJob.getJobStatus().equals(SysJobStatus.NORMAL.index())) { SchedulingRunnable task = new SchedulingRunnable(existJob.getBeanName(), existJob.getMethodName(), existJob.getMethodParams()); cronTaskRegistrar.removeCronTask(task); } //2. 
新增定時任務(放到Map緩存中) if (sysJob.getJobStatus().equals(SysJobStatus.NORMAL.index())) { SchedulingRunnable task = new SchedulingRunnable(sysJob.getBeanName(), sysJob.getMethodName(), sysJob.getMethodParams()); cronTaskRegistrar.addCronTask(task, sysJob.getCronExpression()); } } return "SUCCESS"; } /** * 啟,停定時任務的狀態切換 */ @GetMapping("/trigger/{jobId}") public String triggerTaskJob(@PathVariable Integer jobId) { SysJobPO existJob = sysJobMapper.findTaskJobByJobId(jobId); // 1.如果原先是啟動狀態 ,那么就停止吧(從Map緩存中刪除, 並將表中狀態置為0) if (existJob.getJobStatus().equals(SysJobStatus.NORMAL.index())) { SchedulingRunnable task = new SchedulingRunnable(existJob.getBeanName(), existJob.getMethodName(), existJob.getMethodParams()); cronTaskRegistrar.removeCronTask(task); existJob.setJobStatus(0); sysJobMapper.updateTaskJob(existJob); } else { SchedulingRunnable task = new SchedulingRunnable(existJob.getBeanName(), existJob.getMethodName(), existJob.getMethodParams()); cronTaskRegistrar.addCronTask(task, existJob.getCronExpression()); existJob.setJobStatus(1); sysJobMapper.updateTaskJob(existJob); } return "SUCCESS"; } }
好,代碼到此為止。
最後來測試一下,首先啟動 Spring Boot 項目。
可以看到,項目一啟動,就將數據庫表中記錄的一個定時任務啟動了。
然後,我們調用 addTask 接口,新增一個定時 job。
可以看到,現在啟動了兩個job.
數據庫也有兩條job數據, 下次啟動項目時,兩個job就會同時啟動了。
+----+-------+----------+--------------+----------------+-----------+--------------------------+------------+------------+----------------+
| id | jobId | beanName | methodParams | cronExpression | jobStatus | remark                   | createTime | updateTime | methodName     |
+----+-------+----------+--------------+----------------+-----------+--------------------------+------------+------------+----------------+
| 7  | 1     | demoTask | NULL         | */5 * * * * ?  | 1         | 測試無參定時任務         | 2020-02-15 | 2020-02-15 | taskNoParams   |
| 15 | 10    | demoTask | 123456       | */10 * * * * ? | 1         | NULL                     | NULL       | NULL       | taskWithParams |
+----+-------+----------+--------------+----------------+-----------+--------------------------+------------+------------+----------------+