分布式系統:xxl-job改造spring-cloud


修改后的源碼倉庫地址:GitHub. :

改造原因

  1. 原有的xxl-job使用自己實現的http協議進行注冊以及調度等,與目前框架中本身的注冊中心格格不入,會影響健康檢查、日志處理、問題排查。
  2. 技術棧統一。避免執行器內包含兩套注冊邏輯。
  3. 提高分布式健壯性,原有的服務注冊以及發現等功能較弱,且與實際應用可用與否完全無關,經常存在xxl-job線程出問題,但主服務正常,或主服務出問題,但xxl-job線程正常。
  4. 灰度擴展,目前系統灰度使用eureka定制實現,為執行器支持灰度,必須進行改造。

主要改造思路

調度中心

調度中心側獲取服務時,將原有的基於數據庫的地址list,修改為動態從eureka中心獲取服務的地址列表,兩者通過xxl-job-admin配置的執行器app-name,與執行器的spring.application.name(即注冊到eureka的服務標識)關聯。

//com.xxl.job.admin.core.trigger.XxlJobTrigger
XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
//@edit 如果是自動獲取地址的話,則使用
if (group.getAddressType() == 0) {
    group.setAddressList(SpringAdminContext.getEurekaAddressList(group.getAppname()));
}
//這種靜態和spring容器不分的寫法,還是有點別扭的
@Component("springAdminContext")
public class SpringAdminContext {
    @Autowired
    DiscoveryClient discoveryClient;

    private static SpringAdminContext springAdminContext;
    @PostConstruct
    public void initialize() {
        springAdminContext = this;
        springAdminContext.discoveryClient=this.discoveryClient;
    }
    public static String getEurekaAddressList(String appName){
        //may be springContext not init
        if(springAdminContext !=null){
            DiscoveryClient discoveryClient = SpringAdminContext.springAdminContext.discoveryClient;
            List<ServiceInstance> instances = discoveryClient.getInstances(appName);
            StringBuilder addressBuilder = new StringBuilder();
            for (int i = 0; i < instances.size(); i++) {
                addressBuilder.append(instances.get(i).getUri().toString());
                if(i!=instances.size()) {
                    addressBuilder.append(",");
                }
            }
            return addressBuilder.toString();
        }else {
            return "";
        }

    }
}

調度中心側調度服務時,增加灰度策略,在獲取到eureka的instanceList后,從instance的meta原數據中取出灰度標識,進行灰度調度。代碼結合1中的列表獲取,具體灰度實現與百度到的eureka灰度相同,略。

調度中心 執行器側

通過修改core包,將原有的注冊線程刪除,並刪除embedServer的實現,修改為springMVC。

//修改后的代替embedServer的處理類
@Controller
public class XxlJobHandlerController {
    private static final Logger logger = LoggerFactory.getLogger(XxlJobHandlerController.class);

    private ExecutorBiz executorBiz;
    @Value("${xxl.job.accessToken:}")
    private String accessToken;
    @Autowired
    ThreadPoolExecutor bizThreadPool;
    @PostConstruct
    public void start() {
        executorBiz = new ExecutorBizImpl();
    }

    @PostMapping("/job/{method}")
    @ResponseBody
    public ReturnT jobHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, @PathVariable("method") String methodName) {
        return doHandlerReq(httpServletRequest,httpServletResponse,"/"+methodName);
    }
    // ---------------------- registry ----------------------

    protected ReturnT doHandlerReq(HttpServletRequest httpServletRequest,HttpServletResponse httpServletResponse,String method) {
        try {
            //read request
            //@edit 這里請求都模擬的原有處理方式,包括header這些
            int contentLength = httpServletRequest.getContentLength();
            byte[] reqBody=new byte[contentLength];
            httpServletRequest.getInputStream().read(reqBody,0,contentLength);
            String requestData=new String(reqBody, StandardCharsets.UTF_8);
            String uri = method;
            String accessTokenReq = httpServletRequest.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);
            //@edit 這里原有是netty純異步,但是到這邊http就不太合適了,這么寫雖然看起來吞吐會下降,但是一般web容器現在底層也支持nio了,應該關系不大。
            FutureTask<ReturnT> stringFutureTask=new FutureTask<ReturnT>(() -> process(uri, requestData, accessTokenReq));
            // invoke
            bizThreadPool.execute(stringFutureTask);
            ReturnT returnT = stringFutureTask.get();
            httpServletResponse.setHeader(HttpHeaders.CONTENT_TYPE, "text/html;charset=UTF-8");


            return returnT;
        } catch (Exception e) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
        }
    }

    private ReturnT process(String uri, String requestData, String accessTokenReq) {

        if (uri == null || uri.trim().length() == 0) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
        }
        if (accessToken != null
                && accessToken.trim().length() > 0
                && !accessToken.equals(accessTokenReq)) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
        }

        // services mapping
        try {
            if ("/beat".equals(uri)) {
                return executorBiz.beat();
            } else if ("/idleBeat".equals(uri)) {
                IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                return executorBiz.idleBeat(idleBeatParam);
            } else if ("/run".equals(uri)) {
                TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                return executorBiz.run(triggerParam);
            } else if ("/kill".equals(uri)) {
                KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                return executorBiz.kill(killParam);
            } else if ("/log".equals(uri)) {
                LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                return executorBiz.log(logParam);
            } else {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
        }
    }
}

修改處理callback的邏輯,使得任務執行結果可以回調到admin

//com.xxl.job.core.executor
//@edit 這個方法主要用於執行器在獲取調度中心列表時調用,目的是執行器獲取調度中心列表進行callback回調通知執行結果。
    //如果callback失敗或者這里出問題,那么會導致在管理台看到的執行結果永遠沒有。
    //管理台的調度結果和執行結果是分開的,調度結果依賴單次http請求,執行結果依賴callback
    // @see TriggerCallbackThread
    public static List<AdminBiz> getAdminBizList(){
        List<AdminBiz> adminBizs=new ArrayList<>();
        String adminAppName="xxl-job-admin-cloud";
        List<String> addressList = Arrays.asList(SpringContext.getEurekaAddressList(adminAppName).split(","));
        addressList.forEach(e ->{
            adminBizs.add(new AdminBizClient(e.concat("/").concat("xxl-job-admin"),accessToken));
        });
        return adminBizs;
    }

總結

其實還有一些細節的修改包括eureka的配置,原有注冊代碼等的調整,就不好一一列出來了,具體可以拉下來項目搜索@edit,主要修改的地方我都加了這個。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM