XXL-JOB Client (Executor) Source Code Analysis


I. Introduction

1. Add the dependency

        <dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-job-core</artifactId>
            <version>2.1.2</version>
        </dependency>

2. Instantiate the executor

@Slf4j
// Controls whether this configuration class is instantiated
@ConditionalOnProperty(value = "xxljob.enabled", matchIfMissing = true)
@Configuration
public class XxlJobConfig {
    // Admin (scheduling center) addresses
    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;
    // Executor name
    @Value("${xxl.job.executor.appName}")
    private String appName;
    // Executor IP, e.g. ${spring.cloud.client.ip-address}
    @Value("${xxl.job.executor.ip}")
    private String ip;
    // Port
    @Value("${xxl.job.executor.port}")
    private int port;
//    // Access token
//    @Value("${xxl.job.accessToken}")
//    private String accessToken;
//    // Log path
//    @Value("${xxl.job.executor.logPath}")
//    private String logPath;
//    // Log retention in days
//    @Value("${xxl.job.executor.logRetentionDays}")
//    private int logRetentionDays;

    // Instantiate the executor bean
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppName(appName);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
//        xxlJobSpringExecutor.setAccessToken(accessToken);
//        xxlJobSpringExecutor.setLogPath(logPath);
//        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }
}

This article analyzes version 2.1.2; the implementation differs slightly in other versions.

II. Instantiating XxlJobSpringExecutor

public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, InitializingBean, DisposableBean

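1. ApplicationContextAware

  Its only purpose is to obtain the ApplicationContext.

2. InitializingBean

  During bean initialization, once the properties have been set, Spring calls the afterPropertiesSet method: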

    public void afterPropertiesSet() throws Exception {

        // 1. Register job handlers annotated with @JobHandler, i.e. put them into a local map
        initJobHandlerRepository(applicationContext);

        // 2. Register job handlers declared via @XxlJob
        initJobHandlerMethodRepository(applicationContext);

        // 3. Create the GlueFactory
        GlueFactory.refreshInstance(1);

        // 4. Start
        super.start();
    }

step1: Register via @JobHandler

    private void initJobHandlerRepository(ApplicationContext applicationContext) {
        if (applicationContext == null) {
            return;
        }

        // Get the beans annotated with @JobHandler
        Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);

        if (serviceBeanMap != null && serviceBeanMap.size() > 0) {
            for (Object serviceBean : serviceBeanMap.values()) {
                if (serviceBean instanceof IJobHandler) {
                    String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();
                    IJobHandler handler = (IJobHandler) serviceBean;
                    if (loadJobHandler(name) != null) {
                        throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
                    }
                    // Register the job handler by putting it into the map
                    registJobHandler(name, handler);
                }
            }
        }
    }
    // Local registry
    private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
    public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
        logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
        return jobHandlerRepository.put(name, jobHandler);
    }

step2: Register via @XxlJob

    private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
        if (applicationContext == null) {
            return;
        }

        // Get all bean definition names
        String[] beanDefinitionNames = applicationContext.getBeanDefinitionNames();
        for (String beanDefinitionName : beanDefinitionNames) {
            // Look up the bean by its definition name
            Object bean = applicationContext.getBean(beanDefinitionName);
            // Get every method declared by the bean's class: private, protected, package-private and public
            Method[] methods = bean.getClass().getDeclaredMethods();
            for (Method method: methods) {
                // Get the @XxlJob annotation on the method, if any
                XxlJob xxlJob = AnnotationUtils.findAnnotation(method, XxlJob.class);
                if (xxlJob != null) {

                    // The value of the @XxlJob annotation
                    String name = xxlJob.value();
                    if (name.trim().length() == 0) {
                        throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#"+ method.getName() +"] .");
                    }
                    if (loadJobHandler(name) != null) {
                        throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
                    }

                    // Validate the parameters: throw unless the method has exactly one parameter whose type is assignable from String
                    if (!(method.getParameterTypes()!=null && method.getParameterTypes().length==1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
                        throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#"+ method.getName() +"] , " +
                                "The correct method format like \" public ReturnT<String> execute(String param) \" .");
                    }
                    // Validate the return type: it must be assignable from ReturnT
                    if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
                        throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#"+ method.getName() +"] , " +
                                "The correct method format like \" public ReturnT<String> execute(String param) \" .");
                    }
                    // Suppress Java access checks so non-public methods can be invoked reflectively (this also skips the per-call check)
                    method.setAccessible(true);

                    Method initMethod = null;
                    Method destroyMethod = null;

                    if(xxlJob.init().trim().length() > 0) {
                        try {
                            // Get the init method named in the @XxlJob annotation
                            initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
                            initMethod.setAccessible(true);
                        } catch (NoSuchMethodException e) {
                            throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#"+ method.getName() +"] .");
                        }
                    }
                    if(xxlJob.destroy().trim().length() > 0) {
                        try {
                            // Get the destroy method named in the @XxlJob annotation
                            destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
                            destroyMethod.setAccessible(true);
                        } catch (NoSuchMethodException e) {
                            throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#"+ method.getName() +"] .");
                        }
                    }

                    // Wrap the method in a MethodJobHandler and register it in the local map
                    registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
                }
            }
        }
    }
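
The scan-validate-register flow above can be reproduced without Spring or xxl-job. The following is a minimal sketch under stated assumptions: `Job`, `MethodHandler`, `DemoJobs` and `MethodHandlerScanSketch` are hypothetical stand-ins (for `@XxlJob`, `MethodJobHandler` and a user bean), the return type is simplified to `String` instead of `ReturnT`, and `putIfAbsent` is used in place of the separate `loadJobHandler` check followed by `put`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class MethodHandlerScanSketch {

    // Hypothetical stand-in for @XxlJob; not part of xxl-job.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Job {
        String value();
    }

    // Hypothetical stand-in for MethodJobHandler: a target bean plus the method to call.
    static final class MethodHandler {
        private final Object target;
        private final Method method;

        MethodHandler(Object target, Method method) {
            this.target = target;
            this.method = method;
        }

        String execute(String param) {
            try {
                return (String) method.invoke(target, param);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("handler invocation failed", e);
            }
        }
    }

    // Local registry: handler name -> handler.
    private static final Map<String, MethodHandler> REGISTRY = new ConcurrentHashMap<>();

    // Scans the declared methods of one bean and registers every annotated one.
    static void register(Object bean) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            Job job = method.getAnnotation(Job.class);
            if (job == null) {
                continue; // not a job method
            }
            String name = job.value().trim();
            if (name.isEmpty()) {
                throw new IllegalStateException("job name invalid for " + method);
            }
            Class<?>[] params = method.getParameterTypes();
            if (params.length != 1 || !params[0].isAssignableFrom(String.class)) {
                throw new IllegalStateException("parameter type invalid for " + method);
            }
            if (method.getReturnType() != String.class) {
                throw new IllegalStateException("return type invalid for " + method);
            }
            method.setAccessible(true); // allow calling non-public methods
            // putIfAbsent performs the conflict check and the insert in one atomic step.
            if (REGISTRY.putIfAbsent(name, new MethodHandler(bean, method)) != null) {
                throw new IllegalStateException("job handler [" + name + "] naming conflicts");
            }
        }
    }

    static MethodHandler load(String name) {
        return REGISTRY.get(name);
    }

    // Example bean with one private job method.
    static class DemoJobs {
        @Job("demoJob")
        private String demo(String param) {
            return "ran:" + param;
        }
    }
}
```

Calling `MethodHandlerScanSketch.register(new MethodHandlerScanSketch.DemoJobs())` and then `load("demoJob").execute("x")` invokes the private `demo` method through the registry, which mirrors how a trigger later finds its handler by name.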

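step3: Create the GlueFactory to handle GLUE mode

  With Spring, jobs normally run in BEAN mode: the task is maintained on the executor side as a JobHandler and invoked over RPC. In GLUE mode the task is maintained as source code in the scheduling center. This step initializes a SpringGlueFactory.

step4: Call the parent class's start method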

    public void start() throws Exception {

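        // 1. Initialize the executor log path; the default is /data/applogs/xxl-job/jobhandler
        XxlJobFileAppender.initLogPath(logPath);

        // 2. Build the AdminBiz list from the configured adminAddresses (multiple addresses are comma-separated)
        initAdminBizList(adminAddresses, accessToken);

        // 3. Start a daemon thread that cleans up job log files once a day
        JobLogFileCleanThread.getInstance().start(logRetentionDays);

        // 4. Start the callback thread that reports execution results
        TriggerCallbackThread.getInstance().start();

        // 5. Initialize the executor's embedded server (default port 9999, built on Netty)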
        port = port>0?port: NetUtil.findAvailablePort(9999);
        ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
        initRpcProvider(ip, port, appName, accessToken);
    }

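(1) Initialize the executor log path, /data/applogs/xxl-job/jobhandler by default. XxlJobFileAppender is a utility class dedicated to writing these log files. The xxl-job-admin UI displays the log of each scheduled execution, and business code can write its own entries through XxlJobLogger.log.

(2) Build the AdminBiz list from the configured adminAddresses. Registration and later calls to the admin's interfaces use these addresses.

(3) Start a daemon thread that cleans up job log files (the files under the directory from (1)) once a day.

(4) A LinkedBlockingQueue holds job execution results. Two threads, triggerCallbackThread and triggerRetryCallbackThread, are started to report results to job-admin. Normally only triggerCallbackThread takes data from the queue and submits it to the admin. When a submission fails, triggerCallbackThread writes a callback log file, and triggerRetryCallbackThread later reads that file and resubmits the results.

(5) Start the executor's embedded server, implemented with Netty: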

    private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {

        String address = IpUtil.getIpPort(ip, port);
        Map<String, String> serviceRegistryParam = new HashMap<String, String>();
        serviceRegistryParam.put("appName", appName);
        serviceRegistryParam.put("address", address);

        xxlRpcProviderFactory = new XxlRpcProviderFactory();
        // Set the server implementation class
        xxlRpcProviderFactory.setServer(NettyHttpServer.class);
        xxlRpcProviderFactory.setSerializer(HessianSerializer.class);
        xxlRpcProviderFactory.setCorePoolSize(20);
        xxlRpcProviderFactory.setMaxPoolSize(200);
        xxlRpcProviderFactory.setIp(ip);
        xxlRpcProviderFactory.setPort(port);
        xxlRpcProviderFactory.setAccessToken(accessToken);
        // Used to register the executor with the admin
        xxlRpcProviderFactory.setServiceRegistry(ExecutorServiceRegistry.class);
        xxlRpcProviderFactory.setServiceRegistryParam(serviceRegistryParam);

        // Add the service that executes jobs; it is stored in serviceData
        xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());

        // Start the server
        xxlRpcProviderFactory.start();

    }
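
Going back to item (4) above, the hand-off between the callback role and the retry role can be sketched as follows. This is a simplified, single-threaded illustration and not the xxl-job implementation: `CallbackRetrySketch`, `drainAndSend`, `retryFailed` and the `sender` predicate are hypothetical names, and an in-memory list stands in for the callback log files on disk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;

class CallbackRetrySketch {
    // Results produced by job threads, waiting to be reported.
    private final BlockingQueue<String> callbackQueue = new LinkedBlockingQueue<>();
    // Stand-in for the callback log files (assumption: kept in memory here).
    private final List<String> failedLog = new ArrayList<>();
    // Hypothetical sender: returns true when the admin accepted the batch.
    private final Predicate<List<String>> sender;

    CallbackRetrySketch(Predicate<List<String>> sender) {
        this.sender = sender;
    }

    // Called by a job thread after each execution.
    void push(String result) {
        callbackQueue.add(result);
    }

    // One pass of the callback role: drain the queue and send the batch;
    // if sending fails, park the batch for the retry role.
    void drainAndSend() {
        List<String> batch = new ArrayList<>();
        callbackQueue.drainTo(batch);
        if (batch.isEmpty()) {
            return;
        }
        if (!sender.test(batch)) {
            failedLog.addAll(batch);
        }
    }

    // One pass of the retry role: resend everything that failed earlier.
    void retryFailed() {
        if (failedLog.isEmpty()) {
            return;
        }
        List<String> batch = new ArrayList<>(failedLog);
        failedLog.clear();
        if (!sender.test(batch)) {
            failedLog.addAll(batch); // still failing, keep for the next pass
        }
    }

    int pendingRetries() {
        return failedLog.size();
    }
}
```

Keeping the failed batch outside the queue means a temporarily unreachable admin does not block new results from being reported once it is back.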

Starting the server

    public void start() throws Exception {

        ··· non-essential code omitted ···

        // Instantiate the serializer (HessianSerializer)
        this.serializerInstance = serializer.newInstance();

        serviceAddress = IpUtil.getIpPort(this.ip, port);
        // Instantiate the server (NettyHttpServer)
        serverInstance = server.newInstance();
        // Set the hook that is called once the server has started
        serverInstance.setStartedCallback(new BaseCallback() {
            @Override
            public void run() throws Exception {
                // start registry
                if (serviceRegistry != null) {
                    // Instantiate the executor registry (ExecutorServiceRegistry)
                    serviceRegistryInstance = serviceRegistry.newInstance();
                    // Starts a thread that registers the executor's info every 30 seconds
                    serviceRegistryInstance.start(serviceRegistryParam);
                    if (serviceData.size() > 0) {
                        // Currently a no-op implementation that returns false
                        serviceRegistryInstance.registry(serviceData.keySet(), serviceAddress);
                    }
                }
            }
        });
        // Set the hook that is called when the server stops
        serverInstance.setStopedCallback(new BaseCallback() {
            @Override
            public void run() {
    
                if (serviceRegistryInstance != null) {
                    if (serviceData.size() > 0) {
                        // No-op implementation; returns false by default
                        serviceRegistryInstance.remove(serviceData.keySet(), serviceAddress);
                    }
                    // Stop the executor registry thread
                    serviceRegistryInstance.stop();
                    serviceRegistryInstance = null;
                }
            }
        });
        // Start the server
        serverInstance.start(this);
    }

The server starts in the standard Netty way: NettyHttpServer's start method brings up a Netty server, and requests are handled mainly by NettyHttpServerHandler.

    protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {

        // Parse the request
        final byte[] requestBytes = ByteBufUtil.getBytes(msg.content());    // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
        final String uri = msg.uri();
        final boolean keepAlive = HttpUtil.isKeepAlive(msg);

        // Process the request on the handler thread pool
        serverHandlerPool.execute(new Runnable() {
            @Override
            public void run() {
                process(ctx, uri, requestBytes, keepAlive);
            }
        });
    }
    // Process the request
    private void process(ChannelHandlerContext ctx, String uri, byte[] requestBytes, boolean keepAlive){
        String requestId = null;
        try {
            if ("/services".equals(uri)) {

                ···

            } else {
                // Normal request branch
                if (requestBytes.length == 0) {
                    throw new XxlRpcException("xxl-rpc request data empty.");
                }
                // Deserialize the request
                XxlRpcRequest xxlRpcRequest = (XxlRpcRequest) xxlRpcProviderFactory.getSerializerInstance().deserialize(requestBytes, XxlRpcRequest.class);
                requestId = xxlRpcRequest.getRequestId();

                // Heartbeat (ping) request: return immediately
                if (Beat.BEAT_ID.equalsIgnoreCase(xxlRpcRequest.getRequestId())){
                    logger.debug(">>>>>>>>>>> xxl-rpc provider netty_http server read beat-ping.");
                    return;
                }

                // Business processing
                XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);

                // Serialize the response
                byte[] responseBytes = xxlRpcProviderFactory.getSerializerInstance().serialize(xxlRpcResponse);

                // Write the response
                writeResponse(ctx, keepAlive, responseBytes);
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);

            // response error
            XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
            xxlRpcResponse.setRequestId(requestId);
            xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(e));

            // response serialize
            byte[] responseBytes = xxlRpcProviderFactory.getSerializerInstance().serialize(xxlRpcResponse);

            // response-write
            writeResponse(ctx, keepAlive, responseBytes);
        }

    }

Handling the request: invokeService

    public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {

        // Create the response
        XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
        xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());

        // This retrieves the ExecutorBizImpl added earlier
        String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
        Object serviceBean = serviceData.get(serviceKey);

        // If null, respond with an error directly
        if (serviceBean == null) {
            xxlRpcResponse.setErrorMsg("The serviceKey["+ serviceKey +"] not found.");
            return xxlRpcResponse;
        }
        // The request must not be more than 3 minutes old by the executor's clock, which bounds the admin/executor time difference
        if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 3*60*1000) {
            xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");
            return xxlRpcResponse;
        }
        // Token validation
        if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(xxlRpcRequest.getAccessToken())) {
            xxlRpcResponse.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong.");
            return xxlRpcResponse;
        }

        try {
            
            Class<?> serviceClass = serviceBean.getClass();
            String methodName = xxlRpcRequest.getMethodName();
            Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
            Object[] parameters = xxlRpcRequest.getParameters();

            Method method = serviceClass.getMethod(methodName, parameterTypes);
            method.setAccessible(true);
            // For an actual job trigger, this invokes ExecutorBizImpl's run method
            Object result = method.invoke(serviceBean, parameters);

            xxlRpcResponse.setResult(result);
        } catch (Throwable t) {
            // catch error
            logger.error("xxl-rpc provider invokeService error.", t);
            xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
        }

        return xxlRpcResponse;
    }
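
The core of invokeService is a lookup by service key followed by a reflective call. The following is a minimal sketch of that idea using only the JDK; `ServiceDispatchSketch`, `EchoService` and the string-based error results are hypothetical simplifications (the real code uses XxlRpcRequest and XxlRpcResponse and also checks the access token).

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ServiceDispatchSketch {
    // Requests older than this are rejected (3 minutes, as in the code above).
    static final long MAX_AGE_MILLIS = 3 * 60 * 1000L;

    // Service key -> service bean, filled by addService.
    private final Map<String, Object> serviceData = new ConcurrentHashMap<>();

    void addService(String key, Object bean) {
        serviceData.put(key, bean);
    }

    // Returns the method's result, or a string starting with "ERROR" on any failure.
    String invoke(String serviceKey, String methodName, long createMillis, String arg) {
        Object bean = serviceData.get(serviceKey);
        if (bean == null) {
            return "ERROR: serviceKey[" + serviceKey + "] not found";
        }
        if (System.currentTimeMillis() - createMillis > MAX_AGE_MILLIS) {
            return "ERROR: request timestamp exceeds the limit";
        }
        try {
            // Resolve the public method by name and parameter types, then call it.
            Method method = bean.getClass().getMethod(methodName, String.class);
            method.setAccessible(true);
            return String.valueOf(method.invoke(bean, arg));
        } catch (ReflectiveOperationException e) {
            return "ERROR: " + e;
        }
    }

    // Hypothetical service used only for illustration.
    public static class EchoService {
        public String echo(String s) {
            return "echo:" + s;
        }
    }
}
```

Because every failure is turned into a response value rather than an exception, the caller always receives an answer, which matches how invokeService fills errorMsg instead of throwing.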

ExecutorBizImpl#run

    public ReturnT<String> run(TriggerParam triggerParam) {
        
        // Jobs are processed by a JobThread; load the one for this job id
        JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
        // Each jobThread holds one jobHandler, so get the handler from the thread
        IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
        String removeOldReason = null;

        // Check the glue type; with Spring, BEAN mode is the usual one, so only that branch is examined here
        GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
        if (GlueTypeEnum.BEAN == glueTypeEnum) {

            // Look up the jobHandler in the handler registry map populated earlier
            IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

            // jobThread is non-null and the two handlers differ: the thread holds a stale handler and must be replaced
            if (jobThread!=null && jobHandler != newJobHandler) {
                removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
                // Clear both first
                jobThread = null;
                jobHandler = null;
            }

            if (jobHandler == null) {
                // Assign the new jobHandler
                jobHandler = newJobHandler;
                if (jobHandler == null) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
                }
            }

        } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
            // GLUE (Groovy) mode
        } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
            // Script mode
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
        }

        // If jobThread is non-null, apply the block strategy
        if (jobThread != null) {
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
            // Discard later: if the current thread is running or has queued triggers, reject this trigger and return
            if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
                // discard when running
                if (jobThread.isRunningOrHasQueue()) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
                }
            } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
                // Cover early: if the current thread is busy, set the local reference to null; a new thread is created below to handle this trigger
                if (jobThread.isRunningOrHasQueue()) {
                    removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                    jobThread = null;
                }
            } else {
                // Serial execution: the trigger is put into the thread's queue and waits its turn
            }
        }

        if (jobThread == null) {
            // jobThread is null: register a new job thread and start it
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
        }

        // Put the trigger into the execution queue
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
        return pushResult;
    }
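
The block-strategy branch in the method above reduces to a small decision table. The following sketch restates it as a pure function; `BlockStrategySketch`, its `Action` enum and `decide` are hypothetical names introduced for illustration, and the local `Strategy` enum only mirrors the three constants seen in ExecutorBlockStrategyEnum.

```java
class BlockStrategySketch {
    // Local mirror of the three block strategies.
    enum Strategy { SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY }

    // What run() ends up doing with the incoming trigger.
    enum Action { ENQUEUE_ON_EXISTING_THREAD, REJECT_TRIGGER, START_NEW_THREAD_AND_ENQUEUE }

    // threadBusy corresponds to jobThread.isRunningOrHasQueue().
    static Action decide(Strategy strategy, boolean threadExists, boolean threadBusy) {
        if (!threadExists) {
            // No job thread yet: one is registered and the trigger is queued on it.
            return Action.START_NEW_THREAD_AND_ENQUEUE;
        }
        if (threadBusy && strategy == Strategy.DISCARD_LATER) {
            // Keep the running work and drop the new trigger.
            return Action.REJECT_TRIGGER;
        }
        if (threadBusy && strategy == Strategy.COVER_EARLY) {
            // Replace the busy thread; the new trigger runs on a fresh one.
            return Action.START_NEW_THREAD_AND_ENQUEUE;
        }
        // Serial execution, or an idle thread under any strategy.
        return Action.ENQUEUE_ON_EXISTING_THREAD;
    }
}
```

Note that the strategy only matters while the existing thread is busy; an idle thread simply receives the trigger in its queue.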

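As shown, the business logic is actually run by the JobThread, asynchronously: this method only adds the trigger to the queue and returns. That is why the execution result has to be reported by the separate callback threads seen earlier. Next, look at JobThread's run method: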

    public void run() {

        // init
        try {
            // Call the handler's init method
            handler.init();
        } catch (Throwable e) {
            logger.error(e.getMessage(), e);
        }

        // execute
        while(!toStop){
            running = false;
            idleTimes++;

            TriggerParam triggerParam = null;
            ReturnT<String> executeResult = null;
            try {
                // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
                // Poll the queue for a trigger, waiting at most 3 seconds
                triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
                if (triggerParam!=null) {
                    running = true;
                    idleTimes = 0;
                    triggerLogIdSet.remove(triggerParam.getLogId());

                    // Set up logging
                    // log filename, like "logPath/yyyy-MM-dd/9999.log"
                    String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                    XxlJobFileAppender.contextHolder.set(logFileName);
                    ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));

                    // execute
                    XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());
                    // Trigger with an execution timeout
                    if (triggerParam.getExecutorTimeout() > 0) {
                        // limit timeout
                        Thread futureThread = null;
                        try {
                            final TriggerParam triggerParamTmp = triggerParam;
                            FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
                                @Override
                                public ReturnT<String> call() throws Exception {
                                    return handler.execute(triggerParamTmp.getExecutorParams());
                                }
                            });
                            // Run the handler in a new thread
                            futureThread = new Thread(futureTask);
                            futureThread.start();
                            // get() blocks for up to triggerParam.getExecutorTimeout() seconds and throws TimeoutException on timeout
                            executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                        } catch (TimeoutException e) {

                            XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
                            XxlJobLogger.log(e);
                            // Result for a timed-out execution
                            executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
                        } finally {
                            // Interrupt the worker thread to signal that it should stop
                            futureThread.interrupt();
                        }
                    } else {
                        // just execute
                        // Execute directly
                        executeResult = handler.execute(triggerParam.getExecutorParams());
                    }
                    
                    if (executeResult == null) {
                        executeResult = IJobHandler.FAIL;
                    } else {
                        executeResult.setMsg(
                                (executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000)
                                        ?executeResult.getMsg().substring(0, 50000).concat("...")
                                        :executeResult.getMsg());
                        executeResult.setContent(null);    // limit obj size
                    }
                    XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);

                } else {
                    // No trigger in the queue and more than 30 consecutive idle polls: stop the thread
                    if (idleTimes > 30) {
                        if(triggerQueue.size() == 0) {    // avoid concurrent trigger causes jobId-lost
                            XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                        }
                    }
                }
            } catch (Throwable e) {
                if (toStop) {
                    XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
                }

                StringWriter stringWriter = new StringWriter();
                e.printStackTrace(new PrintWriter(stringWriter));
                String errorMsg = stringWriter.toString();
                executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);

                XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
            } finally {
                if(triggerParam != null) {
                    // callback handler info
                    if (!toStop) {
                        // commonm
                        // Report the execution result by putting it into the callback queue
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
                    } else {
                        // is killed
                        ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
                    }
                }
            }
        }

        // callback trigger request in queue
        while(triggerQueue !=null && triggerQueue.size()>0){
            TriggerParam triggerParam = triggerQueue.poll();
            if (triggerParam!=null) {
                // is killed
                ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
                TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
            }
        }

        // destroy
        try {
            // Call the handler's destroy method
            handler.destroy();
        } catch (Throwable e) {
            logger.error(e.getMessage(), e);
        }

        logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
    }
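
The timeout branch of run() relies on a FutureTask executed in a separate thread. The following is a minimal, self-contained sketch of that pattern; `TimeoutRunSketch` and `runWithTimeout` are hypothetical names, the timeout is given in milliseconds to keep the demonstration short (the code above uses seconds), and plain strings replace ReturnT.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutRunSketch {

    // Runs the job in its own thread and waits at most timeoutMillis for the result.
    static String runWithTimeout(Callable<String> job, long timeoutMillis) {
        FutureTask<String> task = new FutureTask<>(job);
        Thread worker = new Thread(task, "job-worker");
        worker.setDaemon(true); // do not keep the JVM alive for an abandoned job
        worker.start();
        try {
            // Blocks until the job finishes or the timeout elapses.
            return task.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "TIMEOUT";
        } catch (ExecutionException e) {
            return "FAILED: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return "INTERRUPTED";
        } finally {
            // Interruption is only a request: the job stops early only if it
            // reaches an interruptible call or checks its interrupt flag.
            worker.interrupt();
        }
    }
}
```

For example, `runWithTimeout(() -> "ok", 1000)` returns the job's own result, while a job that sleeps longer than the timeout yields `"TIMEOUT"` and has its sleep interrupted.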

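3. DisposableBean

  When the Spring container shuts down (including when startup fails after this bean has been created), it calls the bean's destroy method. Let's see what xxl-job's destroy method does.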

    public void destroy() {
        super.destroy();
    }
    public void destroy(){
        // Stop the embedded server
        stopRpcProvider();

        // Destroy the job threads
        if (jobThreadRepository.size() > 0) {
            for (Map.Entry<Integer, JobThread> item: jobThreadRepository.entrySet()) {
                removeJobThread(item.getKey(), "web container destroy and kill the job.");
            }
            jobThreadRepository.clear();
        }
        jobHandlerRepository.clear();

        // Stop the log cleanup thread
        JobLogFileCleanThread.getInstance().toStop();

        // Stop the result callback and retry threads
        TriggerCallbackThread.getInstance().toStop();

    }

The source is fairly clear. Only the first step, stopping the server, needs a closer look; the rest is straightforward.

    private void stopRpcProvider() {
        try {
            xxlRpcProviderFactory.stop();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
    public void  stop() throws Exception {
        // stop server
        serverInstance.stop();
    }
    public void stop() throws Exception {
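        // This shuts down the Netty server gracefully and closes the request-handling thread pool
        if (thread!=null && thread.isAlive()) {
            thread.interrupt();
        }

        // Call the parent's onStoped method, which runs the hook registered in XxlRpcProviderFactory's start method;
        // its job is to stop the executor registry thread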
        onStoped();
        logger.info(">>>>>>>>>>> xxl-rpc remoting server destroy success.");
    }
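
The started/stopped hooks that tie the registry thread to the server's lifecycle can be illustrated with a few lines of plain Java. This is only a sketch: `LifecycleHookSketch`, its nested `Server` class and the event strings are hypothetical, and `Runnable` stands in for the BaseCallback type used above.

```java
import java.util.ArrayList;
import java.util.List;

class LifecycleHookSketch {

    // Hypothetical minimal server that only knows how to fire its hooks.
    static class Server {
        private Runnable startedCallback;
        private Runnable stoppedCallback;

        void setStartedCallback(Runnable callback) {
            this.startedCallback = callback;
        }

        void setStoppedCallback(Runnable callback) {
            this.stoppedCallback = callback;
        }

        void start() {
            // A real server would bind its port here before firing the hook.
            if (startedCallback != null) {
                startedCallback.run();
            }
        }

        void stop() {
            // A real server would release its resources here before firing the hook.
            if (stoppedCallback != null) {
                stoppedCallback.run();
            }
        }
    }

    // Wires the hooks the way the provider factory does, then starts and stops the server.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Server server = new Server();
        server.setStartedCallback(() -> events.add("registry thread started"));
        server.setStoppedCallback(() -> events.add("registry thread stopped"));
        server.start();
        server.stop();
        return events;
    }
}
```

Registering both hooks in one place keeps the server unaware of registration details: it only reports that it started or stopped, and the factory decides what follows.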

That completes the analysis of the client (executor) source code. The next article looks at the scheduling center's source code.

