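1. Quartz cluster architecture
Quartz is one of the best-known open-source job scheduling libraries in the Java ecosystem.
The previous article covered Quartz in a single-application setup in detail. If you only run a single instance, Quartz is not necessarily the best choice: Spring's @Scheduled, for example, also provides task scheduling, integrates seamlessly with Spring Boot, and is configured with annotations, which makes it very simple to use. Its drawback is that in a clustered deployment every instance fires the same task, so tasks are scheduled more than once.
Quartz, by contrast, offers a broad feature set, including job persistence, clustered deployment and distributed scheduling, which is why Quartz-based scheduling is so widely used in system development.
In a cluster, each node is an independent Quartz application and there is no node responsible for central management. Nodes are aware of each other only through the database tables, and database locks provide concurrency control, so that each firing of a task runs on exactly one node.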
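The locking idea can be illustrated without a database. The following is a minimal in-memory sketch, not Quartz's actual implementation: a `ReentrantLock` stands in for the row lock that nodes take in the database, and a set of already-claimed fire times stands in for the trigger state stored there. The class `ClusterLockSketch` and all of its members are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

/** In-memory stand-in for lock-based trigger acquisition (illustration only). */
final class ClusterLockSketch {
    // Stands in for the database row lock shared by every node.
    private final ReentrantLock triggerAccessLock = new ReentrantLock();
    // Stands in for persisted trigger state: fire times that some node already claimed.
    private final Set<Long> claimedFireTimes = new HashSet<>();

    /** Returns true only for the first caller that claims the given fire time. */
    boolean tryAcquire(long fireTime) {
        triggerAccessLock.lock();
        try {
            return claimedFireTimes.add(fireTime);
        } finally {
            triggerAccessLock.unlock();
        }
    }

    /** Lets nodeCount threads race for one fire time and returns the names of the winners. */
    static List<String> race(int nodeCount, long fireTime) {
        ClusterLockSketch store = new ClusterLockSketch();
        List<String> winners = new ArrayList<>();
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> nodes = new ArrayList<>();
        for (int i = 1; i <= nodeCount; i++) {
            String name = "quartz-00" + i;
            Thread node = new Thread(() -> {
                try {
                    start.await(); // release all nodes at the same moment
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (store.tryAcquire(fireTime)) {
                    synchronized (winners) {
                        winners.add(name);
                    }
                }
            });
            nodes.add(node);
            node.start();
        }
        start.countDown();
        try {
            for (Thread node : nodes) {
                node.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for nodes", e);
        }
        return winners;
    }

    public static void main(String[] args) {
        // Exactly one of the three simulated nodes wins; which one varies between runs.
        System.out.println("Fired by: " + race(3, 1_000L));
    }
}
```

Whichever node takes the lock first claims the firing; the others see that it is already claimed and move on, which is the behaviour the cluster relies on.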
One point deserves special attention: in a distributed deployment, the system clocks of all nodes must be kept in sync.
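To see why clock drift matters, consider how a peer might judge whether another node is still alive from that node's last check-in timestamp. The sketch below uses a deliberately simplified rule (last check-in plus interval plus a grace period), which is an assumption for illustration and not Quartz's exact formula; `CheckinSkewSketch`, `looksFailed` and the 5-second tolerance are hypothetical.

```java
/** Simplified liveness check showing the effect of clock skew (illustration only). */
final class CheckinSkewSketch {
    /** Simplified rule: a node looks failed once it is overdue by more than the tolerance. */
    static boolean looksFailed(long lastCheckinMillis, long checkinIntervalMillis,
                               long toleranceMillis, long observerNowMillis) {
        return lastCheckinMillis + checkinIntervalMillis + toleranceMillis < observerNowMillis;
    }

    public static void main(String[] args) {
        long lastCheckin = 100_000L; // written by node A using its own clock
        long interval = 2_000L;      // same value as clusterCheckinInterval in this article
        long tolerance = 5_000L;     // hypothetical grace period
        long trueNow = 101_000L;     // only one second has really passed

        // An observer with a correct clock sees node A as healthy.
        System.out.println(looksFailed(lastCheckin, interval, tolerance, trueNow));           // false
        // An observer whose clock runs 10 seconds ahead sees node A as overdue.
        System.out.println(looksFailed(lastCheckin, interval, tolerance, trueNow + 10_000L)); // true
    }
}
```

A node that is wrongly considered dead may have its work taken over while it is still running, so the nodes' clocks are usually synchronized with a time service.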
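2. Initializing the database tables
The table definitions are provided by the Quartz project itself: go to the official Quartz website, find the version you need and download it.
I chose quartz-2.3.0-distribution.tar.gz. After downloading, unpack it, search the extracted files for sql, pick the script that matches your database, and run it against that database.
For example, I use mysql-5.7, so I chose the tables_mysql_innodb.sql script, whose content is as follows: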
DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS; DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS; DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE; DROP TABLE IF EXISTS QRTZ_LOCKS; DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS; DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS; DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS; DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS; DROP TABLE IF EXISTS QRTZ_TRIGGERS; DROP TABLE IF EXISTS QRTZ_JOB_DETAILS; DROP TABLE IF EXISTS QRTZ_CALENDARS; CREATE TABLE QRTZ_JOB_DETAILS( SCHED_NAME VARCHAR(120) NOT NULL, JOB_NAME VARCHAR(190) NOT NULL, JOB_GROUP VARCHAR(190) NOT NULL, DESCRIPTION VARCHAR(250) NULL, JOB_CLASS_NAME VARCHAR(250) NOT NULL, IS_DURABLE VARCHAR(1) NOT NULL, IS_NONCONCURRENT VARCHAR(1) NOT NULL, IS_UPDATE_DATA VARCHAR(1) NOT NULL, REQUESTS_RECOVERY VARCHAR(1) NOT NULL, JOB_DATA BLOB NULL, PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)) ENGINE=InnoDB; CREATE TABLE QRTZ_TRIGGERS ( SCHED_NAME VARCHAR(120) NOT NULL, TRIGGER_NAME VARCHAR(190) NOT NULL, TRIGGER_GROUP VARCHAR(190) NOT NULL, JOB_NAME VARCHAR(190) NOT NULL, JOB_GROUP VARCHAR(190) NOT NULL, DESCRIPTION VARCHAR(250) NULL, NEXT_FIRE_TIME BIGINT(13) NULL, PREV_FIRE_TIME BIGINT(13) NULL, PRIORITY INTEGER NULL, TRIGGER_STATE VARCHAR(16) NOT NULL, TRIGGER_TYPE VARCHAR(8) NOT NULL, START_TIME BIGINT(13) NOT NULL, END_TIME BIGINT(13) NULL, CALENDAR_NAME VARCHAR(190) NULL, MISFIRE_INSTR SMALLINT(2) NULL, JOB_DATA BLOB NULL, PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP), FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP) REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP)) ENGINE=InnoDB; CREATE TABLE QRTZ_SIMPLE_TRIGGERS ( SCHED_NAME VARCHAR(120) NOT NULL, TRIGGER_NAME VARCHAR(190) NOT NULL, TRIGGER_GROUP VARCHAR(190) NOT NULL, REPEAT_COUNT BIGINT(7) NOT NULL, REPEAT_INTERVAL BIGINT(12) NOT NULL, TIMES_TRIGGERED BIGINT(10) NOT NULL, PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP), FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP) REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)) 
ENGINE=InnoDB; CREATE TABLE QRTZ_CRON_TRIGGERS ( SCHED_NAME VARCHAR(120) NOT NULL, TRIGGER_NAME VARCHAR(190) NOT NULL, TRIGGER_GROUP VARCHAR(190) NOT NULL, CRON_EXPRESSION VARCHAR(120) NOT NULL, TIME_ZONE_ID VARCHAR(80), PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP), FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP) REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)) ENGINE=InnoDB; CREATE TABLE QRTZ_SIMPROP_TRIGGERS ( SCHED_NAME VARCHAR(120) NOT NULL, TRIGGER_NAME VARCHAR(190) NOT NULL, TRIGGER_GROUP VARCHAR(190) NOT NULL, STR_PROP_1 VARCHAR(512) NULL, STR_PROP_2 VARCHAR(512) NULL, STR_PROP_3 VARCHAR(512) NULL, INT_PROP_1 INT NULL, INT_PROP_2 INT NULL, LONG_PROP_1 BIGINT NULL, LONG_PROP_2 BIGINT NULL, DEC_PROP_1 NUMERIC(13,4) NULL, DEC_PROP_2 NUMERIC(13,4) NULL, BOOL_PROP_1 VARCHAR(1) NULL, BOOL_PROP_2 VARCHAR(1) NULL, PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP), FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP) REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)) ENGINE=InnoDB; CREATE TABLE QRTZ_BLOB_TRIGGERS ( SCHED_NAME VARCHAR(120) NOT NULL, TRIGGER_NAME VARCHAR(190) NOT NULL, TRIGGER_GROUP VARCHAR(190) NOT NULL, BLOB_DATA BLOB NULL, PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP), INDEX (SCHED_NAME,TRIGGER_NAME, TRIGGER_GROUP), FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP) REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)) ENGINE=InnoDB; CREATE TABLE QRTZ_CALENDARS ( SCHED_NAME VARCHAR(120) NOT NULL, CALENDAR_NAME VARCHAR(190) NOT NULL, CALENDAR BLOB NOT NULL, PRIMARY KEY (SCHED_NAME,CALENDAR_NAME)) ENGINE=InnoDB; CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS ( SCHED_NAME VARCHAR(120) NOT NULL, TRIGGER_GROUP VARCHAR(190) NOT NULL, PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP)) ENGINE=InnoDB; CREATE TABLE QRTZ_FIRED_TRIGGERS ( SCHED_NAME VARCHAR(120) NOT NULL, ENTRY_ID VARCHAR(95) NOT NULL, TRIGGER_NAME VARCHAR(190) NOT NULL, TRIGGER_GROUP VARCHAR(190) NOT NULL, INSTANCE_NAME VARCHAR(190) NOT NULL, FIRED_TIME 
BIGINT(13) NOT NULL, SCHED_TIME BIGINT(13) NOT NULL, PRIORITY INTEGER NOT NULL, STATE VARCHAR(16) NOT NULL, JOB_NAME VARCHAR(190) NULL, JOB_GROUP VARCHAR(190) NULL, IS_NONCONCURRENT VARCHAR(1) NULL, REQUESTS_RECOVERY VARCHAR(1) NULL, PRIMARY KEY (SCHED_NAME,ENTRY_ID)) ENGINE=InnoDB; CREATE TABLE QRTZ_SCHEDULER_STATE ( SCHED_NAME VARCHAR(120) NOT NULL, INSTANCE_NAME VARCHAR(190) NOT NULL, LAST_CHECKIN_TIME BIGINT(13) NOT NULL, CHECKIN_INTERVAL BIGINT(13) NOT NULL, PRIMARY KEY (SCHED_NAME,INSTANCE_NAME)) ENGINE=InnoDB; CREATE TABLE QRTZ_LOCKS ( SCHED_NAME VARCHAR(120) NOT NULL, LOCK_NAME VARCHAR(40) NOT NULL, PRIMARY KEY (SCHED_NAME,LOCK_NAME)) ENGINE=InnoDB; CREATE INDEX IDX_QRTZ_J_REQ_RECOVERY ON QRTZ_JOB_DETAILS(SCHED_NAME,REQUESTS_RECOVERY); CREATE INDEX IDX_QRTZ_J_GRP ON QRTZ_JOB_DETAILS(SCHED_NAME,JOB_GROUP); CREATE INDEX IDX_QRTZ_T_J ON QRTZ_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP); CREATE INDEX IDX_QRTZ_T_JG ON QRTZ_TRIGGERS(SCHED_NAME,JOB_GROUP); CREATE INDEX IDX_QRTZ_T_C ON QRTZ_TRIGGERS(SCHED_NAME,CALENDAR_NAME); CREATE INDEX IDX_QRTZ_T_G ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP); CREATE INDEX IDX_QRTZ_T_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE); CREATE INDEX IDX_QRTZ_T_N_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE); CREATE INDEX IDX_QRTZ_T_N_G_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE); CREATE INDEX IDX_QRTZ_T_NEXT_FIRE_TIME ON QRTZ_TRIGGERS(SCHED_NAME,NEXT_FIRE_TIME); CREATE INDEX IDX_QRTZ_T_NFT_ST ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME); CREATE INDEX IDX_QRTZ_T_NFT_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME); CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE); CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE_GRP ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE); CREATE INDEX IDX_QRTZ_FT_TRIG_INST_NAME ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME); CREATE INDEX 
IDX_QRTZ_FT_INST_JOB_REQ_RCVRY ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY); CREATE INDEX IDX_QRTZ_FT_J_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP); CREATE INDEX IDX_QRTZ_FT_JG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_GROUP); CREATE INDEX IDX_QRTZ_FT_T_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP); CREATE INDEX IDX_QRTZ_FT_TG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_GROUP); commit;
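The tables serve the following purposes:
QRTZ_JOB_DETAILS: details of each configured job (JobDetail)
QRTZ_TRIGGERS: basic information for every trigger
QRTZ_SIMPLE_TRIGGERS: simple triggers (repeat count, interval, times triggered)
QRTZ_CRON_TRIGGERS: cron triggers (cron expression and time zone)
QRTZ_SIMPROP_TRIGGERS: triggers stored as a set of simple properties
QRTZ_BLOB_TRIGGERS: triggers stored as BLOBs
QRTZ_CALENDARS: calendars, stored as BLOBs
QRTZ_PAUSED_TRIGGER_GRPS: trigger groups that are currently paused
QRTZ_FIRED_TRIGGERS: triggers that have fired, together with the instance that fired them
QRTZ_SCHEDULER_STATE: each scheduler instance's last check-in time and check-in interval
QRTZ_LOCKS: lock rows
Of these, QRTZ_LOCKS is the row-lock table that a Quartz cluster uses to synchronize its nodes.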
3. Quartz cluster in practice
3.1 Create a Spring Boot project and add the Maven dependencies
<!-- Spring Boot parent -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.0.RELEASE</version>
</parent>

<!-- Dependencies -->
<dependencies>
    <!-- Spring Boot core -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Spring Boot test -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- Spring MVC web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Development-time tooling -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- JPA support -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <!-- MySQL driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <!-- Druid connection pool -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.1.17</version>
    </dependency>
    <!-- Quartz -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-quartz</artifactId>
    </dependency>
    <!-- Alibaba JSON library -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.46</version>
    </dependency>
</dependencies>
3.2 Create the application.properties configuration file
spring.application.name=springboot-quartz-001
server.port=8080

#Data source
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
3.3 Create the quartz.properties configuration file
#Scheduler settings
#Scheduler instance name (must be the same on every node of the cluster)
org.quartz.scheduler.instanceName=SsmScheduler
#Generate the scheduler instance id automatically
org.quartz.scheduler.instanceId=AUTO
#Whether Quartz starts a UserTransaction before executing a job
org.quartz.scheduler.wrapJobExecutionInUserTransaction=false

#Thread pool settings
#Thread pool implementation class
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
#Number of threads in the pool
org.quartz.threadPool.threadCount=10
#Thread priority
org.quartz.threadPool.threadPriority=5
#Whether pool threads inherit the context ClassLoader of the thread that initializes the scheduler
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true
#Whether pool threads are daemon threads (daemon threads do not keep the JVM alive); left disabled here
#org.quartz.threadPool.makeThreadsDaemons=true

#Persistence settings
#Whether all JobDataMap values are Strings
org.quartz.jobStore.useProperties=true
#Table name prefix, QRTZ_ by default
org.quartz.jobStore.tablePrefix=QRTZ_
#How late a trigger may fire, in milliseconds, before it counts as misfired
org.quartz.jobStore.misfireThreshold=60000
#Whether to run in clustered mode
org.quartz.jobStore.isClustered=true
#Interval, in milliseconds, at which this instance checks in with the cluster
org.quartz.jobStore.clusterCheckinInterval=2000
#Persist scheduling data in the database
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
#JDBC delegate class; org.quartz.impl.jdbcjobstore.StdJDBCDelegate works for most databases
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#Data source alias (any name)
org.quartz.jobStore.dataSource=qzDS

#Connection pool: use Druid
org.quartz.dataSource.qzDS.connectionProvider.class=com.example.cluster.quartz.config.DruidConnectionProvider
#JDBC driver
org.quartz.dataSource.qzDS.driver=com.mysql.cj.jdbc.Driver
#JDBC URL
org.quartz.dataSource.qzDS.URL=jdbc:mysql://127.0.0.1:3306/test-quartz?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true
#Database user
org.quartz.dataSource.qzDS.user=root
#Database password
org.quartz.dataSource.qzDS.password=123456
#Maximum number of connections
org.quartz.dataSource.qzDS.maxConnection=5
#Validation query (optional)
org.quartz.dataSource.qzDS.validationQuery=select 0 from dual
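A few of these keys decide whether the nodes actually form a cluster, so it can be worth checking them before starting a node. The following sketch uses only `java.util.Properties`; the class `ClusterConfigCheck` and the particular set of checks are assumptions made for illustration and are not part of Quartz.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical sanity check for the cluster-related keys of quartz.properties. */
final class ClusterConfigCheck {
    /** Returns human-readable problems; an empty list means the basic cluster settings look consistent. */
    static List<String> check(Properties props) {
        List<String> problems = new ArrayList<>();
        if (!"true".equalsIgnoreCase(props.getProperty("org.quartz.jobStore.isClustered"))) {
            problems.add("org.quartz.jobStore.isClustered is not true");
        }
        if (!"AUTO".equalsIgnoreCase(props.getProperty("org.quartz.scheduler.instanceId"))) {
            problems.add("org.quartz.scheduler.instanceId is not AUTO; make sure every node has a unique id");
        }
        String jobStore = props.getProperty("org.quartz.jobStore.class", "");
        if (jobStore.isEmpty() || jobStore.endsWith("RAMJobStore")) {
            problems.add("a JDBC job store is required; an in-memory job store cannot be shared between nodes");
        }
        return problems;
    }

    /** Parses properties text; wraps the IOException that StringReader never actually throws. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse(
                "org.quartz.scheduler.instanceId=AUTO\n"
              + "org.quartz.jobStore.isClustered=true\n"
              + "org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX\n");
        System.out.println(check(props)); // []
    }
}
```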
3.4 Register the Quartz job factory
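import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.scheduling.quartz.AdaptableJobFactory;
import org.springframework.stereotype.Component;

@Component
public class QuartzJobFactory extends AdaptableJobFactory {

    @Autowired
    private AutowireCapableBeanFactory capableBeanFactory;

    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        // Let the parent class create the job instance
        Object jobInstance = super.createJobInstance(bundle);
        // Inject Spring-managed dependencies into it
        capableBeanFactory.autowireBean(jobInstance);
        return jobInstance;
    }
}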
3.5 Register the scheduler factory
import java.io.IOException;

import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

@Configuration
public class QuartzConfig {

    @Autowired
    private QuartzJobFactory jobFactory;

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
        // Load the Quartz properties
        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
        propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
        // Read quartz.properties before the object is requested
        propertiesFactoryBean.afterPropertiesSet();

        // Create the SchedulerFactoryBean
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setQuartzProperties(propertiesFactoryBean.getObject());
        // Allows other business beans to be injected into job instances
        factory.setJobFactory(jobFactory);
        factory.setApplicationContextSchedulerContextKey("applicationContextKey");
        // On shutdown, Spring waits for all running Quartz jobs to finish
        factory.setWaitForJobsToCompleteOnShutdown(true);
        // Whether to overwrite jobs that already exist
        factory.setOverwriteExistingJobs(false);
        // Start the scheduler 10 seconds after the application has started
        factory.setStartupDelay(10);
        return factory;
    }

    /**
     * Obtains the Scheduler instance from the SchedulerFactoryBean.
     */
    @Bean(name = "scheduler")
    public Scheduler scheduler() throws IOException, SchedulerException {
        return schedulerFactoryBean().getScheduler();
    }
}
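3.6 Replace the Quartz connection pool
By default Quartz uses c3p0 as its connection pool. Its performance has not been very stable in the author's experience, so we replace it with the Druid connection pool, configured as follows: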
import java.sql.Connection;
import java.sql.SQLException;

import com.alibaba.druid.pool.DruidDataSource;
import org.quartz.utils.ConnectionProvider;

/**
 * The field names match the keys in quartz.properties (without the prefix).
 * Quartz injects the values through the setters.
 */
public class DruidConnectionProvider implements ConnectionProvider {

    // JDBC driver
    public String driver;
    // JDBC URL
    public String URL;
    // Database user name
    public String user;
    // Database password
    public String password;
    // Maximum number of connections
    public int maxConnection;
    // SQL query used to verify that a pooled connection is still valid
    public String validationQuery;
    private boolean validateOnCheckout;
    private int idleConnectionValidationSeconds;
    public String maxCachedStatementsPerConnection;
    private String discardIdleConnectionsSeconds;

    public static final int DEFAULT_DB_MAX_CONNECTIONS = 10;
    public static final int DEFAULT_DB_MAX_CACHED_STATEMENTS_PER_CONNECTION = 120;

    // Druid connection pool
    private DruidDataSource datasource;

    @Override
    public Connection getConnection() throws SQLException {
        return datasource.getConnection();
    }

    @Override
    public void shutdown() throws SQLException {
        datasource.close();
    }

    @Override
    public void initialize() throws SQLException {
        if (this.URL == null) {
            throw new SQLException("DBPool could not be created: DB URL cannot be null");
        }
        if (this.driver == null) {
            throw new SQLException("DBPool driver could not be created: DB driver class name cannot be null!");
        }
        // The original check was "< 0", which let 0 through despite the message
        if (this.maxConnection <= 0) {
            throw new SQLException("DBPool maxConnections could not be created: Max connections must be greater than zero!");
        }

        datasource = new DruidDataSource();
        try {
            datasource.setDriverClassName(this.driver);
        } catch (Exception e) {
            // Report the failure instead of swallowing it
            throw new SQLException("Problem setting driver class name on datasource: " + e.getMessage(), e);
        }
        datasource.setUrl(this.URL);
        datasource.setUsername(this.user);
        datasource.setPassword(this.password);
        datasource.setMaxActive(this.maxConnection);
        datasource.setMinIdle(1);
        datasource.setMaxWait(0);
        // The original passed DEFAULT_DB_MAX_CONNECTIONS here, which looks like a slip
        datasource.setMaxPoolPreparedStatementPerConnectionSize(DEFAULT_DB_MAX_CACHED_STATEMENTS_PER_CONNECTION);

        if (this.validationQuery != null) {
            datasource.setValidationQuery(this.validationQuery);
            if (!this.validateOnCheckout) {
                datasource.setTestOnReturn(true);
            } else {
                datasource.setTestOnBorrow(true);
            }
            datasource.setValidationQueryTimeout(this.idleConnectionValidationSeconds);
        }
    }

    public String getDriver() { return driver; }
    public void setDriver(String driver) { this.driver = driver; }

    public String getURL() { return URL; }
    public void setURL(String URL) { this.URL = URL; }

    public String getUser() { return user; }
    public void setUser(String user) { this.user = user; }

    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }

    public int getMaxConnection() { return maxConnection; }
    public void setMaxConnection(int maxConnection) { this.maxConnection = maxConnection; }

    public String getValidationQuery() { return validationQuery; }
    public void setValidationQuery(String validationQuery) { this.validationQuery = validationQuery; }

    public boolean isValidateOnCheckout() { return validateOnCheckout; }
    public void setValidateOnCheckout(boolean validateOnCheckout) { this.validateOnCheckout = validateOnCheckout; }

    public int getIdleConnectionValidationSeconds() { return idleConnectionValidationSeconds; }
    public void setIdleConnectionValidationSeconds(int idleConnectionValidationSeconds) {
        this.idleConnectionValidationSeconds = idleConnectionValidationSeconds;
    }

    public DruidDataSource getDatasource() { return datasource; }
    public void setDatasource(DruidDataSource datasource) { this.datasource = datasource; }

    public String getDiscardIdleConnectionsSeconds() { return discardIdleConnectionsSeconds; }
    public void setDiscardIdleConnectionsSeconds(String discardIdleConnectionsSeconds) {
        this.discardIdleConnectionsSeconds = discardIdleConnectionsSeconds;
    }
}
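Once the class exists, it still has to be referenced in the quartz.properties configuration file:
#Connection pool: use Druid
org.quartz.dataSource.qzDS.connectionProvider.class=com.example.cluster.quartz.config.DruidConnectionProvider
If you have already configured this, skip the step.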
3.7 Write the Job class
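import java.text.SimpleDateFormat;
import java.util.Date;

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TfCommandJob implements Job {

    private static final Logger log = LoggerFactory.getLogger(TfCommandJob.class);

    @Override
    public void execute(JobExecutionContext context) {
        try {
            // Print the id of the scheduler instance that runs the job, plus the current time.
            // "yyyy" is the calendar year; the original "YYYY" is the week-based year.
            System.out.println(context.getScheduler().getSchedulerInstanceId() + "--"
                    + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        } catch (SchedulerException e) {
            log.error("Job execution failed", e);
        }
    }
}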
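When a job formats timestamps like this, the case of the year letter matters: in `SimpleDateFormat`, `yyyy` is the calendar year while `YYYY` is the week-based year, and the two differ for a few days around New Year. A small sketch of the difference, where `YearPatternDemo` is a hypothetical class name and `Locale.US` is fixed so that the week rules are predictable:

```java
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.Locale;

/** Shows the difference between the "yyyy" and "YYYY" pattern letters. */
final class YearPatternDemo {
    static String format(String pattern, Date date) {
        return new SimpleDateFormat(pattern, Locale.US).format(date);
    }

    public static void main(String[] args) {
        // Sunday, 29 December 2019: under US week rules it already belongs to week 1 of 2020.
        Date lateDecember = new GregorianCalendar(2019, Calendar.DECEMBER, 29, 12, 0, 0).getTime();
        System.out.println(format("yyyy-MM-dd", lateDecember)); // 2019-12-29
        System.out.println(format("YYYY-MM-dd", lateDecember)); // 2020-12-29
    }
}
```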
3.8 Write the Quartz service-layer interface
import java.util.Map;

public interface QuartzJobService {

    /** Adds a job; extra parameters can be passed in param. */
    void addJob(String clazzName, String jobName, String groupName, String cronExp, Map<String, Object> param);

    /** Pauses a job. */
    void pauseJob(String jobName, String groupName);

    /** Resumes a job. */
    void resumeJob(String jobName, String groupName);

    /** Runs a scheduled job once, immediately. */
    void runOnce(String jobName, String groupName);

    /** Updates a job. */
    void updateJob(String jobName, String groupName, String cronExp, Map<String, Object> param);

    /** Deletes a job. */
    void deleteJob(String jobName, String groupName);

    /** Starts all jobs. */
    void startAllJobs();

    /** Pauses all jobs. */
    void pauseAllJobs();

    /** Resumes all jobs. */
    void resumeAllJobs();

    /** Shuts down all jobs. */
    void shutdownAllJobs();
}
The corresponding implementation class, QuartzJobServiceImpl, is as follows:
import java.util.Map;

import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class QuartzJobServiceImpl implements QuartzJobService {

    private static final Logger log = LoggerFactory.getLogger(QuartzJobServiceImpl.class);

    @Autowired
    private Scheduler scheduler;

    @Override
    public void addJob(String clazzName, String jobName, String groupName, String cronExp, Map<String, Object> param) {
        try {
            // The scheduler is already started during initialization
            // scheduler.start();
            // Build the job; asSubclass rejects classes that do not implement Job
            Class<? extends Job> jobClass = Class.forName(clazzName).asSubclass(Job.class);
            JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, groupName).build();
            // Cron schedule builder (when the job runs)
            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExp);
            // Build a new trigger from the cron expression
            CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, groupName).withSchedule(scheduleBuilder).build();
            // Write the parameters into the JobDataMap
            // (with org.quartz.jobStore.useProperties=true the values must be Strings)
            if (param != null) {
                trigger.getJobDataMap().putAll(param);
            }
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (Exception e) {
            log.error("Failed to create job", e);
        }
    }

    @Override
    public void pauseJob(String jobName, String groupName) {
        try {
            scheduler.pauseJob(JobKey.jobKey(jobName, groupName));
        } catch (SchedulerException e) {
            log.error("Failed to pause job", e);
        }
    }

    @Override
    public void resumeJob(String jobName, String groupName) {
        try {
            scheduler.resumeJob(JobKey.jobKey(jobName, groupName));
        } catch (SchedulerException e) {
            log.error("Failed to resume job", e);
        }
    }

    @Override
    public void runOnce(String jobName, String groupName) {
        try {
            scheduler.triggerJob(JobKey.jobKey(jobName, groupName));
        } catch (SchedulerException e) {
            log.error("Failed to run job once", e);
        }
    }

    @Override
    public void updateJob(String jobName, String groupName, String cronExp, Map<String, Object> param) {
        try {
            TriggerKey triggerKey = TriggerKey.triggerKey(jobName, groupName);
            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
            if (cronExp != null) {
                // Cron schedule builder
                CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExp);
                // Rebuild the trigger with the new cron expression
                trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
            }
            // Update the JobDataMap
            if (param != null) {
                trigger.getJobDataMap().putAll(param);
            }
            // Reschedule the job with the new trigger
            scheduler.rescheduleJob(triggerKey, trigger);
        } catch (Exception e) {
            log.error("Failed to update job", e);
        }
    }

    @Override
    public void deleteJob(String jobName, String groupName) {
        try {
            // Pause the trigger, unschedule it, then delete the job
            scheduler.pauseTrigger(TriggerKey.triggerKey(jobName, groupName));
            scheduler.unscheduleJob(TriggerKey.triggerKey(jobName, groupName));
            scheduler.deleteJob(JobKey.jobKey(jobName, groupName));
        } catch (Exception e) {
            log.error("Failed to delete job", e);
        }
    }

    @Override
    public void startAllJobs() {
        try {
            scheduler.start();
        } catch (Exception e) {
            log.error("Failed to start all jobs", e);
        }
    }

    @Override
    public void pauseAllJobs() {
        try {
            scheduler.pauseAll();
        } catch (Exception e) {
            log.error("Failed to pause all jobs", e);
        }
    }

    @Override
    public void resumeAllJobs() {
        try {
            scheduler.resumeAll();
        } catch (Exception e) {
            log.error("Failed to resume all jobs", e);
        }
    }

    @Override
    public void shutdownAllJobs() {
        try {
            if (!scheduler.isShutdown()) {
                // Use with care: once the scheduler is shut down its lifecycle is over
                // and it cannot be restarted with start()
                scheduler.shutdown(true);
            }
        } catch (Exception e) {
            log.error("Failed to shut down all jobs", e);
        }
    }
}
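Because addJob receives the job class as a string, it helps to verify the class before handing it to the scheduler. One way is `Class.asSubclass`, sketched below with only the JDK: `Runnable` stands in for Quartz's `Job` interface so that the example runs without Quartz on the classpath, and `JobClassLoaderSketch` is a hypothetical helper name.

```java
/** Loads a class by name and checks its type; Runnable stands in for Quartz's Job here. */
final class JobClassLoaderSketch {
    /** Returns the class if it exists and implements expectedType, otherwise throws. */
    static <T> Class<? extends T> load(String className, Class<T> expectedType) {
        try {
            return Class.forName(className).asSubclass(expectedType);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown job class: " + className, e);
        } catch (ClassCastException e) {
            throw new IllegalArgumentException(
                    className + " does not implement " + expectedType.getName(), e);
        }
    }

    public static void main(String[] args) {
        // Thread implements Runnable, so this succeeds.
        System.out.println(load("java.lang.Thread", Runnable.class)); // class java.lang.Thread
        try {
            // String does not implement Runnable, so this is rejected up front.
            load("java.lang.String", Runnable.class);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // java.lang.String does not implement java.lang.Runnable
        }
    }
}
```

Compared with an unchecked cast, a wrong class name fails at the moment the job is added rather than when the job first fires.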
3.9 Write the controller
First create an entity class for the request parameters:
import java.io.Serializable;
import java.util.Map;

public class QuartzConfigDTO implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Job name */
    private String jobName;

    /** Group the job belongs to */
    private String groupName;

    /** Job class */
    private String jobClass;

    /** Cron expression for the job schedule */
    private String cronExpression;

    /** Additional parameters */
    private Map<String, Object> param;

    public String getJobName() { return jobName; }

    public QuartzConfigDTO setJobName(String jobName) {
        this.jobName = jobName;
        return this;
    }

    public String getGroupName() { return groupName; }

    public QuartzConfigDTO setGroupName(String groupName) {
        this.groupName = groupName;
        return this;
    }

    public String getJobClass() { return jobClass; }

    public QuartzConfigDTO setJobClass(String jobClass) {
        this.jobClass = jobClass;
        return this;
    }

    public String getCronExpression() { return cronExpression; }

    public QuartzConfigDTO setCronExpression(String cronExpression) {
        this.cronExpression = cronExpression;
        return this;
    }

    public Map<String, Object> getParam() { return param; }

    public QuartzConfigDTO setParam(Map<String, Object> param) {
        this.param = param;
        return this;
    }
}
Then write the web endpoints:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/test")
public class TestController {

    private static final Logger log = LoggerFactory.getLogger(TestController.class);

    @Autowired
    private QuartzJobService quartzJobService;

    /** Adds a new job. */
    @RequestMapping("/addJob")
    public Object addJob(@RequestBody QuartzConfigDTO configDTO) {
        quartzJobService.addJob(configDTO.getJobClass(), configDTO.getJobName(), configDTO.getGroupName(),
                configDTO.getCronExpression(), configDTO.getParam());
        return HttpStatus.OK;
    }

    /** Pauses a job. */
    @RequestMapping("/pauseJob")
    public Object pauseJob(@RequestBody QuartzConfigDTO configDTO) {
        quartzJobService.pauseJob(configDTO.getJobName(), configDTO.getGroupName());
        return HttpStatus.OK;
    }

    /** Resumes a job. */
    @RequestMapping("/resumeJob")
    public Object resumeJob(@RequestBody QuartzConfigDTO configDTO) {
        quartzJobService.resumeJob(configDTO.getJobName(), configDTO.getGroupName());
        return HttpStatus.OK;
    }

    /** Runs a scheduled job once, immediately. */
    @RequestMapping("/runOnce")
    public Object runOnce(@RequestBody QuartzConfigDTO configDTO) {
        quartzJobService.runOnce(configDTO.getJobName(), configDTO.getGroupName());
        return HttpStatus.OK;
    }

    /** Updates a job. */
    @RequestMapping("/updateJob")
    public Object updateJob(@RequestBody QuartzConfigDTO configDTO) {
        quartzJobService.updateJob(configDTO.getJobName(), configDTO.getGroupName(),
                configDTO.getCronExpression(), configDTO.getParam());
        return HttpStatus.OK;
    }

    /** Deletes a job. */
    @RequestMapping("/deleteJob")
    public Object deleteJob(@RequestBody QuartzConfigDTO configDTO) {
        quartzJobService.deleteJob(configDTO.getJobName(), configDTO.getGroupName());
        return HttpStatus.OK;
    }

    /** Starts all jobs. */
    @RequestMapping("/startAllJobs")
    public Object startAllJobs() {
        quartzJobService.startAllJobs();
        return HttpStatus.OK;
    }

    /** Pauses all jobs. */
    @RequestMapping("/pauseAllJobs")
    public Object pauseAllJobs() {
        quartzJobService.pauseAllJobs();
        return HttpStatus.OK;
    }

    /** Resumes all jobs. */
    @RequestMapping("/resumeAllJobs")
    public Object resumeAllJobs() {
        quartzJobService.resumeAllJobs();
        return HttpStatus.OK;
    }

    /** Shuts down all jobs. */
    @RequestMapping("/shutdownAllJobs")
    public Object shutdownAllJobs() {
        quartzJobService.shutdownAllJobs();
        return HttpStatus.OK;
    }
}
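3.10 Testing the service endpoints
Run the Spring Boot Application class to start the service.
Create a scheduled task that runs every 5 seconds.
You can see that the service runs normally.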
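A request for such a task might look like the sketch below, which builds (but does not send) a POST to the /test/addJob endpoint with the JDK's `java.net.http` API (Java 11 or later). The job name, group name and the package in `com.example.cluster.quartz.job.TfCommandJob` are assumptions for illustration; use the actual fully qualified name of your job class. `0/5 * * * * ?` is the Quartz cron expression for "every 5 seconds".

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Builds the HTTP request that would create a job firing every 5 seconds (illustration only). */
final class AddJobRequestSketch {
    /** JSON body whose field names match QuartzConfigDTO. */
    static String body() {
        return "{"
                + "\"jobName\":\"demoJob\","                                        // hypothetical name
                + "\"groupName\":\"demoGroup\","                                    // hypothetical group
                + "\"jobClass\":\"com.example.cluster.quartz.job.TfCommandJob\","   // assumed package
                + "\"cronExpression\":\"0/5 * * * * ?\""
                + "}";
    }

    static HttpRequest build(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/test/addJob"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body()))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("http://localhost:8080");
        System.out.println(request.method() + " " + request.uri());
        System.out.println(body());
        // To send it against a running instance:
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```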
3.11 Register listeners (optional)
If you want to use Quartz listeners in Spring Boot, that is straightforward as well.
Create a scheduler listener:
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerKey;
import org.quartz.listeners.SchedulerListenerSupport;
import org.springframework.stereotype.Component;

@Component
public class SimpleSchedulerListener extends SchedulerListenerSupport {

    @Override
    public void jobScheduled(Trigger trigger) {
        System.out.println("Called when a job is scheduled");
    }

    @Override
    public void jobUnscheduled(TriggerKey triggerKey) {
        System.out.println("Called when a job is unscheduled");
    }

    @Override
    public void triggerFinalized(Trigger trigger) {
        System.out.println("Called when a trigger will never fire again");
    }

    @Override
    public void triggerPaused(TriggerKey triggerKey) {
        System.out.println(triggerKey + " (a single trigger) was paused");
    }

    @Override
    public void triggersPaused(String triggerGroup) {
        System.out.println("All triggers in group " + triggerGroup + " were paused");
    }

    @Override
    public void triggerResumed(TriggerKey triggerKey) {
        System.out.println(triggerKey + " (a single trigger) was resumed");
    }

    @Override
    public void triggersResumed(String triggerGroup) {
        System.out.println("All triggers in group " + triggerGroup + " were resumed");
    }

    @Override
    public void jobAdded(JobDetail jobDetail) {
        System.out.println("A JobDetail was added dynamically");
    }

    @Override
    public void jobDeleted(JobKey jobKey) {
        System.out.println(jobKey + " was deleted");
    }

    @Override
    public void jobPaused(JobKey jobKey) {
        System.out.println(jobKey + " was paused");
    }

    @Override
    public void jobsPaused(String jobGroup) {
        System.out.println(jobGroup + " (a group of jobs) was paused");
    }

    @Override
    public void jobResumed(JobKey jobKey) {
        System.out.println(jobKey + " was resumed");
    }

    @Override
    public void jobsResumed(String jobGroup) {
        System.out.println(jobGroup + " (a group of jobs) was resumed");
    }

    @Override
    public void schedulerError(String msg, SchedulerException cause) {
        System.out.println("Scheduler error: " + msg);
        cause.printStackTrace();
    }

    @Override
    public void schedulerInStandbyMode() {
        System.out.println("The scheduler was put into standby mode");
    }

    @Override
    public void schedulerStarted() {
        System.out.println("The scheduler has started");
    }

    @Override
    public void schedulerStarting() {
        System.out.println("The scheduler is starting");
    }

    @Override
    public void schedulerShutdown() {
        System.out.println("The scheduler has shut down");
    }

    @Override
    public void schedulerShuttingdown() {
        System.out.println("The scheduler is shutting down");
    }

    @Override
    public void schedulingDataCleared() {
        System.out.println("All scheduler data, including jobs, triggers and calendars, was cleared");
    }
}
Create a trigger listener:
import org.quartz.JobExecutionContext;
import org.quartz.Trigger;
import org.quartz.listeners.TriggerListenerSupport;
import org.springframework.stereotype.Component;

@Component
public class SimpleTriggerListener extends TriggerListenerSupport {

    /** Name of the trigger listener. */
    @Override
    public String getName() {
        return "mySimpleTriggerListener";
    }

    /** The trigger has fired and its associated job is about to run. */
    @Override
    public void triggerFired(Trigger trigger, JobExecutionContext context) {
        System.out.println("myTriggerListener.triggerFired()");
    }

    /**
     * The trigger has fired and its associated job is about to run. The listener
     * may veto the execution: if this method returns true, the job is not executed.
     */
    @Override
    public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {
        System.out.println("myTriggerListener.vetoJobExecution()");
        return false;
    }

    /**
     * Called when a trigger misfires. For example, many triggers may be due at the
     * same time while every thread in the pool is busy, so some triggers time out
     * and miss this round of firing.
     */
    @Override
    public void triggerMisfired(Trigger trigger) {
        System.out.println("myTriggerListener.triggerMisfired()");
    }

    /** Called when the job has completed. */
    @Override
    public void triggerComplete(Trigger trigger, JobExecutionContext context,
                                Trigger.CompletedExecutionInstruction triggerInstructionCode) {
        System.out.println("myTriggerListener.triggerComplete()");
    }
}
Create a job listener:
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.listeners.JobListenerSupport;
import org.springframework.stereotype.Component;

@Component
public class SimpleJobListener extends JobListenerSupport {

    /** Name of the job listener. */
    @Override
    public String getName() {
        return "mySimpleJobListener";
    }

    /** Called before the job is executed. */
    @Override
    public void jobToBeExecuted(JobExecutionContext context) {
        System.out.println("simpleJobListener: about to execute " + context.getJobDetail().getKey());
    }

    /** Called when the job execution was vetoed. */
    @Override
    public void jobExecutionVetoed(JobExecutionContext context) {
        System.out.println("simpleJobListener: execution vetoed for " + context.getJobDetail().getKey());
    }

    /** Called after the job has been executed. */
    @Override
    public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
        System.out.println("simpleJobListener: finished executing " + context.getJobDetail().getKey());
    }
}
Finally, register the listeners with the Scheduler (in the QuartzConfig class):
// Additional imports: org.quartz.JobKey, org.quartz.impl.matchers.KeyMatcher,
// org.quartz.impl.matchers.EverythingMatcher

@Autowired
private SimpleSchedulerListener simpleSchedulerListener;

@Autowired
private SimpleJobListener simpleJobListener;

@Autowired
private SimpleTriggerListener simpleTriggerListener;

@Bean(name = "scheduler")
public Scheduler scheduler() throws IOException, SchedulerException {
    Scheduler scheduler = schedulerFactoryBean().getScheduler();
    // Register the SchedulerListener
    scheduler.getListenerManager().addSchedulerListener(simpleSchedulerListener);
    // Register the JobListener; a matcher restricts it to specific jobs
    scheduler.getListenerManager().addJobListener(simpleJobListener,
            KeyMatcher.keyEquals(JobKey.jobKey("myJob", "myGroup")));
    // Register the TriggerListener for all triggers
    scheduler.getListenerManager().addTriggerListener(simpleTriggerListener,
            EverythingMatcher.allTriggers());
    return scheduler;
}
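3.12 Use the project's data source (optional)
In the Quartz configuration above we used a dedicated data source in order to decouple it from the project's own data source. If you would rather not create a separate database and want Quartz to share the project's data source, that is easy to configure as well.
In the quartz.properties configuration file, remove the org.quartz.jobStore.dataSource setting: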
#Comment out Quartz's own data source setting
#org.quartz.jobStore.dataSource=qzDS
In the QuartzConfig configuration class, inject the project's dataSource and pass it to Quartz:
@Autowired
private DataSource dataSource;

@Bean
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
    //...
    SchedulerFactoryBean factory = new SchedulerFactoryBean();
    factory.setQuartzProperties(propertiesFactoryBean.getObject());
    // Use the project's data source instead of a Quartz-specific one
    factory.setDataSource(dataSource);
    //...
    return factory;
}
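4. Testing task scheduling
In real deployments, projects run as a cluster. To match production, we create two more identical projects and test whether Quartz really provides distributed scheduling in a cluster, that is, whether any given scheduled task runs on only one machine.
In principle, we only need to copy the project we just created and change the port number to test this locally.
Because only one CRUD service is needed, the copies do not need QuartzJobService or the other add, delete and update services. It is enough to keep the QuartzConfig, DruidConnectionProvider, QuartzJobFactory and TfCommandJob classes and the quartz.properties configuration identical.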
Start the services quartz-001, quartz-002 and quartz-003 one after another and observe the result:
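The first service started, quartz-001, picks up the scheduled tasks already stored in the database, while the other two services, quartz-002 and quartz-003, do not schedule anything on their own.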
When we shut down quartz-001, quartz-002 takes over task scheduling.
When we then shut down quartz-002, quartz-003 takes over task scheduling in the same way.
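The final result matches our expectations.
5. Summary
This article showed how to implement persistent, distributed scheduling with springboot + quartz + mysql. All of the code shown has been tested.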