幾篇關於MySQL數據同步到Elasticsearch的文章---第一篇:Debezium實現Mysql到Elasticsearch高效實時同步


文章轉載自:
https://mp.weixin.qq.com/s?__biz=MzI2NDY1MTA3OQ==&mid=2247484358&idx=1&sn=3a783479bb6a1852589f4c4cf3c5d310&chksm=eaa82beedddfa2f822db1492e5f82f7f43d877f2abed6340adbbbe471a7b824b089179147145&scene=21#wechat_redirect

題記

來自Elasticsearch中文社區的問題——

MySQL中表無唯一遞增字段,也無唯一遞增時間字段,該怎么使用logstash實現MySQL實時增量導數據到es中?

logstash和kafka_connector都僅支持基於自增id或者時間戳更新的方式增量同步數據。

回到問題本身:如果庫表里沒有相關字段,該如何處理呢?

本文給出相關探討和解決方案。
1、 binlog認知
1.1 啥是 binlog?

binlog是Mysql sever層維護的一種二進制日志,與innodb引擎中的redo/undo log是完全不同的日志;其主要是用來記錄對mysql數據更新或潛在發生更新的SQL語句,並以"事務"的形式保存在磁盤中。

作用主要有:

1)復制:達到master-slave數據一致的目的。

2)數據恢復:通過mysqlbinlog工具恢復數據。

3)增量備份。

1.2 阿里的Canal實現了增量Mysql同步
[在這里插入圖片描述]

一圖勝千言,canal是用java開發的基於數據庫增量日志解析、提供增量數據訂閱&消費的中間件。

目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用來處理獲得的相關數據。目的:增量數據訂閱&消費。

綜上,使用binlog可以突破logstash或者kafka-connector沒有自增id或者沒有時間戳字段的限制,實現增量同步。
2、基於binlog的同步方式

1)基於kafka Connect的Debezium 開源工程,地址:. https://debezium.io/

2)不依賴第三方的獨立應用: Maxwell開源項目,地址:http://maxwells-daemon.io/

由於已經部署過conluent(kafka的企業版本,自帶zookeeper、kafka、ksql、kafka-connector等),本文僅針對Debezium展開。
3、Debezium介紹

Debezium是捕獲數據實時動態變化的開源的分布式同步平台。能實時捕獲到數據源(Mysql、Mongo、PostgreSql)的:新增(inserts)、更新(updates)、刪除(deletes)操作,實時同步到Kafka,穩定性強且速度非常快。

特點:

1)簡單。無需修改應用程序。可對外提供服務。

2)穩定。持續跟蹤每一行的每一處變動。

3)快速。構建於kafka之上,可擴展,經官方驗證可處理大容量的數據。

4、同步架構
[在這里插入圖片描述]

如圖,Mysql到ES的同步策略,采取“曲線救國”機制。

步驟1: 基Debezium的binlog機制,將Mysql數據同步到Kafka。

步驟2: 基於Kafka_connector機制,將kafka數據同步到Elasticsearch。
5、Debezium實現Mysql到ES增刪改實時同步

軟件版本:

confluent:5.1.2;
Debezium:0.9.2_Final;
Mysql:5.7.x.
Elasticsearch:6.6.1

5.1 Debezium安裝

confluent的安裝部署參見:http://t.cn/Ef5poZk,不再贅述。

Debezium的安裝只需要把debezium-connector-mysql的壓縮包解壓放到Confluent的解壓后的插件目錄(share/java)中。

MySQL Connector plugin 壓縮包的下載地址:

https://debezium.io/docs/install/

注意重啟一下confluent,以使得Debezium生效。
5.2 Mysql binlog等相關配置。

Debezium使用MySQL的binlog機制實現數據動態變化監測,所以需要Mysql提前配置binlog。

核心配置如下,在Mysql機器的/etc/my.cnf的mysqld下添加如下配置。

1[mysqld]
2
3server-id = 223344
4log_bin = mysql-bin
5binlog_format = row
6binlog_row_image = full
7expire_logs_days = 10

然后,重啟一下Mysql以使得binlog生效。

1systemctl start mysqld.service

5.3 配置connector連接器。

配置confluent路徑目錄 : /etc

創建文件夾命令 :

1mkdir kafka-connect-debezium

在mysql2kafka_debezium.json存放connector的配置信息 :

1[root@localhost kafka-connect-debezium]# cat mysql2kafka_debezium.json
2{
3 "name" : "debezium-mysql-source-0223",
4 "config":
5 {
6 "connector.class" : "io.debezium.connector.mysql.MySqlConnector",
7 "database.hostname" : "192.168.1.22",
8 "database.port" : "3306",
9 "database.user" : "root",
10 "database.password" : "XXXXXX",
11 "database.whitelist" : "kafka_base_db",
12 "table.whitlelist" : "accounts",
13 "database.server.id" : "223344",
14 "database.server.name" : "full",
15 "database.history.kafka.bootstrap.servers" : "192.168.1.22:9092",
16 "database.history.kafka.topic" : "account_topic",
17 "include.schema.changes" : "true" ,
18 "incrementing.column.name" : "id",
19 "database.history.skip.unparseable.ddl" : "true",
20 "transforms": "unwrap,changetopic",
21 "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
22 "transforms.changetopic.type":"org.apache.kafka.connect.transforms.RegexRouter",
23 "transforms.changetopic.regex":"(.*)",
24 "transforms.changetopic.replacement":"$1-smt"
25 }
26}

注意如下配置:

"database.server.id",對應Mysql中的server-id的配置。

"database.whitelist" : 待同步的Mysql數據庫名。

"table.whitlelist" :待同步的Mysq表名。

重要:“database.history.kafka.topic”:存儲數據庫的Shcema的記錄信息,而非寫入數據的topic、

"database.server.name":邏輯名稱,每個connector確保唯一,作為寫入數據的kafka topic的前綴名稱。

坑一:transforms相關5行配置作用是寫入數據格式轉換。

如果沒有,輸入數據會包含:before、after記錄修改前對比信息以及元數據信息(source,op,ts_ms等)。

這些信息在后續數據寫入Elasticsearch是不需要的。(注意結合自己業務場景)。

格式轉換相關原理:http://t.cn/EftoaIi
5.4 啟動connector

1curl -X POST -H "Content-Type:application/json"
2--data @mysql2kafka_debezium.json.json
3http://192.168.1.22:18083/connectors | jq

5.5 驗證寫入是否成功。
5.5.1 查看kafka-topic

1 kafka-topics --list --zookeeper localhost:2181

此處會看到寫入數據topic的信息。

注意新寫入數據topic的格式:database.schema.table-smt 三部分組成。

本示例topic名稱:

full.kafka_base_db.account-smt
5.5.2 消費數據驗證寫入是否正常

1./kafka-avro-console-consumer --topic full.kafka_base_db.account-smt --bootstrap-server 192.168.1.22:9092 --from-beginning

至此,Debezium實現mysql同步kafka完成。
6、kafka-connector實現kafka同步Elasticsearch
6.1、Kafka-connector介紹

見官網:https://docs.confluent.io/current/connect.html

Kafka Connect是一個用於連接Kafka與外部系統(如數據庫,鍵值存儲,檢索系統索引和文件系統)的框架。

連接器實現公共數據源數據(如Mysql、Mongo、Pgsql等)寫入Kafka,或者Kafka數據寫入目標數據庫,也可以自己開發連接器。
6.2、kafka到ES connector同步配置

配置路徑:

1/home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

配置內容:

1"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
2"tasks.max": "1",
3"topics": "full.kafka_base_db.account-smt",
4"key.ignore": "true",
5"connection.url": "http://192.168.1.22:9200",
6"type.name": "_doc",
7"name": "elasticsearch-sink-test"

6.3 kafka到ES啟動connector

啟動命令

1confluent load elasticsearch-sink-test
2-d /home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

6.4 Kafka-connctor RESTFul API查看

Mysql2kafka,kafka2ES的connector詳情信息可以借助postman或者瀏覽器或者命令行查看。

1curl -X GET http://localhost:8083/connectors

7、坑復盤。

坑2: 同步的過程中可能出現錯誤,比如:kafka topic沒法消費到數據。
排解思路如下:

1)確認消費的topic是否是寫入數據的topic;

2)確認同步的過程中沒有出錯。可以借助connector如下命令查看。

1curl -X GET http://localhost:8083/connectors-xxx/status

坑3: Mysql2ES出現日期格式不能識別。

是Mysql jar包的問題,解決方案:在my.cnf中配置時區信息即可。

坑4: kafka2ES,ES沒有寫入數據。

排解思路:

1)建議:先創建同topic名稱一致的索引,注意:Mapping靜態自定義,不要動態識別生成。

2)通過connetor/status排查出錯原因,一步步分析。

8、小結

binlog的實現突破了字段的限制,實際上業界的go-mysql-elasticsearch已經實現。

對比:logstash、kafka-connector,雖然Debezium“曲線救國”兩步實現了實時同步,但穩定性+實時性能相對不錯。

推薦大家使用。大家有好的同步方式也歡迎留言討論交流。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM