canal源碼分析簡介-1


1.0 canal源碼分析簡介

canal是阿里巴巴開源的mysql數據庫binlog的增量訂閱&消費組件。項目github地址為:https://github.com/alibaba/canal

本教程是從源碼的角度來分析canal,適用於對canal有一定基礎的同學。本教程使用的版本是1.0.24,這也是筆者寫這篇教程時的最新穩定版,關於canal的基礎知識可以參考:https://github.com/alibaba/canal/wiki

下載項目源碼

下載

  1. git clone https://github.com/alibaba/canal.git

 

切換到canal-1.0.24這個tag

  1. git checkout canal-1.0.24

源碼模塊划分

canal是基於maven構建的,總共分成了14個模塊,如下所示:

 

 

模塊雖多,但是每個模塊的代碼都很少。各個模塊的作用如下所示:

common模塊:主要是提供了一些公共的工具類和接口。

client模塊:canal的客戶端。核心接口為CanalConnector

example模塊:提供client模塊使用案例。

protocol模塊:client和server模塊之間的通信協議

deployer:部署模塊。通過該模塊提供的CanalLauncher來啟動canal server

server模塊:canal服務器端。核心接口為CanalServer

instance模塊:一個server有多個instance。每個instance都會模擬成一個mysql實例的slave。instance模塊有四個核心組成部分:parser模塊、sink模塊、store模塊,meta模塊。核心接口為CanalInstance

parser模塊:數據源接入,模擬slave協議和master進行交互,協議解析。parser模塊依賴於dbsync、driver模塊。

driver模塊和dbsync模塊:從這兩個模塊的artifactId(canal.parse.driver、canal.parse.dbsync),就可以看出來,這兩個模塊實際上是parser模塊的組件。事實上parser 是通過driver模塊與mysql建立連接,從而獲取到binlog。由於原始的binlog都是二進制流,需要解析成對應的binlog事件,這些binlog事件對象都定義在dbsync模塊中,dbsync 模塊來自於淘寶的tddl。

sink模塊:parser和store鏈接器,進行數據過濾,加工,分發的工作。核心接口為CanalEventSink

store模塊:數據存儲。核心接口為CanalEventStore

meta模塊:增量訂閱&消費信息管理器,核心接口為CanalMetaManager,主要用於記錄canal消費到的mysql binlog的位置,

 

下面再通過一張圖來說明各個模塊之間的依賴關系

 

 

 

       通過deployer模塊,啟動一個canal-server,一個cannal-server內部包含多個instance,每個instance都會偽裝成一個mysql實例的slave。client與server之間的通信協議由protocol模塊定義。client在訂閱binlog信息時,需要傳遞一個destination參數,server會根據這個destination確定由哪一個instance為其提供服務。

在分析源碼的時候,本人也是按照模塊來划分的,基本上一個模塊對應一篇文章。

 

2.0 deployer模塊

    canal有兩種使用方式:1、獨立部署 2、內嵌到應用中。 deployer模塊主要用於獨立部署canal server。關於這兩種方式的區別,請參見server模塊源碼分析。deployer模塊源碼目錄結構如下所示:

 

 

在獨立部署canal時,需要首先對canal的源碼進行打包

  1. mvn clean install -Dmaven.test.skip -Denv=release

   以本教程使用1.0.24版本為例,打包后會在target目錄生成一個以下兩個文件:

Image.png

其中canal.deployer-1.0.24.tar.gz就是canal的獨立部署包。解壓縮后,目錄如下所示。其中bin目錄和conf目錄(包括子目錄spring)中的所有文件,都來自於deployer模塊。

  1. canal
  2. ├── bin
  3.    ├── startup.bat
  4.    ├── startup.sh
  5.    └── stop.sh
  6. ├── conf
  7.    ├── canal.properties
  8.    ├── example
  9.       └── instance.properties
  10.    ├── logback.xml
  11.    └── spring
  12.        ├── default-instance.xml
  13.        ├── file-instance.xml
  14.        ├── group-instance.xml
  15.        ├── local-instance.xml
  16.        └── memory-instance.xml
  17. ├── lib
  18.    └── ....依賴的各種jar
  19. └── logs

 

deployer模塊主要完成以下功能:

1、讀取canal,properties配置文件

2、啟動canal server,監聽canal client的請求

3、啟動canal instance,連接mysql數據庫,偽裝成slave,解析binlog

4、在canal的運行過程中,監聽配置文件的變化

1、啟動和停止腳本

bin目錄中包含了canal的啟動和停止腳本startup.shstop.sh,當我們要啟動canal時,只需要輸入以下命令即可

  1. sh bin/startup.sh

       在windows環境下,可以直接雙擊startup.bat。

在startup.sh腳本內,會調用com.alibaba.otter.canal.deployer.CanalLauncher類來進行啟動,這是分析Canal源碼的入口類,如下圖所示:

Image.png

同時,startup.sh還會在bin目錄下生成一個canal.pid文件,用於存儲canal的進程id。當停止canal的時候

  1. sh bin/stop.sh

會根據canal.pid文件中記錄的進程id,kill掉canal進程,並且刪除這個文件。

2、CannalLauncher

CanalLauncher是整個源碼分析的入口類,代碼相當簡單。步驟是:

1、讀取canal.properties文件中的配置

2、利用讀取的配置構造一個CanalController實例,將所有的啟動操作都委派給CanalController進行處理。

3、最后注冊一個鈎子函數,在JVM停止時同時也停止canal server。

com.alibaba.otter.canal.deployer.CanalLauncher

  1. public class CanalLauncher {
  2.  
  3.     private static final String CLASSPATH_URL_PREFIX = "classpath:";
  4.     private static final Logger logger               = LoggerFactory.getLogger(CanalLauncher.class);
  5.  
  6.     public static void main(String[] args) throws Throwable {
  7.         try {
  8.             //1、讀取canal.properties文件中配置,默認讀取classpath下的canal.properties
  9.             String conf = System.getProperty("canal.conf", "classpath:canal.properties");
  10.             Properties properties = new Properties();
  11.             if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
  12.                 conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
  13.                 properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
  14.             } else {
  15.                 properties.load(new FileInputStream(conf));
  16.             }
  17.             //2、啟動canal,首先將properties對象傳遞給CanalController,然后調用其start方法啟動
  18.             logger.info("## start the canal server.");
  19.             final CanalController controller = new CanalController(properties);
  20.             controller.start();
  21.             logger.info("## the canal server is running now ......");
  22.             //3、關閉canal,通過添加JVM的鈎子,JVM停止前會回調run方法,其內部調用controller.stop()方法進行停止
  23.             Runtime.getRuntime().addShutdownHook(new Thread() {
  24.  
  25.                 public void run() {
  26.                     try {
  27.                         logger.info("## stop the canal server");
  28.                         controller.stop();
  29.                     } catch (Throwable e) {
  30.                         logger.warn("##something goes wrong when stopping canal Server:\n{}",
  31.                             ExceptionUtils.getFullStackTrace(e));
  32.                     } finally {
  33.                         logger.info("## canal server is down.");
  34.                     }
  35.                 }
  36.  
  37.             });
  38.         } catch (Throwable e) {
  39.             logger.error("## Something goes wrong when starting up the canal Server:\n{}",
  40.                 ExceptionUtils.getFullStackTrace(e));
  41.             System.exit(0);
  42.         }
  43.     }
  44. }

可以看到,CanalLauncher實際上只是負責讀取canal.properties配置文件,然后構造CanalController對象,並通過其start和stop方法來開啟和停止canal。因此,如果說CanalLauncher是canal源碼分析的入口類,那么CanalController就是canal源碼分析的核心類。

3、CanalController

在CanalController的構造方法中,會對配置文件內容解析,初始化相關成員變量,做好canal server的啟動前的准備工作,之后在CanalLauncher中調用CanalController.start方法來啟動。

CanalController中定義的相關字段和構造方法,如下所示:

  1. public class CanalController {
  2.  
  3.     private static final Logger  logger   = LoggerFactory.getLogger(CanalController.class);
  4.     private Long                                     cid;
  5.     private String                                   ip; 
  6.     private int                                  port;
  7.     // 默認使用spring的方式載入    
  8.     private Map<String, InstanceConfig>              instanceConfigs;
  9.     private InstanceConfig                           globalInstanceConfig;
  10.     private Map<String, CanalConfigClient>           managerClients;
  11.     // 監聽instance config的變化
  12.     private boolean                             autoScan = true;
  13.     private InstanceAction                           defaultAction;
  14.     private Map<InstanceMode, InstanceConfigMonitor> instanceConfigMonitors;
  15.     private CanalServerWithEmbedded                  embededCanalServer;
  16.     private CanalServerWithNetty                     canalServer;
  17.  
  18.     private CanalInstanceGenerator                   instanceGenerator;
  19.     private ZkClientx                                zkclientx;
  20.  
  21.     public CanalController(){
  22.         this(System.getProperties());
  23.     }
  24.  
  25.     public CanalController(final Properties properties){
  26.         managerClients = MigrateMap.makeComputingMap(new Function<String, CanalConfigClient>() {
  27.  
  28.             public CanalConfigClient apply(String managerAddress) {
  29.                 return getManagerClient(managerAddress);
  30.             }
  31.         });
  32.          //1、配置解析    
  33.        globalInstanceConfig = initGlobalConfig(properties);
  34.         instanceConfigs = new MapMaker().makeMap();      
  35.        initInstanceConfig(properties);
  36.  
  37.         // 2、准備canal server
  38.         cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID));
  39.         ip = getProperty(properties, CanalConstants.CANAL_IP);
  40.         port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
  41.         embededCanalServer = CanalServerWithEmbedded.instance();
  42.         embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 設置自定義的instanceGenerator       
  43.        canalServer = CanalServerWithNetty.instance();
  44.         canalServer.setIp(ip);
  45.         canalServer.setPort(port);
  46.         
  47.          //3、初始化zk相關代碼 
  48.         // 處理下ip為空,默認使用hostIp暴露到zk中       
  49.        if (StringUtils.isEmpty(ip)) {
  50.             ip = AddressUtils.getHostIp();
  51.         }
  52.         final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);
  53.         if (StringUtils.isNotEmpty(zkServers)) {
  54.             zkclientx = ZkClientx.getZkClient(zkServers);
  55.             // 初始化系統目錄           
  56.           zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);
  57.             zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);
  58.         }
  59.         //4 CanalInstance運行狀態監控
  60.         final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port);
  61.         ServerRunningMonitors.setServerData(serverData);
  62.         ServerRunningMonitors.setRunningMonitors(//...);
  63.  
  64.         //5、autoScan機制相關代碼    
  65.        autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
  66.         if (autoScan) {
  67.             defaultAction = new InstanceAction() {//....};
  68.  
  69.             instanceConfigMonitors = //....
  70.         }
  71.     }
  72. ....
  73. }

為了讀者能夠盡量容易的看出CanalController的構造方法中都做了什么,上面代碼片段中省略了部分代碼。這樣,我們可以很明顯的看出來, ,在CanalController構造方法中的代碼分划分為了固定的幾個處理步驟,下面按照幾個步驟的划分,逐一進行講解,並詳細的介紹CanalController中定義的各個字段的作用。

3.1 配置解析相關代碼

  1. // 初始化全局參數設置       
  2. globalInstanceConfig = initGlobalConfig(properties);
  3. instanceConfigs = new MapMaker().makeMap();
  4. // 初始化instance config       
  5. initInstanceConfig(properties);

3.1.1 globalInstanceConfig字段

表示canal instance的全局配置,類型為InstanceConfig,通過initGlobalConfig方法進行初始化。主要用於解析canal.properties以下幾個配置項:

  • canal.instance.global.mode:確定canal instance配置加載方式,取值有manager|spring兩種方式

  • canal.instance.global.lazy:確定canal instance是否延遲初始化

  • canal.instance.global.manager.address:配置中心地址。如果canal.instance.global.mode=manager,需要提供此配置項

  • canal.instance.global.spring.xml:spring配置文件路徑。如果canal.instance.global.mode=spring,需要提供此配置項

 

initGlobalConfig源碼如下所示:

  1. private InstanceConfig initGlobalConfig(Properties properties) {
  2.     InstanceConfig globalConfig = new InstanceConfig();
  3.     //讀取canal.instance.global.mode
  4.     String modeStr = getProperty(properties, CanalConstants.getInstanceModeKey(CanalConstants.GLOBAL_NAME));
  5.     if (StringUtils.isNotEmpty(modeStr)) {
  6.         //將modelStr轉成枚舉InstanceMode,這是一個枚舉類,只有2個取值,SPRING\MANAGER,對應兩種配置方式
  7.         globalConfig.setMode(InstanceMode.valueOf(StringUtils.upperCase(modeStr)));
  8.     }
  9.     //讀取canal.instance.global.lazy
  10.     String lazyStr = getProperty(properties, CanalConstants.getInstancLazyKey(CanalConstants.GLOBAL_NAME));
  11.     if (StringUtils.isNotEmpty(lazyStr)) {
  12.         globalConfig.setLazy(Boolean.valueOf(lazyStr));
  13.     }
  14.    //讀取canal.instance.global.manager.address
  15.     String managerAddress = getProperty(properties,
  16.         CanalConstants.getInstanceManagerAddressKey(CanalConstants.GLOBAL_NAME));
  17.     if (StringUtils.isNotEmpty(managerAddress)) {
  18.         globalConfig.setManagerAddress(managerAddress);
  19.     }
  20.     //讀取canal.instance.global.spring.xml
  21.     String springXml = getProperty(properties, CanalConstants.getInstancSpringXmlKey(CanalConstants.GLOBAL_NAME));
  22.     if (StringUtils.isNotEmpty(springXml)) {
  23.         globalConfig.setSpringXml(springXml);
  24.     }
  25.  
  26.     instanceGenerator = //...初始化instanceGenerator 
  27.  
  28.     return globalConfig;
  29. }

其中canal.instance.global.mode用於確定canal instance的全局配置加載方式,其取值范圍有2個:springmanager。我們知道一個canal server中可以啟動多個canal instance,每個instance都有各自的配置。instance的配置也可以放在本地,也可以放在遠程配置中心里。我們可以自定義每個canal instance配置文件存儲的位置,如果所有canal instance的配置都在本地或者遠程,此時我們就可以通過canal.instance.global.mode這個配置項,來統一的指定配置文件的位置,避免為每個canal instance單獨指定。

其中:

spring方式:

表示所有的canal instance的配置文件位於本地。此時,我們必須提供配置項canal.instance.global.spring.xml指定spring配置文件的路徑。canal提供了多個spring配置文件:file-instance.xml、default-instance.xml、memory-instance.xml、local-instance.xml、group-instance.xml。這么多配置文件主要是為了支持canal instance不同的工作方式。我們在稍后將會講解各個配置文件的區別。而在這些配置文件的開頭,我們無一例外的可以看到以下配置:

  1. <bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
  2.         <property name="ignoreResourceNotFound" value="true" />
  3.         <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允許system覆蓋 -->
  4.         <property name="locationNames">
  5.             <list>
  6.                 <value>classpath:canal.properties</value>
  7.                 <value>classpath:${canal.instance.destination:}/instance.properties</value>
  8.             </list>
  9.         </property>
  10.     </bean>

這里我們可以看到,所謂通過spring方式加載canal instance配置,無非就是通過spring提供的PropertyPlaceholderConfigurer來加載canal instance的配置文件instance.properties。

這里instance.properties的文件完整路徑是${canal.instance.destination:}/instance.properties,其中${canal.instance.destination}是一個變量。這是因為我們可以在一個canal server中配置多個canal instance,每個canal instance配置文件的名稱都是instance.properties,因此我們需要通過目錄進行區分。例如我們通過配置項canal.destinations指定多個canal instance的名字

  1. canal.destinations= example1,example2

此時我們就要conf目錄下,新建兩個子目錄example1和example2,每個目錄下各自放置一個instance.properties。

canal在初始化時就會分別使用example1和example2來替換${canal.instance.destination:},從而分別根據example1/instance.properties和example2/instance.properties創建2個canal instance。

manager方式:

表示所有的canal instance的配置文件位於遠程配置中心,此時我們必須提供配置項 canal.instance.global.manager.address來指定遠程配置中心的地址。目前alibaba內部配置使用這種方式。開發者可以自己實現CanalConfigClient,連接各自的管理系統,完成接入。

3.1.2 instanceGenerator字段

類型為CanalInstanceGenerator。在initGlobalConfig方法中,除了創建了globalInstanceConfig實例,同時還為字段instanceGenerator字段進行了賦值。

顧名思義,這個字段用於創建CanalInstance實例。這是instance模塊中的類,其作用就是為canal.properties文件中canal.destinations配置項列出的每個destination,創建一個CanalInstance實例。CanalInstanceGenerator是一個接口,定義如下所示:

  1. public interface CanalInstanceGenerator {
  2.  
  3.     /**
  4.      * 通過 destination 產生特定的 {@link CanalInstance}
  5.      */
  6.     CanalInstance generate(String destination);
  7. }

針對spring和manager兩種instance配置的加載方式,CanalInstanceGenerator提供了兩個對應的實現類,如下所示:

Image.png

instanceGenerator字段通過一個匿名內部類進行初始化。其內部會判斷配置的各個destination的配置加載方式,spring 或者manager。

  1. instanceGenerator = new CanalInstanceGenerator() {
  2.  
  3.         public CanalInstance generate(String destination) {
  4.            //1、根據destination從instanceConfigs獲取對應的InstanceConfig對象
  5.             InstanceConfig config = instanceConfigs.get(destination);
  6.             if (config == null) {
  7.                 throw new CanalServerException("can't find destination:{}");
  8.             }
  9.           //2、如果destination對應的InstanceConfig的mode是manager方式,使用ManagerCanalInstanceGenerator
  10.             if (config.getMode().isManager()) {
  11.                 ManagerCanalInstanceGenerator instanceGenerator = new ManagerCanalInstanceGenerator();
  12.                 instanceGenerator.setCanalConfigClient(managerClients.get(config.getManagerAddress()));
  13.                 return instanceGenerator.generate(destination);
  14.             } else if (config.getMode().isSpring()) {
  15.           //3、如果destination對應的InstanceConfig的mode是spring方式,使用SpringCanalInstanceGenerator
  16.                 SpringCanalInstanceGenerator instanceGenerator = new SpringCanalInstanceGenerator();
  17.                 synchronized (this) {
  18.                     try {
  19.                         // 設置當前正在加載的通道,加載spring查找文件時會用到該變量                        
  20.                         System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, destination);
  21.                         instanceGenerator.setBeanFactory(getBeanFactory(config.getSpringXml()));
  22.                         return instanceGenerator.generate(destination);
  23.                     } catch (Throwable e) {
  24.                         logger.error("generator instance failed.", e);
  25.                         throw new CanalException(e);
  26.                     } finally {
  27.                         System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, "");
  28.                     }
  29.                 }
  30.             } else {
  31.                 throw new UnsupportedOperationException("unknow mode :" + config.getMode());
  32.             }
  33.  
  34.         }
  35.  
  36.     };

上述代碼中的第1步比較變態,從instanceConfigs中根據destination作為參數,獲得對應的InstanceConfig。而instanceConfigs目前還沒有被初始化,這個字段是在稍后將后將要講解的initInstanceConfig方法初始化的,不過由於這是一個引用類型,當initInstanceConfig方法被執行后,instanceConfigs字段中也就有值了。目前,我們姑且認為, instanceConfigs這個Map<String, InstanceConfig>類型的字段已經被初始化好了。  

2、3兩步用於確定是instance的配置加載方式是spring還是manager,如果是spring,就使用SpringCanalInstanceGenerator創建CanalInstance實例,如果是manager,就使用ManagerCanalInstanceGenerator創建CanalInstance實例。

由於目前manager方式的源碼並未開源,因此,我們只分析SpringCanalInstanceGenerator相關代碼。

上述代碼中,首先創建了一個SpringCanalInstanceGenerator實例,然后往里面設置了一個BeanFactory。

  1. instanceGenerator.setBeanFactory(getBeanFactory(config.getSpringXml()));

其中config.getSpringXml()返回的就是我們在canal.properties中通過canal.instance.global.spring.xml配置項指定了spring配置文件路徑。getBeanFactory方法源碼如下所示:

  1. private BeanFactory getBeanFactory(String springXml) {
  2.         ApplicationContext applicationContext = new ClassPathXmlApplicationContext(springXml);
  3.         return applicationContext;
  4.     }

 

SpringCanalInstanceGenerator設置了BeanFactory之后,就可以通過其的generate方法獲得CanalInstance實例。

SpringCanalInstanceGenerator的源碼如下所示:

  1. public class SpringCanalInstanceGenerator implements CanalInstanceGenerator, BeanFactoryAware {
  2.  
  3.     private String      defaultName = "instance";
  4.     private BeanFactory beanFactory;
  5.  
  6.     public CanalInstance generate(String destination) {
  7.         String beanName = destination;
  8.         //首先判斷beanFactory是否包含以destination為id的bean
  9.         if (!beanFactory.containsBean(beanName)) {
  10.             beanName = defaultName;//如果沒有,設置要獲取的bean的id為instance。
  11.         }
  12.         //以默認的bean的id值"instance"來獲取CanalInstance實例
  13.         return (CanalInstance) beanFactory.getBean(beanName);
  14.     }
  15.  
  16.     public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
  17.         this.beanFactory = beanFactory;
  18.     }
  19.  
  20. }

首先嘗試以傳入的參數destination來獲取CanalInstance實例,如果沒有,就以默認的bean的id值"instance"來獲取CanalInstance實例。事實上,如果你沒有修改spring配置文件,那么默認的名字就是instance。事實上,在canal提供的各個spring配置文件xxx-instance.xml中,都有類似以下配置:

  1.  <bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
  2.       <property name="destination" value="${canal.instance.destination}" />
  3.       <property name="eventParser">
  4.          <ref local="eventParser" />
  5.       </property>
  6.       <property name="eventSink">
  7.          <ref local="eventSink" />
  8.       </property>
  9.       <property name="eventStore">
  10.          <ref local="eventStore" />
  11.       </property>
  12.       <property name="metaManager">
  13.          <ref local="metaManager" />
  14.       </property>
  15.       <property name="alarmHandler">
  16.          <ref local="alarmHandler" />
  17.       </property>
  18.    </bean>

上面的代碼片段中,我們看到的確有一個bean的名字是instance,其類型是CanalInstanceWithSpring,這是CanalInstance接口的實現類。類似的,我們可以想到在manager配置方式下,獲取的CanalInstance實現類是CanalInstanceWithManager。事實上,你想的沒錯,CanalInstance的類圖繼承關系如下所示:

 

 

需要注意的是,到目前為止,我們只是創建好了CanalInstanceGenerator,而CanalInstance尚未創建。在CanalController的start方法被調用時,CanalInstance才會被真正的創建,相關源碼將在稍后分析。

3.1.3 instanceConfigs字段

類型為Map<String, InstanceConfig>。前面提到初始化instanceGenerator后,當其generate方法被調用時,會嘗試從instanceConfigs根據一個destination獲取對應的InstanceConfig,現在分析instanceConfigs的相關初始化代碼。

我們知道globalInstanceConfig定義全局的配置加載方式。如果需要把部分CanalInstance配置放於本地,另外一部分CanalIntance配置放於遠程配置中心,則只通過全局方式配置,無法達到這個要求。雖然這種情況很少見,但是為了提供最大的靈活性,canal支持每個CanalIntance自己來定義自己的加載方式,來覆蓋默認的全局配置加載方式。而每個destination對應的InstanceConfig配置就存放於instanceConfigs字段中。

舉例來說:

  1. //當前server上部署的instance列表
  2. canal.destinations=instance1,instance2 
  3.  
  4. //instance配置全局加載方式
  5. canal.instance.global.mode = spring
  6. canal.instance.global.lazy = false
  7. canal.instance.global.spring.xml = classpath:spring/file-instance.xml
  8.  
  9. //instance1覆蓋全局加載方式
  10. canal.instance.instance1.mode = manager
  11. canal.instance.instance1.manager.address = 127.0.0.1:1099
  12. canal.instance.instance1.lazy = tue

這段配置中,設置了instance的全局加載方式為spring,instance1覆蓋了全局配置,使用manager方式加載配置。而instance2沒有覆蓋配置,因此默認使用spring加載方式。

instanceConfigs字段通過initInstanceConfig方法進行初始化

  1. instanceConfigs = new MapMaker().makeMap();//這里利用Google Guava框架的MapMaker創建Map實例並賦值給instanceConfigs
  2. // 初始化instance config
  3. initInstanceConfig(properties);

initInstanceConfig方法源碼如下:

  1. private void initInstanceConfig(Properties properties) {
  2.     //讀取配置項canal.destinations
  3.     String destinationStr = getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
  4.     //以","分割canal.destinations,得到一個數組形式的destination
  5.     String[] destinations = StringUtils.split(destinationStr, CanalConstants.CANAL_DESTINATION_SPLIT);
  6.     for (String destination : destinations) {
  7.         //為每一個destination生成一個InstanceConfig實例
  8.         InstanceConfig config = parseInstanceConfig(properties, destination);
  9.         //將destination對應的InstanceConfig放入instanceConfigs中
  10.         InstanceConfig oldConfig = instanceConfigs.put(destination, config);
  11.  
  12.         if (oldConfig != null) {
  13.             logger.warn("destination:{} old config:{} has replace by new config:{}", new Object[] { destination,
  14.                     oldConfig, config });
  15.         }
  16.     }
  17. }

上面代碼片段中,首先解析canal.destinations配置項,可以理解一個destination就對應要初始化一個canal instance。針對每個destination會創建各自的InstanceConfig,最終都會放到instanceConfigs這個Map中。

各個destination對應的InstanceConfig都是通過parseInstanceConfig方法來解析

  1. private InstanceConfig parseInstanceConfig(Properties properties, String destination) {
  2.     //每個destination對應的InstanceConfig都引用了全局的globalInstanceConfig
  3.     InstanceConfig config = new InstanceConfig(globalInstanceConfig);
  4.     //...其他幾個配置項與獲取globalInstanceConfig類似,不再贅述,唯一注意的的是配置項的key部分中的global變成傳遞進來的destination
  5.     return config;
  6. }

此時我們可以看一下InstanceConfig類的源碼:

  1. public class InstanceConfig {
  2.  
  3.     private InstanceConfig globalConfig;
  4.     private InstanceMode   mode;
  5.     private Boolean        lazy;
  6.     private String         managerAddress;
  7.     private String         springXml;
  8.  
  9.     public InstanceConfig(){
  10.  
  11.     }
  12.  
  13.     public InstanceConfig(InstanceConfig globalConfig){
  14.         this.globalConfig = globalConfig;
  15.     }
  16.  
  17.     public static enum InstanceMode {
  18.         SPRING, MANAGER;
  19.  
  20.         public boolean isSpring() {
  21.             return this == InstanceMode.SPRING;
  22.         }
  23.  
  24.         public boolean isManager() {
  25.             return this == InstanceMode.MANAGER;
  26.         }
  27.     }
  28.  
  29.     public Boolean getLazy() {
  30.         if (lazy == null && globalConfig != null) {
  31.             return globalConfig.getLazy();
  32.         } else {
  33.             return lazy;
  34.         }
  35.     }
  36.  
  37.     public void setLazy(Boolean lazy) {
  38.         this.lazy = lazy;
  39.     }
  40.  
  41.     public InstanceMode getMode() {
  42.         if (mode == null && globalConfig != null) {
  43.             return globalConfig.getMode();
  44.         } else {
  45.             return mode;
  46.         }
  47.     }
  48.  
  49.     public void setMode(InstanceMode mode) {
  50.         this.mode = mode;
  51.     }
  52.  
  53.     public String getManagerAddress() {
  54.         if (managerAddress == null && globalConfig != null) {
  55.             return globalConfig.getManagerAddress();
  56.         } else {
  57.             return managerAddress;
  58.         }
  59.     }
  60.  
  61.     public void setManagerAddress(String managerAddress) {
  62.         this.managerAddress = managerAddress;
  63.     }
  64.  
  65.     public String getSpringXml() {
  66.         if (springXml == null && globalConfig != null) {
  67.             return globalConfig.getSpringXml();
  68.         } else {
  69.             return springXml;
  70.         }
  71.     }
  72.  
  73.     public void setSpringXml(String springXml) {
  74.         this.springXml = springXml;
  75.     }
  76.  
  77.     public String toString() {
  78.         return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
  79.     }
  80.  
  81. }

  可以看到,InstanceConfig類中維護了一個globalConfig字段,其類型也是InstanceConfig。而其相關get方法在執行時,會按照以下邏輯進行判斷:如果沒有自身沒有這個配置,則返回全局配置,如果有,則返回自身的配置。通過這種方式實現對全局配置的覆蓋。

 3.2 准備canal server相關代碼 

  1. cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID));
  2. ip = getProperty(properties, CanalConstants.CANAL_IP);
  3. port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
  4.  
  5. embededCanalServer = CanalServerWithEmbedded.instance();
  6. embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 設置自定義的instanceGenerator
  7. canalServer = CanalServerWithNetty.instance();
  8. canalServer.setIp(ip);
  9. canalServer.setPort(port);

上述代碼中,首先解析了cid、ip、port字段,其中:

cid:Long,對應canal.properties文件中的canal.id,目前無實際用途

ip:String,對應canal.properties文件中的canal.ip,canal server監聽的ip。

port:int,對應canal.properties文件中的canal.port,canal server監聽的端口

 

之后分別為以下兩個字段賦值:

embededCanalServer:類型為CanalServerWithEmbedded 

canalServer:類型為CanalServerWithNetty

CanalServerWithEmbedded 和 CanalServerWithNetty都實現了CanalServer接口,且都實現了單例模式,通過靜態方法instance獲取實例。

關於這兩種類型的實現,canal官方文檔有以下描述:

 

 

說白了,就是我們可以不必獨立部署canal server。在應用直接使用CanalServerWithEmbedded直連mysql數據庫。如果覺得自己的技術hold不住相關代碼,就獨立部署一個canal server,使用canal提供的客戶端,連接canal server獲取binlog解析后數據。而CanalServerWithNetty是在CanalServerWithEmbedded的基礎上做的一層封裝,用於與客戶端通信。   

在獨立部署canal server時,Canal客戶端發送的所有請求都交給CanalServerWithNetty處理解析,解析完成之后委派給了交給CanalServerWithEmbedded進行處理。因此CanalServerWithNetty就是一個馬甲而已。CanalServerWithEmbedded才是核心。

因此,在上述代碼中,我們看到,用於生成CanalInstance實例的instanceGenerator被設置到了CanalServerWithEmbedded中,而ip和port被設置到CanalServerWithNetty中。

關於CanalServerWithNetty如何將客戶端的請求委派給CanalServerWithEmbedded進行處理,我們將在server模塊源碼分析中進行講解。

3.3 初始化zk相關代碼

  1.    //讀取canal.properties中的配置項canal.zkServers,如果沒有這個配置,則表示項目不使用zk
  2. final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);
  3. if (StringUtils.isNotEmpty(zkServers)) {
  4.     //創建zk實例
  5.     zkclientx = ZkClientx.getZkClient(zkServers);
  6.     // 初始化系統目錄
  7.     //destination列表,路徑為/otter/canal/destinations
  8.     zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);
  9.     //整個canal server的集群列表,路徑為/otter/canal/cluster
  10.     zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);
  11. }

canal支持利用了zk來完成HA機制、以及將當前消費到到的mysql的binlog位置記錄到zk中。ZkClientx是canal對ZkClient進行了一層簡單的封裝。

顯然,當我們沒有配置canal.zkServers,那么zkclientx不會被初始化。

關於Canal如何利用ZK做HA,我們將在稍后的代碼中進行分。而利用zk記錄binlog的消費進度,將在之后的章節進行分析。

3.4 CanalInstance運行狀態監控相關代碼

由於這段代碼比較長且惡心,這里筆者暫時對部分代碼進行省略,以便讀者看清楚整各脈絡

  1. final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port);
  2.         ServerRunningMonitors.setServerData(serverData);
  3.         ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {
  4.             public ServerRunningMonitor apply(final String destination) {
  5.                 ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
  6.                 runningMonitor.setDestination(destination);
  7.                 runningMonitor.setListener(new ServerRunningListener() {....});//省略ServerRunningListener的具體實現
  8.                 if (zkclientx != null) {
  9.                     runningMonitor.setZkClient(zkclientx);
  10.                 }
  11.                 // 觸發創建一下cid節點
  12.                 runningMonitor.init();
  13.                 return runningMonitor;
  14.             }
  15.         }));

上述代碼中,ServerRunningMonitors是ServerRunningMonitor對象的容器,而ServerRunningMonitor用於監控CanalInstance。

    canal會為每一個destination創建一個CanalInstance,每個CanalInstance都會由一個ServerRunningMonitor來進行監控。而ServerRunningMonitor統一由ServerRunningMonitors進行管理。

    除了CanalInstance需要監控,CanalServer本身也需要監控。因此我們在代碼一開始,就看到往ServerRunningMonitors設置了一個ServerRunningData對象,封裝了canal server監聽的ip和端口等信息。

ServerRunningMonitors源碼如下所示:

  1. public class ServerRunningMonitors {
  2.     private static ServerRunningData serverData;
  3.     private static Map               runningMonitors; // <String,ServerRunningMonitor>
  4.     public static ServerRunningData getServerData() {
  5.         return serverData;
  6.     }
  7.     public static Map<String, ServerRunningMonitor> getRunningMonitors() {
  8.         return runningMonitors;
  9.     }
  10.     public static ServerRunningMonitor getRunningMonitor(String destination) {
  11.         return (ServerRunningMonitor) runningMonitors.get(destination);
  12.     }
  13.     public static void setServerData(ServerRunningData serverData) {
  14.         ServerRunningMonitors.serverData = serverData;
  15.     }
  16.     public static void setRunningMonitors(Map runningMonitors) {
  17.         ServerRunningMonitors.runningMonitors = runningMonitors;
  18.     }
  19. }

ServerRunningMonitors的setRunningMonitors方法接收的參數是一個Map,其中Map的key是destination,value是ServerRunningMonitor,也就是說針對每一個destination都有一個ServerRunningMonitor來監控。

上述代碼中,在往ServerRunningMonitors設置Map時,是通過MigrateMap.makeComputingMap方法來創建的,其接受一個Function類型的參數,這是guava中定義的接口,其聲明了apply抽象方法。其工作原理可以通過下面代碼片段進行介紹:

  1. Map<String, User> map = MigrateMap.makeComputingMap(new Function<String, User>() {
  2.             @Override
  3.             public User apply(String name) {
  4.                 return new User(name);
  5.             }
  6.         });
  7. User user = map.get("tianshouzhi");//第一次獲取時會創建
  8. assert user != null;
  9. assert user == map.get("tianshouzhi");//之后獲取,總是返回之前已經創建的對象

這段代碼中,我們利用MigrateMap.makeComputingMap創建了一個Map,其中key為String類型,value為User類型。當我們調用map.get("tianshouzhi")方法,最開始這個Map中並沒有任何key/value的,於是其就會回調Function的apply方法,利用參數"tianshouzhi"創建一個User對象並返回。之后當我們再以"tianshouzhi"為key從Map中獲取User對象時,會直接將前面創建的對象返回。不會回調apply方法,也就是說,只有在第一次嘗試獲取時,才會回調apply方法。

    而在上述代碼中,實際上就利用了這個特性,只不過是根據destination獲取ServerRunningMonitor對象,如果不存在就創建。

在創建ServerRunningMonitor對象時,首先根據ServerRunningData創建ServerRunningMonitor實例,之后設置了destination和ServerRunningListener對象,接着,判斷如果zkClientx字段如果不為空,也設置到ServerRunningMonitor中,最后調用init方法進行初始化。

  1. ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
  2. runningMonitor.setDestination(destination);
  3. runningMonitor.setListener(new ServerRunningListener(){...})//省略ServerRunningListener具體代碼
  4. if (zkclientx != null) {
  5. runningMonitor.setZkClient(zkclientx);
  6. }
  7. // 觸發創建一下cid節點
  8. runningMonitor.init();
  9. return runningMonitor;

ServerRunningListener的實現如下:

  1. new ServerRunningListener() {
  2.     /*內部調用了embededCanalServer的start(destination)方法。
  3.     此處需要划重點,說明每個destination對應的CanalInstance是通過embededCanalServer的start方法啟動的,
  4.     這與我們之前分析將instanceGenerator設置到embededCanalServer中可以對應上。
  5.     embededCanalServer負責調用instanceGenerator生成CanalInstance實例,並負責其啟動。*/
  6.      public void processActiveEnter() {
  7.          try {
  8.              MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
  9.              embededCanalServer.start(destination);
  10.          } finally {
  11.              MDC.remove(CanalConstants.MDC_DESTINATION);
  12.          }
  13.      }
  14.   //內部調用embededCanalServer的stop(destination)方法。與上start方法類似,只不過是停止CanalInstance。
  15.      public void processActiveExit() {
  16.          try {
  17.              MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
  18.              embededCanalServer.stop(destination);
  19.          } finally {
  20.              MDC.remove(CanalConstants.MDC_DESTINATION);
  21.          }
  22.      }
  23.      /*處理存在zk的情況下,在Canalinstance啟動之前,在zk中創建節點。
  24.      路徑為:/otter/canal/destinations/{0}/cluster/{1},其0會被destination替換,1會被ip:port替換。
  25.      此方法會在processActiveEnter()之前被調用*/
  26.      public void processStart() {
  27.          try {
  28.              if (zkclientx != null) {
  29.                  final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":" + port);
  30.                  initCid(path);
  31.                  zkclientx.subscribeStateChanges(new IZkStateListener() {
  32.                      public void handleStateChanged(KeeperState state) throws Exception {
  33.                      }
  34.                      public void handleNewSession() throws Exception {
  35.                          initCid(path);
  36.                      }
  37.                  });
  38.              }
  39.          } finally {
  40.              MDC.remove(CanalConstants.MDC_DESTINATION);
  41.          }
  42.      }
  43. //處理存在zk的情況下,在Canalinstance停止前,釋放zk節點,路徑為/otter/canal/destinations/{0}/cluster/{1},
  44. //其0會被destination替換,1會被ip:port替換。此方法會在processActiveExit()之前被調用
  45.      public void processStop() {
  46.          try {
  47.              MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
  48.              if (zkclientx != null) {
  49.                  final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":" + port);
  50.                  releaseCid(path);
  51.              }
  52.          } finally {
  53.              MDC.remove(CanalConstants.MDC_DESTINATION);
  54.          }
  55.      }
  56. }

上述代碼中,我們可以看到啟動一個CanalInstance實際上是在ServerRunningListener的processActiveEnter方法中,通過調用embededCanalServer的start(destination)方法進行的,對於停止也是類似。

那么ServerRunningListener中的相關方法到底是在哪里回調的呢?我們可以在ServerRunningMonitor的start和stop方法中找到答案,這里只列出start方法。

  1. public class ServerRunningMonitor extends AbstractCanalLifeCycle {
  2.  
  3. ...
  4. public void start() {
  5.     super.start();
  6.     processStart();//其內部會調用ServerRunningListener的processStart()方法
  7.     if (zkClient != null) {//存在zk,以HA方式啟動
  8.         // 如果需要盡可能釋放instance資源,不需要監聽running節點,不然即使stop了這台機器,另一台機器立馬會start
  9.         String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
  10.         zkClient.subscribeDataChanges(path, dataListener);
  11.  
  12.         initRunning();
  13.     } else {//沒有zk,直接啟動
  14.         processActiveEnter();
  15.     }
  16. }
  17.  
  18. //...stop方法邏輯類似,相關代碼省略
  19. }

當ServerRunningMonitor的start方法被調用時,其首先會直接調用processStart方法,這個方法內部直接調了ServerRunningListener的processStart()方法,源碼如下所示。通過前面的分析,我們已經知道在存在zkClient!=null的情況,會往zk中創建一個節點。

  1. private void processStart() {
  2.     if (listener != null) {
  3.         try {
  4.             listener.processStart();
  5.         } catch (Exception e) {
  6.             logger.error("processStart failed", e);
  7.         }
  8.     }
  9. }

之后會判斷是否存在zkClient,如果不存在,則以本地方式啟動,如果存在,則以HA方式啟動。我們知道,canal server可以部署成兩種方式:集群方式或者獨立部署。其中集群方式是利用zk來做HA,獨立部署則可以直接進行啟動。我們先來看比較簡單的直接啟動。

直接啟動:

不存在zk的情況下,會進入else代碼塊,調用processActiveEnter方法,其內部調用了listener的processActiveEnter,啟動相應destination對應的CanalInstance。

  1. private void processActiveEnter() {
  2.     if (listener != null) {
  3.         try {
  4.             listener.processActiveEnter();
  5.         } catch (Exception e) {
  6.             logger.error("processActiveEnter failed", e);
  7.         }
  8.     }
  9. }

 

HA方式啟動:

存在zk,說明canal server可能做了集群,因為canal就是利用zk來做HA的。首先根據destination構造一個zk的節點路徑,然后進行監聽。

  1. /*構建臨時節點的路徑:/otter/canal/destinations/{0}/running,其中占位符{0}會被destination替換。
  2. 在集群模式下,可能會有多個canal server共同處理同一個destination,
  3. 在某一時刻,只能由一個canal server進行處理,處理這個destination的canal server進入running狀態,其他canal server進入standby狀態。*/
  4. String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
  5.  
  6. /*對destination對應的running節點進行監聽,一旦發生了變化,則說明可能其他處理相同destination的canal server可能出現了異常,
  7. 此時需要嘗試自己進入running狀態。*/
  8. zkClient.subscribeDataChanges(path, dataListener);

上述只是監聽代碼,之后嘗試調用initRunning方法通過HA的方式來啟動CanalInstance。

  1. private void initRunning() {
  2.     if (!isStart()) {
  3.         return;
  4.     }
  5.     //構建臨時節點的路徑:/otter/canal/destinations/{0}/running,其中占位符{0}會被destination替換
  6.     String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
  7.     // 序列化
  8.     //構建臨時節點的數據,標記當前destination由哪一個canal server處理
  9.     byte[] bytes = JsonUtils.marshalToByte(serverData);
  10.     try {
  11.         mutex.set(false);
  12.         //嘗試創建臨時節點。如果節點已經存在,說明是其他的canal server已經啟動了這個canal instance。
  13.         //此時會拋出ZkNodeExistsException,進入catch代碼塊。
  14.         zkClient.create(path, bytes, CreateMode.EPHEMERAL);
  15.         activeData = serverData;
  16.         processActiveEnter();//如果創建成功,觸發一下事件,內部調用ServerRunningListener的processActiveEnter方法
  17.         mutex.set(true);
  18.     } catch (ZkNodeExistsException e) {
  19.       //創建節點失敗,則根據path從zk中獲取當前是哪一個canal server創建了當前canal instance的相關信息。
  20.       //第二個參數true,表示的是,如果這個path不存在,則返回null。
  21.         bytes = zkClient.readData(path, true);
  22.         if (bytes == null) {// 如果不存在節點,立即嘗試一次            
  23.             initRunning();
  24.         } else {
  25.         //如果的確存在,則將創建該canal instance實例信息存入activeData中。
  26.             activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
  27.         }
  28.     } catch (ZkNoNodeException e) {//如果/otter/canal/destinations/{0}/節點不存在,進行創建其中占位符{0}會被destination替換
  29.         zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); 
  30.        // 嘗試創建父節點        
  31.         initRunning();
  32.     }
  33. }

可以看到,initRunning方法內部只有在嘗試在zk中創建節點成功后,才會去調用listener的processActiveEnter方法來真正啟動destination對應的canal instance,這是canal HA方式啟動的核心。canal官方文檔中介紹了CanalServer HA機制啟動的流程,如下:

Image.png

事實上,這個說明的前兩步,都是在initRunning方法中實現的。從上面的代碼中,我們可以看出,在HA機啟動的情況下,initRunning方法不一定能走到processActiveEnter()方法,因為創建臨時節點可能會出錯。

此外,根據官方文檔說明,如果出錯,那么當前canal instance則進入standBy狀態。也就是另外一個canal instance出現異常時,當前canal instance頂上去。那么相關源碼在什么地方呢?在HA方式啟動最開始的2行代碼的監聽邏輯中:

  1. String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
  2. zkClient.subscribeDataChanges(path, dataListener);

其中dataListener類型是IZkDataListener,這是zkclient客戶端提供的接口,定義如下:

  1. public interface IZkDataListener {
  2.     public void handleDataChange(String dataPath, Object data) throws Exception;
  3.     public void handleDataDeleted(String dataPath) throws Exception;
  4. }

當zk節點中的數據發生變更時,會自動回調這兩個方法,很明顯,一個是用於處理節點數據發生變化,一個是用於處理節點數據被刪除。

而dataListener是在ServerRunningMonitor的構造方法中初始化的,如下:

  1. public ServerRunningMonitor(){
  2.     // 創建父節點
  3.     dataListener = new IZkDataListener() {
  4.         //!!!目前看來,好像並沒有存在修改running節點數據的代碼,為什么這個方法不是空實現?
  5.         public void handleDataChange(String dataPath, Object data) throws Exception {
  6.             MDC.put("destination", destination);
  7.             ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
  8.             if (!isMine(runningData.getAddress())) {
  9.                 mutex.set(false);
  10.             }
  11.  
  12.             if (!runningData.isActive() && isMine(runningData.getAddress())) { // 說明出現了主動釋放的操作,並且本機之前是active                                   
  13.               release = true;
  14.                 releaseRunning();// 徹底釋放mainstem            }
  15.  
  16.             activeData = (ServerRunningData) runningData;
  17.         }
  18.         //當其他canal instance出現異常,臨時節點數據被刪除時,會自動回調這個方法,此時當前canal instance要頂上去
  19.         public void handleDataDeleted(String dataPath) throws Exception {
  20.             MDC.put("destination", destination);
  21.             mutex.set(false);
  22.             if (!release && activeData != null && isMine(activeData.getAddress())) {
  23.                 // 如果上一次active的狀態就是本機,則即時觸發一下active搶占                
  24.                 initRunning();
  25.             } else {
  26.                 // 否則就是等待delayTime,避免因網絡瞬端或者zk異常,導致出現頻繁的切換操作                
  27.                 delayExector.schedule(new Runnable() {
  28.  
  29.                     public void run() {
  30.                         initRunning();//嘗試自己進入running狀態
  31.                     }
  32.                 }, delayTime, TimeUnit.SECONDS);
  33.             }
  34.         }
  35.  
  36.     };
  37.  
  38. }

那么現在問題來了?ServerRunningMonitor的start方法又是在哪里被調用的, 這個方法被調用了,才能真正的啟動canal instance。這部分代碼我們放到后面的CanalController中的start方法進行講解。

下面分析最后一部分代碼,autoScan機制相關代碼。

3.5 autoScan機制相關代碼

關於autoscan,官方文檔有以下介紹:

 

 

 

結合autoscan機制的相關源碼:

  1. //   
  2. autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
  3.         if (autoScan) {
  4.             defaultAction = new InstanceAction() {//....};
  5.  
  6.             instanceConfigMonitors = //....
  7.         }

可以看到,autoScan是否需要自動掃描的開關,只有當autoScan為true時,才會初始化defaultAction字段和instanceConfigMonitors字段。其中:

其中:

    defaultAction:其作用是如果配置發生了變更,默認應該采取什么樣的操作。其實現了InstanceAction接口定義的三個抽象方法:start、stop和reload。當新增一個destination配置時,需要調用start方法來啟動;當移除一個destination配置時,需要調用stop方法來停止;當某個destination配置發生變更時,需要調用reload方法來進行重啟。

    instanceConfigMonitors:類型為Map<InstanceMode, InstanceConfigMonitor>。defaultAction字段只是定義了配置發生變化默認應該采取的操作,那么總該有一個類來監聽配置是否發生了變化,這就是InstanceConfigMonitor的作用。官方文檔中,只提到了對canal.conf.dir配置項指定的目錄的監聽,這指的是通過spring方式加載配置。顯然的,通過manager方式加載配置,配置中心的內容也是可能發生變化的,也需要進行監聽。此時可以理解為什么instanceConfigMonitors的類型是一個Map,key為InstanceMode,就是為了對這兩種方式的配置加載方式都進行監聽。

defaultAction字段初始化源碼如下所示:

  1. defaultAction = new InstanceAction() {
  2.  
  3.     public void start(String destination) {
  4.         InstanceConfig config = instanceConfigs.get(destination);
  5.         if (config == null) {
  6.             // 重新讀取一下instance config
  7.             config = parseInstanceConfig(properties, destination);
  8.             instanceConfigs.put(destination, config);
  9.         }
  10.  
  11.         if (!embededCanalServer.isStart(destination)) {
  12.             // HA機制啟動
  13.             ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
  14.             if (!config.getLazy() && !runningMonitor.isStart()) {
  15.                 runningMonitor.start();
  16.             }
  17.         }
  18.     }
  19.  
  20.     public void stop(String destination) {
  21.         // 此處的stop,代表強制退出,非HA機制,所以需要退出HA的monitor和配置信息
  22.         InstanceConfig config = instanceConfigs.remove(destination);
  23.         if (config != null) {
  24.             embededCanalServer.stop(destination);
  25.             ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
  26.             if (runningMonitor.isStart()) {
  27.                 runningMonitor.stop();
  28.             }
  29.         }
  30.     }
  31.  
  32.     public void reload(String destination) {
  33.         // 目前任何配置變化,直接重啟,簡單處理
  34.         stop(destination);
  35.         start(destination);
  36.     }
  37. };

instanceConfigMonitors字段初始化源碼如下所示:

  1. instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() {
  2.        public InstanceConfigMonitor apply(InstanceMode mode) {
  3.            int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
  4.            if (mode.isSpring()) {//如果加載方式是spring,返回SpringInstanceConfigMonitor
  5.                SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
  6.                monitor.setScanIntervalInSecond(scanInterval);
  7.                monitor.setDefaultAction(defaultAction);
  8.                // 設置conf目錄,默認是user.dir + conf目錄組成
  9.                String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR);
  10.                if (StringUtils.isEmpty(rootDir)) {
  11.                    rootDir = "../conf";
  12.                }
  13.                if (StringUtils.equals("otter-canal", System.getProperty("appName"))) {
  14.                    monitor.setRootConf(rootDir);
  15.                } else {
  16.                    // eclipse debug模式
  17.                    monitor.setRootConf("src/main/resources/");
  18.                }
  19.                return monitor;
  20.            } else if (mode.isManager()) {//如果加載方式是manager,返回ManagerInstanceConfigMonitor
  21.                return new ManagerInstanceConfigMonitor();
  22.            } else {
  23.                throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor");
  24.            }
  25.        }
  26.    });

可以看到instanceConfigMonitors也是根據mode屬性,來采取不同的監控實現類SpringInstanceConfigMonitor 或者ManagerInstanceConfigMonitor,二者都實現了InstanceConfigMonitor接口。

  1. public interface InstanceConfigMonitor extends CanalLifeCycle {
  2.     void register(String destination, InstanceAction action);
  3.     void unregister(String destination);
  4. }

當需要對一個destination進行監聽時,調用register方法

當取消對一個destination監聽時,調用unregister方法。

事實上,unregister方法在canal 內部並沒有有任何地方被調用,也就是說,某個destination如果開啟了autoScan=true,那么你是無法在運行時停止對其進行監控的。如果要停止,你可以選擇將對應的目錄刪除。

InstanceConfigMonitor本身並不知道哪些canal instance需要進行監控,因為不同的canal instance,有的可能設置autoScan為true,另外一些可能設置為false。

在CanalConroller的start方法中,對於autoScan為true的destination,會調用InstanceConfigMonitor的register方法進行注冊,此時InstanceConfigMonitor才會真正的對這個destination配置進行掃描監聽。對於那些autoScan為false的destination,則不會進行監聽。

目前SpringInstanceConfigMonitor對這兩個方法都進行了實現,而ManagerInstanceConfigMonitor目前對這兩個方法實現的都是空,需要開發者自己來實現。

在實現ManagerInstanceConfigMonitor時,可以參考SpringInstanceConfigMonitor。

此處不打算再繼續進行分析SpringInstanceConfigMonitor的源碼,因為邏輯很簡單,感興趣的讀者可以自行查看SpringInstanceConfigMonitor 的scan方法,內部在什么情況下會回調defualtAction的start、stop、reload方法 。

4 CanalController的start方法

而ServerRunningMonitor的start方法,是在CanalController中的start方法中被調用的,CanalController中的start方法是在CanalLauncher中被調用的。

com.alibaba.otter.canal.deployer.CanalController#start

  1. public void start() throws Throwable {
  2.         logger.info("## start the canal server[{}:{}]", ip, port);
  3.         // 創建整個canal的工作節點 :/otter/canal/cluster/{0}
  4.         final String path = ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port);
  5.         initCid(path);
  6.         if (zkclientx != null) {
  7.             this.zkclientx.subscribeStateChanges(new IZkStateListener() {
  8.                 public void handleStateChanged(KeeperState state) throws Exception {
  9.                 }
  10.                 public void handleNewSession() throws Exception {
  11.                     initCid(path);
  12.                 }
  13.             });
  14.         }
  15.         // 優先啟動embeded服務
  16.         embededCanalServer.start();
  17.         //啟動不是lazy模式的CanalInstance,通過迭代instanceConfigs,根據destination獲取對應的ServerRunningMonitor,然后逐一啟動
  18.         for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
  19.             final String destination = entry.getKey();
  20.             InstanceConfig config = entry.getValue();
  21.             // 如果destination對應的CanalInstance沒有啟動,則進行啟動
  22.             if (!embededCanalServer.isStart(destination)) {
  23.                 ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
  24.                 //如果不是lazy,lazy模式需要等到第一次有客戶端請求才會啟動
  25.                 if (!config.getLazy() && !runningMonitor.isStart()) {
  26.                     runningMonitor.start();
  27.                 }
  28.             }
  29.             if (autoScan) {
  30.                 instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
  31.             }
  32.         }
  33.         if (autoScan) {//啟動配置文件自動檢測機制
  34.             instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
  35.             for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
  36.                 if (!monitor.isStart()) {
  37.                     monitor.start();//啟動monitor
  38.                 }
  39.             }
  40.         }
  41.         // 啟動網絡接口,監聽客戶端請求
  42.         canalServer.start();
  43.     }

5 總結

deployer模塊的主要作用:

1、讀取canal.properties,確定canal instance的配置加載方式

2、確定canal instance的啟動方式:獨立啟動或者集群方式啟動

3、監聽canal instance的配置的變化,動態停止、啟動或新增

4、啟動canal server,監聽客戶端請求


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM