使用Maxwell 和 RabbitMQ監聽MySQL BinLog


  最近要做一個運營管理新系統,要求把原來的一些舊的子系統的數據(MySQL)抽取出來放到數據中心,並要求把子系統的增量數據實時同步到數據中心;一聽這需求覺得一次性同步倒不是很難,難就是難在增量數據要實時同步;數據庫這塊本來就不是強項,加上是剛開始用MySQL所以一下子想不出什么好的解決方案。

  經過Google一個上午終於找到了一個自己覺得可行的方案,使用Maxwell來監聽MySQL的BinLog文件。Maxwell的具體介紹請參考 https://blog.csdn.net/wwwdc1012/article/details/88388552。這里主要記錄下在本機搭建環境時遇到的一些問題。

  先說下本機環境:

    Docker Desktop:  Linux Containers

    MySQL: 安裝在 Docker Desktop里面;

    RabbitMQ: 安裝在Docker Desktop里面;

  同樣的Maxwell也將安裝在 Docker Desktop里

  

docker pull zendesk/maxwell

  開啟MySQL的 BinLog:

  在Powershell 使用 docker ps 查看 mysql的容器ID或者容器名字

  使用 docker exec -it   容器名/容器ID /bin/bash 進入到MySQL容器

  登錄 MySQL: mysql -u root -h 127.0.0.1 -p

  執行以下命令開啟MySQL的BinLog:

mysql> set global binlog_format=ROW;
mysql> set global binlog_row_image=FULL;

  將Maxwell連接到MySQL:

docker run -ti --rm zendesk/maxwell bin/maxwell --user='root' --password='123456' --host='172.17.0.2'  --producer=stdout

  --user: mysql 登錄用戶名

  --password: mysql 登錄密碼

  --host: mysql 服務IP

  --producer: maxwell 輸出方式, 默認為stdout:控制台輸出

  --port: mysql的端口, 默認為3306

  這里特別強調下,因為我這里都是用docker 來安裝的所以每個應用都會有一個獨立的容器,剛開始的時候沒有考慮到這點 --host 使用了 127.0.0.1 就一直失敗;所以這里不能使用127.0.0.1或者localhost,只能使用容器IP,如果做了映射也可以用你本機的IP。

以下是連接成功

 

 

當我們對數據庫做增、刪、改時控制台輸出如下

 接下來將把這個輸出,輸出到RabbitMQ:

docker run -ti --rm zendesk/maxwell bin/maxwell --user='root' 
--password='123456' 
--host='172.17.0.2'  
--producer=rabbitmq 
--rabbitmq_user='guest' 
--rabbitmq_pass='guest' 
--rabbitmq_host='172.17.0.3' 
--rabbitmq_port='5672' 
--rabbitmq_exchange='databaselogs'  
--rabbitmq_exchange_type='fanout'

  --producer改為 rabbitmq;

  --rabbitmq_user:  mq登錄名;

  --rabbitmq_pass: mq登錄密碼;

  --rabbitmq_host:  mq服務器地址,我這里的172.17.0.3 為mq的容器IP, 也可以寫本機的IP,  也不能使用 localhost和127.0.0.1

  --rabbitmq_port: mq端口;

  --rabbitmq_exchange: mq交換器名稱,這里可以自定義,只要跟后台的程序一致就行了;

  --rabbitmq_exchange_type: mq交換類型;

后台接收代碼:

var factory = new ConnectionFactory() { HostName = "localhost", UserName= "guest", Password= "guest" , Port= 5672 };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
          // 這里的 exchange 要跟前面命令一致 channel.ExchangeDeclare(exchange: "databaselogs", type: "fanout"); channel.QueueBind(queue: "task_queue", exchange: "databaselogs", routingKey: ""); //channel.QueueDeclare(queue: "task_queue", // durable: true, // exclusive: false, // autoDelete: false, // arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine();

  控制台輸出如下:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM