1. 配置文件介紹
Canal的啟動,是以創建實例(instance)的方式,每個實例都有自己單獨的工作環境,
而配置也分成兩個部分
- canal.properties (系統根配置文件)
- instance.properties (instance級別的配置文件,每個instance一份)
1.1 canal.properties常用配置介紹:
1.instance列表定義
參數名字 | 參數說明 | 默認值 |
---|---|---|
canal.destinations | 當前server上部署的instance列表 | 無 |
canal.conf.dir | conf/目錄所在的路徑 | ../conf |
canal.auto.scan | 開啟instance自動掃描 如果配置為true,canal.conf.dir目錄下的instance配置變化會自動觸發: a. instance目錄新增: 觸發instance配置載入,lazy為true時則自動啟動 b. instance目錄刪除:卸載對應instance配置,如已啟動則進行關閉 c. instance.properties文件變化:reload instance配置,如已啟動自動進行重啟操作 | true |
canal.auto.scan.interval | instance自動掃描的間隔時間,單位秒 | 5 |
canal.instance.global.mode | 全局配置加載方式 | spring |
canal.instance.global.lazy | 全局lazy模式 | false |
canal.instance.global.manager.address | 全局的manager配置方式的鏈接信息 | 無 |
canal.instance.global.spring.xml | 全局的spring配置方式的組件文件 | classpath:spring/memory-instance.xml (spring目錄相對於canal.conf.dir) |
canal.instance.example.mode canal.instance.example.lazy canal.instance.example.spring.xml ..... | instance級別的配置定義,如有配置,會自動覆蓋全局配置定義模式 命名規則:canal.instance.{name}.xxx | 無 |
canal.instance.tsdb.spring.xml | v1.0.25版本新增,全局的tsdb配置方式的組件文件 | classpath:spring/tsdb/h2-tsdb.xml (spring目錄相對於canal.conf.dir) |
2.common參數定義,比如可以將instance.properties的公用參數,抽取放置到這里,這樣每個instance啟動的時候就可以共享.(instance.properties配置定義優先級高於canal.properties)
參數名字 | 參數說明 | 默認值 |
---|---|---|
canal.id | 每個canal server實例的唯一標識,暫無實際意義 | 1 |
canal.ip | canal server綁定的本地IP信息,如果不配置,默認選擇一個本機IP進行啟動服務 | 無 |
canal.register.ip | canal server注冊到外部zookeeper、admin的ip信息 (針對docker的外部可見ip) | 無 |
canal.port | canal server提供socket服務的端口 | 11111 |
canal.zkServers | canal server鏈接zookeeper集群的鏈接信息 例子:10.20.144.22:2181,10.20.144.51:2181 | 無 |
canal.zookeeper.flush.period | canal持久化數據到zookeeper上的更新頻率,單位毫秒 | 1000 |
canal.instance.memory.batch.mode | canal內存store中數據緩存模式 1. ITEMSIZE : 根據buffer.size進行限制,只限制記錄的數量 2. MEMSIZE : 根據buffer.size * buffer.memunit的大小,限制緩存記錄的大小 | MEMSIZE |
canal.instance.memory.buffer.size | canal內存store中可緩存buffer記錄數,需要為2的指數 | 16384 |
canal.instance.memory.buffer.memunit | 內存記錄的單位大小,默認1KB,和buffer.size組合決定最終的內存使用大小 | 1024 |
canal.instance.transactionn.size | 最大事務完整解析的長度支持 超過該長度后,一個事務可能會被拆分成多次提交到canal store中,無法保證事務的完整可見性 | 1024 |
canal.instance.fallbackIntervalInSeconds | canal發生mysql切換時,在新的mysql庫上查找binlog時需要往前查找的時間,單位秒 說明:mysql主備庫可能存在解析延遲或者時鍾不統一,需要回退一段時間,保證數據不丟 | 60 |
canal.instance.detecting.enable | 是否開啟心跳檢查 | false |
canal.instance.detecting.sql | 心跳檢查sql | insert into retl.xdual values(1,now()) on duplicate key update x=now() |
canal.instance.detecting.interval.time | 心跳檢查頻率,單位秒 | 3 |
canal.instance.detecting.retry.threshold | 心跳檢查失敗重試次數 | 3 |
canal.instance.detecting.heartbeatHaEnable | 心跳檢查失敗后,是否開啟自動mysql自動切換 說明:比如心跳檢查失敗超過閥值后,如果該配置為true,canal就會自動鏈到mysql備庫獲取binlog數據 | false |
canal.instance.network.receiveBufferSize | 網絡鏈接參數,SocketOptions.SO_RCVBUF | 16384 |
canal.instance.network.sendBufferSize | 網絡鏈接參數,SocketOptions.SO_SNDBUF | 16384 |
canal.instance.network.soTimeout | 網絡鏈接參數,SocketOptions.SO_TIMEOUT | 30 |
canal.instance.filter.druid.ddl | 是否使用druid處理所有的ddl解析來獲取庫和表名 | true |
canal.instance.filter.query.dcl | 是否忽略dcl語句 | false |
canal.instance.filter.query.dml | 是否忽略dml語句 (mysql5.6之后,在row模式下每條DML語句也會記錄SQL到binlog中,可參考MySQL文檔) | false |
canal.instance.filter.query.ddl | 是否忽略ddl語句 | false |
canal.instance.filter.table.error | 是否忽略binlog表結構獲取失敗的異常(主要解決回溯binlog時,對應表已被刪除或者表結構和binlog不一致的情況) | false |
canal.instance.filter.rows | 是否dml的數據變更事件(主要針對用戶只訂閱ddl/dcl的操作) | false |
canal.instance.filter.transaction.entry | 是否忽略事務頭和尾,比如針對寫入kakfa的消息時,不需要寫入TransactionBegin/Transactionend事件 | false |
canal.instance.binlog.format | 支持的binlog format格式列表 (otter會有支持format格式限制) | ROW,STATEMENT,MIXED |
canal.instance.binlog.image | 支持的binlog image格式列表 (otter會有支持format格式限制) | FULL,MINIMAL,NOBLOB |
canal.instance.get.ddl.isolation | ddl語句是否單獨一個batch返回(比如下游dml/ddl如果做batch內無序並發處理,會導致結構不一致) | false |
canal.instance.parser.parallel | 是否開啟binlog並行解析模式(串行解析資源占用少,但性能有瓶頸, 並行解析可以提升近2.5倍+) | true |
canal.instance.parser.parallelBufferSize | binlog並行解析的異步ringbuffer隊列 (必須為2的指數) | 256 |
canal.instance.tsdb.enable | 是否開啟tablemeta的tsdb能力 | true |
canal.instance.tsdb.dir | 主要針對h2-tsdb.xml時對應h2文件的存放目錄,默認為conf/xx/h2.mv.db | ${canal.file.data.dir:../conf}/${canal.instance.destination:} |
canal.instance.tsdb.url | jdbc url的配置(h2的地址為默認值,如果是mysql需要自行定義) | jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL; |
canal.instance.tsdb.dbUsername | jdbc url的配置(h2的地址為默認值,如果是mysql需要自行定義) | canal |
canal.instance.tsdb.dbPassword | jdbc url的配置(h2的地址為默認值,如果是mysql需要自行定義) | canal |
canal.instance.rds.accesskey | aliyun賬號的ak信息(如果不需要在本地binlog超過18小時被清理后自動下載oss上的binlog,可以忽略該值 | 無 |
canal.instance.rds.secretkey | aliyun賬號的sk信息(如果不需要在本地binlog超過18小時被清理后自動下載oss上的binlog,可以忽略該值) | 無 |
canal.admin.manager | canal鏈接canal-admin的地址 (v1.1.4新增) | 無 |
canal.admin.port | admin管理指令鏈接端口 (v1.1.4新增) | 11110 |
canal.admin.user | admin管理指令鏈接的ACL配置 (v1.1.4新增) | admin |
canal.admin.passwd | admin管理指令鏈接的ACL配置 (v1.1.4新增) | 密碼默認值為admin的密文 |
canal.user | canal數據端口訂閱的ACL配置 (v1.1.4新增)如果為空,代表不開啟 | 無 |
canal.passwd | canal數據端口訂閱的ACL配置 (v1.1.4新增)如果為空,代表不開啟 | 無 |
1.2 instance.properties常用配置介紹
在canal.properties定義了canal.destinations后,需要在canal.conf.dir對應的目錄下建立同名的文件
比如:
canal.destinations = example1,example2
這時需要創建example1和example2兩個目錄,每個目錄里各自有一份instance.properties.
instance.properties參數列表:
參數名字 | 參數說明 | 默認值 |
---|---|---|
canal.instance.mysql.slaveId | mysql集群配置中的serverId概念,需要保證和當前mysql集群中id唯一 (v1.1.x版本之后canal會自動生成,不需要手工指定) | 無 |
canal.instance.master.address | mysql主庫鏈接地址 | 127.0.0.1:3306 |
canal.instance.master.journal.name | mysql主庫鏈接時起始的binlog文件 | 無 |
canal.instance.master.position | mysql主庫鏈接時起始的binlog偏移量 | 無 |
canal.instance.master.timestamp | mysql主庫鏈接時起始的binlog的時間戳 | 無 |
canal.instance.gtidon | 是否啟用mysql gtid的訂閱模式 | false |
canal.instance.master.gtid | mysql主庫鏈接時對應的gtid位點 | 無 |
canal.instance.dbUsername | mysql數據庫帳號 | canal |
canal.instance.dbPassword | mysql數據庫密碼 | canal |
canal.instance.defaultDatabaseName | mysql鏈接時默認schema | |
canal.instance.connectionCharset | mysql 數據解析編碼 | UTF-8 |
canal.instance.filter.regex | mysql 數據解析關注的表,Perl正則表達式.多個正則之間以逗號(,)分隔,轉義符需要雙斜杠(\) 常見例子:1. 所有表:.* or .\.. 2. canal schema下所有表: canal\..* 3. canal下的以canal打頭的表:canal\.canal.* 4. canal schema下的一張表:canal\.test15. 多個規則組合使用:canal\..*,mysql.test1,mysql.test2 (逗號分隔) | .\.. |
canal.instance.filter.black.regex | mysql 數據解析表的黑名單,表達式規則見白名單的規則 | 無 |
canal.instance.rds.instanceId | aliyun rds對應的實例id信息(如果不需要在本地binlog超過18小時被清理后自動下載oss上的binlog,可以忽略該值) | 無 |
2. 連接MQ
2.1 配置信息
canal 1.1.1版本之后, 默認支持將canal server接收到的binlog數據直接投遞到MQ, 目前默認支持的MQ系統有:
- kafka
- RocketMQ
本文將使用RocketMQ,具體使用可以參考這篇博文: https://www.cnblogs.com/xjwhaha/p/15055452.html
修改相關配置:
canal.properties文件修改:
#指定消息投遞方式為 RocketMQ, 默認為TCP,即客戶端連接的方式
canal.serverMode = RocketMQ
#指定mq地址
canal.mq.servers = 192.168.3.88:9876
instance.properties文件修改:
# 消費組名
canal.mq.producerGroup = test_group
# 投遞的topic
canal.mq.topic=test_topic
#也可指定動態生成的topic,例如根據 庫名_表名 投遞
#canal.mq.dynamicTopic=.*\\..*
更加詳細的配置信息參考如下:
參數名 | 參數說明 | 默認值 |
---|---|---|
canal.mq.servers | kafka為bootstrap.servers rocketMQ中為nameserver列表 | 127.0.0.1:6667 |
canal.mq.retries | 發送失敗重試次數 | 0 |
canal.mq.batchSize | kafka為ProducerConfig.BATCH_SIZE_CONFIG rocketMQ無意義 |
16384 |
canal.mq.maxRequestSize | kafka為ProducerConfig.MAX_REQUEST_SIZE_CONFIG rocketMQ無意義 |
1048576 |
canal.mq.lingerMs | kafka為ProducerConfig.LINGER_MS_CONFIG , 如果是flatMessage格式建議將該值調大, 如: 200 rocketMQ無意義 |
1 |
canal.mq.bufferMemory | kafka為ProducerConfig.BUFFER_MEMORY_CONFIG rocketMQ無意義 |
33554432 |
canal.mq.acks | kafka為ProducerConfig.ACKS_CONFIG rocketMQ無意義 |
all |
canal.mq.kafka.kerberos.enable | kafka為ProducerConfig.ACKS_CONFIG rocketMQ無意義 |
false |
canal.mq.kafka.kerberos.krb5FilePath | kafka kerberos認證 rocketMQ無意義 | ../conf/kerberos/krb5.conf |
canal.mq.kafka.kerberos.jaasFilePath | kafka kerberos認證 rocketMQ無意義 | ../conf/kerberos/jaas.conf |
canal.mq.producerGroup | kafka無意義 rocketMQ為ProducerGroup名 | Canal-Producer |
canal.mq.accessChannel | kafka無意義 rocketMQ為channel模式,如果為aliyun則配置為cloud | local |
--- | --- | --- |
canal.mq.vhost= | rabbitMQ配置 | 無 |
canal.mq.exchange= | rabbitMQ配置 | 無 |
canal.mq.username= | rabbitMQ配置 | 無 |
canal.mq.password= | rabbitMQ配置 | 無 |
canal.mq.aliyunuid= | rabbitMQ配置 | 無 |
--- | --- | --- |
canal.mq.canalBatchSize | 獲取canal數據的批次大小 | 50 |
canal.mq.canalGetTimeout | 獲取canal數據的超時時間 | 100 |
canal.mq.parallelThreadSize | mq數據轉換並行處理的並發度 | 8 |
canal.mq.flatMessage | 是否為json格式 如果設置為false,對應MQ收到的消息為protobuf格式 需要通過CanalMessageDeserializer進行解碼 | false |
--- | --- | --- |
canal.mq.topic | mq里的topic名 | 無 |
canal.mq.dynamicTopic | mq里的動態topic規則, 1.1.3版本支持 | 無 |
canal.mq.partition | 單隊列模式的分區下標, | 1 |
canal.mq.partitionsNum | 散列模式的分區數 | 無 |
canal.mq.partitionHash | 散列規則定義 庫名.表名 : 唯一主鍵,比如mytest.person: id 1.1.3版本支持新語法,見下文 | 無 |
canal.mq.dynamicTopic 表達式說明
canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多個配置之間使用逗號或分號分隔
- 例子1:test\.test 指定匹配的單表,發送到以test_test為名字的topic上
- 例子2:.\.. 匹配所有表,則每個表都會發送到各自表名的topic上
- 例子3:test 指定匹配對應的庫,一個庫的所有表都會發送到庫名的topic上
- 例子4:test\..* 指定匹配的表達式,針對匹配的表會發送到各自表名的topic上
- 例子5:test,test1\.test1,指定多個表達式,會將test庫的表都發送到test的topic上,test1\.test1的表發送到對應的test1_test1 topic上,其余的表發送到默認的canal.mq.topic值
為滿足更大的靈活性,允許對匹配條件的規則指定發送的topic名字,配置格式:topicName:schema 或 topicName:schema.table
- 例子1: test:test\.test 指定匹配的單表,發送到以test為名字的topic上
- 例子2: test:.\.. 匹配所有表,因為有指定topic,則每個表都會發送到test的topic下
- 例子3: test:test 指定匹配對應的庫,一個庫的所有表都會發送到test的topic下
- 例子4:testA:test\..* 指定匹配的表達式,針對匹配的表會發送到testA的topic下
- 例子5:test0:test,test1:test1\.test1,指定多個表達式,會將test庫的表都發送到test0的topic下,test1\.test1的表發送到對應的test1的topic下,其余的表發送到默認的canal.mq.topic值
大家可以結合自己的業務需求,設置匹配規則,建議MQ開啟自動創建topic的能力
canal.mq.partitionHash 表達式說明
canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多個配置之間使用逗號分隔
- 例子1:test\.test:pk1^pk2 指定匹配的單表,對應的hash字段為pk1 + pk2
- 例子2:.\..:id 正則匹配,指定所有正則匹配的表對應的hash字段為id
- 例子3:.\..:$pk$ 正則匹配,指定所有正則匹配的表對應的hash字段為表主鍵(自動查找)
- 例子4: 匹配規則啥都不寫,則默認發到0這個partition上
- 例子5:.\.. ,不指定pk信息的正則匹配,將所有正則匹配的表,對應的hash字段為表名
- 按表hash: 一張表的所有數據可以發到同一個分區,不同表之間會做散列 (會有熱點表分區過大問題)
- 例子6: test\.test:id,.\..* , 針對test的表按照id散列,其余的表按照table散列
注意:大家可以結合自己的業務需求,設置匹配規則,多條匹配規則之間是按照順序進行匹配(命中一條規則就返回)
2.2 啟動canal
啟動canal成功后,可以查看RocketMQ 管理平台,已經創建了對應的topic,
並且在修改數據庫后,也發送了相應的消息:
查看消息體,根據配置信息canal.mq.flatMessage
的不同, 消息體表現為不同的形式, 分別為二進制,和json的形式,如果沒有必要可以 設置為false,提高性能
二進制:
json:
2.3 消費端
當開啟MQ形式后,就不能使用原來的方式去操作數據的變化,而是使用對應MQ的消費方式,例如:
public class Consumer {
public static void main(String[] args) throws Exception {
// 實例化消息生產者,指定組名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_group");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("192.168.3.244:9876");
// 訂閱Topic
consumer.subscribe("test_topic", "*");
//負載均衡模式消費
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注冊回調函數,處理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
for (MessageExt messageExt : msgs) {
byte[] data = messageExt.getBody();
if (data != null) {
Message message = CanalMessageDeserializer.deserializer(data);
// 如果canal配置 canal.mq.flatMessage = true,則以json方式解析
// FlatMessage flatMessage = JSON.parseObject(data, FlatMessage.class);
printEntry(message.getEntries());
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//啟動消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
private static void printEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
//如果是事務開啟關閉時間則跳過
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage = null;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
結合RocketMQ 的消費方式和 alibaba 的ottr包進行解析信息,可以對將消費封裝為對象:
依賴:
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.5</version>
</dependency>
</dependencies>
修改數據庫信息,打印信息如下:
================> binlog[mysql-bin.000012:3587] , name[test,aa_test] , eventType : UPDATE
-------> before
id : 1111110946 update=false
status : 1 update=false
orderId : 2200 update=false
orderProductId : 0 update=false
stanId : 1 update=false
quantity : 1 update=false
paymentDate : 2021-07-07 14:07:23 update=false
warehouse : 1 update=false
pid : 1 update=false
customerId : 1 update=false
type : 1 update=false
-------> after
id : 1111110946 update=false
status : 1 update=false
orderId : 2200 update=false
orderProductId : 1 update=true
stanId : 1 update=false
quantity : 1 update=false
paymentDate : 2021-07-07 14:07:23 update=false
warehouse : 1 update=false
pid : 1 update=false
customerId : 1 update=false
type : 1 update=false
3. 啟動源碼探究
在指定canal.serverMode = rocketMQ 后,數據的消費方式為 MQ方式,此時 使用客戶端鏈接的方式將無法使用
會報如下錯誤: Connection refused: connect
因為服務端沒有開啟Netty相關服務,下面查看源碼,探究是如何啟動的
Canal的啟動入口為CanalLauncher 類的main方法,
public class CanalLauncher {
// ....
public static void main(String[] args) {
try {
logger.info("## set default uncaught exception handler");
setGlobalUncaughtExceptionHandler();
logger.info("## load canal configurations");
//解析配置文件
String conf = System.getProperty("canal.conf", "classpath:canal.properties");
Properties properties = new Properties();
if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
} else {
properties.load(new FileInputStream(conf));
}
//將配置信息賦予 啟動類
final CanalStarter canalStater = new CanalStarter(properties);
String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
if (StringUtils.isNotEmpty(managerAddress)) {
String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
String adminPort = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, "11110");
boolean autoRegister = BooleanUtils.toBoolean(CanalController.getProperty(properties,
CanalConstants.CANAL_ADMIN_AUTO_REGISTER));
String autoCluster = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_AUTO_CLUSTER);
String registerIp = CanalController.getProperty(properties, CanalConstants.CANAL_REGISTER_IP);
if (StringUtils.isEmpty(registerIp)) {
registerIp = AddressUtils.getHostIp();
}
final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,
user,
passwd,
registerIp,
Integer.parseInt(adminPort),
autoRegister,
autoCluster);
PlainCanal canalConfig = configClient.findServer(null);
if (canalConfig == null) {
throw new IllegalArgumentException("managerAddress:" + managerAddress
+ " can't not found config for [" + registerIp + ":" + adminPort
+ "]");
}
Properties managerProperties = canalConfig.getProperties();
// merge local
managerProperties.putAll(properties);
int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties,
CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
"5"));
executor.scheduleWithFixedDelay(new Runnable() {
private PlainCanal lastCanalConfig;
public void run() {
try {
if (lastCanalConfig == null) {
lastCanalConfig = configClient.findServer(null);
} else {
PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5());
if (newCanalConfig != null) {
// 遠程配置canal.properties修改重新加載整個應用
canalStater.stop();
Properties managerProperties = newCanalConfig.getProperties();
// merge local
managerProperties.putAll(properties);
canalStater.setProperties(managerProperties);
canalStater.start();
lastCanalConfig = newCanalConfig;
}
}
} catch (Throwable e) {
logger.error("scan failed", e);
}
}
}, 0, scanIntervalInSecond, TimeUnit.SECONDS);
canalStater.setProperties(managerProperties);
} else {
canalStater.setProperties(properties);
}
//已上的代碼都是在解析配置文件,為接下來的啟動做准備
//啟動
canalStater.start();
runningLatch.await();
executor.shutdownNow();
} catch (Throwable e) {
logger.error("## Something goes wrong when starting up the canal Server:", e);
}
}
// ....
}
接下來看 canalStart的start方法:
public synchronized void start() throws Throwable {
//這里獲取的就是 canal.serverMode 的配置信息
//可以看出如果配置 為 kafaka 或者 rocketmq 則會對對應的生產對象初始化
String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
if (serverMode.equalsIgnoreCase("kafka")) {
canalMQProducer = new CanalKafkaProducer();
} else if (serverMode.equalsIgnoreCase("rocketmq")) {
canalMQProducer = new CanalRocketMQProducer();
}
//如果 MQ 生產對象不為空
if (canalMQProducer != null) {
// 設置禁用Netty 標識
System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
// 設置為raw避免ByteString->Entry的二次解析
System.setProperty("canal.instance.memory.rawEntry", "false");
}
logger.info("## start the canal server.");
// 初始化 canal server 主控制類
controller = new CanalController(properties);
controller.start();
logger.info("## the canal server is running now ......");
shutdownThread = new Thread() {
public void run() {
try {
logger.info("## stop the canal server");
controller.stop();
CanalLauncher.runningLatch.countDown();
} catch (Throwable e) {
logger.warn("##something goes wrong when stopping canal Server:", e);
} finally {
logger.info("## canal server is down.");
}
}
};
Runtime.getRuntime().addShutdownHook(shutdownThread);
// 初始化 canalMQStarter , 並賦予給 總控制類CanalController
if (canalMQProducer != null) {
canalMQStarter = new CanalMQStarter(canalMQProducer);
MQProperties mqProperties = buildMQProperties(properties);
String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
canalMQStarter.start(mqProperties, destinations);
controller.setCanalMQStarter(canalMQStarter);
}
// start canalAdmin
String port = properties.getProperty(CanalConstants.CANAL_ADMIN_PORT);
if (canalAdmin == null && StringUtils.isNotEmpty(port)) {
String user = properties.getProperty(CanalConstants.CANAL_ADMIN_USER);
String passwd = properties.getProperty(CanalConstants.CANAL_ADMIN_PASSWD);
CanalAdminController canalAdmin = new CanalAdminController(this);
canalAdmin.setUser(user);
canalAdmin.setPasswd(passwd);
String ip = properties.getProperty(CanalConstants.CANAL_IP);
CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();
canalAdminWithNetty.setCanalAdmin(canalAdmin);
canalAdminWithNetty.setPort(Integer.valueOf(port));
canalAdminWithNetty.setIp(ip);
canalAdminWithNetty.start();
this.canalAdmin = canalAdminWithNetty;
}
running = true;
}
而在 controller = new CanalController(properties);
這步初始化 Canal控制類時,則讀取標識信息,判斷是否啟動Netty服務:
// ...
String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);
if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {
canalServer = CanalServerWithNetty.instance();
canalServer.setIp(ip);
canalServer.setPort(port);
}
// ...
最后在最終的啟動方法中:controller.start()
,因為 canalServer 為空,則沒有啟動Netty Server服務
// ...
// 啟動網絡接口
if (canalServer != null) {
canalServer.start();
}
// ..