Xxl-job客户端源码解析


一、简介

1、添加jar

        <!-- xxl-job executor core library, pinned to version 2.1.2 -->
        <dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-job-core</artifactId>
            <version>2.1.2</version>
        </dependency>

2、实例化执行器

/**
 * Spring configuration that builds the xxl-job executor bean from application properties.
 *
 * <p>Instantiation of this configuration is controlled by the {@code xxljob.enabled} property;
 * because {@code matchIfMissing} is {@code true}, a missing property counts as enabled.
 */
@Slf4j
@ConditionalOnProperty(value = "xxljob.enabled", matchIfMissing = true)
@Configuration
public class XxlJobConfig {

    /** Address(es) of the scheduling admin server. */
    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    /** Name under which this executor is configured. */
    @Value("${xxl.job.executor.appName}")
    private String appName;

    /** Executor ip; the original note suggests ${spring.cloud.client.ip-address} as a source. */
    @Value("${xxl.job.executor.ip}")
    private String ip;

    /** Executor port. */
    @Value("${xxl.job.executor.port}")
    private int port;

    // Optional settings, currently disabled:
    //
    //    // access token (security credential)
    //    @Value("${xxl.job.accessToken}")
    //    private String accessToken;
    //    // log directory
    //    @Value("${xxl.job.executor.logPath}")
    //    private String logPath;
    //    // number of days logs are retained
    //    @Value("${xxl.job.executor.logRetentionDays}")
    //    private int logRetentionDays;

    /**
     * Creates the executor bean and copies the injected settings onto it.
     *
     * @return the configured {@code XxlJobSpringExecutor}
     */
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init.");

        final XxlJobSpringExecutor executor = new XxlJobSpringExecutor();
        executor.setAdminAddresses(adminAddresses);
        executor.setAppName(appName);
        executor.setIp(ip);
        executor.setPort(port);
        // Disabled together with the optional fields above:
        //   executor.setAccessToken(accessToken);
        //   executor.setLogPath(logPath);
        //   executor.setLogRetentionDays(logRetentionDays);
        return executor;
    }
}

本文是基于2.1.2版本来解析,其他版本的源码实现稍有不同。

二、XxlJobSpringExecutor实例化

public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, InitializingBean, DisposableBean

1、ApplicationContextAware

  目的就是为了获取ApplicationContext。

2、InitializingBean

  bean的属性设置完成后(初始化阶段),Spring会回调afterPropertiesSet方法:

    /**
     * Initialization callback: registers job handlers from both annotation styles, refreshes
     * the glue factory and finally starts the executor through the parent class.
     *
     * @throws Exception declared by the signature; propagated from whichever step fails
     */
    public void afterPropertiesSet() throws Exception {

        // 1. Register handlers declared with @JobHandler (they are put into a local map)
        initJobHandlerRepository(applicationContext);

        // 2. Register handlers declared with @XxlJob on methods
        initJobHandlerMethodRepository(applicationContext);

        // 3. Create the GlueFactory
        GlueFactory.refreshInstance(1);

        // 4. Start the executor (parent-class start())
        super.start();
    }

step1:通过@JobHandler注册

    /**
     * Registers every bean annotated with {@code @JobHandler} that is also an
     * {@code IJobHandler}, using the annotation value as the handler name.
     *
     * @param applicationContext context to look beans up in; nothing happens when {@code null}
     * @throws RuntimeException if a handler is already registered under the same name
     */
    private void initJobHandlerRepository(ApplicationContext applicationContext) {
        if (applicationContext == null) {
            return;
        }

        // Beans carrying the @JobHandler annotation
        Map<String, Object> annotatedBeans = applicationContext.getBeansWithAnnotation(JobHandler.class);
        if (annotatedBeans == null || annotatedBeans.isEmpty()) {
            return;
        }

        for (Object candidate : annotatedBeans.values()) {
            // Annotated beans that are not job handlers are ignored
            if (!(candidate instanceof IJobHandler)) {
                continue;
            }

            String handlerName = candidate.getClass().getAnnotation(JobHandler.class).value();
            if (loadJobHandler(handlerName) != null) {
                throw new RuntimeException("xxl-job jobhandler[" + handlerName + "] naming conflicts.");
            }

            // Put the handler into the local registry map
            registJobHandler(handlerName, (IJobHandler) candidate);
        }
    }
    // Local registry: handler name -> job handler
    private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
    /**
     * Logs the registration and stores the handler in the local registry under the given name.
     *
     * @param name       key the handler is stored under
     * @param jobHandler handler to store
     * @return whatever the registry's {@code put} returns for that key
     */
    public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
        logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
        return jobHandlerRepository.put(name, jobHandler);
    }

step2:通过@XxlJob注册

    /**
     * Scans every bean in the context for methods annotated with {@code @XxlJob} and registers
     * each one as a {@code MethodJobHandler} under the annotation's value.
     *
     * <p>An annotated method must take exactly one parameter whose type is assignable from
     * {@code String} and must declare a return type assignable from {@code ReturnT}. Optional
     * init/destroy methods named by the annotation are looked up on the bean class as
     * no-argument declared methods.
     *
     * @param applicationContext context to scan; nothing happens when {@code null}
     * @throws RuntimeException if a handler name is blank or already registered, if the method
     *         signature is invalid, or if a named init/destroy method does not exist (the
     *         underlying {@code NoSuchMethodException} is attached as the cause)
     */
    private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
        if (applicationContext == null) {
            return;
        }

        // Walk every bean definition name and resolve it to its bean
        String[] beanDefinitionNames = applicationContext.getBeanDefinitionNames();
        for (String beanDefinitionName : beanDefinitionNames) {
            Object bean = applicationContext.getBean(beanDefinitionName);
            // getDeclaredMethods covers private, protected, package-private and public methods
            Method[] methods = bean.getClass().getDeclaredMethods();
            for (Method method : methods) {
                XxlJob xxlJob = AnnotationUtils.findAnnotation(method, XxlJob.class);
                if (xxlJob == null) {
                    continue;
                }

                // The annotation value is the handler name; it must be non-blank and unique
                String name = xxlJob.value();
                if (name.trim().length() == 0) {
                    throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#"+ method.getName() +"] .");
                }
                if (loadJobHandler(name) != null) {
                    throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
                }

                // Exactly one parameter, of a type assignable from String.
                // getParameterTypes() is fetched once instead of on every use.
                Class<?>[] parameterTypes = method.getParameterTypes();
                if (parameterTypes.length != 1 || !parameterTypes[0].isAssignableFrom(String.class)) {
                    throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#"+ method.getName() +"] , " +
                            "The correct method format like \" public ReturnT<String> execute(String param) \" .");
                }
                // The declared return type must be assignable from ReturnT
                if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
                    throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#"+ method.getName() +"] , " +
                            "The correct method format like \" public ReturnT<String> execute(String param) \" .");
                }
                // Allow reflective invocation of non-public handler methods
                method.setAccessible(true);

                // Optional lifecycle hooks named by the annotation (null when not configured)
                Method initMethod = resolveLifecycleMethod(bean, method, xxlJob.init(), "initMethod");
                Method destroyMethod = resolveLifecycleMethod(bean, method, xxlJob.destroy(), "destroyMethod");

                // Wrap the method in a MethodJobHandler and store it in the local registry
                registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
            }
        }
    }

    /**
     * Looks up the no-argument method with the given name on the bean's class and makes it
     * accessible.
     *
     * @param bean          bean whose class declares the lifecycle method
     * @param jobMethod     the {@code @XxlJob} method, used only for the error message
     * @param lifecycleName method name taken from the annotation; blank means "not configured"
     * @param label         {@code "initMethod"} or {@code "destroyMethod"}, used in the error message
     * @return the accessible method, or {@code null} when {@code lifecycleName} is blank
     * @throws RuntimeException if no such method is declared; the cause is preserved
     */
    private static Method resolveLifecycleMethod(Object bean, Method jobMethod, String lifecycleName, String label) {
        if (lifecycleName.trim().length() == 0) {
            return null;
        }
        try {
            Method lifecycleMethod = bean.getClass().getDeclaredMethod(lifecycleName);
            lifecycleMethod.setAccessible(true);
            return lifecycleMethod;
        } catch (NoSuchMethodException e) {
            // Keep the original exception as the cause so the stack trace is not lost
            throw new RuntimeException("xxl-job method-jobhandler " + label + " invalid, for[" + bean.getClass() + "#"+ jobMethod.getName() +"] .", e);
        }
    }

step3:创建GlueFactory,处理glue模式

  一般基于Spring使用的时候,都是bean模式,即任务以JobHandler方式维护在执行器端,通过rpc的方式调用执行。而glue模式是指,任务以源码方式维护在调度中心,这里初始化了一个SpringGlueFactory。

step4:调用父类启动

    /**
     * Starts the executor: log path, admin client list, log-cleanup thread, callback thread
     * and finally the embedded rpc server.
     *
     * @throws Exception declared by the signature; propagated from whichever step fails
     */
    public void start() throws Exception {

        // 1. Initialise the executor log path (per the original note the default is /data/applogs/xxl-job/jobhandler)
        XxlJobFileAppender.initLogPath(logPath);

        // 2. Build the AdminBiz list from the configured adminAddresses (comma-separated when there are several)
        initAdminBizList(adminAddresses, accessToken);

        // 3. Start the log-file cleanup thread (per the original note: a daemon thread that cleans up daily)
        JobLogFileCleanThread.getInstance().start(logRetentionDays);

        // 4. Start the thread that reports execution results back
        TriggerCallbackThread.getInstance().start();

        // 5. Start the executor-side server (netty-based per the original note).
        //    A non-positive port falls back to NetUtil.findAvailablePort(9999);
        //    a null/blank ip falls back to IpUtil.getIp().
        port = port>0?port: NetUtil.findAvailablePort(9999);
        ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
        initRpcProvider(ip, port, appName, accessToken);
    }

(1)初始化执行器日志路径,默认 /data/applogs/xxl-job/jobhandler,这个XxlJobFileAppender是个单独写日志文件的工具类。在xxl-job-admin界面上,可以通过界面查看定时任务调度执行的日志。我们在业务代码中,也可以通过XxlJobLogger.log方法,写自己的日志。

(2)根据配置的adminAddresses地址,构造AdminBiz列表(后面注册、调服务端接口等,会需要调到这个地址)。

(3)启动一个daemon线程,每天定期清理调度日志文件(上述(1)目录下的文件)。

(4)定义了一个LinkedBlockingQueue,这个queue里面放job执行结果。然后启动triggerCallbackThread和triggerRetryCallbackThread 两个线程,向job-admin反馈job执行结果。正常情况下,只有triggerCallbackThread从queue里面拿数据,提交到admin。但是当它提交失败的时候,triggerCallbackThread就会写一个callbacklog文件。再由triggerRetryCallbackThread读取callbacklog文件,重新向admin提交执行结果。

(5)启动客户端server,通过netty实现

    /**
     * Configures and starts the rpc provider factory that serves scheduling requests.
     *
     * @param ip          ip the server is configured with
     * @param port        port the server is configured with
     * @param appName     executor name, passed on as a registry parameter
     * @param accessToken token handed to the provider factory
     * @throws Exception declared by the signature; propagated from the factory start
     */
    private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {

        // Registry parameters: the executor name and its "ip:port" address
        String address = IpUtil.getIpPort(ip, port);
        Map<String, String> serviceRegistryParam = new HashMap<String, String>();
        serviceRegistryParam.put("appName", appName);
        serviceRegistryParam.put("address", address);

        xxlRpcProviderFactory = new XxlRpcProviderFactory();
        // Server implementation class
        xxlRpcProviderFactory.setServer(NettyHttpServer.class);
        xxlRpcProviderFactory.setSerializer(HessianSerializer.class);
        xxlRpcProviderFactory.setCorePoolSize(20);
        xxlRpcProviderFactory.setMaxPoolSize(200);
        xxlRpcProviderFactory.setIp(ip);
        xxlRpcProviderFactory.setPort(port);
        xxlRpcProviderFactory.setAccessToken(accessToken);
        // Registry class used to register this executor
        xxlRpcProviderFactory.setServiceRegistry(ExecutorServiceRegistry.class);
        xxlRpcProviderFactory.setServiceRegistryParam(serviceRegistryParam);

        // Expose the job-execution service: ExecutorBizImpl under the ExecutorBiz class name
        // (per the original note it ends up in serviceData)
        xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());

        // Start the server
        xxlRpcProviderFactory.start();

    }

启动server

    /**
     * Instantiates the serializer and the server, installs the started/stopped callbacks that
     * manage the service registry, and starts the server.
     *
     * @throws Exception declared by the signature; propagated from instantiation or server start
     */
    public void start() throws Exception {

        ··· 省略非关键代码···

        // Instantiate the serializer (HessianSerializer in the executor setup)
        this.serializerInstance = serializer.newInstance();

        serviceAddress = IpUtil.getIpPort(this.ip, port);
        // Instantiate the server (NettyHttpServer in the executor setup)
        serverInstance = server.newInstance();
        // Hook invoked after the server has started
        serverInstance.setStartedCallback(new BaseCallback() {
            @Override
            public void run() throws Exception {
                // start registry
                if (serviceRegistry != null) {
                    // Instantiate the registry (ExecutorServiceRegistry in the executor setup)
                    serviceRegistryInstance = serviceRegistry.newInstance();
                    // Per the original note: starts a thread that registers the executor every 30s
                    serviceRegistryInstance.start(serviceRegistryParam);
                    if (serviceData.size() > 0) {
                        // Per the original note: currently a no-op implementation that returns false
                        serviceRegistryInstance.registry(serviceData.keySet(), serviceAddress);
                    }
                }
            }
        });
        // Hook invoked when the server stops
        serverInstance.setStopedCallback(new BaseCallback() {
            @Override
            public void run() {

                if (serviceRegistryInstance != null) {
                    if (serviceData.size() > 0) {
                        // Per the original note: empty implementation, returns false by default
                        serviceRegistryInstance.remove(serviceData.keySet(), serviceAddress);
                    }
                    // Stop the executor registration thread and drop the instance
                    serviceRegistryInstance.stop();
                    serviceRegistryInstance = null;
                }
            }
        });
        // Start the server
        serverInstance.start(this);
    }

server的启动是标准的Netty启动方式,通过NettyHttpServer的start方法启动了一个netty服务端,主要通过NettyHttpServerHandler处理请求。

    /**
     * Reads the payload, uri and keep-alive flag from the incoming request and hands the
     * actual processing to the handler pool.
     *
     * @param ctx channel context passed on to the processing step
     * @param msg the full http request
     * @throws Exception declared by the signature
     */
    protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {

        // Parse the request into plain values before leaving this method
        final byte[] payload = ByteBufUtil.getBytes(msg.content());
        final String requestUri = msg.uri();
        final boolean keepAliveRequested = HttpUtil.isKeepAlive(msg);

        // Processing runs on the handler pool
        final Runnable processingTask = new Runnable() {
            @Override
            public void run() {
                process(ctx, requestUri, payload, keepAliveRequested);
            }
        };
        serverHandlerPool.execute(processingTask);
    }
    /**
     * Handles one request: deserializes it, invokes the target service and writes the
     * serialized response. Any exception is turned into an error response carrying the
     * request id (when already known) and the stringified throwable.
     *
     * @param ctx          channel context the response is written to
     * @param uri          request uri; "/services" takes a separate branch
     * @param requestBytes raw request body
     * @param keepAlive    keep-alive flag forwarded to the response writer
     */
    private void process(ChannelHandlerContext ctx, String uri, byte[] requestBytes, boolean keepAlive){
        String requestId = null;
        try {
            if ("/services".equals(uri)) {

                ···

            } else {
                // Regular request branch; an empty body is rejected
                if (requestBytes.length == 0) {
                    throw new XxlRpcException("xxl-rpc request data empty.");
                }
                // Deserialize the request
                XxlRpcRequest xxlRpcRequest = (XxlRpcRequest) xxlRpcProviderFactory.getSerializerInstance().deserialize(requestBytes, XxlRpcRequest.class);
                requestId = xxlRpcRequest.getRequestId();

                // Beat (ping) request: return immediately, no response is written here
                if (Beat.BEAT_ID.equalsIgnoreCase(xxlRpcRequest.getRequestId())){
                    logger.debug(">>>>>>>>>>> xxl-rpc provider netty_http server read beat-ping.");
                    return;
                }

                // Invoke the business service
                XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);

                // Serialize the response
                byte[] responseBytes = xxlRpcProviderFactory.getSerializerInstance().serialize(xxlRpcResponse);

                // Write the response
                writeResponse(ctx, keepAlive, responseBytes);
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);

            // response error
            XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
            xxlRpcResponse.setRequestId(requestId);
            xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(e));

            // response serialize
            byte[] responseBytes = xxlRpcProviderFactory.getSerializerInstance().serialize(xxlRpcResponse);

            // response-write
            writeResponse(ctx, keepAlive, responseBytes);
        }

    }

处理请求invokeService

    /**
     * Resolves the service bean addressed by the request, validates the request age and access
     * token, then invokes the requested method reflectively.
     *
     * <p>Failures never propagate: each one is reported through the error message of the
     * returned response.
     *
     * @param xxlRpcRequest the deserialized rpc request
     * @return a response carrying the request id and either the invocation result or an error message
     */
    public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {

        // The response always echoes the request id
        XxlRpcResponse response = new XxlRpcResponse();
        response.setRequestId(xxlRpcRequest.getRequestId());

        // Look up the registered service bean (for the executor: the ExecutorBizImpl added earlier)
        String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
        Object serviceBean = serviceData.get(serviceKey);
        if (serviceBean == null) {
            response.setErrorMsg("The serviceKey["+ serviceKey +"] not found.");
            return response;
        }

        // Reject requests created more than 3 minutes before the local clock
        long requestAgeMillis = System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime();
        if (requestAgeMillis > 3*60*1000) {
            response.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");
            return response;
        }

        // Token check applies only when a non-blank token is configured
        boolean tokenConfigured = accessToken!=null && accessToken.trim().length()>0;
        if (tokenConfigured && !accessToken.trim().equals(xxlRpcRequest.getAccessToken())) {
            response.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong.");
            return response;
        }

        try {
            Method targetMethod = serviceBean.getClass()
                    .getMethod(xxlRpcRequest.getMethodName(), xxlRpcRequest.getParameterTypes());
            targetMethod.setAccessible(true);
            // For a real job trigger this ends up in ExecutorBizImpl#run
            Object invocationResult = targetMethod.invoke(serviceBean, xxlRpcRequest.getParameters());
            response.setResult(invocationResult);
        } catch (Throwable t) {
            // Any failure is logged and reported in the response
            logger.error("xxl-rpc provider invokeService error.", t);
            response.setErrorMsg(ThrowableUtil.toString(t));
        }

        return response;
    }

ExecutorBizImpl#run

    /**
     * Accepts a trigger from the scheduler: resolves the job thread and handler for the job,
     * applies the block strategy when a thread already exists, creates a thread if needed and
     * enqueues the trigger.
     *
     * @param triggerParam trigger data (job id, glue type, handler name, block strategy, ...)
     * @return a failure result when the handler or glue type is invalid or the trigger is
     *         discarded; otherwise the result of pushing the trigger onto the thread's queue
     */
    public ReturnT<String> run(TriggerParam triggerParam) {

        // Jobs are processed by a JobThread; load the one bound to this job id, if any
        JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
        // An existing thread carries the handler it was created with
        IJobHandler jobHandler = null;
        if (jobThread != null) {
            jobHandler = jobThread.getHandler();
        }
        String removeOldReason = null;

        // Branch on the glue type; only the bean mode is spelled out here
        GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
        if (GlueTypeEnum.BEAN == glueTypeEnum) {

            // Handler currently registered under the requested name
            IJobHandler registeredHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

            // The existing thread holds a different handler: drop both so a new thread is created
            if (jobThread != null && jobHandler != registeredHandler) {
                removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
                jobThread = null;
                jobHandler = null;
            }

            // No usable handler yet: take the registered one, or fail when there is none
            if (jobHandler == null) {
                if (registeredHandler == null) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
                }
                jobHandler = registeredHandler;
            }

        } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
            // glue (groovy) mode
        } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
            // script mode
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
        }

        // An existing thread means the block strategy decides what happens to this trigger
        if (jobThread != null) {
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
            if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy && jobThread.isRunningOrHasQueue()) {
                // Discard-later: the thread is busy, so this trigger is rejected
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy && jobThread.isRunningOrHasQueue()) {
                // Cover-early: forget the busy thread so a new one is registered below
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
                jobThread = null;
            }
            // Any other case: the trigger simply joins the thread's queue (serial execution)
        }

        // No thread available: register a job thread (the original note says it is also started)
        if (jobThread == null) {
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
        }

        // Enqueue the trigger for execution
        return jobThread.pushTriggerQueue(triggerParam);
    }

可以看到真正处理我们业务逻辑的是JobThread线程,是异步执行的,这里直接把任务添加到队列里就返回了,所以就需要前面我们所看到的,通过其他线程反馈执行结果。接下来我们就看一下JobThread的run方法

    /**
     * Main loop of the job thread: initialises the handler, then repeatedly polls the trigger
     * queue, executes each trigger (optionally with a timeout) and pushes the result to the
     * callback queue. After the loop ends, remaining queued triggers are reported as killed
     * and the handler is destroyed.
     */
    public void run() {

        // init
        try {
            // Run the handler's init method
            handler.init();
        } catch (Throwable e) {
            logger.error(e.getMessage(), e);
        }

        // execute
        while(!toStop){
            running = false;
            idleTimes++;

            TriggerParam triggerParam = null;
            ReturnT<String> executeResult = null;
            try {
                // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
                // Wait up to 3 seconds for the next trigger
                triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
                if (triggerParam!=null) {
                    running = true;
                    idleTimes = 0;
                    triggerLogIdSet.remove(triggerParam.getLogId());

                    // Set up logging for this execution
                    // log filename, like "logPath/yyyy-MM-dd/9999.log"
                    String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                    XxlJobFileAppender.contextHolder.set(logFileName);
                    ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));

                    // execute
                    XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());
                    // Trigger with an execution timeout
                    if (triggerParam.getExecutorTimeout() > 0) {
                        // limit timeout
                        Thread futureThread = null;
                        try {
                            final TriggerParam triggerParamTmp = triggerParam;
                            FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
                                @Override
                                public ReturnT<String> call() throws Exception {
                                    return handler.execute(triggerParamTmp.getExecutorParams());
                                }
                            });
                            // Run the handler on a separate thread
                            futureThread = new Thread(futureTask);
                            futureThread.start();
                            // Block for at most getExecutorTimeout() seconds; a timeout raises TimeoutException
                            executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                        } catch (TimeoutException e) {

                            XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
                            XxlJobLogger.log(e);
                            // Result reported for a timed-out execution
                            executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
                        } finally {
                            // Interrupt the worker thread whether or not it finished
                            futureThread.interrupt();
                        }
                    } else {
                        // just execute
                        // No timeout: run the handler directly on this thread
                        executeResult = handler.execute(triggerParam.getExecutorParams());
                    }

                    // A null result counts as failure; otherwise cap the message at 50000 chars
                    if (executeResult == null) {
                        executeResult = IJobHandler.FAIL;
                    } else {
                        executeResult.setMsg(
                                (executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000)
                                        ?executeResult.getMsg().substring(0, 50000).concat("...")
                                        :executeResult.getMsg());
                        executeResult.setContent(null);    // limit obj size
                    }
                    XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);

                } else {
                    // Nothing polled: after more than 30 idle rounds with an empty queue, remove this job thread
                    if (idleTimes > 30) {
                        if(triggerQueue.size() == 0) {    // avoid concurrent trigger causes jobId-lost
                            XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                        }
                    }
                }
            } catch (Throwable e) {
                if (toStop) {
                    XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
                }

                // Turn the stack trace into the failure message of the result
                StringWriter stringWriter = new StringWriter();
                e.printStackTrace(new PrintWriter(stringWriter));
                String errorMsg = stringWriter.toString();
                executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);

                XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
            } finally {
                if(triggerParam != null) {
                    // callback handler info
                    if (!toStop) {
                        // commonm
                        // Normal case: queue the execution result for the callback thread
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
                    } else {
                        // is killed
                        ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
                    }
                }
            }
        }

        // callback trigger request in queue
        while(triggerQueue !=null && triggerQueue.size()>0){
            TriggerParam triggerParam = triggerQueue.poll();
            if (triggerParam!=null) {
                // is killed
                ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
                TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
            }
        }

        // destroy
        try {
            // Run the handler's destroy method
            handler.destroy();
        } catch (Throwable e) {
            logger.error(e.getMessage(), e);
        }

        logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
    }

3、DisposableBean

  在spring容器关闭(或启动异常导致bean被销毁)的时候,会调用其destroy方法,那么我们就看一下xxl-job的destroy方法做了哪些事。

    /** Bean destroy callback; delegates entirely to the parent executor's destroy(). */
    public void destroy() {
        super.destroy();
    }
    /**
     * Shuts the executor down: stops the rpc server, removes every job thread, clears the
     * handler registry and stops the log-cleanup and callback threads.
     */
    public void destroy(){
        // Stop the embedded server first
        stopRpcProvider();

        // Remove every registered job thread, then empty the thread registry
        if (!jobThreadRepository.isEmpty()) {
            for (Integer registeredJobId : jobThreadRepository.keySet()) {
                removeJobThread(registeredJobId, "web container destroy and kill the job.");
            }
            jobThreadRepository.clear();
        }
        jobHandlerRepository.clear();

        // Stop the log-file cleanup thread
        JobLogFileCleanThread.getInstance().toStop();

        // Stop the result callback thread (per the original note this covers the retry thread too)
        TriggerCallbackThread.getInstance().toStop();

    }

源码也比较清晰了,这里只看一下第一步停掉server的实现,其余的都很简单

    /** Stops the rpc provider factory; a failure is logged and not rethrown. */
    private void stopRpcProvider() {
        try {
            xxlRpcProviderFactory.stop();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
    /**
     * Stops the provider factory by stopping its server instance.
     *
     * @throws Exception declared by the signature; propagated from the server stop
     */
    public void  stop() throws Exception {
        // stop server
        serverInstance.stop();
    }
    /**
     * Stops the server: interrupts the server thread if it is still alive, then runs the
     * stopped hook.
     *
     * @throws Exception declared by the signature
     */
    public void stop() throws Exception {
        // Interrupt the server thread; per the original note this shuts the netty server down
        // gracefully and closes the request-handling thread pool (not visible in this excerpt)
        if (thread!=null && thread.isAlive()) {
            thread.interrupt();
        }

        // onStoped() comes from the parent class; per the original note it runs the stop hook
        // installed in XxlRpcProviderFactory#start, which stops the executor registration thread
        onStoped();
        logger.info(">>>>>>>>>>> xxl-rpc remoting server destroy success.");
    }

至此,客户端的源码基本分析完毕,下一篇就看一下调度中心源码。


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM