DolphinScheduler Source Code Analysis (4): Starting the MasterServer


// Curator is a ZooKeeper client framework. Among other things it wraps a distributed, fair, reentrant mutex; the most commonly used one is InterProcessMutex.

First, a brief summary of how the MasterServer service starts up:

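  1. Initialize the Netty server and start it.
  2. Use the ZooKeeper client Curator to create an ephemeral znode /dolphinscheduler/nodes/master/<ip>:<port>. If the host goes down or the network fails, the ephemeral node disappears.
  3. Register a connection-state listener on the Curator client (it handles the LOST, RECONNECTED and SUSPENDED events). Because the znode is ephemeral, it has to be re-created after a reconnect.
  4. Try to acquire the distributed lock on the znode /dolphinscheduler/lock/failover/startup-masters by calling mutex.acquire(). Only one thread can hold the lock at a time, and the holder becomes the active Master; the others block and wait, with a watcher registered for them (Curator does this internally).
  5. Start a Master ZooKeeper client.
  6. Start the master scheduler service.
  7. Start the Quartz scheduling service.
  8. Add a JVM shutdown hook so that the services are stopped gracefully when the JVM shuts down.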

A distributed lock guarantees that only one thread, across all master processes, holds the lock at any given time.

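To make the lock step concrete: Curator's InterProcessMutex follows ZooKeeper's lock recipe. Each contender creates an ephemeral sequential child node under the lock path (here /dolphinscheduler/lock/failover/startup-masters), the contender with the lowest sequence number holds the lock, and every other contender watches only the node immediately in front of its own. The sketch below simulates that ordering rule in memory with plain JDK classes; it is not Curator code, no ZooKeeper server is involved, and the names LockRecipeSim, create, holdsLock, watchedNode and delete are hypothetical.

```java
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the children of a lock znode.
// No ZooKeeper server or Curator API is used here.
class LockRecipeSim {
    // sequence number -> client id; kept sorted, like ZooKeeper's sequential node names
    private final TreeMap<Integer, String> children = new TreeMap<>();
    private int nextSeq = 0;

    // Like creating an EPHEMERAL_SEQUENTIAL child: returns the assigned sequence number.
    int create(String clientId) {
        int seq = nextSeq++;
        children.put(seq, clientId);
        return seq;
    }

    // The client whose node has the lowest sequence number holds the lock.
    boolean holdsLock(int seq) {
        return !children.isEmpty() && children.firstKey() == seq;
    }

    // A waiting client watches only its immediate predecessor; the holder watches nothing.
    Optional<Integer> watchedNode(int seq) {
        return Optional.ofNullable(children.lowerKey(seq));
    }

    // Releasing the lock (or the owner's session expiring) deletes its node.
    void delete(int seq) {
        children.remove(seq);
    }

    String currentHolder() {
        return children.isEmpty() ? null : children.firstEntry().getValue();
    }
}

class LockRecipeDemo {
    public static void main(String[] args) {
        LockRecipeSim lock = new LockRecipeSim();
        int a = lock.create("master-a");
        int b = lock.create("master-b");
        int c = lock.create("master-c");
        System.out.println(lock.currentHolder());      // master-a
        System.out.println(lock.watchedNode(c).get()); // 1, the node of master-b
        lock.delete(a);                                // master-a releases the lock
        System.out.println(lock.holdsLock(b));         // true
    }
}
```

Watching only the predecessor means a release wakes exactly one waiter instead of all of them, and contenders obtain the lock in the order in which they created their nodes, which is what makes the recipe fair.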
Today we analyze the master in the server module: the MasterServer class.

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.server.master;

import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
import org.apache.dolphinscheduler.server.worker.WorkerServer;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;

import javax.annotation.PostConstruct;




@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
        @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {WorkerServer.class})
})
public class MasterServer {

    /**
     * logger of MasterServer
     */
    private static final Logger logger = LoggerFactory.getLogger(MasterServer.class);

    /**
     * master config
     * injected master configuration class
     */
    @Autowired
    private MasterConfig masterConfig;

    /**
     *  spring application context
     *  only use it for initialization
     */
    @Autowired
    private SpringApplicationContext springApplicationContext;

    /**
     * netty remote server
     */
    private NettyRemotingServer nettyRemotingServer;

    /**
     * master registry
     * registers the master in ZooKeeper and listens for connection events
     */
    @Autowired
    private MasterRegistry masterRegistry;

    /**
     * zk master client
     * ZooKeeper client built on Curator
     */
    @Autowired
    private ZKMasterClient zkMasterClient;

    /**
     * scheduler service
     */
    @Autowired
    private MasterSchedulerService masterSchedulerService;

    /**
     * master server startup
     *
     * master server not use web service
     * @param args arguments
     */
    public static void main(String[] args) {
        Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
        new SpringApplicationBuilder(MasterServer.class).web(WebApplicationType.NONE).run(args);
    }

    /**
     * run master server
     * @PostConstruct: this method runs after dependency injection for this bean has completed
     */
    @PostConstruct
    public void run(){

        //init remoting server
        NettyServerConfig serverConfig = new NettyServerConfig();
        serverConfig.setListenPort(masterConfig.getListenPort());
        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
        this.nettyRemotingServer.start();

        // register
        // create the /dolphinscheduler/nodes/master/XXX:5678 node and listen for events
        this.masterRegistry.registry();

        // self tolerant
        // this acquires the distributed lock /dolphinscheduler/lock/failover/startup-masters,
        // then creates the system znodes
        // and checks whether master and worker tasks need failover
        this.zkMasterClient.start();

        // scheduler start
        // MasterSchedulerService extends Thread
        this.masterSchedulerService.start();

        // start QuartzExecutors
        // what system should do if exception
        try {
            logger.info("start Quartz server...");
            // start the QuartzExecutors service
            QuartzExecutors.getInstance().start();
        } catch (Exception e) {
            try {
                QuartzExecutors.getInstance().shutdown();
            } catch (SchedulerException e1) {
                logger.error("QuartzExecutors shutdown failed : " + e1.getMessage(), e1);
            }
            logger.error("start Quartz failed", e);
        }

        /**
         *  register hooks, which are called before the process exits: on shutdown the JVM runs close() first
         */
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                close("shutdownHook");
            }
        }));

    }

    /**
     * gracefully close
     * @param cause close cause
     */
    public void close(String cause) {

        try {
            //execute only once
            if(Stopper.isStopped()){
                return;
            }

            logger.info("master server is stopping ..., cause : {}", cause);

            // set stop signal is true
            Stopper.stop();

            try {
                //thread sleep 3 seconds for thread quietly stop
                Thread.sleep(3000L);
            }catch (Exception e){
                logger.warn("thread sleep exception ", e);
            }
            // stop the services
            this.masterSchedulerService.close();
            this.nettyRemotingServer.close();
            this.masterRegistry.unRegistry();
            this.zkMasterClient.close();
            //close quartz
            try{
                QuartzExecutors.getInstance().shutdown();
                logger.info("Quartz service stopped");
            }catch (Exception e){
                logger.warn("Quartz service stopped exception:{}",e.getMessage());
            }
        } catch (Exception e) {
            logger.error("master server stop exception ", e);
            System.exit(-1);
        }
    }
}

Listener registration: the org.apache.dolphinscheduler.server.master.registry.MasterRegistry class

    /**
     * registry
     */
    public void registry() {
        String address = NetUtils.getHost();
        // path of this master's zk node: localNodePath = /dolphinscheduler/nodes/master/xxxx:5678
        String localNodePath = getMasterPath();
        // create the ephemeral node /dolphinscheduler/nodes/master/xxxx:5678 through the zk code in the service module
        zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, "");
        // get the Curator client and listen for connection state changes
        zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                if (newState == ConnectionState.LOST) {
                    // the client lost its connection
                    logger.error("master : {} connection lost from zookeeper", address);
                } else if (newState == ConnectionState.RECONNECTED) {
                    // the client reconnected: re-create the node in zk, because the ephemeral node may have expired
                    logger.info("master : {} reconnected to zookeeper", address);
                    zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, "");
                } else if (newState == ConnectionState.SUSPENDED) {
                    // the client connection is suspended
                    logger.warn("master : {} connection SUSPENDED ", address);
                }
            }
        });
        // read the heartbeat interval from the master config
        int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval();
        // create the HeartBeatTask instance; HeartBeatTask extends Thread and overrides run()
        HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
                masterConfig.getMasterReservedMemory(),
                masterConfig.getMasterMaxCpuloadAvg(),
                Sets.newHashSet(getMasterPath()),
                zookeeperRegistryCenter);
        // heartbeat thread: runs the heartbeat task at a fixed rate
        this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0, masterHeartbeatInterval, TimeUnit.SECONDS);
        logger.info("master node : {} registry to ZK path {} successfully with heartBeatInterval : {}s"
                , address, localNodePath, masterHeartbeatInterval);
    }

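The registration logic above boils down to three ideas: create an ephemeral node, re-create it when the client reconnects (the session that owned the old node may have expired), and run a heartbeat on a fixed schedule. The sketch below reproduces those ideas with JDK classes only. FakeRegistry, ConnState and MasterRegistrySketch are hypothetical stand-ins: ConnState merely borrows the names of Curator's ConnectionState values, and the heartbeat task just counts ticks instead of doing what HeartBeatTask does. It uses milliseconds rather than the seconds used for masterHeartbeatInterval so that it can be exercised quickly.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Local enum that borrows the names of Curator's ConnectionState values; not the Curator type.
enum ConnState { SUSPENDED, LOST, RECONNECTED }

// Hypothetical in-memory registry: ephemeral nodes vanish when the session is lost.
class FakeRegistry {
    final Set<String> ephemeralNodes = ConcurrentHashMap.newKeySet();
    private volatile Consumer<ConnState> listener = state -> { };

    void persistEphemeral(String path) { ephemeralNodes.add(path); }

    void addListener(Consumer<ConnState> l) { listener = l; }

    // Simulates a connection-state change; LOST drops every ephemeral node.
    void fire(ConnState state) {
        if (state == ConnState.LOST) {
            ephemeralNodes.clear();
        }
        listener.accept(state);
    }
}

class MasterRegistrySketch {
    final FakeRegistry zk = new FakeRegistry();
    final AtomicLong heartbeats = new AtomicLong();
    private final ScheduledExecutorService heartBeatExecutor =
            Executors.newSingleThreadScheduledExecutor();

    void registry(String localNodePath, long intervalMillis) {
        zk.persistEphemeral(localNodePath);
        zk.addListener(state -> {
            // Same idea as MasterRegistry: after a reconnect the ephemeral node may be gone.
            if (state == ConnState.RECONNECTED) {
                zk.persistEphemeral(localNodePath);
            }
        });
        // Stand-in for HeartBeatTask: here it only counts ticks.
        heartBeatExecutor.scheduleAtFixedRate(heartbeats::incrementAndGet,
                0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    // Stops the heartbeat thread so the JVM can exit.
    void unRegistry() {
        heartBeatExecutor.shutdownNow();
    }
}
```

For example, calling registry("/dolphinscheduler/nodes/master/127.0.0.1:5678", 10) and then firing LOST followed by RECONNECTED on zk leaves the node present again, while heartbeats keeps increasing until unRegistry() shuts the executor down.
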
Two points are worth a closer look:

1: Runtime.getRuntime().addShutdownHook(shutdownHook);

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    @Override
    public void run() {
        close("shutdownHook");
    }
}));


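What this method does: it registers a shutdown hook with the JVM. When the JVM shuts down, it runs every hook that was registered through addShutdownHook, and it only exits after those hooks have finished. Hooks are therefore the place to clean up resources and release objects during shutdown. They run on a normal exit, on System.exit() and on signals such as SIGTERM or Ctrl-C, concurrently and in no guaranteed order, but not when the process is killed with kill -9 or stopped with Runtime.halt().
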
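A minimal sketch of the pattern MasterServer uses: register a hook thread that calls close(), and make close() safe to call more than once, since it can be triggered either by the hook or by application code. GracefulService and ShutdownHookDemo are hypothetical names. One deliberate difference: MasterServer checks Stopper.isStopped() and then calls Stopper.stop() as two separate steps, whereas this sketch uses AtomicBoolean.compareAndSet so that the check and the update happen atomically.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical service whose close() may be called by a shutdown hook and by application code;
// like close() in MasterServer, it only does its work once.
class GracefulService {
    // Plays the role of the Stopper flag (a sketch, not DolphinScheduler's Stopper class).
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    final AtomicInteger closeCount = new AtomicInteger();

    void start() {
        // The hook runs on normal exit, System.exit() or SIGTERM, but not on kill -9.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> close("shutdownHook")));
    }

    void close(String cause) {
        // compareAndSet makes check-and-set atomic, so two concurrent callers cannot both proceed.
        if (!stopped.compareAndSet(false, true)) {
            return;
        }
        closeCount.incrementAndGet();
        System.out.println("stopping, cause: " + cause);
        // release resources here: schedulers, network servers, registry nodes, ...
    }
}

class ShutdownHookDemo {
    public static void main(String[] args) {
        GracefulService service = new GracefulService();
        service.start();
        System.out.println("working...");
        // main returns, the JVM starts shutting down and the hook prints "stopping, cause: shutdownHook"
    }
}
```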
2: @PostConstruct

    public static void main(String[] args) {
        Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
        new SpringApplicationBuilder(MasterServer.class).web(WebApplicationType.NONE).run(args);
    }

    /**
     * run master server
     * @PostConstruct: this method runs after dependency injection for this bean has completed
     */
    @PostConstruct
    public void run(){

        //init remoting server
        NettyServerConfig serverConfig = new NettyServerConfig();
        serverConfig.setListenPort(masterConfig.getListenPort());
        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
        // processor registration omitted in this excerpt
        this.nettyRemotingServer.start();

        // register
        // create the /dolphinscheduler/nodes/master/XXX:5678 node and listen for events
        this.masterRegistry.registry();

        // remaining startup steps omitted in this excerpt
    }

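The main method only hands MasterServer.class to SpringApplicationBuilder, and Spring creates the bean. The @Autowired fields (masterConfig, masterRegistry and so on) are injected only after the constructor has returned, so the constructor cannot complete the initialization. That is why @PostConstruct is used: once injection is complete, Spring calls run(), which finishes initializing the MasterServer object.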
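The ordering can be observed with a toy container. The sketch below is not Spring: InjectField and InitMethod are hypothetical stand-ins for @Autowired and @PostConstruct, and TinyContainer only mimics the order Spring follows for a bean with field injection (construct, inject fields, call the init method). The bean records what it sees at each step.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for @Autowired and @PostConstruct.
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.FIELD) @interface InjectField { }
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @interface InitMethod { }

class DemoConfig {
    int getListenPort() { return 5678; }
}

class DemoServer {
    static final List<String> events = new ArrayList<>();
    @InjectField DemoConfig config;

    DemoServer() {
        // Field injection has not happened yet, so config is still null here.
        events.add("constructor: config=" + config);
    }

    @InitMethod
    void run() {
        // Called after injection, so config can be used.
        events.add("init: port=" + config.getListenPort());
    }
}

// Toy container: 1) call the constructor, 2) inject annotated fields, 3) call the init method.
class TinyContainer {
    static <T> T create(Class<T> type, Object dependency) throws Exception {
        Constructor<T> ctor = type.getDeclaredConstructor();
        ctor.setAccessible(true);
        T bean = ctor.newInstance();
        for (Field f : type.getDeclaredFields()) {
            if (f.isAnnotationPresent(InjectField.class) && f.getType().isInstance(dependency)) {
                f.setAccessible(true);
                f.set(bean, dependency);
            }
        }
        for (Method m : type.getDeclaredMethods()) {
            if (m.isAnnotationPresent(InitMethod.class)) {
                m.setAccessible(true);
                m.invoke(bean);
            }
        }
        return bean;
    }
}

class PostConstructDemo {
    public static void main(String[] args) throws Exception {
        TinyContainer.create(DemoServer.class, new DemoConfig());
        DemoServer.events.forEach(System.out::println);
    }
}
```

Running PostConstructDemo prints "constructor: config=null" followed by "init: port=5678": the dependency is only usable from the init method, which is why MasterServer keeps its startup logic in run().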


Reference: https://www.cnblogs.com/erlou96/p/13753824.html