一、簡介
1、添加jar
<dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> <version>2.1.2</version> </dependency>
2、實例化執行器
@Slf4j //控制是否實例化 @ConditionalOnProperty(value = "xxljob.enabled", matchIfMissing = true) @Configuration public class XxlJobConfig { //服務端地址 @Value("${xxl.job.admin.addresses}") private String adminAddresses; //執行器名稱 @Value("${xxl.job.executor.appName}") private String appName; //客戶端端ip ${spring.cloud.client.ip-address} @Value("${xxl.job.executor.ip}") private String ip; //端口 @Value("${xxl.job.executor.port}") private int port; // //安全憑據 // @Value("${xxl.job.accessToken}") // private String accessToken; // //日志地址 // @Value("${xxl.job.executor.logPath}") // private String logPath; // //日志留存時間 // @Value("${xxl.job.executor.logRetentionDays}") // private int logRetentionDays; // 實例化執行器bean @Bean public XxlJobSpringExecutor xxlJobExecutor() { log.info(">>>>>>>>>>> xxl-job config init."); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppName(appName); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); // xxlJobSpringExecutor.setAccessToken(accessToken); // xxlJobSpringExecutor.setLogPath(logPath); // xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); return xxlJobSpringExecutor; } }
本文是基於2.1.2版本來解析,其他版本的源碼實現稍有不同。
二、XxlJobSpringExecutor實例化
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, InitializingBean, DisposableBean
1、ApplicationContextAware
目的就是為了獲取ApplicationContext。
2、InitializingBean
bean的屬性注入完成后(初始化階段),Spring會回調afterPropertiesSet方法
/**
 * Initialization callback: registers the job handlers, refreshes the glue factory
 * and then starts the executor through the parent class.
 *
 * @throws Exception if any of the initialization steps fails
 */
public void afterPropertiesSet() throws Exception {
    // 1. Register job handlers declared with @JobHandler (they are put into a local map)
    initJobHandlerRepository(applicationContext);
    // 2. Register job handlers declared with @XxlJob
    initJobHandlerMethodRepository(applicationContext);
    // 3. Create the GlueFactory
    GlueFactory.refreshInstance(1);
    // 4. Start the executor
    super.start();
}
step1:通過@JobHandler注冊
private void initJobHandlerRepository(ApplicationContext applicationContext) { if (applicationContext == null) { return; } //獲取@JobHandler注解的bean Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class); if (serviceBeanMap != null && serviceBeanMap.size() > 0) { for (Object serviceBean : serviceBeanMap.values()) { if (serviceBean instanceof IJobHandler) { String name = serviceBean.getClass().getAnnotation(JobHandler.class).value(); IJobHandler handler = (IJobHandler) serviceBean; if (loadJobHandler(name) != null) { throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts."); } //注冊job handler 放入map registJobHandler(name, handler); } } } } //本地注冊表 private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>(); public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){ logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler); return jobHandlerRepository.put(name, jobHandler); }
step2:通過@XxlJob注冊
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) { if (applicationContext == null) { return; } // 獲取所有的bd String[] beanDefinitionNames = applicationContext.getBeanDefinitionNames(); for (String beanDefinitionName : beanDefinitionNames) { // 根據bd 找到 bean Object bean = applicationContext.getBean(beanDefinitionName); // 獲取bean的所有方法 包括private、protected、默認以及public的方法 Method[] methods = bean.getClass().getDeclaredMethods(); for (Method method: methods) { //獲取方法上的注解 XxlJob xxlJob = AnnotationUtils.findAnnotation(method, XxlJob.class); if (xxlJob != null) { // @XxlJob注解的value值 String name = xxlJob.value(); if (name.trim().length() == 0) { throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#"+ method.getName() +"] ."); } if (loadJobHandler(name) != null) { throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts."); } // 對execute參數進行判斷 如果沒有參數,或者參數的個數不是 1,或者參數的類型不是String,都將拋出異常 if (!(method.getParameterTypes()!=null && method.getParameterTypes().length==1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) { throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#"+ method.getName() +"] , " + "The correct method format like \" public ReturnT<String> execute(String param) \" ."); } // 校驗方法的返回值類型,必須是ReturnT if (!method.getReturnType().isAssignableFrom(ReturnT.class)) { throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#"+ method.getName() +"] , " + "The correct method format like \" public ReturnT<String> execute(String param) \" ."); } //取消 Java 語言訪問檢查,提升訪問執行速度 method.setAccessible(true); Method initMethod = null; Method destroyMethod = null; if(xxlJob.init().trim().length() > 0) { try { //獲取xxlJob注解指定的init方法 initMethod = bean.getClass().getDeclaredMethod(xxlJob.init()); initMethod.setAccessible(true); } catch (NoSuchMethodException e) { throw new 
RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#"+ method.getName() +"] ."); } } if(xxlJob.destroy().trim().length() > 0) { try { //獲取xxlJob注解指定的destroy方法 destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy()); destroyMethod.setAccessible(true); } catch (NoSuchMethodException e) { throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#"+ method.getName() +"] ."); } } // 將方法封裝進 MethodJobHandler,注冊到本地map registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod)); } } } }
step3:創建GlueFactory,處理glue模式
一般基於Spring使用的時候,都是bean模式,即任務以JobHandler方式維護在執行器端,通過rpc的方式調用執行。而glue模式是指,任務以源碼方式維護在調度中心,這里初始化了一個SpringGlueFactory。
step4:調用父類啟動
public void start() throws Exception { // 1、初始化執行器日志路徑 默認 /data/applogs/xxl-job/jobhandler XxlJobFileAppender.initLogPath(logPath); // 2、根據配置的adminAddresses地址(多個用逗號拼接),構造AdminBiz列表 initAdminBizList(adminAddresses, accessToken); // 3、啟動一個daemon線程,每天定期清理調度日志文件 JobLogFileCleanThread.getInstance().start(logRetentionDays); // 4、執行反饋線程啟動 TriggerCallbackThread.getInstance().start(); // 5、初始化客戶端server 默認端口9999 基於netty實現 port = port>0?port: NetUtil.findAvailablePort(9999); ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp(); initRpcProvider(ip, port, appName, accessToken); }
(1)初始化執行器日志路徑,默認 /data/applogs/xxl-job/jobhandler,這個XxlJobFileAppender是個單獨寫日志文件的工具類。在xxl-job-admin界面上,可以通過界面查看定時任務調度執行的日志。我們在業務代碼中,也可以通過XxlJobLogger.log方法,寫自己的日志。
(2)根據配置的adminAddresses地址,構造AdminBiz列表(后面注冊、調服務端接口等,會需要調到這個地址)。
(3)啟動一個daemon線程,每天定期清理調度日志文件(上述(1)目錄下的文件)。
(4)定義了一個LinkedBlockingQueue,這個queue里面放job執行結果。然后啟動triggerCallbackThread和triggerRetryCallbackThread 兩個線程,向job-admin反饋job執行結果。正常情況下,只有triggerCallbackThread從queue里面拿數據,提交到admin。但是當它提交失敗的時候,triggerCallbackThread就會寫一個callbacklog文件。再由triggerRetryCallbackThread讀取callbacklog文件,重新向admin提交執行結果。
(5)啟動客戶端server,通過netty實現
private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception { String address = IpUtil.getIpPort(ip, port); Map<String, String> serviceRegistryParam = new HashMap<String, String>(); serviceRegistryParam.put("appName", appName); serviceRegistryParam.put("address", address); xxlRpcProviderFactory = new XxlRpcProviderFactory(); // 設置server實現類 xxlRpcProviderFactory.setServer(NettyHttpServer.class); xxlRpcProviderFactory.setSerializer(HessianSerializer.class); xxlRpcProviderFactory.setCorePoolSize(20); xxlRpcProviderFactory.setMaxPoolSize(200); xxlRpcProviderFactory.setIp(ip); xxlRpcProviderFactory.setPort(port); xxlRpcProviderFactory.setAccessToken(accessToken); // 客戶端執行器注冊使用 xxlRpcProviderFactory.setServiceRegistry(ExecutorServiceRegistry.class); xxlRpcProviderFactory.setServiceRegistryParam(serviceRegistryParam); // 添加執行job的服務 添加到了serviceData xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl()); // 啟動server xxlRpcProviderFactory.start(); }
啟動server
/**
 * Starts the RPC provider: instantiates serializer and server, installs the
 * started/stopped callbacks that drive the service registry, then starts the server.
 *
 * @throws Exception if instantiation or server start-up fails
 */
public void start() throws Exception {
    // ... non-essential code omitted in this excerpt ...

    // Instantiate the serializer (HessianSerializer in the executor setup)
    this.serializerInstance = serializer.newInstance();
    serviceAddress = IpUtil.getIpPort(this.ip, port);

    // Instantiate the server (NettyHttpServer in the executor setup)
    serverInstance = server.newInstance();

    // Hook invoked once the server has started
    serverInstance.setStartedCallback(new BaseCallback() {
        @Override
        public void run() throws Exception {
            // start registry
            if (serviceRegistry != null) {
                // Instantiate the registry (ExecutorServiceRegistry in the executor setup)
                serviceRegistryInstance = serviceRegistry.newInstance();
                // NOTE(review): per the article, this starts a thread that registers the
                // executor every 30s — not visible in this excerpt
                serviceRegistryInstance.start(serviceRegistryParam);
                if (serviceData.size() > 0) {
                    // NOTE(review): per the article, currently a no-op implementation that returns false
                    serviceRegistryInstance.registry(serviceData.keySet(), serviceAddress);
                }
            }
        }
    });

    // Hook invoked when the server stops
    serverInstance.setStopedCallback(new BaseCallback() {
        @Override
        public void run() {
            if (serviceRegistryInstance != null) {
                if (serviceData.size() > 0) {
                    // NOTE(review): per the article, an empty implementation that returns false
                    serviceRegistryInstance.remove(serviceData.keySet(), serviceAddress);
                }
                // Stop the registry, then drop the reference
                serviceRegistryInstance.stop();
                serviceRegistryInstance = null;
            }
        }
    });

    // Start the server
    serverInstance.start(this);
}
server的啟動是標准的Netty啟動方式,通過NettyHttpServer的start方法啟動了一個netty服務端,主要通過NettyHttpServerHandler處理請求。
/**
 * Reads the request body, URI and keep-alive flag from the HTTP request and
 * hands the actual processing to {@code serverHandlerPool}.
 */
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
    // Parse the request
    final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
    final String uri = msg.uri();
    final boolean keepAlive = HttpUtil.isKeepAlive(msg);

    // Process on the handler pool
    serverHandlerPool.execute(new Runnable() {
        @Override
        public void run() {
            process(ctx, uri, requestBytes, keepAlive);
        }
    });
}

/**
 * Handles one request: deserializes it, invokes the target service and writes the
 * serialized response. Any exception is turned into an error response that carries
 * the request id (null if it failed before the id was read).
 */
private void process(ChannelHandlerContext ctx, String uri, byte[] requestBytes, boolean keepAlive){
    String requestId = null;
    try {
        if ("/services".equals(uri)) {
            // ... omitted in this excerpt ...
        } else {
            // Regular request branch
            if (requestBytes.length == 0) {
                throw new XxlRpcException("xxl-rpc request data empty.");
            }

            // Deserialize the request
            XxlRpcRequest xxlRpcRequest = (XxlRpcRequest) xxlRpcProviderFactory.getSerializerInstance().deserialize(requestBytes, XxlRpcRequest.class);
            requestId = xxlRpcRequest.getRequestId();

            // Beat/ping request: return without writing a response
            if (Beat.BEAT_ID.equalsIgnoreCase(xxlRpcRequest.getRequestId())){
                logger.debug(">>>>>>>>>>> xxl-rpc provider netty_http server read beat-ping.");
                return;
            }

            // Invoke the service
            XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);

            // Serialize the response
            byte[] responseBytes = xxlRpcProviderFactory.getSerializerInstance().serialize(xxlRpcResponse);

            // Write the response
            writeResponse(ctx, keepAlive, responseBytes);
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);

        // response error
        XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
        xxlRpcResponse.setRequestId(requestId);
        xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(e));

        // response serialize
        byte[] responseBytes = xxlRpcProviderFactory.getSerializerInstance().serialize(xxlRpcResponse);

        // response-write
        writeResponse(ctx, keepAlive, responseBytes);
    }
}
處理請求invokeService
public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) { // 創建 response XxlRpcResponse xxlRpcResponse = new XxlRpcResponse(); xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId()); // 獲取的就是前面添加的ExecutorBizImpl String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion()); Object serviceBean = serviceData.get(serviceKey); // 為null 直接響應錯誤 if (serviceBean == null) { xxlRpcResponse.setErrorMsg("The serviceKey["+ serviceKey +"] not found."); return xxlRpcResponse; } // 服務端 與 客戶端 的時間差 不能超過 3 分鍾 if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 3*60*1000) { xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit."); return xxlRpcResponse; } // token 校驗 if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(xxlRpcRequest.getAccessToken())) { xxlRpcResponse.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong."); return xxlRpcResponse; } try { Class<?> serviceClass = serviceBean.getClass(); String methodName = xxlRpcRequest.getMethodName(); Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes(); Object[] parameters = xxlRpcRequest.getParameters(); Method method = serviceClass.getMethod(methodName, parameterTypes); method.setAccessible(true); // 如果是真正的job調用,執行ExecutorBizImpl的run方法 Object result = method.invoke(serviceBean, parameters); xxlRpcResponse.setResult(result); } catch (Throwable t) { // catch error logger.error("xxl-rpc provider invokeService error.", t); xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t)); } return xxlRpcResponse; }
ExecutorBizImpl#run
/**
 * Handles a trigger request: resolves the job thread and handler for the job id,
 * applies the block strategy, (re)registers the job thread if needed and pushes the
 * trigger into that thread's queue.
 *
 * @param triggerParam the trigger request
 * @return the result of queueing the trigger, or a FAIL result when the handler is
 *         missing, the glue type is invalid, or the block strategy discards the trigger
 */
public ReturnT<String> run(TriggerParam triggerParam) {
    // Jobs are processed by a JobThread; load the one registered for this job id
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    // Take the handler currently held by that thread, if there is one
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;

    // Branch on the glue type; the article only walks through BEAN mode
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {

        // Look the handler up in the registry filled during initialization
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

        // Existing thread holds a different handler: the old one is stale and must be replaced
        if (jobThread!=null && jobHandler != newJobHandler) {
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

            // Reset both so a new thread is registered below
            jobThread = null;
            jobHandler = null;
        }

        if (jobHandler == null) {
            // Use the handler from the registry
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }

    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
        // GLUE (Groovy) mode — body omitted in this excerpt
    } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
        // Script mode — body omitted in this excerpt
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }

    // A job thread already exists: apply the block strategy
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        // DISCARD_LATER: if the thread is running or has queued work, reject this trigger
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // COVER_EARLY: if the thread is busy, drop the reference so a new thread is
            // registered below to handle this trigger
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                jobThread = null;
            }
        } else {
            // Serial execution: the trigger is simply queued on the existing thread
        }
    }

    if (jobThread == null) {
        // No usable thread: register a job thread for this job id
        // (the article notes that this also starts it)
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    // Push the trigger into the thread's queue
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}
可以看到真正處理我們業務邏輯的是JobThread線程,是異步執行的,這里直接把任務添加到隊列里就返回了,所以就需要前面我們所看到的,通過其他線程反饋執行結果。接下來我們就看一下JobThread的run方法
/**
 * Job thread main loop: initializes the handler, repeatedly polls the trigger queue
 * and executes triggers (optionally with a timeout), pushes a callback for every
 * trigger it took, and on stop reports the still-queued triggers as killed before
 * destroying the handler.
 */
public void run() {

    // init
    try {
        // Run the handler's init method; failures are logged and do not stop the thread
        handler.init();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }

    // execute
    while(!toStop){
        running = false;
        idleTimes++;

        TriggerParam triggerParam = null;
        ReturnT<String> executeResult = null;
        try {
            // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
            // Wait up to 3 seconds for the next trigger
            triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
            if (triggerParam!=null) {
                running = true;
                idleTimes = 0;
                triggerLogIdSet.remove(triggerParam.getLogId());

                // Set up the per-execution log file
                // log filename, like "logPath/yyyy-MM-dd/9999.log"
                String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                XxlJobFileAppender.contextHolder.set(logFileName);
                ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));

                // execute
                XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());

                // Trigger carries an execution timeout
                if (triggerParam.getExecutorTimeout() > 0) {
                    // limit timeout
                    Thread futureThread = null;
                    try {
                        final TriggerParam triggerParamTmp = triggerParam;
                        FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
                            @Override
                            public ReturnT<String> call() throws Exception {
                                return handler.execute(triggerParamTmp.getExecutorParams());
                            }
                        });
                        // Run the handler on a separate thread
                        futureThread = new Thread(futureTask);
                        futureThread.start();

                        // Block for at most getExecutorTimeout() seconds; a timeout raises TimeoutException
                        executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                    } catch (TimeoutException e) {

                        XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
                        XxlJobLogger.log(e);

                        // Report the timeout as the execution result
                        executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
                    } finally {
                        // Interrupt the worker thread
                        // NOTE(review): futureThread is still null here if an exception is thrown
                        // before it is assigned — confirm whether that can happen in practice
                        futureThread.interrupt();
                    }
                } else {
                    // just execute
                    // No timeout: run the handler directly on this thread
                    executeResult = handler.execute(triggerParam.getExecutorParams());
                }

                if (executeResult == null) {
                    executeResult = IJobHandler.FAIL;
                } else {
                    // Cap the message at 50000 characters (plus "...")
                    executeResult.setMsg(
                            (executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000)
                                    ?executeResult.getMsg().substring(0, 50000).concat("...")
                                    :executeResult.getMsg());
                    executeResult.setContent(null); // limit obj size
                }
                XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);

            } else {
                // Nothing polled and more than 30 idle iterations: remove this job thread
                if (idleTimes > 30) {
                    if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost
                        XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                    }
                }
            }
        } catch (Throwable e) {
            if (toStop) {
                XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
            }

            // Turn the stack trace into the failure message
            StringWriter stringWriter = new StringWriter();
            e.printStackTrace(new PrintWriter(stringWriter));
            String errorMsg = stringWriter.toString();
            executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);

            XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
        } finally {
            if(triggerParam != null) {
                // callback handler info
                if (!toStop) {
                    // common case
                    // Push the execution result onto the callback queue
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
                } else {
                    // is killed
                    ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
                }
            }
        }
    }

    // callback trigger request in queue
    while(triggerQueue !=null && triggerQueue.size()>0){
        TriggerParam triggerParam = triggerQueue.poll();
        if (triggerParam!=null) {
            // is killed
            ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
        }
    }

    // destroy
    try {
        // Run the handler's destroy method; failures are logged only
        handler.destroy();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }

    logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}
3、DisposableBean
在spring容器關閉、銷毀該bean的時候(也包括啟動異常導致容器關閉的情況),會調用其destroy方法,那么我們就看一下xxl-job的destroy方法做了哪些事。
public void destroy() { super.destroy(); } public void destroy(){ // 停掉server服務 stopRpcProvider(); // 銷毀job線程 if (jobThreadRepository.size() > 0) { for (Map.Entry<Integer, JobThread> item: jobThreadRepository.entrySet()) { removeJobThread(item.getKey(), "web container destroy and kill the job."); } jobThreadRepository.clear(); } jobHandlerRepository.clear(); // 銷毀清理日志線程 JobLogFileCleanThread.getInstance().toStop(); // 銷毀結果反饋和重試線程 TriggerCallbackThread.getInstance().toStop(); }
源碼也比較清晰了,就看一下第一步停掉server就好了,其余的很簡單
/** Stops the RPC provider factory; any exception is logged and not rethrown. */
private void stopRpcProvider() {
    try {
        xxlRpcProviderFactory.stop();
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}

/** Provider factory stop: delegates to the server instance. */
public void stop() throws Exception {
    // stop server
    serverInstance.stop();
}

/** Server stop: interrupts the server thread if it is alive, then fires the stopped callback. */
public void stop() throws Exception {
    // Interrupt the server thread
    // NOTE(review): per the article this leads to a graceful shutdown of the netty server
    // and of the request-handling thread pool — not visible in this excerpt
    if (thread!=null && thread.isAlive()) {
        thread.interrupt();
    }

    // Invoke the parent's onStoped, i.e. the stopped hook installed in
    // XxlRpcProviderFactory#start, which stops the executor registry
    onStoped();
    logger.info(">>>>>>>>>>> xxl-rpc remoting server destroy success.");
}
至此,客戶端的源碼基本分析完畢,下一篇就看一下調度中心源碼。