【Centos】使用confluent將Mysql數據同步到clickhouse


一直在尋找如何存儲大數據的解決辦法,碰巧在技術網站上了解到了Clickhouse,能支撐幾十億甚至百億以上的數據量,頓時我覺得有必要去部署一套用用。
 
clickhouse是存入數據的,但是還缺少監聽mysql的工具,讓binlog變化存入到clickhouse中。試了下clickhouse自帶的MaterializeMySQL,不支持json,剛好我用到了,尷尬!不然這個就是最簡單的方式,直接clickhouse內置的cdc就解決了。
 
后來了解到了多款cdc框架,最后選擇了用Debezium。
 
接下來是使用kafka和zookeeper作為中間件,接收和轉發數據。在了解的過程中又發現了confluent,包含了kafka和zookeeper,於是愉快的選擇了confluent。
 
環境信息:
Centos:6.10
Mysql:5.7
confluent: 6.0.0
 
實現功能:
目前是只支持數據的插入,不會修改和刪除
 
一、mysql配置
1.配置mysql的配置,在[mysqld] 下面添加或啟用如下配置
[mysqld]
server-id = 223344
log_bin = mysql-bin
binlog_format = row
binlog_row_image = full
expire_logs_days = 10

然后重啟Mysql

 
創建數據庫和表
CREATE DATABASE `test`
 
CREATE TABLE `user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`age` int(10) unsigned zerofill NOT NULL
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=latin1;

 

二、安裝配置confluent(注:以下是單機模式部署)(Centos)
 
 
2.配置confluent的環境變量 (/etc/profile 或者 ~/.bashrc)
export CONFLUENT_HOME=/datadisk/conflunt/confluent-6.0.0
export PATH=$PATH:$CONFLUENT_HOME/bin

 

3.啟動confluent

confluent local services start

顯示如下信息(注:不知為啥,我每次都要多次執行啟動,才能順序啟動完下列服務)

ZooKeeper is [UP]
Kafka is [UP]
Schema Registry is [UP]
Kafka REST is [UP]
Connect is [UP]
ksqlDB Server is [UP]
Control Center is [UP]

 

4.在confluent目錄下的etc文件夾下,創建kafka-connect-debezium目錄,並在新建的目錄下創建文件register-mysql.json
{
    "name":"test-connector",
    "config":{
        "connector.class":"io.debezium.connector.mysql.MySqlConnector",
        "tasks.max":"1",
        "database.hostname":"127.0.0.1",
        "database.port":"3306",
        "database.user":"root",
        "database.password":"123456",
        "database.server.id":"1",
        "database.server.name":"testserver2",
        "database.whitelist":"test",
        "database.history.kafka.bootstrap.servers":"localhost:9092",
        "database.history.kafka.topic":"schema-changes.test",
        "transforms":"unwrap,changetopic,dropFieldBefore",
        "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
        "transforms.changetopic.type":"org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.changetopic.regex":"(.*)",
        "transforms.changetopic.replacement":"$1-smt",
        "transforms.dropFieldBefore.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.dropFieldBefore.blacklist":"before"
    }
}

以上配置是用於連接數據庫,並監聽數據庫test,然后寫入kafka。其中transforms是用於轉換數據格式。

 
confluent-hub install debezium/debezium-connector-mysql:latest

 

6.啟動連接,在register-mysql.json所在的目錄,直接執行:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

就創建了與Mysql的連接

 
7.啟動kafka消費者監聽
kafka-avro-console-consumer --topic testserver2.test.user-smt --bootstrap-server 127.0.0.1:9092 --from-beginning

 

其中testserver2.test.user-smt可以通過如下命令查看:
kafka-topics --list --zookeeper localhost:2181

 

8.測試
根據第8步中配置的,在已存在的user表中插入數據,就可以看到有數據打印
{"id":1,"name":{"string":"awen"},"age":10}

到這里confluent就配置好了

 
三、安裝、配置clickhouse
 
1.安裝clickhouse (Centos)參考 https://www.cnblogs.com/gomysql/p/11199856.html
curl -s https://packagecloud.io/install/repositories/altinity/clickhouse/script.rpm.sh | sudo bash
yum install -y clickhouse-server clickhouse-client
clickhouse-server

 

2.創建監聽kafka的表
啟動clickhouse-client:
CREATE TABLE queue (id UInt64, name Nullable(String), age UInt64) ENGINE = Kafka SETTINGS kafka_broker_list='localhost:9092', kafka_topic_list='testserver2.test.user-smt', kafka_group_name ='group1', kafka_format='AvroConfluent', format_avro_schema_registry_url = 'http://localhost:8081/subjects/testserver2.test.user-smt-value/versions/latest';

(注queue表就是消費kafka的消息,不會存儲)

 
format_avro_schema_registry_url 內容可以根據下面命令查找(jq是json格式化工具):
curl --silent -X GET http://localhost:8081/subjects/ | jq .

 

創建接收的表
CREATE TABLE user (id UInt64, name Nullable(String), age UInt64) ENGINE = MergeTree() order by id ;

 

創建 MATERIALIZED VIEW,將queue中的數據消費,並存儲到user表中
CREATE MATERIALIZED VIEW queue_consumer TO user as select * from queue;

 

當mysql表中數據變化,使用clickhouse-client查詢user表:
ecs-15d4.novalocal :) select * from user;
 
SELECT * FROM user
 
┌─id─┬─name─┬─age─┐
│ 7      │ ggg       │ 21      │
└───┴──────┴────┘
 
1 rows in set. Elapsed: 0.005 sec.
 
至此就配置完成了整個流程。
 
 
參考:
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM