1.項目背景
1.1 MaxWell 介紹
Maxwell是一個守護進程,它能監聽並讀取MySQL的binlog,然后解析輸出為json,支持將數據輸出到Kafka、Kinesis或其他流媒體平台,支持庫和表的過濾。
地址:https://github.com/zendesk/maxwell
1.2 版本選型
maxwell-1.25.0
2.配置MySql
需要打開MySql的 binlog(默認是關閉),采用 row-based replication(RBR) 日志格式
binlog有三種格式:Statement、Row以及Mixed
–基於SQL語句的復制(statement-based replication,SBR)
–基於行的復制(row-based replication,RBR)
–混合模式復制(mixed-based replication,MBR)
STATMENT模式:每一條會修改數據的sql語句會記錄到binlog中
ROW模式:不記錄sql語句上下文相關信息,僅保存哪條記錄被修改
Mixed模式:從5.1.8版本開始,MySQL提供了Mixed格式,實際上就是Statement與Row的結合
2.1 創建my.cnf
cp /usr/share/mysql/my-default.cnf /etc/my.cnf
#my.cnf
[mysqld]
server_id=1 # 隨機指定
log-bin=master
binlog_format=row #選擇 Row 模式
sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES
2.2 啟動Mysql
mysql> set global binlog_format=ROW;
mysql> set global binlog_row_image=FULL;
2.3 配置 Maxwell庫 及 相關權限
Maxwell需要連接MySQL,並創建一個名稱為maxwell的數據庫存儲元數據,且需要能訪問需要同步的數據庫,新創建一個MySQL用戶專門用來給Maxwell使用
注意:MaxWell 在啟啟動時會自動創建 maxwell 庫
添加權限:
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
mysql> flush privileges;
2.4 重啟Mysql
service mysql restart
#查看 binlog 相關配置
show global variables like '%log_bin%'
#查看詳細的日志配置信息SHOW GLOBAL VARIABLES LIKE '%log%';
#mysql數據存儲目錄 show variables like '%dir%';
3.安裝MaxWell
tar /opt/software/maxwell-1.25.0.tar.gz -C /opt/module
3.1 MaxWell 配置參數
3.2 啟動MaxWell(kafka 方式)
#創建 kafka topic maxwell
kafka-topics.sh --zookeeper hadoop101:2181/kafka --create --replication-factor 1 --partitions 1 --topic maxwell
#啟動MaxWell
/opt/module/maxwell-1.25.0/bin/maxwell --user='maxwell' --password='123456' --host='hadoop101' --producer=kafka --kafka.bootstrap.servers=hadoop101:9092,hadoop102:9092,hadoop103:9092 --kafka_topic=maxwell
4.測試
4.1 創建測試數據
#mysql中創建測試表
create table test.maxwell_test(id int,name varchar(100));
#向maxwell_test插入數據
INSERT into maxwell_test values(1,'aa');
INSERT into maxwell_test values(2,'bb');
INSERT into maxwell_test values(3,'cc');
INSERT into maxwell_test values(4,'dd');
INSERT into maxwell_test values(5,'ee');
4.2 消費kafka中數據
package com.monk.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
/**
* @ClassName KafkaConsumerMaxWell
* @Author wuning
* @Date: 2020/4/02 21:46
* @Description:
*/
public class KafkaConsumerMaxWell implements Runnable {
private String topic;
private KafkaConsumer<Integer,String> kafkaConsumer;
public KafkaConsumerMaxWell(String topic, String bootstrap, String groupId) {
this.topic = topic;
Properties props = new Properties();
props.put("bootstrap.servers", bootstrap);
//消費者組ID
props.put("group.id", groupId);
//設置自動提交offset
props.put("enable.auto.commit", "true");
//設置自動提交offset的延時(可能會造成重復消費的情況)
props.put("auto.commit.interval.ms", "1000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//key-value的反序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaConsumer = new KafkaConsumer<Integer, String>(props);
}
@Override
public void run() {
kafkaConsumer.subscribe((Collections.singletonList(topic)));
while(true){
ConsumerRecords<Integer,String> records = kafkaConsumer.poll(100);
for (ConsumerRecord record:records) {
System.out.println(record.value());
}
}
}
public static void main(String[] args) {
KafkaConsumerMaxWell kafkaConsumerMaxWell =
new KafkaConsumerMaxWell("maxwell", "hadoop101:9092,hadoop102:9092,hadoop103:9092", "g1");
new Thread(kafkaConsumerMaxWell).start();
}
}
Maxwell生成的數據格式為JSON,常見字段含義如下:
type:操作類型,包含database-create,database-drop,table-create,table-drop,table-alter,insert,update,delete
database:操作的數據庫名稱
ts:操作時間,13位時間戳
table:操作的表名
data:數據增加/刪除/修改之后的內容
old:數據修改前的內容或者表修改前的結構定義
sql:DDL操作的SQL語句
def:表創建與表修改的結構定義
xid:事物唯一ID
commit:數據增加/刪除/修改操作是否已提交