canal-kafka-flink for real-time MySQL data synchronization (Part 1)


1. Introduction to canal

The introduction from the official website:

Name: canal [kə'næl]
Meaning: waterway / pipeline / ditch
Language: pure Java
Positioning: incremental data subscription & consumption based on incremental database log parsing; currently MySQL is the main supported database
Keywords: mysql binlog parser / real-time / queue&topic

canal is a piece of software developed by Alibaba in pure Java that parses incremental database logs and provides incremental data subscription and consumption.
In plain terms, canal is a tool for incremental data synchronization; at the moment it only supports monitoring and parsing MySQL binlog.

So what can canal be used for?

  • database mirroring
  • real-time database backup
  • maintenance of multi-level/derived indexes
  • refreshing business caches
  • real-time processing of data with business logic attached

How canal works

MySQL master-slave replication

(figure: MySQL master-slave replication)

As the figure shows:

  1. The master writes database changes (delete, update, insert, ...) sequentially to its binary log.
  2. When a slave connects to the master, the master starts a binlog dump thread for it; whenever the binlog changes, the dump thread notifies the slave and sends the changed binlog events to it.
  3. With replication enabled, the slave creates two threads:
    • IO Thread
      Connects to the master; the master's binlog dump thread sends binlog content to this thread, which writes it to the local relay log.
    • SQL Thread
      Reads the relay log written by the IO thread and applies the corresponding operations to the slave database.

How Canal works

(figure: how canal works)

canal works by emulating this master-slave replication mechanism, disguising itself as a MySQL slave (a minimal client sketch follows the list):

  1. It speaks the MySQL master-slave protocol and sends a dump request to the master.
  2. On receiving canal's request, the master starts pushing binlog to canal.
  3. canal receives the binlog, parses it into binary log objects (the raw form is a protobuf byte stream) and sinks them downstream (e.g. MySQL, Kafka, ES, ...).
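To make the subscribe/pull/ack flow concrete, here is a minimal sketch using canal's Java client in TCP mode (canal.serverMode = tcp). It is only an illustration under assumptions: the host, port, destination name and filter are placeholders rather than values from this article's setup, and the com.alibaba.otter:canal.client dependency is assumed to be on the classpath.

    import java.net.InetSocketAddress;
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.Message;

    public class SimpleCanalClient {
        public static void main(String[] args) {
            // Connect to one canal instance ("example" is the default instance name).
            CanalConnector connector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
            try {
                connector.connect();
                // Subscribe to all tables; same regex syntax as canal.instance.filter.regex.
                connector.subscribe(".*\\..*");
                connector.rollback();
                while (true) {
                    // Pull a batch of parsed binlog entries without auto-ack.
                    Message message = connector.getWithoutAck(100);
                    long batchId = message.getId();
                    if (batchId == -1 || message.getEntries().isEmpty()) {
                        Thread.sleep(1000);
                        continue;
                    }
                    // Each entry wraps a protobuf-encoded change event for one table.
                    message.getEntries().forEach(entry ->
                            System.out.println(entry.getHeader().getSchemaName() + "."
                                    + entry.getHeader().getTableName() + " -> " + entry.getEntryType()));
                    connector.ack(batchId); // confirm the batch has been consumed
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                connector.disconnect();
            }
        }
    }

In this article canal is configured to push directly to Kafka instead (serverMode = kafka), so the TCP client above only illustrates the subscription model.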

Canal architecture

(figure: canal architecture)

Notes:

  • a server is one running canal instance, i.e. one JVM process
  • an instance corresponds to one data queue (one server can host multiple instances)

Modules inside an instance:

  • eventParser: data source access; emulates the slave/master interaction and parses the protocol
  • eventSink: the link between Parser and Store; filters, transforms and dispatches the data
  • eventStore: data storage
  • metaManager: manages incremental subscription & consumption metadata

2. Setting up Canal

MySQL configuration

Canal currently supports MySQL 5.1.x, 5.5.x, 5.6.x, 5.7.x and 8.0.x.

  • For a self-hosted MySQL instance, the following configuration is required:

    1. Enable MySQL's binlog
    2. Set binlog-format to ROW mode

    The corresponding settings in my.cnf:

    [mysqld]
    log-bin=mysql-bin # enable binlog
    binlog-format=ROW # use ROW mode
    server_id=1 # required for MySQL replication; must not clash with canal's slaveId
    
    3. Restart the MySQL service
    4. Create the canal user and grant it MySQL slave privileges (a quick verification sketch follows this list):
    CREATE USER canal IDENTIFIED BY 'canal';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
    
  • If you are using an RDS database, you only need to create the canal user and grant it the privileges as above.
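As an optional sanity check, the sketch below connects with the canal account and prints the binlog-related settings. It is a hedged example: the JDBC URL points at the example host used later in this article, the password follows the CREATE USER statement above (adjust both to your environment), and MySQL Connector/J is assumed to be on the classpath.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class CheckBinlogConfig {
        public static void main(String[] args) throws Exception {
            // Host/port/credentials are assumptions based on this article's example values.
            String url = "jdbc:mysql://192.168.2.226:3306/?useSSL=false";
            try (Connection conn = DriverManager.getConnection(url, "canal", "canal");
                 Statement stmt = conn.createStatement()) {
                // log_bin should be ON and binlog_format should be ROW for canal to work.
                for (String var : new String[]{"log_bin", "binlog_format", "server_id"}) {
                    try (ResultSet rs = stmt.executeQuery("SHOW VARIABLES LIKE '" + var + "'")) {
                        if (rs.next()) {
                            System.out.println(rs.getString(1) + " = " + rs.getString(2));
                        }
                    }
                }
                // Requires the REPLICATION CLIENT privilege granted above.
                try (ResultSet rs = stmt.executeQuery("SHOW MASTER STATUS")) {
                    if (rs.next()) {
                        System.out.println("current binlog file: " + rs.getString("File")
                                + ", position: " + rs.getLong("Position"));
                    }
                }
            }
        }
    }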

Installing Canal

  • Download

    Click here to download the release you need; I use 1.1.4 as the example:

    (screenshot: release list)

    Find the deployer package below:

    (screenshot: canal.deployer package)

  • Extract

    Run tar zxvf ./canal.deployer-1.1.4.tar.gz; the extracted directory structure looks like this:

    (screenshot: directory structure)

  • Configure

    Enter the conf directory with cd conf. There is an example folder, which is the instance directory canal ships with; copy it and rename it for your own instance with cp -r ./example ./route. The final directory structure looks like this:

    (screenshot: conf directory with the new route instance)

    Run vi route/instance.properties to edit the instance configuration file:

    #################################################
    ## mysql serverId , v1.0.26+ will autoGen
    canal.instance.mysql.slaveId=1234
    
    # enable gtid use true/false
    canal.instance.gtidon=false
    
    # position info - change this to your own database address
    canal.instance.master.address=192.168.2.226:3306
    canal.instance.master.journal.name=
    canal.instance.master.position=
    canal.instance.master.timestamp=
    canal.instance.master.gtid=
    
    # rds oss binlog
    canal.instance.rds.accesskey=
    canal.instance.rds.secretkey=
    canal.instance.rds.instanceId=
    
    # table meta tsdb info
    canal.instance.tsdb.enable=true
    #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
    #canal.instance.tsdb.dbUsername=canal
    #canal.instance.tsdb.dbPassword=canal
    
    #canal.instance.standby.address =
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position =
    #canal.instance.standby.timestamp =
    #canal.instance.standby.gtid=
    
    # username/password - use the canal account created and granted above
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal@winner
    # The encoding that represents the databases corresponds to the encoding type in Java, such as UTF-8, GBK, ISO-8859-1
    canal.instance.connectionCharset = UTF-8
    # enable druid Decrypt database password
    canal.instance.enableDruid=false
    #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
    
    # table regex - tables to capture, as regex patterns (schema.table); separate multiple entries with commas (.*\\..* means all tables in all schemas); here we filter two tables as an example
    canal.instance.filter.regex=db1\\.user,db2\\.device
    # table black regex
    canal.instance.filter.black.regex=
    # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
    #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
    # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
    #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
    
    # mq config - when the parsed log is sent to kafka, this controls which topic each table's data goes to
    # if all monitored tables should go to a single topic, just set one static topic name here
    canal.mq.topic=example
    # dynamic topic routing by schema or table regex
    # to route different tables to different topics, enable the setting below; the format is topic:schema.table, multiple rules separated by commas; the static topic above and this dynamic topic can be enabled at the same time
    canal.mq.dynamicTopic=bi_binlog_config_topic:db1\\.user,bi_binlog_config_topic:db2\\.device
    canal.mq.partition=0
    # hash partition config
    #canal.mq.partitionsNum=3
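With the settings above, row changes from db1.user and db2.device are routed to bi_binlog_config_topic by the dynamicTopic rule, while a table that passes canal.instance.filter.regex but matches no dynamicTopic rule falls back to the static canal.mq.topic (example here).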
    
    

    Run vi canal.properties to edit the server configuration file:

    #################################################
    #########               common argument         #############
    #################################################
    # tcp bind ip - the ip of the machine canal runs on
    canal.ip = 192.168.2.223
    # register ip to zookeeper
    canal.register.ip =
    canal.port = 11111
    canal.metrics.pull.port = 11112
    # canal instance user/passwd
    # canal.user = canal
    # canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
    
    # canal admin config
    #canal.admin.manager = 127.0.0.1:8089
    canal.admin.port = 11110
    canal.admin.user = admin
    canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
    
    # canal's zookeeper address list
    canal.zkServers = 192.168.1.227:2181,192.168.1.226:2181,192.168.1.225:2181
    # flush data to zk
    canal.zookeeper.flush.period = 1000
    canal.withoutNetty = false
    # tcp, kafka, RocketMQ
    canal.serverMode = kafka
    # flush meta cursor/parse position to file
    canal.file.data.dir = ${canal.conf.dir}
    canal.file.flush.period = 1000
    ## memory store RingBuffer size, should be Math.pow(2,n)
    canal.instance.memory.buffer.size = 16384
    ## memory store RingBuffer used memory unit size , default 1kb
    canal.instance.memory.buffer.memunit = 1024 
    ## memory store gets mode, MEMSIZE or ITEMSIZE
    canal.instance.memory.batch.mode = MEMSIZE
    canal.instance.memory.rawEntry = true
    
    ## detecting config
    canal.instance.detecting.enable = false
    #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
    canal.instance.detecting.sql = select 1
    canal.instance.detecting.interval.time = 3
    canal.instance.detecting.retry.threshold = 3
    canal.instance.detecting.heartbeatHaEnable = false
    
    # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
    canal.instance.transaction.size =  1024
    # mysql fallback connected to new master should fallback times
    canal.instance.fallbackIntervalInSeconds = 60
    
    # network config
    canal.instance.network.receiveBufferSize = 16384
    canal.instance.network.sendBufferSize = 16384
    canal.instance.network.soTimeout = 30
    
    # binlog filter config
    canal.instance.filter.druid.ddl = true
    canal.instance.filter.query.dcl = false
    canal.instance.filter.query.dml = false
    canal.instance.filter.query.ddl = false
    canal.instance.filter.table.error = false
    canal.instance.filter.rows = false
    canal.instance.filter.transaction.entry = false
    
    # binlog format/image check
    canal.instance.binlog.format = ROW,STATEMENT,MIXED 
    canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
    
    # binlog ddl isolation
    canal.instance.get.ddl.isolation = false
    
    # parallel parser config; if your server has only one cpu, set this to false
    canal.instance.parser.parallel = true
    ## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
    #canal.instance.parser.parallelThreadSize = 16
    ## disruptor ringbuffer size, must be power of 2
    canal.instance.parser.parallelBufferSize = 256
    
    # table meta tsdb info
    canal.instance.tsdb.enable = true
    canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
    canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
    canal.instance.tsdb.dbUsername = canal
    canal.instance.tsdb.dbPassword = canal
    # dump snapshot interval, default 24 hour
    canal.instance.tsdb.snapshot.interval = 24
    # purge snapshot expire , default 360 hour(15 days)
    canal.instance.tsdb.snapshot.expire = 360
    
    # aliyun ak/sk , support rds/mq
    canal.aliyun.accessKey =
    canal.aliyun.secretKey =
    
    #################################################
    #########               destinations            #############
    #################################################
    canal.destinations = route
    # conf root dir
    canal.conf.dir = ../conf
    # auto scan instance dir add/remove and start/stop instance
    canal.auto.scan = true
    canal.auto.scan.interval = 5
    
    canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
    #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
    
    canal.instance.global.mode = spring
    canal.instance.global.lazy = false
    canal.instance.global.manager.address = ${canal.admin.manager}
    #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
    #canal.instance.global.spring.xml = classpath:spring/file-instance.xml
    canal.instance.global.spring.xml = classpath:spring/default-instance.xml
    
    ##################################################
    #########                    MQ                      #############
    ##################################################
    canal.mq.servers = 192.168.1.227:9092,192.168.1.226:9092,192.168.1.225:9092
    canal.mq.retries = 0
    canal.mq.batchSize = 16384
    canal.mq.maxRequestSize = 1048576
    canal.mq.lingerMs = 100
    canal.mq.bufferMemory = 33554432
    canal.mq.canalBatchSize = 50
    canal.mq.canalGetTimeout = 100
    # if set to false, canal sends the raw protobuf binary log messages (smaller payload); if true, canal serializes them as JSON (larger payload, more storage used)
    canal.mq.flatMessage = true
    canal.mq.compressionType = none
    canal.mq.acks = all
    #canal.mq.properties. =
    # canal.mq.producerGroup = test
    # Set this value to "cloud", if you want open message trace feature in aliyun.
    # canal.mq.accessChannel = local
    # aliyun mq namespace
    #canal.mq.namespace =
    
    ##################################################
    #########     Kafka Kerberos Info    #############
    ##################################################
    canal.mq.kafka.kerberos.enable = false
    canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
    canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
    

    There are a lot of options in these two files; to summarize, the ones you actually need to set are:

    • instance.properties
      ## mysql serverId , v1.0.26+ will autoGen
      canal.instance.mysql.slaveId=1234
      # position info
      canal.instance.master.address=192.168.1.218:3306
      # username/password
      canal.instance.dbUsername=canal
      canal.instance.dbPassword=canal@winner
      canal.instance.connectionCharset = UTF-8
      # table regex
      canal.instance.filter.regex=db1\\.user,db2\\.device
      canal.mq.topic=example
      canal.mq.dynamicTopic=bi_binlog_config_topic:db1\\.user,bi_binlog_config_topic:db2\\.device
      
    • canal.properties
      # tcp bind ip
      canal.ip = 192.168.1.173
      # tcp, kafka, RocketMQ
      canal.serverMode = kafka
      canal.mq.flatMessage = true
      #################################################
      #########               destinations            #############
      #################################################
      canal.destinations =route
      #canal.instance.global.spring.xml = classpath:spring/file-instance.xml
      canal.instance.global.spring.xml = classpath:spring/default-instance.xml
      ##################################################
      #########                    MQ                      #############
      ##################################################
      canal.mq.servers = 192.168.1.227:9092,192.168.1.226:9092,192.168.1.225:19092
      #canal.mq.producerGroup = test
      # Set this value to "cloud", if you want open message trace feature in aliyun.
      #canal.mq.accessChannel = local
      

3. Starting and verifying

Start

Go to the directory where the package was extracted and run sh bin/startup.sh to start the service.

Check the logs

Run tail -f logs/canal/canal.log; when you see log output like the following, the service has started successfully:

(screenshot: canal.log startup output)

Verifying the topic data

  1. On a machine where kafka is deployed, go to the kafka installation directory and start a console consumer:

    bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.225:9092 --topic bi_binlog_config_topic --from-beginning

  2. Run insert/update/delete statements against the monitored tables and check whether data arrives in the topic.

    (screenshot: messages arriving in the console consumer)

  3. As you can see, the corresponding kafka topic now receives the parsed change messages correctly. A minimal Java consumer sketch follows.
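Beyond the console consumer, a downstream application can read the same topic programmatically. The sketch below is only an illustration under assumptions: it uses the broker list and topic from this article's config, a hypothetical consumer group id, and assumes a recent kafka-clients plus Jackson on the classpath. With canal.mq.flatMessage = true each record value is a JSON object whose fields include database, table, type (INSERT/UPDATE/DELETE), ts and a data array holding the changed rows.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class BinlogTopicConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.1.227:9092,192.168.1.226:9092,192.168.1.225:9092");
            props.put("group.id", "binlog-demo"); // hypothetical consumer group
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("auto.offset.reset", "earliest");

            ObjectMapper mapper = new ObjectMapper();
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("bi_binlog_config_topic"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        // Each value is one canal flat message (JSON) describing a row change.
                        JsonNode msg = mapper.readTree(record.value());
                        System.out.printf("%s.%s %s -> %s%n",
                                msg.path("database").asText(),
                                msg.path("table").asText(),
                                msg.path("type").asText(),
                                msg.path("data").toString());
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }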

4. Problems you may run into

1. Producer fails to send messages after canal starts

  • Error message

    Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for bi_binlog_device_topic-0: 30064 ms has passed since batch creation plus linger time
    2021-04-01 10:10:50.481 [pool-4-thread-1] ERROR com.alibaba.otter.canal.kafka.CanalKafkaProducer - java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for bi_binlog_device_topic-0: 30028 ms has passed since batch creation plus linger time
    java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for bi_binlog_device_topic-0: 30028 ms has passed since batch creation plus linger time
    at com.alibaba.otter.canal.kafka.CanalKafkaProducer.produce(CanalKafkaProducer.java:215) ~[canal.server-1.1.4.jar:na]
    at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:179) ~[canal.server-1.1.4.jar:na]
    at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:117) ~[canal.server-1.1.4.jar:na]
    at com.alibaba.otter.canal.server.CanalMQStarter.worker(CanalMQStarter.java:183) [canal.server-1.1.4.jar:na]
    at com.alibaba.otter.canal.server.CanalMQStarter.access$500(CanalMQStarter.java:23) [canal.server-1.1.4.jar:na]
    at com.alibaba.otter.canal.server.CanalMQStarter$CanalMQRunnable.run(CanalMQStarter.java:225) [canal.server-1.1.4.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_131]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
    Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for bi_binlog_device_topic-0: 30028 ms has passed since batch creation plus linger time
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94) ~[kafka-clients-1.1.1.jar:na]
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64) ~[kafka-clients-1.1.1.jar:na]
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29) ~[kafka-clients-1.1.1.jar:na]
    at com.alibaba.otter.canal.kafka.CanalKafkaProducer.produce(CanalKafkaProducer.java:213) ~[canal.server-1.1.4.jar:na]
    ... 8 common frames omitted
    Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for bi_binlog_device_topic-0: 30028 ms has passed since batch creation plus linger time
    
  • Root cause analysis

    The error says the producer timed out talking to the kafka cluster. The fixes usually suggested online are:

    1. increase the timeout
    2. set advertised.listeners=PLAINTEXT://192.168.14.140:9092 in the kafka configuration

    After checking both of these the problem remained, so I looked for another explanation.
    The error message mentions bi_binlog_device_topic-0, which suggests canal runs into trouble while writing to that topic, so the topic itself was very likely the culprit. Since this is a test environment, I decided to purge the topic's data; after restarting canal the problem was indeed gone.

    (screenshot: canal reading the binlog normally again)

    As you can see, the log is being read normally again.

  • Fix

    Purge the topic data. There are two ways to do this:

    • Delete the topic and recreate it
      # delete the topic
      ./kafka-topics.sh --zookeeper localhost:2181 --delete --topic bi_binlog_config_topic
      # create the topic
      ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic bi_binlog_config_topic
      
    • Purge the data in the topic (see the note below on restoring retention)
      # purge the data by temporarily shortening retention
      ./kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name bi_binlog_store_topic --alter --add-config retention.ms=10000
      # check the topic config
      ./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics --entity-name bi_binlog_config_topic
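      Once the purge has taken effect, remember to remove the temporary override again (for example with ./kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name bi_binlog_store_topic --alter --delete-config retention.ms), otherwise the topic keeps discarding data after 10 seconds.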
      

2. Timeout connecting to the kafka cluster

  • Error message

    2021-04-01 11:38:44.322 [pool-4-thread-2] ERROR com.alibaba.otter.canal.kafka.CanalKafkaProducer - java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
    java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
    at com.alibaba.otter.canal.kafka.CanalKafkaProducer.produce(CanalKafkaProducer.java:215) ~[canal.server-1.1.4.jar:na]
    at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:179) ~[canal.server-1.1.4.jar:na]
    at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:117) ~[canal.server-1.1.4.jar:na]
    at com.alibaba.otter.canal.server.CanalMQStarter.worker(CanalMQStarter.java:183) [canal.server-1.1.4.jar:na]
    at com.alibaba.otter.canal.server.CanalMQStarter.access$500(CanalMQStarter.java:23) [canal.server-1.1.4.jar:na]
    at com.alibaba.otter.canal.server.CanalMQStarter$CanalMQRunnable.run(CanalMQStarter.java:225) [canal.server-1.1.4.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_131]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
    Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
    at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1150) ~[kafka-clients-1.1.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:846) ~[kafka-clients-1.1.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:784) ~[kafka-clients-1.1.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:671) ~[kafka-clients-1.1.1.jar:na]
    at com.alibaba.otter.canal.kafka.CanalKafkaProducer.produce(CanalKafkaProducer.java:199) ~[canal.server-1.1.4.jar:na]
    ... 8 common frames omitted
    
  • Root cause analysis

    This happens when, after configuring a dynamic topic in instance.properties, you comment out canal.mq.topic=example; the static topic must stay enabled:

    # mq config
    canal.mq.topic=example
    canal.mq.dynamicTopic=bi_binlog_store_topic:db0\\.patrol_report,bi_binlog_topic:db1\\.store1
    
  • Fix

    Re-enable canal.mq.topic as shown above; see also the fix for the previous problem.

5. Closing words

This article gave a brief introduction to canal, Alibaba's open-source synchronization tool, summarized the concrete setup steps, and walked through writing the change data into kafka. Once the data is in kafka, the next step is for a downstream program to consume those messages, for example with Flink or Spark; a minimal Flink sketch is included below as a pointer. I am writing this down as a note and hope it helps whoever needs it.
One remaining problem is that canal as described here is a single point of failure, so in production we normally set up canal for high availability.
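The sketch below shows the shape of the Flink side; it is only a starting point under assumptions: Flink 1.x with the flink-connector-kafka dependency, the example broker list and topic from this article, and a hypothetical consumer group id. The actual parsing and sink logic is the subject of the next part.

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class BinlogToFlinkJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "192.168.1.227:9092,192.168.1.226:9092,192.168.1.225:9092");
            props.setProperty("group.id", "flink-binlog-sync"); // hypothetical consumer group

            // Each element is one canal flat message (a JSON string) describing a row change.
            FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>(
                    "bi_binlog_config_topic", new SimpleStringSchema(), props);
            source.setStartFromLatest();

            DataStream<String> binlog = env.addSource(source);

            // Real logic (parsing the JSON and writing to the target store) comes later;
            // for now just print the raw messages.
            binlog.print();

            env.execute("canal-kafka-flink demo");
        }
    }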
