Clickhouse 與 Kafka 的數據同步


作者: LemonNan

原文地址: https://mp.weixin.qq.com/s/SUUHF9R_FKg-3vq7Q3cwBQ

注: 轉載需注明作者及原文地址

介紹

Clickhouse 本身為一個分析型數據庫,提供很多跟其他組件的同步方案,本文將以 Kafka 作為數據來源介紹如何將 Kafka 的數據同步到 Clickhouse 中。

流程圖

話不多說,先上一張數據同步的流程圖

建表

在數據同步之前,我們需要建對應的 clickhouse 表,根據上面的流程圖,我們需要建立三個表:

1.數據表

2.kafka 引擎表

3.物化視圖

數據表

# 創建數據表
CREATE DATABASE IF NOT EXISTS data_sync;
CREATE TABLE IF NOT EXISTS data_sync.test
(
    name String DEFAULT 'lemonNan' COMMENT '姓名',
    age int DEFAULT 18 COMMENT '年齡',
    gongzhonghao String DEFAULT 'lemonCode' COMMENT '公眾號',
    my_time DateTime64(3, 'UTC') COMMENT '時間'
) ENGINE = ReplacingMergeTree()
PARTITION BY toYYYYMM(my_time)
ORDER BY my_time

引擎表

# 創建 kafka 引擎表, 地址: 172.16.16.4, topic: lemonCode
CREATE TABLE IF NOT EXISTS data_sync.test_queue(
    name String,
    age int,
    gongzhonghao String, 
    my_time DateTime64(3, 'UTC')
) ENGINE = Kafka
SETTINGS
  kafka_broker_list = '172.16.16.4:9092',
  kafka_topic_list = 'lemonCode',
  kafka_group_name = 'lemonNan',
  kafka_format = 'JSONEachRow',
  kafka_row_delimiter = '\n',
  kafka_schema = '',
  kafka_num_consumers = 1

物化視圖

# 創建物化視圖
CREATE MATERIALIZED VIEW IF NOT EXISTS test_mv TO test AS SELECT name, age, gongzhonghao, my_time FROM test_queue;

數據模擬

下面是開始模擬流程圖的數據走向,已安裝 Kafka 的可以跳過安裝步驟。

安裝 kafka

kafka 這里為了演示安裝的是單機

# 啟動 zookeeper
docker run -d --name zookeeper -p 2181:2181  wurstmeister/zookeeper
# 啟動 kafka, KAFKA_ADVERTISED_LISTENERS 后的 ip地址為機器ip
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --link zookeeper -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.16.4:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

使用kafka命令發送數據

# 啟動生產者,向 topic lemonCode 發送消息
kafka-console-producer.sh --bootstrap-server 172.16.16.4:9092 --topic lemonCode
# 發送以下消息
{"name":"lemonNan","age":20,"gongzhonghao":"lemonCode","my_time":"2022-03-06 18:00:00.001"}
{"name":"lemonNan","age":20,"gongzhonghao":"lemonCode","my_time":"2022-03-06 18:00:00.001"}
{"name":"lemonNan","age":20,"gongzhonghao":"lemonCode","my_time":"2022-03-06 18:00:00.002"}
{"name":"lemonNan","age":20,"gongzhonghao":"lemonCode","my_time":"2022-03-06 23;59:59.002"}

查看 Clickhouse 的數據表

select * from test;

到這一步,數據已經從 Kafka 同步到了 Clickhouse 中了,怎么說呢,還是比較方便的。

關於數據副本

這里使用的表引擎是 ReplicateMergeTree , 用 ReplicateMergeTree 的一個原因是生成多個數據副本,減少數據丟失風險,使用 ReplicateMergeTree 引擎的話,數據會自動同步到相同分片的其他節點上。

在實際情況里,還有一種方式也可以進行數據的同步,通過使用不同的 kafka consumer group 進行數據消費。

具體見下圖:

副本方案1

通過 ReplicateMergeTree 的同步機制將數據同步到同分片下其他節點,同步時占用消費節點資源。

副本方案2

通過 Kafka 本身的消費機制,將消息廣播至多個 Clickhouse 節點,數據同步不占用 Clickhouse 額外資源。

注意的地方

搭建過程可能需要注意的地方

  • 本文出現的 172.16.16.4 為機器內網ip
  • 一般引擎表以 queue 為結尾,物化視圖以 mv 為結尾,辨識度會高一點

總結

本文介紹了數據從 Kafka 同步至 Clickhouse以及多副本的方案,Clickhouse 還提供了很多其它的集成方案,包括 Hive、MongoDB、S3、SQLite、Kafka 等等等等,具體可以看下方鏈接。

集成的表引擎:https://clickhouse.com/docs/zh/engines/table-engines/integrations/

最后

歡迎掃描下方二維碼或搜索公眾號 LemonCode , 一起交流學習!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM