Springboot整合Elastic-Job


Elastic-Job是當當網的任務調度開源框架,有以下功能

分布式調度協調
彈性擴容縮容
失效轉移
錯過執行作業重觸發
作業分片一致性,保證同一分片在分布式環境中僅一個執行實例
自診斷並修復分布式不穩定造成的問題
支持並行調度
支持作業生命周期操作
豐富的作業類型
Spring整合以及命名空間提供
運維平台

具體信息可以查看 官網 ,Elastic-Job的文檔很詳細,同時也有相應的demo。但是,美中不足的是他的springboot版本的demo用的是xml結構的。網上的例子都有點亂,花了點時間整合了下,現在就開始吧。

 

快速入門


 1.pom文件

        <elastic-job.version>2.1.5</elastic-job.version>

<dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-core</artifactId>
            <version>${elastic-job.version}</version>
        </dependency>
        <!-- elastic-job-lite-spring -->
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>${elastic-job.version}</version>
        </dependency>

2.定義zookeeper

@Configuration
public class ElasticRegCenterConfig {
    /**
     * 配置zookeeper
     * @param serverList
     * @param namespace
     * @return
     */
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(
            @Value("${regCenter.serverList}") final String serverList,
            @Value("${regCenter.namespace}") final String namespace) {
        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }
}

3.定義job

@Component
public class SimpleJobDemo implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println(String.format("------Thread ID: %s, 任務總片數: %s, " +
                        "當前分片項: %s.當前參數: %s," +
                        "當前任務名稱: %s.當前任務參數: %s"
                ,
                Thread.currentThread().getId(),
                shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem(),
                shardingContext.getShardingParameter(),
                shardingContext.getJobName(),
                shardingContext.getJobParameter()

        ));
    }
}

4.定義任務監聽器,統計每次任務執行的時間

public class MyElasticJobListener implements ElasticJobListener {
    private static final Logger logger = LoggerFactory.getLogger(MyElasticJobListener.class);

    private long beginTime = 0;
    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
        beginTime = System.currentTimeMillis();

        logger.info("===>{} JOB BEGIN TIME: {} <===",shardingContexts.getJobName(), TimeUtil.mill2Time(beginTime));
    }

    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
        long endTime = System.currentTimeMillis();
        logger.info("===>{} JOB END TIME: {},TOTAL CAST: {} <===",shardingContexts.getJobName(), TimeUtil.mill2Time(endTime), endTime - beginTime);
    }
}

5.配置JobConfiuration,配置job隨容器一起啟動

@Configuration
public class ElasticJobConfig {
    @Autowired
    private ZookeeperRegistryCenter regCenter;
    /**
     * 配置任務監聽器
     * @return
     */
    @Bean
    public ElasticJobListener elasticJobListener() {
        return new MyElasticJobListener();
    }
    /**
     * 配置任務詳細信息
     * @param jobClass
     * @param cron
     * @param shardingTotalCount
     * @param shardingItemParameters
     * @return
     */
    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                         final String cron,
                                                         final int shardingTotalCount,
                                                         final String shardingItemParameters) {
        return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
                JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
                        .shardingItemParameters(shardingItemParameters).build()
                , jobClass.getCanonicalName())
        ).overwrite(true).build();
    }
    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(final SimpleJobDemo simpleJob,
                                           @Value("${stockJob.cron}") final String cron,
                                           @Value("${stockJob.shardingTotalCount}") final int shardingTotalCount,
                                           @Value("${stockJob.shardingItemParameters}") final String shardingItemParameters) {
        MyElasticJobListener elasticJobListener = new MyElasticJobListener();
        return new SpringJobScheduler(simpleJob, regCenter,
                getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters),
                elasticJobListener);
    }
}

配置文件如下

server.port=${random.int[10000,19999]}
regCenter.serverList = localhoost:2181
regCenter.namespace = elastic-job-lite-springboot

stockJob.cron = 0/5 * * * * ?
stockJob.shardingTotalCount = 4
stockJob.shardingItemParameters = 0=0,1=1,2=0,3=1

 

啟動項目,輸出

  

 大家可以仔細的觀察下配置文件和輸出的對應關系。假設我先有這樣的需求。

定時任務在兩台主機上A,B同時運行.A處理id是奇數的數據,B處理Id為偶數的數據

用傳統的定時任務例如@Schedule注解能實現么?那當然是可以,直接寫死硬編碼即可。用Elastic-Job的話,則相當靈活,示例如下

public class DbQueryJob implements SimpleJob {
    @Autowired
    private XXXDao xxxDao;
    @Override
    public void execute(ShardingContext shardingContext) {
        String shardingParameter = shardingContext.getShardingParameter();
        //mod是對id取余后的結果
        List<xxx> xxxlists=xxxDao.select("select * from table where mod="+shardingParameter);
        xxxlists.forEach(x->{
            System.out.println("參數:"+shardingContext.getShardingParameter()+"狀態"+x.getStatus());
        });
    }
}

稍微講下原理

Elastic-Job默認采用平均分片策略

如果有3台服務器,分成9片,則每台服務器分到的分片是:1=[0,1,2], 2=[3,4,5], 3=[6,7,8]

如果有3台服務器,分成8片,則每台服務器分到的分片是:1=[0,1,6], 2=[2,3,7], 3=[4,5]

如果有3台服務器,分成10片,則每台服務器分到的分片是:1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]

上述配置文件我們分成4片,A服務分到0,1  B服務分到2,3,同時 shardingItemParameters 參數表明了每個分片對應的ItemParameters,所以A服務的 shardingContext.getShardingParameter()=0

通過這樣的邏輯,我們就能實現我們的分片業務了。

以上,就是Springboot整合Elastic-Job

 

 源碼

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM