maxwell 簡介
Maxwell是一個能實時讀取MySQL二進制日志binlog,並生成 JSON 格式的消息,作為生產者發送給 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的應用程序。它的常見應用場景有ETL、維護緩存、收集表級別的dml指標、增量到搜索引擎、數據分區遷移、切庫binlog回滾方案等。官網(http://maxwells-daemon.io)、GitHub(https://github.com/zendesk/maxwell)
Maxwell主要提供了下列功能:
- 支持
SELECT * FROM table
的方式進行全量數據初始化 - 支持在主庫發生failover后,自動恢復binlog位置(GTID)
- 可以對數據進行分區,解決數據傾斜問題,發送到kafka的數據支持database、table、column等級別的數據分區
- 工作方式是偽裝為Slave,接收binlog events,然后根據schemas信息拼裝,可以接受ddl、xid、row等各種event
除了Maxwell外,目前常用的MySQL Binlog解析工具主要有阿里的canal、mysql_streamer,三個工具對比如下:
canal 由Java開發,分為服務端和客戶端,擁有眾多的衍生應用,性能穩定,功能強大;canal 需要自己編寫客戶端來消費canal解析到的數據。
maxwell相對於canal的優勢是使用簡單,它直接將數據變更輸出為json字符串,不需要再編寫客戶端。
快速開始
首先MySQL需要先啟用binlog,關於什么是MySQL binlog,可以參考文章《MySQL Binlog 介紹》
$ vi my.cnf
[mysqld]
server_id=1
log-bin=master
binlog_format=row
創建Maxwell用戶,並賦予 maxwell 庫的一些權限
CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
使用 maxwell 之前需要先啟動 kafka
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
tar -xzf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0
# 啟動Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
單機啟動 kafka 之前,需要修改一下配置文件,打開配置文件 vi config/server.properties
,在文件最后加入 advertised.host.name
的配置,值為 kafka 所在機器的IP
advertised.host.name=10.100.97.246
不然后面通過 docker 啟動 maxwell 將會報異常(其中的 hadoop2 是我的主機名)
17:45:21,446 DEBUG NetworkClient - [Producer clientId=producer-1] Error connecting to node hadoop2:9092 (id: 0 rack: null)
java.io.IOException: Can't resolve address: hadoop2:9092
at org.apache.kafka.common.network.Selector.connect(Selector.java:217) ~[kafka-clients-1.0.0.jar:?]
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:793) [kafka-clients-1.0.0.jar:?]
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:230) [kafka-clients-1.0.0.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:263) [kafka-clients-1.0.0.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238) [kafka-clients-1.0.0.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:176) [kafka-clients-1.0.0.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Caused by: java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:101) ~[?:1.8.0_181]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) ~[?:1.8.0_181]
at org.apache.kafka.common.network.Selector.connect(Selector.java:214) ~[kafka-clients-1.0.0.jar:?]
... 6 more
接着可以啟動 kafka
bin/kafka-server-start.sh config/server.properties
測試 kafka
# 創建一個 topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 列出所有 topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
# 啟動一個生產者,然后隨意發送一些消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
# 在另一個終端啟動一下消費者,觀察所消費的消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
通過 docker 快速安裝並使用 Maxwell (當然之前需要自行安裝 docker)
# 拉取鏡像
docker pull zendesk/maxwell
# 啟動maxwell,並將解析出的binlog輸出到控制台
docker run -ti --rm zendesk/maxwell bin/maxwell --user='maxwell' --password='123456' --host='10.100.97.246' --producer=stdout
測試Maxwell,首先創建一張簡單的表,然后增改刪數據
CREATE TABLE `test` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`age` int(11) DEFAULT NULL,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert into test values(1,22,"小旋鋒");
update test set name='whirly' where id=1;
delete from test where id=1;
觀察docker控制台的輸出,從輸出的日志中可以看出Maxwell解析出的binlog的JSON字符串的格式
{"database":"test","table":"test","type":"insert","ts":1552153502,"xid":832,"commit":true,"data":{"id":1,"age":22,"name":"小旋鋒"}}
{"database":"test","table":"test","type":"update","ts":1552153502,"xid":833,"commit":true,"data":{"id":1,"age":22,"name":"whirly"},"old":{"name":"小旋鋒"}}
{"database":"test","table":"test","type":"delete","ts":1552153502,"xid":834,"commit":true,"data":{"id":1,"age":22,"name":"whirly"}}
輸出到 Kafka,關閉 docker,重新設置啟動參數
docker run -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \
--password='123456' --host='10.100.97.246' --producer=kafka \
--kafka.bootstrap.servers='10.100.97.246:9092' --kafka_topic=maxwell --log_level=debug
然后啟動一個消費者來消費 maxwell topic的消息,觀察其輸出;再一次執行增改刪數據的SQL,仍然可以得到相同的輸出
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic maxwell
輸出JSON字符串的格式
- data 最新的數據,修改后的數據
- old 舊數據,修改前的數據
- type 操作類型,有insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop,bootstrap-insert,int(未知類型)
- xid 事務id
- commit 同一個xid代表同一個事務,事務的最后一條語句會有commit,可以利用這個重現事務
- server_id
- thread_id
- 運行程序時添加參數--output_ddl,可以捕捉到ddl語句
- datetime列會輸出為"YYYY-MM-DD hh:mm:ss",如果遇到"0000-00-00 00:00:00"會原樣輸出
- maxwell支持多種編碼,但僅輸出utf8編碼
- maxwell的TIMESTAMP總是作為UTC處理,如果要調整為自己的時區,需要在后端邏輯上進行處理
與輸出格式相關的配置如下
選項 | 參數值 | 描述 | 默認值 |
---|---|---|---|
output_binlog_position |
BOOLEAN | 是否包含 binlog position | false |
output_gtid_position |
BOOLEAN | 是否包含 gtid position | false |
output_commit_info |
BOOLEAN | 是否包含 commit and xid | true |
output_xoffset |
BOOLEAN | 是否包含 virtual tx-row offset | false |
output_nulls |
BOOLEAN | 是否包含值為NULL的字段 | true |
output_server_id |
BOOLEAN | 是否包含 server_id | false |
output_thread_id |
BOOLEAN | 是否包含 thread_id | false |
output_schema_id |
BOOLEAN | 是否包含 schema_id | false |
output_row_query |
BOOLEAN | 是否包含 INSERT/UPDATE/DELETE 語句. Mysql需要開啟 binlog_rows_query_log_events |
false |
output_ddl |
BOOLEAN | 是否包含 DDL (table-alter, table-create, etc) events | false |
output_null_zerodates |
BOOLEAN | 是否將 '0000-00-00' 轉換為 null? | false |
進階使用
基本的配置
選項 | 參數值 | 描述 | 默認值 |
---|---|---|---|
config |
配置文件 config.properties 的路徑 |
||
log_level |
[debug | info | warn | error] |
日志級別 | info |
daemon |
指定Maxwell實例作為守護進程到后台運行 | ||
env_config_prefix |
STRING | 匹配該前綴的環境變量將被視為配置值 |
可以把Maxwell的啟動參數寫到一個配置文件 config.properties
中,然后通過 config 選項指定,bin/maxwell --config config.properties
user=maxwell
password=123456
host=10.100.97.246
producer=kafka
kafka.bootstrap.servers=10.100.97.246:9092
kafka_topic=maxwell
mysql 配置選項
Maxwell 根據用途將 MySQL 划分為3種角色:
-
host
:主機,建maxwell庫表,存儲捕獲到的schema等信息- 主要有六張表,bootstrap用於數據初始化,schemas記錄所有的binlog文件信息,databases記錄了所有的數據庫信息,tables記錄了所有的表信息,columns記錄了所有的字段信息,positions記錄了讀取binlog的位移信息,heartbeats記錄了心跳信息
-
replication_host
:復制主機,Event監聽,讀取該主機binlog- 將
host
和replication_host
分開,可以避免replication_user
往生產庫里寫數據
- 將
-
schema_host
:schema主機,捕獲表結構schema的主機- binlog里面沒有字段信息,所以maxwell需要從數據庫查出schema,存起來。
schema_host
一般用不到,但在binlog-proxy
場景下就很實用。比如要將已經離線的binlog通過maxwell生成json流,於是自建一個mysql server里面沒有結構,只用於發送binlog,此時表機構就可以制動從 schema_host 獲取。
通常,這三個主機都是同一個,schema_host
只在有 replication_host
的時候使用。
與MySQL相關的有下列配置
選項 | 參數值 | 描述 | 默認值 |
---|---|---|---|
host |
STRING | mysql 地址 | localhost |
user |
STRING | mysql 用戶名 | |
password |
STRING | mysql 密碼 | (no password) |
port |
INT | mysql 端口 3306 | |
jdbc_options |
STRING | mysql jdbc connection options | DEFAULT_JDBC_OPTS |
ssl |
SSL_OPT | SSL behavior for mysql cx | DISABLED |
schema_database |
STRING | Maxwell用於維護的schema和position將使用的數據庫 | maxwell |
client_id |
STRING | 用於標識Maxwell實例的唯一字符串 | maxwell |
replica_server_id |
LONG | 用於標識Maxwell實例的唯一數字 | 6379 (see notes) |
master_recovery |
BOOLEAN | enable experimental master recovery code | false |
gtid_mode |
BOOLEAN | 是否開啟基於GTID的復制 | false |
recapture_schema |
BOOLEAN | 重新捕獲最新的表結構(schema),不可在 config.properties中配置 | false |
replication_host |
STRING | server to replicate from. See split server roles | schema-store host |
replication_password |
STRING | password on replication server | (none) |
replication_port |
INT | port on replication server | 3306 |
replication_user |
STRING | user on replication server | |
replication_ssl |
SSL_OPT | SSL behavior for replication cx cx | DISABLED |
schema_host |
STRING | server to capture schema from. See split server roles | schema-store host |
schema_password |
STRING | password on schema-capture server | (none) |
schema_port |
INT | port on schema-capture server | 3306 |
schema_user |
STRING | user on schema-capture server | |
schema_ssl |
SSL_OPT | SSL behavior for schema-capture server | DISABLED |
生產者的配置
僅介紹kafka,其他的生產者的配置詳見官方文檔。
kafka是maxwell支持最完善的一個生產者,並且內置了多個版本的kafka客戶端(0.8.2.2, 0.9.0.1, 0.10.0.1, 0.10.2.1 or 0.11.0.1, 1.0.0.),默認 kafka_version=1.0.0(當前Maxwell版本1.20.0)
Maxwell 會將消息投遞到Kafka的Topic中,該Topic由 kafka_topic
選項指定,默認值為 maxwell
,除了指定為靜態的Topic,還可以指定為動態的,譬如 namespace_%{database}_%{table}
,%{database}
和 %{table}
將被具體的消息的 database 和 table 替換。
Maxwell 讀取配置時,如果配置項是以 kafka.
開頭,那么該配置將設置到 Kafka Producer 客戶端的連接參數中去,譬如
kafka.acks = 1
kafka.compression.type = snappy
kafka.retries=5
下面是Maxwell通用生產者和Kafka生產者的配置參數
選項 | 參數值 | 描述 | 默認值 |
---|---|---|---|
producer |
PRODUCER_TYPE | 生產者類型 | stdout |
custom_producer.factory |
CLASS_NAME | 自定義消費者的工廠類 | |
producer_ack_timeout |
PRODUCER_ACK_TIMEOUT | 異步消費認為消息丟失的超時時間(毫秒ms) | |
producer_partition_by |
PARTITION_BY | 輸入到kafka/kinesis的分區函數 | database |
producer_partition_columns |
STRING | 若按列分區,以逗號分隔的列名稱 | |
producer_partition_by_fallback |
PARTITION_BY_FALLBACK | producer_partition_by=column 時需要,當列不存在是使用 |
|
ignore_producer_error |
BOOLEAN | 為false時,在kafka/kinesis發生錯誤時退出程序;為true時,僅記錄日志 See also dead_letter_topic |
true |
kafka.bootstrap.servers |
STRING | kafka 集群列表,HOST:PORT[,HOST:PORT] |
|
kafka_topic |
STRING | kafka topic | maxwell |
dead_letter_topic |
STRING | 詳見官方文檔 | |
kafka_version |
KAFKA_VERSION | 指定maxwell的 kafka 生產者客戶端版本,不可在config.properties中配置 | 0.11.0.1 |
kafka_partition_hash |
[default | murmur3] |
選擇kafka分區時使用的hash方法 | default |
kafka_key_format |
[array | hash] |
how maxwell outputs kafka keys, either a hash or an array of hashes | hash |
ddl_kafka_topic |
STRING | 當output_ddl 為true時, 所有DDL的消息都將投遞到該topic |
kafka_topic |
過濾器配置
Maxwell 可以通過 --filter
配置項來指定過濾規則,通過 exclude
排除,通過 include
包含,值可以為具體的數據庫、數據表、數據列,甚至用 Javascript 來定義復雜的過濾規則;可以用正則表達式描述,有幾個來自官網的例子
# 僅匹配foodb數據庫的tbl表和所有table_數字的表
--filter='exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/'
# 排除所有庫所有表,僅匹配db1數據庫
--filter = 'exclude: *.*, include: db1.*'
# 排除含db.tbl.col列值為reject的所有更新
--filter = 'exclude: db.tbl.col = reject'
# 排除任何包含col_a列的更新
--filter = 'exclude: *.*.col_a = *'
# blacklist 黑名單,完全排除bad_db數據庫,若要恢復,必須刪除maxwell庫
--filter = 'blacklist: bad_db.*'
數據初始化
Maxwell 啟動后將從maxwell庫中獲取上一次停止時position,從該斷點處開始讀取binlog。如果binlog已經清除了,那么怎樣可以通過maxwell把整張表都復制出來呢?也就是數據初始化該怎么做?
對整張表進行操作,人為地產生binlog?譬如找一個不影響業務的字段譬如update_time,然后加一秒,再減一秒?
update test set update_time = DATE_ADD(update_time,intever 1 second);
update test set update_time = DATE_ADD(update_time,intever -1 second);
這樣明顯存在幾個大問題:
- 不存在一個不重要的字段怎么辦?每個字段都很重要,不能隨便地修改!
- 如果整張表很大,修改的過程耗時很長,影響了業務!
- 將產生大量非業務的binlog!
針對數據初始化的問題,Maxwell 提供了一個命令工具 maxwell-bootstrap
幫助我們完成數據初始化,maxwell-bootstrap
是基於 SELECT * FROM table
的方式進行全量數據初始化,不會產生多余的binlog!
這個工具有下面這些參數:
參數 | 說明 |
---|---|
--log_level LOG_LEVEL |
日志級別(DEBUG, INFO, WARN or ERROR) |
--user USER |
mysql 用戶名 |
--password PASSWORD |
mysql 密碼 |
--host HOST |
mysql 地址 |
--port PORT |
mysql 端口 |
--database DATABASE |
要bootstrap的表所在的數據庫 |
--table TABLE |
要引導的表 |
--where WHERE_CLAUSE |
設置過濾條件 |
--client_id CLIENT_ID |
指定執行引導操作的Maxwell實例 |
實驗一番,下面將引導 test
數據庫中 test
表,首先是准備幾條測試用的數據
INSERT INTO `test` VALUES (1, 1, '1');
INSERT INTO `test` VALUES (2, 2, '2');
INSERT INTO `test` VALUES (3, 3, '3');
INSERT INTO `test` VALUES (4, 4, '4');
然后 reset master;
清空binlog,刪除 maxwell 庫中的表。接着使用快速開始中的命令,啟動Kafka、Maxwell和Kafka消費者,然后啟動 maxwell-bootstrap
docker run -it --rm zendesk/maxwell bin/maxwell-bootstrap --user maxwell \
--password 123456 --host 10.100.97.246 --database test --table test --client_id maxwell
注意:--bootstrapper=sync
時,在處理bootstrap時,會阻塞正常的binlog解析;--bootstrapper=async
時,不會阻塞。
也可以執行下面的SQL,在 maxwell.bootstrap
表中插入記錄,手動觸發
insert into maxwell.bootstrap (database_name, table_name) values ('test', 'test');
就可以在 kafka 消費者端看見引導過來的數據了
{"database":"maxwell","table":"bootstrap","type":"insert","ts":1552199115,"xid":36738,"commit":true,"data":{"id":3,"database_name":"test","table_name":"test","where_clause":null,"is_complete":0,"inserted_rows":0,"total_rows":0,"created_at":null,"started_at":null,"completed_at":null,"binlog_file":null,"binlog_position":0,"client_id":"maxwell"}}
{"database":"test","table":"test","type":"bootstrap-start","ts":1552199115,"data":{}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":1,"age":1,"name":"1"}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":2,"age":2,"name":"2"}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":3,"age":3,"name":"3"}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":4,"age":4,"name":"4"}}
{"database":"maxwell","table":"bootstrap","type":"update","ts":1552199115,"xid":36756,"commit":true,"data":{"id":3,"database_name":"test","table_name":"test","where_clause":null,"is_complete":1,"inserted_rows":4,"total_rows":0,"created_at":null,"started_at":"2019-03-10 14:25:15","completed_at":"2019-03-10 14:25:15","binlog_file":"mysql-bin.000001","binlog_position":64446,"client_id":"maxwell"},"old":{"is_complete":0,"inserted_rows":1,"completed_at":null}}
{"database":"test","table":"test","type":"bootstrap-complete","ts":1552199115,"data":{}}
中間的4條便是 test.test
的binlog數據了,注意這里的 type 是 bootstrap-insert
,而不是 insert
。
然后再一次查看binlog,show binlog events;
,會發現只有與 maxwell
相關的binlog,並沒有 test.test
相關的binlog,所以 maxwell-bootstrap
命令並不會產生多余的 binlog,當數據表的數量很大時,這個好處會更加明顯
Bootstrap 的過程是 bootstrap-start -> bootstrap-insert -> bootstrap-complete
,其中,start和complete的data字段為空,不攜帶數據。
在進行bootstrap過程中,如果maxwell崩潰,重啟時,bootstrap會完全重新開始,不管之前進行到多少,若不希望這樣,可以到數據庫中設置 is_complete
字段值為1(表示完成),或者刪除該行
Maxwell監控
Maxwell 提供了 base logging mechanism, JMX, HTTP or by push to Datadog
這四種監控方式,與監控相關的配置項有下列這些:
選項 | 參數值 | 描述 | 默認值 |
---|---|---|---|
metrics_prefix |
STRING | 指標的前綴 | MaxwellMetrics |
metrics_type |
[slf4j | jmx | http | datadog] |
發布指標的方式 | |
metrics_jvm |
BOOLEAN | 是否收集JVM信息 | false |
metrics_slf4j_interval |
SECONDS | 將指標記錄到日志的頻率,metrics_type 須配置為slf4j |
60 |
http_port |
INT | metrics_type 為http時,發布指標綁定的端口 |
8080 |
http_path_prefix |
STRING | http的路徑前綴 | / |
http_bind_address |
STRING | http發布指標綁定的地址 | all addresses |
http_diagnostic |
BOOLEAN | http是否開啟diagnostic后綴 | false |
http_diagnostic_timeout |
MILLISECONDS | http diagnostic 響應超時時間 | 10000 |
metrics_datadog_type |
[udp | http] |
metrics_type 為datadog時發布指標的方式 |
udp |
metrics_datadog_tags |
STRING | 提供給 datadog 的標簽,如 tag1:value1,tag2:value2 | |
metrics_datadog_interval |
INT | 推指標到datadog的頻率,單位秒 | 60 |
metrics_datadog_apikey |
STRING | 當 metrics_datadog_type=http 時datadog用的api key |
|
metrics_datadog_host |
STRING | 當metrics_datadog_type=udp 時推指標的目標地址 |
localhost |
metrics_datadog_port |
INT | 當metrics_datadog_type=udp 時推指標的端口 |
8125 |
具體可以得到哪些監控指標呢?有如下,注意所有指標都預先配置了指標前綴 metrics_prefix
指標 | 類型 | 說明 |
---|---|---|
messages.succeeded |
Counters | 成功發送到kafka的消息數量 |
messages.failed |
Counters | 發送失敗的消息數量 |
row.count |
Counters | 已處理的binlog行數,注意並非所有binlog都發往kafka |
messages.succeeded.meter |
Meters | 消息成功發送到Kafka的速率 |
messages.failed.meter |
Meters | 消息發送失敗到kafka的速率 |
row.meter |
Meters | 行(row)從binlog連接器到達maxwell的速率 |
replication.lag |
Gauges | 從數據庫事務提交到Maxwell處理該事務之間所用的時間(毫秒) |
inflightmessages.count |
Gauges | 當前正在處理的消息數(等待來自目的地的確認,或在消息之前) |
message.publish.time |
Timers | 向kafka發送record所用的時間(毫秒) |
message.publish.age |
Timers | 從數據庫產生事件到發送到Kafka之間的時間(毫秒),精確度為+/-500ms |
replication.queue.time |
Timers | 將一個binlog事件送到處理隊列所用的時間(毫秒) |
上述有些指標為kafka特有的,並不支持所有的生產者。
實驗一番,通過 http 方式獲取監控指標
docker run -p 8080:8080 -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \
--password='123456' --host='10.100.97.246' --producer=kafka \
--kafka.bootstrap.servers='10.100.97.246:9092' --kafka_topic=maxwell --log_level=debug \
--metrics_type=http --metrics_jvm=true --http_port=8080
上面的配置大部分與前面的相同,不同的有 -p 8080:8080
docker端口映射,以及 --metrics_type=http --metrics_jvm=true --http_port=8080
,配置了通過http方式發布指標,啟用收集JVM信息,端口為8080,之后可以通過 http://10.100.97.246:8080/metrics
便可獲取所有的指標
http 方式有四種后綴,分別對應四種不同的格式
endpoint | 說明 |
---|---|
/metrics |
所有指標以JSON格式返回 |
/prometheus |
所有指標以Prometheus格式返回(Prometheus是一套開源的監控&報警&時間序列數據庫的組合) |
/healthcheck |
返回Maxwell過去15分鍾是否健康 |
/ping |
簡單的測試,返回 pong |
如果是通過 JMX 的方式收集Maxwell監控指標,可以 JAVA_OPTS
環境變量配置JMX訪問權限
export JAVA_OPTS="-Dcom.sun.management.jmxremote \
-Dcom.sun.management.jmxremote.port=9010 \
-Dcom.sun.management.jmxremote.local.only=false \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Djava.rmi.server.hostname=10.100.97.246"
多個Maxwell實例
在不同的配置下,Maxwell可以在同一個主服務器上運行多個實例。如果希望讓生產者以不同的配置運行,例如將來自不同組的表(table)的事件投遞到不同的Topic中,這將非常有用。Maxwell的每個實例都必須配置一個唯一的client_id,以便區分的binlog位置。
GTID 支持
Maxwell 從1.8.0版本開始支持基於GTID的復制(GTID-based replication),在GTID模式下,Maxwell將在主機更改后透明地選擇新的復制位置。
什么是GTID Replication?
GTID (Global Transaction ID) 是對於一個已提交事務的編號,並且是一個全局唯一的編號。
從 MySQL 5.6.5 開始新增了一種基於 GTID 的復制方式。通過 GTID 保證了每個在主庫上提交的事務在集群中有一個唯一的ID。這種方式強化了數據庫的主備一致性,故障恢復以及容錯能力。
在原來基於二進制日志的復制中,從庫需要告知主庫要從哪個偏移量進行增量同步,如果指定錯誤會造成數據的遺漏,從而造成數據的不一致。借助GTID,在發生主備切換的情況下,MySQL的其它從庫可以自動在新主庫上找到正確的復制位置,這大大簡化了復雜復制拓撲下集群的維護,也減少了人為設置復制位置發生誤操作的風險。另外,基於GTID的復制可以忽略已經執行過的事務,減少了數據發生不一致的風險。
注意事項
timestamp column
maxwell對時間類型(datetime, timestamp, date)都是當做字符串處理的,這也是為了保證數據一致(比如0000-00-00 00:00:00
這樣的時間在timestamp里是非法的,但mysql卻認,解析成java或者python類型就是null/None)。
如果MySQL表上的字段是 timestamp 類型,是有時區的概念,binlog解析出來的是標准UTC時間,但用戶看到的是本地時間。比如 f_create_time timestamp
創建時間是北京時間 2018-01-05 21:01:01
,那么mysql實際存儲的是 2018-01-05 13:01:01
,binlog里面也是這個時間字符串。如果不做消費者不做時區轉換,會少8個小時。
與其每個客戶端都要考慮這個問題,我覺得更合理的做法是提供時區參數,然后maxwell自動處理時區問題,否則要么客戶端先需要知道哪些列是timestamp類型,或者連接上原庫緩存上這些類型。
binary column
maxwell可以處理binary類型的列,如blob、varbinary,它的做法就是對二進制列使用 base64_encode
,當做字符串輸出到json。消費者拿到這個列數據后,不能直接拼裝,需要 base64_decode
。
表結構不同步
如果是拿比較老的binlog,放到新的mysql server上去用maxwell拉去,有可能表結構已經發生了變化,比如binlog里面字段比 schema_host
里面的字段多一個。目前這種情況沒有發現異常,比如阿里RDS默認會為 無主鍵無唯一索引的表,增加一個__##alibaba_rds_rowid##__
,在 show create table
和 schema
里面都看不到這個隱藏主鍵,但binlog里面會有,同步到從庫。
另外我們有通過git去管理結構版本,如果真有這種場景,也可以應對。
大事務binlog
當一個事物產生的binlog量非常大的時候,比如遷移日表數據,maxwell為了控制內存使用,會自動將處理不過來的binlog放到文件系統
Using kafka version: 0.11.0.1
21:16:07,109 WARN MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
21:16:07,380 INFO SchemaStoreSchema - Creating maxwell database
21:16:07,540 INFO Maxwell - Maxwell v?? is booting (RabbitmqProducer), starting at Position[BinlogPosition[mysql-bin.006235:24980714],
lastHeartbeat=0]
21:16:07,649 INFO AbstractSchemaStore - Maxwell is capturing initial schema
21:16:08,267 INFO BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.006235:24980714
21:16:08,324 INFO BinaryLogClient - Connected to rm-xxxxxxxxxxx.mysql.rds.aliyuncs.com:3306 at mysql-bin.006235/24980714 (sid:637
9, cid:9182598)
21:16:08,325 INFO BinlogConnectorLifecycleListener - Binlog connected.
03:15:36,104 INFO ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell7935334910787514257events
03:17:14,880 INFO ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell3143086481692829045events
但是遇到另外一個問題,overflow隨后就出現異常 EventDataDeserializationException: Failed to deserialize data of EventHeaderV4
,當我另起一個maxwell指點之前的binlog postion開始解析,卻有沒有拋異常。事后的數據也表明並沒有數據丟失。
問題產生的原因還不明,Caused by: java.net.SocketException: Connection reset
,感覺像讀取 binlog 流的時候還沒讀取到完整的event,異常關閉了連接。這個問題比較頑固,github上面類似問題都沒有達到明確的解決。(這也從側面告訴我們,大表數據遷移,也要批量進行,不要一個insert into .. select
搞定)
03:18:20,586 INFO ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell5229190074667071141events
03:19:31,289 WARN BinlogConnectorLifecycleListener - Communication failure.
com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{time
stamp=1514920657000, eventType=WRITE_ROWS, serverId=2115082720, headerLength=19, dataLength=8155, nextPosition=520539918, flags=0}
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:216) ~[mys
ql-binlog-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:184) ~[mysql-binlog-c
onnector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:890) [mysql-binlog-connector-java-0
.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:559) [mysql-binlog-connector-java-0.13.0.jar:0.13
.0]
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:793) [mysql-binlog-connector-java-0.13.0.jar:0.13.0
]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210) ~[?:1.8.0_121]
at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[?:1.8.0_121]
at com.github.shyiko.mysql.binlog.io.BufferedSocketInputStream.read(BufferedSocketInputStream.java:51) ~[mysql-binlog-connector-
java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readWithinBlockBoundaries(ByteArrayInputStream.java:202) ~[mysql-binlo
g-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:184) ~[mysql-binlog-connector-java-0.13
.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:46) ~[mysql-binlog-connector-jav
a-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeLong(AbstractRowsEventDataD
eserializer.java:212) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeCell(AbstractRowsEventDataD
eserializer.java:150) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeRow(AbstractRowsEventDataDeserializer.java:132) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserializeRows(WriteRowsEventDataDeserializer.java:64) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserialize(WriteRowsEventDataDeserializer.java:56) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserialize(WriteRowsEventDataDeserializer.java:32) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:210) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
... 5 more
03:19:31,514 INFO BinlogConnectorLifecycleListener - Binlog disconnected.
03:19:31,590 WARN BinlogConnectorReplicator - replicator stopped at position: mysql-bin.006236:520531744 -- restarting
03:19:31,595 INFO BinaryLogClient - Connected to rm-xxxxxx.mysql.rds.aliyuncs.com:3306 at mysql-bin.006236/520531744 (sid:6379, cid:9220521)
tableMapCache
前面講過,如果我只想獲取某幾個表的binlog變更,需要用 include_tables 來過濾,但如果mysql server上現在刪了一個表t1,但我的binlog是從昨天開始讀取,被刪的那個表t1在maxwell啟動的時候是拉取不到表結構的。然后昨天的binlog里面有 t1 的變更,因為找不到表結構給來組裝成json,會拋異常。
手動在 maxwell.tables/columns
里面插入記錄是可行的。但這個問題的根本是,maxwell在binlog過濾的時候,只在處理row_event的時候,而對 tableMapCache 要求binlog里面的所有表都要有。
自己(seanlook)提交了一個commit,可以在做 tableMapCache 的時候也僅要求緩存 include_dbs/tables 這些表: https://github.com/seanlook/maxwell/commit/2618b70303078bf910a1981b69943cca75ee04fb
提高消費性能
在用rabbitmq時,routing_key
是 %db%.%table%
,但某些表產生的binlog增量非常大,就會導致各隊列消息量很不平均,目前因為還沒做到事務xid或者thread_id級別的並發回放,所以最小隊列粒度也是表,盡量單獨放一個隊列,其它數據量小的合在一起。
binlog
Maxwell 在 maxwell 庫中維護了 binlog 的位移等信息,由於一些原因譬如 reset master;
,導致 maxwell 庫中的記錄與實際的binlog對不上,這時將報異常,這是可以手動修正binlog位移或者直接清空/刪除 maxwell 庫重建
com.github.shyiko.mysql.binlog.network.ServerException: Could not find first log file name in binary log index file
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:885)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:564)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:796)
at java.lang.Thread.run(Thread.java:748)
以及
com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event 'mysql-bin.000001' at 760357, the last event read from './mysql-bin.000001' at 1888540, the last byte read from './mysql-bin.000001' at 1888540.
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:885)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:564)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:796)
at java.lang.Thread.run(Thread.java:748)
參考文檔
- Maxwell's Daemon Doc
- 輕風博客.MySQL Binlog解析工具Maxwell 1.17.1 的安裝和使用
- Sean.自建Binlog訂閱服務 —— Maxwell
- MySQL 5.7 基於 GTID 的主從復制實踐