Spring-job(quartz)任務監控界面(組件)


俺的第一個文章,有掌聲的給掌聲,沒掌聲的給鮮花啦!

起因:因系統的一個定時任務突然執行不正常了,原來是一個時跑一次,現在偶爾跑,偶爾不跑,日志跟蹤二天只跑了一次,這個時間段內沒有對系統做任務變更,日志也沒有任務異常,用VisualVM遠程JMX的方式不能正常監控到進程(待努力重試),因此臨時起意想做一下任務監控界面,且形成一個組件,方便管理員查看所有任務列表,及方便調整,暫停等。

本來參考了網上一些例子,都不適合我的需求,因此自己寫了一份。代碼主要參考了quartz,spring-job相關官方代碼及例子。

本文提供一種思路,也許你有更好實現,能否回復一下?一起討論?

目標:對管理員來說,希望可看到每個任務信息,以及當前狀態,歷史執行情況及日志,可對當前任務可以暫停,啟動,立即執行,查看異常。

當然,以下數據都是持久在數據庫。

當然,我的考慮中,是將所有的任務都變成Corn Expression,也就是說使用CornTrigger,SimpleTrigger沒被使用,沒這個使用的方便。

大致效果如下:

image

我們需要通過界面來增加要管理的任務:

image

進一步考慮:

也許現在我們只要配置一個Spring BEAN即可,也許將來,有人寫了直接繼承org.quartz.Job或者QuartzJobBean也要能支持:

  1: public class DemoJob extends AbstractJob {
  2: 
  3:   @Override
  4:   public void execute(JobDataMap jobDataMap) throws Exception {
  5:     logger.debug("DEMO JOB開始運行:"+jobDataMap.getWrappedMap());
  6: 
  7:   }
  8: 
  9: }

也許有人寫了個類,只想執行里面的一個方法也可以執行,如

 

  1: public class NonQuartzJob {
  2:   public void execute() {
  3:     System.out.println("NonQuartzJob Runned:"+jobEntityService);
  4:     try {
  5:       Thread.sleep(8000);
  6:     } catch (InterruptedException e) {
  7:       // TODO Auto-generated catch block
  8:       e.printStackTrace();
  9:     }
 10:     throw new RuntimeException("test");
 11:   }
 12: }

現狀:現在的Job都是獨立實現,然后用spring配置式實現,都是采用如下方式配置

  1: <!-- 原始任務 -->
  2:   <bean id="queryStatementState" class="com.apusic.nsec.settlement.job.impl.QueryStatementState">
  3:     <property name="settlementService" ref="settlementService"></property>
  4:   </bean>
  5: <!-- 包裝成Spring任務 -->
  6:   <bean name="checkDiskJob"
  7:     class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
  8:     <property name="targetObject" ref="queryStatementState" />
  9:     <property name="targetMethod" value="queryStatement" />
 10:     <property name="concurrent" value="false" />
 11:   </bean>
 12: 
 13:   <!-- Trigger-->
 14:   <bean id="repeatTrigger" class="org.springframework.scheduling.quartz.SimpleTriggerBean">
 15:     <property name="jobDetail" ref="checkDiskJob" />
 16:     <!-- 5分鍾后啟動-->
 17:     <property name="startDelay" value="300000" />
 18:     <!--  30分鍾檢查一次-->
 19:     <property name="repeatInterval" value="1800000" />
 20:   </bean>
 21: 
 22:   <!-- 調度器-->
 23:   <bean id="scheduler"
 24:     class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
 25:     <property name="triggers">
 26:       <list>
 27:         <ref bean="repeatTrigger" />
 28:       </list>
 29:     </property>
 30:   </bean>

實現:

1.實體定義:

  1: @Entity
  2: public class JobEntity extends IdEntity {
  3:   @NotBlank
  4:   @Column(unique = true)
  5:   private String jobName;// 任務名
  6:   @NotBlank
  7:   private String jobClass;// 類名或者bean名
  8:   private String jobMethod;// 如果為bean名,對應執行的方法
  9:   @NotNull
 10:   private String jobCronExpress;// 表達式
 11:   private String jobDesc;// 任務描述
 12:   private String jobGroupName;// Group名
 13:   @ElementCollection(fetch = FetchType.LAZY)
 14:   @CollectionTable(name = "t_job_properties")
 15:   private Map<String, String> properties = new HashMap<String, String>();
 16:   private int jobExecCount;
 17:   @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
 18:   @Temporal(TemporalType.TIMESTAMP)
 19:   private Date createTime = new Date();
 20:   @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
 21:   @Temporal(TemporalType.TIMESTAMP)
 22:   private Date lastExecTime;
 23:   @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
 24:   @Temporal(TemporalType.TIMESTAMP)
 25:   private Date nextExecTime;
 26:   // true=繼承Job類,false=spring bean,沒有繼承job類
 27:   private boolean jobClassIsBeanName = false;
 28:   @Enumerated(EnumType.STRING)
 29:   private JobStatus status = JobStatus.WAITTING;
 30: 
 31:   private long jobUsedTime;// ms
 32:   private long jobExceptionCount;//任務異常總數
 33:   
 34:   @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
 35:   @Temporal(TemporalType.TIMESTAMP)
 36:   private Date lastExeceptionTime;

日志記錄

  1: @Entity
  2: public class JobLogEntity extends IdEntity {
  3:   @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
  4:   @Temporal(TemporalType.TIMESTAMP)
  5:   private Date execTime = new Date();
  6:   @Enumerated(EnumType.STRING)
  7:   private JobLogStatus status = JobLogStatus.SUCCESS;
  8: 
  9:   @ManyToOne
 10:   private JobEntity jobEntity = new JobEntity();
 11:   @Column(length = 4000)
 12:   private String exceptionStackTrace;

 

2.接口定義

  1: 
  2: public interface QuartzFacade {
  3: 
  4:   public void startJobs(List<JobEntity> jobEntitys) throws SchedulerException, ClassNotFoundException,
  5:       NoSuchMethodException;
  6: 
  7:   public void startJob(JobEntity jobEntity) throws SchedulerException, ClassNotFoundException, NoSuchMethodException;
  8: 
  9:   public void startJobImmediatelyOnce(JobEntity jobEntity) throws SchedulerException, ClassNotFoundException,
 10:       NoSuchMethodException;
 11: 
 12:   public void startScheduler() throws SchedulerException;
 13: 
 14:   public void pauseJob(JobEntity jobEntity) throws SchedulerException;
 15: 
 16:   public void resumeJob(JobEntity jobEntity) throws SchedulerException;
 17: 
 18:   public void pauseAll() throws SchedulerException;
 19: 
 20:   public void shutdownAll() throws SchedulerException;
 21: 
 22:   public void saveJobEntity(JobEntity jobEntity);
 23: 
 24:   public void updateJobEntity(JobEntity jobEntity);
 25: 
 26:   public void removeJobEntity(JobEntity jobEntity);
 27: 
 28:   public void deleteById(Long id);
 29: 
 30:   public JobEntity getById(Long id);
 31: 
 32:   public JobEntity findJobEntityByJobName(String jobName);
 33: 
 34:   public List<JobEntity> getAllJobEntitys();
 35: 
 36:   public Page<JobEntity> getAllJobEntitysAsPage(Page<JobEntity> p);
 37: 
 38:   public boolean isExistJobEntity(String jobName);
 39: …log相關
 40: }

3.實現

為了方便記錄日志,或者增加操作,如果用切面,沒有通用性,還不如定義父類。

3.1抽象JOB定義

在此我們包裝了JOB,之后的JOB都必須繼承此類,之前的已定義的須做相應的轉換:

  1: public abstract class AbstractJob extends QuartzJobBean {
  2:   protected final static Log logger = LogFactory.getLog(AbstractJob.class);
  3: 
  4:   @Override
  5:   protected final void executeInternal(JobExecutionContext context) throws JobExecutionException {
  6:     JobDetail jobDetail = context.getJobDetail();
  7: 
  8:     JobEntity jobEntity = getJobEntityService().findJobEntityByJobName(jobDetail.getKey().getName());
  9:     JobLogEntity logEntity = preExecute(context, jobEntity);
 10: 
 11:     try {
 12:       long start = System.currentTimeMillis();
 13: 
 14:       execute(jobDetail.getJobDataMap());//
 15: 
 16:       jobEntity.setJobUsedTime(System.currentTimeMillis() - start);
 17:       jobEntity.setStatus(JobStatus.WAITTING);
 18:       getJobEntityService().updateJobEntity(jobEntity);
 19:       getJobLogEntityService().addJobLog(logEntity);
 20:     } catch (Exception e) {
 21:       logger.error("任務執行出錯" + e.getMessage(), e);
 22:       dealException(jobEntity, logEntity, e);
 23:       throw new JobExecutionException(e);
 24:     }
 25: 
 26:   }
 27: 
 28:   private JobLogEntity preExecute(JobExecutionContext context, JobEntity jobEntity) throws JobExecutionException {
 29:     if (jobEntity == null) {
 30:       logger.error("您要執行的任務不存在:" + context.getJobDetail().getName());
 31:       throw new JobExecutionException("任務不存在:" + context.getJobDetail().getName());
 32:     }
 33: 
 34:     jobEntity.setStatus(JobStatus.RUNNING);
 35:     jobEntity.setLastExecTime(new Date());
 36:     jobEntity.setNextExecTime(context.getNextFireTime());
 37:     jobEntity.setJobExecCount(jobEntity.getJobExecCount() + 1);
 38: 
 39:     JobLogEntity logEntity = new JobLogEntity();
 40:     logEntity.setJobEntity(jobEntity);
 41:     getJobEntityService().updateJobEntity(jobEntity);
 42:     return logEntity;
 43:   }
 44: 
 45:   private void dealException(JobEntity jobEntity, JobLogEntity logEntity, Exception e) throws JobExecutionException {
 46:     ExceptionEventDispather.getInstance().notify(jobEntity);
 47:     logEntity.setStatus(JobLogStatus.FAIL);
 48:     logEntity.setExceptionStackTrace(Util.getStackTrack(e));
 49:     try {
 50:       getJobLogEntityService().addJobLog(logEntity);
 51:       jobEntity.setJobExceptionCount(jobEntity.getJobExceptionCount() + 1);
 52:       jobEntity.setStatus(JobStatus.EXCEPTION);
 53:       jobEntity.setLastExeceptionTime(new Date());
 54:       getJobEntityService().updateJobEntity(jobEntity);
 55:     } catch (Exception e1) {
 56:       throw new JobExecutionException(e1);
 57:     }
 58:   }
 59: 
 60:   @Transactional
 61:   public abstract void execute(JobDataMap jobDataMap) throws Exception;

同時定義了事件派發,以便任務出錯,或者出錯多少次時,發送郵件到管理員的郵件.

Job代碼:

  1: public class DemoSpringJob  extends AbstractJob {
  2:   @Autowired
  3:   JobEntityService jobEntityService;
  4:   @Override
  5:   public void execute(JobDataMap jobDataMap) throws Exception {
  6:     //JobEntity job = jobEntityService.findJobEntityByJobName("Demo任務_1325661555923");
  7:     logger.debug("DemoSpringJob Runned:"+jobEntityService);
  8:   }
  9:   public void setJobEntityService(JobEntityService jobEntityService) {
 10:     this.jobEntityService = jobEntityService;
 11:   }
 12: 
 13: 
 14: }
  1: <bean id="demoSpringJob" class="com.xia.quartz.job.DemoSpringJob">
  2:     <property name="jobEntityService" ref="jobEntityService"></property>
  3:   </bean>

 

3.2 實體轉換成任務Quartz.jobdetail

  1: private JobDetail convertJob(JobEntity jobEntity) throws ClassNotFoundException, NoSuchMethodException {
  2:     logger.debug("Job生成中:" + jobEntity.getJobName());
  3: 
  4:     JobDetail jobDetail;
  5:     if (jobEntity.isJobClassIsBeanName()) {//如果是spring bean
  6:       InvokerJobBean bean=ApplicationContextHolder.getBean("invokerJobBean");//通過invokerJobBean轉換
  7:       bean.setTargetBeanName(jobEntity.getJobClass());
  8:       bean.setTargetMethod(jobEntity.getJobMethod());
  9:       bean.afterPropertiesSet();
 10:       jobDetail=bean.getObject();
 11:     } else {//如果是繼承的是Job類
 12:       String jobClass = jobEntity.getJobClass();
 13:       @SuppressWarnings("unchecked")
 14:       Class<? extends Job> clazz = (Class<? extends Job>) Class.forName(jobClass);
 15:       jobDetail = ApplicationContextHolder.getBean("jobDetail");
 16:       jobDetail.setJobClass(clazz);
 17:     }
 18:     jobDetail.setName(jobEntity.getJobName());
 19:     jobDetail.setGroup(jobEntity.getJobGroupName());
 20:     jobDetail.setDescription(jobEntity.getJobDesc());
 21:     try {
 22:       jobDetail.getJobDataMap().putAll(jobEntity.getProperties());
 23:     } catch (Exception e) {
 24:       logger.error(e.getMessage());
 25:     }
 26:     return (JobDetail) jobDetail;
 27:   }

3.3 開始任務

在看此代碼,你須了解Quartz的機制,它由三個東西組成:Scheduler,Trigger,JobDetail.對應作用是:

執行線程,執行策略,執行內容。也就是,在哪個線程下,使用什么策略,執行什么內容。執行線程下可以有多個Trigger,一個trigger下可以多個任務。

一般地,一個系統有一個Scheduler即可,而Trigger與JobDetail一般是一一對應的。畢竟不同任務執行周期都不同。

  1: public void startJob(JobEntity jobEntity) throws SchedulerException, ClassNotFoundException, NoSuchMethodException {
  2:     try {
  3:       JobDetail jobDetail = convertJob(jobEntity);
  4:       CronTrigger triggerCorn = convertTrigger(jobEntity);
  5:       if (StringUtils.isNotBlank(jobEntity.getJobGroupName())) {
  6:         jobEntity.setJobGroupName(jobDetail.getGroup());
  7:         jobEntityService.updateJobEntity(jobEntity);
  8:       }
  9:       //jobEntity.setStatus(JobStatus.WAITTING);
 10: 
 11:       Date ft = scheduler.scheduleJob(jobDetail, triggerCorn);
 12:       logger.debug("任務:" + jobDetail.getKey() + " will run at: " + ft.toLocaleString());
 13:     } catch (ParseException e) {
 14:       logger.error("任務轉換失敗:" + e.getMessage(), e);
 15:       throw new SchedulerException(e);
 16:     }
 17:   }

執行所有任務:

  1: QuartzService quartzService = ApplicationContextHolder.getBean("quartzService");
  2:     JobEntityService jobService = ApplicationContextHolder.getBean("jobEntityService");
  3:     List<JobEntity> all = jobService.getAllJobEntitys();
  4:     quartzService.startJobs(all);

3.3 狀態變化

暫停的任務,如果暫停有執行點,那么,你繼續后,同樣會執行一次此任務。

其它狀態變化在此沒有列出。

  1: public void pauseJob(JobEntity jobEntity) throws SchedulerException {
  2:     logger.debug("暫停JOB:" + jobEntity);
  3:     scheduler.pauseJob(jobEntity.getJobName(), jobEntity.getJobGroupName());
  4:     // scheduler.interrupt(jobKey);
  5:     jobEntity.setStatus(JobStatus.PAUSED);
  6:     jobEntityService.updateJobEntity(jobEntity);
  7:   }

3.4數據庫數據如下:

image

4.組件化

將quartz相關代碼獨立成一個工程,對外提供quartzFacade的服務即可,包括JobEntity的crud,pagingList,Exception view,Log view,ExceptionEvent push.

使用者要做的事就是,希望是開箱即用Out-Of-Box:

  • 引入工程
  • 增加spring配置
  • 注冊一個ExceptionHandlerListener,
  • 使用quartzFacade服務

5.遺留問題

Spring bean 注入問題:

因Quartz的機制里,執行的任務類是這么jobDetail.setJobClass(clazz);注入進來的,那么,這個clazz如果要注入service,只能通過手工注入,或者使用使用spring

  1: <bean id="jobDetailInvoker"
  2:     class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
  3:     <property name="targetObject" ref="nonQuartzJob" />
  4:     <property name="targetMethod">
  5:       <value>execute</value>
  6:     </property>
  7:   </bean>

而這個實現的原理是,把目標首丟過去,同時把要注入的東西丟到Quartz.job.contextMap中,在執行任務時,重裝裝配上去。

以下是Spring處理的相關代碼

org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean:

  1: public void afterPropertiesSet() throws ClassNotFoundException, NoSuchMethodException {
  2:     prepare();
  3: 
  4:     // Use specific name if given, else fall back to bean name.
  5:     String name = (this.name != null ? this.name : this.beanName);
  6: 
  7:     // Consider the concurrent flag to choose between stateful and stateless job.
  8:     Class jobClass = (this.concurrent ? MethodInvokingJob.class : StatefulMethodInvokingJob.class);
  9: 
 10:     // Build JobDetail instance.
 11:     this.jobDetail = new JobDetail(name, this.group, jobClass);
 12:     this.jobDetail.getJobDataMap().put("methodInvoker", this);
 13:     this.jobDetail.setVolatility(true);
 14:     this.jobDetail.setDurability(true);
 15: 
 16:     // Register job listener names.
 17:     if (this.jobListenerNames != null) {
 18:       for (String jobListenerName : this.jobListenerNames) {
 19:         this.jobDetail.addJobListener(jobListenerName);
 20:       }
 21:     }
 22: 
 23:     postProcessJobDetail(this.jobDetail);
 24:   }
  1: public static class MethodInvokingJob extends QuartzJobBean {
  2: 
  3:     protected static final Log logger = LogFactory.getLog(MethodInvokingJob.class);
  4: 
  5:     private MethodInvoker methodInvoker;
  6: 
  7:     /**
  8:      * Set the MethodInvoker to use.
  9:      */
 10:     public void setMethodInvoker(MethodInvoker methodInvoker) {
 11:       this.methodInvoker = methodInvoker;
 12:     }
 13: 
 14:     /**
 15:      * Invoke the method via the MethodInvoker.
 16:      */
 17:     @Override
 18:     protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
 19:       try {
 20:         context.setResult(this.methodInvoker.invoke());
 21:       }
 22:       catch (InvocationTargetException ex) {
 23:         if (ex.getTargetException() instanceof JobExecutionException) {
 24:           // -> JobExecutionException, to be logged at info level by Quartz
 25:           throw (JobExecutionException) ex.getTargetException();
 26:         }
 27:         else {
 28:           // -> "unhandled exception", to be logged at error level by Quartz
 29:           throw new JobMethodInvocationFailedException(this.methodInvoker, ex.getTargetException());
 30:         }
 31:       }
 32:       catch (Exception ex) {
 33:         // -> "unhandled exception", to be logged at error level by Quartz
 34:         throw new JobMethodInvocationFailedException(this.methodInvoker, ex);
 35:       }
 36:     }
 37:   }
  1: public abstract class QuartzJobBean implements Job {
  2: 
  3:   /**
  4:    * This implementation applies the passed-in job data map as bean property
  5:    * values, and delegates to <code>executeInternal</code> afterwards.
  6:    * @see #executeInternal
  7:    */
  8:   public final void execute(JobExecutionContext context) throws JobExecutionException {
  9:     try {
 10:       BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(this);
 11:       MutablePropertyValues pvs = new MutablePropertyValues();
 12:       pvs.addPropertyValues(context.getScheduler().getContext());
 13:       pvs.addPropertyValues(context.getMergedJobDataMap());
 14:       bw.setPropertyValues(pvs, true);
 15:     }
 16:     catch (SchedulerException ex) {
 17:       throw new JobExecutionException(ex);
 18:     }
 19:     executeInternal(context);
 20:   }

 

 

第一次寫BLOG,工具還沒用好,章節處理可能不太好,不過有了開始,后來會慢慢變好。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM