微服务难不难,不难!无非就是一个消费方,一个生产方,一个注册中心,然后就是实现一些微服务,其实微服务的难点在于治理,给你一堆
微服务,如何来管理?这就有很多方面了,比如容器化,服务间通信,服务上下线发布。我今天要说的是任务调度,如果我们将全部服务中那
些任务都拿出来统一管理,不在服务内使用Scheduled
或者Quartz
,是不是很爽呢,而且大神们已经帮我们实现了xxl-job,拿来摩擦一下吧。
作者原创文章,谢绝一切转载,违者必究!
本文只发表在"公众号"和"博客园",其他均属复制粘贴!如果觉得排版不清晰,请查看公众号文章。
准备:
Idea2019.03/Gradle6.0.1/Maven3.6.3/JDK11.0.4/Lombok0.28/SpringBoot2.2.4RELEASE/mybatisPlus3.3.0/Soul2.1.2/Dubbo2.7.5/Druid1.2.21/
Zookeeper3.5.5/Mysql8.0.11/Redis5.0.5/Skywalking7.0.0/XXL-JOB2.2.1
难度: 新手--战士--老兵--大师
目标:
- 对当天注册成功的用户,发送产品促销邮件。使用BEAN模式实现。
- 每5分钟定时扫描订单数据,发现当天30分钟未付款的订单,直接删除。使用HTTP模式实现。
说明:
为了遇见各种问题,同时保持时效性,我尽量使用最新的软件版本。源码地址:https://github.com/xiexiaobiao/vehicle-shop-admin
1 原理
“调度中心”使用线程池对每个任务隔离运行,并按照Cron表达式将任务分配给具体的“执行器”,其中,调度中心和执行器均可实现HA负载均衡。
HA架构如下图所示,多调度中心必须使用同一DB源,并确保机器时钟一致:
2 步骤
2.1 下载运行
下载xxl-job源码,略!再运行其中的sql初始化脚本。IDE打开,先启动xxl-job-admin项目,打开http://localhost:8080/xxl-job-admin/
默认账号密码 admin/123456
,即可进入调度中心UI。
2.2 改造vehicle-shop-admin项目
我这里只改了customer 和 order 模块,先以order为例: resources/config/application-dev.yml
添加的配置:
### xxl-job xxl: job: admin: #admin address list, such as "http://address" or "http://address01,http://address02" addresses: http://127.0.0.1:8080/xxl-job-admin #调度中心地址 accessToken: 6eccc15a-5fa2-4737-a606-f74d4f3cee61 #需要与调度中心配对使用 # 执行器配置信息 executor: appname: vehicle-admin-order-service address: #default use address to registry , otherwise use ip:port if address is null ip: 127.0.0.1 port: 9998 logpath: /data/applogs/xxl-job/jobhandler logretentiondays: 30 #日志保留时长
以上代码解析:accessToken需要与调度中心配对使用,要么都用,要么都不用。addresses调度中心地址可以有多个,直接逗号隔开即可。
com.biao.shop.order.conf.XxlJobConfig
@Configuration public class XxlJobConfig { private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class); @Value("${xxl.job.admin.addresses}") private String adminAddresses; @Value("${xxl.job.accessToken}") private String accessToken; @Value("${xxl.job.executor.appname}") private String appname; @Value("${xxl.job.executor.address}") private String address; @Value("${xxl.job.executor.ip}") private String ip; @Value("${xxl.job.executor.port}") private int port; @Value("${xxl.job.executor.logpath}") private String logPath; @Value("${xxl.job.executor.logretentiondays}") private int logRetentionDays; @Bean public XxlJobSpringExecutor xxlJobExecutor() { logger.info(">>>>>>>>>>> xxl-job config init."); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setAddress(address); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); return xxlJobSpringExecutor; } }
以上代码解析:XxlJobSpringExecutor是具体执行器类,结合配置文件和SpringBean机制,完成Bean自动注入,实现OnLine 机器地址自动注册。
com.biao.shop.order.controller.OrderController
@RestController @RequestMapping("/order") public class OrderController { ... // 省略部分代码 ... @SoulClient(path = "/vehicle/order/autoCancel", desc = "自动取消未支付订单") @GetMapping("/autoCancel") public ObjectResponse<Integer> autoCancelOrder(){ return orderService.autoCancelOrder(); } }
以上代码解析: @SoulClient为soul网关注解,即网关转发地址,详细可以看我之前的文章,有关Soul网关的。
com.biao.shop.order.impl.OrderServiceImpl
@Service public class OrderServiceImpl extends ServiceImpl<ShopOrderDao, ShopOrderEntity> implements OrderService { ... // 省略部分代码 ... @Override public ObjectResponse<Integer> autoCancelOrder() { ObjectResponse<Integer> response = new ObjectResponse<>(); try{ // 查找当天30分钟内未付款订单 List<ShopOrderEntity> orderEntityList = shopOrderDao.selectList(new LambdaQueryWrapper<ShopOrderEntity>() .gt(ShopOrderEntity::getGenerateDate, LocalDate.now()) .lt(ShopOrderEntity::getGenerateDate,LocalDateTime.now().minusMinutes(30L))); if (!Objects.isNull(orderEntityList) && !orderEntityList.isEmpty()){ int result = shopOrderDao.deleteBatchIds(orderEntityList); response.setCode(RespStatusEnum.SUCCESS.getCode()); response.setMessage(RespStatusEnum.SUCCESS.getMessage()); response.setData(result); } return response; }catch (Exception e){ response.setCode(RespStatusEnum.FAIL.getCode()); response.setMessage(RespStatusEnum.FAIL.getMessage()); response.setData(null); return response; } } /** * 这里为了演示http模式,直接使用参数: * url: http://127.0.0.1:9195/order/vehicle/order/autoCancel * method: get * data: content */ @XxlJob("autoCancelOrderJobHandler") public ReturnT<String> autoCancelOrderJob( String param ){ // param parse if (param==null || param.trim().length()==0) { XxlJobLogger.log("param["+ param +"] invalid."); return ReturnT.FAIL; } String[] httpParams = param.split("\n"); String url = null; String method = null; String data = null; for (String httpParam: httpParams) { if (httpParam.startsWith("url:")) { url = httpParam.substring(httpParam.indexOf("url:") + 4).trim(); } if (httpParam.startsWith("method:")) { method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase(); System.out.println("method>>>>>>>>"+ method); } if (httpParam.startsWith("data:")) { data = httpParam.substring(httpParam.indexOf("data:") + 5).trim(); } } // param valid if (url==null || url.trim().length()==0) { XxlJobLogger.log("url["+ url +"] invalid."); return ReturnT.FAIL; } // 限制只支持 "GET" 和 "POST" if (method==null || !Arrays.asList("GET", "POST").contains(method)) { XxlJobLogger.log("method["+ method +"] invalid."); return ReturnT.FAIL; } // request HttpURLConnection connection = null; BufferedReader bufferedReader = null; try { // connection URL realUrl = new URL(url); connection = (HttpURLConnection) realUrl.openConnection(); // connection setting connection.setRequestMethod(method); connection.setDoOutput(true); connection.setDoInput(true); connection.setUseCaches(false); connection.setReadTimeout(5 * 1000); connection.setConnectTimeout(3 * 1000); connection.setRequestProperty("connection", "Keep-Alive"); connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8"); connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8"); // do connection connection.connect(); // data if (data!=null && data.trim().length()>0) { DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream()); dataOutputStream.write(data.getBytes("UTF-8")); dataOutputStream.flush(); dataOutputStream.close(); } // valid StatusCode int statusCode = connection.getResponseCode(); if (statusCode != 200) { throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid."); } // result bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8")); StringBuilder result = new StringBuilder(); String line; while ((line = bufferedReader.readLine()) != null) { result.append(line); } String responseMsg = result.toString(); XxlJobLogger.log(responseMsg); return ReturnT.SUCCESS; } catch (Exception e) { XxlJobLogger.log(e); return ReturnT.FAIL; } finally { try { if (bufferedReader != null) { bufferedReader.close(); } if (connection != null) { connection.disconnect(); } } catch (Exception e2) { XxlJobLogger.log(e2); } } } }
以上代码解析: 1.autoCancelOrder方法直接实现了查找当天30分钟内未付款订单并批量删除的业务需求, 2.autoCancelOrderJob方法
则是跨平台HTTP任务模式的实现,通过通用的URL连接访问方式实现跨平台,即只需有URL地址即可,外部异构系统都可接入Xxl-Job,
系统内则实现了解耦的目的。需注意的是代码中限制了"GET"/"POST",其他动词我没测试,其实这两个也够用了。
Customer模块,因需使用邮件功能,我先在com.biao.shop.common.utils.MailUtil
实现了邮件发送功能,直接使用了javax.mail包,腾
讯的SMTP服务,代码明了,非本文目标知识点,不解释:
public class MailUtil { @Value("${spring.mail.host}") private static String host; @Value("${spring.mail.port}") private static String port; @Value("${spring.mail.sendMail}") private static String sendMail; @Value("${spring.mail.password}") private static String myEmailPassword; @Value("${spring.mail.properties.mail.smtp.auth}") private static String fallback; // false @Value("${spring.mail.properties.mail.smtp.socketFactory.class}") private static String socketFactory; @Resource private static JavaMailSender mailSender; public static boolean sendMailTo(String userName,String receiveMail) throws Exception { // JavaMailSender javaMailSender = new JavaMailSenderImpl(); Properties props = new Properties(); props.setProperty("mail.transport.protocol", "smtp"); props.setProperty("mail.smtp.host", host); props.setProperty("mail.smtp.port", port); props.setProperty("mail.smtp.auth", "true"); // 如邮箱服务器要求 SMTP 连接需要使用 SSL 安全认证,则需要使用以下配置项 /* SMTP 服务器的端口 (非 SSL 连接的端口一般默认为 25, 可以不添加, 如果开启了 SSL 连接, 需要改为对应邮箱的 SMTP 服务器的端口, 具体可查看对应邮箱服务的帮助, QQ邮箱的SMTP(SLL)端口为465或587, 其他邮箱自行去查看)*/ /*final String smtpPort = "465"; props.setProperty("mail.smtp.port", smtpPort); props.setProperty("mail.smtp.socketFactory.class", socketFactory); props.setProperty("mail.smtp.socketFactory.fallback", fallback); props.setProperty("mail.smtp.socketFactory.port", smtpPort);*/ Session session = Session.getDefaultInstance(props); // 设置为debug模式, 可以查看详细的发送 log session.setDebug(true); MimeMessage message = createMimeMessage(userName,session, sendMail, receiveMail); Transport transport = session.getTransport(); transport.connect(sendMail, myEmailPassword); mailSender.send(message); // Send the given array of JavaMail MIME messages in batch. // void send(MimeMessage... mimeMessages) throws MailException; transport.close(); return true; } static MimeMessage createMimeMessage(String userName,Session session, String sendMail, String receiveMail) throws Exception { // MIME邮件类型,还有一种为简单邮件类型 MimeMessage message = new MimeMessage(session); message.setFrom(new InternetAddress(sendMail, "龙岗汽车", "UTF-8")); // 可以增加多个收件人、抄送、密送 message.setRecipient(MimeMessage.RecipientType.TO, new InternetAddress(receiveMail, userName, "UTF-8")); // 邮件主题 message.setSubject("新品信息", "UTF-8"); // 邮件正文(可以使用html标签) message.setContent(userName + ",您好,新品到店,快来体验", "text/html;charset=UTF-8"); // 设置发件时间 message.setSentDate(new Date()); // 保存设置 message.saveChanges(); return message; } }
然后在com.biao.shop.customer.impl.ShopClientServiceImpl
实现找出当天注册的用户,并发送邮件信息的业务需求:
@org.springframework.stereotype.Service @Slf4j public class ShopClientServiceImpl extends ServiceImpl<ShopClientDao, ShopClientEntity> implements ShopClientService { ... // 省略部分代码 ... @Override @XxlJob("autoSendPromotionJobHandler") public ReturnT<String> autoSendPromotion(String param) { try{ // 找出当天注册的用户 List<ShopClientEntity> clientEntityList = shopClientDao.selectList(new LambdaQueryWrapper<ShopClientEntity>() .gt(ShopClientEntity::getGenerateDate,LocalDate.now()) .lt(ShopClientEntity::getGenerateDate,LocalDate.now().plusDays(1L))); // 发送邮件信息 if (!Objects.isNull(clientEntityList) && !clientEntityList.isEmpty()){ // shopClientEntity中需要设计用户邮箱地址,我这里简化为一个固定的邮箱地址 clientEntityList.forEach(shopClientEntity -> { try { MailUtil.sendMailTo(shopClientEntity.getClientName(),mailReceiverAddr); } catch (Exception e) { e.printStackTrace(); } }); } return ReturnT.SUCCESS; }catch (Exception e){ return ReturnT.FAIL; } } }
其他,如引入依赖之类的边角料代码我就直接略过了 !请直接看代码吧!
2.3 添加执行器
AppName对应resources/config/application-dev.yml
中的 xxl.job.executor.appname
配置项,自动注册即通过引入xxl-job-core依赖
和 XxlJobSpringExecutor自动注册,手动录入,即直接填写URL地址和端口信息。
再启动应用vehicle-shop-admin项目,建议将mysql/redis作为window服务开机自启动,顺序: MySQL—>souladmin—>soulbootstrap
—>redis—>authority—>customer—>stock—>order —>business
,等待一会,“OnLine 机器地址”就会显示自动注册节点,
比如下图的customer微服务:
2.4 添加任务
WEBUI界面—>任务管理:
建立发送邮件给新用户的任务,重要参数有:
- JobHandler对应到com.biao.shop.customer.impl.ShopClientServiceImpl 中的@XxlJob("autoSendPromotionJobHandler");
- 路由策略,即分配任务给多个执行者时的策略;阻塞处理策略,即某个执行者的一个任务还在执行,而后同类任务又到达该执行者时的处理;
- 运行模式是指任务源码位置,几个GLUE模式是执行器代码直接在调度中心编辑并保存,BEAN则是类模式,需自定义JobHandler类,但不支持自动扫描任务并注入到执行器容器,需要手动注入,就像下图这样,点“新增”后:
建立“自动取消无效订单”任务,注意“任务参数”,对应于com.biao.shop.order.impl.OrderServiceImpl
中 ReturnT autoCancelOrderJob( String param )的实参,
方法中对 param 进行解析,注意其中的url是soul网关的地址,因为我将整个项目都做了网关转发。
操作-->执行一次,再到调度日志中查看,订单服务调度日志:
客户服务调度日志:
最后看个首页报表图,总结就是:有UI,就是爽!
留个问题请君思考:自动取消30分钟未付款的订单,你能想到多少种方案呢?
作者原创文章,谢绝一切转载,违者必究!
全文完!
我的其他文章:
- 1 Dubbo学习系列之十八(Skywalking服务跟踪)
- 2 Spring优雅整合Redis缓存
- 3 SOFARPC模式下的Consul注册中心
- 4 八种控制线程顺序的方法
- 5 移动应用APP购物车(店铺系列二)
只写原创,敬请关注