1.0 canal源碼分析簡介
canal是阿里巴巴開源的mysql數據庫binlog的增量訂閱&消費組件。項目github地址為:https://github.com/alibaba/canal。
本教程是從源碼的角度來分析canal,適用於對canal有一定基礎的同學。本教程使用的版本是1.0.24,這也是筆者寫這篇教程時的最新穩定版,關於canal的基礎知識可以參考:https://github.com/alibaba/canal/wiki。
下載項目源碼
下載
- git clone https://github.com/alibaba/canal.git
切換到canal-1.0.24這個tag
- git checkout canal-1.0.24
源碼模塊划分
canal是基於maven構建的,總共分成了14個模塊,如下所示:
模塊雖多,但是每個模塊的代碼都很少。各個模塊的作用如下所示:
common模塊:主要是提供了一些公共的工具類和接口。
client模塊:canal的客戶端。核心接口為CanalConnector
example模塊:提供client模塊使用案例。
protocol模塊:client和server模塊之間的通信協議
deployer:部署模塊。通過該模塊提供的CanalLauncher來啟動canal server
server模塊:canal服務器端。核心接口為CanalServer
instance模塊:一個server有多個instance。每個instance都會模擬成一個mysql實例的slave。instance模塊有四個核心組成部分:parser模塊、sink模塊、store模塊,meta模塊。核心接口為CanalInstance
parser模塊:數據源接入,模擬slave協議和master進行交互,協議解析。parser模塊依賴於dbsync、driver模塊。
driver模塊和dbsync模塊:從這兩個模塊的artifactId(canal.parse.driver、canal.parse.dbsync),就可以看出來,這兩個模塊實際上是parser模塊的組件。事實上parser 是通過driver模塊與mysql建立連接,從而獲取到binlog。由於原始的binlog都是二進制流,需要解析成對應的binlog事件,這些binlog事件對象都定義在dbsync模塊中,dbsync 模塊來自於淘寶的tddl。
sink模塊:parser和store鏈接器,進行數據過濾,加工,分發的工作。核心接口為CanalEventSink
store模塊:數據存儲。核心接口為CanalEventStore
meta模塊:增量訂閱&消費信息管理器,核心接口為CanalMetaManager,主要用於記錄canal消費到的mysql binlog的位置,
下面再通過一張圖來說明各個模塊之間的依賴關系
通過deployer模塊,啟動一個canal-server,一個cannal-server內部包含多個instance,每個instance都會偽裝成一個mysql實例的slave。client與server之間的通信協議由protocol模塊定義。client在訂閱binlog信息時,需要傳遞一個destination參數,server會根據這個destination確定由哪一個instance為其提供服務。
在分析源碼的時候,本人也是按照模塊來划分的,基本上一個模塊對應一篇文章。
2.0 deployer模塊
canal有兩種使用方式:1、獨立部署 2、內嵌到應用中。 deployer模塊主要用於獨立部署canal server。關於這兩種方式的區別,請參見server模塊源碼分析。deployer模塊源碼目錄結構如下所示:
在獨立部署canal時,需要首先對canal的源碼進行打包
- mvn clean install -Dmaven.test.skip -Denv=release
以本教程使用1.0.24版本為例,打包后會在target目錄生成一個以下兩個文件:
其中canal.deployer-1.0.24.tar.gz就是canal的獨立部署包。解壓縮后,目錄如下所示。其中bin目錄和conf目錄(包括子目錄spring)中的所有文件,都來自於deployer模塊。
- canal
- ├── bin
- │ ├── startup.bat
- │ ├── startup.sh
- │ └── stop.sh
- ├── conf
- │ ├── canal.properties
- │ ├── example
- │ │ └── instance.properties
- │ ├── logback.xml
- │ └── spring
- │ ├── default-instance.xml
- │ ├── file-instance.xml
- │ ├── group-instance.xml
- │ ├── local-instance.xml
- │ └── memory-instance.xml
- ├── lib
- │ └── ....依賴的各種jar
- └── logs
deployer模塊主要完成以下功能:
1、讀取canal,properties配置文件
2、啟動canal server,監聽canal client的請求
3、啟動canal instance,連接mysql數據庫,偽裝成slave,解析binlog
4、在canal的運行過程中,監聽配置文件的變化
1、啟動和停止腳本
bin目錄中包含了canal的啟動和停止腳本startup.sh
和stop.sh
,當我們要啟動canal時,只需要輸入以下命令即可
- sh bin/startup.sh
在windows環境下,可以直接雙擊startup.bat。
在startup.sh腳本內,會調用com.alibaba.otter.canal.deployer.CanalLauncher類來進行啟動,這是分析Canal源碼的入口類,如下圖所示:
同時,startup.sh還會在bin目錄下生成一個canal.pid
文件,用於存儲canal的進程id。當停止canal的時候
- sh bin/stop.sh
會根據canal.pid文件中記錄的進程id,kill掉canal進程,並且刪除這個文件。
2、CannalLauncher
CanalLauncher
是整個源碼分析的入口類,代碼相當簡單。步驟是:
1、讀取canal.properties文件中的配置
2、利用讀取的配置構造一個CanalController實例,將所有的啟動操作都委派給CanalController進行處理。
3、最后注冊一個鈎子函數,在JVM停止時同時也停止canal server。
com.alibaba.otter.canal.deployer.CanalLauncher
- public class CanalLauncher {
- private static final String CLASSPATH_URL_PREFIX = "classpath:";
- private static final Logger logger = LoggerFactory.getLogger(CanalLauncher.class);
- public static void main(String[] args) throws Throwable {
- try {
- //1、讀取canal.properties文件中配置,默認讀取classpath下的canal.properties
- String conf = System.getProperty("canal.conf", "classpath:canal.properties");
- Properties properties = new Properties();
- if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
- conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
- properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
- } else {
- properties.load(new FileInputStream(conf));
- }
- //2、啟動canal,首先將properties對象傳遞給CanalController,然后調用其start方法啟動
- logger.info("## start the canal server.");
- final CanalController controller = new CanalController(properties);
- controller.start();
- logger.info("## the canal server is running now ......");
- //3、關閉canal,通過添加JVM的鈎子,JVM停止前會回調run方法,其內部調用controller.stop()方法進行停止
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() {
- try {
- logger.info("## stop the canal server");
- controller.stop();
- } catch (Throwable e) {
- logger.warn("##something goes wrong when stopping canal Server:\n{}",
- ExceptionUtils.getFullStackTrace(e));
- } finally {
- logger.info("## canal server is down.");
- }
- }
- });
- } catch (Throwable e) {
- logger.error("## Something goes wrong when starting up the canal Server:\n{}",
- ExceptionUtils.getFullStackTrace(e));
- System.exit(0);
- }
- }
- }
可以看到,CanalLauncher實際上只是負責讀取canal.properties配置文件,然后構造CanalController對象,並通過其start和stop方法來開啟和停止canal。因此,如果說CanalLauncher是canal源碼分析的入口類,那么CanalController就是canal源碼分析的核心類。
3、CanalController
在CanalController的構造方法中,會對配置文件內容解析,初始化相關成員變量,做好canal server的啟動前的准備工作,之后在CanalLauncher中調用CanalController.start方法來啟動。
CanalController中定義的相關字段和構造方法,如下所示:
- public class CanalController {
- private static final Logger logger = LoggerFactory.getLogger(CanalController.class);
- private Long cid;
- private String ip;
- private int port;
- // 默認使用spring的方式載入
- private Map<String, InstanceConfig> instanceConfigs;
- private InstanceConfig globalInstanceConfig;
- private Map<String, CanalConfigClient> managerClients;
- // 監聽instance config的變化
- private boolean autoScan = true;
- private InstanceAction defaultAction;
- private Map<InstanceMode, InstanceConfigMonitor> instanceConfigMonitors;
- private CanalServerWithEmbedded embededCanalServer;
- private CanalServerWithNetty canalServer;
- private CanalInstanceGenerator instanceGenerator;
- private ZkClientx zkclientx;
- public CanalController(){
- this(System.getProperties());
- }
- public CanalController(final Properties properties){
- managerClients = MigrateMap.makeComputingMap(new Function<String, CanalConfigClient>() {
- public CanalConfigClient apply(String managerAddress) {
- return getManagerClient(managerAddress);
- }
- });
- //1、配置解析
- globalInstanceConfig = initGlobalConfig(properties);
- instanceConfigs = new MapMaker().makeMap();
- initInstanceConfig(properties);
- // 2、准備canal server
- cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID));
- ip = getProperty(properties, CanalConstants.CANAL_IP);
- port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
- embededCanalServer = CanalServerWithEmbedded.instance();
- embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 設置自定義的instanceGenerator
- canalServer = CanalServerWithNetty.instance();
- canalServer.setIp(ip);
- canalServer.setPort(port);
- //3、初始化zk相關代碼
- // 處理下ip為空,默認使用hostIp暴露到zk中
- if (StringUtils.isEmpty(ip)) {
- ip = AddressUtils.getHostIp();
- }
- final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);
- if (StringUtils.isNotEmpty(zkServers)) {
- zkclientx = ZkClientx.getZkClient(zkServers);
- // 初始化系統目錄
- zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);
- zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);
- }
- //4 CanalInstance運行狀態監控
- final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port);
- ServerRunningMonitors.setServerData(serverData);
- ServerRunningMonitors.setRunningMonitors(//...);
- //5、autoScan機制相關代碼
- autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
- if (autoScan) {
- defaultAction = new InstanceAction() {//....};
- instanceConfigMonitors = //....
- }
- }
- ....
- }
為了讀者能夠盡量容易的看出CanalController的構造方法中都做了什么,上面代碼片段中省略了部分代碼。這樣,我們可以很明顯的看出來, ,在CanalController構造方法中的代碼分划分為了固定的幾個處理步驟,下面按照幾個步驟的划分,逐一進行講解,並詳細的介紹CanalController中定義的各個字段的作用。
3.1 配置解析相關代碼
- // 初始化全局參數設置
- globalInstanceConfig = initGlobalConfig(properties);
- instanceConfigs = new MapMaker().makeMap();
- // 初始化instance config
- initInstanceConfig(properties);
3.1.1 globalInstanceConfig字段
表示canal instance的全局配置,類型為InstanceConfig,通過initGlobalConfig方法進行初始化。主要用於解析canal.properties
以下幾個配置項:
-
canal.instance.global.mode:確定canal instance配置加載方式,取值有manager|spring兩種方式
-
canal.instance.global.lazy:確定canal instance是否延遲初始化
-
canal.instance.global.manager.address:配置中心地址。如果canal.instance.global.mode=manager,需要提供此配置項
-
canal.instance.global.spring.xml:spring配置文件路徑。如果canal.instance.global.mode=spring,需要提供此配置項
initGlobalConfig源碼如下所示:
- private InstanceConfig initGlobalConfig(Properties properties) {
- InstanceConfig globalConfig = new InstanceConfig();
- //讀取canal.instance.global.mode
- String modeStr = getProperty(properties, CanalConstants.getInstanceModeKey(CanalConstants.GLOBAL_NAME));
- if (StringUtils.isNotEmpty(modeStr)) {
- //將modelStr轉成枚舉InstanceMode,這是一個枚舉類,只有2個取值,SPRING\MANAGER,對應兩種配置方式
- globalConfig.setMode(InstanceMode.valueOf(StringUtils.upperCase(modeStr)));
- }
- //讀取canal.instance.global.lazy
- String lazyStr = getProperty(properties, CanalConstants.getInstancLazyKey(CanalConstants.GLOBAL_NAME));
- if (StringUtils.isNotEmpty(lazyStr)) {
- globalConfig.setLazy(Boolean.valueOf(lazyStr));
- }
- //讀取canal.instance.global.manager.address
- String managerAddress = getProperty(properties,
- CanalConstants.getInstanceManagerAddressKey(CanalConstants.GLOBAL_NAME));
- if (StringUtils.isNotEmpty(managerAddress)) {
- globalConfig.setManagerAddress(managerAddress);
- }
- //讀取canal.instance.global.spring.xml
- String springXml = getProperty(properties, CanalConstants.getInstancSpringXmlKey(CanalConstants.GLOBAL_NAME));
- if (StringUtils.isNotEmpty(springXml)) {
- globalConfig.setSpringXml(springXml);
- }
- instanceGenerator = //...初始化instanceGenerator
- return globalConfig;
- }
其中canal.instance.global.mode
用於確定canal instance的全局配置加載方式,其取值范圍有2個:spring
、manager
。我們知道一個canal server中可以啟動多個canal instance,每個instance都有各自的配置。instance的配置也可以放在本地,也可以放在遠程配置中心里。我們可以自定義每個canal instance配置文件存儲的位置,如果所有canal instance的配置都在本地或者遠程,此時我們就可以通過canal.instance.global.mode這個配置項,來統一的指定配置文件的位置,避免為每個canal instance單獨指定。
其中:
spring方式:
表示所有的canal instance的配置文件位於本地。此時,我們必須提供配置項canal.instance.global.spring.xml指定spring配置文件的路徑。canal提供了多個spring配置文件:file-instance.xml、default-instance.xml、memory-instance.xml、local-instance.xml、group-instance.xml。這么多配置文件主要是為了支持canal instance不同的工作方式。我們在稍后將會講解各個配置文件的區別。而在這些配置文件的開頭,我們無一例外的可以看到以下配置:
- <bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
- <property name="ignoreResourceNotFound" value="true" />
- <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允許system覆蓋 -->
- <property name="locationNames">
- <list>
- <value>classpath:canal.properties</value>
- <value>classpath:${canal.instance.destination:}/instance.properties</value>
- </list>
- </property>
- </bean>
這里我們可以看到,所謂通過spring方式加載canal instance配置,無非就是通過spring提供的PropertyPlaceholderConfigurer來加載canal instance的配置文件instance.properties。
這里instance.properties的文件完整路徑是${canal.instance.destination:}/instance.properties,其中${canal.instance.destination}是一個變量。這是因為我們可以在一個canal server中配置多個canal instance,每個canal instance配置文件的名稱都是instance.properties,因此我們需要通過目錄進行區分。例如我們通過配置項canal.destinations指定多個canal instance的名字
- canal.destinations= example1,example2
此時我們就要conf目錄下,新建兩個子目錄example1和example2,每個目錄下各自放置一個instance.properties。
canal在初始化時就會分別使用example1和example2來替換${canal.instance.destination:},從而分別根據example1/instance.properties和example2/instance.properties創建2個canal instance。
manager方式:
表示所有的canal instance的配置文件位於遠程配置中心,此時我們必須提供配置項 canal.instance.global.manager.address來指定遠程配置中心的地址。目前alibaba內部配置使用這種方式。開發者可以自己實現CanalConfigClient,連接各自的管理系統,完成接入。
3.1.2 instanceGenerator字段
類型為CanalInstanceGenerator
。在initGlobalConfig方法中,除了創建了globalInstanceConfig實例,同時還為字段instanceGenerator字段進行了賦值。
顧名思義,這個字段用於創建CanalInstance
實例。這是instance模塊中的類,其作用就是為canal.properties文件中canal.destinations
配置項列出的每個destination,創建一個CanalInstance實例。CanalInstanceGenerator是一個接口,定義如下所示:
- public interface CanalInstanceGenerator {
- /**
- * 通過 destination 產生特定的 {@link CanalInstance}
- */
- CanalInstance generate(String destination);
- }
針對spring和manager兩種instance配置的加載方式,CanalInstanceGenerator提供了兩個對應的實現類,如下所示:
instanceGenerator字段通過一個匿名內部類進行初始化。其內部會判斷配置的各個destination的配置加載方式,spring 或者manager。
- instanceGenerator = new CanalInstanceGenerator() {
- public CanalInstance generate(String destination) {
- //1、根據destination從instanceConfigs獲取對應的InstanceConfig對象
- InstanceConfig config = instanceConfigs.get(destination);
- if (config == null) {
- throw new CanalServerException("can't find destination:{}");
- }
- //2、如果destination對應的InstanceConfig的mode是manager方式,使用ManagerCanalInstanceGenerator
- if (config.getMode().isManager()) {
- ManagerCanalInstanceGenerator instanceGenerator = new ManagerCanalInstanceGenerator();
- instanceGenerator.setCanalConfigClient(managerClients.get(config.getManagerAddress()));
- return instanceGenerator.generate(destination);
- } else if (config.getMode().isSpring()) {
- //3、如果destination對應的InstanceConfig的mode是spring方式,使用SpringCanalInstanceGenerator
- SpringCanalInstanceGenerator instanceGenerator = new SpringCanalInstanceGenerator();
- synchronized (this) {
- try {
- // 設置當前正在加載的通道,加載spring查找文件時會用到該變量
- System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, destination);
- instanceGenerator.setBeanFactory(getBeanFactory(config.getSpringXml()));
- return instanceGenerator.generate(destination);
- } catch (Throwable e) {
- logger.error("generator instance failed.", e);
- throw new CanalException(e);
- } finally {
- System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, "");
- }
- }
- } else {
- throw new UnsupportedOperationException("unknow mode :" + config.getMode());
- }
- }
- };
上述代碼中的第1步比較變態,從instanceConfigs中根據destination作為參數,獲得對應的InstanceConfig。而instanceConfigs目前還沒有被初始化,這個字段是在稍后將后將要講解的initInstanceConfig方法初始化的,不過由於這是一個引用類型,當initInstanceConfig方法被執行后,instanceConfigs字段中也就有值了。目前,我們姑且認為, instanceConfigs這個Map<String, InstanceConfig>類型的字段已經被初始化好了。
2、3兩步用於確定是instance的配置加載方式是spring還是manager,如果是spring,就使用SpringCanalInstanceGenerator創建CanalInstance實例,如果是manager,就使用ManagerCanalInstanceGenerator創建CanalInstance實例。
由於目前manager方式的源碼並未開源,因此,我們只分析SpringCanalInstanceGenerator相關代碼。
上述代碼中,首先創建了一個SpringCanalInstanceGenerator實例,然后往里面設置了一個BeanFactory。
- instanceGenerator.setBeanFactory(getBeanFactory(config.getSpringXml()));
其中config.getSpringXml()返回的就是我們在canal.properties中通過canal.instance.global.spring.xml配置項指定了spring配置文件路徑。getBeanFactory方法源碼如下所示:
- private BeanFactory getBeanFactory(String springXml) {
- ApplicationContext applicationContext = new ClassPathXmlApplicationContext(springXml);
- return applicationContext;
- }
往SpringCanalInstanceGenerator
設置了BeanFactory之后,就可以通過其的generate方法獲得CanalInstance實例。
SpringCanalInstanceGenerator的源碼如下所示:
- public class SpringCanalInstanceGenerator implements CanalInstanceGenerator, BeanFactoryAware {
- private String defaultName = "instance";
- private BeanFactory beanFactory;
- public CanalInstance generate(String destination) {
- String beanName = destination;
- //首先判斷beanFactory是否包含以destination為id的bean
- if (!beanFactory.containsBean(beanName)) {
- beanName = defaultName;//如果沒有,設置要獲取的bean的id為instance。
- }
- //以默認的bean的id值"instance"來獲取CanalInstance實例
- return (CanalInstance) beanFactory.getBean(beanName);
- }
- public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
- this.beanFactory = beanFactory;
- }
- }
首先嘗試以傳入的參數destination來獲取CanalInstance實例,如果沒有,就以默認的bean的id值"instance"來獲取CanalInstance實例。事實上,如果你沒有修改spring配置文件,那么默認的名字就是instance。事實上,在canal提供的各個spring配置文件xxx-instance.xml中,都有類似以下配置:
- <bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
- <property name="destination" value="${canal.instance.destination}" />
- <property name="eventParser">
- <ref local="eventParser" />
- </property>
- <property name="eventSink">
- <ref local="eventSink" />
- </property>
- <property name="eventStore">
- <ref local="eventStore" />
- </property>
- <property name="metaManager">
- <ref local="metaManager" />
- </property>
- <property name="alarmHandler">
- <ref local="alarmHandler" />
- </property>
- </bean>
上面的代碼片段中,我們看到的確有一個bean的名字是instance,其類型是CanalInstanceWithSpring
,這是CanalInstance接口的實現類。類似的,我們可以想到在manager配置方式下,獲取的CanalInstance實現類是CanalInstanceWithManager
。事實上,你想的沒錯,CanalInstance的類圖繼承關系如下所示:
需要注意的是,到目前為止,我們只是創建好了CanalInstanceGenerator,而CanalInstance尚未創建。在CanalController的start方法被調用時,CanalInstance才會被真正的創建,相關源碼將在稍后分析。
3.1.3 instanceConfigs字段
類型為Map<String, InstanceConfig>。前面提到初始化instanceGenerator后,當其generate方法被調用時,會嘗試從instanceConfigs根據一個destination獲取對應的InstanceConfig
,現在分析instanceConfigs的相關初始化代碼。
我們知道globalInstanceConfig定義全局的配置加載方式。如果需要把部分CanalInstance配置放於本地,另外一部分CanalIntance配置放於遠程配置中心,則只通過全局方式配置,無法達到這個要求。雖然這種情況很少見,但是為了提供最大的靈活性,canal支持每個CanalIntance自己來定義自己的加載方式,來覆蓋默認的全局配置加載方式。而每個destination對應的InstanceConfig配置就存放於instanceConfigs字段中。
舉例來說:
- //當前server上部署的instance列表
- canal.destinations=instance1,instance2
- //instance配置全局加載方式
- canal.instance.global.mode = spring
- canal.instance.global.lazy = false
- canal.instance.global.spring.xml = classpath:spring/file-instance.xml
- //instance1覆蓋全局加載方式
- canal.instance.instance1.mode = manager
- canal.instance.instance1.manager.address = 127.0.0.1:1099
- canal.instance.instance1.lazy = tue
這段配置中,設置了instance的全局加載方式為spring,instance1覆蓋了全局配置,使用manager方式加載配置。而instance2沒有覆蓋配置,因此默認使用spring加載方式。
instanceConfigs字段通過initInstanceConfig方法進行初始化
- instanceConfigs = new MapMaker().makeMap();//這里利用Google Guava框架的MapMaker創建Map實例並賦值給instanceConfigs
- // 初始化instance config
- initInstanceConfig(properties);
initInstanceConfig方法源碼如下:
- private void initInstanceConfig(Properties properties) {
- //讀取配置項canal.destinations
- String destinationStr = getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
- //以","分割canal.destinations,得到一個數組形式的destination
- String[] destinations = StringUtils.split(destinationStr, CanalConstants.CANAL_DESTINATION_SPLIT);
- for (String destination : destinations) {
- //為每一個destination生成一個InstanceConfig實例
- InstanceConfig config = parseInstanceConfig(properties, destination);
- //將destination對應的InstanceConfig放入instanceConfigs中
- InstanceConfig oldConfig = instanceConfigs.put(destination, config);
- if (oldConfig != null) {
- logger.warn("destination:{} old config:{} has replace by new config:{}", new Object[] { destination,
- oldConfig, config });
- }
- }
- }
上面代碼片段中,首先解析canal.destinations配置項,可以理解一個destination就對應要初始化一個canal instance。針對每個destination會創建各自的InstanceConfig,最終都會放到instanceConfigs這個Map中。
各個destination對應的InstanceConfig都是通過parseInstanceConfig方法來解析
- private InstanceConfig parseInstanceConfig(Properties properties, String destination) {
- //每個destination對應的InstanceConfig都引用了全局的globalInstanceConfig
- InstanceConfig config = new InstanceConfig(globalInstanceConfig);
- //...其他幾個配置項與獲取globalInstanceConfig類似,不再贅述,唯一注意的的是配置項的key部分中的global變成傳遞進來的destination
- return config;
- }
此時我們可以看一下InstanceConfig類的源碼:
- public class InstanceConfig {
- private InstanceConfig globalConfig;
- private InstanceMode mode;
- private Boolean lazy;
- private String managerAddress;
- private String springXml;
- public InstanceConfig(){
- }
- public InstanceConfig(InstanceConfig globalConfig){
- this.globalConfig = globalConfig;
- }
- public static enum InstanceMode {
- SPRING, MANAGER;
- public boolean isSpring() {
- return this == InstanceMode.SPRING;
- }
- public boolean isManager() {
- return this == InstanceMode.MANAGER;
- }
- }
- public Boolean getLazy() {
- if (lazy == null && globalConfig != null) {
- return globalConfig.getLazy();
- } else {
- return lazy;
- }
- }
- public void setLazy(Boolean lazy) {
- this.lazy = lazy;
- }
- public InstanceMode getMode() {
- if (mode == null && globalConfig != null) {
- return globalConfig.getMode();
- } else {
- return mode;
- }
- }
- public void setMode(InstanceMode mode) {
- this.mode = mode;
- }
- public String getManagerAddress() {
- if (managerAddress == null && globalConfig != null) {
- return globalConfig.getManagerAddress();
- } else {
- return managerAddress;
- }
- }
- public void setManagerAddress(String managerAddress) {
- this.managerAddress = managerAddress;
- }
- public String getSpringXml() {
- if (springXml == null && globalConfig != null) {
- return globalConfig.getSpringXml();
- } else {
- return springXml;
- }
- }
- public void setSpringXml(String springXml) {
- this.springXml = springXml;
- }
- public String toString() {
- return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
- }
- }
可以看到,InstanceConfig類中維護了一個globalConfig字段,其類型也是InstanceConfig。而其相關get方法在執行時,會按照以下邏輯進行判斷:如果沒有自身沒有這個配置,則返回全局配置,如果有,則返回自身的配置。通過這種方式實現對全局配置的覆蓋。
3.2 准備canal server相關代碼
- cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID));
- ip = getProperty(properties, CanalConstants.CANAL_IP);
- port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
- embededCanalServer = CanalServerWithEmbedded.instance();
- embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 設置自定義的instanceGenerator
- canalServer = CanalServerWithNetty.instance();
- canalServer.setIp(ip);
- canalServer.setPort(port);
上述代碼中,首先解析了cid、ip、port字段,其中:
cid:Long,對應canal.properties文件中的canal.id,目前無實際用途
ip:String,對應canal.properties文件中的canal.ip,canal server監聽的ip。
port:int,對應canal.properties文件中的canal.port,canal server監聽的端口
之后分別為以下兩個字段賦值:
embededCanalServer:類型為CanalServerWithEmbedded
canalServer:類型為CanalServerWithNetty
CanalServerWithEmbedded
和 CanalServerWithNetty
都實現了CanalServer接口,且都實現了單例模式,通過靜態方法instance獲取實例。
關於這兩種類型的實現,canal官方文檔有以下描述:
說白了,就是我們可以不必獨立部署canal server。在應用直接使用CanalServerWithEmbedded直連mysql數據庫。如果覺得自己的技術hold不住相關代碼,就獨立部署一個canal server,使用canal提供的客戶端,連接canal server獲取binlog解析后數據。而CanalServerWithNetty是在CanalServerWithEmbedded的基礎上做的一層封裝,用於與客戶端通信。
在獨立部署canal server時,Canal客戶端發送的所有請求都交給CanalServerWithNetty處理解析,解析完成之后委派給了交給CanalServerWithEmbedded進行處理。因此CanalServerWithNetty就是一個馬甲而已。CanalServerWithEmbedded才是核心。
因此,在上述代碼中,我們看到,用於生成CanalInstance實例的instanceGenerator被設置到了CanalServerWithEmbedded中,而ip和port被設置到CanalServerWithNetty中。
關於CanalServerWithNetty如何將客戶端的請求委派給CanalServerWithEmbedded進行處理,我們將在server模塊源碼分析中進行講解。
3.3 初始化zk相關代碼
- //讀取canal.properties中的配置項canal.zkServers,如果沒有這個配置,則表示項目不使用zk
- final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);
- if (StringUtils.isNotEmpty(zkServers)) {
- //創建zk實例
- zkclientx = ZkClientx.getZkClient(zkServers);
- // 初始化系統目錄
- //destination列表,路徑為/otter/canal/destinations
- zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);
- //整個canal server的集群列表,路徑為/otter/canal/cluster
- zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);
- }
canal支持利用了zk來完成HA機制、以及將當前消費到到的mysql的binlog位置記錄到zk中。ZkClientx是canal對ZkClient進行了一層簡單的封裝。
顯然,當我們沒有配置canal.zkServers,那么zkclientx不會被初始化。
關於Canal如何利用ZK做HA,我們將在稍后的代碼中進行分。而利用zk記錄binlog的消費進度,將在之后的章節進行分析。
3.4 CanalInstance運行狀態監控相關代碼
由於這段代碼比較長且惡心,這里筆者暫時對部分代碼進行省略,以便讀者看清楚整各脈絡
- final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port);
- ServerRunningMonitors.setServerData(serverData);
- ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {
- public ServerRunningMonitor apply(final String destination) {
- ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
- runningMonitor.setDestination(destination);
- runningMonitor.setListener(new ServerRunningListener() {....});//省略ServerRunningListener的具體實現
- if (zkclientx != null) {
- runningMonitor.setZkClient(zkclientx);
- }
- // 觸發創建一下cid節點
- runningMonitor.init();
- return runningMonitor;
- }
- }));
上述代碼中,ServerRunningMonitors
是ServerRunningMonitor對象的容器,而ServerRunningMonitor
用於監控CanalInstance。
canal會為每一個destination創建一個CanalInstance,每個CanalInstance都會由一個ServerRunningMonitor來進行監控。而ServerRunningMonitor統一由ServerRunningMonitors進行管理。
除了CanalInstance需要監控,CanalServer本身也需要監控。因此我們在代碼一開始,就看到往ServerRunningMonitors設置了一個ServerRunningData對象,封裝了canal server監聽的ip和端口等信息。
ServerRunningMonitors源碼如下所示:
- public class ServerRunningMonitors {
- private static ServerRunningData serverData;
- private static Map runningMonitors; // <String,ServerRunningMonitor>
- public static ServerRunningData getServerData() {
- return serverData;
- }
- public static Map<String, ServerRunningMonitor> getRunningMonitors() {
- return runningMonitors;
- }
- public static ServerRunningMonitor getRunningMonitor(String destination) {
- return (ServerRunningMonitor) runningMonitors.get(destination);
- }
- public static void setServerData(ServerRunningData serverData) {
- ServerRunningMonitors.serverData = serverData;
- }
- public static void setRunningMonitors(Map runningMonitors) {
- ServerRunningMonitors.runningMonitors = runningMonitors;
- }
- }
ServerRunningMonitors的setRunningMonitors方法接收的參數是一個Map,其中Map的key是destination,value是ServerRunningMonitor,也就是說針對每一個destination都有一個ServerRunningMonitor來監控。
上述代碼中,在往ServerRunningMonitors設置Map時,是通過MigrateMap.makeComputingMap方法來創建的,其接受一個Function類型的參數,這是guava中定義的接口,其聲明了apply抽象方法。其工作原理可以通過下面代碼片段進行介紹:
- Map<String, User> map = MigrateMap.makeComputingMap(new Function<String, User>() {
- @Override
- public User apply(String name) {
- return new User(name);
- }
- });
- User user = map.get("tianshouzhi");//第一次獲取時會創建
- assert user != null;
- assert user == map.get("tianshouzhi");//之后獲取,總是返回之前已經創建的對象
這段代碼中,我們利用MigrateMap.makeComputingMap創建了一個Map,其中key為String類型,value為User類型。當我們調用map.get("tianshouzhi")方法,最開始這個Map中並沒有任何key/value的,於是其就會回調Function的apply方法,利用參數"tianshouzhi"創建一個User對象並返回。之后當我們再以"tianshouzhi"為key從Map中獲取User對象時,會直接將前面創建的對象返回。不會回調apply方法,也就是說,只有在第一次嘗試獲取時,才會回調apply方法。
而在上述代碼中,實際上就利用了這個特性,只不過是根據destination獲取ServerRunningMonitor對象,如果不存在就創建。
在創建ServerRunningMonitor對象時,首先根據ServerRunningData創建ServerRunningMonitor實例,之后設置了destination和ServerRunningListener
對象,接着,判斷如果zkClientx字段如果不為空,也設置到ServerRunningMonitor中,最后調用init方法進行初始化。
- ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
- runningMonitor.setDestination(destination);
- runningMonitor.setListener(new ServerRunningListener(){...})//省略ServerRunningListener具體代碼
- if (zkclientx != null) {
- runningMonitor.setZkClient(zkclientx);
- }
- // 觸發創建一下cid節點
- runningMonitor.init();
- return runningMonitor;
ServerRunningListener的實現如下:
- new ServerRunningListener() {
- /*內部調用了embededCanalServer的start(destination)方法。
- 此處需要划重點,說明每個destination對應的CanalInstance是通過embededCanalServer的start方法啟動的,
- 這與我們之前分析將instanceGenerator設置到embededCanalServer中可以對應上。
- embededCanalServer負責調用instanceGenerator生成CanalInstance實例,並負責其啟動。*/
- public void processActiveEnter() {
- try {
- MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
- embededCanalServer.start(destination);
- } finally {
- MDC.remove(CanalConstants.MDC_DESTINATION);
- }
- }
- //內部調用embededCanalServer的stop(destination)方法。與上start方法類似,只不過是停止CanalInstance。
- public void processActiveExit() {
- try {
- MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
- embededCanalServer.stop(destination);
- } finally {
- MDC.remove(CanalConstants.MDC_DESTINATION);
- }
- }
- /*處理存在zk的情況下,在Canalinstance啟動之前,在zk中創建節點。
- 路徑為:/otter/canal/destinations/{0}/cluster/{1},其0會被destination替換,1會被ip:port替換。
- 此方法會在processActiveEnter()之前被調用*/
- public void processStart() {
- try {
- if (zkclientx != null) {
- final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":" + port);
- initCid(path);
- zkclientx.subscribeStateChanges(new IZkStateListener() {
- public void handleStateChanged(KeeperState state) throws Exception {
- }
- public void handleNewSession() throws Exception {
- initCid(path);
- }
- });
- }
- } finally {
- MDC.remove(CanalConstants.MDC_DESTINATION);
- }
- }
- //處理存在zk的情況下,在Canalinstance停止前,釋放zk節點,路徑為/otter/canal/destinations/{0}/cluster/{1},
- //其0會被destination替換,1會被ip:port替換。此方法會在processActiveExit()之前被調用
- public void processStop() {
- try {
- MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
- if (zkclientx != null) {
- final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":" + port);
- releaseCid(path);
- }
- } finally {
- MDC.remove(CanalConstants.MDC_DESTINATION);
- }
- }
- }
上述代碼中,我們可以看到啟動一個CanalInstance實際上是在ServerRunningListener的processActiveEnter方法中,通過調用embededCanalServer的start(destination)方法進行的,對於停止也是類似。
那么ServerRunningListener中的相關方法到底是在哪里回調的呢?我們可以在ServerRunningMonitor的start和stop方法中找到答案,這里只列出start方法。
- public class ServerRunningMonitor extends AbstractCanalLifeCycle {
- ...
- public void start() {
- super.start();
- processStart();//其內部會調用ServerRunningListener的processStart()方法
- if (zkClient != null) {//存在zk,以HA方式啟動
- // 如果需要盡可能釋放instance資源,不需要監聽running節點,不然即使stop了這台機器,另一台機器立馬會start
- String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
- zkClient.subscribeDataChanges(path, dataListener);
- initRunning();
- } else {//沒有zk,直接啟動
- processActiveEnter();
- }
- }
- //...stop方法邏輯類似,相關代碼省略
- }
當ServerRunningMonitor的start方法被調用時,其首先會直接調用processStart方法,這個方法內部直接調了ServerRunningListener的processStart()方法,源碼如下所示。通過前面的分析,我們已經知道在存在zkClient!=null的情況,會往zk中創建一個節點。
- private void processStart() {
- if (listener != null) {
- try {
- listener.processStart();
- } catch (Exception e) {
- logger.error("processStart failed", e);
- }
- }
- }
之后會判斷是否存在zkClient,如果不存在,則以本地方式啟動,如果存在,則以HA方式啟動。我們知道,canal server可以部署成兩種方式:集群方式或者獨立部署。其中集群方式是利用zk來做HA,獨立部署則可以直接進行啟動。我們先來看比較簡單的直接啟動。
直接啟動:
不存在zk的情況下,會進入else代碼塊,調用processActiveEnter方法,其內部調用了listener的processActiveEnter,啟動相應destination對應的CanalInstance。
- private void processActiveEnter() {
- if (listener != null) {
- try {
- listener.processActiveEnter();
- } catch (Exception e) {
- logger.error("processActiveEnter failed", e);
- }
- }
- }
HA方式啟動:
存在zk,說明canal server可能做了集群,因為canal就是利用zk來做HA的。首先根據destination構造一個zk的節點路徑,然后進行監聽。
- /*構建臨時節點的路徑:/otter/canal/destinations/{0}/running,其中占位符{0}會被destination替換。
- 在集群模式下,可能會有多個canal server共同處理同一個destination,
- 在某一時刻,只能由一個canal server進行處理,處理這個destination的canal server進入running狀態,其他canal server進入standby狀態。*/
- String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
- /*對destination對應的running節點進行監聽,一旦發生了變化,則說明可能其他處理相同destination的canal server可能出現了異常,
- 此時需要嘗試自己進入running狀態。*/
- zkClient.subscribeDataChanges(path, dataListener);
上述只是監聽代碼,之后嘗試調用initRunning方法通過HA的方式來啟動CanalInstance。
- private void initRunning() {
- if (!isStart()) {
- return;
- }
- //構建臨時節點的路徑:/otter/canal/destinations/{0}/running,其中占位符{0}會被destination替換
- String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
- // 序列化
- //構建臨時節點的數據,標記當前destination由哪一個canal server處理
- byte[] bytes = JsonUtils.marshalToByte(serverData);
- try {
- mutex.set(false);
- //嘗試創建臨時節點。如果節點已經存在,說明是其他的canal server已經啟動了這個canal instance。
- //此時會拋出ZkNodeExistsException,進入catch代碼塊。
- zkClient.create(path, bytes, CreateMode.EPHEMERAL);
- activeData = serverData;
- processActiveEnter();//如果創建成功,觸發一下事件,內部調用ServerRunningListener的processActiveEnter方法
- mutex.set(true);
- } catch (ZkNodeExistsException e) {
- //創建節點失敗,則根據path從zk中獲取當前是哪一個canal server創建了當前canal instance的相關信息。
- //第二個參數true,表示的是,如果這個path不存在,則返回null。
- bytes = zkClient.readData(path, true);
- if (bytes == null) {// 如果不存在節點,立即嘗試一次
- initRunning();
- } else {
- //如果的確存在,則將創建該canal instance實例信息存入activeData中。
- activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
- }
- } catch (ZkNoNodeException e) {//如果/otter/canal/destinations/{0}/節點不存在,進行創建其中占位符{0}會被destination替換
- zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true);
- // 嘗試創建父節點
- initRunning();
- }
- }
可以看到,initRunning方法內部只有在嘗試在zk中創建節點成功后,才會去調用listener的processActiveEnter方法來真正啟動destination對應的canal instance,這是canal HA方式啟動的核心。canal官方文檔中介紹了CanalServer
HA機制啟動的流程,如下:
事實上,這個說明的前兩步,都是在initRunning方法中實現的。從上面的代碼中,我們可以看出,在HA機啟動的情況下,initRunning方法不一定能走到processActiveEnter()方法,因為創建臨時節點可能會出錯。
此外,根據官方文檔說明,如果出錯,那么當前canal instance則進入standBy狀態。也就是另外一個canal instance出現異常時,當前canal instance頂上去。那么相關源碼在什么地方呢?在HA方式啟動最開始的2行代碼的監聽邏輯中:
- String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
- zkClient.subscribeDataChanges(path, dataListener);
其中dataListener類型是IZkDataListener
,這是zkclient客戶端提供的接口,定義如下:
- public interface IZkDataListener {
- public void handleDataChange(String dataPath, Object data) throws Exception;
- public void handleDataDeleted(String dataPath) throws Exception;
- }
當zk節點中的數據發生變更時,會自動回調這兩個方法,很明顯,一個是用於處理節點數據發生變化,一個是用於處理節點數據被刪除。
而dataListener是在ServerRunningMonitor的構造方法中初始化的,如下:
- public ServerRunningMonitor(){
- // 創建父節點
- dataListener = new IZkDataListener() {
- //!!!目前看來,好像並沒有存在修改running節點數據的代碼,為什么這個方法不是空實現?
- public void handleDataChange(String dataPath, Object data) throws Exception {
- MDC.put("destination", destination);
- ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
- if (!isMine(runningData.getAddress())) {
- mutex.set(false);
- }
- if (!runningData.isActive() && isMine(runningData.getAddress())) { // 說明出現了主動釋放的操作,並且本機之前是active
- release = true;
- releaseRunning();// 徹底釋放mainstem }
- activeData = (ServerRunningData) runningData;
- }
- //當其他canal instance出現異常,臨時節點數據被刪除時,會自動回調這個方法,此時當前canal instance要頂上去
- public void handleDataDeleted(String dataPath) throws Exception {
- MDC.put("destination", destination);
- mutex.set(false);
- if (!release && activeData != null && isMine(activeData.getAddress())) {
- // 如果上一次active的狀態就是本機,則即時觸發一下active搶占
- initRunning();
- } else {
- // 否則就是等待delayTime,避免因網絡瞬端或者zk異常,導致出現頻繁的切換操作
- delayExector.schedule(new Runnable() {
- public void run() {
- initRunning();//嘗試自己進入running狀態
- }
- }, delayTime, TimeUnit.SECONDS);
- }
- }
- };
- }
那么現在問題來了?ServerRunningMonitor的start方法又是在哪里被調用的, 這個方法被調用了,才能真正的啟動canal instance。這部分代碼我們放到后面的CanalController中的start方法進行講解。
下面分析最后一部分代碼,autoScan機制相關代碼。
3.5 autoScan機制相關代碼
關於autoscan,官方文檔有以下介紹:
結合autoscan機制的相關源碼:
- //
- autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
- if (autoScan) {
- defaultAction = new InstanceAction() {//....};
- instanceConfigMonitors = //....
- }
可以看到,autoScan是否需要自動掃描的開關,只有當autoScan為true時,才會初始化defaultAction字段和instanceConfigMonitors字段。其中:
其中:
defaultAction:其作用是如果配置發生了變更,默認應該采取什么樣的操作。其實現了InstanceAction
接口定義的三個抽象方法:start、stop和reload。當新增一個destination配置時,需要調用start方法來啟動;當移除一個destination配置時,需要調用stop方法來停止;當某個destination配置發生變更時,需要調用reload方法來進行重啟。
instanceConfigMonitors:類型為Map<InstanceMode, InstanceConfigMonitor>。defaultAction字段只是定義了配置發生變化默認應該采取的操作,那么總該有一個類來監聽配置是否發生了變化,這就是InstanceConfigMonitor的作用。官方文檔中,只提到了對canal.conf.dir配置項指定的目錄的監聽,這指的是通過spring方式加載配置。顯然的,通過manager方式加載配置,配置中心的內容也是可能發生變化的,也需要進行監聽。此時可以理解為什么instanceConfigMonitors的類型是一個Map,key為InstanceMode,就是為了對這兩種方式的配置加載方式都進行監聽。
defaultAction字段初始化源碼如下所示:
- defaultAction = new InstanceAction() {
- public void start(String destination) {
- InstanceConfig config = instanceConfigs.get(destination);
- if (config == null) {
- // 重新讀取一下instance config
- config = parseInstanceConfig(properties, destination);
- instanceConfigs.put(destination, config);
- }
- if (!embededCanalServer.isStart(destination)) {
- // HA機制啟動
- ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
- if (!config.getLazy() && !runningMonitor.isStart()) {
- runningMonitor.start();
- }
- }
- }
- public void stop(String destination) {
- // 此處的stop,代表強制退出,非HA機制,所以需要退出HA的monitor和配置信息
- InstanceConfig config = instanceConfigs.remove(destination);
- if (config != null) {
- embededCanalServer.stop(destination);
- ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
- if (runningMonitor.isStart()) {
- runningMonitor.stop();
- }
- }
- }
- public void reload(String destination) {
- // 目前任何配置變化,直接重啟,簡單處理
- stop(destination);
- start(destination);
- }
- };
instanceConfigMonitors字段初始化源碼如下所示:
- instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() {
- public InstanceConfigMonitor apply(InstanceMode mode) {
- int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
- if (mode.isSpring()) {//如果加載方式是spring,返回SpringInstanceConfigMonitor
- SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
- monitor.setScanIntervalInSecond(scanInterval);
- monitor.setDefaultAction(defaultAction);
- // 設置conf目錄,默認是user.dir + conf目錄組成
- String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR);
- if (StringUtils.isEmpty(rootDir)) {
- rootDir = "../conf";
- }
- if (StringUtils.equals("otter-canal", System.getProperty("appName"))) {
- monitor.setRootConf(rootDir);
- } else {
- // eclipse debug模式
- monitor.setRootConf("src/main/resources/");
- }
- return monitor;
- } else if (mode.isManager()) {//如果加載方式是manager,返回ManagerInstanceConfigMonitor
- return new ManagerInstanceConfigMonitor();
- } else {
- throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor");
- }
- }
- });
可以看到instanceConfigMonitors也是根據mode屬性,來采取不同的監控實現類SpringInstanceConfigMonitor
或者ManagerInstanceConfigMonitor
,二者都實現了InstanceConfigMonitor
接口。
- public interface InstanceConfigMonitor extends CanalLifeCycle {
- void register(String destination, InstanceAction action);
- void unregister(String destination);
- }
當需要對一個destination進行監聽時,調用register方法
當取消對一個destination監聽時,調用unregister方法。
事實上,unregister方法在canal 內部並沒有有任何地方被調用,也就是說,某個destination如果開啟了autoScan=true,那么你是無法在運行時停止對其進行監控的。如果要停止,你可以選擇將對應的目錄刪除。
InstanceConfigMonitor本身並不知道哪些canal instance需要進行監控,因為不同的canal instance,有的可能設置autoScan為true,另外一些可能設置為false。
在CanalConroller的start方法中,對於autoScan為true的destination,會調用InstanceConfigMonitor的register方法進行注冊,此時InstanceConfigMonitor才會真正的對這個destination配置進行掃描監聽。對於那些autoScan為false的destination,則不會進行監聽。
目前SpringInstanceConfigMonitor對這兩個方法都進行了實現,而ManagerInstanceConfigMonitor目前對這兩個方法實現的都是空,需要開發者自己來實現。
在實現ManagerInstanceConfigMonitor時,可以參考SpringInstanceConfigMonitor。
此處不打算再繼續進行分析SpringInstanceConfigMonitor的源碼,因為邏輯很簡單,感興趣的讀者可以自行查看SpringInstanceConfigMonitor 的scan方法,內部在什么情況下會回調defualtAction的start、stop、reload方法 。
4 CanalController的start方法
而ServerRunningMonitor的start方法,是在CanalController中的start方法中被調用的,CanalController中的start方法是在CanalLauncher中被調用的。
com.alibaba.otter.canal.deployer.CanalController#start
- public void start() throws Throwable {
- logger.info("## start the canal server[{}:{}]", ip, port);
- // 創建整個canal的工作節點 :/otter/canal/cluster/{0}
- final String path = ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port);
- initCid(path);
- if (zkclientx != null) {
- this.zkclientx.subscribeStateChanges(new IZkStateListener() {
- public void handleStateChanged(KeeperState state) throws Exception {
- }
- public void handleNewSession() throws Exception {
- initCid(path);
- }
- });
- }
- // 優先啟動embeded服務
- embededCanalServer.start();
- //啟動不是lazy模式的CanalInstance,通過迭代instanceConfigs,根據destination獲取對應的ServerRunningMonitor,然后逐一啟動
- for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
- final String destination = entry.getKey();
- InstanceConfig config = entry.getValue();
- // 如果destination對應的CanalInstance沒有啟動,則進行啟動
- if (!embededCanalServer.isStart(destination)) {
- ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
- //如果不是lazy,lazy模式需要等到第一次有客戶端請求才會啟動
- if (!config.getLazy() && !runningMonitor.isStart()) {
- runningMonitor.start();
- }
- }
- if (autoScan) {
- instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
- }
- }
- if (autoScan) {//啟動配置文件自動檢測機制
- instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
- for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
- if (!monitor.isStart()) {
- monitor.start();//啟動monitor
- }
- }
- }
- // 啟動網絡接口,監聽客戶端請求
- canalServer.start();
- }
5 總結
deployer模塊的主要作用:
1、讀取canal.properties,確定canal instance的配置加載方式
2、確定canal instance的啟動方式:獨立啟動或者集群方式啟動
3、監聽canal instance的配置的變化,動態停止、啟動或新增
4、啟動canal server,監聽客戶端請求