elasticjob學習一:simplejob初識和springboot整合


Elastic-Job是一個分布式調度解決方案,由兩個相互獨立的子項目Elastic-Job-Lite和Elastic-Job-Cloud組成。
Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分布式任務的協調服務; --摘自官網

具體的詳細介紹,大家可以去官網查閱

這篇文章主要是整合springboot 的簡單例子。通過一步一步的實現,來逐步地熟悉elastic-job 這個組件。前提條件是你需要有一個正在運行的zookeeper

搭建springboot項目

這裡不再贅述如何搭建,這裡我只貼出我的pom文件

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the es-job demo: a Spring Boot application that uses Elastic-Job Lite. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <!-- Inherits from the Spring Boot 2.0.1.RELEASE starter parent. The Spring Boot and MySQL dependencies
        below declare no version; presumably the parent manages them (confirm). -->
   <parent>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-parent</artifactId>
       <version>2.0.1.RELEASE</version>
       <relativePath/> <!-- lookup parent from repository -->
   </parent>
   <groupId>com.kevin</groupId>
   <artifactId>es-job</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>es-job</name>
   <description>Demo project for Spring Boot</description>

   <properties>
       <java.version>1.8</java.version>
       <!-- Single place to set the version shared by both Elastic-Job Lite artifacts below. -->
       <elastic-job.version>2.1.4</elastic-job.version>
   </properties>

   <dependencies>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter</artifactId>
       </dependency>

       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-test</artifactId>
           <scope>test</scope>
       </dependency>

       <!-- Elastic-Job Lite: the core module and the Spring integration module. -->
       <dependency>
           <groupId>com.dangdang</groupId>
           <artifactId>elastic-job-lite-core</artifactId>
           <version>${elastic-job.version}</version>
       </dependency>
       <dependency>
           <groupId>com.dangdang</groupId>
           <artifactId>elastic-job-lite-spring</artifactId>
           <version>${elastic-job.version}</version>
       </dependency>

       <!-- Spring Boot starters: web, actuator and the (optional) configuration processor. -->

       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-actuator</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-configuration-processor</artifactId>
           <optional>true</optional>
       </dependency>

       <!-- JDBC support; JobEventConfig in this article injects a DataSource. -->
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-jdbc</artifactId>
       </dependency>
       <!-- MySQL JDBC driver -->
       <dependency>
           <groupId>mysql</groupId>
           <artifactId>mysql-connector-java</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-data-jpa</artifactId>
       </dependency>
   </dependencies>

   <build>
       <plugins>
           <!-- Spring Boot Maven plugin; presumably used to package and run the application (confirm). -->
           <plugin>
               <groupId>org.springframework.boot</groupId>
               <artifactId>spring-boot-maven-plugin</artifactId>
           </plugin>
       </plugins>
   </build>

</project>

主要的就是加載這2個依賴

    <!-- Elastic-Job Lite dependencies: the core module plus the Spring integration module.
         Both take their version from the elastic-job.version property defined in the POM. -->
       <dependency>
           <groupId>com.dangdang</groupId>
           <artifactId>elastic-job-lite-core</artifactId>
           <version>${elastic-job.version}</version>
       </dependency>
       <dependency>
           <groupId>com.dangdang</groupId>
           <artifactId>elastic-job-lite-spring</artifactId>
           <version>${elastic-job.version}</version>
       </dependency>

整合elasticjob

1.編寫配置application.yml



# ZooKeeper registry-center settings; each key is injected via @Value into
# RegistryCenterConfig.registryCenter(...).
zookeeper:
 # Passed as the "serverLists" argument of ZookeeperConfiguration.
 address: 192.168.247.7:2181
 # Passed as the "namespace" argument of ZookeeperConfiguration.
 namespace: elastic-job
 # Passed to setConnectionTimeoutMilliseconds, so the unit is milliseconds.
 connectionTimeout: 10000
 # Passed to setSessionTimeoutMilliseconds, so the unit is milliseconds.
 sessionTimeout: 10000
 # Passed to setMaxRetries.
 maxRetries: 3

# Simple-job settings; each key is injected via @Value into
# MySimpleJobConfig.simpleJobScheduler(...).
simpleJob:
   # Cron expression for the job; presumably Quartz-style, i.e. every 5 seconds (confirm).
   cron: 0/5 * * * * ?
   # Total number of shards; matches the five entries in shardingItemParameters.
   shardingTotalCount: 5
   # index=value pairs; MySimpleJob prints the value via ShardingContext.getShardingParameter().
   shardingItemParameters: 0=java,1=php,2=erlang,3=angular,4=vue
   # Forwarded to JobCoreConfiguration.jobParameter(...); MySimpleJob itself does not read it.
   jobParameter: source1=public,source2=private
   # Forwarded to JobCoreConfiguration.failover(...).
   failover: true
   # Forwarded to LiteJobConfiguration.monitorExecution(...).
   monitorExecution: true
   # Forwarded to LiteJobConfiguration.monitorPort(...).
   monitorPort: 8889
   # Forwarded to LiteJobConfiguration.maxTimeDiffSeconds(...); -1 presumably disables the check (confirm).
   maxTimeDiffSeconds: -1
   # Forwarded to LiteJobConfiguration.jobShardingStrategyClass(...) as a fully qualified class name.
   jobShardingStrategyClass: com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy

# Settings for a dataflow job. NOTE(review): no code shown in this article reads these keys;
# presumably they are consumed by a job configuration that is not included here.
dataflowJob:
   # Cron expression; presumably Quartz-style, i.e. every 10 seconds (confirm).
   cron: 0/10 * * * * ?
   # Total number of shards; matches the two entries in shardingItemParameters.
   shardingTotalCount: 2
   # index=value pairs, one per shard.
   shardingItemParameters: 0=jinan,1=qingdao




############################################################
#
# Data source configuration
#
############################################################
spring:
 datasource: # data source settings
     type: com.zaxxer.hikari.HikariDataSource          # data source type: HikariCP
     driver-class-name: com.mysql.jdbc.Driver          # MySQL driver
     url: jdbc:mysql://localhost:3306/elasticjob?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false
     username: root
     password: root
     hikari:
       connection-timeout: 30000       # max time (ms) to wait for the pool to hand out a connection; an SQLException is raised if none becomes available in time; default: 30 seconds
       minimum-idle: 5                 # minimum number of connections
       maximum-pool-size: 20           # maximum number of connections
       auto-commit: true               # auto-commit
       idle-timeout: 600000            # max idle time (ms) before a connection is released (retired); default: 10 minutes
       pool-name: DateSourceHikariCP     # connection pool name; NOTE(review): "DateSource" looks like a typo for "DataSource" (cosmetic only)
       max-lifetime: 1800000           # connection lifetime (ms); released (retired) once exceeded and not in use; default: 30 minutes (1800000 ms)
       connection-test-query: SELECT 1






2.注冊中心加載到spring 容器中

/**
 * Registers the ZooKeeper-backed Elastic-Job registry center as a Spring bean.
 *
 * <p>The configuration is active only when {@code zookeeper.address} resolves to a non-empty
 * value. The placeholder carries an empty default ({@code ${zookeeper.address:}}) on purpose:
 * without it, an unset property is left in the expression as literal placeholder text, which is
 * itself non-empty, so the guard would always pass and start-up would fail later while resolving
 * the {@code @Value} parameters.
 *
 * <p>Created 2020/1/22.
 *
 * @author kevin
 */
@Configuration
@ConditionalOnExpression("'${zookeeper.address:}'.length() > 0")
public class RegistryCenterConfig {

    /**
     * Creates the registry center bean. Spring invokes its {@code init} method after
     * construction (see {@code initMethod}).
     *
     * @param serverLists       value of {@code zookeeper.address}
     * @param namespace         value of {@code zookeeper.namespace}
     * @param connectionTimeout value of {@code zookeeper.connectionTimeout}, in milliseconds
     * @param sessionTimeout    value of {@code zookeeper.sessionTimeout}, in milliseconds
     * @param maxRetries        value of {@code zookeeper.maxRetries}
     * @return the registry center built from the given settings
     */
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter registryCenter(@Value("${zookeeper.address}") final String serverLists,
                                                  @Value("${zookeeper.namespace}") final String namespace,
                                                  @Value("${zookeeper.connectionTimeout}") final int connectionTimeout,
                                                  @Value("${zookeeper.sessionTimeout}") final int sessionTimeout,
                                                  @Value("${zookeeper.maxRetries}") final int maxRetries) {
        final ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(serverLists, namespace);
        zookeeperConfiguration.setConnectionTimeoutMilliseconds(connectionTimeout);
        zookeeperConfiguration.setSessionTimeoutMilliseconds(sessionTimeout);
        zookeeperConfiguration.setMaxRetries(maxRetries);
        return new ZookeeperRegistryCenter(zookeeperConfiguration);
    }
}

3.配置JobEventConfig事件追蹤

/**
 * Spring configuration for Elastic-Job event tracing.
 *
 * <p>Exposes a {@link JobEventConfiguration} bean built from the application's
 * {@link DataSource}.
 *
 * <p>Created 2020/1/22.
 *
 * @author kevin
 */
@Configuration
public class JobEventConfig {

    /** Data source handed to the event configuration; injected by Spring. */
    @Autowired
    private DataSource dataSource;

    /**
     * Creates the job-event configuration from the injected data source.
     *
     * @return a {@link JobEventRdbConfiguration} wrapping {@link #dataSource}
     */
    @Bean
    public JobEventConfiguration jobEventConfiguration() {
        final JobEventConfiguration rdbEventConfiguration = new JobEventRdbConfiguration(dataSource);
        return rdbEventConfiguration;
    }
}

4.編寫自己的job並完成配置

4.1 定義自己的job

/**
 * Demo simple job that prints the sharding parameter of the shard it is executed for.
 *
 * <p>Deliberately not annotated with {@code @Component}: the instance is already registered as
 * a bean by {@code MySimpleJobConfig#simpleJob()}. Having both registrations would create two
 * {@code SimpleJob} beans and leave the scheduler's injection point relying on parameter-name
 * matching to pick one.
 *
 * <p>Created 2020/1/22.
 *
 * @author kevin
 */
public class MySimpleJob implements SimpleJob {

    /**
     * Prints the sharding parameter of the current shard to standard output.
     *
     * @param shardingContext context of the shard being executed
     */
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println(shardingContext.getShardingParameter());
    }
}


4.2 編寫job配置

@Configuration
public class MySimpleJobConfig {
   @Autowired
   private ZookeeperRegistryCenter registryCenter;

   @Autowired
   private JobEventConfiguration jobEventConfiguration;

   /**
    * 	具體真正的定時任務執行邏輯
    * @return
    */
   @Bean
   public SimpleJob simpleJob() {
       return new MySimpleJob();
   }

   /**
    * @param simpleJob
    * @return
    */
   @Bean(initMethod = "init")
   public JobScheduler simpleJobScheduler(final SimpleJob simpleJob,
                                          @Value("${simpleJob.cron}") final String cron,
                                          @Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,
                                          @Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters,
                                          @Value("${simpleJob.jobParameter}") final String jobParameter,
                                          @Value("${simpleJob.failover}") final boolean failover,
                                          @Value("${simpleJob.monitorExecution}") final boolean monitorExecution,
                                          @Value("${simpleJob.monitorPort}") final int monitorPort,
                                          @Value("${simpleJob.maxTimeDiffSeconds}") final int maxTimeDiffSeconds,
                                          @Value("${simpleJob.jobShardingStrategyClass}") final String jobShardingStrategyClass) {

       return new SpringJobScheduler(simpleJob,
               registryCenter,
               getLiteJobConfiguration(simpleJob.getClass(),
                       cron,
                       shardingTotalCount,
                       shardingItemParameters,
                       jobParameter,
                       failover,
                       monitorExecution,
                       monitorPort,
                       maxTimeDiffSeconds,
                       jobShardingStrategyClass),
               jobEventConfiguration,
               new SimpleJobListener());

   }


   private LiteJobConfiguration getLiteJobConfiguration(Class<? extends SimpleJob> jobClass, String cron,
                                                        int shardingTotalCount, String shardingItemParameters, String jobParameter, boolean failover,
                                                        boolean monitorExecution, int monitorPort, int maxTimeDiffSeconds, String jobShardingStrategyClass) {

       //定義作業核心配置
       JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration
               .newBuilder(jobClass.getName(), cron, shardingTotalCount)
               .misfire(true)
               .failover(failover)
               .jobParameter(jobParameter)
               .shardingItemParameters(shardingItemParameters)
               .build();

       //定義SIMPLE類型配置
       SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName());

       //定義Lite作業根配置
       LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration)
               .jobShardingStrategyClass(jobShardingStrategyClass)
               .monitorExecution(monitorExecution)
               .monitorPort(monitorPort)
               .maxTimeDiffSeconds(maxTimeDiffSeconds)
               .overwrite(true)
               .build();

       return liteJobConfiguration;
   }

運行

這裡直接啟動後,可以看到控制台打印出各分片對應的參數(即shardingItemParameters中配置的值)

java
php
erlang
angular
vue

運維平台

elastic-job也提供運維平台,大家可以去官網下載源碼,然後在本地install一下。找到elastic-job-lite-console-3.0.0.M1-SNAPSHOT.tar 解壓後,直接運行bin目錄下面的啟動腳本

如圖所示,添加自己的配置


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM