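Let me start with the exception "Sharding item parameters '1' format error, should be int=xx,int=xx". It showed up while I was adding scheduled jobs dynamically. I searched online for a while and found nothing about it; in the end it turned out that the method that adds the job had a flaw.
The exception comes from this source code: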
    private ShardingItem parse(final String shardingItemParameter, final String originalShardingItemParameters) {
        String[] pair = shardingItemParameter.trim().split(KEY_VALUE_DELIMITER);
        if (2 != pair.length) {
            throw new JobConfigurationException("Sharding item parameters '%s' format error, should be int=xx,int=xx", originalShardingItemParameters);
        }
        try {
            return new ShardingItem(Integer.parseInt(pair[0].trim()), pair[1].trim());
        } catch (final NumberFormatException ex) {
            throw new JobConfigurationException("Sharding item parameters key '%s' is not an integer.", pair[0]);
        }
    }
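To see why a stored value of "1" trips the first check, here is a minimal stand-alone sketch of the same parsing logic using only the JDK. The class name ShardingParamsSketch, the use of IllegalArgumentException, and the "," and "=" delimiters are assumptions made for illustration (the delimiters are inferred from the "int=xx,int=xx" hint in the message); this is not the library's actual class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the library's parser, for illustration only.
final class ShardingParamsSketch {

    // Assumed delimiters, inferred from the "int=xx,int=xx" hint in the error message.
    private static final String PAIR_DELIMITER = ",";
    private static final String KEY_VALUE_DELIMITER = "=";

    // Parses "0=A,1=B" into {0=A, 1=B}; rejects any item that is not exactly key=value.
    static Map<Integer, String> parse(final String parameters) {
        Map<Integer, String> result = new LinkedHashMap<>();
        for (String each : parameters.trim().split(PAIR_DELIMITER)) {
            String[] pair = each.trim().split(KEY_VALUE_DELIMITER);
            if (2 != pair.length) {
                throw new IllegalArgumentException(String.format(
                        "Sharding item parameters '%s' format error, should be int=xx,int=xx", parameters));
            }
            try {
                result.put(Integer.parseInt(pair[0].trim()), pair[1].trim());
            } catch (final NumberFormatException ex) {
                throw new IllegalArgumentException(String.format(
                        "Sharding item parameters key '%s' is not an integer.", pair[0]), ex);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        System.out.println(parse("0=A,1=B,2=C"));
        try {
            parse("1"); // no '=' in the item, so the pair has length 1
        } catch (final IllegalArgumentException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

A bare "1" contains no "=", so the split yields a single element and the format check fails with exactly the message quoted at the top of this post.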
Code before the fix (the code that triggers this exception):
    public void addJobScheduler(final Class<? extends SimpleJob> jobClass,
                                final String cron,
                                final int shardingTotalCount,
                                final String shardingItemParameters) {
        JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
                .shardingItemParameters(shardingItemParameters).build();
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, jobClass.getCanonicalName());
        // No overwrite here: this is the line that causes the problem
        JobScheduler jobScheduler = new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build());
        jobScheduler.init();
    }
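You will find that whatever you set, you still get this error, even though the parameter you passed is clearly not 1. The problem is at the build() call on the LiteJobConfiguration builder: without overwrite, the local configuration does not replace the one already stored in the registry center, so the old value stored there is what gets parsed. After the fix: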
    public void addJobScheduler(final Class<? extends SimpleJob> jobClass,
                                final String cron,
                                final int shardingTotalCount,
                                final String shardingItemParameters) {
        JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
                .shardingItemParameters(shardingItemParameters).build();
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, jobClass.getCanonicalName());
        // overwrite(true) lets the local configuration replace the one in the registry center
        JobScheduler jobScheduler = new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build());
        jobScheduler.init();
    }
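The only code added by the fix is the .overwrite(true) call on the LiteJobConfiguration builder (marked in red in the original post).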
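The effect of the overwrite flag can be modeled with a toy registry. This is only a sketch under my own assumptions: a HashMap stands in for ZooKeeper, and OverwriteSketch and its register method are hypothetical names, not elastic-job API. It shows why a stale "1" left by an earlier registration keeps coming back until overwrite is enabled.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a registry center; a HashMap stands in for ZooKeeper.
final class OverwriteSketch {

    private final Map<String, String> registry = new HashMap<>();

    // Stores the local parameters only if the job is new or overwrite is set,
    // then returns the parameters the job would actually run with.
    String register(final String jobName, final String localParams, final boolean overwrite) {
        if (overwrite || !registry.containsKey(jobName)) {
            registry.put(jobName, localParams);
        }
        return registry.get(jobName);
    }

    public static void main(final String[] args) {
        OverwriteSketch center = new OverwriteSketch();
        // An earlier run registered a malformed value.
        center.register("demoJob", "1", false);
        // Without overwrite, the stale value in the registry is used.
        System.out.println(center.register("demoJob", "0=A,1=B,2=C", false));
        // With overwrite, the local value replaces it.
        System.out.println(center.register("demoJob", "0=A,1=B,2=C", true));
    }
}
```

The first call prints the stale 1 and the second prints 0=A,1=B,2=C, which matches the behavior seen before and after adding overwrite(true).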
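Now a few words about dangdang's elastic-job: it is a distributed job scheduling component. The problem I ran into today was that, in a multi-node environment, some jobs do not need to run on every node, for example jobs that should run on only one node (more precisely, jobs whose total shard count is 1). For that, elastic-job is a good choice, because it lets you configure the total shard count and other settings flexibly. Its official documentation, here the configuration manual: http://elasticjob.io/docs/elastic-job-lite/02-guide/config-manual/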
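Why a total shard count of 1 means a single executing node can be illustrated with a simplified even-allocation model. The sketch below is my own approximation of an average-allocation strategy (split evenly, hand the remainder to the first nodes); the class name, method name, and node names are hypothetical, and the strategy elastic-job actually applies may differ in detail.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of distributing shard items 0..shardingTotalCount-1 across nodes.
final class ShardAllocationSketch {

    // Assumes node names are unique.
    static Map<String, List<Integer>> allocate(final List<String> nodes, final int shardingTotalCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (nodes.isEmpty()) {
            return result;
        }
        // Even part: every node gets the same number of consecutive items.
        int perNode = shardingTotalCount / nodes.size();
        int index = 0;
        for (String node : nodes) {
            List<Integer> items = new ArrayList<>();
            for (int i = index * perNode; i < (index + 1) * perNode; i++) {
                items.add(i);
            }
            result.put(node, items);
            index++;
        }
        // Remainder: hand out the leftover items one by one, starting with the first node.
        int next = perNode * nodes.size();
        for (String node : nodes) {
            if (next >= shardingTotalCount) {
                break;
            }
            result.get(node).add(next++);
        }
        return result;
    }

    public static void main(final String[] args) {
        List<String> nodes = Arrays.asList("node-a", "node-b", "node-c");
        System.out.println(allocate(nodes, 1)); // only node-a receives shard item 0
        System.out.println(allocate(nodes, 3)); // one shard item per node
    }
}
```

With three nodes and one shard, only the first node receives item 0 and the other two receive nothing, so the job body runs once per trigger across the cluster.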
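So how do you integrate it into Spring Boot? One prerequisite is a ZooKeeper service. Most projects already use one, so you only need to connect to it; if your dev environment does not have one yet, find a tutorial and install it, for example one on installing the ZooKeeper service on CentOS 7.
Then the .properties configuration file (YAML is analogous):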
regCenter.serverList=10.0.30.140:2181
regCenter.namespace=elastic-job
simpleJob.cron=0/5 * * * * ?
# Total shard count; set it to 1 to run on only one node
simpleJob.shardingTotalCount=1
simpleJob.shardingItemParameters=0=A,1=B,2=C
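Note that the sample above sets shardingTotalCount to 1 but lists parameters for items 0, 1 and 2. As I understand the configuration manual, shard item numbers start at 0 and should be less than the total shard count, so the entries for 1 and 2 would presumably never be used. The following sketch, with a hypothetical class and method name and using only java.util.Properties, shows one way to spot such entries before starting a job.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper that checks the sample properties for unreachable shard items.
final class ShardingConfigCheck {

    // Returns the shard item numbers that are >= simpleJob.shardingTotalCount.
    static List<Integer> unusedItems(final Properties props) {
        int total = Integer.parseInt(props.getProperty("simpleJob.shardingTotalCount").trim());
        List<Integer> unused = new ArrayList<>();
        for (String pair : props.getProperty("simpleJob.shardingItemParameters").split(",")) {
            int item = Integer.parseInt(pair.split("=")[0].trim());
            if (item >= total) {
                unused.add(item);
            }
        }
        return unused;
    }

    public static void main(final String[] args) throws IOException {
        Properties props = new Properties();
        // Same two keys as in the configuration file above.
        props.load(new StringReader(
                "simpleJob.shardingTotalCount=1\n"
                + "simpleJob.shardingItemParameters=0=A,1=B,2=C\n"));
        System.out.println(unusedItems(props)); // prints [1, 2]
    }
}
```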
Of course, elastic-job has to be added to the pom:
    <dependency>
        <artifactId>elastic-job-common-core</artifactId>
        <groupId>com.dangdang</groupId>
        <version>${elastic-job.version}</version>
    </dependency>
    <dependency>
        <artifactId>elastic-job-lite-core</artifactId>
        <groupId>com.dangdang</groupId>
        <version>${elastic-job.version}</version>
    </dependency>
    <dependency>
        <artifactId>elastic-job-lite-spring</artifactId>
        <groupId>com.dangdang</groupId>
        <version>${elastic-job.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-test</artifactId>
        <version>${curator.version}</version>
    </dependency>
The versions used in this example are:
    <elastic-job.version>2.1.5</elastic-job.version>
    <curator.version>2.10.0</curator.version>
Connect to the ZooKeeper registry center:
    import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
    import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**
     * @author binhy
     * @date 2019-1-22
     */
    @Configuration
    @ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
    public class RegistryCenterConfig {

        @Bean(initMethod = "init")
        public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList,
                                                 @Value("${regCenter.namespace}") final String namespace) {
            return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
        }
    }
Then set up persistence for the job information:
    import javax.annotation.Resource;
    import javax.sql.DataSource;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import com.dangdang.ddframe.job.event.JobEventConfiguration;
    import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;

    /**
     * @author binhy
     * @date 2019-1-22
     */
    @Configuration
    public class JobEventConfig {

        @Resource
        private DataSource dataSource;

        @Bean
        public JobEventConfiguration jobEventConfiguration() {
            return new JobEventRdbConfiguration(dataSource);
        }
    }
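Once everything is running, you will find two new tables in your database, job_execution_log and job_status_trace_log. They record the execution of your jobs in detail, including the executing IP and the start and end times, which is quite handy.
Next you need a job management class that initializes some jobs. I also put the method for adding jobs dynamically here, because the configuration-based form requires a separate configuration and class for every job; adding jobs dynamically saves a lot of work.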
    import com.goopal.exdata.dangdang.DemoJob;
    import javax.annotation.Resource;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.ApplicationArguments;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import com.dangdang.ddframe.job.api.simple.SimpleJob;
    import com.dangdang.ddframe.job.config.JobCoreConfiguration;
    import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
    import com.dangdang.ddframe.job.event.JobEventConfiguration;
    import com.dangdang.ddframe.job.lite.api.JobScheduler;
    import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
    import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
    import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

    /**
     * @author binhy
     * @date 2019-1-22
     */
    @Configuration
    public class SimpleJobConfig {

        @Resource
        private ZookeeperRegistryCenter regCenter;

        @Resource
        private JobEventConfiguration jobEventConfiguration;

        @Bean
        public SimpleJob simpleJob() {
            return new DemoJob();
        }

        // Job configured through the properties file
        @Bean(initMethod = "init")
        public JobScheduler simpleJobScheduler(final SimpleJob simpleJob,
                                               @Value("${simpleJob.cron}") final String cron,
                                               @Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,
                                               @Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters) {
            return new SpringJobScheduler(simpleJob, regCenter,
                    getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters),
                    jobEventConfiguration);
        }

        private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                             final String cron,
                                                             final int shardingTotalCount,
                                                             final String shardingItemParameters) {
            return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
                    JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
                            .shardingItemParameters(shardingItemParameters).build(),
                    jobClass.getCanonicalName())).overwrite(true).build();
        }

        /**
         * Adds a job dynamically.
         * @param jobClass job implementation class
         * @param cron cron expression
         * @param shardingTotalCount total shard count
         * @param shardingItemParameters sharding item parameters, e.g. 0=A,1=B,2=C
         */
        public void addJobScheduler(final Class<? extends SimpleJob> jobClass,
                                    final String cron,
                                    final int shardingTotalCount,
                                    final String shardingItemParameters) {
            JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
                    .shardingItemParameters(shardingItemParameters).build();
            SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, jobClass.getCanonicalName());
            JobScheduler jobScheduler = new JobScheduler(regCenter,
                    LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build());
            jobScheduler.init();
        }
    }
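At this point, the scheduled job you configured in the configuration file already runs on only one node in a multi-node environment. To add more jobs with different cron expressions, just call the method from your business logic, for example: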
    @Autowired
    private SimpleJobConfig simpleJobConfig;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Three shards, with sharding item parameters 0=A,1=B,2=C
        simpleJobConfig.addJobScheduler(AnalysisData.class, "0/3 * * * * ?", 3, "0=A,1=B,2=C");
    }
If this helped you, please give it a like!