MySQL Binlog 解析工具 Maxwell 詳解


maxwell 簡介

Maxwell是一個能實時讀取MySQL二進制日志binlog,並生成 JSON 格式的消息,作為生產者發送給 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的應用程序。它的常見應用場景有ETL、維護緩存、收集表級別的dml指標、增量到搜索引擎、數據分區遷移、切庫binlog回滾方案等。官網(http://maxwells-daemon.io)、GitHub(https://github.com/zendesk/maxwell)

Maxwell主要提供了下列功能:

  • 支持 SELECT * FROM table 的方式進行全量數據初始化
  • 支持在主庫發生failover后,自動恢復binlog位置(GTID)
  • 可以對數據進行分區,解決數據傾斜問題,發送到kafka的數據支持database、table、column等級別的數據分區
  • 工作方式是偽裝為Slave,接收binlog events,然后根據schemas信息拼裝,可以接受ddl、xid、row等各種event

除了Maxwell外,目前常用的MySQL Binlog解析工具主要有阿里的canal、mysql_streamer,三個工具對比如下:

canal、maxwell、mysql_streamer對比

canal 由Java開發,分為服務端和客戶端,擁有眾多的衍生應用,性能穩定,功能強大;canal 需要自己編寫客戶端來消費canal解析到的數據。

maxwell相對於canal的優勢是使用簡單,它直接將數據變更輸出為json字符串,不需要再編寫客戶端。

快速開始

首先MySQL需要先啟用binlog,關於什么是MySQL binlog,可以參考文章《MySQL Binlog 介紹

$ vi my.cnf

[mysqld]
server_id=1
log-bin=master
binlog_format=row

創建Maxwell用戶,並賦予 maxwell 庫的一些權限

CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%'; 

使用 maxwell 之前需要先啟動 kafka

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
tar -xzf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0
# 啟動Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

單機啟動 kafka 之前,需要修改一下配置文件,打開配置文件 vi config/server.properties,在文件最后加入 advertised.host.name 的配置,值為 kafka 所在機器的IP

advertised.host.name=10.100.97.246

不然后面通過 docker 啟動 maxwell 將會報異常(其中的 hadoop2 是我的主機名)

17:45:21,446 DEBUG NetworkClient - [Producer clientId=producer-1] Error connecting to node hadoop2:9092 (id: 0 rack: null)
java.io.IOException: Can't resolve address: hadoop2:9092
        at org.apache.kafka.common.network.Selector.connect(Selector.java:217) ~[kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:793) [kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:230) [kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:263) [kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238) [kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:176) [kafka-clients-1.0.0.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Caused by: java.nio.channels.UnresolvedAddressException
        at sun.nio.ch.Net.checkAddress(Net.java:101) ~[?:1.8.0_181]
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) ~[?:1.8.0_181]
        at org.apache.kafka.common.network.Selector.connect(Selector.java:214) ~[kafka-clients-1.0.0.jar:?]
        ... 6 more

接着可以啟動 kafka

bin/kafka-server-start.sh config/server.properties

測試 kafka

# 創建一個 topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

# 列出所有 topic
bin/kafka-topics.sh --list --zookeeper localhost:2181

# 啟動一個生產者,然后隨意發送一些消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

# 在另一個終端啟動一下消費者,觀察所消費的消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

通過 docker 快速安裝並使用 Maxwell (當然之前需要自行安裝 docker)

# 拉取鏡像 
docker pull zendesk/maxwell

# 啟動maxwell,並將解析出的binlog輸出到控制台
docker run -ti --rm zendesk/maxwell bin/maxwell --user='maxwell' --password='123456' --host='10.100.97.246' --producer=stdout

測試Maxwell,首先創建一張簡單的表,然后增改刪數據

CREATE TABLE `test` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `age` int(11) DEFAULT NULL,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
 
insert into test values(1,22,"小旋鋒");
update test set name='whirly' where id=1;
delete from test where id=1;

觀察docker控制台的輸出,從輸出的日志中可以看出Maxwell解析出的binlog的JSON字符串的格式

{"database":"test","table":"test","type":"insert","ts":1552153502,"xid":832,"commit":true,"data":{"id":1,"age":22,"name":"小旋鋒"}}
{"database":"test","table":"test","type":"update","ts":1552153502,"xid":833,"commit":true,"data":{"id":1,"age":22,"name":"whirly"},"old":{"name":"小旋鋒"}}
{"database":"test","table":"test","type":"delete","ts":1552153502,"xid":834,"commit":true,"data":{"id":1,"age":22,"name":"whirly"}}

輸出到 Kafka,關閉 docker,重新設置啟動參數

docker run -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \
    --password='123456' --host='10.100.97.246' --producer=kafka \
    --kafka.bootstrap.servers='10.100.97.246:9092' --kafka_topic=maxwell --log_level=debug

然后啟動一個消費者來消費 maxwell topic的消息,觀察其輸出;再一次執行增改刪數據的SQL,仍然可以得到相同的輸出

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic maxwell

輸出JSON字符串的格式

  • data 最新的數據,修改后的數據
  • old 舊數據,修改前的數據
  • type 操作類型,有insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop,bootstrap-insert,int(未知類型)
  • xid 事務id
  • commit 同一個xid代表同一個事務,事務的最后一條語句會有commit,可以利用這個重現事務
  • server_id
  • thread_id
  • 運行程序時添加參數--output_ddl,可以捕捉到ddl語句
  • datetime列會輸出為"YYYY-MM-DD hh:mm:ss",如果遇到"0000-00-00 00:00:00"會原樣輸出
  • maxwell支持多種編碼,但僅輸出utf8編碼
  • maxwell的TIMESTAMP總是作為UTC處理,如果要調整為自己的時區,需要在后端邏輯上進行處理

與輸出格式相關的配置如下

選項 參數值 描述 默認值
output_binlog_position BOOLEAN 是否包含 binlog position false
output_gtid_position BOOLEAN 是否包含 gtid position false
output_commit_info BOOLEAN 是否包含 commit and xid true
output_xoffset BOOLEAN 是否包含 virtual tx-row offset false
output_nulls BOOLEAN 是否包含值為NULL的字段 true
output_server_id BOOLEAN 是否包含 server_id false
output_thread_id BOOLEAN 是否包含 thread_id false
output_schema_id BOOLEAN 是否包含 schema_id false
output_row_query BOOLEAN 是否包含 INSERT/UPDATE/DELETE 語句. Mysql需要開啟 binlog_rows_query_log_events false
output_ddl BOOLEAN 是否包含 DDL (table-alter, table-create, etc) events false
output_null_zerodates BOOLEAN 是否將 '0000-00-00' 轉換為 null? false

進階使用

基本的配置

選項 參數值 描述 默認值
config 配置文件 config.properties 的路徑
log_level [debug | info | warn | error] 日志級別 info
daemon 指定Maxwell實例作為守護進程到后台運行
env_config_prefix STRING 匹配該前綴的環境變量將被視為配置值

可以把Maxwell的啟動參數寫到一個配置文件 config.properties 中,然后通過 config 選項指定,bin/maxwell --config config.properties

user=maxwell
password=123456
host=10.100.97.246
producer=kafka
kafka.bootstrap.servers=10.100.97.246:9092
kafka_topic=maxwell

mysql 配置選項

Maxwell 根據用途將 MySQL 划分為3種角色:

  • host:主機,建maxwell庫表,存儲捕獲到的schema等信息

    • 主要有六張表,bootstrap用於數據初始化,schemas記錄所有的binlog文件信息,databases記錄了所有的數據庫信息,tables記錄了所有的表信息,columns記錄了所有的字段信息,positions記錄了讀取binlog的位移信息,heartbeats記錄了心跳信息
  • replication_host:復制主機,Event監聽,讀取該主機binlog

    • hostreplication_host分開,可以避免 replication_user 往生產庫里寫數據
  • schema_host:schema主機,捕獲表結構schema的主機

    • binlog里面沒有字段信息,所以maxwell需要從數據庫查出schema,存起來。
    • schema_host一般用不到,但在binlog-proxy場景下就很實用。比如要將已經離線的binlog通過maxwell生成json流,於是自建一個mysql server里面沒有結構,只用於發送binlog,此時表機構就可以制動從 schema_host 獲取。

通常,這三個主機都是同一個,schema_host 只在有 replication_host 的時候使用。

與MySQL相關的有下列配置

選項 參數值 描述 默認值
host STRING mysql 地址 localhost
user STRING mysql 用戶名
password STRING mysql 密碼 (no password)
port INT mysql 端口 3306
jdbc_options STRING mysql jdbc connection options DEFAULT_JDBC_OPTS
ssl SSL_OPT SSL behavior for mysql cx DISABLED
schema_database STRING Maxwell用於維護的schema和position將使用的數據庫 maxwell
client_id STRING 用於標識Maxwell實例的唯一字符串 maxwell
replica_server_id LONG 用於標識Maxwell實例的唯一數字 6379 (see notes)
master_recovery BOOLEAN enable experimental master recovery code false
gtid_mode BOOLEAN 是否開啟基於GTID的復制 false
recapture_schema BOOLEAN 重新捕獲最新的表結構(schema),不可在 config.properties中配置 false
replication_host STRING server to replicate from. See split server roles schema-store host
replication_password STRING password on replication server (none)
replication_port INT port on replication server 3306
replication_user STRING user on replication server
replication_ssl SSL_OPT SSL behavior for replication cx cx DISABLED
schema_host STRING server to capture schema from. See split server roles schema-store host
schema_password STRING password on schema-capture server (none)
schema_port INT port on schema-capture server 3306
schema_user STRING user on schema-capture server
schema_ssl SSL_OPT SSL behavior for schema-capture server DISABLED

生產者的配置

僅介紹kafka,其他的生產者的配置詳見官方文檔。

kafka是maxwell支持最完善的一個生產者,並且內置了多個版本的kafka客戶端(0.8.2.2, 0.9.0.1, 0.10.0.1, 0.10.2.1 or 0.11.0.1, 1.0.0.),默認 kafka_version=1.0.0(當前Maxwell版本1.20.0)

Maxwell 會將消息投遞到Kafka的Topic中,該Topic由 kafka_topic 選項指定,默認值為 maxwell,除了指定為靜態的Topic,還可以指定為動態的,譬如 namespace_%{database}_%{table}%{database}%{table} 將被具體的消息的 database 和 table 替換。

Maxwell 讀取配置時,如果配置項是以 kafka. 開頭,那么該配置將設置到 Kafka Producer 客戶端的連接參數中去,譬如

kafka.acks = 1
kafka.compression.type = snappy
kafka.retries=5

下面是Maxwell通用生產者和Kafka生產者的配置參數

選項 參數值 描述 默認值
producer PRODUCER_TYPE 生產者類型 stdout
custom_producer.factory CLASS_NAME 自定義消費者的工廠類
producer_ack_timeout PRODUCER_ACK_TIMEOUT 異步消費認為消息丟失的超時時間(毫秒ms)
producer_partition_by PARTITION_BY 輸入到kafka/kinesis的分區函數 database
producer_partition_columns STRING 若按列分區,以逗號分隔的列名稱
producer_partition_by_fallback PARTITION_BY_FALLBACK producer_partition_by=column時需要,當列不存在是使用
ignore_producer_error BOOLEAN 為false時,在kafka/kinesis發生錯誤時退出程序;為true時,僅記錄日志 See also dead_letter_topic true
kafka.bootstrap.servers STRING kafka 集群列表,HOST:PORT[,HOST:PORT]
kafka_topic STRING kafka topic maxwell
dead_letter_topic STRING 詳見官方文檔
kafka_version KAFKA_VERSION 指定maxwell的 kafka 生產者客戶端版本,不可在config.properties中配置 0.11.0.1
kafka_partition_hash [default | murmur3] 選擇kafka分區時使用的hash方法 default
kafka_key_format [array | hash] how maxwell outputs kafka keys, either a hash or an array of hashes hash
ddl_kafka_topic STRING output_ddl為true時, 所有DDL的消息都將投遞到該topic kafka_topic

過濾器配置

Maxwell 可以通過 --filter 配置項來指定過濾規則,通過 exclude 排除,通過 include 包含,值可以為具體的數據庫、數據表、數據列,甚至用 Javascript 來定義復雜的過濾規則;可以用正則表達式描述,有幾個來自官網的例子

# 僅匹配foodb數據庫的tbl表和所有table_數字的表
--filter='exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/'
# 排除所有庫所有表,僅匹配db1數據庫
--filter = 'exclude: *.*, include: db1.*'
# 排除含db.tbl.col列值為reject的所有更新
--filter = 'exclude: db.tbl.col = reject'
# 排除任何包含col_a列的更新
--filter = 'exclude: *.*.col_a = *'
# blacklist 黑名單,完全排除bad_db數據庫,若要恢復,必須刪除maxwell庫
--filter = 'blacklist: bad_db.*' 

數據初始化

Maxwell 啟動后將從maxwell庫中獲取上一次停止時position,從該斷點處開始讀取binlog。如果binlog已經清除了,那么怎樣可以通過maxwell把整張表都復制出來呢?也就是數據初始化該怎么做?

對整張表進行操作,人為地產生binlog?譬如找一個不影響業務的字段譬如update_time,然后加一秒,再減一秒?

update test set update_time = DATE_ADD(update_time,intever 1 second);
update test set update_time = DATE_ADD(update_time,intever -1 second);

這樣明顯存在幾個大問題:

  • 不存在一個不重要的字段怎么辦?每個字段都很重要,不能隨便地修改!
  • 如果整張表很大,修改的過程耗時很長,影響了業務!
  • 將產生大量非業務的binlog!

針對數據初始化的問題,Maxwell 提供了一個命令工具 maxwell-bootstrap 幫助我們完成數據初始化,maxwell-bootstrap 是基於 SELECT * FROM table 的方式進行全量數據初始化,不會產生多余的binlog!

這個工具有下面這些參數:

參數 說明
--log_level LOG_LEVEL 日志級別(DEBUG, INFO, WARN or ERROR)
--user USER mysql 用戶名
--password PASSWORD mysql 密碼
--host HOST mysql 地址
--port PORT mysql 端口
--database DATABASE 要bootstrap的表所在的數據庫
--table TABLE 要引導的表
--where WHERE_CLAUSE 設置過濾條件
--client_id CLIENT_ID 指定執行引導操作的Maxwell實例

實驗一番,下面將引導 test 數據庫中 test 表,首先是准備幾條測試用的數據

INSERT INTO `test` VALUES (1, 1, '1');
INSERT INTO `test` VALUES (2, 2, '2');
INSERT INTO `test` VALUES (3, 3, '3');
INSERT INTO `test` VALUES (4, 4, '4');

然后 reset master; 清空binlog,刪除 maxwell 庫中的表。接着使用快速開始中的命令,啟動Kafka、Maxwell和Kafka消費者,然后啟動 maxwell-bootstrap

docker run -it --rm zendesk/maxwell bin/maxwell-bootstrap --user maxwell  \
    --password 123456 --host 10.100.97.246  --database test --table test --client_id maxwell

注意:--bootstrapper=sync 時,在處理bootstrap時,會阻塞正常的binlog解析;--bootstrapper=async 時,不會阻塞。

也可以執行下面的SQL,在 maxwell.bootstrap 表中插入記錄,手動觸發

insert into maxwell.bootstrap (database_name, table_name) values ('test', 'test');

就可以在 kafka 消費者端看見引導過來的數據了

{"database":"maxwell","table":"bootstrap","type":"insert","ts":1552199115,"xid":36738,"commit":true,"data":{"id":3,"database_name":"test","table_name":"test","where_clause":null,"is_complete":0,"inserted_rows":0,"total_rows":0,"created_at":null,"started_at":null,"completed_at":null,"binlog_file":null,"binlog_position":0,"client_id":"maxwell"}}
{"database":"test","table":"test","type":"bootstrap-start","ts":1552199115,"data":{}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":1,"age":1,"name":"1"}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":2,"age":2,"name":"2"}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":3,"age":3,"name":"3"}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":4,"age":4,"name":"4"}}
{"database":"maxwell","table":"bootstrap","type":"update","ts":1552199115,"xid":36756,"commit":true,"data":{"id":3,"database_name":"test","table_name":"test","where_clause":null,"is_complete":1,"inserted_rows":4,"total_rows":0,"created_at":null,"started_at":"2019-03-10 14:25:15","completed_at":"2019-03-10 14:25:15","binlog_file":"mysql-bin.000001","binlog_position":64446,"client_id":"maxwell"},"old":{"is_complete":0,"inserted_rows":1,"completed_at":null}}
{"database":"test","table":"test","type":"bootstrap-complete","ts":1552199115,"data":{}}

中間的4條便是 test.test 的binlog數據了,注意這里的 type 是 bootstrap-insert,而不是 insert

然后再一次查看binlog,show binlog events;,會發現只有與 maxwell 相關的binlog,並沒有 test.test 相關的binlog,所以 maxwell-bootstrap 命令並不會產生多余的 binlog,當數據表的數量很大時,這個好處會更加明顯

Bootstrap 的過程是 bootstrap-start -> bootstrap-insert -> bootstrap-complete,其中,start和complete的data字段為空,不攜帶數據。

在進行bootstrap過程中,如果maxwell崩潰,重啟時,bootstrap會完全重新開始,不管之前進行到多少,若不希望這樣,可以到數據庫中設置 is_complete 字段值為1(表示完成),或者刪除該行

Maxwell監控

Maxwell 提供了 base logging mechanism, JMX, HTTP or by push to Datadog 這四種監控方式,與監控相關的配置項有下列這些:

選項 參數值 描述 默認值
metrics_prefix STRING 指標的前綴 MaxwellMetrics
metrics_type [slf4j | jmx | http | datadog] 發布指標的方式
metrics_jvm BOOLEAN 是否收集JVM信息 false
metrics_slf4j_interval SECONDS 將指標記錄到日志的頻率,metrics_type須配置為slf4j 60
http_port INT metrics_type為http時,發布指標綁定的端口 8080
http_path_prefix STRING http的路徑前綴 /
http_bind_address STRING http發布指標綁定的地址 all addresses
http_diagnostic BOOLEAN http是否開啟diagnostic后綴 false
http_diagnostic_timeout MILLISECONDS http diagnostic 響應超時時間 10000
metrics_datadog_type [udp | http] metrics_type為datadog時發布指標的方式 udp
metrics_datadog_tags STRING 提供給 datadog 的標簽,如 tag1:value1,tag2:value2
metrics_datadog_interval INT 推指標到datadog的頻率,單位秒 60
metrics_datadog_apikey STRING metrics_datadog_type=http 時datadog用的api key
metrics_datadog_host STRING metrics_datadog_type=udp時推指標的目標地址 localhost
metrics_datadog_port INT metrics_datadog_type=udp 時推指標的端口 8125

具體可以得到哪些監控指標呢?有如下,注意所有指標都預先配置了指標前綴 metrics_prefix

指標 類型 說明
messages.succeeded Counters 成功發送到kafka的消息數量
messages.failed Counters 發送失敗的消息數量
row.count Counters 已處理的binlog行數,注意並非所有binlog都發往kafka
messages.succeeded.meter Meters 消息成功發送到Kafka的速率
messages.failed.meter Meters 消息發送失敗到kafka的速率
row.meter Meters 行(row)從binlog連接器到達maxwell的速率
replication.lag Gauges 從數據庫事務提交到Maxwell處理該事務之間所用的時間(毫秒)
inflightmessages.count Gauges 當前正在處理的消息數(等待來自目的地的確認,或在消息之前)
message.publish.time Timers 向kafka發送record所用的時間(毫秒)
message.publish.age Timers 從數據庫產生事件到發送到Kafka之間的時間(毫秒),精確度為+/-500ms
replication.queue.time Timers 將一個binlog事件送到處理隊列所用的時間(毫秒)

上述有些指標為kafka特有的,並不支持所有的生產者。

實驗一番,通過 http 方式獲取監控指標

docker run -p 8080:8080 -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \
    --password='123456' --host='10.100.97.246' --producer=kafka \
    --kafka.bootstrap.servers='10.100.97.246:9092' --kafka_topic=maxwell --log_level=debug \
    --metrics_type=http  --metrics_jvm=true --http_port=8080

上面的配置大部分與前面的相同,不同的有 -p 8080:8080 docker端口映射,以及 --metrics_type=http --metrics_jvm=true --http_port=8080,配置了通過http方式發布指標,啟用收集JVM信息,端口為8080,之后可以通過 http://10.100.97.246:8080/metrics 便可獲取所有的指標

Maxwell監控

http 方式有四種后綴,分別對應四種不同的格式

endpoint 說明
/metrics 所有指標以JSON格式返回
/prometheus 所有指標以Prometheus格式返回(Prometheus是一套開源的監控&報警&時間序列數據庫的組合)
/healthcheck 返回Maxwell過去15分鍾是否健康
/ping 簡單的測試,返回 pong

如果是通過 JMX 的方式收集Maxwell監控指標,可以 JAVA_OPTS 環境變量配置JMX訪問權限

export JAVA_OPTS="-Dcom.sun.management.jmxremote \
-Dcom.sun.management.jmxremote.port=9010 \
-Dcom.sun.management.jmxremote.local.only=false \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Djava.rmi.server.hostname=10.100.97.246"

多個Maxwell實例

在不同的配置下,Maxwell可以在同一個主服務器上運行多個實例。如果希望讓生產者以不同的配置運行,例如將來自不同組的表(table)的事件投遞到不同的Topic中,這將非常有用。Maxwell的每個實例都必須配置一個唯一的client_id,以便區分的binlog位置。

GTID 支持

Maxwell 從1.8.0版本開始支持基於GTID的復制(GTID-based replication),在GTID模式下,Maxwell將在主機更改后透明地選擇新的復制位置。

什么是GTID Replication?

GTID (Global Transaction ID) 是對於一個已提交事務的編號,並且是一個全局唯一的編號。

從 MySQL 5.6.5 開始新增了一種基於 GTID 的復制方式。通過 GTID 保證了每個在主庫上提交的事務在集群中有一個唯一的ID。這種方式強化了數據庫的主備一致性,故障恢復以及容錯能力。

在原來基於二進制日志的復制中,從庫需要告知主庫要從哪個偏移量進行增量同步,如果指定錯誤會造成數據的遺漏,從而造成數據的不一致。借助GTID,在發生主備切換的情況下,MySQL的其它從庫可以自動在新主庫上找到正確的復制位置,這大大簡化了復雜復制拓撲下集群的維護,也減少了人為設置復制位置發生誤操作的風險。另外,基於GTID的復制可以忽略已經執行過的事務,減少了數據發生不一致的風險。

注意事項

timestamp column

maxwell對時間類型(datetime, timestamp, date)都是當做字符串處理的,這也是為了保證數據一致(比如0000-00-00 00:00:00這樣的時間在timestamp里是非法的,但mysql卻認,解析成java或者python類型就是null/None)。

如果MySQL表上的字段是 timestamp 類型,是有時區的概念,binlog解析出來的是標准UTC時間,但用戶看到的是本地時間。比如 f_create_time timestamp 創建時間是北京時間 2018-01-05 21:01:01,那么mysql實際存儲的是 2018-01-05 13:01:01,binlog里面也是這個時間字符串。如果不做消費者不做時區轉換,會少8個小時。

與其每個客戶端都要考慮這個問題,我覺得更合理的做法是提供時區參數,然后maxwell自動處理時區問題,否則要么客戶端先需要知道哪些列是timestamp類型,或者連接上原庫緩存上這些類型。

binary column

maxwell可以處理binary類型的列,如blob、varbinary,它的做法就是對二進制列使用 base64_encode,當做字符串輸出到json。消費者拿到這個列數據后,不能直接拼裝,需要 base64_decode

表結構不同步

如果是拿比較老的binlog,放到新的mysql server上去用maxwell拉去,有可能表結構已經發生了變化,比如binlog里面字段比 schema_host 里面的字段多一個。目前這種情況沒有發現異常,比如阿里RDS默認會為 無主鍵無唯一索引的表,增加一個__##alibaba_rds_rowid##__,在 show create tableschema 里面都看不到這個隱藏主鍵,但binlog里面會有,同步到從庫。

另外我們有通過git去管理結構版本,如果真有這種場景,也可以應對。

大事務binlog

當一個事物產生的binlog量非常大的時候,比如遷移日表數據,maxwell為了控制內存使用,會自動將處理不過來的binlog放到文件系統

Using kafka version: 0.11.0.1
21:16:07,109 WARN  MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
21:16:07,380 INFO  SchemaStoreSchema - Creating maxwell database
21:16:07,540 INFO  Maxwell - Maxwell v?? is booting (RabbitmqProducer), starting at Position[BinlogPosition[mysql-bin.006235:24980714],
lastHeartbeat=0]
21:16:07,649 INFO  AbstractSchemaStore - Maxwell is capturing initial schema
21:16:08,267 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.006235:24980714
21:16:08,324 INFO  BinaryLogClient - Connected to rm-xxxxxxxxxxx.mysql.rds.aliyuncs.com:3306 at mysql-bin.006235/24980714 (sid:637
9, cid:9182598)
21:16:08,325 INFO  BinlogConnectorLifecycleListener - Binlog connected.
03:15:36,104 INFO  ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell7935334910787514257events
03:17:14,880 INFO  ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell3143086481692829045events

但是遇到另外一個問題,overflow隨后就出現異常 EventDataDeserializationException: Failed to deserialize data of EventHeaderV4,當我另起一個maxwell指點之前的binlog postion開始解析,卻有沒有拋異常。事后的數據也表明並沒有數據丟失。

問題產生的原因還不明,Caused by: java.net.SocketException: Connection reset,感覺像讀取 binlog 流的時候還沒讀取到完整的event,異常關閉了連接。這個問題比較頑固,github上面類似問題都沒有達到明確的解決。(這也從側面告訴我們,大表數據遷移,也要批量進行,不要一個insert into .. select 搞定)

03:18:20,586 INFO  ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell5229190074667071141events
03:19:31,289 WARN  BinlogConnectorLifecycleListener - Communication failure.
com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{time
stamp=1514920657000, eventType=WRITE_ROWS, serverId=2115082720, headerLength=19, dataLength=8155, nextPosition=520539918, flags=0}
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:216) ~[mys
ql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:184) ~[mysql-binlog-c
onnector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:890) [mysql-binlog-connector-java-0
.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:559) [mysql-binlog-connector-java-0.13.0.jar:0.13
.0]
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:793) [mysql-binlog-connector-java-0.13.0.jar:0.13.0
]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.net.SocketException: Connection reset
        at java.net.SocketInputStream.read(SocketInputStream.java:210) ~[?:1.8.0_121]
        at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[?:1.8.0_121]
        at com.github.shyiko.mysql.binlog.io.BufferedSocketInputStream.read(BufferedSocketInputStream.java:51) ~[mysql-binlog-connector-
java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readWithinBlockBoundaries(ByteArrayInputStream.java:202) ~[mysql-binlo
g-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:184) ~[mysql-binlog-connector-java-0.13
.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:46) ~[mysql-binlog-connector-jav
a-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeLong(AbstractRowsEventDataD
eserializer.java:212) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeCell(AbstractRowsEventDataD
eserializer.java:150) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeRow(AbstractRowsEventDataDeserializer.java:132) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserializeRows(WriteRowsEventDataDeserializer.java:64) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserialize(WriteRowsEventDataDeserializer.java:56) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserialize(WriteRowsEventDataDeserializer.java:32) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:210) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        ... 5 more
03:19:31,514 INFO  BinlogConnectorLifecycleListener - Binlog disconnected.
03:19:31,590 WARN  BinlogConnectorReplicator - replicator stopped at position: mysql-bin.006236:520531744 -- restarting
03:19:31,595 INFO  BinaryLogClient - Connected to rm-xxxxxx.mysql.rds.aliyuncs.com:3306 at mysql-bin.006236/520531744 (sid:6379, cid:9220521)

tableMapCache

前面講過,如果我只想獲取某幾個表的binlog變更,需要用 include_tables 來過濾,但如果mysql server上現在刪了一個表t1,但我的binlog是從昨天開始讀取,被刪的那個表t1在maxwell啟動的時候是拉取不到表結構的。然后昨天的binlog里面有 t1 的變更,因為找不到表結構給來組裝成json,會拋異常。

手動在 maxwell.tables/columns 里面插入記錄是可行的。但這個問題的根本是,maxwell在binlog過濾的時候,只在處理row_event的時候,而對 tableMapCache 要求binlog里面的所有表都要有。

自己(seanlook)提交了一個commit,可以在做 tableMapCache 的時候也僅要求緩存 include_dbs/tables 這些表: https://github.com/seanlook/maxwell/commit/2618b70303078bf910a1981b69943cca75ee04fb

提高消費性能

在用rabbitmq時,routing_key%db%.%table%,但某些表產生的binlog增量非常大,就會導致各隊列消息量很不平均,目前因為還沒做到事務xid或者thread_id級別的並發回放,所以最小隊列粒度也是表,盡量單獨放一個隊列,其它數據量小的合在一起。

binlog

Maxwell 在 maxwell 庫中維護了 binlog 的位移等信息,由於一些原因譬如 reset master;,導致 maxwell 庫中的記錄與實際的binlog對不上,這時將報異常,這是可以手動修正binlog位移或者直接清空/刪除 maxwell 庫重建

com.github.shyiko.mysql.binlog.network.ServerException: Could not find first log file name in binary log index file
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:885)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:564)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:796)
        at java.lang.Thread.run(Thread.java:748)

以及

com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event 'mysql-bin.000001' at 760357, the last event read from './mysql-bin.000001' at 1888540, the last byte read from './mysql-bin.000001' at 1888540.
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:885)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:564)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:796)
        at java.lang.Thread.run(Thread.java:748)

參考文檔

關注_小旋鋒_微信公眾號


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM