python使用canal做mysql數據庫實時同步


python使用canal做mysql數據庫實時同步

1 簡介

​ 做兩個服務器的數據庫實時同步

​ cannal,是阿里的一個mysql增量訂閱&消費工具:https://github.com/alibaba/canal

cannal分為服務端和客戶端:

服務端可以理解為一個mysql服務端(即高可用架構中的從節點),為了讓canal服務端生效,我們需要進行一些簡單的配置, 讓canal服務端向真正的mysql服務端發送獲取binlog請求,並且將binlog解析以后存在本地的數據結構中

客戶端可以理解為某種意義上的數據庫客戶端,通過一些簡單的編碼,我們可以獲取存在canal服務端的已被解析的binlog數據 (增量數據),獲取數據以后,即可進行定制化的處理

2 服務端安裝

官方文檔:https://github.com/alibaba/canal/wiki/QuickStart

01 mysql准備

  • 修改配置

對於自建 MySQL , 需要先開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式,my.cnf 中配置如下

vi /etc/my.cnf

直接在這個位置加上下邊三行

[mysqld]
log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復
  • 授權 canal 鏈接 MySQL 賬號具有作為 MySQL slave 的權限

1 重啟mysql

sudo etc/init.d/mysql restart

2 用mysql的root用戶登陸mysql,查看 log_bin 變量

mysql -u root -p
show variables like 'log_bin';

如果是on,表示該功能已開啟

3 在mysql添加以下用戶和權限

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

02 cannal配置

  • 下載cannal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.deployer-1.1.5-SNAPSHOT.tar.gz

或者下載安裝包:

地址:https://github.com/alibaba/canal/releases

![image-20200904171734024](/Users/tianzhh/Library/Application Support/typora-user-images/image-20200904171734024.png)

  • 解壓
mkdir /tmp/canal
tar zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz  -C /tmp/canal
  • 修改配置
vi conf/example/instance.properties
#position info,需要改成自己的數據庫信息
canal.instance.master.address = 127.0.0.1:3306 

#username/password,需要改成自己的數據庫信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
# 你要同步的數據庫
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
  • 啟動 停止
sh bin/startup.sh
sh bin/stop.sh

3 客戶端

github主頁:https://github.com/haozi3156666/canal-python

自己python開發客戶端詳解

安裝

pip3 install canal-python
pip3 install protobuf

完整代碼

import time

from canal.client import Client
from canal.protocol import EntryProtocol_pb2
from canal.protocol import CanalProtocol_pb2

client = Client()
client.connect(host='127.0.0.1', port=11111)
client.check_valid(username=b'', password=b'')
client.subscribe(client_id=b'1001', destination=b'example', filter=b'.*\\..*')

while True:
    message = client.get(100)
    entries = message['entries']
    for entry in entries:
        entry_type = entry.entryType
        if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN, EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
            continue
        row_change = EntryProtocol_pb2.RowChange()
        row_change.MergeFromString(entry.storeValue)
        event_type = row_change.eventType
        header = entry.header
        database = header.schemaName
        table = header.tableName
        event_type = header.eventType
        for row in row_change.rowDatas:
            format_data = dict()
            if event_type == EntryProtocol_pb2.EventType.DELETE:
                for column in row.beforeColumns:
                    format_data = {
                        column.name: column.value
                    }
            elif event_type == EntryProtocol_pb2.EventType.INSERT:
                for column in row.afterColumns:
                    format_data = {
                        column.name: column.value
                    }
            else:
                format_data['before'] = format_data['after'] = dict()
                for column in row.beforeColumns:
                    format_data['before'][column.name] = column.value
                for column in row.afterColumns:
                    format_data['after'][column.name] = column.value
            data = dict(
                db=database,
                table=table,
                event_type=event_type,
                data=format_data,
            )
            print(data)
    time.sleep(1)

client.disconnect()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM