Elastic-Job是當當網的任務調度開源框架,有以下功能
分布式調度協調
彈性擴容縮容
失效轉移
錯過執行作業重觸發
作業分片一致性,保證同一分片在分布式環境中僅一個執行實例
自診斷並修復分布式不穩定造成的問題
支持並行調度
支持作業生命周期操作
豐富的作業類型
Spring整合以及命名空間提供
運維平台
具體信息可以查看 官網 ,Elastic-Job的文檔很詳細,同時也有相應的demo。但是,美中不足的是他的springboot版本的demo用的是xml結構的。網上的例子都有點亂,花了點時間整合了下,現在就開始吧。
快速入門
1.pom文件
<elastic-job.version>2.1.5</elastic-job.version> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>${elastic-job.version}</version> </dependency> <!-- elastic-job-lite-spring --> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>${elastic-job.version}</version> </dependency>
2.定義zookeeper
@Configuration public class ElasticRegCenterConfig { /** * 配置zookeeper * @param serverList * @param namespace * @return */ @Bean(initMethod = "init") public ZookeeperRegistryCenter regCenter( @Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace) { return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace)); } }
3.定義job
@Component public class SimpleJobDemo implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { System.out.println(String.format("------Thread ID: %s, 任務總片數: %s, " + "當前分片項: %s.當前參數: %s," + "當前任務名稱: %s.當前任務參數: %s" , Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter(), shardingContext.getJobName(), shardingContext.getJobParameter() )); } }
4.定義任務監聽器,統計每次任務執行的時間
public class MyElasticJobListener implements ElasticJobListener { private static final Logger logger = LoggerFactory.getLogger(MyElasticJobListener.class); private long beginTime = 0; @Override public void beforeJobExecuted(ShardingContexts shardingContexts) { beginTime = System.currentTimeMillis(); logger.info("===>{} JOB BEGIN TIME: {} <===",shardingContexts.getJobName(), TimeUtil.mill2Time(beginTime)); } @Override public void afterJobExecuted(ShardingContexts shardingContexts) { long endTime = System.currentTimeMillis(); logger.info("===>{} JOB END TIME: {},TOTAL CAST: {} <===",shardingContexts.getJobName(), TimeUtil.mill2Time(endTime), endTime - beginTime); } }
5.配置JobConfiuration,配置job隨容器一起啟動
@Configuration public class ElasticJobConfig { @Autowired private ZookeeperRegistryCenter regCenter; /** * 配置任務監聽器 * @return */ @Bean public ElasticJobListener elasticJobListener() { return new MyElasticJobListener(); } /** * 配置任務詳細信息 * @param jobClass * @param cron * @param shardingTotalCount * @param shardingItemParameters * @return */ private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) { return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration( JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount) .shardingItemParameters(shardingItemParameters).build() , jobClass.getCanonicalName()) ).overwrite(true).build(); } @Bean(initMethod = "init") public JobScheduler simpleJobScheduler(final SimpleJobDemo simpleJob, @Value("${stockJob.cron}") final String cron, @Value("${stockJob.shardingTotalCount}") final int shardingTotalCount, @Value("${stockJob.shardingItemParameters}") final String shardingItemParameters) { MyElasticJobListener elasticJobListener = new MyElasticJobListener(); return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters), elasticJobListener); } }
配置文件如下
server.port=${random.int[10000,19999]} regCenter.serverList = localhoost:2181 regCenter.namespace = elastic-job-lite-springboot stockJob.cron = 0/5 * * * * ? stockJob.shardingTotalCount = 4 stockJob.shardingItemParameters = 0=0,1=1,2=0,3=1
啟動項目,輸出
大家可以仔細的觀察下配置文件和輸出的對應關系。假設我先有這樣的需求。
定時任務在兩台主機上A,B同時運行.A處理id是奇數的數據,B處理Id為偶數的數據
用傳統的定時任務例如@Schedule注解能實現么?那當然是可以,直接寫死硬編碼即可。用Elastic-Job的話,則相當靈活,示例如下
public class DbQueryJob implements SimpleJob { @Autowired private XXXDao xxxDao; @Override public void execute(ShardingContext shardingContext) { String shardingParameter = shardingContext.getShardingParameter(); //mod是對id取余后的結果 List<xxx> xxxlists=xxxDao.select("select * from table where mod="+shardingParameter); xxxlists.forEach(x->{ System.out.println("參數:"+shardingContext.getShardingParameter()+"狀態"+x.getStatus()); }); } }
稍微講下原理
Elastic-Job默認采用平均分片策略
如果有3台服務器,分成9片,則每台服務器分到的分片是:1=[0,1,2], 2=[3,4,5], 3=[6,7,8] 如果有3台服務器,分成8片,則每台服務器分到的分片是:1=[0,1,6], 2=[2,3,7], 3=[4,5] 如果有3台服務器,分成10片,則每台服務器分到的分片是:1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]
上述配置文件我們分成4片,A服務分到0,1 B服務分到2,3,同時 shardingItemParameters 參數表明了每個分片對應的ItemParameters,所以A服務的 shardingContext.getShardingParameter()=0
通過這樣的邏輯,我們就能實現我們的分片業務了。
以上,就是Springboot整合Elastic-Job