maxwell實時同步mysql中binlog


 

概述

Maxwell是一個能實時讀取MySQL二進制日志binlog,並生成 JSON 格式的消息,作為生產者發送給 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的應用程序。它的常見應用場景有ETL、維護緩存、收集表級別的dml指標、增量到搜索引擎、數據分區遷移、切庫binlog回滾方案等。

特征

  1.  支持 SELECT * FROM table 的方式進行全量數據初始化
  2.  支持在主庫發生failover后,自動恢復binlog位置(GTID)
  3.  可以對數據進行分區,解決數據傾斜問題,發送到kafka的數據支持databasetablecolumn等級別的數據分區
  4.  工作方式是偽裝為Slave,接收binlog events,然后根據schemas信息拼裝,可以接受ddlxidrow等各種event

Canal |Maxwell| mysql_streamer

除了Maxwell外,目前常用的MySQL Binlog解析工具主要有阿里的canal、mysql_streamer,三個工具對比如下:

canal 由Java開發,分為服務端和客戶端,擁有眾多的衍生應用,性能穩定,功能強大;canal 需要自己編寫客戶端來消費canal解析到的數據。

maxwell相對於canal的優勢是使用簡單,它直接將數據變更輸出為json字符串,不需要再編寫客戶端。

Maxwell和canal工具對比

  • Maxwell沒有canal那種server+client模式,只有一個server把數據發送到消息隊列或redis。如果需要多個實例,通過指定不同配置文件啟動多個進程
  • Maxwell有一個亮點功能,就是canal只能抓取最新數據,對已存在的歷史數據沒有辦法處理。而Maxwell有一個bootstrap功能,可以直接引導出完整的歷史數據用於初始化,非常好用。
  • Maxwell不能直接支持HA,但是它支持斷點還原,即錯誤解決后重啟繼續上次點兒讀取數據。
  • Maxwell只支持json格式,而Canal如果用Server+client模式的話,可以自定義格式。
  • Maxwell比Canal更加輕量級。

安裝地址

1.Maxwell官網地址   http://maxwells-daemon.io/

2.安裝包下載地址    https://github.com/zendesk/maxwell

安裝MaxWell

(1)上傳maxwell-1.22.4.tar.gz 並進行解壓

   tar -zxvf maxwell-1.22.4.tar.gz -C /opt/module/

修改config.properties.example為config.properties

MySql配置

需要mysql開啟binlog,而binlog默認是關閉的,需要開啟,並且為了保證同步數據的一致性,使用的日志格式為row-based replication(RBR),新建或修改my.conf開啟binlog。

(1)修改my-default.cnf

[kris@hadoop101 maxwell-1.22.5]$ cd /usr/share/mysql/ [kris@hadoop101 mysql]$ sudo cp my-default.cnf /etc/my.cnf
[kris@hadoop101 mysql]$ sudo vim /etc/my.cnf

提示:如果是rpm的方式安裝MySql的那么則沒有my.cnf文件,則需要修改mysql-default.cnf然后復制到/etc/路徑下,如果/etc/路徑下有my.cnf則直接修改my.cnf即可

(2)修改my.cnf,添加如下配置。

[kris@hadoop101 mysql]$ sudo vim /etc/my.cnf
[mysqld]
server_id=1 #隨機指定一個不能和其他集群中機器重名的字符串,如果只有一台機器,那就可以隨便指定了 log-bin=master binlog_format=row #選擇row模式

sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES

 啟動MySql設置參數

mysql> set global binlog_format=ROW; mysql> set global binlog_row_image=FULL;

創建maxwell庫(maxwell啟動時候會自動創建,不需手動創建;另外不能在maxwell庫里邊創建表,maxwell不會監聽到)和用戶

添加權限:

mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456'; mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%'; mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%'; 重啟MySql服務 [kris@hadoop101 ~]$ sudo service mysql restart

重啟mysql, 查詢是否已開啟

show global variables like '%log_bin%'

 

       

 

 查看binlog:  show binlog events;

查看最新一個binlog日志文件名稱 show master status;

查找binlog日志 find / -name mysql-bin -type f 

查看詳細的日志配置信息SHOW  GLOBAL VARIABLES LIKE '%log%';

mysql數據存儲目錄 show variables like '%dir%'; 

Maxwell 使用

   需要首先啟動 zookeeper和 kafka

max的啟動方式有:

1)基於命令行的啟動方式
[kris@hadoop101 maxwell-1.22.5]$ cd /opt/module/
[kris@hadoop101 maxwell-1.22.5]$ bin/maxwell --user='maxwell' --password='123456' --host='hadoop101' --producer=stdout
(2)基於kafka的啟動方式
[kris@hadoop101 maxwell-1.22.5]$ bin/maxwell --user='maxwell' --password='123456' --host='hadoop101' --producer=kafka --kafka.bootstrap.servers=hadoop102:9092 --kafka_topic=maxwell
(3)基於RabbitMQ的啟動方式
[kris@hadoop101 maxwell-1.22.5]$ bin/maxwell --user='maxwell' --password='123456' --host='hadoop101' --producer=rabbitmq --rabbitmq_host='rabbitmq.hostname'4)基於Redis的啟動方式
[kris@hadoop101 maxwell-1.22.5]$ bin/maxwell --user='maxwell' --password='123456' --host='hadoop101' --producer=redis --redis_host=redis.hostname 提示:maxwell 支持端點續傳功能,如果maxwell不慎掛掉了,重啟會從上次的消費位置繼續讀取。
另外,maxwell如果和Mysql不在同一台機器上只需要修改
--host=hostname即可。

 

 1. 輸出信息到命令行

(1)啟動MaxWell

kris@hadoop101 maxwell-1.22.5]$ bin/maxwell --user='maxwell' --password='123456' --host='hadoop101' --producer=stdout
Using kafka version: 1.0.0
17:20:11,351 WARN  MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
17:20:11,813 INFO  SchemaStoreSchema - Creating maxwell database
17:20:12,367 INFO  Maxwell - Maxwell v1.22.5 is booting (StdoutProducer), starting at Position[BinlogPosition[master.000001:3816], lastHeartbeat=0]
17:20:12,625 INFO  AbstractSchemaStore - Maxwell is capturing initial schema
17:20:13,733 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: master.000001:3816
17:20:13,828 INFO  BinaryLogClient - Connected to hadoop101:3306 at master.000001/3816 (sid:6379, cid:5)
17:20:13,829 INFO  BinlogConnectorLifecycleListener - Binlog connected.

    自動創建的maxwell庫,往里邊創建表它是不會監聽到的;

    該庫記錄了maxwell同步的狀態,最后一次同步的id等等信息,在主庫失敗或同步異常后,只要maxwell庫存在,下次同步會根據最后一次同步的id。如果沒有生成maxwell庫或報錯,可能config.properties中配置的mysql用戶權限不夠。

maxwell會在對應的數據庫上建立一個maxwell數據庫用來記錄binlog的position, 如果maxwell因為某個原因失敗了,下次根據數據庫里保存的position去重新獲取binlog,第一次連接是從最新的log position獲取數據,當然,你也可以手動去更新數據庫position位置。

 

對mysql的test測試庫進行增刪改操作:

INSERT INTO stu1 VALUES(1003,'kris',22); //增、刪、改

查看MaxWell 阻塞窗口

{"database":"test","table":"stu1","type":"delete","ts":1576747370,"xid":454,"commit":true,"data":{"id":1003,"name":"wangwu","age":23}} {"database":"test","table":"stu1","type":"insert","ts":1576747481,"xid":622,"commit":true,"data":{"id":1003,"name":"kris","age":22}} {"database":"test","table":"stu1","type":"update","ts":1576747522,"xid":695,"commit":true,"data":{"id":1002,"name":"lisi","age":30},"old":{"age":18}}

 2. 輸出信息到kafka

 啟動kafka集群:

[kris@hadoop101 kafka]$ bin/kafka-server-start.sh config/server.properties &
[1] 3459
[kris@hadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties &
[1] 6320
[kris@hadoop103 kafka]$ bin/kafka-server-start.sh config/server.properties &
[1] 5823

創建kafka的maxwell topic主題

[kris@hadoop101 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server Hadoop101:9092 --topic maxwell

啟動kafka消費者命令行監聽maxwell:

[kris@hadoop101 bin]$ ./kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic maxwell --from-beginning [2019-12-19 18:59:14,209] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-12-19 19:03:38,595] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: maxwell-1. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)

 啟動maxwell

[kris@hadoop101 maxwell-1.22.4]$ bin/maxwell --user='maxwell' --password='123456' --host='hadoop101' --producer=kafka --kafka.bootstrap.servers=hadoop101:9092 --kafka_topic=maxwell

對test庫中的stu1表進行增刪改的操作,查看kafka命令行消費者的監聽數據:

 

 kafka DML、kafka DDL

再寫一個中間件,用來轉義這些JSON,就可以用於其它任何系統了。 實際上我這里有個很大的疑問,kafka partition只能保證單個partition有序,多個partition實際上是無序的,但是在應用binlog的時候,大家知道,這個SQL順序極其重要,我先update再delete和先delete再update是決然不同的結果。

因此如果kafka是多個partition, 如何保證SQL的應用是有順序的呢? 我實際上只能想到,只能用一個partition來操作。

 

過濾指定規則的庫:http://maxwells-daemon.io/filtering/

 

全量同步--使用maxwell-bootstrap命令

./bin/maxwell-bootstrap --database test--table stu1 --host hadoop101 --user maxwell --password 123456  --client_id maxwell_dev

同步test.stu1表的所有數據,並指定client_id示maxwell_dev的maxwell執行同步

上一個命令先開着,然后再啟動client_id=maxwell_dev的maxwell

./bin/maxwell --client_id maxwell_dev

等待執行完成即可

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM