為何要使用分布式任務調度
**本人博客網站 **IT小神 www.itxiaoshen.com
演示項目源碼地址** https://gitee.com/yongzhebuju/spring-task **
在企業系統開發過程中難免少不了要使用定時任務來進行定時觸發執行,對於非分布式環境系統的單一應用來說則非常容易解決,我們只需要在系統中內部集成一些開源的調度庫配置定時觸發即可;但是隨着企業的系統越來越多,逐步從單一應用慢慢演變為微服務,在分布式系統中常見的任務重復執行、任務負載均衡、統一任務調度、任務故障轉移、任務監控和報警等一些列的問題都是需要在分布式系統中進行解決的,分布式任務調度則應運而生
Java定時常用方式
基礎理論原理
很多人寫過基於線程的while+sleep來實現定時任務完成一些定時后台任務,而Jdk原生也有提供定時器實現;一般定時器實現底層有下面幾種原理,主要涉及數據結構和算法的應用
- 小頂堆
- 堆看作一個數組,也可以被看作一個完全二叉樹,通俗來講堆其實就是利用完全二叉樹的結構來維護的一維數組
- 每個結點的值都小於或等於其左右孩子結點的值
- t通過建堆和堆化操作,從定時任務上使用每次可以從堆頂取出最近一個需要執行的任務
- 時間輪算法(顧名思義就是以時間)
- 鏈表或數組實現時間輪
- 通過while true sleep然后遍歷數組,每個數組下標放置一個鏈表,而這個鏈表放置定時任務,只要遍歷到就取出執行
- round型時間輪
- 任務記錄一個round值,遍歷到就減1,為0時取出執行
- 需要遍歷所有任務,效率較低
- 分層時間輪
- 使用多個不同維度的時間輪
- 天輪,記錄幾點執行
- 月輪,記錄幾號執行
- 當在月輪遍歷好了取出放到天輪,以這樣方式時間幾月幾號執行
- 使用多個不同維度的時間輪
- 鏈表或數組實現時間輪
Jdk Timer
Jdk的timer核心實現
- 最小堆:queue,存放TimerTask
- 任務線程:TimerThread,任務執行線程,繼承自Thread基類,死循環判斷是否有任務需要執行
- 單線程:執行任務,任務可能會相互阻塞
- schedule
- scheduleAtFixedRate
package com.itxs.timer;
import java.util.TimerTask;
public class MyTimerTask extends TimerTask {
@Override
public void run() {
System.out.println("hello my timer task");
}
}
package com.itxs.timer;
import org.joda.time.DateTime; //joda-time日期類型庫
import java.util.Timer;
public class JdkTimer {
public static void main(String[] args) {
Timer timer = new Timer();
//指定時間點執行
timer.schedule(new MyTimerTask(),new DateTime(2021,8,26,17,19,30).toDate());
//延遲兩秒執行,然后再每個3秒執行
timer.schedule(new MyTimerTask(),2000,3000);
}
}
Jdk定時任務線程池
核心實現也是小頂堆,無界隊列,可以使用多線程執行任務,有Leader-Follower模式,避免沒必要阻塞和喚醒操作,節省系統資源
package com.itxs.scheduler;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class SchedulerThreadPool {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
//延遲3秒執行,執行一次
scheduledExecutorService.schedule(new MySchedulerThreadPoolTask(),3,TimeUnit.SECONDS);
//延遲3秒執行,之后每隔十秒執行
scheduledExecutorService.scheduleAtFixedRate(new MySchedulerThreadPoolTask(),3,10,TimeUnit.SECONDS);
}
}
package com.itxs.scheduler;
public class MySchedulerThreadPoolTask implements Runnable{
@Override
public void run() {
System.out.println("MySchedulerThreadPoolTask");
}
}
Spring Task
下面我們借助Spring Boot來演示下Spring Task,配置為多線程模式
package com.itxs.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@Configuration
public class ScheduleConfig {
@Bean
public TaskScheduler taskScheduler(){
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(10);
threadPoolTaskScheduler.setThreadNamePrefix("spring-task-thread");
return threadPoolTaskScheduler;
}
}
package com.itxs.scheduler;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class MySpringTask {
@Scheduled(cron = "0/30 * * * * ?")
private void process(){
System.out.println("MySpringTask:"+Thread.currentThread());
}
}
package com.itxs;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling //注意需要在Spring Boot啟動類上加開啟Spring任務的注解
public class SpringTaskApplication {
public static void main(String[] args) {
new SpringApplication().run(SpringTaskApplication.class,args);
}
}
Quartz定時任務框架
定義
Quartz是一個功能豐富的開源作業調度庫,可以集成到幾乎任何Java應用程序中——從最小的獨立應用程序到最大的電子商務系統。Quartz可用於創建簡單或復雜的調度,以執行數十個、數百個甚至數萬個作業;這些作業的任務被定義為標准Java組件,這些組件實際上可以執行您編程讓它們執行的任何事情。
運行環境
- Quartz可以嵌入到另一個獨立的應用程序中運行
- Quartz可以在應用程序服務器(或servlet容器)中實例化,並參與XA事務
- Quartz可以作為一個獨立的程序(在它自己的Java虛擬機中)運行,通過RMI使用
- 可以將Quartz實例化為一個獨立程序集群(具有負載平衡和故障轉移功能),用於執行作業
持久化
- Quartz的設計包括一個JobStore接口,該接口可以實現為作業的存儲提供各種機制。
- 通過使用包含的Jdbc JobStore,所有配置為“非易失性”的job和觸發器都通過JDBC存儲在關系數據庫中。
- 使用包含的RAM JobStore,所有的job和觸發器都存儲在RAM中,因此不會在程序執行之間持久化——但這樣做的好處是不需要外部數據庫
集群
- 故障轉移
- 負載平衡
- Quartz的內置集群特性依賴於通過JDBCJobStore(前面描述過)實現的數據庫持久性。
- Terracotta對Quartz的擴展提供了集群功能,而不需要支持數據庫。
使用說明
可以使用 SchedulerFactory 類來達到程序調度的目的,一旦調度器實例化后,它就能夠啟動,等待執行和關閉。需要注意的是一旦調度器調用 了shutdown 方法關閉后,如果不重新實例化,它就不會啟動了。觸發器在調度器未啟動時,或是終止狀態時,都不會被觸發
- Scheduler - 與調度程序交互的主要API。
- Job - 你想要調度器執行的任務組件需要實現的接口
- JobDetail - 用於定義作業的實例。
- Trigger - 觸發器,定義執行給定作業的計划的組件。
- JobBuilder - 用於定義/構建 JobDetail 實例,用於定義作業的實例。
- TriggerBuilder - 用於定義/構建觸發器實例。
- Scheduler 的生命期 - 從 SchedulerFactory 創建它時開始,到 Scheduler 調用shutdown() 方法時結束;Scheduler 被創建后,可以增加、刪除和列舉 Job 和 Trigger,以及執行其它與調度相關的操作(如暫停 Trigger)。但是,Scheduler 只有在調用 start() 方法后,才會真正地觸發 trigger(即執行 job)
Spring Boot 整合Quartz
我們本篇的主角ElasticJob底層是依賴Quartz實現的,所以我們有必要先簡單了解Quartz使用,本篇采用jdbc持久化模式,我們這里選擇基於mysql的持久化,因此需要將tables_mysql_innodb.sql包含11張表導入到mysql數據庫中
靜態配置Quartz任務
pom文件內容如下,大部分都是常見啟動器,我們重點是spring-boot-starter-quartz
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.itxs</groupId>
<artifactId>spring-task</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<artifactId>spring-boot-starter-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.5.2</version>
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<quartz-springboot.version>2.5.2</quartz-springboot.version>
<lombok.version>1.18.20</lombok.version>
<druid.version>1.2.6</druid.version>
<mysql.version>8.0.25</mysql.version>
<mybatis-plus.version>3.4.0</mybatis-plus.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
//Spring Boot整合Quartz的啟動器
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
<version>${quartz-springboot.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
</dependencies>
</project>
application配置文件,profiles激活dev環境,在dev環境中進行數據庫配置,include: quartz包含一個單獨配置文件,在里面可以配置多個Quantz的任務參數
application.yml
spring:
application:
name: itxs-spring-task
profiles:
active: dev
include: quartz
quartz:
job-store-type: jdbc # 使用數據庫存儲
scheduler-name: cluster_scheduler # 相同 Scheduler 名字的節點,形成一個 Quartz 集群
wait-for-jobs-to-complete-on-shutdown: true # 應用關閉時,是否等待定時任務執行完成。默認為 false ,建議設置為 true
jdbc:
initialize-schema: never # 是否自動使用 SQL 初始化 Quartz 表結構。這里設置成 never ,我們手動創建表結構。
mybatis-plus:
# mapper-locations: classpath:mapper/*.xml
global-config:
db-config:
id-type: auto
logic-delete-field: deleted
logic-delete-value: 1
logic-not-delete-value: 0
configuration:
map-underscore-to-camel-case: on
call-setters-on-nulls: on
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
application-dev.yml
server:
port: 8080
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.3.117:3306/testdb?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=GMT%2B8
username: itxs
password: itxs@123
type: com.alibaba.druid.pool.DruidDataSource
druid:
max-active: 1000
min-idle: 5
initial-size: 10
application-quartz.yml
quartz:
# jobGroup名稱一致的情況下,不可出現相同jobName
jobs[0]:
jobName: myJob1
# 以服務名為組名
jobGroup: myGroup1
# 業務邏輯處理類的包名
jobClassName: com.itxs.scheduler.MySpringQuartzOneJob
# 觸發器名稱
triggerName: myTrigger1
# cron表達式 每30秒執行一次
cronExpression: 0/30 * * * * ?
# 任務狀態 1 正常 0 暫停
triggerState: 1
# 排序
sort: 1
jobs[1]:
jobName: myJob2
# 以服務名為組名
jobGroup: myGroup2
# 業務邏輯處理類的包名
jobClassName: com.itxs.scheduler.MySpringQuartzSecondJob
# 觸發器名稱
triggerName: myTrigger2
# cron表達式 每分鍾執行一次
cronExpression: 0 * * * * ?
# 任務狀態 1 正常 0 暫停
triggerState: 1
# 排序
sort: 2
配置類,主要配置schedulerFactoryBean和線程池,初始化quartz的scheduler
package com.itxs.config;
import org.quartz.Scheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import javax.sql.DataSource;
import java.util.concurrent.Executor;
@Configuration
public class ScheduleConfig {
@Autowired
private DataSource dataSource;
@Bean
public TaskScheduler taskScheduler(){
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(10);
threadPoolTaskScheduler.setThreadNamePrefix("spring-task-thread");
return threadPoolTaskScheduler;
}
@Value("${spring.quartz.job-store-type}")
private String storeType;
@Bean
public Scheduler scheduler(){
return schedulerFactoryBean().getScheduler();
}
@Bean
public SchedulerFactoryBean schedulerFactoryBean(){
SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
schedulerFactoryBean.setSchedulerName("cluster_scheduler");
if (storeType.equals("jdbc")){
schedulerFactoryBean.setDataSource(dataSource);
}
schedulerFactoryBean.setApplicationContextSchedulerContextKey("application");
schedulerFactoryBean.setTaskExecutor(schedulerThreadPool());
schedulerFactoryBean.setStartupDelay(0);
return schedulerFactoryBean;
}
@Bean
public Executor schedulerThreadPool(){
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
threadPoolTaskExecutor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
threadPoolTaskExecutor.setQueueCapacity(Runtime.getRuntime().availableProcessors());
return threadPoolTaskExecutor;
}
}
Quartz實現類,集成QuartzJobBean實現executeInternal的接口
package com.itxs.scheduler;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 運行在spring
*/
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
@Slf4j
@Component
public class MySpringQuartzJob extends QuartzJobBean {
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext){
try {
log.info("MySpringQuartzJob------調度實例:{},任務名稱:{},執行時間:{}" + jobExecutionContext.getScheduler().getSchedulerInstanceId(),
jobExecutionContext.getJobDetail().getKey().getName(),new Date());
} catch (SchedulerException e) {
e.printStackTrace();
}
}
}
接下來是創建監聽器並將job啟動執行,在Spring 容器刷新后執行監聽器,SpringQuartzApplicationListener是在將Quantz Job配置直接寫在代碼里,而SpringQuartzYamlApplicationListener則是讀取application-quartz.yml里面的每個job的配置然后循環創建
package com.itxs.listener;
import com.itxs.scheduler.MySpringQuartzJob;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class SpringQuartzApplicationListener implements ApplicationListener<ContextRefreshedEvent> {
@Autowired
private Scheduler scheduler;
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
log.info("SpringQuartzApplicationListener quartz調度任務創建開始-------");
TriggerKey triggerKey = TriggerKey.triggerKey("trigger1", "group1");
try {
Trigger trigger = scheduler.getTrigger(triggerKey);
System.out.println(scheduler.getSchedulerName());
if (trigger == null){
trigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey)
.withSchedule(CronScheduleBuilder.cronSchedule("0 * * * * ?"))
.startNow()
.build();
JobDetail jobDetail = JobBuilder.newJob(MySpringQuartzJob.class)
.withIdentity("job1","group1")
.build();
scheduler.scheduleJob(jobDetail,trigger);
scheduler.start();
}
} catch (SchedulerException e) {
e.printStackTrace();
}
log.info("SpringQuartzApplicationListener quartz調度任務創建結束-------");
}
}
package com.itxs.listener;
import com.itxs.pojo.JobEntity;
import com.itxs.utils.QuartzEnum;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
@ConfigurationProperties(prefix = "quartz")
public class SpringQuartzYamlApplicationListener implements ApplicationListener<ContextRefreshedEvent> {
@Autowired
private Scheduler scheduler;
List<JobEntity> jobs = new ArrayList<>();
public List<JobEntity> getJobs() {
return jobs;
}
public void setJobs(List<JobEntity> jobs) {
this.jobs = jobs;
}
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
log.info("SpringQuartzYamlApplicationListener quartz調度任務創建開始-------");
for(JobEntity entity : jobs) {
log.info("調度任務-------"+jobs.toString());
// 當定時任務狀態為0時,不啟動
if (entity.getTriggerState() == QuartzEnum.PAUSED.getCode()) {
continue;
}
try {
Class<? extends Job> jobClass = (Class<? extends Job>) (Class.forName(entity.getJobClassName()).newInstance().getClass());
if (jobClass != null){
TriggerKey triggerKey = TriggerKey.triggerKey(entity.getTriggerName(), entity.getJobGroup());
Trigger trigger = scheduler.getTrigger(triggerKey);
if (trigger == null){
trigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey)
.withSchedule(CronScheduleBuilder.cronSchedule(entity.getCronExpression()))
.startNow()
.build();
Map<String,Object> map = new HashMap<>();
map.put("objectName","object");
JobDataMap jobDataMap = new JobDataMap(map);
JobDetail jobDetail = JobBuilder.newJob(jobClass)
.usingJobData(jobDataMap)
.withIdentity(entity.getJobName(), entity.getJobGroup())
.build();
scheduler.scheduleJob(jobDetail,trigger);
}
}
} catch (SchedulerException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
e.printStackTrace();
}
}
try {
scheduler.start();
} catch (SchedulerException e) {
e.printStackTrace();
}
log.info("SpringQuartzYamlApplicationListener quartz調度任務創建結束-------");
}
}
調用的Json參數為
{
"cron": "0/3 * * * * ?",
"beginTime": "2021-08-27",
"clazzName": "com.itxs.scheduler.RemindJob",
"jobGroup": "mygroup",
"jobName": "myjob",
"parmas": "elastic job dynamic hello world"
}
啟動Spring Boot啟動類,日志顯示quartz使用db持久化方式,所有的job實現類也按照配置參數定時執行,並寫持久化到mysql數據庫里quartz表里
動態配置Quantz
如果我們需要針對定時任務進行創建、停止等操作,那么我們需要動態操作Quantz,本篇也基於Spring Boot + Quartz封裝任務調度實現了動態管理
詳細代碼可以到gitee項目源碼里獲取
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-72e5t825-1630077809479)(file://F:\creation\markdown\article\ElasticJob%E5%88%86%E5%B8%83%E5%BC%8F%E4%BB%BB%E5%8A%A1%E8%B0%83%E5%BA%A6%E5%BA%94%E7%94%A8\image-20210827150653182.png?lastModify=1630077763)]
ElasticJob-分布式任務調度
其他框架名詞了解
開源的分布式任務或作業調度框架除了我們本篇當當的ElasticJob,還有大眾點評開發人員許雪里的XXL-JOB、唯品會Saturn、淘寶的TBSchedule和SchedulerX,此外另外一個在大數據批處理作業調度器Azkaban也非常出名,
XXL-JOB:是一個分布式任務調度平台,其核心設計目標是開發迅速、學習簡單、輕量級、易擴展。
TBSchedule:一個簡潔的分布式任務調度引擎,基於ZooKeeper純Java實現,由Alibaba開源。
SchedulerX:Alibaba Cloud SchedulerX精確、高可靠、高可用的調度任務服務,響應時間在秒內,SchedulerX(分布式任務調度) 是隸屬於阿里雲EDAS產品的組件, Spring Cloud AliCloud SchedulerX 提供了在Spring Cloud的配置規范下,分布式任務調度的功能支持。SchedulerX可提供秒級、精准、高可靠、高可用的定時任務調度服務,並支持多種類型的任務調度,如簡單單機任務、簡單多機任務、腳本任務以及網格任務。
Saturn:來自唯品會開發的一個分布式、容錯和高可用的作業調度服務。
此外,這里也提一下Azkaban,Linkedin開源的一個批量工作流調度器,實現可以一個工作流內,多個作業可以按照特定的順序執行,作業之間的順序關系依靠key-value的形式來建立依賴關系,並提供可視化界面編制作業的工作流程。
由於我們基於Java技術大都以SpringBoot開發為主,ElasticJob與Spring整合也相當不錯,且ElasticJob子項目ElasticJob-Lite定位為輕量級無中心化解決方案,所以本篇我們主要推薦使用ElasticJob分布式任務調度框架
概述
shardingsphere官方網站** ,ShardingSphere 已於2020年4月16日成為 Apache 軟件基金會的頂級項目**
ElasticJob官方網站** ElasticJob作為Apache ShardingSphere的子項目**
Apache ShardingSphere 是一套開源的分布式數據庫解決方案組成的生態圈,它由 JDBC、Proxy 和 Sidecar(規划中)這 3 款既能夠獨立部署,又支持混合部署配合使用的產品組成。 它們均提供標准化的數據水平擴展、分布式事務和分布式治理等功能,可適用於如 Java 同構、異構語言、雲原生等各種多樣化的應用場景。
Apache ShardingSphere 旨在充分合理地在分布式的場景下利用關系型數據庫的計算和存儲能力,而並非實現一個全新的關系型數據庫。 關系型數據庫當今依然占有巨大市場份額,是企業核心系統的基石,未來也難於撼動,我們更加注重在原有基礎上提供增量,而非顛覆。
Apache ShardingSphere 5.x 版本開始致力於可插拔架構,項目的功能組件能夠靈活的以可插拔的方式進行擴展。 目前,數據分片、讀寫分離、數據加密、影子庫壓測等功能,以及 MySQL、PostgreSQL、SQLServer、Oracle 等 SQL 與協議的支持,均通過插件的方式織入項目。 開發者能夠像使用積木一樣定制屬於自己的獨特系統。Apache ShardingSphere 目前已提供數十個 SPI 作為系統的擴展點,仍在不斷增加中。
- ElasticJob 是一個面向互聯網生態和海量任務的分布式調度解決方案,由 2 個相互獨立的子項目 ElasticJob-Lite 和 ElasticJob-Cloud 組成。
- ElasticJob-Lite 定位為輕量級無中心化解決方案,使用jar的形式提供分布式任務的協調服務;
- ElasticJob-Cloud 使用 Mesos 的解決方案,額外提供資源治理、應用分發以及進程隔離等服務。
- ElasticJob 通過彈性調度、資源管控、以及作業治理的功能,打造一個適用於互聯網場景的分布式調度解決方案,並通過開放的架構設計,提供多元化的作業生態。
- ElasticJob 的各個產品使用統一的作業 API,開發者僅需要一次開發,即可隨意部署
- 使用 ElasticJob 能夠讓開發工程師不再擔心任務的線性吞吐量提升等非功能需求,使他們能夠更加專注於面向業務編碼設計; 同時,它也能夠解放運維工程師,使他們不必再擔心任務的可用性和相關管理需求,只通過輕松的增加服務節點即可達到自動化運維的目的。
- ElasticJob實現分布式特性主要依賴於Zookeeper,比如leader選舉、彈性擴縮容、故障轉移、負載均衡等機制
可以通過快速入門快速體驗ElasticJob
架構
ElasticJob-Lite
定位為輕量級無中心化解決方案,使用 jar 的形式提供分布式任務的協調服務。
ElasticJob-Cloud
采用自研 Mesos Framework 的解決方案,額外提供資源治理、應用分發以及進程隔離等功能。
特性 | ElasticJob-Lite | ElasticJob-Cloud |
---|---|---|
無中心化 | 是 | 否 |
資源分配 | 不支持 | 支持 |
作業模式 | 常駐 | 常駐 + 瞬時 |
部署依賴 | ZooKeeper | ZooKeeper + Mesos |
功能列表
- 彈性調度
- 支持任務在分布式場景下的分片和高可用
- 能夠水平擴展任務的吞吐量和執行效率
- 任務處理能力隨資源配備彈性伸縮
- 資源分配
- 在適合的時間將適合的資源分配給任務並使其生效
- 相同任務聚合至相同的執行器統一處理
- 動態調配追加資源至新分配的任務
- 作業治理
- 失效轉移
- 錯過作業重新執行
- 自診斷修復
- 作業依賴(TODO)
- 基於有向無環圖(DAG)的作業間依賴
- 基於有向無環圖(DAG)的作業分片間依賴
- 作業開放生態
- 可擴展的作業類型統一接口
- 豐富的作業類型庫,如數據流、腳本、HTTP、文件、大數據等
- 易於對接業務作業,能夠與 Spring 依賴注入無縫整合
- 可視化管控端
- 作業管控端
- 作業執行歷史數據追蹤
- 注冊中心管理
基本使用
作業類型
- 簡單作業,我們本篇文章主要使用這個,其他后續再補充
- 數據流作業
- 腳本作業
- HTTP作業(3.0.0-beta 提供)
靜態任務配置
前面我們學習Spring Boot 整合Quartz的使用,ElasticJob使用可所謂簡單至極,還是原來我們說的Spring Boot的三板斧,加依賴和配置,另外增加實現類Ok搞掂。由於我們還用使用之前工程項目,因此依賴和配置內容較多,核心是添加elasticjob-lite-spring-boot-starter和elasticjob項配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.itxs</groupId>
<artifactId>spring-task</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<artifactId>spring-boot-starter-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.5.2</version>
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<quartz-springboot.version>2.5.2</quartz-springboot.version>
<lombok.version>1.18.20</lombok.version>
<druid.version>1.2.6</druid.version>
<mysql.version>8.0.25</mysql.version>
<mybatis-plus.version>3.4.0</mybatis-plus.version>
<elasticjob-lite-core.version>3.0.0-RC1</elasticjob-lite-core.version>
<curator.version>5.2.0</curator.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
<version>${quartz-springboot.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-lite-core</artifactId>
<version>${elasticjob-lite-core.version}</version>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-lite-spring-boot-starter</artifactId>
<version>${elasticjob-lite-core.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>${curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>
</dependencies>
</project>
spring:
application:
name: itxs-spring-task
profiles:
active: dev
include: quartz
quartz:
job-store-type: jdbc # 使用數據庫存儲
scheduler-name: cluster_scheduler # 相同 Scheduler 名字的節點,形成一個 Quartz 集群
wait-for-jobs-to-complete-on-shutdown: true # 應用關閉時,是否等待定時任務執行完成。默認為 false ,建議設置為 true
jdbc:
initialize-schema: never # 是否自動使用 SQL 初始化 Quartz 表結構。這里設置成 never ,我們手動創建表結構。
elasticjob:
reg-center:
server-lists: 192.168.50.201:2181,192.168.50.204:2182,192.168.50.153:2183
namespace: itxs-elastic-job
jobs:
elasticDemoOneJob:
elasticJobClass: com.itxs.scheduler.ElasticDemoOneJob
cron: 0/30 * * * * ?
shardingTotalCount: 1
shardingItemParameters: 0=Beijing
mybatis-plus:
# mapper-locations: classpath:mapper/*.xml
global-config:
db-config:
id-type: auto
logic-delete-field: deleted
logic-delete-value: 1
logic-not-delete-value: 0
configuration:
map-underscore-to-camel-case: on
call-setters-on-nulls: on
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
ElasticJob簡單實現類示例
package com.itxs.scheduler;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
/**
* 運行在spring
*/
@Slf4j
@Component
@Scope("prototype")
public class ElasticDemoOneJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
log.info("ElasticDemoOneJob Start ------jobname={},taskid={},parameter={},shardingitem={},shardingparameter={}",shardingContext.getJobName(),
shardingContext.getTaskId(),shardingContext.getJobParameter(),
shardingContext.getShardingItem(),shardingContext.getShardingParameter());
}
}
搞掂運行,出現我們ElasticDemoOneJob調度日志,我們再啟動一個8081端口,也即是同時有多個進程實現調度任務,發現目前的Job一直在8080這個進程上運行,當我們關閉8080端口這個SpringBoot程序后,過一會8081端口這個微服務就會ElasticDemoOneJob調度日志執行任務接替原來8080定時任務。注意由於我們job元數據信息是存在zookeeper里面,如果我們沒有使用覆蓋等級制,重新修改job配置參數后沒有生效,建議先刪除掉zookeeper的節點數據然后啟動再服務
當我們配置分片后,比如配置為5個分配,啟動多個進程會將分片負載均衡分配到各個進程任務去支持,比如當前8082則執行0和3兩個分片,8080是2分片,8081是1和4兩個分片,也即是根據當前可用節點數據針對分片數量進行動態調整,這種場景比較適用於處理任務執行時間較長需要處理的數據較大
動態任務
如果我們需要動態創建啟動和停止ElasticJob,我們可以自己實現封裝,具體如下
動態創建啟用和停止的接口聲明類
package com.itxs.service;
import com.itxs.pojo.SysTask;
/**
* @Description 任務管理
* @Version 1.0
*/
public interface IElasticJobService {
/**
* @Description //添加一個任務
* @Param [sysTask]
* @return boolean
*/
boolean addJob(SysTask sysTask) throws Exception;
/**
* @Description //刪除某個任務
* @Param [sysTask]
* @return boolean
*/
boolean deleteJob(SysTask sysTask) throws Exception;
}
接口實現類:
package com.itxs.service.impl;
import com.itxs.pojo.SysTask;
import com.itxs.service.IElasticJobService;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
public class ElasticJobServiceImpl implements IElasticJobService {
@Value("${elasticjob.reg-center.server-lists}")
private String serverList;
@Value("${elasticjob.reg-center.namespace}")
private String namespace;
@Override
public boolean addJob(SysTask sysTask){
try {
Class jobClass = Class.forName(sysTask.getClazzName());
JobConfiguration jobConfig = JobConfiguration.newBuilder(sysTask.getJobName(),1)
.cron(sysTask.getCron()).overwrite(true).jobParameter(sysTask.getParmas()).disabled(false).build();
ScheduleJobBootstrap scheduleJobBootstrap = new ScheduleJobBootstrap(createRegistryCenter(), (SimpleJob)jobClass.newInstance(), jobConfig);
scheduleJobBootstrap.schedule();
return true;
}catch (Exception e){
return false;
}
}
@Override
public boolean deleteJob(SysTask sysTask){
try {
Class jobClass = Class.forName(sysTask.getClazzName());
JobConfiguration jobConfig = JobConfiguration.newBuilder(sysTask.getJobName(),1)
.cron(sysTask.getCron()).overwrite(true).jobParameter(sysTask.getParmas()).disabled(true).build();
ScheduleJobBootstrap scheduleJobBootstrap = new ScheduleJobBootstrap(createRegistryCenter(), (SimpleJob)jobClass.newInstance(), jobConfig);
scheduleJobBootstrap.schedule();
return true;
}catch (Exception e){
return false;
}
}
public CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
regCenter.init();
return regCenter;
}
}
在原來controller上增加addElasticTask和deleteElasticTask兩個方法
package com.itxs.controller;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.itxs.pojo.SysTask;
import com.itxs.service.IElasticJobService;
import com.itxs.service.IJobManageService;
import com.itxs.utils.JsonResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
* @Description 任務管理控制,此處只實現了增加和刪除,
* @Version 1.0
*/
@Slf4j
@RestController
@RequestMapping("/quartz")
public class SysTaskController {
@Autowired
private IJobManageService jobManageService;
@Autowired
private IElasticJobService iElasticJobService;
private boolean validateParmas(SysTask task) {
return task != null && StringUtils.isNotBlank(task.getCron())
&& StringUtils.isNotBlank(task.getJobGroup())
&& StringUtils.isNotBlank(task.getJobName());
}
@GetMapping("/add")
public JsonResult addTask() {
return JsonResult.success();
}
/**
* @Description //添加一個任務
* @Param [task]
* @return com.quartz.result.JsonResult
*/
@PostMapping("/add-task")
public JsonResult addTask(@RequestBody SysTask task) {
if (validateParmas(task)) {
try {
this.jobManageService.addJob(task);
return JsonResult.success();
} catch (Exception e) {
log.error("添加任務異常,異常任務名稱:" + task.getJobGroup() + "; 任務名稱" + task.getJobName());
}
}
return JsonResult.error();
}
@PostMapping("/add-elastic-task")
public JsonResult addElasticTask(@RequestBody SysTask task) {
if (validateParmas(task)) {
try {
this.iElasticJobService.addJob(task);
return JsonResult.success();
} catch (Exception e) {
log.error("添加任務異常,異常任務名稱:" + task.getJobGroup() + "; 任務名稱" + task.getJobName());
}
}
return JsonResult.error();
}
/**
* @Description //刪除一個任務
* @Param [task]
* @return com.quartz.result.JsonResult
*/
@RequestMapping("/delete-elastic-task")
public JsonResult deleteElasticTask(@RequestBody SysTask task) {
if (validateParmas(task)) {
try {
this.iElasticJobService.deleteJob(task);
return JsonResult.success();
} catch (Exception e) {
log.error("刪除任務異常,異常任務名稱:" + task.getJobGroup() + "; 任務名稱" + task.getJobName());
}
}
return JsonResult.error();
}
/**
* @Description //刪除一個任務
* @Param [task]
* @return com.quartz.result.JsonResult
*/
@RequestMapping("/delete-task")
public JsonResult deleteTask(@RequestBody SysTask task) {
if (validateParmas(task)) {
try {
this.jobManageService.deleteJob(task);
return JsonResult.success();
} catch (Exception e) {
log.error("刪除任務異常,異常任務名稱:" + task.getJobGroup() + "; 任務名稱" + task.getJobName());
}
}
return JsonResult.error();
}
}
重新啟動服務,端口為8080,post調用地址:http://192.168.3.224:8080/quartz/add-elastic-task,調用添加接口成功任務開始定時執行,查看zookeeper上也已經存儲了新創建Job元數據信息,當我們調用刪除任務接口后定時任務不再執行
{
"cron": "0/5 * * * * ?",
"beginTime": "2021-08-27",
"clazzName": "com.itxs.scheduler.ElasticDemoJob",
"jobGroup": "myelasticgroup",
"jobName": "myelasticjob",
"parmas": "real elastic job dynamic hello world"
}
有耐心看在這里的小伙伴們,恭喜你,已經入門了分布式任務調度