SpringBoot整合Elastic-job(詳細)


 

SpringBoot整合Elastic-job(詳細)

一 作業分片
1.分片概念
作業分片是指任務的分布式執行,需要將一個任務拆分為多個獨立的任務項,然后由分布式的應用實例分別執行某
一個或幾個分片項。
例如:Elastic-Job快速入門中文件備份的例子,現有2台服務器,每台服務器分別跑一個應用實例。為了快速的執
行作業,那么可以將作業分成4片,每個應用實例個執行2片。作業遍歷數據的邏輯應為:實例1查找text和image
類型文件執行備份;實例2查找radio和video類型文件執行備份。 如果由於服務器擴容應用實例數量增加為4,則
作業遍歷數據的邏輯應為:4個實例分別處理text、image、radio、video類型的文件。
可以看到,通過對任務合理的分片化,從而達到任務並行處理的效果,最大限度的提高執行作業的吞吐量。
分片項與業務處理解耦
Elastic-Job並不直接提供數據處理的功能,框架只會將分片項分配至各個運行中的作業服務器,開發者需要自行處
理分片項與真實數據的對應關系。
最大限度利用資源
將分片項設置為大於服務器的數量,最好是大於服務器倍數的數量,作業將會合理的利用分布式資源,動態的分配
分片項。
例如:3台服務器,分成10片,則分片項分配結果為服務器A=0,1,2;服務器B=3,4,5;服務器C=6,7,8,9。 如果服務器C
崩潰,則分片項分配結果為服務器A=0,1,2,3,4;服務器B=5,6,7,8,9。在不丟失分片項的情況下,最大限度的利用現
有資源提高吞吐量。


2.作業分片實現
基於Spring boot集成方式的而產出的工程代碼,完成對作業分片的實現,文件數據備份采取更接近真實項目的數
據庫存取方式。

CREATE DATABASE `elastic_job_demo` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
復制代碼
DROP TABLE IF EXISTS `t_file`;
CREATE TABLE `t_file`  (
  `id` varchar(11) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `type` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `content` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `backedUp` tinyint(1) NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
復制代碼

 

 pom文件

復制代碼
 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.15</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.dangdang/elastic-job-lite-spring -->
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>2.1.5</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
復制代碼

 

開發類

 

復制代碼
@Configuration
public class ElasticJobConfig {

    @Autowired
    private DataSource dataSource; //數據源已經存在,直接引入

//    @Autowired
//    SimpleJob fileBackupJob;

    @Autowired
    FileBackupJobDb fileBackupJob;

//    @Autowired
//    FileBackupJobDataFlow fileBackupJob;

    @Autowired
    CoordinatorRegistryCenter registryCenter;

    /**
     * 配置任務詳細信息
     * @param jobClass 任務執行類
     * @param cron  執行策略
     * @param shardingTotalCount 分片數量
     * @param shardingItemParameters 分片個性化參數
     * @return
     */
    private LiteJobConfiguration createJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                        final String cron,
                                                        final int shardingTotalCount,
                                                        final String shardingItemParameters){
        //JobCoreConfigurationBuilder
        JobCoreConfiguration.Builder JobCoreConfigurationBuilder = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount);
        //設置shardingItemParameters
        if(!StringUtils.isEmpty(shardingItemParameters)){
            JobCoreConfigurationBuilder.shardingItemParameters(shardingItemParameters);
        }
        JobCoreConfiguration jobCoreConfiguration = JobCoreConfigurationBuilder.build();
        //創建SimpleJobConfiguration
        SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName());
        //創建LiteJobConfiguration
        LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true)
                .monitorPort(9888)//設置dump端口
                .build();
        return liteJobConfiguration;
    }

    //創建支持dataFlow類型的作業的配置信息
    private LiteJobConfiguration createFlowJobConfiguration(final Class<? extends ElasticJob> jobClass,
                                                        final String cron,
                                                        final int shardingTotalCount,
                                                        final String shardingItemParameters){
        //JobCoreConfigurationBuilder
        JobCoreConfiguration.Builder JobCoreConfigurationBuilder = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount);
        //設置shardingItemParameters
        if(!StringUtils.isEmpty(shardingItemParameters)){
            JobCoreConfigurationBuilder.shardingItemParameters(shardingItemParameters);
        }
        JobCoreConfiguration jobCoreConfiguration = JobCoreConfigurationBuilder.build();
        // 定義數據流類型任務配置
        DataflowJobConfiguration jobConfig = new DataflowJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName(),true);
        //創建LiteJobConfiguration
        LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(jobConfig).overwrite(true)
               // .monitorPort(9888)//設置dump端口
                .build();
        return liteJobConfiguration;
    }
    @Bean(initMethod = "init")
    public SpringJobScheduler initSimpleElasticJob() {
        // 增加任務事件追蹤配置
        JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(dataSource);
        //創建SpringJobScheduler

        SpringJobScheduler springJobScheduler = new SpringJobScheduler(fileBackupJob, registryCenter,
                createJobConfiguration(fileBackupJob.getClass(), "0/3 * * * * ?", 1, "0=text,1=image,2=radio,3=vedio")
                ,jobEventConfig);
        return springJobScheduler;
    }
}
復制代碼
復制代碼
@Configuration
public class ElasticJobRegistryCenterConfig {

    //zookeeper鏈接字符串 localhost:2181
    private  String ZOOKEEPER_CONNECTION_STRING = "192.168.180.113:2181" ;
    //定時任務命名空間
    private  String JOB_NAMESPACE = "elastic-job-boot-java";

    //zk的配置及創建注冊中心
    @Bean(initMethod = "init")
    public  CoordinatorRegistryCenter setUpRegistryCenter(){
        //zk的配置
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE);

        zookeeperConfiguration.setSessionTimeoutMilliseconds(1000);
        //創建注冊中心
        CoordinatorRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
        return zookeeperRegistryCenter;

    }
}
復制代碼

 

Job類

復制代碼
@Component
public class FileBackupJob implements SimpleJob {

    //每次任務執行要備份文件的數量
    private final int FETCH_SIZE = 1;
    //文件列表(模擬)
    public static List<FileCustom> files = new ArrayList<>();

    static {
        for(int i=1;i<11;i++){
            FileBackupJob.files.add(new FileCustom(String.valueOf(i+10),"文件"+(i+10),"text","content"+ (i+10)));
            FileBackupJob.files.add(new FileCustom(String.valueOf(i+20),"文件"+(i+20),"image","content"+ (i+20)));
            FileBackupJob.files.add(new FileCustom(String.valueOf(i+30),"文件"+(i+30),"radio","content"+ (i+30)));
            FileBackupJob.files.add(new FileCustom(String.valueOf(i+40),"文件"+(i+40),"video","content"+ (i+40)));
        }
        System.out.println("生產測試數據完成");
    }

    //任務執行代碼邏輯
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("作業分片:"+shardingContext.getShardingItem());
        //分片參數,(0=text,1=image,2=radio,3=vedio,參數就是text、image...)
        String jobParameter = shardingContext.getJobParameter();

        //獲取未備份的文件
        List<FileCustom> fileCustoms = fetchUnBackupFiles(FETCH_SIZE);
        //進行文件備份
        backupFiles(fileCustoms);
    }

    /**
     * 獲取未備份的文件
     * @param count   文件數量
     * @return
     */
    public List<FileCustom> fetchUnBackupFiles(int count){

        //獲取的文件列表
        List<FileCustom> fileCustoms = new ArrayList<>();
        int num=0;
        for(FileCustom fileCustom:files){
            if(num >=count){
                break;
            }
            if(!fileCustom.getBackedUp()){
                fileCustoms.add(fileCustom);
                num ++;
            }

        }
        System.out.printf("time:%s,獲取文件%d個\n", LocalDateTime.now(),num);
        return fileCustoms;

    }

    /**
     * 文件備份
     * @param files
     */
    public void backupFiles(List<FileCustom> files){
        for(FileCustom fileCustom:files){
            fileCustom.setBackedUp(true);
            System.out.printf("time:%s,備份文件,名稱:%s,類型:%s\n", LocalDateTime.now(),fileCustom.getName(),fileCustom.getType());
        }
    }
}
復制代碼
復制代碼
@Component
public class FileBackupJobDb implements SimpleJob {

    //每次任務執行要備份文件的數量
    private final int FETCH_SIZE = 1;

    @Autowired
    FileService fileService;

    //任務執行代碼邏輯
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("作業分片:"+shardingContext.getShardingItem());
        //分片參數,(0=text,1=image,2=radio,3=vedio,參數就是text、image...)
        String jobParameter = shardingContext.getShardingParameter();
        //獲取未備份的文件
        List<FileCustom> fileCustoms = fetchUnBackupFiles(jobParameter,FETCH_SIZE);
        //進行文件備份
        backupFiles(fileCustoms);
    }

    /**
     * 獲取未備份的文件
     * @param count   文件數量
     * @return
     */
    public List<FileCustom> fetchUnBackupFiles(String fileType,int count){

        List<FileCustom> fileCustoms = fileService.fetchUnBackupFiles(fileType, count);
        System.out.printf("time:%s,獲取文件%d個\n", LocalDateTime.now(),count);
        return fileCustoms;

    }

    /**
     * 文件備份
     * @param files
     */
    public void backupFiles(List<FileCustom> files){
        fileService.backupFiles(files);
    }
}
復制代碼

模型類

復制代碼
@Data
@NoArgsConstructor
public class FileCustom {
    /**
     * 標識
     */
    private String id;

    /**
     * 文件名
     */
    private String name;

    /**
     * 文件類型,如text、image、radio、vedio
     */
    private String type;

    /**
     * 文件內容
     */
    private String content;

    /**
     * 是否已備份
     */
    private Boolean backedUp = false;

    public FileCustom(String id, String name, String type, String content){
        this.id = id;
        this.name = name;
        this.type = type;
        this.content = content;
    }
}
復制代碼

Service類

復制代碼
@Service
public class FileService {

    @Autowired
    JdbcTemplate jdbcTemplate;

    /**
     * 獲取某文件類型未備份的文件
     * @param fileType 文件類型
     * @param count 獲取條數
     * @return
     */
    public List<FileCustom> fetchUnBackupFiles(String fileType, Integer count){
        String sql="select * from t_file where type = ? and backedUp = 0 limit 0,?";
        List<FileCustom> files = jdbcTemplate.query(sql, new Object[]{fileType, count}, new BeanPropertyRowMapper(FileCustom.class));
        return files;
    }

    /**
     * 備份文件
     * @param files 要備份的文件
     */
    public void backupFiles(List<FileCustom> files){
        for(FileCustom fileCustom:files){
            String sql="update t_file set backedUp = 1 where id = ?";
            jdbcTemplate.update(sql,new Object[]{fileCustom.getId()});
            System.out.println(String.format("線程 %d | 已備份文件:%s  文件類型:%s"
                    ,Thread.currentThread().getId()
                    ,fileCustom.getName()
                    ,fileCustom.getType()));
        }

    }
}
復制代碼

application.yml

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/elastic_job_demo?serverTimezone=UTC
    username : root
    password : 123456
    driver-class-name: com.mysql.jdbc.Driver

啟動類

復制代碼
@SpringBootApplication
public class ElasticJobApplication {

    public static void main(String[] args) {
        SpringApplication.run(ElasticJobApplication.class, args);
    }

}
復制代碼

測試類,往數據庫插入數據

復制代碼
@RunWith(SpringRunner.class)
@SpringBootTest
class ElasticJobApplicationTests {

    @Autowired
    JdbcTemplate jdbcTemplate;

    @Test
    public void testGenerateTestData(){
        //清除數據
        clearTestFiles();
        //制造數據
        generateTestFiles();
    }

    /**
     * 清除模擬數據
     */
    public void clearTestFiles(){
        jdbcTemplate.update("delete from t_file");
    }

    /**
     * 創建模擬數據
     */
    public void generateTestFiles(){
        List<FileCustom> files =new ArrayList<>();
        for(int i=1;i<11;i++){
            files.add(new FileCustom(String.valueOf(i),"文件"+ i,"text","content"+ i));
            files.add(new FileCustom(String.valueOf((i+10)),"文件"+(i+10),"image","content"+ (i+10)));
            files.add(new FileCustom(String.valueOf((i+20)),"文件"+(i+20),"radio","content"+ (i+20)));
            files.add(new FileCustom(String.valueOf((i+30)),"文件"+(i+30),"vedio","content"+ (i+30)));
        }
        for(FileCustom file : files){
            jdbcTemplate.update("insert into t_file (id,name,type,content,backedUp) values (?,?,?,?,?)",
                    new Object[]{file.getId(),file.getName(),file.getType(),file.getContent(),file.getBackedUp()});
        }
    }

}
復制代碼

當只開一個窗口

復制代碼
作業分片:1
作業分片:0
作業分片:3
作業分片:2
time:2019-12-19T16:12:02.614,獲取文件1個
time:2019-12-19T16:12:02.614,獲取文件1個
time:2019-12-19T16:12:02.614,獲取文件1個
time:2019-12-19T16:12:02.614,獲取文件1個
線程 109 | 已備份文件:文件31  文件類型:vedio
線程 108 | 已備份文件:文件21  文件類型:radio
線程 106 | 已備份文件:文件1  文件類型:text
線程 107 | 已備份文件:文件11  文件類型:image
作業分片:0
作業分片:1
作業分片:2
作業分片:3
time:2019-12-19T16:12:10.059,獲取文件1個
線程 118 | 已備份文件:文件12  文件類型:image
time:2019-12-19T16:12:12.411,獲取文件1個
time:2019-12-19T16:12:12.428,獲取文件1個
線程 117 | 已備份文件:文件2  文件類型:text
time:2019-12-19T16:12:12.438,獲取文件1個
線程 119 | 已備份文件:文件22  文件類型:radio
線程 120 | 已備份文件:文件32  文件類型:vedio
復制代碼

當開2個窗口的時候,結果如下:

 

 當開3個窗口的時候,結果如下:

 

 

 

查看控制台輸出可以得出如下結論: 1、任務運行期間,如果有新機器加入,則會立刻觸發分片機制,將任務相對
平均的分配到每台機器上並行執行調度。 2、如果有機器退出集群,則經過短暫的一段時間(大約40秒)后又會重
新觸發分片機制
如果在設置zookeeper注冊中心時,設置了session超時時間100 毫秒,則下次任務前就會觸發分片

復制代碼
@Bean(initMethod = "init")
    public CoordinatorRegistryCenter createRegistryCenter() {
        ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(registryServerList,
registryNamespace);
        zkConfig.setSessionTimeoutMilliseconds(100);//這里設置了session超時時間100 毫秒
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zkConfig);
        return regCenter;
    }
復制代碼

如果在sessionTimeoutMs的時間段之內觸發任務,則異常分片的任務會丟失。舉個例子:假如
sessionTimeoutMs被設置成1分鍾,而本身的任務是30秒執行一次,有三個任務實例在三台機器各自執行分片
1,2,3。當分片3所在的機器出現問題,和zookeeper斷開了,那么zookeeper節點失效至少要到1分鍾以后。期間30
秒執行一次的任務分片3,至少會少執行一次。1分鍾過后,zookeeper節點失效,觸發
ListenServersChangedJobListener類的dataChanged方法,在這里方法中判斷instance節點變化,然后通過方法
shardingService.setReshardingFlag設置重新分片標志位,下次執行任務的時候,leader節點重新分配分片,分片
3就會轉移到其他好的機器上。

3.作業分片策略

AverageAllocationJobShardingStrategy
全路徑:
com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy
策略說明:
基於平均分配算法的分片策略,也是默認的分片策略。
如果分片不能整除,則不能整除的多余分片將依次追加到序號小的服務器。如:
如果有3台服務器,分成9片,則每台服務器分到的分片是:1=[0,1,2], 2=[3,4,5], 3=[6,7,8]
如果有3台服務器,分成8片,則每台服務器分到的分片是:1=[0,1,6], 2=[2,3,7], 3=[4,5]
如果有3台服務器,分成10片,則每台服務器分到的分片是:1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]


OdevitySortByNameJobShardingStrategy
全路徑:
com.dangdang.ddframe.job.lite.api.strategy.impl.OdevitySortByNameJobShardingStrategy
策略說明:
根據作業名的哈希值奇偶數決定IP升降序算法的分片策略。
作業名的哈希值為奇數則IP升序。
作業名的哈希值為偶數則IP降序。
用於不同的作業平均分配負載至不同的服務器。
AverageAllocationJobShardingStrategy的缺點是,一旦分片數小於作業服務器數,作業將永遠分配至IP地址靠前
的服務器,導致IP地址靠后的服務器空閑。而OdevitySortByNameJobShardingStrategy則可以根據作業名稱重新
分配服務器負載。如:
如果有3台服務器,分成2片,作業名稱的哈希值為奇數,則每台服務器分到的分片是:1=[0], 2=[1], 3=[]
如果有3台服務器,分成2片,作業名稱的哈希值為偶數,則每台服務器分到的分片是:3=[0], 2=[1], 1=[]
RotateServerByNameJobShardingStrategy
全路徑:
com.dangdang.ddframe.job.lite.api.strategy.impl.RotateServerByNameJobShardingStrategy
策略說明:
根據作業名的哈希值對服務器列表進行輪轉的分片策略。
配置分片策略
與配置通常的作業屬性相同,在spring命名空間或者JobConfiguration中配置jobShardingStrategyClass屬性,屬
性值是作業分片策略類的全路徑。
分片策略配置xml方式:

<job:simple id="hotelSimpleSpringJob" class="com.chuanzhi.spiderhotel.job.SpiderJob" registry‐
center‐ref="regCenter" cron="0/10 * * * * ?" sharding‐total‐count="4" sharding‐item‐
parameters="0=A,1=B,2=C,3=D" monitor‐port="9888"  reconcile‐interval‐minutes="10" job‐sharding‐
strategy‐
class="com.dangdang.ddframe.job.lite.api.strategy.impl.RotateServerByNameJobShardingStrategy"/>

分片策略配置java方式:

// 定義Lite作業根配置
        JobRootConfiguration simpleJobRootConfig =
LiteJobConfiguration.newBuilder(simpleJobConfig).jobShardingStrategyClass("com.dangdang.ddframe.
job.lite.api.strategy.impl.OdevitySortByNameJobShardingStrategy").build();

4.Elastic-Job 高級
4.1 事件追蹤
Elastic-Job-Lite在配置中提供了JobEventConfiguration,支持數據庫方式配置,會在數據庫中自動創建
JOB_EXECUTION_LOG和JOB_STATUS_TRACE_LOG兩張表以及若干索引,來記錄作業的相關信息。

4.1.2.啟動項目
啟動后會發現在elastic_job_demo數據庫中新增以下兩個表。
job_execution_log:

 

 job_status_trace_log:

JOB_EXECUTION_LOG記錄每次作業的執行歷史。分為兩個步驟:
1. 作業開始執行時向數據庫插入數據,除failure_cause和complete_time外的其他字段均不為空。
2. 作業完成執行時向數據庫更新數據,更新is_success, complete_time和failure_cause(如果作業執行失敗)。
JOB_STATUS_TRACE_LOG記錄作業狀態變更痕跡表。可通過每次作業運行的task_id查詢作業狀態變化的生命周期
和運行軌跡。

4.2 運維
elastic-job中提供了一個elastic-job-lite-console控制台
設計理念
1. 本控制台和Elastic Job並無直接關系,是通過讀取Elastic Job的注冊中心數據展現作業狀態,或更新注冊中心
數據修改全局配置。
2. 控制台只能控制作業本身是否運行,但不能控制作業進程的啟停,因為控制台和作業本身服務器是完全分布
式的,控制台並不能控制作業服務器。


主要功能
1. 查看作業以及服務器狀態
2. 快捷的修改以及刪除作業設置
3. 啟用和禁用作業
北京市昌平區建材城西路金燕龍辦公樓一層 電話:400-618-9090
4 . 跨注冊中心查看作業
5. 查看作業運行軌跡和運行狀態


不支持項
1. 添加作業。因為作業都是在首次運行時自動添加,使用控制台添加作業並無必要。直接在作業服務器啟動包
含Elastic Job的作業進程即可
具體搭建步驟如下:

下載地址:https://raw.githubusercontent.com/miguangying/elastic-job-lite-console/master/elastic-job-lite-
console-2.1.4.tar.gz
解壓縮 elastic -job-lite-console-${version}.tar.gz 。
進入 bin目錄 並執行:
打開瀏覽器訪問 http://localhost:8899/ 即可訪問控制台。8899為默認端口號,可通過啟動腳本輸入-p自定義端
口號。
elastic -job-lite-console-${version}.tar.gz 也可通過 elastic-job 源碼用 mvn install編譯獲取

 

配置及使用
1、 配置注冊中心地址
先啟動zookeeper 然后在注冊中心配置界面 點添加

 

連接成功后,在作業維度下可以顯示該命名空間下作業名稱、分片數量及該作業的cron表達式等信息
在服務器維度可以查看服務器ip、當前運行的實例數、作業總數等信息。

 

配置事件追蹤數據源
在事件追蹤數據源配置頁面點添加按鈕,輸入相關信息

 由於本地ipping不同,因此就不能查看詳細的日志


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM