1 調度中心API服務
1、任務結果回調服務;
2、執行器注冊服務;
3、執行器注冊摘除服務;
4、觸發任務單次執行服務,支持任務根據業務事件觸發;
API暴露代碼:com.xxl.job.admin.controller.JobApiController.java
API服務位置:com.xxl.job.core.biz.AdminBiz.java
通過請求參數匹配對應方法反射執行后把結果返回給客戶端,見代碼com.xxl.rpc.remoting.provider.XxlRpcProviderFactoryinvokeService方法invokeService
2 任務注冊/
執行器注冊
任務注冊以 "執行器" 為最小粒度進行注冊; 每個任務通過其綁定的執行器可感知對應的執行器機器列表;注冊表: 見"XXL_JOB_QRTZ_TRIGGER_REGISTRY"表。
"執行器"
注冊代碼見com.xxl.job.core.thread.ExecutorRegistryThread方法start,在"執行器"
啟動時通過遠程調用com.xxl.job.core.biz.AdminBiz方法注冊。"執行器" 在進行任務注冊時將會周期性維護一條注冊記錄,即機器地址和AppName的綁定關系; "調度中心" 從而可以動態感知每個AppName在線的機器列表; 執行器注冊: 任務注冊Beat周期默認30s; 執行器以一倍Beat進行執行器注冊, 調度中心以一倍Beat進行動態任務發現; 注冊信息的失效時間被三倍Beat; 執行器注冊摘除:執行器銷毀時,將會主動上報調度中心並摘除對應的執行器機器信息,提高心跳注冊的實時性
。見代碼com.xxl.job.admin.core.thread.JobRegistryMonitorHelper
3 執行器API服務
執行器提供了API服務,供調度中心選擇使用,目前提供的API服務有:
1、心跳檢測:調度中心使用
2、忙碌檢測:調度中心使用
3、觸發任務執行:調度中心使用;本地進行任務開發時,可使用該API服務模擬觸發任務;
4、獲取Rolling Log:調度中心使用
5、終止任務:調度中心使用
API服務位置:com.xxl.job.core.biz.ExecutorBiz
API服務請求參考代碼:com.xxl.job.executor.ExecutorBizTest
API暴露代碼片段
com.xxl.job.core.executor.XxlJobExecutor.initRpcProvider() // add services xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());
API遠程調用代碼,jetty啟動注冊一個handler,handler含有API遠程調用處理邏輯
com.xxl.rpc.remoting.net.impl.jetty.server.JettyServer.start()
JettyServer.this.server.setConnectors(new Connector[]{connector}); HandlerCollection handlerc = new HandlerCollection(); handlerc.setHandlers(new Handler[]{new JettyServerHandler(xxlRpcProviderFactory)}); JettyServer.this.server.setHandler(handlerc);
com.xxl.rpc.remoting.net.impl.jetty.server.JettyServerHandler.handle() XxlRpcResponse xxlRpcResponse = this.xxlRpcProviderFactory.invokeService(xxlRpcRequest); byte[] responseBytes = this.xxlRpcProviderFactory.getSerializer().serialize(xxlRpcResponse); this.writeResponse(baseRequest, response, responseBytes);
com.xxl.rpc.remoting.provider.XxlRpcProviderFactory.invokeService() XxlRpcResponse xxlRpcResponse = new XxlRpcResponse(); xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId()); String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion()); Object serviceBean = this.serviceData.get(serviceKey); if (serviceBean == null) { xxlRpcResponse.setErrorMsg("The serviceKey[" + serviceKey + "] not found."); return xxlRpcResponse; } else if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 180000L) { xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit."); return xxlRpcResponse; } else if (this.accessToken != null && this.accessToken.trim().length() > 0 && !this.accessToken.trim().equals(xxlRpcRequest.getAccessToken())) { xxlRpcResponse.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong."); return xxlRpcResponse; } else { try { Class<?> serviceClass = serviceBean.getClass(); String methodName = xxlRpcRequest.getMethodName(); Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes(); Object[] parameters = xxlRpcRequest.getParameters(); Method method = serviceClass.getMethod(methodName, parameterTypes); method.setAccessible(true); Object result = method.invoke(serviceBean, parameters); xxlRpcResponse.setResult(result); } catch (Throwable var11) { logger.error("xxl-rpc provider invokeService error.", var11); xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(var11)); } return xxlRpcResponse; }
參考資料