springboot整合elasticJob實戰(純代碼開發三種任務類型用法)以及分片系統,事件追蹤詳解


一 springboot整合

介紹就不多說了,只有這個框架是當當網開源的,支持分布式調度,分布式系統中非常合適(兩個服務同時跑不會重復,並且可靈活配置分開分批處理數據,賊方便)!

這里主要還是用到zookeeper,如果沒有zk環境,可以百度或者參考我之前的博客搭建

添加依賴,這里有一點,如果是在springcloud中的話,需要排除自帶的curator依賴,因為cloud已經集成一些,會沖突:

 1  <!-- elastic-job -->
 2         <dependency>
 3             <groupId>com.dangdang</groupId>
 4             <artifactId>elastic-job-lite-core</artifactId>
 5             <version>2.1.5</version>
 6             <exclusions>
 7                 <exclusion>
 8                     <artifactId>curator-client</artifactId>
 9                     <groupId>org.apache.curator</groupId>
10                 </exclusion>
11                 <exclusion>
12                     <artifactId>curator-framework</artifactId>
13                     <groupId>org.apache.curator</groupId>
14                 </exclusion>
15                 <exclusion>
16                     <artifactId>curator-recipes</artifactId>
17                     <groupId>org.apache.curator</groupId>
18                 </exclusion>
19             </exclusions>
20         </dependency>
21         <dependency>
22             <groupId>com.dangdang</groupId>
23             <artifactId>elastic-job-lite-spring</artifactId>
24             <version>2.1.5</version>
25         </dependency>
26         <dependency>
27             <groupId>org.apache.curator</groupId>
28             <artifactId>curator-framework</artifactId>
29             <version>2.10.0</version>
30         </dependency>
31         <dependency>
32             <groupId>org.apache.curator</groupId>
33             <artifactId>curator-client</artifactId>
34             <version>2.10.0</version>
35         </dependency>
36         <dependency>
37             <groupId>org.apache.curator</groupId>
38             <artifactId>curator-recipes</artifactId>
39             <version>2.10.0</version>
40         </dependency>
41     </dependencies>
View Code

 

然后就是配置zk注冊中心,分布式功能主要依賴這個,所有屬性都從yml中注入,這里注意一點,可以把超時時間設置大一點:

@Configuration public class ElasticRegCenterConfig { /** * 配置zookeeper注冊中心 */ @Bean(initMethod = "init")  // 需要配置init執行初始化邏輯
    public ZookeeperRegistryCenter regCenter( @Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace) { ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(serverList, namespace); zookeeperConfiguration.setMaxRetries(3); //設置重試次數,可設置其他屬性
        zookeeperConfiguration.setSessionTimeoutMilliseconds(500000); //設置會話超時時間,盡量大一點,否則項目無法正常啟動
        return new ZookeeperRegistryCenter(zookeeperConfiguration); } }

 

然后就是配置job了,其實和spring的quartz配置都差不多,一個job類,一個調度類

這里先貼我的yml配置,任務執行周期,分片個數都從這里注入即可,分片使用后面單獨說明:

 

二 simplejob

job類:

@Component public class MySimpleJob implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { System.out.println(shardingContext.getJobName()+"執行:"+
                "分片參數:"+shardingContext.getShardingParameter()+
                ",當前分片項:"+shardingContext.getShardingItem()+
                ",time:"+ LocalDate.now()); } }

 

 

配置類,這里用到了一個工具方法,工具類放下面:

/** * 配置MySimpleJob */ @Configuration public class MySimpleJobConf { @Autowired ZookeeperRegistryCenter regCenter; @Autowired MySimpleJob mySimpleJob; /** * 配置任務調度: 參數: 任務 * zk注冊中心 * 任務詳情 */ @Bean(initMethod = "init") public JobScheduler simpleJobScheduler(@Value("${mySimpleJob.cron}") final String cron,  //yml注入
                                           @Value("${mySimpleJob.shardingTotalCount}") final int shardingTotalCount, @Value("${mySimpleJob.shardingItemParameters}") final String shardingItemParameters) { return new SpringJobScheduler(mySimpleJob, regCenter, ElasticJobUtils.getSimpleJobConfiguration( mySimpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters) //,new MyElasticJobListener() 可配置監聽器
 ); } }

 

工具類:

public class ElasticJobUtils { /** * 創建簡單任務詳細信息 */
    public static LiteJobConfiguration getSimpleJobConfiguration(final Class<? extends SimpleJob> jobClass, //任務類
                                                                final String cron,    // 運行周期配置
                                                                final int shardingTotalCount,  //分片個數
                                                                final String shardingItemParameters) {  // 分片參數
        return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration( JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount) .shardingItemParameters(shardingItemParameters).build() , jobClass.getCanonicalName()) ).overwrite(true).build(); } /** * 創建流式作業配置 */
    public static LiteJobConfiguration getDataFlowJobConfiguration(final Class<? extends DataflowJob> jobClass, //任務類
                                                                   final String cron,    // 運行周期配置
                                                                   final int shardingTotalCount,  //分片個數
                                                                   final String shardingItemParameters, final Boolean streamingProcess   //是否是流式作業
                                                                   ) {  // 分片參數
        return LiteJobConfiguration.newBuilder(new DataflowJobConfiguration( JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount) .shardingItemParameters(shardingItemParameters).build() // true為流式作業,除非fetchData返回數據為null或者size為0,否則會一直執行 // false 非流式,只會按配置時間執行一次
 , jobClass.getCanonicalName(),streamingProcess) ).overwrite(true).build(); } }
View Code

測試:

三 dataflowjob

job類:

@Component public class MyDataFlowJob implements DataflowJob<String> { @Override public List<String> fetchData(ShardingContext shardingContext) { //抓取數據
        System.out.println("---------獲取數據---------"); return Arrays.asList("1","2","3"); } @Override public void processData(ShardingContext shardingContext, List<String> list) {//處理數據
        System.out.println("---------處理數據---------"); list.forEach(x-> System.out.println("數據處理:"+x)); } }

 

配置類:

@Configuration public class MyDataFlowJobConf { @Autowired ZookeeperRegistryCenter regCenter; @Autowired MyDataFlowJob myDataFlowJob; /** * 配置任務調度: 參數: 任務 * zk注冊中心 * 任務詳情 */ @Bean(initMethod = "init") public JobScheduler dataFlowJobScheduler(@Value("${myDataFlowJob.cron}") final String cron,  //yml注入
                                           @Value("${myDataFlowJob.shardingTotalCount}") final int shardingTotalCount, @Value("${myDataFlowJob.shardingItemParameters}") final String shardingItemParameters) { return new SpringJobScheduler(myDataFlowJob, regCenter, ElasticJobUtils.getDataFlowJobConfiguration( myDataFlowJob.getClass(), cron, shardingTotalCount, shardingItemParameters,true) //,new MyElasticJobListener() 可配置監聽器
 ); } }

 

測試:

需要注意一點流式作業如果數據不為空會一直跑

四 scriptjob

腳本任務有一點,不需要創建類實例,否則會報錯,參數直接傳null即可

配置類:

@Configuration
public class MyScriptJobConf {
    @Autowired ZookeeperRegistryCenter regCenter;
    /**
     * 配置任務調度: 參數:  任務
     *                    zk注冊中心
     *                    任務詳情
     */
    @Bean(initMethod = "init")
    public JobScheduler scriptJobScheduler(@Value("${myScriptJob.cron}") final String cron,  //yml注入
                                           @Value("${myScriptJob.shardingTotalCount}") final int shardingTotalCount,
                                           @Value("${myScriptJob.shardingItemParameters}") final String shardingItemParameters) {
        return new SpringJobScheduler(null, regCenter,
                                      ElasticJobUtils.getScriptJobConfiguration(
                                              "script_job",
                                              cron,
                                              shardingTotalCount,
                                              //命令或者腳本路徑
                                              shardingItemParameters,"echo hello")
                                              //,new MyElasticJobListener() 可配置監聽器
        );
    }
}

工具添加靜態方法:

/**
     * 創建腳本作業配置
     */
    public static LiteJobConfiguration getScriptJobConfiguration(final String jobName, //任務名字
                                                                 final String cron,    // 運行周期配置
                                                                 final int shardingTotalCount,  //分片個數
                                                                 final String shardingItemParameters,
                                                                 final String scriptCommandLine   //是腳本路徑或者命令
    ) {  // 分片參數
        return LiteJobConfiguration.newBuilder(new ScriptJobConfiguration(
                JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount)
                        .shardingItemParameters(shardingItemParameters).build()
                // 此處配置文件路徑或者執行命令
                , scriptCommandLine)
        ).overwrite(true).build();
    }

測試:

 

五 分片用法

分片的目的就是通過配置分片個數,讓不同的分片參數到不同的服務中去,比如配置了分片個數是2,那么分片一會到服務一中,分片二到服務二中

項目中根據分片參數來決定哪個服務處理哪些數據,比如  0=客戶甲,1=客戶乙,但是分片item是從1開始

分片算法默認是平均,可自定義,然后參數就是上面yml那種配置,比如2,就是 0=,1=  4就是0=,1=,2=,3=,兩個服務的話服務一就是0,1的參數,服務二就是2,3的參數,並且分片item是3,4

然后要注意一點的是,這個分片識別是根據ip的,也就是說同一台電腦,跑兩個程序沒用,兩個程序都會全部執行,還是會重復

主要是這個分片保證分布式中處理數據不重復,分片也會轉移,即一個服務掛了之后,分片參數和item會自動轉移到剩下服務中

六 事件追蹤(即任務信息持久化到mysql)

需要提前創建btach_log數據庫

配置數據源Bean,在任務配置中添加event

@Configuration
@ConfigurationProperties(prefix = "spring.datasource")
public class JobDataSourceConf {
    private String url;
    private String username;
    private String password;
    private String driver_class_name;

    @Bean
    @Primary
    public DataSource hikariDataSource() {
        HikariDataSource dataSource = new HikariDataSource();
        dataSource.setJdbcUrl(url);
        dataSource.setUsername(username);
        dataSource.setPassword(password);
        dataSource.setDriverClassName(driver_class_name);
        return dataSource;
    }

 

程序會自動創建兩張表並添加記錄

 

 

七 容易踩的坑

一 配置類中配置bean的時候,方法名不要重復,否則會發現任務不跑,

二 測試分布式的時候,必須跑在ip不一樣的服務上,否則不會實現分片

三 我的版本再pom里面,springboot版本是2.0.6,版本不一樣可能用法也有些區別

四 理論上xml更簡單,但是我個人比較喜歡代碼風格,哈哈

五 腳本任務不能新建實例,參數傳null,且確認命令是否有權限


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM