最近要做一個運營管理新系統,要求把原來的一些舊的子系統的數據(MySQL)抽取出來放到數據中心,並要求把子系統的增量數據實時同步到數據中心;一聽這需求覺得一次性同步倒不是很難,難就是難在增量數據要實時同步;數據庫這塊本來就不是強項,加上是剛開始用MySQL所以一下子想不出什么好的解決方案。
經過Google一個上午終於找到了一個自己覺得可行的方案,使用Maxwell來監聽MySQL的BinLog文件。Maxwell的具體介紹請參考 https://blog.csdn.net/wwwdc1012/article/details/88388552。這里主要記錄下在本機搭建環境時遇到的一些問題。
先說下本機環境:
Docker Desktop: Linux Containers
MySQL: 安裝在 Docker Desktop里面;
RabbitMQ: 安裝在Docker Desktop里面;
同樣的Maxwell也將安裝在 Docker Desktop里
docker pull zendesk/maxwell
開啟MySQL的 BinLog:
在Powershell 使用 docker ps 查看 mysql的容器ID或者容器名字
使用 docker exec -it 容器名/容器ID /bin/bash 進入到MySQL容器
登錄 MySQL: mysql -u root -h 127.0.0.1 -p
執行以下命令開啟MySQL的BinLog:
mysql> set global binlog_format=ROW; mysql> set global binlog_row_image=FULL;
將Maxwell連接到MySQL:
docker run -ti --rm zendesk/maxwell bin/maxwell --user='root' --password='123456' --host='172.17.0.2' --producer=stdout
--user: mysql 登錄用戶名
--password: mysql 登錄密碼
--host: mysql 服務IP
--producer: maxwell 輸出方式, 默認為stdout:控制台輸出
--port: mysql的端口, 默認為3306
這里特別強調下,因為我這里都是用docker 來安裝的所以每個應用都會有一個獨立的容器,剛開始的時候沒有考慮到這點 --host 使用了 127.0.0.1 就一直失敗;所以這里不能使用127.0.0.1或者localhost,只能使用容器IP,如果做了映射也可以用你本機的IP。
以下是連接成功
當我們對數據庫做增、刪、改時控制台輸出如下
接下來將把這個輸出,輸出到RabbitMQ:
docker run -ti --rm zendesk/maxwell bin/maxwell --user='root' --password='123456' --host='172.17.0.2' --producer=rabbitmq --rabbitmq_user='guest' --rabbitmq_pass='guest' --rabbitmq_host='172.17.0.3' --rabbitmq_port='5672' --rabbitmq_exchange='databaselogs' --rabbitmq_exchange_type='fanout'
--producer改為 rabbitmq;
--rabbitmq_user: mq登錄名;
--rabbitmq_pass: mq登錄密碼;
--rabbitmq_host: mq服務器地址,我這里的172.17.0.3 為mq的容器IP, 也可以寫本機的IP, 也不能使用 localhost和127.0.0.1
--rabbitmq_port: mq端口;
--rabbitmq_exchange: mq交換器名稱,這里可以自定義,只要跟后台的程序一致就行了;
--rabbitmq_exchange_type: mq交換類型;
后台接收代碼:
var factory = new ConnectionFactory() { HostName = "localhost", UserName= "guest", Password= "guest" , Port= 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) {
// 這里的 exchange 要跟前面命令一致 channel.ExchangeDeclare(exchange: "databaselogs", type: "fanout"); channel.QueueBind(queue: "task_queue", exchange: "databaselogs", routingKey: ""); //channel.QueueDeclare(queue: "task_queue", // durable: true, // exclusive: false, // autoDelete: false, // arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine();
控制台輸出如下: