springboot 使用xxl-job (版本2.2)做任務調度處理


 

  quartz的不足

  Quartz作為開源作業調度中的佼佼者,是作業調度的首選。但是集群環境中Quartz采用API的方式對任務進行管理,從而可以避免上述問題,但是同樣存在以下問題:

  • 問題一:調用API的的方式操作任務,不人性化;
  • 問題二:需要持久化業務QuartzJobBean到底層數據表中,系統侵入性相當嚴重。
  • 問題三:調度邏輯和QuartzJobBean耦合在同一個項目中,這將導致一個問題,在調度任務數量逐漸增多,同時調度任務邏輯逐漸加重的情況下,此時調度系統的性能將大大受限於業務;
  • 問題四:quartz底層以“搶占式”獲取DB鎖並由搶占成功節點負責運行任務,會導致節點負載懸殊非常大;而XXL-JOB通過執行器實現“協同分配式”運行任務,充分發揮集群優勢,負載各節點均衡。

  XXL-JOB彌補了quartz的上述不足之處。

  xxl-job官方文檔:https://www.xuxueli.com/xxl-job/

  XXL-JOB是一個分布式任務調度平台,其核心設計目標是開發迅速、學習簡單、輕量級、易擴展。現已開放源代碼並接入多家公司線上產品線,開箱即用。

  1. 克隆完代碼后,進入doc/db目錄,將sql復制到MySQL執行一遍創建數據庫。
  2. 進入xxl-job-admin目錄,修改application.properties文件,將數據源的配置信息修改為自己剛剛創建的數據庫。
  3. 啟動xxl-job-admin項目,訪問http://localhost:8080/xxl-job-admin,用戶名:admin,密碼:123456,到此調度中心部署完畢。

  運行調度器,登錄后效果:

  • 部署執行器,克隆的代碼里面有官方的示例項目,可以直接運行,也可以集成到自己的項目中,下面集成到自己的項目,在pom.xml加入依賴,如下
   <!-- 任務調度xxl-job -->
        <dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-job-core</artifactId>
            <version>2.2.0</version>
        </dependency>
  • 創建XxlJobConfig配置類
package com.xj.demo.config;

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class XxlJobConfig {

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

    /**
     * 針對多網卡、容器內部署等情況,可借助 "spring-cloud-commons" 提供的 "InetUtils" 組件靈活定制注冊IP;
     *
     *      1、引入依賴:
     *          <dependency>
     *             <groupId>org.springframework.cloud</groupId>
     *             <artifactId>spring-cloud-commons</artifactId>
     *             <version>${version}</version>
     *         </dependency>
     *
     *      2、配置文件,或者容器啟動變量
     *          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
     *
     *      3、獲取IP
     *          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
     */

}
  • 創建XxlJobService類,重點關注@XxlJob,后面要用到
package com.xj.demo.service;

import com.xj.demo.annotation.JwtIgnore;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.ShardingUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.TimeUnit;

/**
 * XxlJob開發示例(Bean模式)
 *
 * 開發步驟:
 * 1、在Spring Bean實例中,開發Job方法,方式格式要求為 "public ReturnT<String> execute(String param)"
 * 2、為Job方法添加注解 "@XxlJob(value="自定義jobhandler名稱", init = "JobHandler初始化方法", destroy = "JobHandler銷毀方法")",注解value值對應的是調度中心新建任務的JobHandler屬性的值。
 * 3、執行日志:需要通過 "XxlJobLogger.log" 打印執行日志;
 *
 * @author xuxueli 2019-12-11 21:52:51
 */
@Slf4j
@Component

public class XxlJobService {

    /**
     * 1、簡單任務示例(Bean模式)
     */
    @XxlJob("demoJobHandler")
    @JwtIgnore
    public ReturnT<String> demoJobHandler(String param) throws Exception {
        XxlJobLogger.log("XXL-JOB, Hello World.");
        log.info("執行器執行成功");
        for (int i = 0; i < 5; i++) {
            XxlJobLogger.log("beat at:" + i);
            TimeUnit.SECONDS.sleep(2);
        }
        return ReturnT.SUCCESS;
    }

    /**
     * 2、分片廣播任務
     */
    @XxlJob("shardingJobHandler")
    public ReturnT<String> shardingJobHandler(String param) throws Exception {

        // 分片參數
        ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
        XxlJobLogger.log("分片參數:當前分片序號 = {}, 總分片數 = {}", shardingVO.getIndex(), shardingVO.getTotal());

        // 業務邏輯
        for (int i = 0; i < shardingVO.getTotal(); i++) {
            if (i == shardingVO.getIndex()) {
                XxlJobLogger.log("第 {} 片, 命中分片開始處理", i);
            } else {
                XxlJobLogger.log("第 {} 片, 忽略", i);
            }
        }

        return ReturnT.SUCCESS;
    }

    /**
     * 3、命令行任務
     */
    @XxlJob("commandJobHandler")
    public ReturnT<String> commandJobHandler(String param) throws Exception {
        String command = param;
        int exitValue = -1;

        BufferedReader bufferedReader = null;
        try {
            // command process
            Process process = Runtime.getRuntime().exec(command);
            BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
            bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));

            // command log
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                XxlJobLogger.log(line);
            }

            // command exit
            process.waitFor();
            exitValue = process.exitValue();
        } catch (Exception e) {
            XxlJobLogger.log(e);
        } finally {
            if (bufferedReader != null) {
                bufferedReader.close();
            }
        }

        if (exitValue == 0) {
            return IJobHandler.SUCCESS;
        } else {
            return new ReturnT<String>(IJobHandler.FAIL.getCode(), "command exit value("+exitValue+") is failed");
        }
    }

    /**
     * 4、跨平台Http任務
     */
    @XxlJob("httpJobHandler")
    public ReturnT<String> httpJobHandler(String param) throws Exception {

        // request
        HttpURLConnection connection = null;
        BufferedReader bufferedReader = null;
        try {
            // connection
            URL realUrl = new URL(param);
            connection = (HttpURLConnection) realUrl.openConnection();

            // connection setting
            connection.setRequestMethod("GET");
            connection.setDoOutput(true);
            connection.setDoInput(true);
            connection.setUseCaches(false);
            connection.setReadTimeout(5 * 1000);
            connection.setConnectTimeout(3 * 1000);
            connection.setRequestProperty("connection", "Keep-Alive");
            connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
            connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");

            // do connection
            connection.connect();

            //Map<String, List<String>> map = connection.getHeaderFields();

            // valid StatusCode
            int statusCode = connection.getResponseCode();
            if (statusCode != 200) {
                throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid.");
            }

            // result
            bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
            StringBuilder result = new StringBuilder();
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                result.append(line);
            }
            String responseMsg = result.toString();

            XxlJobLogger.log(responseMsg);
            return ReturnT.SUCCESS;
        } catch (Exception e) {
            XxlJobLogger.log(e);
            return ReturnT.FAIL;
        } finally {
            try {
                if (bufferedReader != null) {
                    bufferedReader.close();
                }
                if (connection != null) {
                    connection.disconnect();
                }
            } catch (Exception e2) {
                XxlJobLogger.log(e2);
            }
        }

    }

    /**
     * 5、生命周期任務示例:任務初始化與銷毀時,支持自定義相關邏輯;
     */
    @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
    public ReturnT<String> demoJobHandler2(String param) throws Exception {
        XxlJobLogger.log("XXL-JOB, Hello World.");
        return ReturnT.SUCCESS;
    }

    public void init(){
        log.info("init");
    }

    public void destroy(){
        log.info("destory");
    }

}
  • 在配置文件中,添加如下配置
spring:
  application:
    name: xxljob-demo-server
server:
  port: 8090


# xxl-job配置
xxl:
  job:
    admin:
      # 調度中心部署跟地址 [選填]:如調度中心集群部署存在多個地址則用逗號分隔。執行器將會使用該地址進行"執行器心跳注冊"和"任務結果回調";為空則關閉自動注冊;
      addresses: http://localhost:8080/xxl-job-admin
    executor:
      # 執行器注冊 [選填]:優先使用該配置作為注冊地址,為空時使用內嵌服務 ”IP:PORT“ 作為注冊地址。從而更靈活的支持容器類型執行器動態IP和動態映射端口問題。
      address:
      # 執行器AppName [選填]:執行器心跳注冊分組依據;為空則關閉自動注冊
      appname: demo-app
      # 執行器IP [選填]:默認為空表示自動獲取IP,多網卡時可手動設置指定IP,該IP不會綁定Host僅作為通訊實用;地址信息用於 "執行器注冊" 和 "調度中心請求並觸發任務";
      ip:
      # 執行器端口號 [選填]:小於等於0則自動獲取;默認端口為9999,單機部署多個執行器時,注意要配置不同執行器端口;
      port: 9999
      # 執行器運行日志文件存儲磁盤路徑 [選填] :需要對該路徑擁有讀寫權限;為空則使用默認路徑;
      logpath: /Users/luoyu/Documents/log/xxl-job/jobhandler
      # 執行器日志文件保存天數 [選填] : 過期日志自動清理, 限制值大於等於3時生效; 否則, 如-1, 關閉自動清理功能;
      logretentiondays: 15
    # 執行器通訊TOKEN [選填]:非空時啟用;
    accessToken: 

  注意:

  1. 重點關注appname,后面要用到
  2. 需要調度中心跟執行器進行認證的話,把兩者的accessToken配置一樣的即可
  • 配置好的完整目錄結構如下:

  • 使用流程如下:
  1. 啟動調度中心項目,創建執行器,如圖

  •  在任務管理,選擇上圖創建的執行器

  創建成功后,在操作那單擊啟動,執行結果

  xxl-job 架構圖



 轉載: https://www.jianshu.com/p/20ac45a76fde


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM