最近項目經歷,需要自定義增加定時發送任務,於是學習了下定時任務的知識,Elastic-Job 是基於成熟的開源產品Quartz和Zookeeper及其客戶端Curator進行二次開發。那么我要實現一個和Elastic-Job 一樣的自定義任務也應該基於Quartz
於是學習了下Quartz結合springboot的自定義任務。
1、任務記錄入庫,啟動服務后讀取該任務,並加入或刪除或更新job計划。
2、定時任務執行指定任務。
所謂「指定任務」,是指這類定時任務調用的都是同一個方法或者同一類方法。
具體代碼思路如下
依賴的java包
<!-- Quartz scheduling library -->
<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>1.8.0</version>
</dependency>
<!-- Spring context support module -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
    <version>3.2.17.RELEASE</version>
</dependency>
a 配置的xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd">

    <!-- Thread pool handed to the scheduler factory below -->
    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="1"/>
        <property name="maxPoolSize" value="20"/>
        <property name="queueCapacity" value="100"/>
        <property name="keepAliveSeconds" value="2000"/>
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor$AbortPolicy"/>
        </property>
    </bean>

    <!-- Reads the task table and creates / updates / removes the per-task jobs -->
    <bean id="quartzManager" class="com.ql.vessels.schemajob.QuartzManager">
        <property name="scheduler" ref="schedulerManager"/>
    </bean>

    <!-- Job that invokes quartzManager.reScheduleJob(); concurrent runs are disabled -->
    <bean id="quartzManagerJobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
        <property name="targetObject" ref="quartzManager"/>
        <property name="targetMethod" value="reScheduleJob"/>
        <property name="concurrent" value="false"/>
    </bean>

    <!-- Note: despite its id this is a simple (fixed-interval) trigger, not a cron trigger -->
    <bean id="cronTriggerBean" class="org.springframework.scheduling.quartz.SimpleTriggerBean">
        <property name="jobDetail" ref="quartzManagerJobDetail"/>
        <!-- Start delay of 0: run the job right away -->
        <property name="startDelay" value="0"/>
        <!-- Repeat interval: 1 minute -->
        <property name="repeatInterval" value="60000"/>
    </bean>

    <!-- Scheduler manager. With lazy-init='false' the scheduler is started when the container starts -->
    <bean id="schedulerManager" lazy-init="false" autowire="no"
          class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
        <property name="triggers">
            <list>
                <ref bean="cronTriggerBean"/>
            </list>
        </property>
        <property name="jobFactory" ref="jobFactory"></property>
        <property name="taskExecutor" ref="taskExecutor"/>
    </bean>

    <!-- Important: the jobFactory property above and the jobFactory bean below are what allow
         the required services to be injected into the job class that executes each task -->
    <bean id="jobFactory" class="com.ql.vessels.schemajob.JobAdapter"></bean>
</beans>
b 這個是關鍵代碼,
從數據庫讀任務
增加到job計划
刪除過時或者已取消的任務
更新任務
package com.ql.vessels.schemajob; import com.fqgj.log.factory.LogFactory; import com.fqgj.log.interfaces.Log; import com.ql.vessels.common.util.DateUtils; import com.ql.vessels.domain.services.MsgSendSchemaService; import com.ql.vessels.repo.entity.MsgSendSchemaEntity; import com.ql.vessels.repo.entity.PushSchemaEntity; import com.ql.vessels.repo.vo.TaskVo; import org.quartz.JobDataMap; import org.quartz.JobDetail; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.quartz.CronTriggerBean; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @author vampire * @date 2019/5/16 10:08 AM */ public class QuartzManager implements BeanFactoryAware { private final static Log log = LogFactory.getLog(QuartzManager.class); private Scheduler scheduler; private static BeanFactory beanFactory; @Autowired MsgSendSchemaService msgSendSchemaService; /** * 定時要執行的方法類。 */ public void reScheduleJob() { // 1.讀取數據庫中的任務列表。 Date now = new Date(); Map map = new HashMap(4); map.put("sendType", 2); //獲取短信定時任務 List<MsgSendSchemaEntity> list = msgSendSchemaService.selectByParams(map); for (MsgSendSchemaEntity sendSchema : list ) { TaskVo taskVo = new TaskVo(); taskVo.setTaskId(sendSchema.getId()); taskVo.setTaskType(1); taskVo.setCronExpression(DateUtils.getCron(sendSchema.getSendTime())); taskVo.setState(sendSchema.getStatus()); if (sendSchema.getStatus() != 0 || now.after(sendSchema.getSendTime())) { //去掉過時的任務 removeExpireTasks(taskVo); } else { configSchedul(taskVo); } } } /** * 移除過期任務 * * @param bo */ private void removeExpireTasks(TaskVo bo) { try { CronTriggerBean trigger = (CronTriggerBean) scheduler.getTrigger(bo.getTaskId() + "name" + bo.getTaskType(), 
Scheduler.DEFAULT_GROUP); if (trigger != null) { log.info("==移除任務==" + bo.getTaskId() + "name" + bo.getTaskType()); scheduler.pauseTrigger(trigger.getName(), trigger.getGroup());// 停止觸發器 scheduler.unscheduleJob(trigger.getName(), trigger.getGroup());// 移除觸發器 scheduler.deleteJob(trigger.getJobName(), trigger.getJobGroup());// 刪除任務 } } catch (SchedulerException e) { log.error("移除任務失敗..."); e.printStackTrace(); } } /** * 配置任務列表 * * @param bo */ private void configSchedul(TaskVo bo) { try { CronTriggerBean trigger = (CronTriggerBean) scheduler.getTrigger(bo.getTaskId() + "name" + bo.getTaskType(), Scheduler.DEFAULT_GROUP); if (trigger == null) {//說明schedule中不存在該定時任務 createTriggerTask(bo); } else { updateTriggerTask(bo, trigger); } } catch (SchedulerException e) { log.error("獲取觸發器trigger失敗..."); e.printStackTrace(); } } /** * 更新任務列表 * * @param bo */ private void updateTriggerTask(TaskVo bo, CronTriggerBean trigger) { if (bo.getState() == 0) { try { // 判斷從DB中取得的任務時間和現在的quartz線程中的任務時間是否相等 // 如果相等,則表示用戶並沒有重新設定數據庫中的任務時間,這種情況不需要重新rescheduleJob if (trigger.getCronExpression() != null && !trigger.getCronExpression().equalsIgnoreCase(bo.getCronExpression())) { log.info("=真正更新方法:=" + bo.getTaskId() + "name" + bo.getTaskType()); trigger.setCronExpression(bo.getCronExpression()); scheduler.rescheduleJob(bo.getTaskId() + "name" + bo.getTaskType(), Scheduler.DEFAULT_GROUP, trigger); log.info("更新任務時間失敗..."); } } catch (Exception e) { log.error("更新任務時間失敗..."); e.printStackTrace(); } } else { this.removeExpireTasks(bo); } } /** * 創建任務列表 * * @param bo */ private void createTriggerTask(TaskVo bo) { if (bo.getState() == 0) { log.info("=創建:=" + bo.getTaskId() + "name" + bo.getTaskType()); try { Class clazz = QuartzJobFactory.class;//執行計划任務的類 JobDetail jobDetail = new JobDetail(bo.getTaskId() + "", clazz); Map map = new HashMap(); map.put("task", bo); jobDetail.setJobDataMap(new JobDataMap(map)); jobDetail.setName(bo.getTaskId() + "name" + bo.getTaskType()); scheduler.addJob(jobDetail, 
true); // 將Job添加到管理類 // 新一個基於Spring的時間類 CronTriggerBean c = new CronTriggerBean(); c.setCronExpression(bo.getCronExpression());// 設置時間表達式 c.setName(bo.getTaskId() + "name" + bo.getTaskType());// 設置名稱 c.setJobDetail(jobDetail);// 注入Job c.setJobName(bo.getTaskId() + "name" + bo.getTaskType());// 設置Job名稱 scheduler.scheduleJob(c);// 注入到管理類 scheduler.rescheduleJob(bo.getTaskId() + "name" + bo.getTaskType(), Scheduler.DEFAULT_GROUP, c);// 刷新管理類 } catch (Exception e) { log.error("創建" + bo.getTaskId() + "name" + bo.getTaskType() + "任務失敗..."); e.printStackTrace(); } } else { this.removeExpireTasks(bo); } } @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; } public Scheduler getScheduler() { return scheduler; } public QuartzManager setScheduler(Scheduler scheduler) { this.scheduler = scheduler; return this; } public static BeanFactory getBeanFactory() { return beanFactory; } }
c 任務添加后那每一個任務真正執行的方法是什么?需要我們來寫任務具體做的事
所有的定時任務都會觸發到這個 execute 方法中,根據定時任務創建時傳入的參數來調用這個執行中的具體業務方法,
注意 :@Autowired 注入服務時空指針還需要添加下邊一個類,剛剛在xml中也提到過,背景色標注的那兩行就是為解決此問題的
package com.ql.vessels.schemajob; import com.alibaba.fastjson.JSON; import com.fqgj.log.factory.LogFactory; import com.fqgj.log.interfaces.Log; import com.ql.vessels.domain.services.MsgSendSchemaService; import com.ql.vessels.domain.services.PushSchemaService; import com.ql.vessels.repo.vo.TaskVo; import org.quartz.Job; import org.quartz.JobDataMap; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author vampire * @date 2019/5/16 11:56 AM */ @Component public class QuartzJobFactory implements Job { private final static Log log = LogFactory.getLog(QuartzJobFactory.class); @Autowired MsgSendSchemaService msgSendSchemaService; @Autowired PushSchemaService pushSchemaService; @Override public void execute(JobExecutionContext context) throws JobExecutionException { JobDataMap jobDataMap = context.getJobDetail().getJobDataMap(); TaskVo taskVo = (TaskVo) jobDataMap.get("task"); log.info("定時任務開始執行。。。。。。。。。。。" + JSON.toJSONString(taskVo)); if (taskVo.getTaskType() == 1) { log.info("message job=======" + taskVo.getTaskId()); msgSendSchemaService.sendSchemaMsg(taskVo.getTaskId()); } else if (taskVo.getTaskType() == 2) { //推送任務執行 log.info("push job=======" + taskVo.getTaskId()); pushSchemaService.sendSchemaPush(taskVo.getTaskId()); } } }
d 解決服務注入為空
package com.ql.vessels.schemajob; import org.quartz.spi.TriggerFiredBundle; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.AutowireCapableBeanFactory; import org.springframework.scheduling.quartz.AdaptableJobFactory; /** * @author vampire * @date 2019/5/16 4:12 PM * 作用是解決QuartzJobFactory 類中不能注入服務類 */ public class JobAdapter extends AdaptableJobFactory { @Autowired private AutowireCapableBeanFactory capableBeanFactory; @Override protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception { //調用父類的方法 Object jobInstance = super.createJobInstance(bundle); //進行注入 capableBeanFactory.autowireBean(jobInstance); return jobInstance; } }
到此就完美解決了自定義任務。當然,這里的服務器是單台的,沒有考慮集群;若是集群部署則需要添加zk,保證任務有且只有一台服務器去跑,否則重復執行后果很嚴重。