1、canal數據投遞至kafka


canal 官方文檔

一、canal安裝(1.1.4)

  1. 下載安裝包,解壓,創建軟連接。

  2. 修改配置文件。
    2.1 canal.properties

    # 可選項: tcp(默認), kafka, RocketMQ
    canal.serverMode = kafka
    # kafka/rocketmq 集群配置
    canal.mq.servers = nn1.hadoop:9092,nn2.hadoop:9092,s1:9092
    # 重試次數
    canal.mq.retries = 0
    # flagMessage模式下可以調大該值, 但不要超過MQ消息體大小上限
    canal.mq.batchSize = 16384
    canal.mq.maxRequestSize = 1048576
    # flatMessage模式下請將該值改大, 建議50-200
    canal.mq.lingerMs = 1
    canal.mq.bufferMemory = 33554432
    # Canal的batch size, 默認50K, 由於kafka最大消息體限制請勿超過1M(900K以下)
    canal.mq.canalBatchSize = 50
    # Canal get數據的超時時間, 單位: 毫秒, 空為不限超時
    canal.mq.canalGetTimeout = 100
    # 是否為flat json格式對象
    canal.mq.flatMessage = false
    canal.mq.compressionType = none
    canal.mq.acks = all
    # kafka消息投遞是否使用事務
    canal.mq.transaction = false
    

    2.2 instance.properties

    # 連接的數據庫地址
    canal.instance.master.address=192.168.1.20:3306
    # 用戶名,密碼
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal
    # 同步的庫
    canal.instance.defaultDatabaseName =
    # 數據庫的編碼格式
    canal.instance.connectionCharset = UTF-8
    # 表的過濾規則 無論是CanalServer還是Consumer,只要有一方指定了filter都會生效,consumer端如果指定,則會覆蓋CanalServer端。
    # 所有表:.*   or  .*\\..*
    # canal schema下所有表: canal\\..*
    # canal下的以canal打頭的表:canal\\.canal.*
    # canal schema下的一張表:canal.test1
    # 多個規則組合使用:canal\\..*,mysql.test1,mysql.test2 (逗號分隔)
    canal.instance.filter.regex = .\*\\..\*
    ...
    # mq config
    canal.mq.topic=example
    # 針對庫名或者表名發送動態topic,1.1.3版本后開始支持,需要設置kafka自動創建topic
    # canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
    # 單隊列模式的分區下標
    canal.mq.partition=0
    # 散列模式的分區數
    # hash partition config
    #canal.mq.partitionsNum=3
    # 散列規則定義
    # 庫名.表名 : 唯一主鍵,比如mytest.person:id。多個表之間用逗號分隔
    #canal.mq.partitionHash=mytest.person:id,mytest.role:id
    

    2.2.1 canal.mq.dynamicTopic 表達式說明:
    canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多個配置之間使用逗號或分號分隔:
    . 例子1:test\.test 指定匹配的單表,發送到以test_test為名字的topic上
    . 例子2:.\.. 匹配所有表,則每個表都會發送到各自表名的topic上
    . 例子3:test 指定匹配對應的庫,一個庫的所有表都會發送到庫名的topic上
    . 例子4:test\.* 指定匹配的表達式,針對匹配的表會發送到各自表名的topic上
    . 例子5:test,test1\.test1,指定多個表達式,會將test庫的表都發送到test的topic上,test1\.test1的表發送到對應的test1_test1 topic上,其余的表發送到默認的canal.mq.topic值
    為滿足更大的靈活性,允許對匹配條件的規則指定發送的topic名字,配置格式:topicName:schema 或 topicName:schema.table:
    . 例子1: test:test\.test 指定匹配的單表,發送到以test為名字的topic上
    . 例子2: test:.\.. 匹配所有表,因為有指定topic,則每個表都會發送到test的topic下
    . 例子3: test:test 指定匹配對應的庫,一個庫的所有表都會發送到test的topic下
    . 例子4:testA:test\.* 指定匹配的表達式,針對匹配的表會發送到testA的topic下
    . 例子5:test0:test,test1:test1\.test1,指定多個表達式,會將test庫的表都發送到test0的topic下,test1\.test1的表發送到對應的test1的topic下,其余的表發送到默認的canal.mq.topic值

    2.2.2 canal.mq.partitionHash 表達式說明
    canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多個配置之間使用逗號分隔:
    . 例子1:test\.test:pk1^pk2 指定匹配的單表,對應的hash字段為pk1 + pk2
    . 例子2:.\..:id 正則匹配,指定所有正則匹配的表對應的hash字段為id
    . 例子3:.\..:\(pk\) 正則匹配,指定所有正則匹配的表對應的hash字段為表主鍵(自動查找)
    . 例子4: 匹配規則啥都不寫,則默認發到0這個partition上
    . 例子5:.\.. ,不指定pk信息的正則匹配,將所有正則匹配的表,對應的hash字段為表名。按表hash: 一張表的所有數據可以發到同一個分區,不同表之間會做散列 (會有熱點表分區過大問題)
    . 例子6: test\.test:id,.\..* , 針對test的表按照id散列,其余的表按照table散列
    . 注意:設置匹配規則,多條匹配規則之間是按照順序進行匹配(命中一條規則就返回)

  3. MySQL配置
    3.1 開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式,my.cnf 中配置如下:

    [mysqld]
    log-bin=mysql-bin # 開啟 binlog
    binlog-format=ROW # 選擇 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復
    

    3.2 授權 canal 鏈接 MySQL 賬號具有作為 MySQL slave 的權限

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
    

二、kafka_2.11-2.1.1 安裝

三、測試

  1. 創建 MySQL 測試表
CREATE TABLE `order` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
  `order_id` varchar(64) NOT NULL COMMENT '訂單ID',
  `amount` decimal(10,2) NOT NULL DEFAULT '0.00' COMMENT '訂單金額',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',
  PRIMARY KEY (`id`),
  UNIQUE KEY `id` (`id`),
  UNIQUE KEY `uniq_order_id` (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='訂單表';
  1. 執行一條 insert 語句
INSERT INTO `order` (order_id, amount) VALUES ('300', 300.0);

啟動一個 kafka 消費者,觀察結果如下:

{
    "data":[
        {
            "id":"3",
            "order_id":"300",
            "amount":"300.0",
            "create_time":"2020-05-13 16:54:19"
        }
    ],
    "database":"canal_test",
    "es":1589360059000,
    "id":3,
    "isDdl":false,
    "mysqlType":{
        "id":"bigint(20)",
        "order_id":"varchar(64)",
        "amount":"decimal(10,2)",
        "create_time":"datetime"
    },
    "old":null,
    "pkNames":[
        "id"
    ],
    "sql":"",
    "sqlType":{
        "id":-5,
        "order_id":12,
        "amount":3,
        "create_time":93
    },
    "table":"order",
    "ts":1589360060236,
    "type":"INSERT"
}
  1. 執行一條 update 語句
UPDATE `order`
SET amount = 350.0
WHERE order_id = '300';

觀察結果如下:

{
    "data":[
        {
            "id":"3",
            "order_id":"300",
            "amount":"350.0",
            "create_time":"2020-05-13 16:54:19"
        }
    ],
    "database":"canal_test",
    "es":1589361037000,
    "id":4,
    "isDdl":false,
    "mysqlType":{
        "id":"bigint(20)",
        "order_id":"varchar(64)",
        "amount":"decimal(10,2)",
        "create_time":"datetime"
    },
    "old":[
        {
            "amount":"300.0"
        }
    ],
    "pkNames":[
        "id"
    ],
    "sql":"",
    "sqlType":{
        "id":-5,
        "order_id":12,
        "amount":3,
        "create_time":93
    },
    "table":"order",
    "ts":1589361037721,
    "type":"UPDATE"
}
  1. 執行一條 delete 語句
DELETE FROM `order`
WHERE order_id = '300';

結果如下:

{
    "data":[
        {
            "id":"3",
            "order_id":"300",
            "amount":"350.0",
            "create_time":"2020-05-13 16:54:19"
        }
    ],
    "database":"canal_test",
    "es":1589361229000,
    "id":5,
    "isDdl":false,
    "mysqlType":{
        "id":"bigint(20)",
        "order_id":"varchar(64)",
        "amount":"decimal(10,2)",
        "create_time":"datetime"
    },
    "old":null,
    "pkNames":[
        "id"
    ],
    "sql":"",
    "sqlType":{
        "id":-5,
        "order_id":12,
        "amount":3,
        "create_time":93
    },
    "table":"order",
    "ts":1589361230115,
    "type":"DELETE"
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM