使用阿里canal實現mysql與Elasticsearch增量同步


一、背景介紹

    最近在做一個地理信息相關的項目,需要維護大量的地址描述數據,同時需要提供對數據檢索的功能,准備采用Elasticsearch(6.7)實現。那么問題就來了,地址數據需要同時在MySQL和ES中維護,如果通過代碼層面實現會增加代碼量也不易維護,權衡之下決定使用阿里的Canal中間件來實現,留念備查。

    Canal主要用途是基於 MySQL 數據庫增量日志解析,提供增量數據訂閱和消費,工作原理是偽裝自己為 MySQL slave ,向 MySQL master 發送dump 協議,MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal ),canal 解析 binary log 對象(原始為 byte 流)。同時支持客戶端數據落地的適配功能,目前支持關系型數據庫的數據同步、HBase的數據同步和ElasticSearch多表數據同步。

 

二、環境准備

1、MySQL數據庫安裝;

2、Elasticsearch安裝;

3、Canal Server安裝及配置,參考https://github.com/alibaba/canal/wiki/QuickStart;

4、Canal Client Adapter安裝,參考https://github.com/alibaba/canal/wiki/ClientAdapter;

 

三、Canal Server配置instance

1、在canal server安裝目錄下找到/conf/canal.properties,在canal.destinations配置項中增加一個instance,我這里配置的是es-address-original

1 #################################################
2 #########         destinations        #############
3 #################################################
4 canal.destinations = es-address-original

 

2、在/conf目錄下創建es-address-original文件夾,並創建instance.properties文件,大家可以直接復制conf目錄下的example目錄進行修改,主要配置參數如下,其它參考自行參考官方文檔

# MySQL數據庫連接信息
canal.instance.master.address=192.168.x.x:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8

# mysql 數據解析關注的表,Perl正則表達式
# 多個正則之間以逗號(,)分隔,轉義符需要雙斜杠(\\)
# 常見例子:
# 1.  所有表:.*   or  .*\\..*
# 2.  canal schema下所有表: canal\\..*
# 3.  canal下的以canal打頭的表:canal\\.canal.*
# 4.  canal schema下的一張表:canal\\.test1
# 5.  多個規則組合使用:canal\\..*,mysql.test1,mysql.test2 (逗號分隔)
canal.instance.filter.regex=address-platform\\.address_original

 

四、Canal Adapter配置

client-adapter分為適配器和啟動器兩部分, 適配器為多個fat jar, 每個適配器會將自己所需的依賴打成一個包, 以SPI的方式讓啟動器動態加載, 目前所有支持的適配器都放置在plugin目錄下

1、在canal adapter的conf目錄下找到application.yml配置文件(根據官方介紹啟動器為SpringBoot項目)

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # 客戶端模式 tcp or kafka or rocketMQ
  canalServerHost: 127.0.0.1:11111 # canal server address
#  zookeeperHosts: slave1:2181
#  mqServers: 127.0.0.1:9092 #or rocketmq
#  flatMessage: true
  batchSize: 500 # 每次獲取數據的批大小,單位為K
  syncBatchSize: 1000 # 每次同步的批數量
  retries: 0 # 重試次數,-1為無限重試
  timeout: # 同步超時時間,單位為毫秒
  accessKey:
  secretKey:
  srcDataSources: # 源數據庫
    defaultDS:
      url: jdbc:mysql://192.168.0.201:3306/address-platform?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: es-address-original # canal instance Name or mq topic name對應canal server中配置的instance名稱
    groups:
    - groupId: g1
      outerAdapters:
      - 
        key: addressOriginalKey
        name: es
        hosts: 192.168.x.x:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest # transport or rest
#          # security.auth: test:123456 #  only used for rest mode
          cluster.name: elasticsearch # ES集群名稱

 

2、/conf/es下新增配置文件,文件名隨意,配置內容如下

dataSourceKey: defaultDS # 源數據源的key, 對應上面配置的srcDataSources中的值
outerAdapterKey: addressOriginalKey     # 對應application.yml中es配置的key
destination: es-address-original # cannal的instance或者MQ的topic
groupId: g1 # 對應MQ模式下的groupId, 只會同步對應groupId的數據
esMapping:
  _index: address_original # es 的索引名稱
  _type: _doc # es 的type名稱, es7下無需配置此項
  _id: id # es 的_id, 如果不配置該項必須配置下面的pk項_id則會由es自動分配
  upsert: true
#  pk: id
  sql: "select a.ID as id, a.ADDRESS as address, a.SERIAL_NO as serial_no from address_original a" # sql映射,注意區分表字段和索引字段大小寫
#  objFields:
#    _labels: array:;
#  etlCondition: "where a.c_time>={}" # etl 的條件參數
  commitBatch: 3000 # 提交批大小

 

3、創建ES索引信息,通過postman請求ES服務器http://192.168.x.x:9200/address_original,address_original是索引的名稱,請求方式為PUT,參數類型為raw(json)

 

4、這里有幾個坑注意一下:

1)canal適配器會通過GET http://192.168.x.x:9200/address_original/_mapping的方式讀取es mapping,如果創建索引的時候沒有配置mappings信息,會報Not found the mapping info of index異常;

2)測試的時候表字段名是大寫,es索引字段名稱小寫,拋了空指針異常沒有具體的異常描述,后來將/canal adapter/conf/es目錄中的配置文件sql配置項采用別名統一小寫后解決,這里推測數據庫表與索引映射名稱區分大小寫的,后面再看看源碼求證一下;

 

五、運行測試

1、在MySQL數據庫address_original表中維護數據(增刪改);

2、觀察canal adapter日志;

 

六、運行結果

索引文檔結果會根據數據庫操作同步更新


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM