elastic-job 新手指南


大多數情況下,定時任務我們一般使用 quartz開源框架就能滿足應用場景。但如果考慮到健壯性等其它一些因素,就需要自己下點工夫,比如:要避免單點故障,至少得部署2個節點吧,但是部署多個節點,又有其它問題,有些數據在某一個時刻只能處理一次,比如 i = i+1 這些無法保證冪等的操作,run多次跟run一次,完全是不同的效果。
 
對於上面的問題,我曾經自行設計過一個基於 zk分布式鎖的解決方案:
1、每類定時job,可以分配一個獨立的標識(比如:xxx_job)
2、這類job的實例,部署在多個節點上時,每個節點啟動前,向zk申請一個分布式鎖(在xxx_job節點下)
3、拿到鎖的實例,才允許啟動定時任務(通過代碼控制quartz的schedule),沒拿到鎖的,處於standby狀態,一直監聽鎖的變化
4、如果某個節點掛了,分布式鎖自動釋放,其它節點這時會搶到鎖,按上面的邏輯,就會從standby狀態,轉為激活狀態,小三正式上位,繼續執行定時job。
 
這個方案,基本上解決了HA和業務正確性的問題,但是美中不足的地方有2點:
1、無法充分利用機器性能,處於standby的節點,實際上只是一個備胎,平時啥也不干
2、性能不方便擴展,比如:某個job一次要處理上千萬的數據,僅1個激活節點,要處理很久
 
好了,前戲鋪墊了這么久,該請主角登場了, elastic-job相當於quartz+zk的加強版,它允許對定時任務分片,可以集群部署(每個job的"分片"會分散到各個節點上),如果某個節點掛了,該節點上的分片,會調度到其它節點上。官網上有比較詳細的教程,一般情況下,使用SimpleJob這種就可以了。
 
使用步驟:
前提:要先添加下面二個jar的依賴 
    compile "com.dangdang:elastic-job-lite-core:2.1.5"
    compile "com.dangdang:elastic-job-lite-spring:2.1.5" 
1、自己的job要繼承自SimpleJob,然后實現void execute(ShardingContext shardingContext)。
public interface SimpleJob extends ElasticJob {
    
    /**
     * 執行作業.
     *
     * @param shardingContext 分片上下文
     */
    void execute(ShardingContext shardingContext);
}

注意這里面有一個shardingContext參數,看下源碼:

/**
 * 分片上下文.
 * 
 * @author zhangliang
 */
@Getter
@ToString
public final class ShardingContext {
    
    /**
     * 作業名稱.
     */
    private final String jobName;
    
    /**
     * 作業任務ID.
     */
    private final String taskId;
    
    /**
     * 分片總數.
     */
    private final int shardingTotalCount;
    
    /**
     * 作業自定義參數.
     * 可以配置多個相同的作業, 但是用不同的參數作為不同的調度實例.
     */
    private final String jobParameter;
    
    /**
     * 分配於本作業實例的分片項.
     */
    private final int shardingItem;
    
    /**
     * 分配於本作業實例的分片參數.
     */
    private final String shardingParameter;
    
    public ShardingContext(final ShardingContexts shardingContexts, final int shardingItem) {
        jobName = shardingContexts.getJobName();
        taskId = shardingContexts.getTaskId();
        shardingTotalCount = shardingContexts.getShardingTotalCount();
        jobParameter = shardingContexts.getJobParameter();
        this.shardingItem = shardingItem;
        shardingParameter = shardingContexts.getShardingItemParameters().get(shardingItem);
    }
}

這里面有2個很重要的屬性:shardingTotalCount 分片總數(比如:2)、shardingItem 當前分片索引(比如:1),前面提到的性能擴容,就可以根據2個參數進行簡單的處理,假設在電商系統中,每天晚上有個定時任務,要統計每家店的銷量。商家id一般在表設計上是一個自增數字,如果總共2個分片(注:通常也就是部署2個節點),可以把 id為奇數的放到分片0,id為偶數的放到分片1,這樣2個機器各跑一半,相對只有1台機器而言,就快多了。

偽代碼如下:

public class TestJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        int shardIndx = shardingContext.getShardingItem();
        if (shardIndx == 0) {
            //處理id為奇數的商家
        } else {
            //處理id為偶數的商家
        }
    }
}

這個還可以進一步簡化,如果使用mysql查詢商家列表,mysql中有一個mod函數,直接可以對商家id進行取模運算

select * from shop where mod(shop_id,2)=0

如果把上面的2、0換成參數,mybatis中類似這樣:

select * from shop where mod(shop_id,#{shardTotal})=#{shardIndex}

這樣邏輯就轉換到sql中處理了,java代碼中把參數傳進來就行,連if都可以省掉。

2、接下來看看如何配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
       xmlns:job="http://www.dangdang.com/schema/ddframe/job"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.dangdang.com/schema/ddframe/reg
       http://www.dangdang.com/schema/ddframe/reg/reg.xsd
       http://www.dangdang.com/schema/ddframe/job
       http://www.dangdang.com/schema/ddframe/job/job.xsd">

    <reg:zookeeper id="regCenter" server-lists="${zk_address}" namespace="my-xxx-job"
                   base-sleep-time-milliseconds="1000"
                   max-sleep-time-milliseconds="3000" max-retries="3"/>

    <job:simple id="xxxJob" class="com.cnblogs.yjmyzz.XXXJob" registry-center-ref="regCenter"
                cron="${xxxJob_cornExpress}"
                sharding-total-count="2" sharding-item-parameters="0=A,1=B"/>

                ...
</beans>

與常規的spring配置幾乎沒啥區別,幾個要點如下:

a) 因為分片調度是基於zk的,所以要先配置zk注冊中心,其中${zk_address}大家可以改成實際的zk地址列表,比如:10.x.x.1:2181,10.x.x.2:2181,10.x.x.3:2181 

b) 每個job中的corn屬性,就是quartz中的cornExpress表達式,然后sharding-total-count即總分片數,而sharding-item-parameters則是指定每個分片中的具體參數

(注:剛才的電商每天晚上算銷量,這個case其實只用到了分片索引、分片數,並不需要參數,所以這里隨便配置一個類似0=A, 1=B就可以了,如果有些業務場景,希望在知道分片索引的同時,還希望額外傳一些參數進來,就可以在這里配置你希望的參數,然后在execute中,也能讀到相應的參數)

3、控制台

elastic-job還提供了一個不錯的UI控制台,項目源代碼git clone到本地,mvn install就能得到一個elastic-job-lite-console-${version}.tar.gz的包,解壓,然后運行里面的bin/start.sh 就能跑起來,界面類似如下:

點擊查看原圖

通過這個控制台,可以動態調整每個定時任務的觸發時間(即:cornExpress)。詳情可參考官網文檔-運維平台部分。

4、與spring-cloud/spring-boot的整合

如果是傳統的spring項目,按上面的步驟就可以無縫整合了,如果是spring-cloud/spring-boot,則稍微要復雜點。

由於spring-boot倡導零xml配置,所以大部分配置就用代碼替代了,先定義一個elasticJob的配置類:

@Data
@Configuration
public class ElasticJobConfig {

    @Value("${rxQuartz.app.zkAddress}")
    private String zkNodes;

    @Value("${rxQuartz.app.namespace}")
    private String namespace;

    @Bean
    public ZookeeperConfiguration zkConfig() {
        return new ZookeeperConfiguration(zkNodes, namespace);
    }

    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(ZookeeperConfiguration config) {
        return new ZookeeperRegistryCenter(config);
    }
}

上面這段代碼,主要是解決zk注冊中心的注入問題,然后各種xxxJob,由於要讓spring自動注入,需要打上component注解

@Component("xxxJob")
public class XXXJob extends AbstractJob {
   
    ...
}

然后在真正要用的地方,把他們組裝起來

import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author: yangjunming
 */
@Configuration
public class ElasticJobs {

    @Autowired
    @Qualifier("xxxJob")
    public SimpleJob xxxJob;
   
    @Autowired
    private ZookeeperRegistryCenter regCenter;

    @Bean(initMethod = "init")
    public JobScheduler settlementJobScheduler(@Autowired @Qualifier("xxxJob") SimpleJob simpleJob,
                                               @Value("${xxxJob.billCronExpress}") final String cron,
                                               @Value("${xxxJob.shardingCount}") int shardingCount,
                                               @Value("${xxxJob.shardingItemParameters}") String shardingItemParameters) {
        return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingCount, shardingItemParameters));
    }

    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
        return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(
                jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();
    }
}

大功告成,祝大家周末擼碼愉快!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM