分布式定時任務 -- elastic-job


一、前言

  在我們的項目當中,使用定時任務是避免不了的,我們在部署定時任務時,通常只部署一台機器。部署多台機器時,同一個任務會執行多次。比如短信提醒,每天定時的給用戶下發短信,如果部署了多台,同一個用戶將發送多條。只部署一台機器,可用性又無法保證。今天向大家介紹一款開源產品,分布式定時任務解決方案---- elastic-job。

二、簡介

  Elastic-Job是一個分布式調度解決方案,由兩個相互獨立的子項目Elastic-Job-Lite和Elastic-Job-Cloud組成。在我們的項目中使用了輕量級無中心化解決方案,Elastic-Job-Lite。

  1、分片概念

  任務的分布式執行,需要將一個任務拆分為多個獨立的任務項,然后由分布式的服務器分別執行某一個或幾個分片項。

  例如:有一個遍歷數據庫某張表的作業,現有2台服務器。為了快速的執行作業,那么每台服務器應執行作業的50%。 為滿足此需求,可將作業分成2片,每台服務器執行  1片。作業遍歷數據的邏輯應為:服務器A遍歷ID以奇數結尾的數據;服務器B遍歷ID以偶數結尾的數據。 如果分成10片,則作業遍歷數據的邏輯應為:每片分到的分片項應為ID%10,而服務器A被分配到分片項0,1,2,3,4;服務器B被分配到分片項5,6,7,8,9,直接的結果就是服務器A遍歷ID以0-4結尾的數據;服務器B遍歷ID以5-9結尾的數據。

  Elastic-Job並不直接提供數據處理的功能,框架只會將分片項分配至各個運行中的作業服務器,開發者需要自行處理分片項與真實數據的對應關系。

  2、作業高可用

  上述作業中,如果有一個應用掛掉,分片項將會重新分片,沒有掛掉的應用將獲得分片項0-9。

三、實際應用

  這里我們采用大家都比較熟悉的基於spring配置文件的配置。

  1、引入jar包

  在pom.xml中添加如下配置:

<!-- 引入elastic-job-lite核心模塊 -->
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>${latest.release.version}</version>
</dependency>

<!-- 使用springframework自定義命名空間時引入 -->
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>${latest.release.version}</version>
</dependency>

  2、作業程序

public class MyElasticJob implements SimpleJob {
    
    @Override
    public void execute(ShardingContext context) {
        switch (context.getShardingItem()) {
            case 0: 
                // do something by sharding item 0
                break;
            case 1: 
                // do something by sharding item 1
                break;
            case 2: 
                // do something by sharding item 2
                break;
            // case n: ...
        }
    }
}

  我們的定時任務要實現SimpleJob接口,並實現execute方法。在寫程序時,我們通常不會用case區分不同的分片,context.getShardingItem() 可以獲得當前的分片項,context.getShardingTotalCount()獲得總分片數。我們把當前分片項,總分片數傳入到sql中,按照規則字段取模,檢索出該分片處理的數據,再進行處理。

  3、spring配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
    xmlns:job="http://www.dangdang.com/schema/ddframe/job"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.dangdang.com/schema/ddframe/reg
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                        http://www.dangdang.com/schema/ddframe/job
                        http://www.dangdang.com/schema/ddframe/job/job.xsd
                        ">
    <!--配置作業注冊中心 -->
    <reg:zookeeper id="regCenter" server-lists="yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />
    
    <!-- 配置作業-->
    <job:simple id="oneOffElasticJob" overwrite="true" class="xxx.MyElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" /> </beans>

  作業中心我們采用zookeeper,我們項目中采用做小的zk集群,3台。在作業中心配置中,server-lists填寫3台zk地址,用“,”隔開,zk1:port1,zk2:port2,zk3:port3。下面就是我們作業的具體實現的配置規則,class實現類、registry-center-ref配置中心zk的id(regCenter)、cron定時任務規則、sharding-total-count總分片數。

  overwrite="true"這個配置很重要,因為這些配置都要上傳到zk中,當你改變了配置之后,zk中並沒有改變,執行的任務還是舊的。所以要加上這個配置。

  這樣,我們的分布式定時任務就配置好了,剩下的就是部署,上面的例子中,我們的總分片數是4,如果我們部署2台機器,每台機器將獲得2個分片,部署4台機器,每台機器獲得一個分片。如果出現宕機情況,分片將重新分配,從而做到高可用。

四、總結

  當當的這款開源產品是非常棒的,解決了我的項目中定時任務的單點問題,使系統有了高可用的保證。要說缺點嘛,也有一個,就是每一個任務都需要新寫一個類實SimpleJob接口。

  

 

博主原創,轉載請聯系博主

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM