分布式任務調度系統 xxl-job


 

微服務難不難,不難!無非就是一個消費方,一個生產方,一個注冊中心,然后就是實現一些微服務,其實微服務的難點在於治理,給你一堆

微服務,如何來管理?這就有很多方面了,比如容器化,服務間通信,服務上下線發布。我今天要說的是任務調度,如果我們將全部服務中那

些任務都拿出來統一管理,不在服務內使用Scheduled或者Quartz,是不是很爽呢,而且大神們已經幫我們實現了xxl-job,拿來摩擦一下吧。

作者原創文章,謝絕一切轉載,違者必究!

本文只發表在"公眾號"和"博客園",其他均屬復制粘貼!如果覺得排版不清晰,請查看公眾號文章。 

准備:

Idea2019.03/Gradle6.0.1/Maven3.6.3/JDK11.0.4/Lombok0.28/SpringBoot2.2.4RELEASE/mybatisPlus3.3.0/Soul2.1.2/Dubbo2.7.5/Druid1.2.21/

Zookeeper3.5.5/Mysql8.0.11/Redis5.0.5/Skywalking7.0.0/XXL-JOB2.2.1

難度: 新手--戰士--老兵--大師

目標:

  1. 對當天注冊成功的用戶,發送產品促銷郵件。使用BEAN模式實現。
  2. 每5分鍾定時掃描訂單數據,發現當天30分鍾未付款的訂單,直接刪除。使用HTTP模式實現。

說明:

為了遇見各種問題,同時保持時效性,我盡量使用最新的軟件版本。源碼地址:https://github.com/xiexiaobiao/vehicle-shop-admin

1 原理

“調度中心”使用線程池對每個任務隔離運行,並按照Cron表達式將任務分配給具體的“執行器”,其中,調度中心和執行器均可實現HA負載均衡。

HA架構如下圖所示,多調度中心必須使用同一DB源,並確保機器時鍾一致

2 步驟

2.1 下載運行

下載xxl-job源碼,略!再運行其中的sql初始化腳本。IDE打開,先啟動xxl-job-admin項目,打開http://localhost:8080/xxl-job-admin/

默認賬號密碼 admin/123456,即可進入調度中心UI。

2.2 改造vehicle-shop-admin項目

我這里只改了customer 和 order 模塊,先以order為例: resources/config/application-dev.yml 添加的配置:

### xxl-job
xxl:
  job:
    admin:
      #admin address list, such as "http://address" or "http://address01,http://address02"
      addresses: http://127.0.0.1:8080/xxl-job-admin  #調度中心地址
    accessToken: 6eccc15a-5fa2-4737-a606-f74d4f3cee61     #需要與調度中心配對使用
    # 執行器配置信息
    executor:
      appname: vehicle-admin-order-service
      address: #default use address to registry , otherwise use ip:port if address is null
      ip: 127.0.0.1
      port: 9998
      logpath: /data/applogs/xxl-job/jobhandler
      logretentiondays: 30 #日志保留時長

以上代碼解析:accessToken需要與調度中心配對使用,要么都用,要么都不用。addresses調度中心地址可以有多個,直接逗號隔開即可。

 

com.biao.shop.order.conf.XxlJobConfig

@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;
    @Value("${xxl.job.accessToken}")
    private String accessToken;
    @Value("${xxl.job.executor.appname}")
    private String appname;
    @Value("${xxl.job.executor.address}")
    private String address;
    @Value("${xxl.job.executor.ip}")
    private String ip;
    @Value("${xxl.job.executor.port}")
    private int port;
    @Value("${xxl.job.executor.logpath}")
    private String logPath;
    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }
}

以上代碼解析:XxlJobSpringExecutor是具體執行器類,結合配置文件和SpringBean機制,完成Bean自動注入,實現OnLine 機器地址自動注冊。

 

com.biao.shop.order.controller.OrderController

@RestController
@RequestMapping("/order")
public class OrderController { 
    ...
    //  省略部分代碼
    ...
    @SoulClient(path = "/vehicle/order/autoCancel", desc = "自動取消未支付訂單")
    @GetMapping("/autoCancel")
    public ObjectResponse<Integer> autoCancelOrder(){
        return orderService.autoCancelOrder();
    }
}

以上代碼解析: @SoulClient為soul網關注解,即網關轉發地址,詳細可以看我之前的文章,有關Soul網關的。

 

com.biao.shop.order.impl.OrderServiceImpl

@Service
public class OrderServiceImpl extends ServiceImpl<ShopOrderDao, ShopOrderEntity>  implements OrderService {

     ...
    //  省略部分代碼
    ...

    @Override
    public ObjectResponse<Integer> autoCancelOrder() {
        ObjectResponse<Integer> response = new ObjectResponse<>();
        try{
            // 查找當天30分鍾內未付款訂單
            List<ShopOrderEntity> orderEntityList = shopOrderDao.selectList(new LambdaQueryWrapper<ShopOrderEntity>()
                    .gt(ShopOrderEntity::getGenerateDate, LocalDate.now())
                    .lt(ShopOrderEntity::getGenerateDate,LocalDateTime.now().minusMinutes(30L)));
            if (!Objects.isNull(orderEntityList) && !orderEntityList.isEmpty()){
                int result = shopOrderDao.deleteBatchIds(orderEntityList);
                response.setCode(RespStatusEnum.SUCCESS.getCode());
                response.setMessage(RespStatusEnum.SUCCESS.getMessage());
                response.setData(result);
            }
            return response;
        }catch (Exception e){
            response.setCode(RespStatusEnum.FAIL.getCode());
            response.setMessage(RespStatusEnum.FAIL.getMessage());
            response.setData(null);
            return response;
        }
    }

    /** 
     * 這里為了演示http模式,直接使用參數:
     *      url: http://127.0.0.1:9195/order/vehicle/order/autoCancel
     *      method: get
     *      data: content
     */
    @XxlJob("autoCancelOrderJobHandler")
    public ReturnT<String> autoCancelOrderJob( String param ){
        // param parse
        if (param==null || param.trim().length()==0) {
            XxlJobLogger.log("param["+ param +"] invalid.");
            return ReturnT.FAIL;
        }
        String[] httpParams = param.split("\n");
        String url = null;
        String method = null;
        String data = null;
        for (String httpParam: httpParams) {
            if (httpParam.startsWith("url:")) {
                url = httpParam.substring(httpParam.indexOf("url:") + 4).trim();
            }
            if (httpParam.startsWith("method:")) {
                method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase();
                System.out.println("method>>>>>>>>"+ method);
            }
            if (httpParam.startsWith("data:")) {
                data = httpParam.substring(httpParam.indexOf("data:") + 5).trim();
            }
        }

        // param valid
        if (url==null || url.trim().length()==0) {
            XxlJobLogger.log("url["+ url +"] invalid.");
            return ReturnT.FAIL;
        }
        // 限制只支持 "GET" 和 "POST"
        if (method==null || !Arrays.asList("GET", "POST").contains(method)) {
            XxlJobLogger.log("method["+ method +"] invalid.");
            return ReturnT.FAIL;
        }

        // request
        HttpURLConnection connection = null;
        BufferedReader bufferedReader = null;
        try {
            // connection
            URL realUrl = new URL(url);
            connection = (HttpURLConnection) realUrl.openConnection();

            // connection setting
            connection.setRequestMethod(method);
            connection.setDoOutput(true);
            connection.setDoInput(true);
            connection.setUseCaches(false);
            connection.setReadTimeout(5 * 1000);
            connection.setConnectTimeout(3 * 1000);
            connection.setRequestProperty("connection", "Keep-Alive");
            connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
            connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");

            // do connection
            connection.connect();

            // data
            if (data!=null && data.trim().length()>0) {
                DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
                dataOutputStream.write(data.getBytes("UTF-8"));
                dataOutputStream.flush();
                dataOutputStream.close();
            }

            // valid StatusCode
            int statusCode = connection.getResponseCode();
            if (statusCode != 200) {
                throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid.");
            }

            // result
            bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
            StringBuilder result = new StringBuilder();
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                result.append(line);
            }
            String responseMsg = result.toString();

            XxlJobLogger.log(responseMsg);
            return ReturnT.SUCCESS;
        } catch (Exception e) {
            XxlJobLogger.log(e);
            return ReturnT.FAIL;
        } finally {
            try {
                if (bufferedReader != null) {
                    bufferedReader.close();
                }
                if (connection != null) {
                    connection.disconnect();
                }
            } catch (Exception e2) {
                XxlJobLogger.log(e2);
            }
        }
    }
}

以上代碼解析: 1.autoCancelOrder方法直接實現了查找當天30分鍾內未付款訂單並批量刪除的業務需求, 2.autoCancelOrderJob方法

則是跨平台HTTP任務模式的實現,通過通用的URL連接訪問方式實現跨平台,即只需有URL地址即可,外部異構系統都可接入Xxl-Job,

系統內則實現了解耦的目的。需注意的是代碼中限制了"GET"/"POST",其他動詞我沒測試,其實這兩個也夠用了。

 

Customer模塊,因需使用郵件功能,我先在com.biao.shop.common.utils.MailUtil實現了郵件發送功能,直接使用了javax.mail包,騰

訊的SMTP服務,代碼明了,非本文目標知識點,不解釋:

public class MailUtil {

    @Value("${spring.mail.host}")
    private static String host;
    @Value("${spring.mail.port}")
    private static String port;
    @Value("${spring.mail.sendMail}")
    private static String sendMail;
    @Value("${spring.mail.password}")
    private static String myEmailPassword;
    @Value("${spring.mail.properties.mail.smtp.auth}")
    private static String fallback; // false
    @Value("${spring.mail.properties.mail.smtp.socketFactory.class}")
    private static String socketFactory;

    @Resource
    private static JavaMailSender mailSender;
    
    public static boolean sendMailTo(String userName,String receiveMail) throws Exception {
        // JavaMailSender javaMailSender = new JavaMailSenderImpl();
        Properties props = new Properties();
        props.setProperty("mail.transport.protocol", "smtp");
        props.setProperty("mail.smtp.host", host);
        props.setProperty("mail.smtp.port", port);
        props.setProperty("mail.smtp.auth", "true");
        // 如郵箱服務器要求 SMTP 連接需要使用 SSL 安全認證,則需要使用以下配置項
       /*  SMTP 服務器的端口 (非 SSL 連接的端口一般默認為 25, 可以不添加, 如果開啟了 SSL 連接,
                          需要改為對應郵箱的 SMTP 服務器的端口, 具體可查看對應郵箱服務的幫助,
                          QQ郵箱的SMTP(SLL)端口為465或587, 其他郵箱自行去查看)*/
        /*final String smtpPort = "465";
        props.setProperty("mail.smtp.port", smtpPort);
        props.setProperty("mail.smtp.socketFactory.class", socketFactory);
        props.setProperty("mail.smtp.socketFactory.fallback", fallback);
        props.setProperty("mail.smtp.socketFactory.port", smtpPort);*/

        Session session = Session.getDefaultInstance(props);
        // 設置為debug模式, 可以查看詳細的發送 log
        session.setDebug(true);
        MimeMessage message = createMimeMessage(userName,session, sendMail, receiveMail);
        Transport transport = session.getTransport();
        transport.connect(sendMail, myEmailPassword);
        mailSender.send(message);
        // Send the given array of JavaMail MIME messages in batch.
        // void send(MimeMessage... mimeMessages) throws MailException;
        transport.close();
        return true;
    }

    static MimeMessage createMimeMessage(String userName,Session session, String sendMail, String receiveMail) throws Exception {
        // MIME郵件類型,還有一種為簡單郵件類型
        MimeMessage message = new MimeMessage(session);
        message.setFrom(new InternetAddress(sendMail, "龍崗汽車", "UTF-8"));
        // 可以增加多個收件人、抄送、密送
        message.setRecipient(MimeMessage.RecipientType.TO, new InternetAddress(receiveMail, userName, "UTF-8"));
        // 郵件主題
        message.setSubject("新品信息", "UTF-8");
        // 郵件正文(可以使用html標簽)
        message.setContent(userName + ",您好,新品到店,快來體驗", "text/html;charset=UTF-8");
        // 設置發件時間
        message.setSentDate(new Date());
        // 保存設置
        message.saveChanges();
        return message;
    }
}

 

然后在com.biao.shop.customer.impl.ShopClientServiceImpl 實現找出當天注冊的用戶,並發送郵件信息的業務需求:

@org.springframework.stereotype.Service
@Slf4j
public class ShopClientServiceImpl extends ServiceImpl<ShopClientDao, ShopClientEntity> implements ShopClientService {
     ...
    //  省略部分代碼
    ...
    @Override
    @XxlJob("autoSendPromotionJobHandler")
    public ReturnT<String> autoSendPromotion(String param) {
        try{
            // 找出當天注冊的用戶
            List<ShopClientEntity> clientEntityList =
                    shopClientDao.selectList(new LambdaQueryWrapper<ShopClientEntity>()
                            .gt(ShopClientEntity::getGenerateDate,LocalDate.now())
                            .lt(ShopClientEntity::getGenerateDate,LocalDate.now().plusDays(1L)));
            // 發送郵件信息
            if (!Objects.isNull(clientEntityList) && !clientEntityList.isEmpty()){
                // shopClientEntity中需要設計用戶郵箱地址,我這里簡化為一個固定的郵箱地址
                clientEntityList.forEach(shopClientEntity -> {
                    try {
                        MailUtil.sendMailTo(shopClientEntity.getClientName(),mailReceiverAddr);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
            return ReturnT.SUCCESS;
        }catch (Exception e){
            return ReturnT.FAIL;
        }
    }
}
 

其他,如引入依賴之類的邊角料代碼我就直接略過了 !請直接看代碼吧!

2.3 添加執行器

AppName對應resources/config/application-dev.yml中的 xxl.job.executor.appname 配置項,自動注冊即通過引入xxl-job-core依賴

和 XxlJobSpringExecutor自動注冊,手動錄入,即直接填寫URL地址和端口信息。

再啟動應用vehicle-shop-admin項目,建議將mysql/redis作為window服務開機自啟動,順序: MySQL—>souladmin—>soulbootstrap

—>redis—>authority—>customer—>stock—>order —>business,等待一會,“OnLine 機器地址”就會顯示自動注冊節點,

比如下圖的customer微服務:

2.4 添加任務

WEBUI界面—>任務管理:

建立發送郵件給新用戶的任務,重要參數有:

  • JobHandler對應到com.biao.shop.customer.impl.ShopClientServiceImpl 中的@XxlJob("autoSendPromotionJobHandler");
  • 路由策略,即分配任務給多個執行者時的策略;阻塞處理策略,即某個執行者的一個任務還在執行,而后同類任務又到達該執行者時的處理;
  • 運行模式是指任務源碼位置,幾個GLUE模式是執行器代碼直接在調度中心編輯並保存,BEAN則是類模式,需自定義JobHandler類,但不支持自動掃描任務並注入到執行器容器,需要手動注入,就像下圖這樣,點“新增”后:

 

建立“自動取消無效訂單”任務,注意“任務參數”,對應於com.biao.shop.order.impl.OrderServiceImpl 中 ReturnT autoCancelOrderJob( String param )的實參,

方法中對 param 進行解析,注意其中的url是soul網關的地址,因為我將整個項目都做了網關轉發。

 

操作-->執行一次,再到調度日志中查看,訂單服務調度日志:

 

客戶服務調度日志:

 

最后看個首頁報表圖,總結就是:有UI,就是爽!

 

留個問題請君思考:自動取消30分鍾未付款的訂單,你能想到多少種方案呢?

作者原創文章,謝絕一切轉載,違者必究!

全文完!


我的其他文章:

只寫原創,敬請關注 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM