Canal 使用入門


  1. Canal 基礎認知
    1. canal簡介
      Canal 譯意為水道/管道/溝渠,主要用途是基於 MySQL 數據庫增量日志解析,提供增量數據訂閱和消費,早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業務需求,
      實現方式主要是基於業務 trigger 獲取增量變更。從 2010 年開始,業務逐步嘗試數據庫日志解析獲取增量變更進行同步,由此衍生出了大量的數據庫增量訂閱和消費業務。 基於日志增量訂閱和消費的業務包括 - 數據庫鏡像 - 數據庫實時備份 - 索引構建和實時維護(拆分異構索引、倒排索引等) - 業務 cache 刷新 - 帶業務邏輯的增量數據處理 當前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

    2. canal原理
      1.  MySQL主備復制原理
        MySQL master()將數據變更寫入二進制日志( binary log, 其中記錄叫做二進制日志事件binary log events,可以通過 show binlog events 進行查看)
        MySQL slave(從)將 master 的 binary log events 拷貝到它的中繼日志(relay log)
        MySQL slave 重放 relay log 中事件,將數據變更反映它自己的數據

         

      2.  Canal工作原理
        canal 模擬 MySQL slave 的交互協議,偽裝自己為 MySQL slave ,向 MySQL master 發送dump 協議
        MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal)
        canal 解析 binary log 對象(原始為 byte 流),用戶使用canal客戶端對數據進行進一步處理

    3. Canal官方文檔
    4. Canal源代碼
    5. Canal架構簡單說明
      個人認知: 一個Canal Sever服務器中可以包含一個或者多個Instance(實例|目的地),每一個Instance就是一個任務處理線程,用於從不同的MySQL服務器或者同一個MySQL的
      不同數據庫監聽收集binlog日志(canal監聽mysql的數據配置在example/instance.properties文件中),canal默認提供一個example實例,
      如果需要多個實例在conf中創建一個文件夾instanceA,復制example/instance.properties文件到instanceA中並修改,在canal.properties文件中設置destinations時逗號分割,
      如果沒有設置則實例無效

      1.  Canal安裝包文件目錄
        conf/
        ├── canal_local.properties
        ├── canal.properties
        ├── example
        │   ├── h2.mv.db
        │   ├── instance.properties
        │   └── meta.dat
        ├── logback.xml
        ├── metrics
        │   └── Canal_instances_tmpl.json
        └── spring
            ├── base-instance.xml
            ├── default-instance.xml
            ├── file-instance.xml
            ├── group-instance.xml
            ├── memory-instance.xml
            └── tsdb
                ├── h2-tsdb.xml
                ├── mysql-tsdb.xml
                ├── sql
                │   └── create_table.sql
                └── sql-map
                    ├── sqlmap-config.xml
                    ├── sqlmap_history.xml
                    └── sqlmap_snapshot.xml
      2.  重要配置文件說明
        1.  canal.properties
          canal.properties這個配置文件負責的是canal服務的基礎配置,每個canal可以啟動多個實例instance,一個instance代表一個數據采集處理線程,
          每個instance都有一個獨立的配置文件instance.properties,不同的instance可以采集不同的mysql數據庫,也就是一個canal可以對應多個mysql數據庫。

          在instance里面有一個小隊列,可以理解為是jvm級的隊列,instance抓取來的數據先放入到隊列中,隊列可以有很多出口:
              ①mq模式:canal server自己主動把數據推送到kafka,這個比較簡單,一行代碼不用寫,只需要配個kafka|rocketmq|rabbitmq的地址,
                每個instance對應kafka的一個topic,數據是json串。
               這種方式雖然簡單,但是其缺點主要體現在兩個個方面,一個instance對應一個topic,所有表都在這一個topic,所以實時的時候要進行分流。
                 另一方面,因為數據是json,並且攜帶了很多冗余信息,但是數據量大的時候傳輸效率比較低。
              ②tcp模式:啟動canal客戶端連接到canal服務器端之后,主動去拉取數據,可以定義多長周期消費多少數據。缺點在於抓取出來的是序列化壓縮的數據,
                所以需要反序列化,解壓,比較麻煩。優點在於我們可以進行壓縮,過濾掉沒用的冗余信息,只保留我們需要的信息,提交傳輸效率。

          #################################################
          ######### common argument #############
          #################################################
          # tcp bind ip |canal server綁定的本地IP信息,如果不配置,默認選擇一個本機IP進行啟動服務
          canal.ip = 
          # register ip to zookeeper |canal server注冊到外部zookeeper、admin的ip信息 (針對docker的外部可見ip)
          canal.register.ip =
          # canal server提供socket服務的端口,端口號,是給tcp模式(netty)時候用的,如果用了kafka或者rocketmq,就不需要配置這個端口
          canal.port = 11111 
          canal.metrics.pull.port = 11112
          # canal instance user/passwd canal數據端口訂閱的ACL配置 (v1.1.4新增)如果為空,代表不開啟
          #canal.user = canal 
          # canal數據端口訂閱的ACL配置 (v1.1.4新增)如果為空,代表不開啟
          #canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458 
           
          # canal admin config|canal鏈接canal-admin的地址 (v1.1.4新增)
          canal.admin.manager = 127.0.0.1:8089 
          # admin管理指令鏈接端口 (v1.1.4新增)
          canal.admin.port = 11110
          # admin管理指令鏈接的ACL配置 (v1.1.4新增)
          canal.admin.user = admin 
          # admin管理指令鏈接的ACL配置 (v1.1.4新增)
          canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 
           
          # canal server鏈接zookeeper集群的鏈接信息,單機部署不需要配置,集群部署需要配置
          canal.zkServers = 
          # flush data to zk| canal持久化數據到zookeeper上的更新頻率,單位毫秒
          canal.zookeeper.flush.period = 1000 
          canal.withoutNetty = false
          # tcp, kafka, RocketMQ| Canal模式設定,主要有tcp和mq兩種類型
          canal.serverMode = tcp
          # flush meta cursor/parse position to file|主要針對h2-tsdb.xml時對應h2文件的存放目錄,默認為conf/xx/h2.mv.db
          canal.file.data.dir = ${canal.conf.dir}
          canal.file.flush.period = 1000
          ## memory store RingBuffer size, should be Math.pow(2,n) |canal內存store中可緩存buffer記錄數,需要為2的指數
          canal.instance.memory.buffer.size = 16384 
          ## memory store RingBuffer used memory unit size , default 1kb | 內存記錄的單位大小,默認1KB,和buffer.size組合決定最終的內存使用大小
          canal.instance.memory.buffer.memunit = 1024 
          ## meory store gets mode used MEMSIZE or ITEMSIZE | canal內存store中數據緩存模式
          ## 1. ITEMSIZE : 根據buffer.size進行限制,只限制記錄的數量
          ## 2. MEMSIZE : 根據buffer.size * buffer.memunit的大小,限制緩存記錄的大小
          canal.instance.memory.batch.mode = MEMSIZE 
           
          canal.instance.memory.rawEntry = true
           
          ## detecing config |是否開啟心跳檢查
          canal.instance.detecting.enable = false 
          # 心跳檢查sql
          #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now() 
          canal.instance.detecting.sql = select 1
          # 心跳檢查頻率,單位秒
          canal.instance.detecting.interval.time = 3 
          # 心跳檢查失敗重試次數
          canal.instance.detecting.retry.threshold = 3 
          # 心跳檢查失敗后,是否開啟自動mysql自動切換
          # 說明:比如心跳檢查失敗超過閥值后,如果該配置為true,canal就會自動鏈到mysql備庫獲取binlog數據
          canal.instance.detecting.heartbeatHaEnable = false 
           
          # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
          canal.instance.transaction.size = 1024
          # mysql fallback connected to new master should fallback times
          # canal發生mysql切換時,在新的mysql庫上查找binlog時需要往前查找的時間,單位秒
          # 說明:mysql主備庫可能存在解析延遲或者時鍾不統一,需要回退一段時間,保證數據不丟
          canal.instance.fallbackIntervalInSeconds = 60 
           
          # network config
          # 網絡鏈接參數,SocketOptions.SO_RCVBUF
          canal.instance.network.receiveBufferSize = 16384 
          # 網絡鏈接參數,SocketOptions.SO_SNDBUF
          canal.instance.network.sendBufferSize = 16384 
          # 網絡鏈接參數,SocketOptions.SO_TIMEOUT
          canal.instance.network.soTimeout = 30 

          # binlog filter config
          # 是否使用druid處理所有的ddl解析來獲取庫和表名
          canal.instance.filter.druid.ddl = true
          # 是否忽略dcl語句
          canal.instance.filter.query.dcl = false
          # 是否忽略dml語句(mysql5.6之后,在row模式下每條DML語句也會記錄SQL到binlog中,可參考MySQL文檔)
          canal.instance.filter.query.dml = false
          # 是否忽略ddl語句
          canal.instance.filter.query.ddl = false
          # 是否忽略binlog表結構獲取失敗的異常(主要解決回溯binlog時,對應表已被刪除或者表結構和binlog不一致的情況)
          canal.instance.filter.table.error = false
          # 是否dml的數據變更事件(主要針對用戶只訂閱ddl/dcl的操作)
          canal.instance.filter.rows = false
          # 是否忽略事務頭和尾,比如針對寫入kakfa的消息時,不需要寫入TransactionBegin/Transactionend事件
          canal.instance.filter.transaction.entry = false

          # binlog format/image check
          # 支持的binlog format格式列表(otter會有支持format格式限制)
          canal.instance.binlog.format = ROW,STATEMENT,MIXED
          # 支持的binlog image格式列表(otter會有支持format格式限制)
          canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

          # binlog ddl isolation
          # ddl語句是否單獨一個batch返回(比如下游dml/ddl如果做batch內無序並發處理,會導致結構不一致)
          canal.instance.get.ddl.isolation = false

          # parallel parser config
          # 是否開啟binlog並行解析模式
          canal.instance.parser.parallel = true
          ## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
          #canal.instance.parser.parallelThreadSize = 16
          ## disruptor ringbuffer size, must be power of 2| binlog並行解析的異步ringbuffer隊列(必須為2的指數)
          canal.instance.parser.parallelBufferSize = 256

          # table meta tsdb info
          # 是否開啟tablemeta的tsdb能力
          canal.instance.tsdb.enable = true
          canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
          canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
          canal.instance.tsdb.dbUsername = canal
          canal.instance.tsdb.dbPassword = canal
          # dump snapshot interval, default 24 hour
          canal.instance.tsdb.snapshot.interval = 24
          # purge snapshot expire , default 360 hour(15 days)
          canal.instance.tsdb.snapshot.expire = 360

          #################################################
          ######### destinations #############
          #################################################
          # 當前server上部署的instance列表,多個使用逗號分隔
          canal.destinations =
          # conf root dir | conf/目錄所在的路徑
          canal.conf.dir = ../conf
          # auto scan instance dir add/remove and start/stop instance | 開啟instance自動掃描
          # 如果配置為true,canal.conf.dir目錄下的instance配置變化會自動觸發
          canal.auto.scan = true
          # instance自動掃描的間隔時間,單位秒
          canal.auto.scan.interval = 5
          # v1.0.25版本新增,全局的tsdb配置方式的組件文件
          canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
          #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
          # 全局配置加載方式
          canal.instance.global.mode = manager
          # 全局lazy模式
          canal.instance.global.lazy = false
          # 全局的manager配置方式的鏈接信息
          canal.instance.global.manager.address = ${canal.admin.manager}
          #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
          # 全局的spring配置方式的組件文件
          canal.instance.global.spring.xml = classpath:spring/file-instance.xml
          #canal.instance.global.spring.xml = classpath:spring/default-instance.xml

          ##################################################
          ######### MQ #############
          ##################################################
          # kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092
          canal.mq.servers = 127.0.0.1:6667
          # 發送失敗重試次數
          canal.mq.retries = 0
          # kafka為ProducerConfig.BATCH_SIZE_CONFIG,每次發送批量消息的個數
          canal.mq.batchSize = 16384
          # kafka為ProducerConfig.MAX_REQUEST_SIZE_CONFIG
          canal.mq.maxRequestSize = 1048576
          # kafka為ProducerConfig.LINGER_MS_CONFIG , 如果是flatMessage格式建議將該值調大, 如: 200
          canal.mq.lingerMs = 100
          # kafka為ProducerConfig.BUFFER_MEMORY_CONFIG
          canal.mq.bufferMemory = 33554432
          # 獲取canal數據的批次大小
          canal.mq.canalBatchSize = 50
          # 獲取canal數據的超時時間
          canal.mq.canalGetTimeout = 100
          # 是否為json格式
          canal.mq.flatMessage = true
          canal.mq.compressionType = none
          # kafka為ProducerConfig.ACKS_CONFIG
          canal.mq.acks = all
          #canal.mq.properties. =
          canal.mq.producerGroup = test

        2. canl_local.properties
          # 主要定義了本地webUI # register ip
          canal.register.ip = # canal admin config canal.admin.manager = 127.0.0.1:8089
          canal.admin.port = 11110
          canal.admin.user = admin
          canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
          # admin auto register canal.admin.register.auto = true
          canal.admin.register.cluster =
          canal.admin.register.name = 
        3. example/instance.properties
          一個example的目錄就是一個instance,canal要配置多個實例采集多個數據源mysql的話如下配置,然后把conf目錄下example復制多份,分別重命名,如下
          #################################################
          ######### destinations #############
          #################################################
          canal.destinations = example1,example2,example3
          ————————————————
           
          #################################################
          ## mysql serverId , v1.0.26+ will autoGen|作為Mysql從節點的id,1.0.26版本之后會自動生成
          # canal.instance.mysql.slaveId=0
           
          # enable gtid use true/false | 是否啟用mysql gtid的訂閱模式
          canal.instance.gtidon=false 
           
          # position info
          # mysql主庫鏈接地址
          canal.instance.master.address=rm-qj0lin9gpm5hn841b.mysql.rds.cnipaig2.cloud:3306 
          # mysql主庫鏈接時起始的binlog文件
          canal.instance.master.journal.name= 
          # mysql主庫鏈接時起始的binlog偏移量
          canal.instance.master.position=
          # mysql主庫鏈接時起始的binlog的時間戳
          canal.instance.master.timestamp= 
          # mysql主庫鏈接時對應的gtid位點
          canal.instance.master.gtid= 
           
          # rds oss binlog
          canal.instance.rds.accesskey=
          canal.instance.rds.secretkey=
          canal.instance.rds.instanceId=
           
          # table meta tsdb info
          canal.instance.tsdb.enable=fasle
          #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
          #canal.instance.tsdb.dbUsername=canal
          #canal.instance.tsdb.dbPassword=canal
           
          #canal.instance.standby.address =
          #canal.instance.standby.journal.name =
          #canal.instance.standby.position =
          #canal.instance.standby.timestamp =
          #canal.instance.standby.gtid=
           
          # username/password
          # mysql數據庫帳號
          canal.instance.dbUsername=rds_test 
          # mysql數據庫密碼
          canal.instance.dbPassword=rds_test123 
          canal.instance.connectionCharset = UTF-8
          # enable druid Decrypt database password
          canal.instance.enableDruid=false
          #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
           
          # table regex| 需要采集的MySQL表過濾匹配表達式
          canal.instance.filter.regex=.*\\..*
          # table black regex 數據表黑名單過濾匹配表達式
          canal.instance.filter.black.regex=
          # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
          #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
          # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
          #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
           
          # mq config | mq的topic信息
          canal.mq.topic=drds
          # dynamic topic route by schema or table regex|是否每張表動態匹配一個topic
          #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
          # mq的分區數量
          canal.mq.partition=0
          # hash partition config | 散列模式的分區數
          #canal.mq.partitionsNum=3 
          #canal.mq.partitionHash=test.table:id^name,.*\\..*
          #################################################
  2. Canal使用前提條件--安裝MySQL+開啟binlog支持(Mac)
    1.  下載Mac版本MySQL安裝包
       mysql dmg安裝包下載地址,選擇版本和系統版本下載
      https://downloads.mysql.com/archives/community/
    2. 安裝dmg安裝包
      需要注意在最后一步安裝會生成一個root用戶秘密,需要記錄下來 

    3. 設置環境變量
      vim /etc/profile
        export PATH=$PATH:/usr/local/mysql/bin
      vim ~/.bash_profile
         export PATH=$PATH:/usr/local/mysql/bin
      source /etc/profile
      source ~/.bash_profile
    4. 啟動MySQL
      apple---系統便好設置---mysql----啟動mysql 
    5. 修改root用戶密碼+創建canal用戶
      mysql -u root -p’你剛才的密碼’
      alter user root@localhost identified by 'root';
      GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '123456' ;
      -- canal的原理是模擬自己為mysql slave,所以這里一定需要做為mysql slave的相關權限 
      CREATE USER canal IDENTIFIED BY 'canal';  
      GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
      -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;  
      flush privileges;
      -- 針對已有的賬戶可通過grants查詢權限:
      show grants for 'canal' ;
    6. 開啟MySQL的binlog日志
      -- canal的原理是基於mysql binlog技術,所以這里一定需要開啟mysql的binlog寫入功能,並且配置binlog模式為row. vim /etc/my.cnf
      [mysqld]
      default-storage-engine=INNODB
      character-set-server=utf8
      log-bin = mysql-bin
      binlog-format = ROW
      server_id = 1
      port = 3306
      [client]
      default-character-set=utf8
      
      -- 重啟mysql--同啟動步驟
      -- 驗證binlog是否開啟
      mysql> show variables like 'binlog_format';
      +---------------+-------+
      | Variable_name | Value |
      +---------------+-------+
      | binlog_format | ROW   |
      +---------------+-------+
      mysql> show variables like 'log_bin';
      +---------------+-------+
      | Variable_name | Value |
      +---------------+-------+
      | log_bin       | ON    |
      +---------------+-------+
  3. 單機部署Canal(Mac)

    1. 下載安裝包
      cd /data/app
      wget https://github.com/alibaba/canal/releases/download/canal-1.1.15/canal.deployer-1.1.15.tar.gz
    2. 解壓安裝包
      cd /data/app
      mkdir canal-server-1.1.15
      tar zxvf canal.deployer-$version.tar.gz  -C canal-server-1.1.15           
    3. 使用tcp模式-客戶端主動拉取模式
      1. 配置文件
        1. conf.canal.properties
          #################################################
          #########        common argument     #############
          #################################################
          # tcp bind ip
          canal.ip =
          # register ip to zookeeper
          canal.register.ip =
          canal.port = 11111
          canal.metrics.pull.port = 11112
          # canal instance user/passwd
          # canal.user = canal
          # canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
          
          #################################################
          #########    destinations           #############
          #################################################
          canal.destinations = example # conf root dir
          canal.conf.dir = ../conf # auto scan instance dir add/remove and start/stop instance
          canal.auto.scan = true
          canal.auto.scan.interval = 5
          # set this value to 'true' means that when binlog pos not found, skip to latest.
          # WARN: pls keep 'false' in production env, or if you know what you want.
          canal.auto.reset.latest.pos.mode = false
          
          # tcp, kafka, rocketMQ, rabbitMQ
          canal.serverMode = tcp
        2. conf/example/instance.properties
          #################################################
          ## mysql serverId , v1.0.26+ will autoGen
          canal.instance.mysql.slaveId=1234
          
          # position info
          canal.instance.master.address=127.0.0.1:3306
          canal.instance.master.journal.name=
          canal.instance.master.position=
          canal.instance.master.timestamp=
          canal.instance.master.gtid=
          
          # username/password
          canal.instance.dbUsername=canal
          canal.instance.dbPassword=canal
          canal.instance.connectionCharset = UTF-8
          # enable druid Decrypt database password
          canal.instance.enableDruid=false
          #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
          
          # table regex
          #canal.instance.filter.regex=.*\\..*
          canal.instance.filter.regex=douc_0827\\..*,dosm_activiti_0827\\..*
      2. 啟動canal
        cd /data/app/canal-server-1.1.15
        ./bin/startup.sh
      3. 查看日志
        canal服務器日志文件目錄
        cd /data/app/canal-server-1.1.15/logs/canal
        
        example實例日志文件目錄
        cd /data/app/canal-server-1.1.15/logs/example
      4. 項目搭建與測試
        1. pom.xml
          <?xml version="1.0" encoding="UTF-8"?>
          <project xmlns="http://maven.apache.org/POM/4.0.0"
                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
              <parent>
                  <artifactId>spring-boot-cancal</artifactId>
                  <groupId>com.example</groupId>
                  <version>0.0.1-SNAPSHOT</version>
              </parent>
              <modelVersion>4.0.0</modelVersion>
              <artifactId>canal-use-demo</artifactId>
              <dependencies>
                  <dependency>
                      <groupId>org.apache.logging.log4j</groupId>
                      <artifactId>log4j-to-slf4j</artifactId>
                      <version>2.7</version>
                  </dependency>
                  <dependency>
                      <groupId>mysql</groupId>
                      <artifactId>mysql-connector-java</artifactId>
                      <version>5.1.48</version>
                      <scope>runtime</scope>
                  </dependency>
                  <!--引入canal依賴-->
                  <dependency>
                      <groupId>com.alibaba.otter</groupId>
                      <artifactId>canal.client</artifactId>
                      <version>1.1.5</version>
                  </dependency>
                  <dependency>
                      <groupId>com.alibaba.otter</groupId>
                      <artifactId>canal.common</artifactId>
                      <version>1.1.5</version>
                  </dependency>
                  <dependency>
                      <groupId>com.alibaba.otter</groupId>
                      <artifactId>canal.protocol</artifactId>
                      <version>1.1.5</version>
                  </dependency>
                  <dependency>
                      <groupId>com.alibaba</groupId>
                      <artifactId>druid</artifactId>
                      <version>1.2.6</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.kafka</groupId>
                      <artifactId>kafka_2.11</artifactId>
                      <version>1.1.1</version>
                  </dependency>
              </dependencies>
          </project>
        2. 工具類
          public class CanalUtils {
              /**
               * 獲取canal連接器
               */
              public static CanalConnector getSingleCanalConnector(String host,int port,String desc,String user,String password){
                  CanalConnector connector = CanalConnectors.newSingleConnector(
                          new InetSocketAddress(host,port), desc, user, password);
                  return connector;
              }
              /**
               * 獲取集群canal連接器
               */
              public static CanalConnector getClusterCanalConnector(List<? extends SocketAddress> addresses, String desc, String user, String password){
                  CanalConnector connector = CanalConnectors.newClusterConnector(addresses, desc, user, password);
                  return connector;
              }
              public static CanalConnector getClusterCanalConnectorV2(List<Map<String,String>> adrs, String desc, String user, String password){
                  List<InetSocketAddress> addresses = new ArrayList<>();
                  adrs.forEach(en ->{
                      String host = en.get("host");
                      String port = en.get("port");
                      addresses.add(new InetSocketAddress(host,Integer.parseInt(port)));
                  });
                  CanalConnector connector = CanalConnectors.newClusterConnector(addresses, desc, user, password);
                  return connector;
              }
              /**
               * 獲取由zookeeper組成的canal集群鏈接
               */
              public static CanalConnector getClusterCanalConnector(String zkHost, String desc, String user, String password){
                  CanalConnector connector = CanalConnectors.newClusterConnector(zkHost, desc, user, password);
                  return connector;
              }
              public static CanalConnector getLocalCanalConnector(){
          //        return getCanalConnector(AddressUtils.getHostIp(), 11111, "example", "", "");
                  return getSingleCanalConnector("127.0.0.1", 11111, "example", "", "");
              }
          
              /**
               * 獲取MySQL連接
               */
              public static Connection getMySqlConnnection(String url,String userName,String password){
                  Connection connection = null;
                  try{
                      Class.forName("com.mysql.jdbc.Driver");
                      connection = DriverManager.getConnection(url,userName,password);
                  }catch (Exception e){
                      System.out.println(e.getLocalizedMessage());
                  }
                  return connection;
              }
              public static boolean execute(Connection conn,String sql){
                  try(PreparedStatement statement = conn.prepareStatement(sql)){
                      return statement.execute();
                  }catch (Exception e){
                      System.out.println(e.getLocalizedMessage());
                  }
                  return false;
              }
              /**
               * 獲取數據連接池
               */
              public static DataSource getDataSource(String url,String user,String psd){
                  DruidDataSource dataSource = new DruidDataSource();
                  // 2、為數據源實例指定必須的屬性
                  dataSource.setUsername(user);
                  dataSource.setPassword(psd);
                  dataSource.setUrl(url);
                  dataSource.setDriverClassName("com.mysql.jdbc.Driver");
                  // 3、指定數據源的一些可選屬性
                  // 3.1、指定數據庫連接池中出使化連接數的個數
                  dataSource.setInitialSize(5);
                  // 3.2、指定最大連接數:同意時刻可以同時向數據庫申請的連接數
                  dataSource.setMaxActive(20);
                  // 3.3、指定最小連接數:在數據庫連接池中保存的最少的空閑連接的數量
                  dataSource.setMinIdle(2);
                  // 3.4、等待數據庫連接池分配連接的最長時間。單位為毫秒。超出時間將拋出異常
                  dataSource.setMaxWait(1000 * 5);
                  // 4、從數據源中獲取數據庫連接
                  return dataSource;
              }
          
              /**
               * 數據連接池獲取鏈接
               */
              public static Connection getConnFromPool(DataSource dataSource) throws Exception{
                  return dataSource.getConnection();
              }
          }
        3. 測試類
          public class SimpleCanalClientExample {
              public static void main(String[] args) {
                  // 創建鏈接
                  CanalConnector connector = CanalUtils.getLocalCanalConnector();
                  int batchSize = 1000;
                  int emptyCount = 0;
                  int dataCount = 0;
                  try {
                      // 打開鏈接
                      connector.connect();
                      // todo 訂閱數據庫表,全部表
          //            connector.subscribe(".*\\..*");
                      /**
                       * todo 針對cw_dodb,abc數據庫表數據操作都會追逐,其他數據庫級別的操作(創建|修改)也會追蹤,針對其他庫中表操作不會記錄
                       *      -- 方式一: 在服務器/canal-server-1.1.5/conf/example/instance.properties 進行如下配置也可以
                       *               canal.instance.filter.regex=cw_dodb\\..*,abc\\..*
                       *               在代碼中使用服務器中默認配置的,也就是不重新調用subscribe方法
                       *      -- 方式二: 代碼中覆蓋 connector.subscribe("cw_dodb\\..*,abc\\..*");
                       *
                       */
          //            connector.subscribe("cw_dodb\\..*,abc\\..*");
                      connector.subscribe("a1\\..*");
                      // 回滾到為進行ack的地方,下次fetch的時候,可以從最后一個沒有ack的地方開始獲取
                      connector.rollback();
                      while (true) {
                          // 獲取指定數量的數據
                          Message message = connector.getWithoutAck(batchSize);
                          // 獲取批量ID
                          long batchId = message.getId();
                          // 獲取批量的數量
                          int size = message.getEntries().size();
                          // 如果沒有數據的處理邏輯
                          if (batchId == -1 || size == 0) {
                              emptyCount++;
                              // System.out.println("empty count : " + emptyCount);
                              try {
                                  Thread.sleep(1000);
                              } catch (InterruptedException e) {
                              }
                          } else {
                              // 有數據的處理emptyCount = 0;
                              dataCount++;
                              // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                              printEntry(message.getEntries());
                          }
                          // 提交確認,標記ack
                          connector.ack(batchId);
                          System.out.println("---handled data counts: "+dataCount);
                          // 處理失敗, 回滾數據
                          // connector.rollback(batchId);
                      }
                  } finally {
                      connector.disconnect();
                  }
              }
              /**
               * 打印canal server解析binlog獲取的實體類信息
               * @param entrys
               */
              private static void printEntry(List<Entry> entrys) {
                  for (Entry entry : entrys) {
                      if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                          // 開啟/關閉事務的實體類型,跳過
                          continue;
                      }
                      /**
                       * RowChange對象,包含了一行數據變化的所有特征
                       * 比如isDdl 是否是ddl變更操作 sql 具體的ddl sql beforeColumns afterColumns 變更前后的數據字段等等
                       */
                      RowChange rowChage = null;
                      try {
                          rowChage = RowChange.parseFrom(entry.getStoreValue());
                      } catch (Exception e) {
                          throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
                      }
                      // 獲取針對Mysql數據操作類型:insert/update/delete
                      EventType eventType = rowChage.getEventType();
          
                      // 打印Header信息
                      System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                              entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                              entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                              eventType));
          
                      // 判斷是否是DDL語句
                      if(rowChage.getIsDdl()){
                          System.out.println("================》;isDdl: true,sql: " + rowChage.getSql());
                      }
          
                      List<RowData> rowDatasList = rowChage.getRowDatasList();
                      if(!CollectionUtils.isEmpty(rowDatasList)) {
                          // todo 如果不是DDL的話獲取不到原始sql語句,只能通過數據組裝成SQL
                          String sql = rowChage.getSql();
                          System.out.println("------sql--------: "+sql);
          
                          // 獲取RowChange對象里的每一行數據,打印出來
                          for (RowData rowData : rowChage.getRowDatasList()) {
          
                              if (eventType == EventType.DELETE) {
                                  System.out.println("----------------delete:----------------");
                                  printColumn(rowData.getBeforeColumnsList());
                              } else if (eventType == EventType.INSERT) {
                                  System.out.println("-----------------insert:-----------------");
          
                                  printColumn(rowData.getAfterColumnsList());
                              } else {
                                  System.out.println("-------&gt; before");
                                  printColumn(rowData.getBeforeColumnsList());
                                  System.out.println("-------&gt; after");
                                  printColumn(rowData.getAfterColumnsList());
                              }
                          }
                      }
                  }
              }
              private static void printColumn(List<Column> columns) {
                  for (Column column : columns) {
                      System.out.println("\t"+column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
                  }
              }
          }
      5. 項目運行與MySQL操作測試

    4. 使用mq模式-服務端主動推送模式

      1. 配置文件
        1. conf/canal.propertie
          # tcp, kafka, rocketMQ, rabbitMQ
          canal.serverMode = kafka 
          ##################################################
          #########      Kafka      #############
          ##################################################
          kafka.bootstrap.servers = 127.0.0.1:9092
          kafka.acks = all
          kafka.compression.type = none
          kafka.batch.size = 16384
          kafka.linger.ms = 1
          kafka.max.request.size = 1048576
          kafka.buffer.memory = 33554432
          kafka.max.in.flight.requests.per.connection = 1
          kafka.retries = 0
        2. conf/canal.properties

          # mq config
          canal.mq.topic=canal-binlog # dynamic topic route by schema or table regex
          #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
          canal.mq.dynamicTopic=
          canal.mq.partition=0
          # hash partition config
          #canal.mq.partitionsNum=3
          #canal.mq.partitionHash=test.table:id^name,.*\\..*
          #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
      2. 啟動canal
        cd /data/app/canal-server-1.1.15
        ./bin/startup.sh
      3. MySQL操作+件套Kafka消息  
  4. 集群部署Canal(Linux-host01,host02)
    1. zookeeper安裝
    2. canal集群部署與測試  
      1. canal集群工作原理
            一個canal服務器進程,每一個instance就是一個線程,單獨對應一個mysql服務器的binlog。再起一個canal服務的話,對於同一個mysql服務器不能做負載均衡,數據分片等。有兩個canal服務器
        都監控一個或多個mysql服務器的binlog。
            這兩個canal服務同時只能有一個提供服務,當提供服務的這個宕機時,zookeeper能知道,zookeeper就通知另一個canal服務器讓他提供服務。當原來宕機的那個再啟動起來時,是搶占模式的, 誰搶到就誰上,沒搶到就standy模式。
            canal本身就是一個工具不存數據,宕機了就宕機,只有還有另外一個能提供服務就行,所以沒有什么同步問題(不像數據庫有同步問題)。因為啟動canal服務是需要消耗資源的,不想redis高可用
        占資源太少了。canal的standy資源也不能給少了,要雙份資源,就看企業在意不在意,服務核心不核心。
          maxwell和canal非常像,maxwell連高可用機制都沒提供,倒了就再起。其實很多軟件都是不提供高可用方案的,如果怕他倒的話,可以用Keepalived,這個機制很簡單就是做心跳監測,可以給任何
        進程做一個心跳檢測,可以一直檢測他在不在進程列表里,如果宕了進程沒了他會有一系列觸發操作,可以在他里面寫一個shell,如還有一個備機,要是這個掛了就在備機啟動。或者自己手工在restart,
        這是一種通用型方案。Keepalived和maxwell是完全沒有耦合關系的,maxwell完全不知道Keepalived的存在,Keepalived是從外圍的觀察者觀察這個進程,不像zookeeper,是需要向它注冊的。
        
        注意:這里zookeeper為觀察者監控的模式,只能實現高可用,而不是負載均衡, 即同一時點只有一個canal-server節點能夠監控某個數據源,只要這個節點能夠正常工作, 那么其他監控這個數據源的canal-server只能做stand-by,直到工作節點停掉,其他canal-server節點才能搶占。
      2. canal服務器配置文件修改
        ======= host01 # 修改配置信息
        cd  /data/app/canal-server-1.1.5
        rm -rf conf/example/meta.dat
        rm -rf conf/example/h2*
        cd conf
        vim canal.properties
            #################################################
            #########        common argument     #############
            #################################################
            # tcp bind ip
            canal.ip =
            # register ip to zookeeper 每個實例推薦使用不同的ip
            canal.register.ip = 10.1.1.2
            canal.port = 11111
            canal.metrics.pull.port = 11112
            canal.zkServers =127.0.0.1:2181
        
        ====== host02 # 進行復制和修改
        cd  /data/app/canal-server-1.1.5
        rm -rf conf/example/meta.dat
        rm -rf conf/example/h2*
        cd conf
        vim canal.properties
            #################################################
            #########        common argument     #############
            #################################################
            # tcp bind ip
            canal.ip =
            # register ip to zookeeper 每個實例推薦使用不同的ip
            canal.register.ip = 10.1.1.3
            canal.port = 11111
            canal.metrics.pull.port = 11112
            canal.zkServers =127.0.0.1:2181
            
        vim example/instance.properties
            canal.instance.mysql.slaveId=1235
      3. zookeeper啟動+服務狀態檢查
         1. 啟動zookeeper
                [root@localhost data]# zkServer.sh start
                    ZooKeeper JMX enabled by default
                    Using config: /usr/local/zookeeper-3.4.13/bin/../conf/zoo.cfg
                    Starting zookeeper ... STARTED
        2. 啟動狀態檢查
                [root@VM-16-13-centos zookeeper-3.4.13]# zkServer.sh status
                ZooKeeper JMX enabled by default
                Using config: /usr/local/zookeeper-3.4.13/bin/../conf/zoo.cfg
                Mode: standalone
      4. canal集群啟動
        -- host01,host02上分別執行
        cd /data/app/canal-server-1.1.5 ./bin/startup.sh
         

         

        結合如上截圖中,兩個canal服務器節點通過zookeeper組成集群,兩個節點中的instance相同監聽同一個MySQL的數據信息,當集群啟動時發現最初啟動節點的logs目錄下
        存在實例日志文件夾 example,另外一個節點logs目錄下卻沒有,符合canal集群高可用原則,一個處於active一個處於standy狀態
      5. canal集群高可用驗證
        1. 在節點10.177.146.33(active)上關閉canal
        2. 執行MySQL數據操作,發現canal的kafka依舊有數據寫入
        3. 查看另外一個節點10.177.146.32(standy),發現logs下出現example實例日志
        4. 在節點10.177.146.33上刪除logs下所有文件,重啟canal之后發現logs目錄下未生成example實例日志
        5. 總結: 該過程中,active狀態節點流轉為33-->32    
  5. Canal擴展(業務需要)
    1. 需求說明
       -- 使用canal轉發收集到的binlog日志到kafka,使用的是canal自定義的JSON數據格式,如下所示   {
          "data":[
              {
                  "a":"ee"
              }
          ],
          "database":"abc",
          "es":1637578296000,
          "id":19,
          "isDdl":false,
          "mysqlType":{
              "a":"varchar(100)"
          },
          "old":null,
          "pkNames":null,
          "sql":"",
          "sqlType":{
              "a":12
          },
          "table":"abc",
          "ts":1637578297135,
          "type":"INSERT"
      }
         -- 為了快速接入Dodb,考慮到Dodb有現有功能--Kafka數據源(帶存儲型),可以將kafka中數據映射到Dodb的數據表ClickHouse表中
         -- 考慮到Clickhouse的update,delete性能比較低,將所有的update,delete操作轉換為insert操作
              新增倆個字段version(DateTime),sign(Int),version使用插入時間毫秒數,sign更新|插入時使用1,刪除時使用-1,不過后期需要考慮clickhouse數據去重  
              {
                  "number":"1",
                  "commodity_price":"1.0",
                  "sign":1,
                  "description":"e8023b930e384692978af8a7af4ea6f6",
                  "id":"d26eea16687a40e2be925cae162c9bd3",
                  "version":1637560526296,
                  "commodity_name":"2"
              }
              {
                  "number":"1",
                  "commodity_price":"1.0",
                  "sign":-1,
                  "description":"e8023b930e384692978af8a7af4ea6f6",
                  "id":"d26eea16687a40e2be925cae162c9bd3",
                  "version":1637560726488,
                  "commodity_name":"2"
              }
         -- 將收集的不同表中binlog變更數據,分發到對應的各自topic中,之后在dodb中創建對應的Kafka存儲型
             數據源中,將數據寫入Clickhouse數據表
    2. 擴展思路一
      binlog(MySQL)->canal server->kafka producer(Canal內嵌)->kafka consumer(擴展)->kafka producer(擴展)->dodb kafka存儲型數據源->ClickHouse -- kafka consumer(擴展項目程序中)
          1. 過濾符合條件的數據,根據配置的數據庫名稱+數據表名稱
          2. 數據轉換,將新增,刪除,更新操作統一轉換為新增操作,使用字段version-Long,sign—Int作為標識
          3. 數據分發,針對采集的表數據,經過轉換后每張表數據轉發至對應的單個topic中
      -- kafka producer(擴展項目程序中)
          4. 完成數據轉發操作
      1. canal原生內嵌kafka發送數據樣例
        kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic canal-binlog --property print.key=true
        
        null        {"data":[{"id":"aba1b95e24dd4434909177c2c14f3020","commodity_name":"1","commodity_price":"3.0","number":"3","description":"79d9782c3d174c918fe64ef66d1bc039"}],"database":"abc","es":1637560518000,"id":5,"isDdl":false,"mysqlType":{"id":"varchar(32)","commodity_name":"varchar(512)","commodity_price":"double","number":"int(10)","description":"varchar(2048)"},"old":[{"commodity_name":"desc1"}],"pkNames":["id"],"sql":"","sqlType":{"id":12,"commodity_name":12,"commodity_price":8,"number":4,"description":12},"table":"abc","ts":1637560518949,"type":"UPDATE"}
        null        {"data":[{"id":"d26eea16687a40e2be925cae162c9bd3","commodity_name":"2","commodity_price":"1.0","number":"1","description":"e8023b930e384692978af8a7af4ea6f6"}],"database":"abc","es":1637560526000,"id":6,"isDdl":false,"mysqlType":{"id":"varchar(32)","commodity_name":"varchar(512)","commodity_price":"double","number":"int(10)","description":"varchar(2048)"},"old":[{"commodity_name":"desc1"}],"pkNames":["id"],"sql":"","sqlType":{"id":12,"commodity_name":12,"commodity_price":8,"number":4,"description":12},"table":"abc","ts":1637560526294,"type":"UPDATE"}  
        null        {"data":[{"id":"d26eea16687a40e2be925cae162c9bd3","commodity_name":"2","commodity_price":"1.0","number":"1","description":"e8023b930e384692978af8a7af4ea6f6"}],"database":"abc","es":1637560726000,"id":7,"isDdl":false,"mysqlType":{"id":"varchar(32)","commodity_name":"varchar(512)","commodity_price":"double","number":"int(10)","description":"varchar(2048)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":12,"commodity_name":12,"commodity_price":8,"number":4,"description":12},"table":"abc","ts":1637560726484,"type":"DELETE"}
        null        {"data":[{"id":"wwwwww","commodity_name":"1","commodity_price":"0.0","number":"0","description":"wwwww"}],"database":"abc","es":1637560787000,"id":8,"isDdl":false,"mysqlType":{"id":"varchar(32)","commodity_name":"varchar(512)","commodity_price":"double","number":"int(10)","description":"varchar(2048)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":12,"commodity_name":12,"commodity_price":8,"number":4,"description":12},"table":"abc","ts":1637560787488,"type":"INSERT"}
      2. 添加擴展程序后kafka發送數據樣例
        -- topic 為指定表名稱+特定配置的前后綴組成 kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic mysql_abc_topic --property print.key=true
        
        null        {"number":"3","commodity_price":"3.0","sign":1,"description":"79d9782c3d174c918fe64ef66d1bc039","id":"aba1b95e24dd4434909177c2c14f3020","version":1637560518952,"commodity_name":"1"}
        null        {"number":"1","commodity_price":"1.0","sign":1,"description":"e8023b930e384692978af8a7af4ea6f6","id":"d26eea16687a40e2be925cae162c9bd3","version":1637560526296,"commodity_name":"2"}
        null        {"number":"1","commodity_price":"1.0","sign":-1,"description":"e8023b930e384692978af8a7af4ea6f6","id":"d26eea16687a40e2be925cae162c9bd3","version":1637560726488,"commodity_name":"2"}
        null        {"number":"0","commodity_price":"0.0","sign":1,"description":"wwwww","id":"wwwwww","version":1637560787492,"commodity_name":"1"}
      3. 擴展代碼開發
        1. pom.xml
           <dependencies>
                  <dependency>
                      <groupId>com.alibaba</groupId>
                      <artifactId>fastjson</artifactId>
                      <version>1.2.15</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.kafka</groupId>
                      <artifactId>kafka_2.11</artifactId>
                      <version>1.1.1</version>
                  </dependency>
                  <dependency>
                      <groupId>org.projectlombok</groupId>
                      <artifactId>lombok</artifactId>
                      <version>1.18.20</version>
                  </dependency>
              </dependencies>
              <build>
                  <plugins>
                      <plugin>
                          <artifactId>maven-compiler-plugin</artifactId>
                          <version>2.0.2</version>
                          <configuration>
                              <source>1.8</source>
                              <target>1.8</target>
                              <encoding>utf-8</encoding>
                          </configuration>
                      </plugin>
                          <plugin>
                              <artifactId>maven-assembly-plugin</artifactId>
                              <configuration>
                                  <descriptorRefs>
                                      <descriptorRef>jar-with-dependencies</descriptorRef>
                                  </descriptorRefs>
                              </configuration>
                          </plugin>
                  </plugins>
              </build>
        2. 靜態變量類
          public class Constants {
              /**標記字段  1-更新|新增   -1-刪除*/
              public static final String SIGN = "sign";
              /** 聚合處理字段*/
              public static final String VERSION = "version";
              public static final Integer ADD_UPDATE = 1;
              public static final Integer DELETE = -1;
              /**
               * 目的數據topic
               * -- 數據處理完畢之后發送數據的topic
               * -- 采用每張表的一個topic
               */
              public static final String TOPIC_PREFIX = "mysql_";
              public static final String TOPIC_SUFFIX = "_topic";
              /**canal轉發的kafka消息消費組i*/
              public static final String CONSUMER_GROUP_NAME = "from_canal_kafka";
          }
        3. 配置文件  
          /**
           * @version 1.0.0
           * @ClassName ReWrapperConfig.java
           * @Description TODO
           * canal.properties配置文件中添加如下參數
            #此參數包含需要采集的數據庫.數據表名
            custom.async.mysql.db_table.list=douc.dosm_data_dictionary,douc.tab,abc.abc,abc.abcd
            # 消費canal轉發至kafka的binlog消息,進行數據組裝+拆分之后發送最終kafka的topic的>前后綴
            # 注意: 處理后的每個張表的數據發送往 prefix_table_suffix topic且增刪改操作都處>理為增,使用sign,version區分
            custom.async.kafka.topic.prefix = mysql_
            custom.async.kafka.topic.suffix = _topic
           */
          @Data
          @Slf4j
          public class ReWrapperConfig {
              /**源數據topic -- 對標  canal.mq.topic*/
              private String sourceTopic;
              /**kafka地址 -- 對標  kafka.bootstrap.servers*/
              private String kafkaServer;
              private Thread thread;
              private KafkaConsumer<String,String> consumer;
              private KafkaProducer<String, String> producer;
              /**
               * 上面將數據庫和數據庫和數據表拆分為兩個參數是有問題的
               * -- 此參數包含數據庫.數據表名
               * -- canal.properties中 新增字段custom.async.mysql.db_table.list
               */
              private String mysqlDbTables;
              private List<String> dbTables;
              /**
               * 解析之后每張表數據采集發送到一個topic中
               * -- dodb創建kafka存儲性的數據源,映射到對應的表中
               * -- canal.properties中 新增字段custom.async.kafka.topic.prefix
               * -- canal.properties中 新增字段custom.async.kafka.topic.suffix
               */
              private String topic_prefix;
              private String topic_suffix;
          public void init(){ consumer = consumerFromCanalOrinal(); producer = kafkaProducer(); KafkaReWrapperHandler kafkaReWrapperHandler = new KafkaReWrapperHandler(consumer,producer,this); thread = new Thread(kafkaReWrapperHandler); thread.start(); } public void destory(){ if(null != consumer){ consumer.close();} if(null != producer){producer.close(); } if(null != null && !thread.isInterrupted()){ thread.interrupt();} } /** * canal轉發的kafka消息消費者 * @return */ public KafkaConsumer<String,String> consumerFromCanalOrinal(){ Properties props = new Properties(); // 定義kakfa 服務的地址,不需要將所有broker指定上 props.put("bootstrap.servers", kafkaServer); // 制定consumer group props.put("group.id",CONSUMER_GROUP_NAME); // 是否自動確認offset props.put("enable.auto.commit", "false"); // 自動確認offset的時間間隔 props.put("auto.commit.interval.ms", "1000"); // key的反序列化類 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value的反序列化類 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("session.timeout.ms", "60000"); props.put("max.poll.interval.ms", "50000"); props.put("max.poll.records", "5000"); props.put("auto.offset.reset", "latest"); props.put("key.deserializer.encoding", "UTF8"); props.put("value.deserializer.encoding", "UTF8"); props.put("max.partition.fetch.bytes", 209715200); props.put("receive.buffer.bytes", 1024 * 1024 * 10); props.put("fetch.min.bytes", 300); KafkaConsumer<String, String> stringStringKafkaConsumer = new KafkaConsumer<>(props); stringStringKafkaConsumer.subscribe(Arrays.asList(sourceTopic)); return stringStringKafkaConsumer; } /** * 重新處理之后發送Kafka的生產者 * -- 數據來源於初始canal轉kafka的binlog數據 * -- 數據進行處理 * -- 發送給各個表對應的topic * @return */ public KafkaProducer<String, String> kafkaProducer() { Properties props = new Properties(); props.put("bootstrap.servers", kafkaServer); /*ack方式,all,會等所有的commit最慢的方式; 1:由Leader確認 如果不是集群部署模式需要將該行代碼注釋掉,否則報錯找不到當前topic可用的Leader*/ //props.put("acks", "1"); // 失敗是否重試,設置會有可能產生重復數據 props.put("retries", 0); // 對於每個partition的batch buffer大小 props.put("batch.size", 16384); // 等多久,如果buffer沒滿,比如設為1,即消息發送會多1ms的延遲,如果buffer沒滿 props.put("linger.ms", 1); // 整個producer可以用於buffer的內存大小 props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); /*cananl內部存在一個kafka的生產者,需要額外配置該參數,否則將報錯 WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=DemoProducer*/ props.put("client.id","9999"); return new KafkaProducer<>(props); } /** * 最終數據發送的topic * @param table * @return */ public String getKafkaTopic(String table){ if(Objects.nonNull(topic_prefix) && Objects.nonNull(topic_suffix)){ return topic_prefix+table+topic_suffix; } return TOPIC_PREFIX+table+TOPIC_SUFFIX; } /** * 最新的校驗是否是采集的數據 * -- 所有的配置數據從配置文件canal.properties文件中獲取 * -- 禁止從常量類Constants中獲取 * @param db 數據庫名 * @param table 數據表名 * @return */ public boolean isCollectedTable(String db,String table){ if(Objects.isNull(mysqlDbTables)){ return false; } String dt = db+"."+table; return getDbTables().contains(dt); } private List<String> getDbTables(){ if(Objects.isNull(dbTables)){ if(Objects.isNull(mysqlDbTables)){ return dbTables = new ArrayList<>(); } dbTables = Arrays.asList(mysqlDbTables.split(",")) .stream() .distinct() .collect(Collectors.toList()); } return dbTables; } }
        4. 處理類
          @Data
          @Slf4j
          public class KafkaReWrapperHandler implements Runnable{
              private KafkaConsumer<String,String> consumer;
              private KafkaProducer<String,String> producer;
              private ReWrapperConfig config;
              public KafkaReWrapperHandler(KafkaConsumer<String,String> consumer,KafkaProducer<String,String> producer,ReWrapperConfig config){
                  this.consumer = consumer;
                  this.producer = producer;
                  this.config = config;
              }
              @Override
              public void run() {
                  log.error("------啟動KafkaConsumer2---------");
                  while (!Thread.currentThread().isInterrupted()) {
                      ConsumerRecords<String, String> consumerRecords = consumer.poll(200);
                      for (ConsumerRecord record : consumerRecords) {
                              try{
                                  log.debug("---kafkaConsumer2接收到 :" + record.topic() + "------" + record.key() + "---------" + record.value());
                                  JSONObject jsonObject = JSONObject.parseObject(record.value().toString());
                                  // 變更的數據
                                  JSONArray data = jsonObject.getJSONArray("data");
                                  int size = null == data ? 0 : data.size();
                                  // 數據庫信息
                                  String database = jsonObject.getString("database");
                                  Boolean isDdl = jsonObject.getBoolean("isDdl");
                                  JSONArray old = jsonObject.getJSONArray("old");
                                  // 主鍵信息
                                  JSONArray pkNames = jsonObject.getJSONArray("pkNames");
                                  // sql語句
                                  String sql = jsonObject.getString("sql");
                                  JSONObject sqlType = jsonObject.getJSONObject("sqlType");
                                  // 表名
                                  String table = jsonObject.getString("table");
                                  // 操作類型
                                  String type = jsonObject.getString("type");
                                  log.debug("---當前操作類型:{}", type);
                                  log.debug("---當前表名稱:{}", table);
                                  if(Objects.nonNull(database) && Objects.nonNull(table)){
                                      if(!config.isCollectedTable(database,table)){
                                          log.debug("-- 當前數據操作發生在數據表{}, 該表不屬於采集范疇:[{}]",database+"."+table,config.getMysqlDbTables());
                                          continue;
                                      }
                                  }else{
                                      log.warn("未獲取到表名信息,應該不是針對表數據的操作");
                                      if (isDdl) {
                                          log.error("----DDL SQL : {}", sql);
                                      }
                                      continue;
                                  }
                                  log.warn("---kafkaConsumer2接收需要采集的數據 :" + record.topic() + "------" + record.key() + "---------" + record.value());
          
                                  /**
                                   * -- Database 數據庫級別
                                   *  drop |create database  都是QUERY類型,都屬於ddl
                                   *
                                   * -- Table 表級別操作
                                   *  create table 屬於CREATE類型,屬於ddl,table為表名
                                   *  rename table 屬於RENAME類型,屬於ddl,table為表名
                                   *  drop table   屬於ERASE操作類型
                                   *
                                   * -- Field 表字段級別操作
                                   *  add column   屬於ALTER類型,屬於ddl
                                   *  modify column name|type 屬於ALTER類型,屬於ddl
                                   *  drop column  屬於ALTER類型,屬於ddl
                                   *
                                   * -- 表數據操作
                                   *  insert data  屬於INSERT類型,(批量)單個insert操作在kafka中都是單獨消息
                                   *  update data  屬於UPDATE類型,單個更新每個update操作都在Kafka中都是單獨消息
                                   *               批量更新- 單個消息data和old都是多個數據-更新數數組數據
                                   *  delete data  屬於DELETE類型,單個刪除在kafka中都是一個單獨消息
                                   *               批量刪除- 單個消息包含多個操作
                                   */
                                  String topic = config.getKafkaTopic(table);
                                  Long ts =  jsonObject.getLong("ts");
                                  if(null == ts){
                                      ts = System.currentTimeMillis();
                                  }
                                  if(null != data){
                                      addFieldsAndSend(topic, data, type,ts);
                                  }
                              }catch (Exception e){
                                  log.error("---消費從canal轉發kafka的消息失敗{}/{}",e.getLocalizedMessage(),e);
                              }finally {
                                  // 手動提交
                                  consumer.commitAsync();
                              }
                          }
                      }
                  }
                  private void addFieldsAndSend(String topic, JSONArray data, String type,Long ts) {
                      for (int i = 0; i < data.size(); i++) {
                          JSONObject jsonObject = data.getJSONObject(i);
                          // Long類型的數據比DateTime類型在后續Clickhouse中數據去重時辨識度更高
                          ts = ts +(i+1);
                          jsonObject.put(VERSION,ts);
                          switch (type){
                              case "INSERT":
                                  jsonObject.put(SIGN,ADD_UPDATE);
                                  break;
                              case "UPDATE":
                                  jsonObject.put(SIGN,ADD_UPDATE);
                                  break;
                              case "DELETE":
                                  jsonObject.put(SIGN,DELETE);
                                  break;
                          }
                          String message = jsonObject.toJSONString();
                          producer.send(new ProducerRecord<>(topic,message));
                          log.error("----發送Kafka消息 ,topic-{},message-{}",topic,message);
                      }
                  }
          }
        5.  擴展canal操作
          1. 將jar包放入canal的lib目錄下
          2. 配置文件修改
            1. conf/canal.properties新增配置屬性
              ################################################## ####### Custom ################# ##################################################
              # 需要采集的數據表--帶數據庫名 ,使用英文逗號分隔
              custom.async.mysql.db_table.list=douc.sys_group,douc.sys_group_user,douc.sys_user,douc.sys_department
              # 注意: 處理后的每個張表的數據發送往 prefix_table_suffix topic且增刪改操作都處理為增,使用sign,version區分
              custom.async.kafka.topic.prefix = mysql_ 
              custom.async.kafka.topic.suffix = _topic
            2. conf/spring/file-instance.xml中注入擴展程序中的組件
               <bean id="ReWrapperConfig" class="com.canal.config.ReWrapperConfig" init-method="init" destroy-method="destory">
                      <property name="sourceTopic" value="${canal.mq.topic}"/>
                      <property name="kafkaServer" value="${kafka.bootstrap.servers}"/>
                  <property name="mysqlDbTables" value="${custom.async.mysql.db_table.list}" />
                  <property name="topic_prefix" value="${custom.async.kafka.topic.prefix}" />
                  <property name="topic_suffix" value="${custom.async.kafka.topic.suffix}" />
               </bean>
          3.  不足|缺點
            未處理好與canal內嵌kafka依賴的關系,導致打出來的jar包比較大
    3. 擴展思路二 
      --直接針對canal內嵌的KafkaProducer進行處理,將數據處理和數據分發功能在源碼中處理
      1.  待續。。。  
  6. Canal源碼編譯安裝(待續。。。)  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM