MySQL 數據實時同步到 ES


當需要把 MySQL 的數據實時同步到 ES 時,為了實現低延遲的檢索到 ES 中的數據或者進行其它數據分析處理。本文給出以同步 mysql binlog 的方式實時同步數據到 ES 的思路,實踐並驗證該方式的可行性,以供參考。

mysql binlog 日志

MySQL 的 binlog 日志主要用於數據庫的主從復制和數據恢復。binlog 中記錄了數據的增刪改查操作,主從復制過程中,主庫向從庫同步 binlog 日志,從庫對 binlog 日志中的事件進行重放,從而實現主從同步。

mysql binlog 日志有三種模式,分別為:

  • ROW:記錄每一行數據被修改的情況,但是日志量太大。
  • STATEMENT:記錄每一條修改數據的 SQL 語句,減少了日志量,但是 SQL 語句使用函數或觸發器時容易出現主從不一致。
  • MIXED:結合了 ROW 和 STATEMENT 的優點,根據具體執行數據操作的 SQL 語句選擇使用 ROW 或者 STATEMENT 記錄日志。

要通過 mysql binlog 將數據同步到 ES 集群,只能使用 ROW 模式,因為只有 ROW 模式才能知道 mysql 中的數據的修改內容。下文為以 UPDATE 操作為例,ROW 模式和 STATEMENT 模式的 binlog 日志內容。

  • ROW 模式的 binlog 日志內容示例如下:
  • SET TIMESTAMP=1527917394/*!*/;
    BEGIN
    /*!*/;
    # at 3751
    #180602 13:29:54 server id 1  end_log_pos 3819 CRC32 0x8dabdf01     Table_map: `webservice`.`building` mapped to number 74
    # at 3819
    #180602 13:29:54 server id 1  end_log_pos 3949 CRC32 0x59a8ed85     Update_rows: table id 74 flags: STMT_END_F
    BINLOG '
    UisSWxMBAAAARAAAAOsOAAAAAEoAAAAAAAEACndlYnNlcnZpY2UACGJ1aWxkaW5nAAYIDwEPEREG
    wACAAQAAAAHfq40=
    UisSWx8BAAAAggAAAG0PAAAAAEoAAAAAAAEAAgAG///A1gcAAAAAAAALYnVpbGRpbmctMTAADwB3
    UkRNbjNLYlV5d1k3ajVbD64WWw+uFsDWBwAAAAAAAAtidWlsZGluZy0xMAEPAHdSRE1uM0tiVXl3
    WTdqNVsPrhZbD64Whe2oWQ==
    '/*!*/;
    ### UPDATE `webservice`.`building`
    ### WHERE
    ###   @1=2006 /* LONGINT meta=0 nullable=0 is_null=0 */
    ###   @2='building-10' /* VARSTRING(192) meta=192 nullable=0 is_null=0 */
    ###   @3=0 /* TINYINT meta=0 nullable=0 is_null=0 */
    ###   @4='wRDMn3KbUywY7j5' /* VARSTRING(384) meta=384 nullable=0 is_null=0 */
    ###   @5=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
    ###   @6=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
    ### SET
    ###   @1=2006 /* LONGINT meta=0 nullable=0 is_null=0 */
    ###   @2='building-10' /* VARSTRING(192) meta=192 nullable=0 is_null=0 */
    ###   @3=1 /* TINYINT meta=0 nullable=0 is_null=0 */
    ###   @4='wRDMn3KbUywY7j5' /* VARSTRING(384) meta=384 nullable=0 is_null=0 */
    ###   @5=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
    ###   @6=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
    # at 3949
    #180602 13:29:54 server id 1  end_log_pos 3980 CRC32 0x58226b8f     Xid = 182
    COMMIT/*!*/;
  • STATEMENT 模式下 binlog 日志內容示例如下:
    SET TIMESTAMP=1527919329/*!*/;
    update building set Status=1 where Id=2000
    /*!*/;
    # at 688
    #180602 14:02:09 server id 1  end_log_pos 719 CRC32 0x4c550a7d     Xid = 200
    COMMIT/*!*/;

從 ROW 模式和 STATEMENT 模式下 UPDATE 操作的日志內容可以看出,ROW 模式完整地記錄了要修改的某行數據更新前以及更改后所有字段的值,而 STATEMENT 模式只記錄了 UPDATE 操作的 SQL 語句。我們要將 MySQL 的數據實時同步到 ES,只能選擇 ROW 模式的 binlog,獲取並解析 binlog 日志的數據內容,執行 ES document api,將數據同步到 ES 集群中。

mysqldump 工具

mysqldump是一個對 MySQL 數據庫中的數據進行全量導出的一個工具。mysqldump的使用方式如下:

mysqldump -uelastic -p'Elastic_123' --host=172.16.32.5 -F webservice > dump.sql

上述命令表示從遠程數據庫172.16.32.5:3306中導出database:webservice的所有數據,寫入到dump.sql文件中,指定-F參數表示在導出數據后重新生成一個新的 binlog 日志文件以記錄后續的所有數據操作。dump.sql中的文件內容如下:

-- MySQL dump 10.13  Distrib 5.6.40, for Linux (x86_64)
--
-- Host: 172.16.32.5    Database: webservice
-- ------------------------------------------------------
-- Server version    5.5.5-10.1.9-MariaDBV1.0R012D002-20171127-1822

/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
/*!40101 SET NAMES utf8 */;
/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */;
/*!40103 SET TIME_ZONE='+00:00' */;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;

--
-- Table structure for table `building`
--

DROP TABLE IF EXISTS `building`;
/*!40101 SET @saved_cs_client     = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `building` (
  `Id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `BuildingId` varchar(64) NOT NULL COMMENT '虛擬建築Id',
  `Status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '虛擬建築狀態:0、處理中;1、正常;-1,停止;-2,銷毀中;-3,已銷毀',
  `BuildingName` varchar(128) NOT NULL DEFAULT '' COMMENT '虛擬建築名稱',
  `CreateTime` timestamp NOT NULL DEFAULT '2017-12-03 16:00:00' COMMENT '創建時間',
  `UpdateTime` timestamp NOT NULL DEFAULT '2017-12-03 16:00:00' COMMENT '更新時間',
  PRIMARY KEY (`Id`),
  UNIQUE KEY `BuildingId` (`BuildingId`)
) ENGINE=InnoDB AUTO_INCREMENT=2010 DEFAULT CHARSET=utf8 COMMENT='虛擬建築表';
/*!40101 SET character_set_client = @saved_cs_client */;

--
-- Dumping data for table `building`
--

LOCK TABLES `building` WRITE;
/*!40000 ALTER TABLE `building` DISABLE KEYS */;
INSERT INTO `building` VALUES (2000,'building-2',0,'6YFcmntKrNBIeTA','2018-05-30 13:28:31','2018-05-30 13:28:31'),(2001,'building-4',0,'4rY8PcVUZB1vtrL','2018-05-30 13:28:34','2018-05-30 13:28:34'),(2002,'building-5',0,'uyjHVUYrg9KeGqi','2018-05-30 13:28:37','2018-05-30 13:28:37'),(2003,'building-7',0,'DNhyEBO4XEkXpgW','2018-05-30 13:28:40','2018-05-30 13:28:40'),(2004,'building-1',0,'TmtYX6ZC0RNB4Re','2018-05-30 13:28:43','2018-05-30 13:28:43'),(2005,'building-6',0,'t8YQcjeXefWpcyU','2018-05-30 13:28:49','2018-05-30 13:28:49'),(2006,'building-10',0,'WozgBc2IchNyKyE','2018-05-30 13:28:55','2018-05-30 13:28:55'),(2007,'building-3',0,'yJk27cmLOVQLHf1','2018-05-30 13:28:58','2018-05-30 13:28:58'),(2008,'building-9',0,'RSbjotAh8tymfxs','2018-05-30 13:29:04','2018-05-30 13:29:04'),(2009,'building-8',0,'IBOMlhaXV6k226m','2018-05-30 13:29:31','2018-05-30 13:29:31');
/*!40000 ALTER TABLE `building` ENABLE KEYS */;
UNLOCK TABLES;

/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */;

/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */;
/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

-- Dump completed on 2018-06-02 14:23:51

從以上內容可以看出,mysqldump 導出的 sql 文件包含 create table、drop table 以及插入數據的 sql 語句,但是不包含 create database 建庫語句。

使用 go-mysql-elasticsearch 開源工具同步數據到 ES

go-mysql-elasticsearch是用於同步 MySQL 數據到 ES 集群的一個開源工具,項目 github 地址

go-mysql-elasticsearch的基本原理:如果是第一次啟動該程序,首先使用 mysqldump 工具對源 MySQL 數據庫進行一次全量同步,通過 elasticsearch client 執行操作寫入數據到 ES;然后實現了一個 mysql client,作為 slave 連接到源 MySQL,源 MySQL 作為 master 會將所有數據的更新操作通過 binlog event 同步給 slave,通過解析 binlog event 就可以獲取到數據的更新內容,寫入到 ES。

另外,該工具還提供了操作統計的功能,每當有數據增刪改操作時,會將對應操作的計數加1,程序啟動時會開啟一個 HTTP 服務,通過調用 HTTP 接口可以查看增刪改操作的次數。

使用限制

  1. mysql binlog 必須是 ROW 模式(騰訊雲 TencentDB for MySQL 產品默認開啟)。
  2. 要同步的 MySQL 數據表必須包含主鍵,否則直接忽略。如果數據表沒有主鍵,UPDATE 和 DELETE 操作就會因為在 ES 中找不到對應的 document 而無法進行同步。
  3. 不支持程序運行過程中修改表結構。
  4. 要賦予用於連接 MySQL 的賬戶 RELOAD 權限、REPLICATION 權限。
    GRANT REPLICATION SLAVE ON *.* TO 'elastic'@'172.16.32.44';
    GRANT RELOAD ON *.* TO 'elastic'@'172.16.32.44';

使用方式

  1. 安裝 Go1.10+ 版本,可以直接安裝最新版的 Go,然后設置 GOPATH 環境變量。

  2. 執行命令go get github.com/siddontang/go-mysql-elasticsearch

  3. 執行命令cd $GOPATH/src/github.com/siddontang/go-mysql-elasticsearch

  4. 執行make進行編譯,編譯成功后go-mysql-elasticsearch/bin目錄下會生成名為go-mysql-elasticsearch的可執行文件。

  5. 執行命令vi etc/river.toml 修改配置文件,同步172.16.0.101:3306數據庫中的webservice.building表到 ES 集群172.16.32.64:9200的 building index(更詳細的配置文件說明請參考 項目文檔)。

     # MySQL address, user and password
     # user must have replication privilege in MySQL.
     my_addr = "172.16.0.101:3306"
     my_user = "bellen"
     my_pass = "Elastic_123"
     my_charset = "utf8"
    
     # Set true when elasticsearch use https
     #es_https = false
     # Elasticsearch address
     es_addr = "172.16.32.64:9200"
     # Elasticsearch user and password, maybe set by shield, nginx, or x-pack
     es_user = ""
     es_pass = ""
    
     # Path to store data, like master.info, if not set or empty,
     # we must use this to support breakpoint resume syncing.
     # TODO: support other storage, like etcd.
     data_dir = "./var"
    
     # Inner Http status address
     stat_addr = "127.0.0.1:12800"
    
     # pseudo server id like a slave
     server_id = 1001
    
     # mysql or mariadb
     flavor = "mariadb"
    
     # mysqldump execution path
     # if not set or empty, ignore mysqldump.
     mysqldump = "mysqldump"
    
     # if we have no privilege to use mysqldump with --master-data,
     # we must skip it.
     #skip_master_data = false
    
     # minimal items to be inserted in one bulk
     bulk_size = 128
    
     # force flush the pending requests if we don't have enough items >= bulk_size
     flush_bulk_time = "200ms"
    
     # Ignore table without primary key
     skip_no_pk_table = false
    
     # MySQL data source
     [[source]]
     schema = "webservice"
     tables = ["building"]
     [[rule]]
     schema = "webservice"
     table = "building"
     index = "building"
     type = "buildingtype"
  6. 在 ES 集群中創建 building index,因為該工具並沒有使用 ES 的 auto create index 功能,如果 index 不存在會報錯 。

  7. 執行命令./bin/go-mysql-elasticsearch -config=./etc/river.toml

  8. 在控制台輸出結果。

     2018/06/02 16:13:21 INFO  create BinlogSyncer with config {1001 mariadb 172.16.0.101 3306 bellen   utf8 false false <nil> false false 0 0s 0s 0}
     2018/06/02 16:13:21 INFO  run status http server 127.0.0.1:12800
     2018/06/02 16:13:21 INFO  skip dump, use last binlog replication pos (mysql-bin.000001, 120) or GTID %!s(<nil>)
     2018/06/02 16:13:21 INFO  begin to sync binlog from position (mysql-bin.000001, 120)
     2018/06/02 16:13:21 INFO  register slave for master server 172.16.0.101:3306
     2018/06/02 16:13:21 INFO  start sync binlog at binlog file (mysql-bin.000001, 120)
     2018/06/02 16:13:21 INFO  rotate to (mysql-bin.000001, 120)
     2018/06/02 16:13:21 INFO  rotate binlog to (mysql-bin.000001, 120)
     2018/06/02 16:13:21 INFO  save position (mysql-bin.000001, 120)
  9. 測試:向 MySQL 中插入、修改、刪除數據,都可以反映到 ES 中。

使用體驗

  • go-mysql-elasticsearch完成了最基本的 MySQL 實時同步數據到 ES 的功能,業務如果需要更深層次的功能如允許運行中修改 MySQL 表結構,可以進行自行定制化開發。
  • 異常處理不足,解析 binlog event 失敗直接拋出異常。
  • 據作者描述,該項目並沒有被其應用於生產環境中,所以使用過程中建議通讀源碼,知其利弊。

使用 mypipe 同步數據到 ES 集群

mypipe 是一個 mysql binlog 同步工具,在設計之初是為了能夠將 binlog event 發送到 kafka,根據業務的需要也可以將數據同步到任意的存儲介質中。mypipe github 地址

使用限制

  1. mysql binlog 必須是 ROW 模式。
  2. 要賦予用於連接 MySQL 的賬戶 REPLICATION 權限。
    GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'elastic'@'%' IDENTIFIED BY 'Elastic_123'
  3. mypipe 只是將 binlog 日志內容解析后編碼成 Avro 格式推送到 kafka broker,並不是將數據推送到 kafka。如果需要同步到 ES 集群,可以從 kafka 消費數據后,再寫入 ES。
  4. 消費 kafka 中的消息(MySQL 的操作日志),需要對消息內容進行 Avro 解析,獲取到對應的數據操作內容,進行下一步處理。mypipe 封裝了一個KafkaGenericMutationAvroConsumer類,可以直接繼承該類使用,或者自行解析。
  5. mypipe 只支持 binlog 同步,不支持存量數據同步,即 mypipe 程序啟動后無法對 MySQL 中已經存在的數據進行同步。

使用方式

  1. 執行命令git clone https://github.com/mardambey/mypipe.git

  2. 執行命令./sbt package

  3. 配置 mypipe-runner/src/main/resources/application.conf

     mypipe {
    
       # Avro schema repository client class name
       schema-repo-client = "mypipe.avro.schema.SchemaRepo"
    
       # consumers represent sources for mysql binary logs
       consumers {
    
         localhost {
           # database "host:port:user:pass" array
           source = "172.16.0.101:3306:elastic:Elastic_123"
         }
       }
    
       # data producers export data out (stdout, other stores, external services, etc.)
       producers {
    
         kafka-generic {
           class = "mypipe.kafka.producer.KafkaMutationGenericAvroProducer"
         }
       }
    
       # pipes join consumers and producers
       pipes {
    
         kafka-generic {
           enabled = true
           consumers = ["localhost"]
           producer {
             kafka-generic {
               metadata-brokers = "172.16.16.22:9092"
             }
           }
           binlog-position-repo {
             # saved to a file, this is the default if unspecified
             class = "mypipe.api.repo.ConfigurableFileBasedBinaryLogPositionRepository"
             config {
               file-prefix = "stdout-00"     # required if binlog-position-repo is specifiec
               data-dir = "/tmp/mypipe/data" # defaults to mypipe.data-dir if not present
             }
           }
         }
       }
     }
  4. 配置mypipe-api/src/main/resources/reference.conf,修改include-event-condition選項,指定需要同步的 database 和 table。

     include-event-condition = """ db == "webservice" && table =="building" """
  5. 在 kafka broker 端創建topic: webservice_building_generic,默認情況下 mypipe 以${db}_${table}_generic為 topic 名,向該 topic 發送數據。

  6. 執行命令./sbt "project runner" "runMain mypipe.runner.PipeRunner"

  7. 測試:向 mysql building 表中插入數據,寫一個簡單的 consumer 消費 mypipe 推送到 kafka 中的消息。

  8. 消費到沒有經過解析的數據如下:

     ConsumerRecord(topic=u'webservice_building_generic', partition=0, offset=2, timestamp=None, timestamp_type=None, key=None, value='\x00\x01\x00\x00\x14webservice\x10building\xcc\x01\x02\x91,\xae\xa3fc\x11\xe8\xa1\xaaRT\x00Z\xf9\xab\x00\x00\x04\x18BuildingName\x06xxx\x14BuildingId\nId-10\x00\x02\x04Id\xd4%\x00', checksum=128384379, serialized_key_size=-1, serialized_value_size=88)

使用體驗

  • mypipe 相比 go-mysql-elasticsearch 更成熟,支持運行時 ALTER TABLE,同時解析 binlog 異常發生時,可通過配置不同的策略處理異常。
  • mypipe 不能同步存量數據,如果需要同步存量數據可通過其它方式先全量同步后,再使用 mypipe 進行增量同步。
  • mypipe 只同步 binlog,需要同步數據到 ES 需要另行開發。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM