MySQL to Redis同步


方式1:使用MQ在這里插入圖片描述

對於此類業務,增加一個消費訂閱基本沒什么成本,服務本身也不需要做任何更改。唯一需要擔心的一個問題是丟消息的情況?因為現在消息是緩存數據的唯一來源,一旦出現丟消息,緩存里缺失的那條數據永遠不會被補上。 MQ 集群,像 Kafka 或者 RocketMQ,它都有高可用和高可靠的保證機制,可以滿足數據可靠性要求的。

方式2:使用 Binlog 實時更新 Redis 緩存

在這里插入圖片描述
數據更新服務只負責處理業務邏輯,更新 MySQL,完全不用管如何去更新緩存。另外起程序,支撐binlog的解析到更新redis的過程。
常用canal負責更新緩存的服務,把自己偽裝成一個 MySQL 的從節點,從 MySQL 接收 Binlog,解析 Binlog 之后,可以得到實時的數據變更信息,然后根據這個變更信息去更新 Redis 緩存。

canal 官方圖

在這里插入圖片描述
官方:canal 特別設計了 client-server 模式,交互協議使用 protobuf 3.0 , client 端可采用不同語言實現不同的消費邏輯。

我的配置過程:

  1. 開啟binlog
[mysqld]
log-bin = /usr/local/var/mysql/mysql_bin_log/mysql-bin #開啟binlog/路徑
server_id = 1 ## 配置一個ServerID
binlog_format=ROW #格式

給 Canal 開一個專門的 MySQL 用戶並授權,確保這個用戶有復制 Binlog 的權限:

CREATE USER canal IDENTIFIED BY 'canal'; 
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

查看是否開啟binlog:
show variables like '%log_bin%';
在這里插入圖片描述
查看對應的binlog以及對應記錄行數位置:
show master status
在這里插入圖片描述
安裝canal:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
tar zvfx canal.deployer-1.1.4.tar.gz

配置:
vim canal/conf/example/instance.properties


canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=mysql-bin.000007
canal.instance.master.position=1905
canal.instance.master.timestamp=
canal.instance.master.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName=test
# table regex
canal.instance.filter.regex=.*\\..

啟動/重啟/停止腳本位於bin目錄下
使用canal/bin/startup.sh啟動,啟動之后看一下日志文件 canal/logs/example/example.log,如果里面沒有報錯,就說明啟動成功並連接到我們的 MySQL 上了。

使用客戶端進行消費(Python):

#encoding:utf-8
#author:donghao

import time
import redis
import json
from canal.client import Client
from canal.protocol import EntryProtocol_pb2
client = Client()
client.connect(host='127.0.0.1', port=11111)
client.check_valid(username=b'', password=b'')
client.subscribe(client_id=b'1001', destination=b'example', filter=b'.*\\..*')
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, password='donghao')
cache = redis.Redis(connection_pool=pool)
while True:
    message = client.get(100)
    entries = message['entries']
    for entry in entries:
        entry_type = entry.entryType
        if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN, EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
            continue
        row_change = EntryProtocol_pb2.RowChange()
        row_change.MergeFromString(entry.storeValue)
        event_type = row_change.eventType
        header = entry.header
        database = header.schemaName
        table = header.tableName
        event_type = header.eventType
        for row in row_change.rowDatas:
            format_data = dict()
            if event_type == EntryProtocol_pb2.EventType.DELETE:
                for column in row.beforeColumns:
                    format_data = {
                        column.name: column.value
                    }
            elif event_type == EntryProtocol_pb2.EventType.INSERT:
                for column in row.afterColumns:
                    format_data = {
                        column.name: column.value
                    }
            else:
                format_data['before'] = format_data['after'] = dict()
                for column in row.beforeColumns:
                    format_data['before'][column.name] = column.value
                for column in row.afterColumns:
                    format_data['after'][column.name] = column.value
            data = dict(
                db=database,
                table=table,
                event_type=event_type,
                data=format_data,
            )
            if event_type == EntryProtocol_pb2.EventType.UPDATE:
                # 更新前
                cache_key = "article:article_id_{}".format(format_data['before']['id'])
                cached_article = cache.get(cache_key)
                if cached_article:
                    print('更新前 ', json.loads(cached_article))
                # 更新緩存
                cache.set(cache_key, json.dumps(format_data['after']))
                print('更新后 ', json.loads(cache.get(cache_key)))
            print(data)
    time.sleep(1)
client.disconnect()

在這里插入圖片描述


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM