本文主要解析下canal server的啟動過程,希望能有所收獲。
一、序列圖
1.1 啟動
1.2 停止
二、源碼分析
整個server啟動的過程比較復雜,看圖難以理解,需要輔以文字說明。
首先程序的入口在CanalLauncher的main方法中。
2.1 加載配置文件
String conf = System.getProperty("canal.conf", "classpath:canal.properties");
Properties properties = new Properties();
if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
} else {
properties.load(new FileInputStream(conf));
}
從canal.properties文件中load所有的配置信息,加載到上下文中。不再贅述。
2.2 構造CanalController
根據配置文件來構造CanalController,這塊的代碼比較多,主要分為七個步驟,具體如下。
2.2.1 初始化全局參數配置
調用initGlobalConfig方法,過程如下:
- 判斷運行模式,是從spring加載還是manager加載,目前開源版本建議使用spring
- 獲取是否懶加載
- 如果是manager模式啟動,獲取manager的ip地址;如果是spring模式啟動,獲取spring xml的文件地址,加載到全部配置中
- 構造一個實例構造器CanalInstanceGenerator,我們用到的就是在spring的beanFactory中加上destination的bean,這個destination就是canal instance的名稱
這塊邏輯在CanalController的initGlobalConfig方法中。
2.2.2 初始化實例配置
這塊的邏輯是從instance.properties里面初始化實例。
private void initInstanceConfig(Properties properties) {
String destinationStr = getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
String[] destinations = StringUtils.split(destinationStr, CanalConstants.CANAL_DESTINATION_SPLIT);
for (String destination : destinations) {
InstanceConfig config = parseInstanceConfig(properties, destination);
InstanceConfig oldConfig = instanceConfigs.put(destination, config);
if (oldConfig != null) {
logger.warn("destination:{} old config:{} has replace by new config:{}", new Object[] { destination,
oldConfig, config });
}
}
}
從這段代碼中可以看出,我們在一個canal.properties文件中,可以配置多個destination,也就是可以配置多個instance,不同的instance以逗號隔開。這里主要看的是parseInstanceConfig()方法,里面的邏輯如下:
- 獲取啟動模式,是manager還是spring,我們這邊默認都是spring。
- 獲取懶加載字段
- 獲取spring xml配置文件地址
2.2.3 初始SocketChannel
從配置文件中獲取canal.socketChannel字段,放到全局變量中。
2.2.4 准備canal server
從配置文件中分別獲取canal.id、ip、port(對外提供socket服務的端口),獲取一個內存級的server單例,同時也獲取一個對外提供Netty服務的單例。
cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID));
ip = getProperty(properties, CanalConstants.CANAL_IP);
port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
embededCanalServer = CanalServerWithEmbedded.instance();
embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 設置自定義的instanceGenerator
canalServer = CanalServerWithNetty.instance();
canalServer.setIp(ip);
canalServer.setPort(port);
2.2.5 初始化系統目錄
從配置文件中獲取zk地址(canal.zkServers),啟動一個zk客戶端,然后初始化兩個系統目錄,分別是:
- /otter/canal/destinations
- /otter/canal/cluster
2.2.6 初始化系統監控
根據destination構造運行時監控,其實就是根據instance名來構造ServerRunningMonitor。其實就是實現了ServerRunningListener中的一些方法。
public interface ServerRunningListener {
/**
* 啟動時回調做點事情
*/
public void processStart();
/**
* 關閉時回調做點事情
*/
public void processStop();
/**
* 觸發現在輪到自己做為active,需要載入上一個active的上下文數據
*/
public void processActiveEnter();
/**
* 觸發一下當前active模式失敗
*/
public void processActiveExit();
}
然后初始化一下ServerRunningMonitor。
runningMonitor.init();
這個init方法跟蹤的結果,其實就是執行了ServerRunningListener中的processStart方法。
public void processStart() {
try {
if (zkclientx != null) {
final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":" + port);
initCid(path);
zkclientx.subscribeStateChanges(new IZkStateListener() {
public void handleStateChanged(KeeperState state) throws Exception {
}
public void handleNewSession() throws Exception {
initCid(path);
}
@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception {
logger.error("failed to connect to zookeeper", error);
}
});
}
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}
首先獲取了/otter/canal/destinations/{destination}/cluster/ip:port的內容,其實就是server的地址,最后一個ip:port是個zk的臨時節點。然后訂閱一下節點事件,當節點有事件推送過來后,做一些動作。
2.2.7 初始化配置文件監控
如果canal.auto.scan配置為true(默認為true),首先定義一個InstanceAction,包含了啟動、停止、重啟instance的動作。
定義一個SpringInstanceConfigMonitor,配置定時掃描的事件為canal.auto.scan.interval,默認5s,掃描canal.conf.dir目錄下的文件,與上面定義的InstanceAction結合起來。
2.3 啟動CanalController
上面的構造方法其實就是定義一些必要的內容,真正的啟動在這個方法中。
2.3.1 創建工作節點
創建臨時節點/otter/canal/cluster/ip:port,同時啟動監聽器.
2.3.2 啟動embeded服務
embededCanalServer.start();
這個start里面,一個是將當前server的running狀態置為true,同時根據destination構建CanalInstance。
2.3.3 HA啟動
遍歷Map<String, InstanceConfig>中的InstanceConfig,如果CanalInsance還沒啟動,如果不是懶加載的話,直接HA啟動ServerRunningMonitor。
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
if (!config.getLazy() && !runningMonitor.isStart()) {
runningMonitor.start();
}
public synchronized void start() {
super.start();
try {
processStart();
if (zkClient != null) {
// 如果需要盡可能釋放instance資源,不需要監聽running節點,不然即使stop了這台機器,另一台機器立馬會start
String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
zkClient.subscribeDataChanges(path, dataListener);
initRunning();
} else {
processActiveEnter();// 沒有zk,直接啟動
}
} catch (Exception e) {
logger.error("start failed", e);
// 沒有正常啟動,重置一下狀態,避免干擾下一次start
stop();
}
}
這里面啟動的內容我們來看看。
- 首先調用super.start()把當前的running狀態置為true。
- 然后啟動zk節點的監聽(這邊的processStart是否多余了?)。
- 監聽路徑/otter/canal/destinations/{destination}/running節點的變化
zkClient.subscribeDataChanges(path, dataListener);
- 這里的dataListener是ServerRunningMonitor構造函數中定義的,就是定義一些zk節點監聽的動作。
- 如果有數據變化,如果running節點中的內容ServerRunningData發生了變化,字段active變為了false,而且address就是本機,說明本機出現了主動釋放,需要釋放運行時狀態。此時需要調用到processActiveExit方法,其實就是停止了本機的server中destination對應的instance。
- 如果節點發生了刪除動作,如果上一次active的狀態就是本機,則即時觸發一下active搶占,調用initRunning()方法,當然,如果啟動失敗,也不是立即切換,而是會等待5s,再嘗試啟動。這個啟動方法中,主要調用的是processActiveEnter()方法,來啟動了embededCanalServer.start(destination)。其實就是啟動canalInstance,這塊后續再分析。
- 其實除了監聽器,在本身的ServerRunningMonitor的start方法中,也有initRunning方法。這塊啟動canalInstance的方法,我們下一篇文章分析。
2.3.4 instance文件掃描啟動
在掃描之前,把destination和InstanceAction綁定到緩存中。
instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
首先啟動一個全局掃描,然后再對應的destination配置文件的掃描。
if (autoScan) {
instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
if (!monitor.isStart()) {
monitor.start();
}
}
}
這個start方法啟動了一個定時器,默認5s掃描一次。掃描的內容就是配置文件路徑下的內容,針對文件的新增、刪除、修改,對應InstanceAction中的start,stop和reload方法。也就是說,我們在canal運行的過程中,通過動態修改配置文件,來實現動態調整運行時參數,主要可以用來進行重復消費,位點的遷移等等。
2.3.5 網絡接口啟動
CanalServerWithNetty的啟動,首先需要啟動CanalServerWithEmbedded,主要的業務邏輯在SessionHandler中。這塊其實是暴露外部服務,給canal client進行調用。
2.4 增加關閉hook
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
logger.info("## stop the canal server");
controller.stop();
} catch (Throwable e) {
logger.warn("##something goes wrong when stopping canal Server:", e);
} finally {
logger.info("## canal server is down.");
}
}
});
在server停止時,調用controller.stop()方法。
public void stop() throws Throwable {
canalServer.stop();
if (autoScan) {
for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
if (monitor.isStart()) {
monitor.stop();
}
}
}
for (ServerRunningMonitor runningMonitor : ServerRunningMonitors.getRunningMonitors().values()) {
if (runningMonitor.isStart()) {
runningMonitor.stop();
}
}
// 釋放canal的工作節點
releaseCid(ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port));
logger.info("## stop the canal server[{}:{}]", ip, port);
if (zkclientx != null) {
zkclientx.close();
}
}
主要是停止controller,server相關的monitor,instance相關的monitor,然后釋放zk節點,關閉zk連接。