概述
Maxwell是一個能實時讀取MySQL二進制日志binlog,並生成 JSON 格式的消息,作為生產者發送給 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的應用程序。它的常見應用場景有ETL、維護緩存、收集表級別的dml指標、增量到搜索引擎、數據分區遷移、切庫binlog回滾方案等。
特征
- 支持 SELECT * FROM table 的方式進行全量數據初始化
- 支持在主庫發生failover后,自動恢復binlog位置(GTID)
- 可以對數據進行分區,解決數據傾斜問題,發送到kafka的數據支持database、table、column等級別的數據分區
- 工作方式是偽裝為Slave,接收binlog events,然后根據schemas信息拼裝,可以接受ddl、xid、row等各種event
Canal |Maxwell| mysql_streamer
除了Maxwell外,目前常用的MySQL Binlog解析工具主要有阿里的canal、mysql_streamer,三個工具對比如下:
canal 由Java開發,分為服務端和客戶端,擁有眾多的衍生應用,性能穩定,功能強大;canal 需要自己編寫客戶端來消費canal解析到的數據。
maxwell相對於canal的優勢是使用簡單,它直接將數據變更輸出為json字符串,不需要再編寫客戶端。
Maxwell和canal工具對比
- Maxwell沒有canal那種server+client模式,只有一個server把數據發送到消息隊列或redis。如果需要多個實例,通過指定不同配置文件啟動多個進程。
- Maxwell有一個亮點功能,就是canal只能抓取最新數據,對已存在的歷史數據沒有辦法處理。而Maxwell有一個bootstrap功能,可以直接引導出完整的歷史數據用於初始化,非常好用。
- Maxwell不能直接支持HA,但是它支持斷點還原,即錯誤解決后重啟繼續上次點兒讀取數據。
- Maxwell只支持json格式,而Canal如果用Server+client模式的話,可以自定義格式。
- Maxwell比Canal更加輕量級。
安裝地址
1.Maxwell官網地址 http://maxwells-daemon.io/
2.安裝包下載地址 https://github.com/zendesk/maxwell
安裝MaxWell
(1)上傳maxwell-1.22.4.tar.gz 並進行解壓
tar -zxvf maxwell-1.22.4.tar.gz -C /opt/module/
修改config.properties.example為config.properties
MySql配置
需要mysql開啟binlog,而binlog默認是關閉的,需要開啟,並且為了保證同步數據的一致性,使用的日志格式為row-based replication(RBR),新建或修改my.conf
開啟binlog。
(1)修改my-default.cnf
[kris@hadoop101 maxwell-1.22.5]$ cd /usr/share/mysql/ [kris@hadoop101 mysql]$ sudo cp my-default.cnf /etc/my.cnf [kris@hadoop101 mysql]$ sudo vim /etc/my.cnf
提示:如果是rpm的方式安裝MySql的那么則沒有my.cnf文件,則需要修改mysql-default.cnf然后復制到/etc/路徑下,如果/etc/路徑下有my.cnf則直接修改my.cnf即可。
(2)修改my.cnf,添加如下配置。
[kris@hadoop101 mysql]$ sudo vim /etc/my.cnf
[mysqld]
server_id=1 #隨機指定一個不能和其他集群中機器重名的字符串,如果只有一台機器,那就可以隨便指定了 log-bin=master binlog_format=row #選擇row模式
sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES
啟動MySql設置參數
mysql> set global binlog_format=ROW; mysql> set global binlog_row_image=FULL;
創建maxwell庫(maxwell啟動時候會自動創建,不需手動創建;另外不能在maxwell庫里邊創建表,maxwell不會監聽到)和用戶
添加權限:
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456'; mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%'; mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%'; 重啟MySql服務 [kris@hadoop101 ~]$ sudo service mysql restart
重啟mysql, 查詢是否已開啟
show global variables like '%log_bin%'
查看binlog: show binlog events;
查看最新一個binlog日志文件名稱 show master status;
查找binlog日志 find / -name mysql-bin -type f
查看詳細的日志配置信息SHOW GLOBAL VARIABLES LIKE '%log%';
mysql數據存儲目錄 show variables like '%dir%';
Maxwell 使用
需要首先啟動 zookeeper和 kafka
max的啟動方式有:
(1)基於命令行的啟動方式 [kris@hadoop101 maxwell-1.22.5]$ cd /opt/module/ [kris@hadoop101 maxwell-1.22.5]$ bin/maxwell --user='maxwell' --password='123456' --host='hadoop101' --producer=stdout (2)基於kafka的啟動方式 [kris@hadoop101 maxwell-1.22.5]$ bin/maxwell --user='maxwell' --password='123456' --host='hadoop101' --producer=kafka --kafka.bootstrap.servers=hadoop102:9092 --kafka_topic=maxwell (3)基於RabbitMQ的啟動方式 [kris@hadoop101 maxwell-1.22.5]$ bin/maxwell --user='maxwell' --password='123456' --host='hadoop101' --producer=rabbitmq --rabbitmq_host='rabbitmq.hostname' (4)基於Redis的啟動方式 [kris@hadoop101 maxwell-1.22.5]$ bin/maxwell --user='maxwell' --password='123456' --host='hadoop101' --producer=redis --redis_host=redis.hostname 提示:maxwell 支持端點續傳功能,如果maxwell不慎掛掉了,重啟會從上次的消費位置繼續讀取。
另外,maxwell如果和Mysql不在同一台機器上只需要修改 --host=hostname即可。
1. 輸出信息到命令行
(1)啟動MaxWell
kris@hadoop101 maxwell-1.22.5]$ bin/maxwell --user='maxwell' --password='123456' --host='hadoop101' --producer=stdout Using kafka version: 1.0.0 17:20:11,351 WARN MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured. 17:20:11,813 INFO SchemaStoreSchema - Creating maxwell database 17:20:12,367 INFO Maxwell - Maxwell v1.22.5 is booting (StdoutProducer), starting at Position[BinlogPosition[master.000001:3816], lastHeartbeat=0] 17:20:12,625 INFO AbstractSchemaStore - Maxwell is capturing initial schema 17:20:13,733 INFO BinlogConnectorReplicator - Setting initial binlog pos to: master.000001:3816 17:20:13,828 INFO BinaryLogClient - Connected to hadoop101:3306 at master.000001/3816 (sid:6379, cid:5) 17:20:13,829 INFO BinlogConnectorLifecycleListener - Binlog connected.
自動創建的maxwell庫,往里邊創建表它是不會監聽到的;
該庫記錄了maxwell同步的狀態,最后一次同步的id等等信息,在主庫失敗或同步異常后,只要maxwell庫存在,下次同步會根據最后一次同步的id。如果沒有生成maxwell庫或報錯,可能config.properties中配置的mysql用戶權限不夠。
maxwell會在對應的數據庫上建立一個maxwell數據庫用來記錄binlog的position, 如果maxwell因為某個原因失敗了,下次根據數據庫里保存的position去重新獲取binlog,第一次連接是從最新的log position獲取數據,當然,你也可以手動去更新數據庫position位置。
對mysql的test測試庫進行增刪改操作:
INSERT INTO stu1 VALUES(1003,'kris',22); //增、刪、改
查看MaxWell 阻塞窗口
{"database":"test","table":"stu1","type":"delete","ts":1576747370,"xid":454,"commit":true,"data":{"id":1003,"name":"wangwu","age":23}} {"database":"test","table":"stu1","type":"insert","ts":1576747481,"xid":622,"commit":true,"data":{"id":1003,"name":"kris","age":22}} {"database":"test","table":"stu1","type":"update","ts":1576747522,"xid":695,"commit":true,"data":{"id":1002,"name":"lisi","age":30},"old":{"age":18}}
2. 輸出信息到kafka
啟動kafka集群:
[kris@hadoop101 kafka]$ bin/kafka-server-start.sh config/server.properties & [1] 3459 [kris@hadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties & [1] 6320 [kris@hadoop103 kafka]$ bin/kafka-server-start.sh config/server.properties & [1] 5823
創建kafka的maxwell topic主題
[kris@hadoop101 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server Hadoop101:9092 --topic maxwell
啟動kafka消費者命令行監聽maxwell:
[kris@hadoop101 bin]$ ./kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic maxwell --from-beginning [2019-12-19 18:59:14,209] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager) [2019-12-19 19:03:38,595] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: maxwell-1. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
啟動maxwell
[kris@hadoop101 maxwell-1.22.4]$ bin/maxwell --user='maxwell' --password='123456' --host='hadoop101' --producer=kafka --kafka.bootstrap.servers=hadoop101:9092 --kafka_topic=maxwell
對test庫中的stu1表進行增刪改的操作,查看kafka命令行消費者的監聽數據:
kafka DML、kafka DDL
再寫一個中間件,用來轉義這些JSON,就可以用於其它任何系統了。 實際上我這里有個很大的疑問,kafka partition只能保證單個partition有序,多個partition實際上是無序的,但是在應用binlog的時候,大家知道,這個SQL順序極其重要,我先update再delete和先delete再update是決然不同的結果。
因此如果kafka是多個partition, 如何保證SQL的應用是有順序的呢? 我實際上只能想到,只能用一個partition來操作。
過濾指定規則的庫:http://maxwells-daemon.io/filtering/
全量同步--使用maxwell-bootstrap命令
./bin/maxwell-bootstrap --database test--table stu1 --host hadoop101 --user maxwell --password 123456 --client_id maxwell_dev
同步test.stu1表的所有數據,並指定client_id示maxwell_dev的maxwell執行同步
上一個命令先開着,然后再啟動client_id=maxwell_dev的maxwell
./bin/maxwell --client_id maxwell_dev
等待執行完成即可