I had been looking for a way to store really large datasets, and happened to read about ClickHouse on a tech site. It can handle billions, even tens of billions, of rows, so I immediately felt it was worth deploying to try out.
ClickHouse handles the storage, but I still needed something to watch MySQL and feed binlog changes into ClickHouse. I tried ClickHouse's built-in MaterializeMySQL, but it doesn't support JSON columns, which I happened to be using. A pity, because that would have been the simplest route: ClickHouse's built-in CDC would have solved everything.
I then looked into several CDC frameworks and eventually settled on Debezium.
The next piece is Kafka and ZooKeeper as the middleware that receives and forwards the data. While researching I came across Confluent, which bundles Kafka and ZooKeeper, so I happily went with Confluent.
Environment:
CentOS: 6.10
MySQL: 5.7
Confluent: 6.0.0
What it does:
For now the pipeline only replicates inserts; updates and deletes are not synced.
I. MySQL configuration
1. Edit the MySQL configuration file and add or enable the following under [mysqld]:
[mysqld]
server-id        = 223344
log_bin          = mysql-bin
binlog_format    = row
binlog_row_image = full
expire_logs_days = 10
Then restart MySQL.
2. Create the database and table:
CREATE DATABASE `test`;

CREATE TABLE `user` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `age` int(10) unsigned zerofill NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=latin1;
II. Installing and configuring Confluent (note: the steps below are for a single-node deployment, on CentOS)
1. Download Confluent:
https://www.confluent.io/download/
2. Set Confluent's environment variables (in /etc/profile or ~/.bashrc):
export CONFLUENT_HOME=/datadisk/conflunt/confluent-6.0.0
export PATH=$PATH:$CONFLUENT_HOME/bin
3. Start Confluent:
confluent local services start
You should see the following output (note: for reasons I never figured out, I had to run the start command several times before all of the services below came up in order):
ZooKeeper is [UP]
Kafka is [UP]
Schema Registry is [UP]
Kafka REST is [UP]
Connect is [UP]
ksqlDB Server is [UP]
Control Center is [UP]
4. Under Confluent's etc directory, create a kafka-connect-debezium folder, and inside it create a file named register-mysql.json:
{
  "name": "test-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "127.0.0.1",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "123456",
    "database.server.id": "1",
    "database.server.name": "testserver2",
    "database.whitelist": "test",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.test",
    "transforms": "unwrap,changetopic,dropFieldBefore",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.changetopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.changetopic.regex": "(.*)",
    "transforms.changetopic.replacement": "$1-smt",
    "transforms.dropFieldBefore.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.dropFieldBefore.blacklist": "before"
  }
}
This configuration connects to MySQL, watches the test database, and writes change events to Kafka. The transforms entries reshape those events: unwrap extracts the new row state from the Debezium envelope, changetopic appends -smt to the topic name, and dropFieldBefore removes the before field.
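To make the three transforms concrete, here is a small Python sketch (not Kafka Connect code, just an illustration on a simplified, made-up insert event) of the reshaping they perform:

```python
import re

# A simplified Debezium insert envelope (illustrative values only).
envelope = {"before": None,
            "after": {"id": 1, "name": "awen", "age": 10},
            "op": "c"}            # "c" = create/insert
topic = "testserver2.test.user"   # Debezium names topics server.database.table

# unwrap (ExtractNewRecordState): replace the envelope with the new row state.
record = dict(envelope["after"])

# changetopic (RegexRouter): rewrite "(.*)" to "$1-smt", appending a suffix.
topic = re.sub(r"^(.*)$", r"\1-smt", topic)

# dropFieldBefore (ReplaceField$Value, blacklist "before"): strip a leftover
# "before" field if one is present (a no-op once unwrap has already run).
record.pop("before", None)

print(topic, record)  # testserver2.test.user-smt {'id': 1, 'name': 'awen', 'age': 10}
```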
5. Install the MySQL connector; for details see:
https://docs.confluent.io/current/connect/debezium-connect-mysql/index.html
confluent-hub install debezium/debezium-connector-mysql:latest
6. Register the connector. From the directory containing register-mysql.json, run:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
This creates the connection to MySQL.
7. Start a Kafka consumer to watch the topic:
kafka-avro-console-consumer --topic testserver2.test.user-smt --bootstrap-server 127.0.0.1:9092 --from-beginning
The topic name testserver2.test.user-smt can be found with:
kafka-topics --list --zookeeper localhost:2181
8. Test
Insert a row into the existing user table being watched by the connector configured in step 4, and the consumer will print the event:
{"id":1,"name":{"string":"awen"},"age":10}
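The {"string":"awen"} wrapper is how Avro's JSON encoding represents a value of a nullable union type (["null","string"], matching the nullable name column). A hedged Python sketch of flattening such wrappers, assuming any single-key dict keyed by an Avro type name is a union branch (a real decoder would consult the schema instead of guessing):

```python
def unwrap_avro_unions(record):
    """Collapse Avro JSON union wrappers like {"string": "awen"} to plain values."""
    AVRO_TYPES = {"string", "int", "long", "float", "double", "boolean", "bytes"}
    out = {}
    for key, value in record.items():
        # Heuristic: a one-entry dict keyed by a primitive type name is a union branch.
        if isinstance(value, dict) and len(value) == 1 and next(iter(value)) in AVRO_TYPES:
            value = next(iter(value.values()))
        out[key] = value
    return out

print(unwrap_avro_unions({"id": 1, "name": {"string": "awen"}, "age": 10}))
# {'id': 1, 'name': 'awen', 'age': 10}
```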
That completes the Confluent setup.
III. Installing and configuring ClickHouse
1. Install ClickHouse (CentOS); reference:
https://www.cnblogs.com/gomysql/p/11199856.html
curl -s https://packagecloud.io/install/repositories/altinity/clickhouse/script.rpm.sh | sudo bash
yum install -y clickhouse-server clickhouse-client
clickhouse-server
2. Create the table that consumes from Kafka
Start clickhouse-client and run:
CREATE TABLE queue (id UInt64, name Nullable(String), age UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
         kafka_topic_list = 'testserver2.test.user-smt',
         kafka_group_name = 'group1',
         kafka_format = 'AvroConfluent',
         format_avro_schema_registry_url = 'http://localhost:8081/subjects/testserver2.test.user-smt-value/versions/latest';
(Note: the queue table only consumes messages from Kafka; it does not store them.)
The subject name used in format_avro_schema_registry_url can be looked up with the following command (jq is a JSON pretty-printer):
curl --silent -X GET http://localhost:8081/subjects/ | jq .
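The AvroConfluent format works because each message on the topic carries Confluent Schema Registry framing: one magic byte (0x00), the schema ID as a big-endian 32-bit integer, then the Avro-encoded body. A small Python sketch of splitting that header (the sample bytes below are fabricated for illustration):

```python
import struct

def parse_confluent_header(message: bytes):
    """Split a Confluent-framed Kafka message into (schema_id, avro_payload).

    Framing: 1 magic byte (0x00) + 4-byte big-endian schema ID + Avro body.
    """
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != 0:
        raise ValueError("not a Confluent-framed message")
    return schema_id, message[5:]

# Fabricated example: schema ID 7 followed by a dummy payload.
schema_id, payload = parse_confluent_header(b"\x00\x00\x00\x00\x07avro-bytes")
print(schema_id, payload)  # 7 b'avro-bytes'
```

ClickHouse uses the schema ID from this header to fetch the matching schema from the registry and decode each row.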
Create the destination table:
CREATE TABLE user (id UInt64, name Nullable(String), age UInt64)
ENGINE = MergeTree()
ORDER BY id;
Create a MATERIALIZED VIEW that consumes the data from queue and stores it in the user table:
CREATE MATERIALIZED VIEW queue_consumer TO user AS SELECT * FROM queue;
When data changes in the MySQL table, query the user table from clickhouse-client:
ecs-15d4.novalocal :) select * from user;
SELECT * FROM user
┌─id─┬─name─┬─age─┐
│ 7 │ ggg │ 21 │
└───┴──────┴────┘
1 rows in set. Elapsed: 0.005 sec.
And with that, the whole pipeline is configured.

