一、简介
1、添加jar
<dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> <version>2.1.2</version> </dependency>
2、实例化执行器
@Slf4j //控制是否实例化 @ConditionalOnProperty(value = "xxljob.enabled", matchIfMissing = true) @Configuration public class XxlJobConfig { //服务端地址 @Value("${xxl.job.admin.addresses}") private String adminAddresses; //执行器名称 @Value("${xxl.job.executor.appName}") private String appName; //客户端端ip ${spring.cloud.client.ip-address} @Value("${xxl.job.executor.ip}") private String ip; //端口 @Value("${xxl.job.executor.port}") private int port; // //安全凭据 // @Value("${xxl.job.accessToken}") // private String accessToken; // //日志地址 // @Value("${xxl.job.executor.logPath}") // private String logPath; // //日志留存时间 // @Value("${xxl.job.executor.logRetentionDays}") // private int logRetentionDays; // 实例化执行器bean @Bean public XxlJobSpringExecutor xxlJobExecutor() { log.info(">>>>>>>>>>> xxl-job config init."); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppName(appName); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); // xxlJobSpringExecutor.setAccessToken(accessToken); // xxlJobSpringExecutor.setLogPath(logPath); // xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); return xxlJobSpringExecutor; } }
本文是基于2.1.2版本来解析,其他版本的源码实现稍有不同。
二、XxlJobSpringExecutor实例化
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, InitializingBean, DisposableBean
1、ApplicationContextAware
目的就是为了获取ApplicationContext。
2、InitializingBean
bean的属性注入完成后(初始化阶段),Spring会回调afterPropertiesSet方法
/**
 * Initialization callback: registers the job handlers, refreshes the glue factory,
 * then delegates to the parent class to start the executor.
 *
 * @throws Exception propagated from any of the initialization steps
 */
public void afterPropertiesSet() throws Exception {
    // 1. Register job handlers declared with @JobHandler
    initJobHandlerRepository(applicationContext);
    // 2. Register job handlers declared with @XxlJob
    initJobHandlerMethodRepository(applicationContext);
    // 3. Refresh the GlueFactory instance (meaning of the argument 1 is not visible here — TODO confirm)
    GlueFactory.refreshInstance(1);
    // 4. Start the executor via the parent class
    super.start();
}
step1:通过@JobHandler注册
private void initJobHandlerRepository(ApplicationContext applicationContext) { if (applicationContext == null) { return; } //获取@JobHandler注解的bean Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class); if (serviceBeanMap != null && serviceBeanMap.size() > 0) { for (Object serviceBean : serviceBeanMap.values()) { if (serviceBean instanceof IJobHandler) { String name = serviceBean.getClass().getAnnotation(JobHandler.class).value(); IJobHandler handler = (IJobHandler) serviceBean; if (loadJobHandler(name) != null) { throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts."); } //注册job handler 放入map registJobHandler(name, handler); } } } } //本地注册表 private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>(); public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){ logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler); return jobHandlerRepository.put(name, jobHandler); }
step2:通过@XxlJob注册
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) { if (applicationContext == null) { return; } // 获取所有的bd String[] beanDefinitionNames = applicationContext.getBeanDefinitionNames(); for (String beanDefinitionName : beanDefinitionNames) { // 根据bd 找到 bean Object bean = applicationContext.getBean(beanDefinitionName); // 获取bean的所有方法 包括private、protected、默认以及public的方法 Method[] methods = bean.getClass().getDeclaredMethods(); for (Method method: methods) { //获取方法上的注解 XxlJob xxlJob = AnnotationUtils.findAnnotation(method, XxlJob.class); if (xxlJob != null) { // @XxlJob注解的value值 String name = xxlJob.value(); if (name.trim().length() == 0) { throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#"+ method.getName() +"] ."); } if (loadJobHandler(name) != null) { throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts."); } // 对execute参数进行判断 如果没有参数,或者参数的个数不是 1,或者参数的类型不是String,都将抛出异常 if (!(method.getParameterTypes()!=null && method.getParameterTypes().length==1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) { throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#"+ method.getName() +"] , " + "The correct method format like \" public ReturnT<String> execute(String param) \" ."); } // 校验方法的返回值类型,必须是ReturnT if (!method.getReturnType().isAssignableFrom(ReturnT.class)) { throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#"+ method.getName() +"] , " + "The correct method format like \" public ReturnT<String> execute(String param) \" ."); } //取消 Java 语言访问检查,提升访问执行速度 method.setAccessible(true); Method initMethod = null; Method destroyMethod = null; if(xxlJob.init().trim().length() > 0) { try { //获取xxlJob注解指定的init方法 initMethod = bean.getClass().getDeclaredMethod(xxlJob.init()); initMethod.setAccessible(true); } catch (NoSuchMethodException e) { throw new 
RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#"+ method.getName() +"] ."); } } if(xxlJob.destroy().trim().length() > 0) { try { //获取xxlJob注解指定的destroy方法 destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy()); destroyMethod.setAccessible(true); } catch (NoSuchMethodException e) { throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#"+ method.getName() +"] ."); } } // 将方法封装进 MethodJobHandler,注册到本地map registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod)); } } } }
step3:创建GlueFactory,处理glue模式
一般基于Spring使用的时候,都是bean模式,即任务以JobHandler方式维护在执行器端,通过rpc的方式调用执行。而glue模式是指,任务以源码方式维护在调度中心,这里初始化了一个SpringGlueFactory。
step4:调用父类启动
/**
 * Starts the executor: log path, admin clients, background threads, then the embedded RPC server.
 *
 * @throws Exception propagated from any of the startup steps
 */
public void start() throws Exception {
    // 1. Initialize the executor log path (the accompanying article gives the default as
    //    /data/applogs/xxl-job/jobhandler — not visible in this excerpt)
    XxlJobFileAppender.initLogPath(logPath);
    // 2. Build the AdminBiz list from the configured adminAddresses
    //    (per the article, multiple addresses are comma-separated — TODO confirm)
    initAdminBizList(adminAddresses, accessToken);
    // 3. Start the job-log cleanup thread, passing the configured retention days
    JobLogFileCleanThread.getInstance().start(logRetentionDays);
    // 4. Start the trigger-callback thread
    TriggerCallbackThread.getInstance().start();
    // 5. Initialize the executor-side server: use the configured port if positive, otherwise
    //    look for an available port starting from 9999; use the configured ip if non-blank,
    //    otherwise fall back to IpUtil.getIp()
    port = port>0?port: NetUtil.findAvailablePort(9999);
    ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
    initRpcProvider(ip, port, appName, accessToken);
}
(1)初始化执行器日志路径,默认 /data/applogs/xxl-job/jobhandler,这个XxlJobFileAppender是个单独写日志文件的工具类。在xxl-job-admin界面上,可以通过界面查看定时任务调度执行的日志。我们在业务代码中,也可以通过XxlJobLogger.log方法,写自己的日志。
(2)根据配置的adminAddresses地址,构造AdminBiz列表(后面注册、调服务端接口等,会需要调到这个地址)。
(3)启动一个daemon线程,每天定期清理调度日志文件(上述(1)目录下的文件)。
(4)定义了一个LinkedBlockingQueue,这个queue里面放job执行结果。然后启动triggerCallbackThread和triggerRetryCallbackThread 两个线程,向job-admin反馈job执行结果。正常情况下,只有triggerCallbackThread从queue里面拿数据,提交到admin。但是当它提交失败的时候,triggerCallbackThread就会写一个callbacklog文件。再由triggerRetryCallbackThread读取callbacklog文件,重新向admin提交执行结果。
(5)启动客户端server,通过netty实现
private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception { String address = IpUtil.getIpPort(ip, port); Map<String, String> serviceRegistryParam = new HashMap<String, String>(); serviceRegistryParam.put("appName", appName); serviceRegistryParam.put("address", address); xxlRpcProviderFactory = new XxlRpcProviderFactory(); // 设置server实现类 xxlRpcProviderFactory.setServer(NettyHttpServer.class); xxlRpcProviderFactory.setSerializer(HessianSerializer.class); xxlRpcProviderFactory.setCorePoolSize(20); xxlRpcProviderFactory.setMaxPoolSize(200); xxlRpcProviderFactory.setIp(ip); xxlRpcProviderFactory.setPort(port); xxlRpcProviderFactory.setAccessToken(accessToken); // 客户端执行器注册使用 xxlRpcProviderFactory.setServiceRegistry(ExecutorServiceRegistry.class); xxlRpcProviderFactory.setServiceRegistryParam(serviceRegistryParam); // 添加执行job的服务 添加到了serviceData xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl()); // 启动server xxlRpcProviderFactory.start(); }
启动server
/**
 * Starts the RPC provider: instantiates the serializer and the server, installs the
 * started/stopped callbacks that manage service registration, then starts the server.
 *
 * @throws Exception propagated from instantiation or server startup
 */
public void start() throws Exception {
    // ... non-essential code omitted in this excerpt ...

    // Instantiate the configured serializer
    this.serializerInstance = serializer.newInstance();
    serviceAddress = IpUtil.getIpPort(this.ip, port);
    // Instantiate the configured server
    NettyHttpServer serverInstance = server.newInstance();
    // Callback hook set via setStartedCallback
    serverInstance.setStartedCallback(new BaseCallback() {
        @Override
        public void run() throws Exception {
            // start registry
            if (serviceRegistry != null) {
                // Instantiate the executor-side registry.
                // NOTE(review): this excerpt declares serviceRegistryInstance as a local here, while the
                // stop callback below reads and nulls it like a field — verify against the real source.
                ExecutorServiceRegistry serviceRegistryInstance = serviceRegistry.newInstance();
                // Start the registry (per the article this starts a thread that registers the
                // executor every 30s — not visible in this excerpt)
                serviceRegistryInstance.start(serviceRegistryParam);
                if (serviceData.size() > 0) {
                    // Per the article this is currently a no-op that returns false — TODO confirm
                    serviceRegistryInstance.registry(serviceData.keySet(), serviceAddress);
                }
            }
        }
    });
    // Callback hook set via setStopedCallback
    serverInstance.setStopedCallback(new BaseCallback() {
        @Override
        public void run() {
            if (serviceRegistryInstance != null) {
                if (serviceData.size() > 0) {
                    // Per the article this is an empty implementation returning false — TODO confirm
                    serviceRegistryInstance.remove(serviceData.keySet(), serviceAddress);
                }
                // Stop the executor registry
                serviceRegistryInstance.stop();
                serviceRegistryInstance = null;
            }
        }
    });
    // Start the server
    serverInstance.start(this);
}
server的启动是标准的Netty启动方式,通过NettyHttpServer的start方法启动了一个netty服务端,主要通过NettyHttpServerHandler处理请求。
/**
 * Reads the request body, URI and keep-alive flag, then hands the request to the
 * handler pool so it is processed off the calling thread.
 */
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
    // Extract the request data
    final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // alternative: byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
    final String uri = msg.uri();
    final boolean keepAlive = HttpUtil.isKeepAlive(msg);
    // Dispatch processing to the handler pool
    serverHandlerPool.execute(new Runnable() {
        @Override
        public void run() {
            process(ctx, uri, requestBytes, keepAlive);
        }
    });
}

/**
 * Processes one request: deserializes it, invokes the target service and writes the
 * serialized response. Any exception is logged and reported as an error response.
 */
private void process(ChannelHandlerContext ctx, String uri, byte[] requestBytes, boolean keepAlive){
    String requestId = null;
    try {
        if ("/services".equals(uri)) {
            // ... branch body omitted in this excerpt ...
        } else {
            // Regular request branch
            if (requestBytes.length == 0) {
                throw new XxlRpcException("xxl-rpc request data empty.");
            }
            // Deserialize the request
            XxlRpcRequest xxlRpcRequest = (XxlRpcRequest) xxlRpcProviderFactory.getSerializerInstance().deserialize(requestBytes, XxlRpcRequest.class);
            requestId = xxlRpcRequest.getRequestId();
            // Beat (ping) request: log and return without writing a response
            if (Beat.BEAT_ID.equalsIgnoreCase(xxlRpcRequest.getRequestId())){
                logger.debug(">>>>>>>>>>> xxl-rpc provider netty_http server read beat-ping.");
                return;
            }
            // Invoke the service
            XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);
            // Serialize the response
            byte[] responseBytes = xxlRpcProviderFactory.getSerializerInstance().serialize(xxlRpcResponse);
            // Write the response
            writeResponse(ctx, keepAlive, responseBytes);
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        // Build an error response carrying the request id (null if deserialization failed) and the stack trace text
        XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
        xxlRpcResponse.setRequestId(requestId);
        xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(e));
        // response serialize
        byte[] responseBytes = xxlRpcProviderFactory.getSerializerInstance().serialize(xxlRpcResponse);
        // response-write
        writeResponse(ctx, keepAlive, responseBytes);
    }
}
处理请求invokeService
public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) { // 创建 response XxlRpcResponse xxlRpcResponse = new XxlRpcResponse(); xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId()); // 获取的就是前面添加的ExecutorBizImpl String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion()); Object serviceBean = serviceData.get(serviceKey); // 为null 直接响应错误 if (serviceBean == null) { xxlRpcResponse.setErrorMsg("The serviceKey["+ serviceKey +"] not found."); return xxlRpcResponse; } // 服务端 与 客户端 的时间差 不能超过 3 分钟 if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 3*60*1000) { xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit."); return xxlRpcResponse; } // token 校验 if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(xxlRpcRequest.getAccessToken())) { xxlRpcResponse.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong."); return xxlRpcResponse; } try { Class<?> serviceClass = serviceBean.getClass(); String methodName = xxlRpcRequest.getMethodName(); Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes(); Object[] parameters = xxlRpcRequest.getParameters(); Method method = serviceClass.getMethod(methodName, parameterTypes); method.setAccessible(true); // 如果是真正的job调用,执行ExecutorBizImpl的run方法 Object result = method.invoke(serviceBean, parameters); xxlRpcResponse.setResult(result); } catch (Throwable t) { // catch error logger.error("xxl-rpc provider invokeService error.", t); xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t)); } return xxlRpcResponse; }
ExecutorBizImpl#run
/**
 * Handles a trigger request: resolves the job thread and handler for the job,
 * applies the block strategy, and pushes the trigger onto the job thread's queue.
 *
 * @param triggerParam the trigger request
 * @return the result of queueing the trigger, or a failure result if the handler is
 *         missing, the glue type is invalid, or the block strategy discards the trigger
 */
public ReturnT<String> run(TriggerParam triggerParam) {
    // Load the job thread registered for this job id (may be null)
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    // Take the handler currently bound to that thread, if any
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;
    // Resolve the glue type; only the BEAN branch is shown in full in this excerpt
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {
        // Look up the handler by name in the handler registry
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
        // An existing thread bound to a different handler is stale: drop both references
        // so a new thread is registered below
        if (jobThread!=null && jobHandler != newJobHandler) {
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
            // Clear first
            jobThread = null;
            jobHandler = null;
        }
        if (jobHandler == null) {
            // Use the handler from the registry
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }
    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
        // glue (Groovy) mode — body omitted in this excerpt
    } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
        // script mode — body omitted in this excerpt
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }
    // An existing job thread means the block strategy has to be applied
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        // DISCARD_LATER: if the thread is running or has queued triggers, reject this trigger
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // COVER_EARLY: if the thread is running or has queued triggers, drop the reference
            // so that a new job thread is registered below for this trigger
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
                jobThread = null;
            }
        } else {
            // Serial execution: the trigger is simply queued on the existing thread below
        }
    }
    if (jobThread == null) {
        // No usable thread: register a job thread for this job id
        // (presumably registJobThread also starts it — TODO confirm)
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }
    // Push the trigger onto the thread's queue
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}
可以看到真正处理我们业务逻辑的是JobThread线程,是异步执行的,这里直接把任务添加到队列里就返回了,所以就需要前面我们所看到的,通过其他线程反馈执行结果。接下来我们就看一下JobThread的run方法
/**
 * Main loop of the job thread: initializes the handler, repeatedly polls the trigger
 * queue and executes triggers until {@code toStop} is set, reports every result to the
 * callback thread, then fails any triggers still queued and destroys the handler.
 */
public void run() {
    // init
    try {
        // Run the handler's init method; failures are logged and do not stop the thread
        handler.init();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }
    // execute
    while(!toStop){
        running = false;
        idleTimes++;
        TriggerParam triggerParam = null;
        ReturnT<String> executeResult = null;
        try {
            // poll(timeout) is used instead of take() so the loop can re-check the toStop signal
            // Wait up to 3 seconds for the next trigger
            triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
            if (triggerParam!=null) {
                running = true;
                idleTimes = 0;
                triggerLogIdSet.remove(triggerParam.getLogId());
                // Set up logging for this execution
                // log filename, like "logPath/yyyy-MM-dd/9999.log"
                String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                XxlJobFileAppender.contextHolder.set(logFileName);
                ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
                // execute
                XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());
                // Trigger with an execution timeout
                if (triggerParam.getExecutorTimeout() > 0) {
                    // limit timeout
                    Thread futureThread = null;
                    try {
                        final TriggerParam triggerParamTmp = triggerParam;
                        FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
                            @Override
                            public ReturnT<String> call() throws Exception {
                                return handler.execute(triggerParamTmp.getExecutorParams());
                            }
                        });
                        // Run the handler on a separate thread
                        futureThread = new Thread(futureTask);
                        futureThread.start();
                        // Block for at most getExecutorTimeout() seconds; get() throws TimeoutException on expiry
                        executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                    } catch (TimeoutException e) {
                        XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
                        XxlJobLogger.log(e);
                        // Report the timeout as the execution result
                        executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
                    } finally {
                        // Request interruption of the worker thread.
                        // NOTE(review): futureThread is still null here if creating the FutureTask or
                        // Thread throws, in which case this call raises a NullPointerException.
                        futureThread.interrupt();
                    }
                } else {
                    // just execute
                    // No timeout: execute directly on this thread
                    executeResult = handler.execute(triggerParam.getExecutorParams());
                }
                if (executeResult == null) {
                    executeResult = IJobHandler.FAIL;
                } else {
                    // Truncate messages longer than 50000 characters
                    // (the executeResult!=null check is redundant inside this else branch)
                    executeResult.setMsg( (executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000) ?executeResult.getMsg().substring(0, 50000).concat("...") :executeResult.getMsg());
                    executeResult.setContent(null); // limit obj size
                }
                XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);
            } else {
                // Nothing polled: after more than 30 consecutive idle iterations, remove this job thread
                if (idleTimes > 30) {
                    if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost
                        XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                    }
                }
            }
        } catch (Throwable e) {
            if (toStop) {
                XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
            }
            // Convert the stack trace to text and report it as a failed result
            StringWriter stringWriter = new StringWriter();
            e.printStackTrace(new PrintWriter(stringWriter));
            String errorMsg = stringWriter.toString();
            executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);
            XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
        } finally {
            if(triggerParam != null) {
                // callback handler info
                if (!toStop) {
                    // Normal case: push the execution result onto the callback queue
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
                } else {
                    // is killed
                    ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
                }
            }
        }
    }
    // After the loop exits: report every trigger still waiting in the queue as failed
    while(triggerQueue !=null && triggerQueue.size()>0){
        TriggerParam triggerParam = triggerQueue.poll();
        if (triggerParam!=null) {
            // is killed
            ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
        }
    }
    // destroy
    try {
        // Run the handler's destroy method; failures are logged only
        handler.destroy();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }
    logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}
3、DisposableBean
在spring容器关闭、销毁bean的时候(包括启动异常导致的容器关闭),会调用其destroy方法,那么我们就看一下xxl-job的destroy方法做了哪些事。
/**
 * Destroy callback: delegates to the parent implementation.
 * (Presumably the subclass override; the parent implementation is shown next.)
 */
public void destroy() {
    super.destroy();
}

/**
 * Shuts the executor down: stops the RPC provider, removes all job threads, clears the
 * handler registry, and signals the log-cleanup and callback threads to stop.
 */
public void destroy(){
    // Stop the embedded server
    stopRpcProvider();
    // Remove every registered job thread
    // NOTE(review): removeJobThread is called while iterating jobThreadRepository — presumably
    // a concurrent map that tolerates this; confirm against the field declaration.
    if (jobThreadRepository.size() > 0) {
        for (Map.Entry<Integer, JobThread> item: jobThreadRepository.entrySet()) {
            removeJobThread(item.getKey(), "web container destroy and kill the job.");
        }
        jobThreadRepository.clear();
    }
    jobHandlerRepository.clear();
    // Signal the job-log cleanup thread to stop
    JobLogFileCleanThread.getInstance().toStop();
    // Signal the trigger-callback thread to stop
    // (per the article this covers the retry-callback thread as well — TODO confirm)
    TriggerCallbackThread.getInstance().toStop();
}
源码也比较清晰了,这里只看一下第一步停掉server的实现,其余的都很简单
/**
 * Stops the RPC provider factory; any exception is logged and not rethrown.
 */
private void stopRpcProvider() {
    try {
        xxlRpcProviderFactory.stop();
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}

/**
 * Stops the provider by stopping its server instance.
 * (Excerpt from a different class than the method above — presumably XxlRpcProviderFactory.)
 */
public void stop() throws Exception {
    // stop server
    serverInstance.stop();
}

/**
 * Stops the server: interrupts the server thread if it is alive, then fires the stop callback.
 * (Excerpt from the server class — presumably NettyHttpServer.)
 */
public void stop() throws Exception {
    // Interrupt the server thread; per the article this leads to a graceful shutdown of the
    // netty server and its request-handling pool — not visible in this excerpt
    if (thread!=null && thread.isAlive()) {
        thread.interrupt();
    }
    // Invoke the parent's onStoped(); per the article this runs the stop callback installed in
    // XxlRpcProviderFactory.start(), which stops the executor registry
    onStoped();
    logger.info(">>>>>>>>>>> xxl-rpc remoting server destroy success.");
}
至此,客户端的源码基本分析完毕,下一篇就看一下调度中心源码。