【Canal源碼分析】Canal Server的啟動和停止過程


本文主要解析下canal server的啟動過程,希望能有所收獲。

一、序列圖

1.1 啟動

1.2 停止

二、源碼分析

整個server啟動的過程比較復雜,看圖難以理解,需要輔以文字說明。

首先程序的入口在CanalLauncher的main方法中。

2.1 加載配置文件

String conf = System.getProperty("canal.conf", "classpath:canal.properties");
Properties properties = new Properties();
if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
    conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
    properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
} else {
    properties.load(new FileInputStream(conf));
}

從canal.properties文件中load所有的配置信息,加載到上下文中。不再贅述。

2.2 構造CanalController

根據配置文件來構造CanalController,這塊的代碼比較多,主要分為七個步驟,具體如下。

2.2.1 初始化全局參數配置

調用initGlobalConfig方法,過程如下:

  • 判斷運行模式,是從spring加載還是manager加載,目前開源版本建議使用spring
  • 獲取是否懶加載
  • 如果是manager模式啟動,獲取manager的ip地址;如果是spring模式啟動,獲取spring xml的文件地址,加載到全部配置中
  • 構造一個實例構造器CanalInstanceGenerator,我們用到的就是在spring的beanFactory中加上destination的bean,這個destination就是canal instance的名稱

這塊邏輯在CanalController的initGlobalConfig方法中。

2.2.2 初始化實例配置

這塊的邏輯是從instance.properties里面初始化實例。

private void initInstanceConfig(Properties properties) {
    String destinationStr = getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
    String[] destinations = StringUtils.split(destinationStr, CanalConstants.CANAL_DESTINATION_SPLIT);

    for (String destination : destinations) {
        InstanceConfig config = parseInstanceConfig(properties, destination);
        InstanceConfig oldConfig = instanceConfigs.put(destination, config);

        if (oldConfig != null) {
            logger.warn("destination:{} old config:{} has replace by new config:{}", new Object[] { destination,
                    oldConfig, config });
        }
    }
}

從這段代碼中可以看出,我們在一個canal.properties文件中,可以配置多個destination,也就是可以配置多個instance,不同的instance以逗號隔開。這里主要看的是parseInstanceConfig()方法,里面的邏輯如下:

  • 獲取啟動模式,是manager還是spring,我們這邊默認都是spring。
  • 獲取懶加載字段
  • 獲取spring xml配置文件地址

2.2.3 初始SocketChannel

從配置文件中獲取canal.socketChannel字段,放到全局變量中。

2.2.4 准備canal server

從配置文件中分別獲取canal.id、ip、port(對外提供socket服務的端口),獲取一個內存級的server單例,同時也獲取一個對外提供Netty服務的單例。

cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID));
ip = getProperty(properties, CanalConstants.CANAL_IP);
port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
embededCanalServer = CanalServerWithEmbedded.instance();
embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 設置自定義的instanceGenerator
canalServer = CanalServerWithNetty.instance();
canalServer.setIp(ip);
canalServer.setPort(port);

2.2.5 初始化系統目錄

從配置文件中獲取zk地址(canal.zkServers),啟動一個zk客戶端,然后初始化兩個系統目錄,分別是:

  • /otter/canal/destinations
  • /otter/canal/cluster

2.2.6 初始化系統監控

根據destination構造運行時監控,其實就是根據instance名來構造ServerRunningMonitor。其實就是實現了ServerRunningListener中的一些方法。

public interface ServerRunningListener {

    /**
     * 啟動時回調做點事情
     */
    public void processStart();

    /**
     * 關閉時回調做點事情
     */
    public void processStop();

    /**
     * 觸發現在輪到自己做為active,需要載入上一個active的上下文數據
     */
    public void processActiveEnter();

    /**
     * 觸發一下當前active模式失敗
     */
    public void processActiveExit();

}

然后初始化一下ServerRunningMonitor。

runningMonitor.init();

這個init方法跟蹤的結果,其實就是執行了ServerRunningListener中的processStart方法。

public void processStart() {
    try {
        if (zkclientx != null) {
            final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":" + port);
            initCid(path);
            zkclientx.subscribeStateChanges(new IZkStateListener() {

                public void handleStateChanged(KeeperState state) throws Exception {

                }

                public void handleNewSession() throws Exception {
                    initCid(path);
                }

                @Override
                public void handleSessionEstablishmentError(Throwable error) throws Exception {
                    logger.error("failed to connect to zookeeper", error);
                }
            });
        }
    } finally {
        MDC.remove(CanalConstants.MDC_DESTINATION);
    }
}

首先獲取了/otter/canal/destinations/{destination}/cluster/ip:port的內容,其實就是server的地址,最后一個ip:port是個zk的臨時節點。然后訂閱一下節點事件,當節點有事件推送過來后,做一些動作。

2.2.7 初始化配置文件監控

如果canal.auto.scan配置為true(默認為true),首先定義一個InstanceAction,包含了啟動、停止、重啟instance的動作。

定義一個SpringInstanceConfigMonitor,配置定時掃描的事件為canal.auto.scan.interval,默認5s,掃描canal.conf.dir目錄下的文件,與上面定義的InstanceAction結合起來。

2.3 啟動CanalController

上面的構造方法其實就是定義一些必要的內容,真正的啟動在這個方法中。

2.3.1 創建工作節點

創建臨時節點/otter/canal/cluster/ip:port,同時啟動監聽器.

2.3.2 啟動embeded服務

embededCanalServer.start();

這個start里面,一個是將當前server的running狀態置為true,同時根據destination構建CanalInstance。

2.3.3 HA啟動

遍歷Map<String, InstanceConfig>中的InstanceConfig,如果CanalInsance還沒啟動,如果不是懶加載的話,直接HA啟動ServerRunningMonitor。

ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
if (!config.getLazy() && !runningMonitor.isStart()) {
    runningMonitor.start();
}

public synchronized void start() {
    super.start();
    try {
        processStart();
        if (zkClient != null) {
            // 如果需要盡可能釋放instance資源,不需要監聽running節點,不然即使stop了這台機器,另一台機器立馬會start
            String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
            zkClient.subscribeDataChanges(path, dataListener);

            initRunning();
        } else {
            processActiveEnter();// 沒有zk,直接啟動
        }
    } catch (Exception e) {
        logger.error("start failed", e);
        // 沒有正常啟動,重置一下狀態,避免干擾下一次start
        stop();
    }

}

這里面啟動的內容我們來看看。

  • 首先調用super.start()把當前的running狀態置為true。
  • 然后啟動zk節點的監聽(這邊的processStart是否多余了?)。
  • 監聽路徑/otter/canal/destinations/{destination}/running節點的變化
zkClient.subscribeDataChanges(path, dataListener);
  • 這里的dataListener是ServerRunningMonitor構造函數中定義的,就是定義一些zk節點監聽的動作。
    • 如果有數據變化,如果running節點中的內容ServerRunningData發生了變化,字段active變為了false,而且address就是本機,說明本機出現了主動釋放,需要釋放運行時狀態。此時需要調用到processActiveExit方法,其實就是停止了本機的server中destination對應的instance。
    • 如果節點發生了刪除動作,如果上一次active的狀態就是本機,則即時觸發一下active搶占,調用initRunning()方法,當然,如果啟動失敗,也不是立即切換,而是會等待5s,再嘗試啟動。這個啟動方法中,主要調用的是processActiveEnter()方法,來啟動了embededCanalServer.start(destination)。其實就是啟動canalInstance,這塊后續再分析。
  • 其實除了監聽器,在本身的ServerRunningMonitor的start方法中,也有initRunning方法。這塊啟動canalInstance的方法,我們下一篇文章分析。

2.3.4 instance文件掃描啟動

在掃描之前,把destination和InstanceAction綁定到緩存中。

instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);

首先啟動一個全局掃描,然后再對應的destination配置文件的掃描。

if (autoScan) {
    instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
    for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
        if (!monitor.isStart()) {
            monitor.start();
        }
    }
}

這個start方法啟動了一個定時器,默認5s掃描一次。掃描的內容就是配置文件路徑下的內容,針對文件的新增、刪除、修改,對應InstanceAction中的start,stop和reload方法。也就是說,我們在canal運行的過程中,通過動態修改配置文件,來實現動態調整運行時參數,主要可以用來進行重復消費,位點的遷移等等。

2.3.5 網絡接口啟動

CanalServerWithNetty的啟動,首先需要啟動CanalServerWithEmbedded,主要的業務邏輯在SessionHandler中。這塊其實是暴露外部服務,給canal client進行調用。

2.4 增加關閉hook

Runtime.getRuntime().addShutdownHook(new Thread() {

    public void run() {
        try {
            logger.info("## stop the canal server");
            controller.stop();
        } catch (Throwable e) {
            logger.warn("##something goes wrong when stopping canal Server:", e);
        } finally {
            logger.info("## canal server is down.");
        }
    }

});

在server停止時,調用controller.stop()方法。

public void stop() throws Throwable {
    canalServer.stop();

    if (autoScan) {
        for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
            if (monitor.isStart()) {
                monitor.stop();
            }
        }
    }

    for (ServerRunningMonitor runningMonitor : ServerRunningMonitors.getRunningMonitors().values()) {
        if (runningMonitor.isStart()) {
            runningMonitor.stop();
        }
    }

    // 釋放canal的工作節點
    releaseCid(ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port));
    logger.info("## stop the canal server[{}:{}]", ip, port);
        
    if (zkclientx != null) {
        zkclientx.close();
    }
}

主要是停止controller,server相關的monitor,instance相關的monitor,然后釋放zk節點,關閉zk連接。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM