微服務難不難,不難!無非就是一個消費方,一個生產方,一個注冊中心,然后就是實現一些微服務,其實微服務的難點在於治理,給你一堆
微服務,如何來管理?這就有很多方面了,比如容器化,服務間通信,服務上下線發布。我今天要說的是任務調度,如果我們將全部服務中那
些任務都拿出來統一管理,不在服務內使用Scheduled
或者Quartz
,是不是很爽呢,而且大神們已經幫我們實現了xxl-job,拿來摩擦一下吧。
作者原創文章,謝絕一切轉載,違者必究!
本文只發表在"公眾號"和"博客園",其他均屬復制粘貼!如果覺得排版不清晰,請查看公眾號文章。
准備:
Idea2019.03/Gradle6.0.1/Maven3.6.3/JDK11.0.4/Lombok0.28/SpringBoot2.2.4RELEASE/mybatisPlus3.3.0/Soul2.1.2/Dubbo2.7.5/Druid1.2.21/
Zookeeper3.5.5/Mysql8.0.11/Redis5.0.5/Skywalking7.0.0/XXL-JOB2.2.1
難度: 新手--戰士--老兵--大師
目標:
- 對當天注冊成功的用戶,發送產品促銷郵件。使用BEAN模式實現。
- 每5分鍾定時掃描訂單數據,發現當天30分鍾未付款的訂單,直接刪除。使用HTTP模式實現。
說明:
為了遇見各種問題,同時保持時效性,我盡量使用最新的軟件版本。源碼地址:https://github.com/xiexiaobiao/vehicle-shop-admin
1 原理
“調度中心”使用線程池對每個任務隔離運行,並按照Cron表達式將任務分配給具體的“執行器”,其中,調度中心和執行器均可實現HA負載均衡。
HA架構如下圖所示,多調度中心必須使用同一DB源,並確保機器時鍾一致:
2 步驟
2.1 下載運行
下載xxl-job源碼,略!再運行其中的sql初始化腳本。IDE打開,先啟動xxl-job-admin項目,打開http://localhost:8080/xxl-job-admin/
默認賬號密碼 admin/123456
,即可進入調度中心UI。
2.2 改造vehicle-shop-admin項目
我這里只改了customer 和 order 模塊,先以order為例: resources/config/application-dev.yml
添加的配置:
### xxl-job xxl: job: admin: #admin address list, such as "http://address" or "http://address01,http://address02" addresses: http://127.0.0.1:8080/xxl-job-admin #調度中心地址 accessToken: 6eccc15a-5fa2-4737-a606-f74d4f3cee61 #需要與調度中心配對使用 # 執行器配置信息 executor: appname: vehicle-admin-order-service address: #default use address to registry , otherwise use ip:port if address is null ip: 127.0.0.1 port: 9998 logpath: /data/applogs/xxl-job/jobhandler logretentiondays: 30 #日志保留時長
以上代碼解析:accessToken需要與調度中心配對使用,要么都用,要么都不用。addresses調度中心地址可以有多個,直接逗號隔開即可。
com.biao.shop.order.conf.XxlJobConfig
@Configuration public class XxlJobConfig { private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class); @Value("${xxl.job.admin.addresses}") private String adminAddresses; @Value("${xxl.job.accessToken}") private String accessToken; @Value("${xxl.job.executor.appname}") private String appname; @Value("${xxl.job.executor.address}") private String address; @Value("${xxl.job.executor.ip}") private String ip; @Value("${xxl.job.executor.port}") private int port; @Value("${xxl.job.executor.logpath}") private String logPath; @Value("${xxl.job.executor.logretentiondays}") private int logRetentionDays; @Bean public XxlJobSpringExecutor xxlJobExecutor() { logger.info(">>>>>>>>>>> xxl-job config init."); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setAddress(address); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); return xxlJobSpringExecutor; } }
以上代碼解析:XxlJobSpringExecutor是具體執行器類,結合配置文件和SpringBean機制,完成Bean自動注入,實現OnLine 機器地址自動注冊。
com.biao.shop.order.controller.OrderController
@RestController @RequestMapping("/order") public class OrderController { ... // 省略部分代碼 ... @SoulClient(path = "/vehicle/order/autoCancel", desc = "自動取消未支付訂單") @GetMapping("/autoCancel") public ObjectResponse<Integer> autoCancelOrder(){ return orderService.autoCancelOrder(); } }
以上代碼解析: @SoulClient為soul網關注解,即網關轉發地址,詳細可以看我之前的文章,有關Soul網關的。
com.biao.shop.order.impl.OrderServiceImpl
@Service public class OrderServiceImpl extends ServiceImpl<ShopOrderDao, ShopOrderEntity> implements OrderService { ... // 省略部分代碼 ... @Override public ObjectResponse<Integer> autoCancelOrder() { ObjectResponse<Integer> response = new ObjectResponse<>(); try{ // 查找當天30分鍾內未付款訂單 List<ShopOrderEntity> orderEntityList = shopOrderDao.selectList(new LambdaQueryWrapper<ShopOrderEntity>() .gt(ShopOrderEntity::getGenerateDate, LocalDate.now()) .lt(ShopOrderEntity::getGenerateDate,LocalDateTime.now().minusMinutes(30L))); if (!Objects.isNull(orderEntityList) && !orderEntityList.isEmpty()){ int result = shopOrderDao.deleteBatchIds(orderEntityList); response.setCode(RespStatusEnum.SUCCESS.getCode()); response.setMessage(RespStatusEnum.SUCCESS.getMessage()); response.setData(result); } return response; }catch (Exception e){ response.setCode(RespStatusEnum.FAIL.getCode()); response.setMessage(RespStatusEnum.FAIL.getMessage()); response.setData(null); return response; } } /** * 這里為了演示http模式,直接使用參數: * url: http://127.0.0.1:9195/order/vehicle/order/autoCancel * method: get * data: content */ @XxlJob("autoCancelOrderJobHandler") public ReturnT<String> autoCancelOrderJob( String param ){ // param parse if (param==null || param.trim().length()==0) { XxlJobLogger.log("param["+ param +"] invalid."); return ReturnT.FAIL; } String[] httpParams = param.split("\n"); String url = null; String method = null; String data = null; for (String httpParam: httpParams) { if (httpParam.startsWith("url:")) { url = httpParam.substring(httpParam.indexOf("url:") + 4).trim(); } if (httpParam.startsWith("method:")) { method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase(); System.out.println("method>>>>>>>>"+ method); } if (httpParam.startsWith("data:")) { data = httpParam.substring(httpParam.indexOf("data:") + 5).trim(); } } // param valid if (url==null || url.trim().length()==0) { XxlJobLogger.log("url["+ url +"] invalid."); return ReturnT.FAIL; } // 限制只支持 "GET" 和 "POST" if (method==null || !Arrays.asList("GET", "POST").contains(method)) { XxlJobLogger.log("method["+ method +"] invalid."); return ReturnT.FAIL; } // request HttpURLConnection connection = null; BufferedReader bufferedReader = null; try { // connection URL realUrl = new URL(url); connection = (HttpURLConnection) realUrl.openConnection(); // connection setting connection.setRequestMethod(method); connection.setDoOutput(true); connection.setDoInput(true); connection.setUseCaches(false); connection.setReadTimeout(5 * 1000); connection.setConnectTimeout(3 * 1000); connection.setRequestProperty("connection", "Keep-Alive"); connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8"); connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8"); // do connection connection.connect(); // data if (data!=null && data.trim().length()>0) { DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream()); dataOutputStream.write(data.getBytes("UTF-8")); dataOutputStream.flush(); dataOutputStream.close(); } // valid StatusCode int statusCode = connection.getResponseCode(); if (statusCode != 200) { throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid."); } // result bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8")); StringBuilder result = new StringBuilder(); String line; while ((line = bufferedReader.readLine()) != null) { result.append(line); } String responseMsg = result.toString(); XxlJobLogger.log(responseMsg); return ReturnT.SUCCESS; } catch (Exception e) { XxlJobLogger.log(e); return ReturnT.FAIL; } finally { try { if (bufferedReader != null) { bufferedReader.close(); } if (connection != null) { connection.disconnect(); } } catch (Exception e2) { XxlJobLogger.log(e2); } } } }
以上代碼解析: 1.autoCancelOrder方法直接實現了查找當天30分鍾內未付款訂單並批量刪除的業務需求, 2.autoCancelOrderJob方法
則是跨平台HTTP任務模式的實現,通過通用的URL連接訪問方式實現跨平台,即只需有URL地址即可,外部異構系統都可接入Xxl-Job,
系統內則實現了解耦的目的。需注意的是代碼中限制了"GET"/"POST",其他動詞我沒測試,其實這兩個也夠用了。
Customer模塊,因需使用郵件功能,我先在com.biao.shop.common.utils.MailUtil
實現了郵件發送功能,直接使用了javax.mail包,騰
訊的SMTP服務,代碼明了,非本文目標知識點,不解釋:
public class MailUtil { @Value("${spring.mail.host}") private static String host; @Value("${spring.mail.port}") private static String port; @Value("${spring.mail.sendMail}") private static String sendMail; @Value("${spring.mail.password}") private static String myEmailPassword; @Value("${spring.mail.properties.mail.smtp.auth}") private static String fallback; // false @Value("${spring.mail.properties.mail.smtp.socketFactory.class}") private static String socketFactory; @Resource private static JavaMailSender mailSender; public static boolean sendMailTo(String userName,String receiveMail) throws Exception { // JavaMailSender javaMailSender = new JavaMailSenderImpl(); Properties props = new Properties(); props.setProperty("mail.transport.protocol", "smtp"); props.setProperty("mail.smtp.host", host); props.setProperty("mail.smtp.port", port); props.setProperty("mail.smtp.auth", "true"); // 如郵箱服務器要求 SMTP 連接需要使用 SSL 安全認證,則需要使用以下配置項 /* SMTP 服務器的端口 (非 SSL 連接的端口一般默認為 25, 可以不添加, 如果開啟了 SSL 連接, 需要改為對應郵箱的 SMTP 服務器的端口, 具體可查看對應郵箱服務的幫助, QQ郵箱的SMTP(SLL)端口為465或587, 其他郵箱自行去查看)*/ /*final String smtpPort = "465"; props.setProperty("mail.smtp.port", smtpPort); props.setProperty("mail.smtp.socketFactory.class", socketFactory); props.setProperty("mail.smtp.socketFactory.fallback", fallback); props.setProperty("mail.smtp.socketFactory.port", smtpPort);*/ Session session = Session.getDefaultInstance(props); // 設置為debug模式, 可以查看詳細的發送 log session.setDebug(true); MimeMessage message = createMimeMessage(userName,session, sendMail, receiveMail); Transport transport = session.getTransport(); transport.connect(sendMail, myEmailPassword); mailSender.send(message); // Send the given array of JavaMail MIME messages in batch. // void send(MimeMessage... mimeMessages) throws MailException; transport.close(); return true; } static MimeMessage createMimeMessage(String userName,Session session, String sendMail, String receiveMail) throws Exception { // MIME郵件類型,還有一種為簡單郵件類型 MimeMessage message = new MimeMessage(session); message.setFrom(new InternetAddress(sendMail, "龍崗汽車", "UTF-8")); // 可以增加多個收件人、抄送、密送 message.setRecipient(MimeMessage.RecipientType.TO, new InternetAddress(receiveMail, userName, "UTF-8")); // 郵件主題 message.setSubject("新品信息", "UTF-8"); // 郵件正文(可以使用html標簽) message.setContent(userName + ",您好,新品到店,快來體驗", "text/html;charset=UTF-8"); // 設置發件時間 message.setSentDate(new Date()); // 保存設置 message.saveChanges(); return message; } }
然后在com.biao.shop.customer.impl.ShopClientServiceImpl
實現找出當天注冊的用戶,並發送郵件信息的業務需求:
@org.springframework.stereotype.Service @Slf4j public class ShopClientServiceImpl extends ServiceImpl<ShopClientDao, ShopClientEntity> implements ShopClientService { ... // 省略部分代碼 ... @Override @XxlJob("autoSendPromotionJobHandler") public ReturnT<String> autoSendPromotion(String param) { try{ // 找出當天注冊的用戶 List<ShopClientEntity> clientEntityList = shopClientDao.selectList(new LambdaQueryWrapper<ShopClientEntity>() .gt(ShopClientEntity::getGenerateDate,LocalDate.now()) .lt(ShopClientEntity::getGenerateDate,LocalDate.now().plusDays(1L))); // 發送郵件信息 if (!Objects.isNull(clientEntityList) && !clientEntityList.isEmpty()){ // shopClientEntity中需要設計用戶郵箱地址,我這里簡化為一個固定的郵箱地址 clientEntityList.forEach(shopClientEntity -> { try { MailUtil.sendMailTo(shopClientEntity.getClientName(),mailReceiverAddr); } catch (Exception e) { e.printStackTrace(); } }); } return ReturnT.SUCCESS; }catch (Exception e){ return ReturnT.FAIL; } } }
其他,如引入依賴之類的邊角料代碼我就直接略過了 !請直接看代碼吧!
2.3 添加執行器
AppName對應resources/config/application-dev.yml
中的 xxl.job.executor.appname
配置項,自動注冊即通過引入xxl-job-core依賴
和 XxlJobSpringExecutor自動注冊,手動錄入,即直接填寫URL地址和端口信息。
再啟動應用vehicle-shop-admin項目,建議將mysql/redis作為window服務開機自啟動,順序: MySQL—>souladmin—>soulbootstrap
—>redis—>authority—>customer—>stock—>order —>business
,等待一會,“OnLine 機器地址”就會顯示自動注冊節點,
比如下圖的customer微服務:
2.4 添加任務
WEBUI界面—>任務管理:
建立發送郵件給新用戶的任務,重要參數有:
- JobHandler對應到com.biao.shop.customer.impl.ShopClientServiceImpl 中的@XxlJob("autoSendPromotionJobHandler");
- 路由策略,即分配任務給多個執行者時的策略;阻塞處理策略,即某個執行者的一個任務還在執行,而后同類任務又到達該執行者時的處理;
- 運行模式是指任務源碼位置,幾個GLUE模式是執行器代碼直接在調度中心編輯並保存,BEAN則是類模式,需自定義JobHandler類,但不支持自動掃描任務並注入到執行器容器,需要手動注入,就像下圖這樣,點“新增”后:
建立“自動取消無效訂單”任務,注意“任務參數”,對應於com.biao.shop.order.impl.OrderServiceImpl
中 ReturnT autoCancelOrderJob( String param )的實參,
方法中對 param 進行解析,注意其中的url是soul網關的地址,因為我將整個項目都做了網關轉發。
操作-->執行一次,再到調度日志中查看,訂單服務調度日志:
客戶服務調度日志:
最后看個首頁報表圖,總結就是:有UI,就是爽!
留個問題請君思考:自動取消30分鍾未付款的訂單,你能想到多少種方案呢?
作者原創文章,謝絕一切轉載,違者必究!
全文完!
我的其他文章:
- 1 Dubbo學習系列之十八(Skywalking服務跟蹤)
- 2 Spring優雅整合Redis緩存
- 3 SOFARPC模式下的Consul注冊中心
- 4 八種控制線程順序的方法
- 5 移動應用APP購物車(店鋪系列二)
只寫原創,敬請關注