xxl-job源碼分析


1 調度中心API服務
1、任務結果回調服務;
2、執行器注冊服務;
3、執行器注冊摘除服務;
4、觸發任務單次執行服務,支持任務根據業務事件觸發;
API暴露代碼:com.xxl.job.admin.controller.JobApiController.java
API服務位置:com.xxl.job.core.biz.AdminBiz.java
通過請求參數匹配對應方法反射執行后把結果返回給客戶端,見代碼com.xxl.rpc.remoting.provider.XxlRpcProviderFactoryinvokeService方法invokeService
 
2 任務注冊/ 執行器注冊
任務注冊以 "執行器" 為最小粒度進行注冊; 每個任務通過其綁定的執行器可感知對應的執行器機器列表;注冊表: 見"XXL_JOB_QRTZ_TRIGGER_REGISTRY"表。"執行器"注冊代碼見com.xxl.job.core.thread.ExecutorRegistryThread方法start,在"執行器"啟動時通過遠程調用com.xxl.job.core.biz.AdminBiz方法注冊。"執行器" 在進行任務注冊時將會周期性維護一條注冊記錄,即機器地址和AppName的綁定關系; "調度中心" 從而可以動態感知每個AppName在線的機器列表; 執行器注冊: 任務注冊Beat周期默認30s; 執行器以一倍Beat進行執行器注冊, 調度中心以一倍Beat進行動態任務發現; 注冊信息的失效時間被三倍Beat; 執行器注冊摘除:執行器銷毀時,將會主動上報調度中心並摘除對應的執行器機器信息,提高心跳注冊的實時性。見代碼com.xxl.job.admin.core.thread.JobRegistryMonitorHelper
 
3 執行器API服務

執行器提供了API服務,供調度中心選擇使用,目前提供的API服務有:

1、心跳檢測:調度中心使用
2、忙碌檢測:調度中心使用
3、觸發任務執行:調度中心使用;本地進行任務開發時,可使用該API服務模擬觸發任務;
4、獲取Rolling Log:調度中心使用
5、終止任務:調度中心使用

API服務位置:com.xxl.job.core.biz.ExecutorBiz
API服務請求參考代碼:com.xxl.job.executor.ExecutorBizTest

API暴露代碼片段

com.xxl.job.core.executor.XxlJobExecutor.initRpcProvider()
        // add services
        xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());

API遠程調用代碼,jetty啟動注冊一個handler,handler含有API遠程調用處理邏輯

com.xxl.rpc.remoting.net.impl.jetty.server.JettyServer.start()        
JettyServer.this.server.setConnectors(new Connector[]{connector}); HandlerCollection handlerc = new HandlerCollection(); handlerc.setHandlers(new Handler[]{new JettyServerHandler(xxlRpcProviderFactory)}); JettyServer.this.server.setHandler(handlerc);
com.xxl.rpc.remoting.net.impl.jetty.server.JettyServerHandler.handle()
      XxlRpcResponse xxlRpcResponse = this.xxlRpcProviderFactory.invokeService(xxlRpcRequest);
      byte[] responseBytes = this.xxlRpcProviderFactory.getSerializer().serialize(xxlRpcResponse);
      this.writeResponse(baseRequest, response, responseBytes);
com.xxl.rpc.remoting.provider.XxlRpcProviderFactory.invokeService()
    XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
    xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());
    String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
    Object serviceBean = this.serviceData.get(serviceKey);
    if (serviceBean == null) {
      xxlRpcResponse.setErrorMsg("The serviceKey[" + serviceKey + "] not found.");
      return xxlRpcResponse;
    } else if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 180000L) {
      xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");
      return xxlRpcResponse;
    } else if (this.accessToken != null && this.accessToken.trim().length() > 0 && !this.accessToken.trim().equals(xxlRpcRequest.getAccessToken())) {
      xxlRpcResponse.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong.");
      return xxlRpcResponse;
    } else {
      try {
        Class<?> serviceClass = serviceBean.getClass();
        String methodName = xxlRpcRequest.getMethodName();
        Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
        Object[] parameters = xxlRpcRequest.getParameters();
        Method method = serviceClass.getMethod(methodName, parameterTypes);
        method.setAccessible(true);
        Object result = method.invoke(serviceBean, parameters);
        xxlRpcResponse.setResult(result);
      } catch (Throwable var11) {
        logger.error("xxl-rpc provider invokeService error.", var11);
        xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(var11));
      }

      return xxlRpcResponse;
    }

 


 

 
 
參考資料


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM