俺的第一個文章,有掌聲的給掌聲,沒掌聲的給鮮花啦!
起因:因系統的一個定時任務突然執行不正常了,原來是一個時跑一次,現在偶爾跑,偶爾不跑,日志跟蹤二天只跑了一次,這個時間段內沒有對系統做任務變更,日志也沒有任務異常,用VisualVM遠程JMX的方式不能正常監控到進程(待努力重試),因此臨時起意想做一下任務監控界面,且形成一個組件,方便管理員查看所有任務列表,及方便調整,暫停等。
本來參考了網上一些例子,都不適合我的需求,因此自己寫了一份。代碼主要參考了quartz,spring-job相關官方代碼及例子。
本文提供一種思路,也許你有更好實現,能否回復一下?一起討論?
目標:對管理員來說,希望可看到每個任務信息,以及當前狀態,歷史執行情況及日志,可對當前任務可以暫停,啟動,立即執行,查看異常。
當然,以下數據都是持久在數據庫。
當然,我的考慮中,是將所有的任務都變成Corn Expression,也就是說使用CornTrigger,SimpleTrigger沒被使用,沒這個使用的方便。
大致效果如下:
我們需要通過界面來增加要管理的任務:
進一步考慮:
也許現在我們只要配置一個Spring BEAN即可,也許將來,有人寫了直接繼承org.quartz.Job或者QuartzJobBean也要能支持:
1: public class DemoJob extends AbstractJob {
2:
3: @Override
4: public void execute(JobDataMap jobDataMap) throws Exception {
5: logger.debug("DEMO JOB開始運行:"+jobDataMap.getWrappedMap());
6:
7: }
8:
9: }
也許有人寫了個類,只想執行里面的一個方法也可以執行,如
1: public class NonQuartzJob {
2: public void execute() {
3: System.out.println("NonQuartzJob Runned:"+jobEntityService);
4: try {
5: Thread.sleep(8000);
6: } catch (InterruptedException e) {
7: // TODO Auto-generated catch block
8: e.printStackTrace();
9: }
10: throw new RuntimeException("test");
11: }
12: }
現狀:現在的Job都是獨立實現,然后用spring配置式實現,都是采用如下方式配置
1: <!-- 原始任務 -->
2: <bean id="queryStatementState" class="com.apusic.nsec.settlement.job.impl.QueryStatementState">
3: <property name="settlementService" ref="settlementService"></property>
4: </bean>
5: <!-- 包裝成Spring任務 -->
6: <bean name="checkDiskJob"
7: class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
8: <property name="targetObject" ref="queryStatementState" />
9: <property name="targetMethod" value="queryStatement" />
10: <property name="concurrent" value="false" />
11: </bean>
12:
13: <!-- Trigger-->
14: <bean id="repeatTrigger" class="org.springframework.scheduling.quartz.SimpleTriggerBean">
15: <property name="jobDetail" ref="checkDiskJob" />
16: <!-- 5分鍾后啟動-->
17: <property name="startDelay" value="300000" />
18: <!-- 30分鍾檢查一次-->
19: <property name="repeatInterval" value="1800000" />
20: </bean>
21:
22: <!-- 調度器-->
23: <bean id="scheduler"
24: class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
25: <property name="triggers">
26: <list>
27: <ref bean="repeatTrigger" />
28: </list>
29: </property>
30: </bean>
實現:
1.實體定義:
1: @Entity
2: public class JobEntity extends IdEntity {
3: @NotBlank
4: @Column(unique = true)
5: private String jobName;// 任務名
6: @NotBlank
7: private String jobClass;// 類名或者bean名
8: private String jobMethod;// 如果為bean名,對應執行的方法
9: @NotNull
10: private String jobCronExpress;// 表達式
11: private String jobDesc;// 任務描述
12: private String jobGroupName;// Group名
13: @ElementCollection(fetch = FetchType.LAZY)
14: @CollectionTable(name = "t_job_properties")
15: private Map<String, String> properties = new HashMap<String, String>();
16: private int jobExecCount;
17: @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
18: @Temporal(TemporalType.TIMESTAMP)
19: private Date createTime = new Date();
20: @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
21: @Temporal(TemporalType.TIMESTAMP)
22: private Date lastExecTime;
23: @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
24: @Temporal(TemporalType.TIMESTAMP)
25: private Date nextExecTime;
26: // true=繼承Job類,false=spring bean,沒有繼承job類
27: private boolean jobClassIsBeanName = false;
28: @Enumerated(EnumType.STRING)
29: private JobStatus status = JobStatus.WAITTING;
30:
31: private long jobUsedTime;// ms
32: private long jobExceptionCount;//任務異常總數
33:
34: @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
35: @Temporal(TemporalType.TIMESTAMP)
36: private Date lastExeceptionTime;
日志記錄
1: @Entity
2: public class JobLogEntity extends IdEntity {
3: @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
4: @Temporal(TemporalType.TIMESTAMP)
5: private Date execTime = new Date();
6: @Enumerated(EnumType.STRING)
7: private JobLogStatus status = JobLogStatus.SUCCESS;
8:
9: @ManyToOne
10: private JobEntity jobEntity = new JobEntity();
11: @Column(length = 4000)
12: private String exceptionStackTrace;
2.接口定義
1:
2: public interface QuartzFacade {
3:
4: public void startJobs(List<JobEntity> jobEntitys) throws SchedulerException, ClassNotFoundException,
5: NoSuchMethodException;
6:
7: public void startJob(JobEntity jobEntity) throws SchedulerException, ClassNotFoundException, NoSuchMethodException;
8:
9: public void startJobImmediatelyOnce(JobEntity jobEntity) throws SchedulerException, ClassNotFoundException,
10: NoSuchMethodException;
11:
12: public void startScheduler() throws SchedulerException;
13:
14: public void pauseJob(JobEntity jobEntity) throws SchedulerException;
15:
16: public void resumeJob(JobEntity jobEntity) throws SchedulerException;
17:
18: public void pauseAll() throws SchedulerException;
19:
20: public void shutdownAll() throws SchedulerException;
21:
22: public void saveJobEntity(JobEntity jobEntity);
23:
24: public void updateJobEntity(JobEntity jobEntity);
25:
26: public void removeJobEntity(JobEntity jobEntity);
27:
28: public void deleteById(Long id);
29:
30: public JobEntity getById(Long id);
31:
32: public JobEntity findJobEntityByJobName(String jobName);
33:
34: public List<JobEntity> getAllJobEntitys();
35:
36: public Page<JobEntity> getAllJobEntitysAsPage(Page<JobEntity> p);
37:
38: public boolean isExistJobEntity(String jobName);
39: …log相關
40: }
3.實現
為了方便記錄日志,或者增加操作,如果用切面,沒有通用性,還不如定義父類。
3.1抽象JOB定義
在此我們包裝了JOB,之后的JOB都必須繼承此類,之前的已定義的須做相應的轉換:
1: public abstract class AbstractJob extends QuartzJobBean {
2: protected final static Log logger = LogFactory.getLog(AbstractJob.class);
3:
4: @Override
5: protected final void executeInternal(JobExecutionContext context) throws JobExecutionException {
6: JobDetail jobDetail = context.getJobDetail();
7:
8: JobEntity jobEntity = getJobEntityService().findJobEntityByJobName(jobDetail.getKey().getName());
9: JobLogEntity logEntity = preExecute(context, jobEntity);
10:
11: try {
12: long start = System.currentTimeMillis();
13:
14: execute(jobDetail.getJobDataMap());//
15:
16: jobEntity.setJobUsedTime(System.currentTimeMillis() - start);
17: jobEntity.setStatus(JobStatus.WAITTING);
18: getJobEntityService().updateJobEntity(jobEntity);
19: getJobLogEntityService().addJobLog(logEntity);
20: } catch (Exception e) {
21: logger.error("任務執行出錯" + e.getMessage(), e);
22: dealException(jobEntity, logEntity, e);
23: throw new JobExecutionException(e);
24: }
25:
26: }
27:
28: private JobLogEntity preExecute(JobExecutionContext context, JobEntity jobEntity) throws JobExecutionException {
29: if (jobEntity == null) {
30: logger.error("您要執行的任務不存在:" + context.getJobDetail().getName());
31: throw new JobExecutionException("任務不存在:" + context.getJobDetail().getName());
32: }
33:
34: jobEntity.setStatus(JobStatus.RUNNING);
35: jobEntity.setLastExecTime(new Date());
36: jobEntity.setNextExecTime(context.getNextFireTime());
37: jobEntity.setJobExecCount(jobEntity.getJobExecCount() + 1);
38:
39: JobLogEntity logEntity = new JobLogEntity();
40: logEntity.setJobEntity(jobEntity);
41: getJobEntityService().updateJobEntity(jobEntity);
42: return logEntity;
43: }
44:
45: private void dealException(JobEntity jobEntity, JobLogEntity logEntity, Exception e) throws JobExecutionException {
46: ExceptionEventDispather.getInstance().notify(jobEntity);
47: logEntity.setStatus(JobLogStatus.FAIL);
48: logEntity.setExceptionStackTrace(Util.getStackTrack(e));
49: try {
50: getJobLogEntityService().addJobLog(logEntity);
51: jobEntity.setJobExceptionCount(jobEntity.getJobExceptionCount() + 1);
52: jobEntity.setStatus(JobStatus.EXCEPTION);
53: jobEntity.setLastExeceptionTime(new Date());
54: getJobEntityService().updateJobEntity(jobEntity);
55: } catch (Exception e1) {
56: throw new JobExecutionException(e1);
57: }
58: }
59:
60: @Transactional
61: public abstract void execute(JobDataMap jobDataMap) throws Exception;
同時定義了事件派發,以便任務出錯,或者出錯多少次時,發送郵件到管理員的郵件.
Job代碼:
1: public class DemoSpringJob extends AbstractJob {
2: @Autowired
3: JobEntityService jobEntityService;
4: @Override
5: public void execute(JobDataMap jobDataMap) throws Exception {
6: //JobEntity job = jobEntityService.findJobEntityByJobName("Demo任務_1325661555923");
7: logger.debug("DemoSpringJob Runned:"+jobEntityService);
8: }
9: public void setJobEntityService(JobEntityService jobEntityService) {
10: this.jobEntityService = jobEntityService;
11: }
12:
13:
14: }
1: <bean id="demoSpringJob" class="com.xia.quartz.job.DemoSpringJob">
2: <property name="jobEntityService" ref="jobEntityService"></property>
3: </bean>
3.2 實體轉換成任務Quartz.jobdetail
1: private JobDetail convertJob(JobEntity jobEntity) throws ClassNotFoundException, NoSuchMethodException {
2: logger.debug("Job生成中:" + jobEntity.getJobName());
3:
4: JobDetail jobDetail;
5: if (jobEntity.isJobClassIsBeanName()) {//如果是spring bean
6: InvokerJobBean bean=ApplicationContextHolder.getBean("invokerJobBean");//通過invokerJobBean轉換
7: bean.setTargetBeanName(jobEntity.getJobClass());
8: bean.setTargetMethod(jobEntity.getJobMethod());
9: bean.afterPropertiesSet();
10: jobDetail=bean.getObject();
11: } else {//如果是繼承的是Job類
12: String jobClass = jobEntity.getJobClass();
13: @SuppressWarnings("unchecked")
14: Class<? extends Job> clazz = (Class<? extends Job>) Class.forName(jobClass);
15: jobDetail = ApplicationContextHolder.getBean("jobDetail");
16: jobDetail.setJobClass(clazz);
17: }
18: jobDetail.setName(jobEntity.getJobName());
19: jobDetail.setGroup(jobEntity.getJobGroupName());
20: jobDetail.setDescription(jobEntity.getJobDesc());
21: try {
22: jobDetail.getJobDataMap().putAll(jobEntity.getProperties());
23: } catch (Exception e) {
24: logger.error(e.getMessage());
25: }
26: return (JobDetail) jobDetail;
27: }
3.3 開始任務
在看此代碼,你須了解Quartz的機制,它由三個東西組成:Scheduler,Trigger,JobDetail.對應作用是:
執行線程,執行策略,執行內容。也就是,在哪個線程下,使用什么策略,執行什么內容。執行線程下可以有多個Trigger,一個trigger下可以多個任務。
一般地,一個系統有一個Scheduler即可,而Trigger與JobDetail一般是一一對應的。畢竟不同任務執行周期都不同。
1: public void startJob(JobEntity jobEntity) throws SchedulerException, ClassNotFoundException, NoSuchMethodException {
2: try {
3: JobDetail jobDetail = convertJob(jobEntity);
4: CronTrigger triggerCorn = convertTrigger(jobEntity);
5: if (StringUtils.isNotBlank(jobEntity.getJobGroupName())) {
6: jobEntity.setJobGroupName(jobDetail.getGroup());
7: jobEntityService.updateJobEntity(jobEntity);
8: }
9: //jobEntity.setStatus(JobStatus.WAITTING);
10:
11: Date ft = scheduler.scheduleJob(jobDetail, triggerCorn);
12: logger.debug("任務:" + jobDetail.getKey() + " will run at: " + ft.toLocaleString());
13: } catch (ParseException e) {
14: logger.error("任務轉換失敗:" + e.getMessage(), e);
15: throw new SchedulerException(e);
16: }
17: }
執行所有任務:
1: QuartzService quartzService = ApplicationContextHolder.getBean("quartzService");
2: JobEntityService jobService = ApplicationContextHolder.getBean("jobEntityService");
3: List<JobEntity> all = jobService.getAllJobEntitys();
4: quartzService.startJobs(all);
3.3 狀態變化
暫停的任務,如果暫停有執行點,那么,你繼續后,同樣會執行一次此任務。
其它狀態變化在此沒有列出。
1: public void pauseJob(JobEntity jobEntity) throws SchedulerException {
2: logger.debug("暫停JOB:" + jobEntity);
3: scheduler.pauseJob(jobEntity.getJobName(), jobEntity.getJobGroupName());
4: // scheduler.interrupt(jobKey);
5: jobEntity.setStatus(JobStatus.PAUSED);
6: jobEntityService.updateJobEntity(jobEntity);
7: }
3.4數據庫數據如下:
4.組件化
將quartz相關代碼獨立成一個工程,對外提供quartzFacade的服務即可,包括JobEntity的crud,pagingList,Exception view,Log view,ExceptionEvent push.
使用者要做的事就是,希望是開箱即用Out-Of-Box:
- 引入工程
- 增加spring配置
- 注冊一個ExceptionHandlerListener,
- 使用quartzFacade服務
5.遺留問題
Spring bean 注入問題:
因Quartz的機制里,執行的任務類是這么jobDetail.setJobClass(clazz);注入進來的,那么,這個clazz如果要注入service,只能通過手工注入,或者使用使用spring
1: <bean id="jobDetailInvoker"
2: class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
3: <property name="targetObject" ref="nonQuartzJob" />
4: <property name="targetMethod">
5: <value>execute</value>
6: </property>
7: </bean>
而這個實現的原理是,把目標首丟過去,同時把要注入的東西丟到Quartz.job.contextMap中,在執行任務時,重裝裝配上去。
以下是Spring處理的相關代碼
org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean:
1: public void afterPropertiesSet() throws ClassNotFoundException, NoSuchMethodException {
2: prepare();
3:
4: // Use specific name if given, else fall back to bean name.
5: String name = (this.name != null ? this.name : this.beanName);
6:
7: // Consider the concurrent flag to choose between stateful and stateless job.
8: Class jobClass = (this.concurrent ? MethodInvokingJob.class : StatefulMethodInvokingJob.class);
9:
10: // Build JobDetail instance.
11: this.jobDetail = new JobDetail(name, this.group, jobClass);
12: this.jobDetail.getJobDataMap().put("methodInvoker", this);
13: this.jobDetail.setVolatility(true);
14: this.jobDetail.setDurability(true);
15:
16: // Register job listener names.
17: if (this.jobListenerNames != null) {
18: for (String jobListenerName : this.jobListenerNames) {
19: this.jobDetail.addJobListener(jobListenerName);
20: }
21: }
22:
23: postProcessJobDetail(this.jobDetail);
24: }
1: public static class MethodInvokingJob extends QuartzJobBean {
2:
3: protected static final Log logger = LogFactory.getLog(MethodInvokingJob.class);
4:
5: private MethodInvoker methodInvoker;
6:
7: /**
8: * Set the MethodInvoker to use.
9: */
10: public void setMethodInvoker(MethodInvoker methodInvoker) {
11: this.methodInvoker = methodInvoker;
12: }
13:
14: /**
15: * Invoke the method via the MethodInvoker.
16: */
17: @Override
18: protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
19: try {
20: context.setResult(this.methodInvoker.invoke());
21: }
22: catch (InvocationTargetException ex) {
23: if (ex.getTargetException() instanceof JobExecutionException) {
24: // -> JobExecutionException, to be logged at info level by Quartz
25: throw (JobExecutionException) ex.getTargetException();
26: }
27: else {
28: // -> "unhandled exception", to be logged at error level by Quartz
29: throw new JobMethodInvocationFailedException(this.methodInvoker, ex.getTargetException());
30: }
31: }
32: catch (Exception ex) {
33: // -> "unhandled exception", to be logged at error level by Quartz
34: throw new JobMethodInvocationFailedException(this.methodInvoker, ex);
35: }
36: }
37: }
1: public abstract class QuartzJobBean implements Job {
2:
3: /**
4: * This implementation applies the passed-in job data map as bean property
5: * values, and delegates to <code>executeInternal</code> afterwards.
6: * @see #executeInternal
7: */
8: public final void execute(JobExecutionContext context) throws JobExecutionException {
9: try {
10: BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(this);
11: MutablePropertyValues pvs = new MutablePropertyValues();
12: pvs.addPropertyValues(context.getScheduler().getContext());
13: pvs.addPropertyValues(context.getMergedJobDataMap());
14: bw.setPropertyValues(pvs, true);
15: }
16: catch (SchedulerException ex) {
17: throw new JobExecutionException(ex);
18: }
19: executeInternal(context);
20: }
第一次寫BLOG,工具還沒用好,章節處理可能不太好,不過有了開始,后來會慢慢變好。