一、前言
在我們的項目當中,使用定時任務是避免不了的,我們在部署定時任務時,通常只部署一台機器。部署多台機器時,同一個任務會執行多次。比如短信提醒,每天定時的給用戶下發短信,如果部署了多台,同一個用戶將發送多條。只部署一台機器,可用性又無法保證。今天向大家介紹一款開源產品,分布式定時任務解決方案---- elastic-job。
二、簡介
Elastic-Job是一個分布式調度解決方案,由兩個相互獨立的子項目Elastic-Job-Lite和Elastic-Job-Cloud組成。在我們的項目中使用了輕量級無中心化解決方案,Elastic-Job-Lite。
1、分片概念
任務的分布式執行,需要將一個任務拆分為多個獨立的任務項,然后由分布式的服務器分別執行某一個或幾個分片項。
例如:有一個遍歷數據庫某張表的作業,現有2台服務器。為了快速的執行作業,那么每台服務器應執行作業的50%。 為滿足此需求,可將作業分成2片,每台服務器執行 1片。作業遍歷數據的邏輯應為:服務器A遍歷ID以奇數結尾的數據;服務器B遍歷ID以偶數結尾的數據。 如果分成10片,則作業遍歷數據的邏輯應為:每片分到的分片項應為ID%10,而服務器A被分配到分片項0,1,2,3,4;服務器B被分配到分片項5,6,7,8,9,直接的結果就是服務器A遍歷ID以0-4結尾的數據;服務器B遍歷ID以5-9結尾的數據。
Elastic-Job並不直接提供數據處理的功能,框架只會將分片項分配至各個運行中的作業服務器,開發者需要自行處理分片項與真實數據的對應關系。
2、作業高可用
上述作業中,如果有一個應用掛掉,分片項將會重新分片,沒有掛掉的應用將獲得分片項0-9。
三、實際應用
這里我們采用大家都比較熟悉的基於spring配置文件的配置。
1、引入jar包
在pom.xml中添加如下配置:
<!-- 引入elastic-job-lite核心模塊 --> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>${latest.release.version}</version> </dependency> <!-- 使用springframework自定義命名空間時引入 --> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>${latest.release.version}</version> </dependency>
2、作業程序
public class MyElasticJob implements SimpleJob { @Override public void execute(ShardingContext context) { switch (context.getShardingItem()) { case 0: // do something by sharding item 0 break; case 1: // do something by sharding item 1 break; case 2: // do something by sharding item 2 break; // case n: ... } } }
我們的定時任務要實現SimpleJob接口,並實現execute方法。在寫程序時,我們通常不會用case區分不同的分片,context.getShardingItem() 可以獲得當前的分片項,context.getShardingTotalCount()獲得總分片數。我們把當前分片項,總分片數傳入到sql中,按照規則字段取模,檢索出該分片處理的數據,再進行處理。
3、spring配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg" xmlns:job="http://www.dangdang.com/schema/ddframe/job" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.dangdang.com/schema/ddframe/reg http://www.dangdang.com/schema/ddframe/reg/reg.xsd http://www.dangdang.com/schema/ddframe/job http://www.dangdang.com/schema/ddframe/job/job.xsd "> <!--配置作業注冊中心 --> <reg:zookeeper id="regCenter" server-lists="yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" /> <!-- 配置作業--> <job:simple id="oneOffElasticJob" overwrite="true" class="xxx.MyElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" /> </beans>
作業中心我們采用zookeeper,我們項目中采用做小的zk集群,3台。在作業中心配置中,server-lists填寫3台zk地址,用“,”隔開,zk1:port1,zk2:port2,zk3:port3。下面就是我們作業的具體實現的配置規則,class實現類、registry-center-ref配置中心zk的id(regCenter)、cron定時任務規則、sharding-total-count總分片數。
overwrite="true"這個配置很重要,因為這些配置都要上傳到zk中,當你改變了配置之后,zk中並沒有改變,執行的任務還是舊的。所以要加上這個配置。
這樣,我們的分布式定時任務就配置好了,剩下的就是部署,上面的例子中,我們的總分片數是4,如果我們部署2台機器,每台機器將獲得2個分片,部署4台機器,每台機器獲得一個分片。如果出現宕機情況,分片將重新分配,從而做到高可用。
四、總結
當當的這款開源產品是非常棒的,解決了我的項目中定時任務的單點問題,使系統有了高可用的保證。要說缺點嘛,也有一個,就是每一個任務都需要新寫一個類實SimpleJob接口。
博主原創,轉載請聯系博主