基於python的mysql復制工具


一簡介

python-mysql-replication 是由python實現的 MySQL復制協議工具,我們可以用它來解析binlog 獲取日志的insert,update,delete等事件 ,並基於此做其他業務需求。比如數據更改時失效緩存,監聽dml事件通知下游業務方做對應處理。

其項目信息

網址     http://www.github.com/noplay/python-mysql-replication
官方文檔 https://python-mysql-replication.readthedocs.io

二 實踐

2.1 安裝配置

獲取源代碼

git clone http://www.github.com/noplay/python-mysql-replication

使用pip 安裝

pip install mysql-replication

權限:
可以直接使用復制賬號也可以使用其他賬號,但是該賬號必須SELECT, REPLICATION SLAVE, REPLICATION CLIENT權限

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'replicator'@'%' IDENTIFIED BY 'xxxxx';

數據庫日志相關的參數設置如下:

log_bin=on ,binlog_format=row,binlog_row_image=FULL

2.2 核心類介紹

python-mysql-replication 的入口是類BinLogStreamReader(),我們在使用該工具時需要實例化一個BinLogStreamReader()對象 stream,BinLogStreamReader 通過 ReportSlave 向主庫注冊作為一個slave角色,用於接受MySQL的binlog廣播。有興趣的可以研究其代碼具體實現。

該實例提供解析 binlog 各種事件的集合,每個事件也是一個對象。

初始化BinLogStreamReader()實例需要使用的參數如下:

connection_settings: 數據庫的連接配置信息
resume_stream:從位置或binlog的最新事件或舊的可用事件開始
log_file:設置復制開始日志文件
log_pos:設置復制開始日志pos(resume_stream應該為true)
auto_position:使用master_auto_position gtid設置位置
blocking:如果設置為True,會持續監聽binlog事件,如果設置為False 則會一次性解析所有可獲取的binlog。
only_events:只解析指定的事件 比如only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],參數類型是一個數組。

#### 以上是比較常用的參數

ignored_events:設置哪些事件可以被忽略。也是一個數組。

only_tables,ignored_tables,only_schemas,ignored_schemas ##根據字面意思理解

freeze_schema:如果為true,則不支持ALTER TABLE速度更快。
skip_to_timestamp:在達到指定的時間戳之前忽略所有事件,否則會解析所有可訪問的binlog
report_slave:用於向主庫注冊SHOW SLAVE HOSTS中slave,該值可以是字典比如{'hostname':'127.0.0.1', 'username':'root', 'password':'rep', 'port':3306}

slave_uuid:在SHOW SLAVE HOSTS中slave_uuid。
fail_on_table_metadata_unavailable:如果我們無法獲取有關row_events的表信息,應該引發異常。

2.3 如何使用呢?

最簡單的用法 腳本名 pyreplica.py

from pymysqlreplication import BinLogStreamReader
MYSQL_SETTINGS = {
    "host": "127.0.0.1",
    "port": 3306,
    "user": "root",
    "passwd": ""
}

def main():
    # server_id is your slave identifier, it should be unique.
    # set blocking to True if you want to block and wait for the next event at
    # the end of the stream
    stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS,
                                server_id=3,
                                blocking=True)

    for binlogevent in stream:
        binlogevent.dump()
    stream.close() ###如果blocking=True ,改行記錄可以不用。
if __name__ == "__main__":
    main()


開啟兩個窗口,一個窗口執行,另外一個窗口操作mysql 寫入或者修改數據

python pyreplica.py

輸出如下:

=== GtidEvent ===
Date: 2019-06-25T17:41:34
Log position: 339
Event size: 42
Read bytes: 25
Commit: False
GTID_NEXT: cc726403-93d1-11e9-90b7-ecf4bbde7778:13
()
=== QueryEvent ===
Date: 2019-06-25T17:41:34
Log position: 411
Event size: 49
Read bytes: 49
Schema: test
Execution time: 0
Query: BEGIN
()
=== TableMapEvent ===
Date: 2019-06-25T17:41:34
Log position: 456
Event size: 22
Read bytes: 21
Table id: 126
Schema: test
Table: x
Columns: 2
()
=== WriteRowsEvent ===
Date: 2019-06-25T17:41:34
Log position: 500
Event size: 21
Read bytes: 12
Table: test.x
Affected columns: 2
Changed rows: 1
Values:
--
('*', u'a', ':', 1)
('*', u'id', ':', 18)
()
=== XidEvent ===
Date: 2019-06-25T17:41:34
Log position: 531
Event size: 8
Read bytes: 8
Transaction ID: 1293393
()

2.3 拓展

基於該工具提供的日志事件解析我們可以做很多事情,比較有名的工具 binlog2sql 利用該工具解析binlog 做數據回滾 。

mysql-replication.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)
import sys
import json

mysql_settings = {'host': '127.0.0.1','port': 3306, 
                  'user': 'replica', 'passwd': 'xxxx'}
def main():

    stream = BinLogStreamReader(
        connection_settings=mysql_settings,
        server_id=1,
        blocking=True,
        only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])

    for binlogevent in stream:
        for row in binlogevent.rows:
            event = {"schema": binlogevent.schema, "table": binlogevent.table, "log_pos": binlogevent.packet.log_pos}
            if isinstance(binlogevent, DeleteRowsEvent):
                event["action"] = "delete"
                event["values"] = dict(row["values"].items())
                event = dict(event.items())
            elif isinstance(binlogevent, UpdateRowsEvent):
                event["action"] = "update"
                event["before_values"] = dict(row["before_values"].items())
                event["after_values"] = dict(row["after_values"].items())
                event = dict(event.items())
            elif isinstance(binlogevent, WriteRowsEvent):
                event["action"] = "insert"
                event["values"] = dict(row["values"].items())
                event = dict(event.items())
            print json.dumps(event)
            sys.stdout.flush()

if __name__ == "__main__":
    main()

執行腳本結果 如下圖

除了解析binlog,我們還可以用python-mysql-replication 做數據全量加增量遷移。比如僅僅遷移某些大表而不是整個庫的時候,可以用到。有興趣的朋友可以想想大概的算法。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM