2.Canal連接MQ


1. 配置文件介紹

Canal的啟動,是以創建實例(instance)的方式,每個實例都有自己單獨的工作環境,

而配置也分成兩個部分

  • canal.properties (系統根配置文件)
  • instance.properties (instance級別的配置文件,每個instance一份)

1.1 canal.properties常用配置介紹:

1.instance列表定義

參數名字 參數說明 默認值
canal.destinations 當前server上部署的instance列表
canal.conf.dir conf/目錄所在的路徑 ../conf
canal.auto.scan 開啟instance自動掃描 如果配置為true,canal.conf.dir目錄下的instance配置變化會自動觸發: a. instance目錄新增: 觸發instance配置載入,lazy為true時則自動啟動 b. instance目錄刪除:卸載對應instance配置,如已啟動則進行關閉 c. instance.properties文件變化:reload instance配置,如已啟動自動進行重啟操作 true
canal.auto.scan.interval instance自動掃描的間隔時間,單位秒 5
canal.instance.global.mode 全局配置加載方式 spring
canal.instance.global.lazy 全局lazy模式 false
canal.instance.global.manager.address 全局的manager配置方式的鏈接信息
canal.instance.global.spring.xml 全局的spring配置方式的組件文件 classpath:spring/memory-instance.xml (spring目錄相對於canal.conf.dir)
canal.instance.example.mode canal.instance.example.lazy canal.instance.example.spring.xml ..... instance級別的配置定義,如有配置,會自動覆蓋全局配置定義模式 命名規則:canal.instance.{name}.xxx
canal.instance.tsdb.spring.xml v1.0.25版本新增,全局的tsdb配置方式的組件文件 classpath:spring/tsdb/h2-tsdb.xml (spring目錄相對於canal.conf.dir)

2.common參數定義,比如可以將instance.properties的公用參數,抽取放置到這里,這樣每個instance啟動的時候就可以共享.(instance.properties配置定義優先級高於canal.properties)

參數名字 參數說明 默認值
canal.id 每個canal server實例的唯一標識,暫無實際意義 1
canal.ip canal server綁定的本地IP信息,如果不配置,默認選擇一個本機IP進行啟動服務
canal.register.ip canal server注冊到外部zookeeper、admin的ip信息 (針對docker的外部可見ip)
canal.port canal server提供socket服務的端口 11111
canal.zkServers canal server鏈接zookeeper集群的鏈接信息 例子:10.20.144.22:2181,10.20.144.51:2181
canal.zookeeper.flush.period canal持久化數據到zookeeper上的更新頻率,單位毫秒 1000
canal.instance.memory.batch.mode canal內存store中數據緩存模式 1. ITEMSIZE : 根據buffer.size進行限制,只限制記錄的數量 2. MEMSIZE : 根據buffer.size * buffer.memunit的大小,限制緩存記錄的大小 MEMSIZE
canal.instance.memory.buffer.size canal內存store中可緩存buffer記錄數,需要為2的指數 16384
canal.instance.memory.buffer.memunit 內存記錄的單位大小,默認1KB,和buffer.size組合決定最終的內存使用大小 1024
canal.instance.transactionn.size 最大事務完整解析的長度支持 超過該長度后,一個事務可能會被拆分成多次提交到canal store中,無法保證事務的完整可見性 1024
canal.instance.fallbackIntervalInSeconds canal發生mysql切換時,在新的mysql庫上查找binlog時需要往前查找的時間,單位秒 說明:mysql主備庫可能存在解析延遲或者時鍾不統一,需要回退一段時間,保證數據不丟 60
canal.instance.detecting.enable 是否開啟心跳檢查 false
canal.instance.detecting.sql 心跳檢查sql insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.interval.time 心跳檢查頻率,單位秒 3
canal.instance.detecting.retry.threshold 心跳檢查失敗重試次數 3
canal.instance.detecting.heartbeatHaEnable 心跳檢查失敗后,是否開啟自動mysql自動切換 說明:比如心跳檢查失敗超過閥值后,如果該配置為true,canal就會自動鏈到mysql備庫獲取binlog數據 false
canal.instance.network.receiveBufferSize 網絡鏈接參數,SocketOptions.SO_RCVBUF 16384
canal.instance.network.sendBufferSize 網絡鏈接參數,SocketOptions.SO_SNDBUF 16384
canal.instance.network.soTimeout 網絡鏈接參數,SocketOptions.SO_TIMEOUT 30
canal.instance.filter.druid.ddl 是否使用druid處理所有的ddl解析來獲取庫和表名 true
canal.instance.filter.query.dcl 是否忽略dcl語句 false
canal.instance.filter.query.dml 是否忽略dml語句 (mysql5.6之后,在row模式下每條DML語句也會記錄SQL到binlog中,可參考MySQL文檔) false
canal.instance.filter.query.ddl 是否忽略ddl語句 false
canal.instance.filter.table.error 是否忽略binlog表結構獲取失敗的異常(主要解決回溯binlog時,對應表已被刪除或者表結構和binlog不一致的情況) false
canal.instance.filter.rows 是否dml的數據變更事件(主要針對用戶只訂閱ddl/dcl的操作) false
canal.instance.filter.transaction.entry 是否忽略事務頭和尾,比如針對寫入kakfa的消息時,不需要寫入TransactionBegin/Transactionend事件 false
canal.instance.binlog.format 支持的binlog format格式列表 (otter會有支持format格式限制) ROW,STATEMENT,MIXED
canal.instance.binlog.image 支持的binlog image格式列表 (otter會有支持format格式限制) FULL,MINIMAL,NOBLOB
canal.instance.get.ddl.isolation ddl語句是否單獨一個batch返回(比如下游dml/ddl如果做batch內無序並發處理,會導致結構不一致) false
canal.instance.parser.parallel 是否開啟binlog並行解析模式(串行解析資源占用少,但性能有瓶頸, 並行解析可以提升近2.5倍+) true
canal.instance.parser.parallelBufferSize binlog並行解析的異步ringbuffer隊列 (必須為2的指數) 256
canal.instance.tsdb.enable 是否開啟tablemeta的tsdb能力 true
canal.instance.tsdb.dir 主要針對h2-tsdb.xml時對應h2文件的存放目錄,默認為conf/xx/h2.mv.db ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url jdbc url的配置(h2的地址為默認值,如果是mysql需要自行定義) jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername jdbc url的配置(h2的地址為默認值,如果是mysql需要自行定義) canal
canal.instance.tsdb.dbPassword jdbc url的配置(h2的地址為默認值,如果是mysql需要自行定義) canal
canal.instance.rds.accesskey aliyun賬號的ak信息(如果不需要在本地binlog超過18小時被清理后自動下載oss上的binlog,可以忽略該值
canal.instance.rds.secretkey aliyun賬號的sk信息(如果不需要在本地binlog超過18小時被清理后自動下載oss上的binlog,可以忽略該值)
canal.admin.manager canal鏈接canal-admin的地址 (v1.1.4新增)
canal.admin.port admin管理指令鏈接端口 (v1.1.4新增) 11110
canal.admin.user admin管理指令鏈接的ACL配置 (v1.1.4新增) admin
canal.admin.passwd admin管理指令鏈接的ACL配置 (v1.1.4新增) 密碼默認值為admin的密文
canal.user canal數據端口訂閱的ACL配置 (v1.1.4新增)如果為空,代表不開啟
canal.passwd canal數據端口訂閱的ACL配置 (v1.1.4新增)如果為空,代表不開啟

1.2 instance.properties常用配置介紹

在canal.properties定義了canal.destinations后,需要在canal.conf.dir對應的目錄下建立同名的文件

比如:

canal.destinations = example1,example2

這時需要創建example1和example2兩個目錄,每個目錄里各自有一份instance.properties.

instance.properties參數列表:

參數名字 參數說明 默認值
canal.instance.mysql.slaveId mysql集群配置中的serverId概念,需要保證和當前mysql集群中id唯一 (v1.1.x版本之后canal會自動生成,不需要手工指定)
canal.instance.master.address mysql主庫鏈接地址 127.0.0.1:3306
canal.instance.master.journal.name mysql主庫鏈接時起始的binlog文件
canal.instance.master.position mysql主庫鏈接時起始的binlog偏移量
canal.instance.master.timestamp mysql主庫鏈接時起始的binlog的時間戳
canal.instance.gtidon 是否啟用mysql gtid的訂閱模式 false
canal.instance.master.gtid mysql主庫鏈接時對應的gtid位點
canal.instance.dbUsername mysql數據庫帳號 canal
canal.instance.dbPassword mysql數據庫密碼 canal
canal.instance.defaultDatabaseName mysql鏈接時默認schema
canal.instance.connectionCharset mysql 數據解析編碼 UTF-8
canal.instance.filter.regex mysql 數據解析關注的表,Perl正則表達式.多個正則之間以逗號(,)分隔,轉義符需要雙斜杠(\) 常見例子:1. 所有表:.* or .\.. 2. canal schema下所有表: canal\..* 3. canal下的以canal打頭的表:canal\.canal.* 4. canal schema下的一張表:canal\.test15. 多個規則組合使用:canal\..*,mysql.test1,mysql.test2 (逗號分隔) .\..
canal.instance.filter.black.regex mysql 數據解析表的黑名單,表達式規則見白名單的規則
canal.instance.rds.instanceId aliyun rds對應的實例id信息(如果不需要在本地binlog超過18小時被清理后自動下載oss上的binlog,可以忽略該值)

2. 連接MQ

2.1 配置信息

canal 1.1.1版本之后, 默認支持將canal server接收到的binlog數據直接投遞到MQ, 目前默認支持的MQ系統有:

  • kafka
  • RocketMQ

本文將使用RocketMQ,具體使用可以參考這篇博文: https://www.cnblogs.com/xjwhaha/p/15055452.html

修改相關配置:

canal.properties文件修改:

#指定消息投遞方式為 RocketMQ, 默認為TCP,即客戶端連接的方式
canal.serverMode = RocketMQ
#指定mq地址
canal.mq.servers = 192.168.3.88:9876

instance.properties文件修改:

# 消費組名
canal.mq.producerGroup = test_group
# 投遞的topic
canal.mq.topic=test_topic
#也可指定動態生成的topic,例如根據 庫名_表名 投遞
#canal.mq.dynamicTopic=.*\\..*

更加詳細的配置信息參考如下:

參數名 參數說明 默認值
canal.mq.servers kafka為bootstrap.servers rocketMQ中為nameserver列表 127.0.0.1:6667
canal.mq.retries 發送失敗重試次數 0
canal.mq.batchSize kafka為ProducerConfig.BATCH_SIZE_CONFIG rocketMQ無意義 16384
canal.mq.maxRequestSize kafka為ProducerConfig.MAX_REQUEST_SIZE_CONFIG rocketMQ無意義 1048576
canal.mq.lingerMs kafka為ProducerConfig.LINGER_MS_CONFIG , 如果是flatMessage格式建議將該值調大, 如: 200 rocketMQ無意義 1
canal.mq.bufferMemory kafka為ProducerConfig.BUFFER_MEMORY_CONFIG rocketMQ無意義 33554432
canal.mq.acks kafka為ProducerConfig.ACKS_CONFIG rocketMQ無意義 all
canal.mq.kafka.kerberos.enable kafka為ProducerConfig.ACKS_CONFIG rocketMQ無意義 false
canal.mq.kafka.kerberos.krb5FilePath kafka kerberos認證 rocketMQ無意義 ../conf/kerberos/krb5.conf
canal.mq.kafka.kerberos.jaasFilePath kafka kerberos認證 rocketMQ無意義 ../conf/kerberos/jaas.conf
canal.mq.producerGroup kafka無意義 rocketMQ為ProducerGroup名 Canal-Producer
canal.mq.accessChannel kafka無意義 rocketMQ為channel模式,如果為aliyun則配置為cloud local
--- --- ---
canal.mq.vhost= rabbitMQ配置
canal.mq.exchange= rabbitMQ配置
canal.mq.username= rabbitMQ配置
canal.mq.password= rabbitMQ配置
canal.mq.aliyunuid= rabbitMQ配置
--- --- ---
canal.mq.canalBatchSize 獲取canal數據的批次大小 50
canal.mq.canalGetTimeout 獲取canal數據的超時時間 100
canal.mq.parallelThreadSize mq數據轉換並行處理的並發度 8
canal.mq.flatMessage 是否為json格式 如果設置為false,對應MQ收到的消息為protobuf格式 需要通過CanalMessageDeserializer進行解碼 false
--- --- ---
canal.mq.topic mq里的topic名
canal.mq.dynamicTopic mq里的動態topic規則, 1.1.3版本支持
canal.mq.partition 單隊列模式的分區下標, 1
canal.mq.partitionsNum 散列模式的分區數
canal.mq.partitionHash 散列規則定義 庫名.表名 : 唯一主鍵,比如mytest.person: id 1.1.3版本支持新語法,見下文

canal.mq.dynamicTopic 表達式說明

canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多個配置之間使用逗號或分號分隔

  • 例子1:test\.test 指定匹配的單表,發送到以test_test為名字的topic上
  • 例子2:.\.. 匹配所有表,則每個表都會發送到各自表名的topic上
  • 例子3:test 指定匹配對應的庫,一個庫的所有表都會發送到庫名的topic上
  • 例子4:test\..* 指定匹配的表達式,針對匹配的表會發送到各自表名的topic上
  • 例子5:test,test1\.test1,指定多個表達式,會將test庫的表都發送到test的topic上,test1\.test1的表發送到對應的test1_test1 topic上,其余的表發送到默認的canal.mq.topic值

為滿足更大的靈活性,允許對匹配條件的規則指定發送的topic名字,配置格式:topicName:schema 或 topicName:schema.table

  • 例子1: test:test\.test 指定匹配的單表,發送到以test為名字的topic上
  • 例子2: test:.\.. 匹配所有表,因為有指定topic,則每個表都會發送到test的topic下
  • 例子3: test:test 指定匹配對應的庫,一個庫的所有表都會發送到test的topic下
  • 例子4:testA:test\..* 指定匹配的表達式,針對匹配的表會發送到testA的topic下
  • 例子5:test0:test,test1:test1\.test1,指定多個表達式,會將test庫的表都發送到test0的topic下,test1\.test1的表發送到對應的test1的topic下,其余的表發送到默認的canal.mq.topic值

大家可以結合自己的業務需求,設置匹配規則,建議MQ開啟自動創建topic的能力

canal.mq.partitionHash 表達式說明

canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多個配置之間使用逗號分隔

  • 例子1:test\.test:pk1^pk2 指定匹配的單表,對應的hash字段為pk1 + pk2
  • 例子2:.\..:id 正則匹配,指定所有正則匹配的表對應的hash字段為id
  • 例子3:.\..:$pk$ 正則匹配,指定所有正則匹配的表對應的hash字段為表主鍵(自動查找)
  • 例子4: 匹配規則啥都不寫,則默認發到0這個partition上
  • 例子5:.\.. ,不指定pk信息的正則匹配,將所有正則匹配的表,對應的hash字段為表名
    • 按表hash: 一張表的所有數據可以發到同一個分區,不同表之間會做散列 (會有熱點表分區過大問題)
  • 例子6: test\.test:id,.\..* , 針對test的表按照id散列,其余的表按照table散列

注意:大家可以結合自己的業務需求,設置匹配規則,多條匹配規則之間是按照順序進行匹配(命中一條規則就返回)

2.2 啟動canal

啟動canal成功后,可以查看RocketMQ 管理平台,已經創建了對應的topic,

image-20210803143058522

並且在修改數據庫后,也發送了相應的消息:

image-20210803143202512

查看消息體,根據配置信息canal.mq.flatMessage 的不同, 消息體表現為不同的形式, 分別為二進制,和json的形式,如果沒有必要可以 設置為false,提高性能

二進制:

image-20210803143419898

json:

image-20210803143435919

2.3 消費端

官方例子:https://github.com/alibaba/canal/blob/master/example/src/main/java/com/alibaba/otter/canal/example/rocketmq/CanalRocketMQClientExample.java

當開啟MQ形式后,就不能使用原來的方式去操作數據的變化,而是使用對應MQ的消費方式,例如:

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 實例化消息生產者,指定組名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_group");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("192.168.3.244:9876");
        // 訂閱Topic
        consumer.subscribe("test_topic", "*");
        //負載均衡模式消費
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 注冊回調函數,處理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n",
                        Thread.currentThread().getName(), msgs);
                for (MessageExt messageExt : msgs) {
                    byte[] data = messageExt.getBody();
                    if (data != null) {
                        Message message = CanalMessageDeserializer.deserializer(data);
                        // 如果canal配置 canal.mq.flatMessage = true,則以json方式解析
                        // FlatMessage flatMessage = JSON.parseObject(data, FlatMessage.class);
                        printEntry(message.getEntries());
                    }
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //啟動消息者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            //如果是事務開啟關閉時間則跳過
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}


結合RocketMQ 的消費方式和 alibaba 的ottr包進行解析信息,可以對將消費封裝為對象:

依賴:

<dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.5</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.protocol</artifactId>
            <version>1.1.5</version>
        </dependency>
    </dependencies>

修改數據庫信息,打印信息如下:

================&gt; binlog[mysql-bin.000012:3587] , name[test,aa_test] , eventType : UPDATE
-------&gt; before
id : 1111110946    update=false
status : 1    update=false
orderId : 2200    update=false
orderProductId : 0    update=false
stanId : 1    update=false
quantity : 1    update=false
paymentDate : 2021-07-07 14:07:23    update=false
warehouse : 1    update=false
pid : 1    update=false
customerId : 1    update=false
type : 1    update=false
-------&gt; after
id : 1111110946    update=false
status : 1    update=false
orderId : 2200    update=false
orderProductId : 1    update=true
stanId : 1    update=false
quantity : 1    update=false
paymentDate : 2021-07-07 14:07:23    update=false
warehouse : 1    update=false
pid : 1    update=false
customerId : 1    update=false
type : 1    update=false

3. 啟動源碼探究

在指定canal.serverMode = rocketMQ 后,數據的消費方式為 MQ方式,此時 使用客戶端鏈接的方式將無法使用
會報如下錯誤: Connection refused: connect
因為服務端沒有開啟Netty相關服務,下面查看源碼,探究是如何啟動的

Canal的啟動入口為CanalLauncher 類的main方法,

public class CanalLauncher {

  // ....

    public static void main(String[] args) {
        try {
            logger.info("## set default uncaught exception handler");
            setGlobalUncaughtExceptionHandler();

            logger.info("## load canal configurations");
            //解析配置文件
            String conf = System.getProperty("canal.conf", "classpath:canal.properties");
            Properties properties = new Properties();
            if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
                conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
                properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
            } else {
                properties.load(new FileInputStream(conf));
            }
			//將配置信息賦予 啟動類
            final CanalStarter canalStater = new CanalStarter(properties);
            String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
            if (StringUtils.isNotEmpty(managerAddress)) {
                String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
                String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
                String adminPort = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, "11110");
                boolean autoRegister = BooleanUtils.toBoolean(CanalController.getProperty(properties,
                    CanalConstants.CANAL_ADMIN_AUTO_REGISTER));
                String autoCluster = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_AUTO_CLUSTER);
                String registerIp = CanalController.getProperty(properties, CanalConstants.CANAL_REGISTER_IP);
                if (StringUtils.isEmpty(registerIp)) {
                    registerIp = AddressUtils.getHostIp();
                }
                final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,
                    user,
                    passwd,
                    registerIp,
                    Integer.parseInt(adminPort),
                    autoRegister,
                    autoCluster);
                PlainCanal canalConfig = configClient.findServer(null);
                if (canalConfig == null) {
                    throw new IllegalArgumentException("managerAddress:" + managerAddress
                                                       + " can't not found config for [" + registerIp + ":" + adminPort
                                                       + "]");
                }
                Properties managerProperties = canalConfig.getProperties();
                // merge local
                managerProperties.putAll(properties);
                int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties,
                    CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
                    "5"));
                executor.scheduleWithFixedDelay(new Runnable() {

                    private PlainCanal lastCanalConfig;

                    public void run() {
                        try {
                            if (lastCanalConfig == null) {
                                lastCanalConfig = configClient.findServer(null);
                            } else {
                                PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5());
                                if (newCanalConfig != null) {
                                    // 遠程配置canal.properties修改重新加載整個應用
                                    canalStater.stop();
                                    Properties managerProperties = newCanalConfig.getProperties();
                                    // merge local
                                    managerProperties.putAll(properties);
                                    canalStater.setProperties(managerProperties);
                                    canalStater.start();

                                    lastCanalConfig = newCanalConfig;
                                }
                            }

                        } catch (Throwable e) {
                            logger.error("scan failed", e);
                        }
                    }

                }, 0, scanIntervalInSecond, TimeUnit.SECONDS);
                canalStater.setProperties(managerProperties);
            } else {
                canalStater.setProperties(properties);
            }

          	//已上的代碼都是在解析配置文件,為接下來的啟動做准備
            //啟動
            canalStater.start();
            runningLatch.await();
            executor.shutdownNow();
        } catch (Throwable e) {
            logger.error("## Something goes wrong when starting up the canal Server:", e);
        }
    }

 // ....

}

接下來看 canalStart的start方法:

    public synchronized void start() throws Throwable {
        //這里獲取的就是 canal.serverMode 的配置信息
        //可以看出如果配置 為 kafaka 或者 rocketmq 則會對對應的生產對象初始化
        String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
        if (serverMode.equalsIgnoreCase("kafka")) {
            canalMQProducer = new CanalKafkaProducer();
        } else if (serverMode.equalsIgnoreCase("rocketmq")) {
            canalMQProducer = new CanalRocketMQProducer();
        }

        //如果 MQ 生產對象不為空
        if (canalMQProducer != null) {
            // 設置禁用Netty 標識
            System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
            // 設置為raw避免ByteString->Entry的二次解析
            System.setProperty("canal.instance.memory.rawEntry", "false");
        }

        logger.info("## start the canal server.");
        // 初始化 canal server 主控制類
        controller = new CanalController(properties);
        controller.start();
        logger.info("## the canal server is running now ......");
        shutdownThread = new Thread() {

            public void run() {
                try {
                    logger.info("## stop the canal server");
                    controller.stop();
                    CanalLauncher.runningLatch.countDown();
                } catch (Throwable e) {
                    logger.warn("##something goes wrong when stopping canal Server:", e);
                } finally {
                    logger.info("## canal server is down.");
                }
            }

        };
        Runtime.getRuntime().addShutdownHook(shutdownThread);

        // 初始化 canalMQStarter , 並賦予給 總控制類CanalController
        if (canalMQProducer != null) {
            canalMQStarter = new CanalMQStarter(canalMQProducer);
            MQProperties mqProperties = buildMQProperties(properties);
            String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
            canalMQStarter.start(mqProperties, destinations);
            controller.setCanalMQStarter(canalMQStarter);
        }

        // start canalAdmin
        String port = properties.getProperty(CanalConstants.CANAL_ADMIN_PORT);
        if (canalAdmin == null && StringUtils.isNotEmpty(port)) {
            String user = properties.getProperty(CanalConstants.CANAL_ADMIN_USER);
            String passwd = properties.getProperty(CanalConstants.CANAL_ADMIN_PASSWD);
            CanalAdminController canalAdmin = new CanalAdminController(this);
            canalAdmin.setUser(user);
            canalAdmin.setPasswd(passwd);

            String ip = properties.getProperty(CanalConstants.CANAL_IP);
            CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();
            canalAdminWithNetty.setCanalAdmin(canalAdmin);
            canalAdminWithNetty.setPort(Integer.valueOf(port));
            canalAdminWithNetty.setIp(ip);
            canalAdminWithNetty.start();
            this.canalAdmin = canalAdminWithNetty;
        }

        running = true;
    }

而在 controller = new CanalController(properties); 這步初始化 Canal控制類時,則讀取標識信息,判斷是否啟動Netty服務:

// ...
String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);
        if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {
            canalServer = CanalServerWithNetty.instance();
            canalServer.setIp(ip);
            canalServer.setPort(port);
        }
// ...

最后在最終的啟動方法中:controller.start(),因為 canalServer 為空,則沒有啟動Netty Server服務

// ...
        // 啟動網絡接口
        if (canalServer != null) {
            canalServer.start();
        }
// ..


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM