Elastic-Job開發指南(轉)


原文地址:http://dangdangdotcom.github.io/elastic-job/post/1.x/user_guide/

 

開發指南

代碼開發

作業類型

目前提供3種作業類型,分別是SimpleDataFlowScript

DataFlow類型用於處理數據流,它又提供2種作業類型,分別是ThroughputDataFlowSequenceDataFlow。需要繼承相應的抽象類。

Script類型用於處理腳本,可直接使用,無需編碼。

方法參數shardingContext包含作業配置,分片和運行時信息。可通過getShardingTotalCount()getShardingItems()等方法分別獲取分片總數,運行在本作業服務器的分片序列號集合等。

Simple類型作業

Simple類型作業意為簡單實現,未經任何封裝的類型。需要繼承AbstractSimpleElasticJob,該類只提供了一個方法用於覆蓋,此方法將被定時執行。用於執行普通的定時任務,與Quartz原生接口相似,只是增加了彈性擴縮容和分片等功能。

public class MyElasticJob extends AbstractSimpleElasticJob {
 
    @Override
    public void process(JobExecutionMultipleShardingContext context) {
        // do something by sharding items
    }
}

ThroughputDataFlow類型作業

ThroughputDataFlow類型作業意為高吞吐的數據流作業。需要繼承AbstractIndividualThroughputDataFlowElasticJob並可以指定返回值泛型,該類提供3個方法可覆蓋,分別用於抓取數據,處理數據和指定是否流式處理數據。可以獲取數據處理成功失敗次數等輔助監控信息。如果流式處理數據,fetchData方法的返回值只有為null或長度為空時,作業才會停止執行,否則作業會一直運行下去;非流式處理數據則只會在每次作業執行過程中執行一次fetchData方法和processData方法,即完成本次作業。流式數據處理參照TbSchedule設計,適用於不間歇的數據處理。

作業執行時會將fetchData的數據傳遞給processData處理,其中processData得到的數據是通過多線程(線程池大小可配)拆分的。如果采用流式作業處理方式,建議processData處理數據后更新其狀態,避免fetchData再次抓取到,從而使得作業永遠不會停止。processData的返回值用於表示數據是否處理成功,拋出異常或者返回false將會在統計信息中歸入失敗次數,返回true則歸入成功次數。

public class MyElasticJob extends AbstractIndividualThroughputDataFlowElasticJob<Foo> {
 
    @Override
    public List<Foo> fetchData(JobExecutionMultipleShardingContext context) {
        Map<Integer, String> offset = context.getOffsets();
        List<Foo> result = // get data from database by sharding items and by offset
        return result;
    }
 
    @Override
    public boolean processData(JobExecutionMultipleShardingContext context, Foo data) {
        // process data
        // ...
 
        // store offset
        for (int each : context.getShardingItems()) {
            updateOffset(each, "your offset, maybe id");
        }
        return true;
    }
}

SequenceDataFlow類型作業

SequenceDataFlow類型作業和ThroughputDataFlow作業類型極為相似,所不同的是ThroughputDataFlow作業類型可以將獲取到的數據多線程處理,但不會保證多線程處理數據的順序。如:從2個分片共獲取到100條數據,第1個分片40條,第2個分片60條,配置為兩個線程處理,則第1個線程處理前50條數據,第2個線程處理后50條數據,無視分片項;SequenceDataFlow類型作業則根據當前服務器所分配的分片項數量進行多線程處理,每個分片項使用同一線程處理,防止了同一分片的數據被多線程處理,從而導致的順序問題。如:從2個分片共獲取到100條數據,第1個分片40條,第2個分片60條,則系統自動分配兩個線程處理,第1個線程處理第1個分片的40條數據,第2個線程處理第2個分片的60條數據。由於ThroughputDataFlow作業可以使用多於分片項的任意線程數處理,所以性能調優的可能會優於SequenceDataFlow作業。

public class MyElasticJob extends AbstractIndividualSequenceDataFlowElasticJob<Foo> {
 
    @Override
    public List<Foo> fetchData(JobExecutionSingleShardingContext context) {
        int offset = context.getOffset();
        List<Foo> result = // get data from database by sharding items and by offset
        return result;
    }
 
    @Override
    public boolean processData(JobExecutionSingleShardingContext context, Foo data) {
        // process data
        // ...
 
        // store offset
        updateOffset(context.getShardingItem(), "your offset, maybe id");
        return true;
    }
}

Script類型作業

Script類型作業意為腳本類型作業,支持shellpythonperl等所有類型腳本。只需通過控制台/代碼配置scriptCommandLine即可。執行腳本路徑可以包含參數,最后一個參數為作業運行時信息.

#!/bin/bash
echo sharding execution context is $*

作業運行時輸出

sharding execution context is {"shardingItems":[0,1,2,3,4,5,6,7,8,9],"shardingItemParameters":{},"offsets":{},"jobName":"scriptElasticDemoJob","shardingTotalCount":10,"jobParameter":"","monitorExecution":true,"fetchDataCount":1}

批量處理

為了提高數據處理效率,數據流類型作業提供了批量處理數據的功能。之前逐條處理數據的兩個抽象類分別是AbstractIndividualThroughputDataFlowElasticJobAbstractIndividualSequenceDataFlowElasticJob,批量處理則使用另外兩個接口AbstractBatchThroughputDataFlowElasticJobAbstractBatchSequenceDataFlowElasticJob。不同之處在於processData方法的返回值從boolean類型變為int類型,用於表示一批數據處理的成功數量,第二個入參則轉變為List數據集合。

異常處理

elastic-job在最上層接口提供了handleJobExecutionException方法,使用作業時可以覆蓋此方法,並使用quartz提供的JobExecutionException控制異常后作業的聲明周期。默認實現是直接將異常拋出。示例:

任務監聽配置

可以通過配置多個任務監聽器,在任務執行前和執行后執行監聽的方法。監聽器分為每台作業節點均執行和分布式場景中僅單一節點執行兩種。

每台作業節點均執行的監聽

若作業處理作業服務器的文件,處理完成后刪除文件,可考慮使用每個節點均執行清理任務。此類型任務實現簡單,且無需考慮全局分布式任務是否完成,請盡量使用此類型監聽器。

步驟:

  • 定義監聽器
import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext;
import com.dangdang.ddframe.job.api.listener.ElasticJobListener;
 
public class MyElasticJobListener implements ElasticJobListener {
 
    @Override
    public void beforeJobExecuted(final JobExecutionMultipleShardingContext shardingContext) {
        // do something ...
    }
 
    @Override
    public void afterJobExecuted(final JobExecutionMultipleShardingContext shardingContext) {
        // do something ...
    }
}
  • 將監聽器作為參數傳入JobScheduler
public class JobMain {
 
    public static void main(final String[] args) {
        new JobScheduler(regCenter, jobConfig, new MyElasticJobListener()).init();
    }
}

分布式場景中僅單一節點執行的監聽

若作業處理數據庫數據,處理完成后只需一個節點完成數據清理任務即可。此類型任務處理復雜,需同步分布式環境下作業的狀態同步,提供了超時設置來避免作業不同步導致的死鎖,請謹慎使用。

步驟:

  • 定義監聽器
import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext;
import com.dangdang.ddframe.job.api.listener.AbstractDistributeOnceElasticJobListener;
 
public final class TestDistributeOnceElasticJobListener extends AbstractDistributeOnceElasticJobListener {
 
    public TestDistributeOnceElasticJobListener(final long startTimeoutMills, final long completeTimeoutMills) {
        super(startTimeoutMills, completeTimeoutMills);
    }
 
    @Override
    public void doBeforeJobExecutedAtLastStarted(final JobExecutionMultipleShardingContext shardingContext) {
        // do something ...
    }
 
    @Override
    public void doAfterJobExecutedAtLastCompleted(final JobExecutionMultipleShardingContext shardingContext) {
        // do something ...
    }
}
  • 將監聽器作為參數傳入JobScheduler
public class JobMain {
 
    public static void main(final String[] args) {
        long startTimeoutMills = 5000L;
        long completeTimeoutMills = 10000L;
        new JobScheduler(regCenter, jobConfig, new MyDistributeOnceElasticJobListener(startTimeoutMills, completeTimeoutMills)).init();
    }
}

作業配置

Spring容器配合使用作業,可以將作業Bean配置為Spring Bean,可在作業中通過依賴注入使用Spring容器管理的數據源等對象。可用placeholder占位符從屬性文件中取值。

Spring命名空間配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
    xmlns:job="http://www.dangdang.com/schema/ddframe/job"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.dangdang.com/schema/ddframe/reg
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                        http://www.dangdang.com/schema/ddframe/job
                        http://www.dangdang.com/schema/ddframe/job/job.xsd
                        ">
    <!--配置作業注冊中心 -->
    <reg:zookeeper id="regCenter" server-lists=" yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />
 
    <!-- 配置簡單作業-->
    <job:simple id="simpleElasticJob" class="xxx.MySimpleElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?"   sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
 
    <!-- 配置數據流作業-->
    <job:dataflow id="throughputDataFlow" class="xxx.MyThroughputDataFlowElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" process-count-interval-seconds="10" concurrent-data-process-thread-count="10" />
 
    <!-- 配置腳本作業-->
    <job:script id="scriptElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" script-command-line="/your/file/path/demo.sh" />
 
    <!-- 配置帶監聽的簡單作業-->
    <job:simple id="listenerElasticJob" class="xxx.MySimpleListenerElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?"   sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C">
        <job:listener class="xx.MySimpleJobListener"/>
        <job:listener class="xx.MyOnceSimpleJobListener" started-timeout-milliseconds="1000" completed-timeout-milliseconds="2000" />
    </job:simple>
</beans>

job:simple命名空間屬性詳細說明

 

 

 

 

 

job:dataflow命名空間屬性詳細說明

job:dataflow命名空間擁有job:simple命名空間的全部屬性,以下僅列出特有屬性

 

 

 job:script命名空間屬性詳細說明,基本屬性參照job:simple命名空間屬性詳細說明

job:script命名空間擁有job:simple命名空間的全部屬性,以下僅列出特有屬性

 

 

 

 

不使用Spring配置

如果不使用Spring框架,可以用如下方式啟動作業。

import com.dangdang.ddframe.job.api.config.JobConfiguration;
import com.dangdang.ddframe.job.api.JobScheduler;
import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenter;
import com.dangdang.example.elasticjob.core.job.SimpleJobDemo;
import com.dangdang.example.elasticjob.core.job.ThroughputDataFlowJobDemo;
import com.dangdang.example.elasticjob.core.job.SequenceDataFlowJobDemo;
import com.dangdang.ddframe.job.plugin.job.type.integrated.ScriptElasticJob;
 
public class JobDemo {
 
    // 定義Zookeeper注冊中心配置對象
    private ZookeeperConfiguration zkConfig = new ZookeeperConfiguration("localhost:2181", "elastic-job-example", 1000, 3000, 3);
 
    // 定義Zookeeper注冊中心
    private CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zkConfig);
 
    // 定義簡單作業配置對象
    private final SimpleJobConfiguration simpleJobConfig = JobConfigurationFactory.createSimpleJobConfigurationBuilder("simpleElasticDemoJob",
                    SimpleJobDemo.class, 10, "0/30 * * * * ?").build();
 
    // 定義高吞吐流式處理的數據流作業配置對象
    private final DataFlowJobConfiguration throughputJobConfig = JobConfigurationFactory.createDataFlowJobConfigurationBuilder("throughputDataFlowElasticDemoJob",
                    ThroughputDataFlowJobDemo.class, 10, "0/5 * * * * ?").streamingProcess(true).build();
 
    // 定義順序的數據流作業配置對象
    private final DataFlowJobConfiguration sequenceJobConfig = JobConfigurationFactory.createDataFlowJobConfigurationBuilder("sequenceDataFlowElasticDemoJob",
                    SequenceDataFlowJobDemo.class, 10, "0/5 * * * * ?").build();
 
    // 定義腳本作業配置對象
    private final ScriptJobConfiguration scriptJobConfig = JobConfigurationFactory.createScriptJobConfigurationBuilder("scriptElasticDemoJob",
                    10, "0/5 * * * * ?", "test.sh").build();
 
    public static void main(final String[] args) {
        new JobDemo().init();
    }
 
    private void init() {
        // 連接注冊中心
        regCenter.init();
        // 啟動簡單作業
        new JobScheduler(regCenter, simpleJobConfig).init();
        // 啟動高吞吐流式處理的數據流作業
        new JobScheduler(regCenter, throughputJobConfig).init();
        // 啟動順序的數據流作業
        new JobScheduler(regCenter, sequenceJobConfig).init();
        // 啟動腳本作業
        new JobScheduler(regCenter, scriptJobConfig).init();
    }
}

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM