一簡介
python-mysql-replication 是由python實現的 MySQL復制協議工具,我們可以用它來解析binlog 獲取日志的insert,update,delete等事件 ,並基於此做其他業務需求。比如數據更改時失效緩存,監聽dml事件通知下游業務方做對應處理。
其項目信息
網址 http://www.github.com/noplay/python-mysql-replication
官方文檔 https://python-mysql-replication.readthedocs.io
二 實踐
2.1 安裝配置
獲取源代碼
git clone http://www.github.com/noplay/python-mysql-replication
使用pip 安裝
pip install mysql-replication
權限:
可以直接使用復制賬號也可以使用其他賬號,但是該賬號必須SELECT, REPLICATION SLAVE, REPLICATION CLIENT權限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'replicator'@'%' IDENTIFIED BY 'xxxxx';
數據庫日志相關的參數設置如下:
log_bin=on ,binlog_format=row,binlog_row_image=FULL
2.2 核心類介紹
python-mysql-replication 的入口是類BinLogStreamReader(),我們在使用該工具時需要實例化一個BinLogStreamReader()對象 stream,BinLogStreamReader 通過 ReportSlave 向主庫注冊作為一個slave角色,用於接受MySQL的binlog廣播。有興趣的可以研究其代碼具體實現。
該實例提供解析 binlog 各種事件的集合,每個事件也是一個對象。
初始化BinLogStreamReader()實例需要使用的參數如下:
connection_settings: 數據庫的連接配置信息
resume_stream:從位置或binlog的最新事件或舊的可用事件開始
log_file:設置復制開始日志文件
log_pos:設置復制開始日志pos(resume_stream應該為true)
auto_position:使用master_auto_position gtid設置位置
blocking:如果設置為True,會持續監聽binlog事件,如果設置為False 則會一次性解析所有可獲取的binlog。
only_events:只解析指定的事件 比如only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],參數類型是一個數組。
#### 以上是比較常用的參數
ignored_events:設置哪些事件可以被忽略。也是一個數組。
only_tables,ignored_tables,only_schemas,ignored_schemas ##根據字面意思理解
freeze_schema:如果為true,則不支持ALTER TABLE速度更快。
skip_to_timestamp:在達到指定的時間戳之前忽略所有事件,否則會解析所有可訪問的binlog
report_slave:用於向主庫注冊SHOW SLAVE HOSTS中slave,該值可以是字典比如{'hostname':'127.0.0.1', 'username':'root', 'password':'rep', 'port':3306}
slave_uuid:在SHOW SLAVE HOSTS中slave_uuid。
fail_on_table_metadata_unavailable:如果我們無法獲取有關row_events的表信息,應該引發異常。
2.3 如何使用呢?
最簡單的用法 腳本名 pyreplica.py
from pymysqlreplication import BinLogStreamReader
MYSQL_SETTINGS = {
"host": "127.0.0.1",
"port": 3306,
"user": "root",
"passwd": ""
}
def main():
# server_id is your slave identifier, it should be unique.
# set blocking to True if you want to block and wait for the next event at
# the end of the stream
stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS,
server_id=3,
blocking=True)
for binlogevent in stream:
binlogevent.dump()
stream.close() ###如果blocking=True ,改行記錄可以不用。
if __name__ == "__main__":
main()
開啟兩個窗口,一個窗口執行,另外一個窗口操作mysql 寫入或者修改數據
python pyreplica.py
輸出如下:
=== GtidEvent ===
Date: 2019-06-25T17:41:34
Log position: 339
Event size: 42
Read bytes: 25
Commit: False
GTID_NEXT: cc726403-93d1-11e9-90b7-ecf4bbde7778:13
()
=== QueryEvent ===
Date: 2019-06-25T17:41:34
Log position: 411
Event size: 49
Read bytes: 49
Schema: test
Execution time: 0
Query: BEGIN
()
=== TableMapEvent ===
Date: 2019-06-25T17:41:34
Log position: 456
Event size: 22
Read bytes: 21
Table id: 126
Schema: test
Table: x
Columns: 2
()
=== WriteRowsEvent ===
Date: 2019-06-25T17:41:34
Log position: 500
Event size: 21
Read bytes: 12
Table: test.x
Affected columns: 2
Changed rows: 1
Values:
--
('*', u'a', ':', 1)
('*', u'id', ':', 18)
()
=== XidEvent ===
Date: 2019-06-25T17:41:34
Log position: 531
Event size: 8
Read bytes: 8
Transaction ID: 1293393
()
2.3 拓展
基於該工具提供的日志事件解析我們可以做很多事情,比較有名的工具 binlog2sql 利用該工具解析binlog 做數據回滾 。
mysql-replication.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent,
)
import sys
import json
mysql_settings = {'host': '127.0.0.1','port': 3306,
'user': 'replica', 'passwd': 'xxxx'}
def main():
stream = BinLogStreamReader(
connection_settings=mysql_settings,
server_id=1,
blocking=True,
only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])
for binlogevent in stream:
for row in binlogevent.rows:
event = {"schema": binlogevent.schema, "table": binlogevent.table, "log_pos": binlogevent.packet.log_pos}
if isinstance(binlogevent, DeleteRowsEvent):
event["action"] = "delete"
event["values"] = dict(row["values"].items())
event = dict(event.items())
elif isinstance(binlogevent, UpdateRowsEvent):
event["action"] = "update"
event["before_values"] = dict(row["before_values"].items())
event["after_values"] = dict(row["after_values"].items())
event = dict(event.items())
elif isinstance(binlogevent, WriteRowsEvent):
event["action"] = "insert"
event["values"] = dict(row["values"].items())
event = dict(event.items())
print json.dumps(event)
sys.stdout.flush()
if __name__ == "__main__":
main()
執行腳本結果 如下圖
除了解析binlog,我們還可以用python-mysql-replication 做數據全量加增量遷移。比如僅僅遷移某些大表而不是整個庫的時候,可以用到。有興趣的朋友可以想想大概的算法。