數據收集之binlog同步 Maxwell --->Kafka


配置MySQL

MySQL 開啟Binlog

 1 #開啟binlog
 2 #修改my.cnf配置文件 增加如下內容
 3 [root@node2 /root]# vim /etc/my.cnf
 4 
 5 [mysqld]
 6 #binlog文件保存目錄及binlog文件名前綴
 7 #binlog文件保存目錄: /var/lib/mysql/
 8 #binlog文件名前綴: mysql-binlog
 9 #mysql向文件名前綴添加數字后綴來按順序創建二進制日志文件 如mysql-binlog.000006 mysql-binlog.000007
10 log-bin=/var/lib/mysql/mysql-binlog
11 #選擇基於行的日志記錄方式
12 binlog-format=ROW
13 #服務器 id
14 #binlog數據中包含server_id,標識該數據是由那個server同步過來的
15 server_id=1

MySQL 配置權限

 1 --創建Maxwell的用戶
 2 mysql>CREATE USER 'maxwell_sync'@'%' IDENTIFIED BY 'maxwell_sync_1';
 3 -- Maxwell需要在待同步的庫上建立schema_database庫,將狀態存儲在`schema_database`選項指定的數據庫中(默認為`maxwell`)
 4 
 5 --授權
 6 mysql>GRANT ALL on maxwell.* to 'maxwell_sync'@'192.168.197.130';
 7 --解釋:grant all指的是授權所有操作權限(增刪改查),*.*指的是所有數據庫,maxwell指的是用戶名,maxwell_sync是密碼,192.168.197.130指的是所要授權的遠程IP地址
 8 
 9 
10 mysql>GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'maxwell_sync'@'%';
11 
12 mysql>FLUSH PRIVILEGES;  --刷新,使修改生效

Maxwell

下載解壓 

[root@node2 /data/software]# wget https://github.com/zendesk/maxwell/releases/download/v1.17.1/maxwell-1.17.1.tar.gz

[root@node2 /data/software]# tar -zxvf maxwell-1.17.1.tar.gz

maxwell配置並同步至kafka  

    將maxwell工具下載到linux機器上,主要是配置config.properties文件,重要的配置參考如下:
    
log_level=info   
host=
user=
password=
port=
jdbc_options=autoReconnect=true  // mysql 超時重連
schema_database=  // 用於在mysql中新建一個binlog相關的數據庫實例
producer=kafka
kafka.bootstrap.servers=
kafka_topic=
kafka.compression.type=snappy
kafka.retries=1
kafka.acks=1
kinesis_stream=maxwell
include_dbs=  // 需要處理的數據庫實例
include_tables= // 需要處理的表格,用逗號分隔
kafka_version=0.9.0.1
client_id= // 標識符,可以包含英文
replica_server_id= // 只能是數字
expire_logs_days=0 //防止binlog斷,maxwell失敗
啟動maxwell   配置文件
     nohup  bin/maxwell --config config.properties --log_level DEBUG &
 
命令行啟動maxwell
#輸出到kafka
bin/maxwell --user='maxwell_sync' --password='maxwell_sync_1' --host='localhost'  --port=3306  --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_version=0.9 --kafka_topic=rotopic

#抽取多個庫可添加include_dbs,逗號分隔
#指定數據表 include_tables ,逗號分隔
#maxwell模擬mysql slave,所以多個maxwell進程時,每個進程的client.id及replica_server_id保證不同
#binlog如果斷了,可能會maxwell失敗,最好設置mysql的expire_logs_days=0

#輸出到控制台用如下配置
bin/maxwell --user='maxwell_sync' --password='maxwell_sync_1' --host='localhost'  --port=3306  --producer=stdout

 

kafka

啟動kafka

#開啟kafka消費者
kafka-console-consumer.sh --zookeeper localhost:2181 --topic rotopic --from-beginning 

 

改變數據庫內容可看到如下結果:

 

問題描述:

1com.google.code.or.net.TransportException: Access denied; you need the REPLICATION SLAVE privilege for this operation  ##用戶權限問題導致;

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM