elastic-job集成到springboot教程,和它的一個異常處理辦法:Sharding item parameters '1' format error, should be int=xx,int=xx


先說這個Sharding item parameters '1' format error, should be int=xx,int=xx異常吧,這是在做動態添加調度任務的時候出現的,網上找了一會沒有搜到任何信息,最后發現,是添加任務這個方法里有一個漏洞。

這個源碼出自:

 1 private ShardingItem parse(final String shardingItemParameter, final String originalShardingItemParameters) {
 2         String[] pair = shardingItemParameter.trim().split(KEY_VALUE_DELIMITER);
 3         if (2 != pair.length) {
 4             throw new JobConfigurationException("Sharding item parameters '%s' format error, should be int=xx,int=xx", originalShardingItemParameters);
 5         }
 6         try {
 7             return new ShardingItem(Integer.parseInt(pair[0].trim()), pair[1].trim());
 8         } catch (final NumberFormatException ex) {
 9             throw new JobConfigurationException("Sharding item parameters key '%s' is not an integer.", pair[0]);
10         }
11     }

 

修改前代碼(報這個異常的代碼):

1  public void addJobScheduler(final Class<? extends SimpleJob> jobClass,
2       final String cron,
3       final int shardingTotalCount,
4       final String shardingItemParameters) {
5     JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build();
6     SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, jobClass.getCanonicalName());
7     JobScheduler jobScheduler = new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build());
8     jobScheduler.init();
9   }

 

是不是發現不管你怎么設置,都給你報這個,你明明傳的就不是1這個參數,還是給你報這個,問題出在build()那里,需要overwrite。修改后:

1  public void addJobScheduler(final Class<? extends SimpleJob> jobClass,
2       final String cron,
3       final int shardingTotalCount,
4       final String shardingItemParameters) {
5     JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build();
6     SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, jobClass.getCanonicalName());
7     JobScheduler jobScheduler = new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build());
8     jobScheduler.init();
9   }

紅色代碼為修改后加的代碼。

 

先說說這個dangdang的elastic-job,它是一個分布式任務調度插件。今天我遇到的問題就是,有部分任務,在多節點環境中,不需要每個節點執行,比如只需要一個節點(確切地說就是作業分片總數=1)上運行的任務,這時候elastic-job就是個不錯的選擇,它可以很靈活的配置作業分片總數等。它的官方文檔,鏈接指向配置說明:http://elasticjob.io/docs/elastic-job-lite/02-guide/config-manual/

 

那么spring boot中如何集成進它。需要的一個前提條件是zookeeper服務,這個一般項目里都會用到,你只需要連就好了,如果dev或者你們還沒用上,可以找個教程安裝一下。鏈接指向在 CentOS7 上安裝 Zookeeper服務 。

然后.propertis配置文件(yml類同):

regCenter.serverList=10.0.30.140:2181
regCenter.namespace=elastic-job

simpleJob.cron=0/5 * * * * ?  
# 作業分片總數,設為1只在一個節點執行
simpleJob.shardingTotalCount=1
simpleJob.shardingItemParameters=0=A,1=B,2=C

當然,pom中需要引入elastic-job:

<dependency>
    <artifactId>elastic-job-common-core</artifactId>
    <groupId>com.dangdang</groupId>
    <version>${elastic-job.version}</version>
  </dependency>
  <dependency>
    <artifactId>elastic-job-lite-core</artifactId>
    <groupId>com.dangdang</groupId>
    <version>${elastic-job.version}</version>
  </dependency>
  <dependency>
    <artifactId>elastic-job-lite-spring</artifactId>
    <groupId>com.dangdang</groupId>
    <version>${elastic-job.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
    <version>${curator.version}</version>
  </dependency>

本示例代碼用到的版本是:

<elastic-job.version>2.1.5</elastic-job.version>
    <curator.version>2.10.0</curator.version>

連接zookeeper注冊中心:

import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author binhy
 *@date 2019-1-22
 */
@Configuration
@ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
public class RegistryCenterConfig {

  @Bean(initMethod = "init")
  public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace) {
    return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
  }
}

然后做一個任務信息持久化:

import javax.annotation.Resource;
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;

/**
 * @author binhy
 *@date 2019-1-22
 */
@Configuration
public class JobEventConfig {

  @Resource
  private DataSource dataSource;

  @Bean
  public JobEventConfiguration jobEventConfiguration() {
    return new JobEventRdbConfiguration(dataSource);
  }
}

當你最后運行后會發現你的庫里多了兩張表job_execution_log和job_status_trace_log他們會詳細的記錄你的任務執行信息,包括執行ip,開始結束時間等,還是非常不錯的。

 

然后需要一個任務管理類,初始化一些任務,我這里把動態添加任務的方法也寫在了這里。因為這種形式需要你一個任務寫一個配置和類去實現。動態添加會省很多事。

import com.goopal.exdata.dangdang.DemoJob;
import javax.annotation.Resource;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

/**
 * @author binhy
 * @date 2019-1-22
 */
@Configuration
public class SimpleJobConfig {

  @Resource
  private ZookeeperRegistryCenter regCenter;

  @Resource
  private JobEventConfiguration jobEventConfiguration;

  @Bean
  public SimpleJob simpleJob() {
    return new DemoJob();
  }

  @Bean(initMethod = "init")
  public JobScheduler simpleJobScheduler(final SimpleJob simpleJob, @Value("${simpleJob.cron}") final String cron, @Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,
      @Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters) {
    return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration);
  }

  private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
    return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(
        jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();
  }

  /**
   * 動態添加
   * @param jobClass
   * @param cron
   * @param shardingTotalCount
   * @param shardingItemParameters
   */
  public void addJobScheduler(final Class<? extends SimpleJob> jobClass,
      final String cron,
      final int shardingTotalCount,
      final String shardingItemParameters) {
    JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build();
    SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, jobClass.getCanonicalName());
    JobScheduler jobScheduler = new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build());
    jobScheduler.init();
  }
}

到這里,你在配置文件中配置的定時任務就已經可以在多節點環境中,僅在1個節點執行了。需要添加更多的不同cron的任務,只需要在代碼業務邏輯處調用即可。如:

 @Autowired
  private  SimpleJobConfig simpleJobConfig;
  @Override
  public void run(ApplicationArguments args) throws Exception {

    simpleJobConfig.addJobScheduler(AnalysisData.class,"0/3 * * * * ?",3,"0=A,1=B,2=C");//0=A,1=B,2=C
  }

 

如果有幫助到你,給我點個贊哦~

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM