前言
之前寫SpringBoot時,有簡單介紹過分布式定時器的一些思路(SpringBoot | 第二十二章:定時任務的使用)。原來的項目本身使用dubbo實現了一個簡單的實現,目前項目遷移至SpringCloud后,原來的就不適用了,但基本原理都是差不多的,都是集中管理需要調用的api及調度等相關信息。故本篇會簡單介紹下一些常見的分布式定時器的實現方案,還會編寫一個基於http調用的
統一調度
項目,實現簡單的調用SpringCloud
項目RESTful
接口。
一些說明
本身Spring提供了
Spring Task
進行定時配置,基於注解和xml配置方式可實現簡單的定時器配置,再一些場景下,若在非單機模式下,部署了多個應用時,若不加以控制,很容易造成數據的錯誤問題。在之前編寫的文章中也有簡單的提及一些分布式解決方案,比如Quartz
等,感謝的同學可點擊:SpringBoot | 第二十二章:定時任務的使用,進行查看,這里就不再重復闡述了。
基於ShedLock實現輕量級分布式定時鎖
ShedLock
是一個在分布式環境中使用的定時任務框架,用於解決在分布式環境中的多個實例的相同定時任務在同一時間點重復執行的問題,解決思路是通過對公用的數據庫中的某個表進行記錄和加鎖,使得同一時間點只有第一個執行定時任務並成功在數據庫表中寫入相應記錄的節點能夠成功執行而其他節點直接跳過該任務。簡單來說,ShedLock
本身只做一件事情:保證一個任務最多同時執行一次。所以如官網所說的,ShedLock
不是一個分布式調度器,只是一個鎖!
注意:ShedLock
支持Mongo
,Redis
,Hazelcast
,ZooKeeper
以及任何帶有JDBC驅動程序的東西。本例子為了方便,直接使用了redis
進行示例,若本身基於jdbc等,可直接參考官網給出的提示:https://github.com/lukas-krecan/ShedLock#jdbctemplate. 創建對應的表結構。
CREATE TABLE shedlock(
name VARCHAR(64),
lock_until TIMESTAMP(3) NULL,
locked_at TIMESTAMP(3) NULL,
locked_by VARCHAR(255),
PRIMARY KEY (name)
)
集成示例
創建工程名:
java-shedlock-demo
0.maven依賴(這里使用當前最新版本及使用redis進行實現),基於SpringBoot 2.0.3.RELEASE
版本。
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-spring</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-provider-redis-spring</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--spring2.0集成redis所需common-pool2 -->
<!-- 必須加上,jedis依賴此 -->
<!-- spring boot 2.0 的操作手冊有標注 大家可以去看看 地址是:https://docs.spring.io/spring-boot/docs/2.0.3.RELEASE/reference/htmlsingle/ -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
1.配置LockProvider
,同時開啟@EnableSchedulerLock注解。
ShedLockRedisConfig.java
/**
*
* @ClassName 類名:ShedLockRedisConfig
* @Description 功能說明:redis配置
* <p>
* TODO
*</p>
************************************************************************
* @date 創建日期:2019年3月3日
* @author 創建人:oKong
* @version 版本號:V1.0
*<p>
***************************修訂記錄*************************************
*
* 2019年3月3日 oKong 創建該類功能。
*
***********************************************************************
*</p>
*/
/**
*
* @ClassName 類名:ShedLockRedisConfig
* @Description 功能說明:redis配置
* <p>
* TODO
*</p>
************************************************************************
* @date 創建日期:2019年3月3日
* @author 創建人:oKong
* @version 版本號:V1.0
*<p>
***************************修訂記錄*************************************
*
* 2019年3月3日 oKong 創建該類功能。
*
***********************************************************************
*</p>
*/
@Configuration
//defaultLockAtMostFor 指定在執行節點結束時應保留鎖的默認時間使用ISO8601 Duration格式
//作用就是在被加鎖的節點掛了時,無法釋放鎖,造成其他節點無法進行下一任務
//這里默認30s
//關於ISO8601 Duration格式用的不到,具體可上網查詢下相關資料,應該就是一套規范,規定一些時間表達方式
@EnableSchedulerLock(defaultLockAtMostFor = "PT30S")
public class ShedLockRedisConfig {
//正常情況下 應該按實際環境來區分的
//這里為了方便 寫成test便於是測試
// @Value("${spring.profiles.active}")
String env = "test";
@Bean
public LockProvider lockProvider(RedisConnectionFactory connectionFactory) {
//環境變量 -需要區分不同環境避免沖突,如dev環境和test環境,兩者都部署時,只有一個實例進行,此時會造成相關環境未啟動情況
return new RedisLockProvider(connectionFactory, env);
}
}
2.編寫一個簡單定時任務。
/**
*
* @ClassName 類名:SimpleTask
* @Description 功能說明:
* <p>
* TODO
*</p>
************************************************************************
* @date 創建日期:2019年3月3日
* @author 創建人:oKong
* @version 版本號:V1.0
*<p>
***************************修訂記錄*************************************
*
* 2019年3月3日 oKong 創建該類功能。
*
***********************************************************************
*</p>
*/
@Component
@Slf4j
public class SimpleTask {
//區分服務
@Value("${server.port}")
String port;
//為了方便測試 設置cron表達式
@Scheduled(cron = "*/5 * * * * ?")
//lockAtLeastFor:保證在設置的期間類不執行多次任務,單位是毫秒,此處可以根據實際任務運行情況進行設置,
//簡單來說,一個每15分鍾執行的任務,若每次任務執行的時間為幾分鍾,則可以設置lockAtLeastFor大於其最大估計最大執行時間
//避免一次任務未執行完,下一個定時任務又啟動了。
//任務執行完,會自動釋放鎖。
@SchedulerLock(name="simpleTask",lockAtLeastFor = 1*1000)
public void getCurrentDate() {
log.info("端口({}),Scheduled定時任務執行:{}", port, new Date());
}
}
3.編寫啟動類開啟定時任務功能,及配置文件。
/**
*
* @ClassName 類名:ShedLockApplication
* @Description 功能說明:啟動類
* <p>
* TODO
* </p>
************************************************************************
* @date 創建日期:2019年3月3日
* @author 創建人:oKong
* @version 版本號:V1.0
* <p>
*************************** 修訂記錄*************************************
*
* 2019年3月3日 oKong 創建該類功能。
*
***********************************************************************
* </p>
*/
@SpringBootApplication
@EnableScheduling // 開啟定時任務
@Slf4j
public class ShedLockApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(ShedLockApplication.class, args);
log.info("java-shedlock-demo啟動!");
}
}
application.properties
server.port=8001
# REDIS (RedisProperties)
# Redis數據庫索引(默認為0)
spring.redis.database=0
# Redis服務器地址
spring.redis.host=127.0.0.1
# Redis服務器連接端口
spring.redis.port=6379
# Redis服務器連接密碼(默認為空)
#spring.redis.password=
# 連接池最大連接數(使用負值表示沒有限制)
spring.redis.lettuce.pool.max-active=8
# 連接池最大阻塞等待時間(使用負值表示沒有限制)Duration
spring.redis.lettuce.pool.max-wait=-1ms
# 連接池中的最大空閑連接
spring.redis.lettuce.pool.max-idle=8
# 連接池中的最小空閑連接
spring.redis.lettuce.pool.min-idle=0
# 連接超時時間-Duration 不能設置為0 一般上設置個200ms
spring.redis.timeout=200ms
4.利用多環境啟動多個服務(8001,8002),查看是否正常運行。
8001服務
2019-03-03 23:36:30.070 INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask : 端口(8001),Scheduled定時任務執行:Mon Mar 03 23:36:30 CST 2019
2019-03-03 23:36:35.005 INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask : 端口(8001),Scheduled定時任務執行:Mon Mar 03 23:36:35 CST 2019
2019-03-03 23:36:40.002 INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask : 端口(8001),Scheduled定時任務執行:Mon Mar 03 23:36:40 CST 2019
2019-03-03 23:36:45.003 INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask : 端口(8001),Scheduled定時任務執行:Mon Mar 03 23:36:45 CST 2019
2019-03-03 23:36:50.003 INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask : 端口(8001),Scheduled定時任務執行:Mon Mar 03 23:36:50 CST 2019
2019-03-03 23:36:55.006 INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask : 端口(8001),Scheduled定時任務執行:Mon Mar 03 23:36:55 CST 2019
2019-03-03 23:37:05.002 INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask : 端口(8001),Scheduled定時任務執行:Mon Mar 03 23:37:05 CST 2019
2019-03-03 23:37:15.002 INFO 13396 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask : 端口(8001),Scheduled定時任務執行:Mon Mar 03 23:37:15 CST 2019
8002服務
2019-03-03 23:37:00.012 INFO 24492 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask : 端口(8002),Scheduled定時任務執行:Mon Mar 03 23:37:00 CST 2019
2019-03-03 23:37:10.007 INFO 24492 --- [pool-2-thread-1] c.l.learning.shedlock.task.SimpleTask : 端口(8002),Scheduled定時任務執行:Mon Mar 03 23:37:10 CST 2019
通過日志輸出,可以看出每次任務執行時,只有一個實例在運行。具體哪個服務,看誰獲取到鎖了。
SchedulerLock注解說明
@SchedulerLock注解一共支持五個參數,分別是
- name:用來標注一個定時服務的名字,被用於寫入數據庫作為區分不同服務的標識,如果有多個同名定時任務則同一時間點只有一個執行成功
- lockAtMostFor:成功執行任務的節點所能擁有獨占鎖的最長時間,單位是毫秒ms
- lockAtMostForString:成功執行任務的節點所能擁有的獨占鎖的最長時間的字符串表達,例如“PT14M”表示為14分鍾
- lockAtLeastFor:成功執行任務的節點所能擁有獨占所的最短時間,單位是毫秒ms
- lockAtLeastForString:成功執行任務的節點所能擁有的獨占鎖的最短時間的字符串表達,例如“PT14M”表示為14分鍾
兩種集成模式
按官網介紹,其有兩種模式:
TaskScheduler
及Method
代理,具體的可以查看官網介紹,這里就不過多闡述了。簡單來說,都是使用AOP
代理機制,一個是代理了taskScheduler
,一個是代理了被注解了SchedulerLock
具體的方法。可以具體場景進行設置,比如記錄定時任務日志等。這里需要注意,使用Method
代理時,其不依賴於Spring環境,但普通調用此方法時也會進行鎖定的,需要注意,而且目前只支持void
的方法。
TaskScheduler代理時序圖
Method代理時序圖
基於統一調度中心實現任務調用
統一調度中心
:一個管理定時任務配置及發起任務執行的一個服務。簡單來說,就是通過維護需要執行任務的服務列表,如api地址
、dubbo
服務信息等,通過配置的定時配置進行服務調用。從而避免了定時任務重復問題,同時也能利用注冊中心實現負載均衡動態調用對應任務。
技術選型
- 核心框架:
SpringBoot 2.0.3.RELEASE
、Springcloud Finchley.SR1
- 任務調度:
Quartz
- 持久層框架:
MyBatis
+MyBatis-Plus
- 數據庫:
mysql
題外話:原本想延續原先
SpringBoot1.5
版本進行開發,后面考慮此服務相對簡單,所以直接嘗試使用webflux
進行服務開發,順便也學習學習WebFlux
相關操作。
數據庫腳本
CREATE TABLE `sched_config` (
`id` bigint(20) NOT NULL,
`name` varchar(200) DEFAULT NULL COMMENT '任務名稱',
`target_service_type` varchar(2) DEFAULT NULL COMMENT '目標任務類型:01 springcloud 02 http 03 dubbo',
`targer_service` varchar(50) DEFAULT NULL COMMENT '目標服務:可為服務地址,或者dubbo服務名',
`cron_config` varchar(20) DEFAULT NULL COMMENT 'cron表達式',
`status` varchar(1) DEFAULT NULL COMMENT '狀態:1啟用 0 停用',
`remark` varchar(200) DEFAULT NULL COMMENT '備注說明',
`extra_dubbo_group` varchar(50) DEFAULT NULL COMMENT 'dubbo組名',
`extra_dubbo_version` varchar(50) DEFAULT NULL COMMENT 'dubbo服務版本信息',
`gmt_create` datetime DEFAULT NULL COMMENT '創建時間',
`gmt_modified` datetime DEFAULT NULL COMMENT '修改時間',
PRIMARY KEY (`id`)
)
相關類說明
quartz工廠類(QuartzJobFactory)
為了使得自定義的job能主動注入spring的相關bean,需要額外實現此工廠類,方便調用。當然也可以直接動態獲取bean實例了。
public class QuartzJobFactory extends AdaptableJobFactory {
@Autowired
private AutowireCapableBeanFactory capableBeanFactory;
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
// 調用父類的方法
Object jobInstance = super.createJobInstance(bundle);
//主動注入
capableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
同時,配置SchedulerFactoryBean
,設置其工廠類。
QuartzConfig.java
@Configuration
public class QuartzConfig {
@Bean
public SchedulerFactoryBean schedulerFactoryBean(){
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setOverwriteExistingJobs(true);
// 延時啟動
factory.setStartupDelay(20);
// 自定義Job Factory,用於Spring注入
factory.setJobFactory(quartzJobFactory());
return factory;
}
@Bean
public QuartzJobFactory quartzJobFactory() {
return new QuartzJobFactory();
}
}
初始化任務(InitJob)
在服務啟動時,啟動開啟配置的任務,同時設置其定時器。
@Component
@Slf4j
public class InitJob {
@Autowired
ISchedConfigService schedConfigService;
@Autowired
Scheduler scheduler;
/**
*
* <p>函數名稱: initJob </p>
* <p>功能說明: 啟動時,進行任務初始化操作,即啟動相應的任務定時器
*
* </p>
*<p>參數說明:</p>
*
* @date 創建時間:2019年3月4日
* @author 作者:oKong
*/
@PostConstruct
public void initJob() {
log.info("初始化任務開始......");
//獲取所有啟用任務
EntityWrapper<SchedConfig> qryWrapper = new EntityWrapper<>();
qryWrapper.eq(SchedConfig.STATUS, "1");
List<SchedConfig> schedConfigList = schedConfigService.selectList(qryWrapper);
if(schedConfigList == null || schedConfigList.isEmpty()) {
log.warn("暫無定時任務");
return;
}
for(SchedConfig config : schedConfigList) {
String name = config.getName();//任務名稱
JobDetail jobDetail = newJob(TaskJob.class).withIdentity(name, "okongJobGroup").build();
//設置運行時參數
JobDataMap jobDataMap = jobDetail.getJobDataMap();
jobDataMap.put("config", config);
//創建trigger觸發器
Trigger trigger = newTrigger()
.withIdentity(name, "okongTriggerGroup")
.withSchedule(cronSchedule(config.getCronConfig())).build();
//啟動定時器
try {
scheduler.scheduleJob(jobDetail, trigger);
log.info("任務[{}]啟動成功", name);
} catch (SchedulerException e) {
log.error("任務[{}]啟動失敗,{}", name,e.getMessage());
}
}
log.info("初始化任務結束......");
}
}
任務類(TaskJob)
實現具體任務的執行和調用。利用
WebClient
實現http服務的調用。暫時未實現dubbo的調用,后期再補充。
- 配置普通
WebClient
和具有負載均衡的webClient
,主要是考慮到存在訪問SpringCloud
服務和普通http
的需求,原先使用負載均衡的restTemplate
時,訪問普通的http請求是無法訪問的,不知道webClient
是否也是一樣,這里直接簡單粗暴的直接設置了兩個webClient
。
@Configuration
public class WebClientConfig {
/**
*
* <p>函數名稱: loadBalancedWebClientBuilder </p>
* <p>功能說明: 具有負載均衡的WebClient
*
* </p>
*<p>參數說明:</p>
* @return
*
* @date 創建時間:2019年3月5日
* @author 作者:oKong
*/
@Bean("balanceWebClient")
@LoadBalanced
public WebClient.Builder loadBalancedWebClientBuilder() {
return WebClient.builder();
}
/**
*
* <p>函數名稱: webClientBuilder </p>
* <p>功能說明:普通WebClient
*
* </p>
*<p>參數說明:</p>
* @return
*
* @date 創建時間:2019年3月5日
* @author 作者:oKong
*/
@Bean("webClient")
public WebClient.Builder webClientBuilder() {
return WebClient.builder();
}
}
- 具體執行任務類,根據不同的類型,進行不同的調用。
//@DisallowConcurrentExecution 說明在一個任務執行時,另一個定時點來臨時不會執行任務,比如一個定時是間隔3分鍾一次,但任務執行了5分鍾,此時會等上個任務完成后再執行下一次定時任務
@DisallowConcurrentExecution
@Slf4j
public class TaskJob implements org.quartz.Job{
/**
* spring5中 異步restTemplate已被標記位作廢了
* 這里嘗試使用webClient
*/
@Autowired
@Qualifier("balanceWebClient")
private WebClient.Builder balanceWebClientBuilder;
@Autowired
@Qualifier("webClient")
private WebClient.Builder webClientBuilder;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
//執行方法
//獲取任務實體對象
JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
SchedConfig schedConfig = (SchedConfig) jobDataMap.get("config");
log.info("執行定時任務:{}", schedConfig);
//根據不同類型進行不同的處理邏輯
Mono<String> monoRst = null;
switch (schedConfig.getTargetServiceType()) {
case "01":
//springcloud方式
//利用loadBalancerClient 獲取實際服務地址
monoRst = balanceWebClientBuilder.build().post().uri(schedConfig.getTargerService()).retrieve().bodyToMono(String.class);
break;
case "02":
//普通http方式
monoRst =webClientBuilder.build().post().uri(schedConfig.getTargerService()).retrieve().bodyToMono(String.class);//無參數
break;
case "03":
//dubbo方式
//TODO 暫時未實現
break;
default:
}
if(monoRst != null) {
log.info("調用服務結果為:{}", monoRst.block());
}
}
}
服務效果
為了測試,簡單改造了
java-shedlock-demo
為SpringCloud
項目,具體就不貼代碼了,可直接下載相應工程進行查看。
數據庫配置:
服務啟動,控制台輸出:
大家可自行測試下,這里只是簡單的進行控制台輸出。
參考資料
總結
本文主要簡單介紹了一些分布式定時任務的解決方案。對於
ShedLock
大部分的分布式場景應該是夠用了,特別場景下可能需要注意,實際情況實際解決了。而對於后一種,統一調度
服務而言,本身只是個簡單的示例,后續會考慮加入dubbo
的支持,及一些其他的特性,如調用反饋,失敗次數等等,目前只是簡單的為了滿足業務需要,后需要會進行優化的,目前就且看吧,一些異常之類的都還沒有進行處理⊙﹏⊙‖∣。
完整示例:
統一調度中心:okong-scheduler
schedLock-demo:
https://github.com/xie19900123/java-learning/tree/master/java-shedlock-demo
原文地址:https://blog.lqdev.cn/2019/03/06/%E6%97%A5%E5%B8%B8%E7%A7%AF%E7%B4%AF/java-scheduler/