本文源碼:GitHub·點這里 || GitEE·點這里
一、ElasticJob簡介
1、定時任務
在前面的文章中,說過QuartJob這個定時任務,被廣泛應用的定時任務標准。但Quartz核心點在於執行定時任務並不是在於關注的業務模式和場景,缺少高度自定義的功能。Quartz能夠基於數據庫實現任務的高可用,但是不具備分布式並行調度的功能。
2、ElasticJob說明
- 基礎簡介
Elastic-Job 是一個開源的分布式調度中間件,由兩個相互獨立的子項目 Elastic-Job-Lite 和 Elastic-Job-Cloud 組成。Elastic-Job-Lite 為輕量級無中心化解決方案,使用 jar 包提供分布式任務的調度和治理。 Elastic-Job-Cloud 是一個 Mesos Framework,依托於Mesos額外提供資源治理、應用分發以及進程隔離等服務。
- 功能特點
分布式調度協調
彈性擴容縮容
失效轉移
錯過執行作業重觸發
作業分片一致性,保證同一分片在分布式環境中僅一個執行實例
補刀
:人家官網這樣描述的,這里贅述一下,充實一下文章。
- 基礎框架結構
該圖片來自ElasticJob官網。
由圖可知如下內容:
需要Zookeeper組件支持,作為分布式的調度任務,有良好的監聽機制,和控制台,下面的案例也就沖這個圖解來。
3、分片管理
這個概念在ElasticJob中是最具有特點的,實用性極好。
- 分片概念
任務的分布式執行,需要將一個任務拆分為多個獨立的任務項,然后由分布式的服務器分別執行某一個或幾個分片項。
場景描述:假設有服務3台,分3片管理,要處理數據表100條,那就可以100%3,按照余數0,1,2分散到三台服務上執行,看到這里分庫分表的基本邏輯涌上心頭,這就是為何很多大牛講說,編程思維很重要。
- 個性化參數
個性化參數即shardingItemParameter,可以和分片項匹配對應關系,用於將分片項的數字轉換為更加可讀的業務代碼。
場景描述:這里猛一讀好像很飄逸,其實就是這個意思,如果分3片,取名[0,1,2]不好看,或者不好標識,可以分別給個別名標識一下,[0=A,1=B,2=C]。
二、定時任務加載
1、核心依賴包
這里使用2.0+的版本。
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
2、核心配置文件
這里主要配置一下Zookeeper中間件,分片和分片參數。
zookeeper:
server: 127.0.0.1:2181
namespace: es-job
job-config:
cron: 0/10 * * * * ?
shardCount: 1
shardItem: 0=A,1=B,2=C,3=D
3、自定義注解
看了官方的案例,沒看到好用的注解,這里只能自己編寫一個,基於案例的加載過程和核心API作為參考。
核心配置類:
com.dangdang.ddframe.job.lite.config.LiteJobConfiguration
根據自己想如何使用注解的思路,比如我只想注解定時任務名稱和Cron表達式這兩個功能,其他參數直接統一配置(這里可能是受QuartJob影響太深,可能根本就是想省事...)
@Inherited
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface TaskJobSign {
@AliasFor("cron")
String value() default "";
@AliasFor("value")
String cron() default "";
String jobName() default "";
}
4、作業案例
這里打印一些基本參數,對照配置和注解,一目了然。
@Component
@TaskJobSign(cron = "0/5 * * * * ?",jobName = "Hello-Job")
public class HelloJob implements SimpleJob {
private static final Logger LOG = LoggerFactory.getLogger(HelloJob.class.getName()) ;
@Override
public void execute(ShardingContext shardingContext) {
LOG.info("當前線程: "+Thread.currentThread().getId());
LOG.info("任務分片:"+shardingContext.getShardingTotalCount());
LOG.info("當前分片:"+shardingContext.getShardingItem());
LOG.info("分片參數:"+shardingContext.getShardingParameter());
LOG.info("任務參數:"+shardingContext.getJobParameter());
}
}
5、加載定時任務
既然自定義注解,那加載過程自然也要自定義一下,讀取自定義的注解,配置化,加入容器,然后初始化,等着任務執行就好。
@Configuration
public class ElasticJobConfig {
@Resource
private ApplicationContext applicationContext ;
@Resource
private ZookeeperRegistryCenter zookeeperRegistryCenter;
@Value("${job-config.cron}") private String cron ;
@Value("${job-config.shardCount}") private int shardCount ;
@Value("${job-config.shardItem}") private String shardItem ;
/**
* 配置任務監聽器
*/
@Bean
public ElasticJobListener elasticJobListener() {
return new TaskJobListener();
}
/**
* 初始化配置任務
*/
@PostConstruct
public void initTaskJob() {
Map<String, SimpleJob> jobMap = this.applicationContext.getBeansOfType(SimpleJob.class);
Iterator iterator = jobMap.entrySet().iterator();
while (iterator.hasNext()) {
// 自定義注解管理
Map.Entry<String, SimpleJob> entry = (Map.Entry)iterator.next();
SimpleJob simpleJob = entry.getValue();
TaskJobSign taskJobSign = simpleJob.getClass().getAnnotation(TaskJobSign.class);
if (taskJobSign != null){
String cron = taskJobSign.cron() ;
String jobName = taskJobSign.jobName() ;
// 生成配置
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(
JobCoreConfiguration.newBuilder(jobName, cron, shardCount)
.shardingItemParameters(shardItem).jobParameter(jobName).build(),
simpleJob.getClass().getCanonicalName());
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(
simpleJobConfiguration).overwrite(true).build();
TaskJobListener taskJobListener = new TaskJobListener();
// 初始化任務
SpringJobScheduler jobScheduler = new SpringJobScheduler(
simpleJob, zookeeperRegistryCenter,
liteJobConfiguration, taskJobListener);
jobScheduler.init();
}
}
}
}
絮叨一句
:不要疑問這些API是怎么知道,看下官方文檔的案例,他們怎么使用這些核心API,這里就是照着寫過來,就是多一步自定義注解類的加載過程。當然官方文檔大致讀一遍還是很有必要的。
補刀一句
:如何快速學習一些組件的用法,首先找到官方文檔,或者開源庫Wiki,再不濟ReadMe文檔(如果都沒有,酌情放棄,另尋其他),熟悉基本功能是否符合自己的需求,如果符合,就看下基本用法案例,熟悉API,最后就是研究自己需要的功能模塊,個人經驗來看,該過程是彎路最少,坑最少的。
6、任務監聽
用法非常簡單,實現ElasticJobListener接口。
@Component
public class TaskJobListener implements ElasticJobListener {
private static final Logger LOG = LoggerFactory.getLogger(TaskJobListener.class);
private long beginTime = 0;
@Override
public void beforeJobExecuted(ShardingContexts shardingContexts) {
beginTime = System.currentTimeMillis();
LOG.info(shardingContexts.getJobName()+"===>開始...");
}
@Override
public void afterJobExecuted(ShardingContexts shardingContexts) {
long endTime = System.currentTimeMillis();
LOG.info(shardingContexts.getJobName()+
"===>結束...[耗時:"+(endTime - beginTime)+"]");
}
}
絮叨一句
:before和after執行前后,中間執行目標方法,標准的AOP切面思想,所以底層水平決定了對上層框架的理解速度,那本《Java編程思想》上的灰塵是不是該擦擦?
三、動態添加
1、作業任務
有部分場景需要動態添加和管理定時任務,基於上面的加載流程,在自定義一些步驟就可以。
@Component
public class GetTimeJob implements SimpleJob {
private static final Logger LOG = LoggerFactory.getLogger(GetTimeJob.class.getName()) ;
private static final SimpleDateFormat format =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") ;
@Override
public void execute(ShardingContext shardingContext) {
LOG.info("Job Name:"+shardingContext.getJobName());
LOG.info("Local Time:"+format.format(new Date()));
}
}
2、添加任務服務
這里就動態添加上面的任務。
@Service
public class TaskJobService {
@Resource
private ZookeeperRegistryCenter zookeeperRegistryCenter;
public void addTaskJob(final String jobName,final SimpleJob simpleJob,
final String cron,final int shardCount,final String shardItem) {
// 配置過程
JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder(
jobName, cron, shardCount)
.shardingItemParameters(shardItem).build();
JobTypeConfiguration jobTypeConfiguration = new SimpleJobConfiguration(jobCoreConfiguration,
simpleJob.getClass().getCanonicalName());
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(
jobTypeConfiguration).overwrite(true).build();
TaskJobListener taskJobListener = new TaskJobListener();
// 加載執行
SpringJobScheduler jobScheduler = new SpringJobScheduler(
simpleJob, zookeeperRegistryCenter,
liteJobConfiguration, taskJobListener);
jobScheduler.init();
}
}
補刀一句
:這里添加之后,任務就會定時執行,如何停止任務又是一個問題,可以在任務名上做一些配置,比如在數據庫生成一條記錄[1,job1,state],如果調度到state為停止狀態的任務,直接截胡即可。
3、測試接口
@RestController
public class TaskJobController {
@Resource
private TaskJobService taskJobService ;
@RequestMapping("/addJob")
public String addJob(@RequestParam("cron") String cron,@RequestParam("jobName") String jobName,
@RequestParam("shardCount") Integer shardCount,
@RequestParam("shardItem") String shardItem) {
taskJobService.addTaskJob(jobName, new GetTimeJob(), cron, shardCount, shardItem);
return "success";
}
}
四、源代碼地址
GitHub·地址
https://github.com/cicadasmile/middle-ware-parent
GitEE·地址
https://gitee.com/cicadasmile/middle-ware-parent