elastic-job-spring-boot
qq交流群:812321371
1 簡介
Elastic-Job
是一個分布式調度解決方案,由兩個相互獨立的子項目Elastic-Job-Lite
和Elastic-Job-Cloud
組成。Elastic-Job-Lite
定位為輕量級無中心化解決方案,使用jar包的形式提供分布式任務的協調服務。
基於quartz
定時任務框架為基礎的,因此具備quartz
的大部分功能
使用zookeeper
做協調,調度中心,更加輕量級
支持任務的分片
支持彈性擴容,可以水平擴展, 當任務再次運行時,會檢查當前的服務器數量,重新分片,分片結束之后才會繼續執行任務
失效轉移,容錯處理,當一台調度服務器宕機或者跟zookeeper
斷開連接之后,會立即停止作業,然后再去尋找其他空閑的調度服務器,來運行剩余的任務
提供運維界面,可以管理作業和注冊中心。
1.1 使用場景
由於項目為微服務,單模塊可能在兩個實例以上的數量,定時器就會出現多實例同時執行的情況。
一般定時器缺少管理界面,無法監控定時器是否執行成功。
市面上常見的解決方案為定時器加鎖的操作,或者采用第3方分布式定時器。
分布式定時器有多種方案,比如阿里內部的ScheduledX
,當當網的Elastic job
,個人開源的xxl-job
等。
1.2 功能列表
- 分布式調度協調
- 彈性擴容縮容
- 失效轉移
- 錯過執行作業重觸發
- 作業分片一致性,保證同一分片在分布式環境中僅一個執行實例
- 自診斷並修復分布式不穩定造成的問題
- 支持並行調度
- 支持作業生命周期操作
- 豐富的作業類型
- Spring整合以及命名空間提供
- 運維平台
1.3 概念
分片:任務的分布式執行,需要將一個任務拆分為多個獨立的任務項,然后由分布式的服務器分別執行某一個或幾個分片項。
例如:有一個遍歷數據庫某張表的作業,現有2台服務器。為了快速的執行作業,那么每台服務器應執行作業的50%
。 為滿足此需求,可將作業分成2片,每台服務器執行1片。作業遍歷數據的邏輯應為:服務器A遍歷ID以奇數結尾的數據;服務器B遍歷ID以偶數結尾的數據。 如果分成10片,則作業遍歷數據的邏輯應為:每片分到的分片項應為ID%10,而服務器A被分配到分片項0,1,2,3,4
;服務器B被分配到分片項5,6,7,8,9
,直接的結果就是服務器A遍歷ID
以0-4
結尾的數據;服務器B遍歷ID
以5-9
結尾的數據。
歷史軌跡:Elastic-Job
提供了事件追蹤功能,可通過事件訂閱的方式處理調度過程的重要事件,用於查詢、統計和監控。
1.4 封裝elasticjob
由於當當網Elastic job
處於1年間未更新階段,相關jar處於可以使用階段功能不全。考慮到使用場景為多項目使用,將elastic-job-lite-spring
簡單封裝便於使用。
2.使用說明:
2.1 添加依賴
ps:實際version版本請使用最新版
<dependency>
<groupId>com.purgeteam</groupId>
<artifactId>elasticjob-spring-boot-starter</artifactId>
<version>0.1.1.RELEASE</version>
</dependency>
2.2 配置
ps: 需要mysql
,zookeeper
支持,請提前搭建好。
配置bootstrap.yml
或者application.yml
。
加入以下配置:
spring:
elasticjob:
datasource: # job需要的記錄數據源
url: jdbc:mysql://127.0.0.1:3306/batch_log?useUnicode=true&characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: Rtqw123OpnmER
regCenter: # 注冊中心
serverList: 127.0.0.1:2181
namespace: elasticJobDemo
2.3 定時器實現方法編寫
創建定時器類(唯一不同的地方在於將@Scheduled
改為實現SimpleJob
接口即可)
定時器實現方法編寫在execute
方法里。
@Slf4j
@Component
public class MySimpleJob implements SimpleJob {
// @Scheduled(cron = "0 0/1 * * * ?")
@Override
public void execute(ShardingContext shardingContext) {
log.info(String.format("Thread ID: %s, 作業分片總數: %s, " +
"當前分片項: %s.當前參數: %s," +
"作業名稱: %s.作業自定義參數: %s",
Thread.currentThread().getId(),
shardingContext.getShardingTotalCount(),
shardingContext.getShardingItem(),
shardingContext.getShardingParameter(),
shardingContext.getJobName(),
shardingContext.getJobParameter()
));
// 分片大致如下:根據配置的分片參數執行相應的邏輯
switch (context.getShardingItem()) {
case 0:
// do something by sharding item 0
break;
case 1:
// do something by sharding item 1
break;
case 2:
// do something by sharding item 2
break;
// case n: ...
}
}
}
log:Thread ID: 66, 作業分片總數: 1, 當前分片項: 0.當前參數: Beijing,作業名稱: PropertiesSimpleJob.作業自定義參數: test
2.4 配置定時器
2.4.1 創建Configuration類
將ZookeeperRegistryCenter
和JobEventConfiguration
注入。
創建JobScheduler
@Bean(initMethod = "init")
。
在mySimpleJobScheduler
方法里先通過ElasticJobUtils#getLiteJobConfiguration
獲取LiteJobConfiguration
對象。
創建SpringJobScheduler
對象返回即可。
@Configuration
public class MyJobConfig {
// job 名稱
private static final String JOB_NAME = "MySimpleJob";
// 定時器cron參數
private static final String CRON = "0 0/1 * * * ?";
// 定時器分片
private static final int SHARDING_TOTAL_COUNT = 1;
// 分片參數
private static final String SHARDING_ITEM_PARAMETERS = "0=Beijing,1=Shanghai,2=Guangzhou";
// 自定義參數
private static final String JOB_PARAMETERS = "parameter";
@Resource
private ZookeeperRegistryCenter regCenter;
@Resource
private JobEventConfiguration jobEventConfiguration;
@Bean(initMethod = "init")
public JobScheduler mySimpleJobScheduler(final MySimpleJob mySimpleJob) {
LiteJobConfiguration liteJobConfiguration = ElasticJobUtils
.getLiteJobConfiguration(mySimpleJob.getClass(), JOB_NAME, CRON,
SHARDING_TOTAL_COUNT, SHARDING_ITEM_PARAMETERS, JOB_PARAMETERS);
// 參數:1.定時器實例,2.注冊中心類,3.LiteJobConfiguration,
// 3.歷史軌跡(不需要可以省略)
return new SpringJobScheduler(mySimpleJob, regCenter, liteJobConfiguration, jobEventConfiguration);
}
}
ElasticJobUtils#getLiteJobConfiguration
參數簡介:
/**
* 獲取 {@link LiteJobConfiguration} 對象
*
* @param jobClass 定時器實現類
* @param jobName 定時器名稱
* @param cron 定時參數
* @param shardingTotalCount 作業分片總數
* @param shardingItemParameters 當前參數 可以為null
* @param jobParameters 作業自定義參數 可以為null
* @return {@link LiteJobConfiguration}
*/
public static LiteJobConfiguration getLiteJobConfiguration(
final Class<? extends SimpleJob> jobClass,
final String jobName,
final String cron,
final int shardingTotalCount,
final String shardingItemParameters,
final String jobParameters) {
...
return ...;
}
2.4.2 簡化Configuration類
當然也可以用下面的@Configuration
實現簡化,配置bootstrap.yml
或者application.yml
。
spring:
elasticjob:
scheduled:
jobConfigMap: // 為map集合
PropertiesSimpleJob: // 定時器key名稱
jobName: PropertiesSimpleJob // job名稱
cron: 0 0/1 * * * ? // cron表達式
shardingTotalCount: 2 // 分片數量
shardingItemParameters: 0=123,1=332 // 分片參數
jobParameters: test // 自定義參數
注入SpringJobSchedulerFactory
,在propertiesSimpleJobScheduler
方法里調用gerSpringJobScheduler
方法即可。
@Configuration
public class PropertiesSimpleJobConfig {
@Resource
private SpringJobSchedulerFactory springJobSchedulerFactory;
@Bean(initMethod = "init")
public JobScheduler propertiesSimpleJobScheduler(final PropertiesSimpleJob job) {
// 參數:1.定時器實例,2.配置名稱,3.是否開啟歷史軌跡
return springJobSchedulerFactory.getSpringJobScheduler(job,"PropertiesSimpleJob", true);
}
}
2.4.3 注解方式配置(推薦方式)
ps:這個注解包含了上述方式,簡化定時器注入。
繼承SimpleJob
實現方法execute
。
在AnnotationSimpleJob
類上加入注解@ElasticJobScheduler
即可。
下面為完整注解。
@Slf4j
@ElasticJobScheduler(
name = "AnnotationSimpleJob", // 定時器名稱
cron = "0/8 * * * * ?", // 定時器表達式
shardingTotalCount = 1, // 作業分片總數 默認為1
shardingItemParameters = "0=Beijing,1=Shanghai,2=Guangzhou", // 分片序列號和參數用等號分隔 不需要參數可以不加
jobParameters = "123", // 作業自定義參數 不需要參數可以不加
isEvent = true // 是否開啟數據記錄 默認為true
)
public class AnnotationSimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
log.info(String.format("Thread ID: %s, 作業分片總數: %s, " +
"當前分片項: %s.當前參數: %s," +
"作業名稱: %s.作業自定義參數: %s",
Thread.currentThread().getId(),
shardingContext.getShardingTotalCount(),
shardingContext.getShardingItem(),
shardingContext.getShardingParameter(),
shardingContext.getJobName(),
shardingContext.getJobParameter()
));
}
}
總結
分布式job可以解決多個項目同一個定時器都執行的問題,配合elastic-job控制台可以直觀監控定時器執行情況等。
示例代碼地址:elastic-job-spring-boot