1.步驟
kafka作為消息隊列通常用來收集各個服務產生的數據,而下游各種數據服務訂閱消費數據,本文通過使用clickhouse 自帶的kafka 引擎,來同步消費數據。
同步步驟:
kafka中創建topic,創建消費者並消費該topic(查看消費情況)
建立目標表(通常是MergeTree引擎系列),用來存儲kafka中的數據;
建立kafka引擎表,用於接入kafka數據源;
創建Materialized View(物化視圖), 監聽kafka中的數據並將數據同步到clickhouse的目標表中;
2.創建測試數據源
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka-reader # 創建消費者指定topic ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic kafka-reader --group kafka-reader-group
3.創建數據儲存目標表
CREATE TABLE target( day Date, level String, message String ) ENGINE = SummingMergeTree(day, (day, level), 8192);
4.創建kafka消費表
1 )使用kafka引擎創建queue表來連接kafka並讀取topic中的數據。該數據表訂閱了名為kafka-reader的消息主題,且消費組的名稱為kafka-reader-group,⽽消息的格式采⽤了JSONEachRow。
2 )在此之后,查詢這張數據表就能夠看到Kafka的數據了。但是再次查詢這張便就會沒有數據了,這是因為Kafka表引擎在執⾏查詢之后就會刪除表內的數據。
CREATE TABLE queue ( timestamp DateTime, level String, message String ) ENGINE = Kafka SETTINGS kafka_broker_list = '192.168.9.226:9092', kafka_topic_list = 'kafka-reader', kafka_row_delimiter = '\n', kafka_group_name = 'kafka-reader-group', kafka_format = 'JSONEachRow'
參數解析–必要參數:
kafka_broker_list – 以逗號分隔的kafka的brokers 列表 (192.168.9.226:9092)。
kafka_topic_list – topic 列表 (kafka-reader)。
kafka_group_name – Kafka 消費組名稱 (kafka-reader-group)。如果不希望消息在集群中重復,請在每個分片中使用相同的組名。
kafka_format – 消息體格式。JSONEachRow也就是普通的json格式,使用與 SQL 部分的 FORMAT 函數相同表示方法。
參數解析–可選參數:
kafka_row_delimiter - 每個消息體之間的分隔符。
kafka_schema – 如果解析格式需要一個 schema 時,此參數必填。例如,普羅托船長 需要 schema - 文件路徑以及根對象 schema.capnp:Message 的名字。
kafka_num_consumers – 單個表的消費者數量。默認值是:1,如果一個消費者的吞吐量不足,則指定更多的消費者。消費者的總數不應該超過 topic 中分區的數量,因為每個分區只能分配一個消費者。
5.創建Materialized View(物化視圖)傳輸數據
創建好的物化視圖,它將會在后台收集數據。可以持續不斷地從 Kafka 收集數據並通過 SELECT 將數據轉換為所需要的格式。
CREATE MATERIALIZED VIEW consumer TO target AS SELECT toDate(toDateTime(timestamp)) AS day, level,message FROM queue;
6.測試
生產者添加數據:
查詢目標表,查看消費數據
SELECT * FROM target ┌────────day─┬─level─┬─message─┐ │ 2020-12-01 │ 11 │ 不開心 │ │ 2020-12-30 │ 13 │ 寫博客 │ │ 2020-12-31 │ 15 │ 買可樂 │ │ 2020-12-31 │ 17 │ 真好喝 │ └────────────┴───────┴─────────┘ --查詢consumer物化視圖表,一般得到的數據和目標表差不多,除非實時數據很多,停止接收topic數據或更改轉換邏輯需要停用物化視圖,更改完之后再啟用物化視圖 -- 停用 DETACH TABLE consumer; -- 啟用 ATTACH TABLE consumer;
7.重載數據以及增添數據列
1)重讀Kafka數默認從Kafka Topic的開始位置開始,並在到達消息時對其進行讀取。這是正常的方式,但是有時重新讀取消息很有用。
例如,您可能想在修復架構中的錯誤或重新加載備份后重新讀取消息。幸運的是,這很容易做到。我們只是在消費者組中重置偏移量。
- 假設我們丟失了讀數表中的所有消息,並希望從Kafka重新加載它們。首先,讓我們使用TRUNCATE命令重載數據。
- 在重置分區上的偏移之前,我們需要關閉消息使用。通過在ClickHouse中分離queue表來執行此操作,如下所示。
- 接下來,使用以下Kafka命令在用於queue表的使用者組中重置分區偏移量。
注意:改命令需要在Kafka中進行操作。
- 登錄到
ClickHouse
,重新連接queue表
等待幾秒鍾,丟失的記錄將被恢復。此時可以使用SELECT
進行查詢。
2)添加數據列
顯示原始Kafka信息作為行通常很有用,Kafka表引擎也定義了虛擬列,以下更改數據表以顯示Topic分區和偏移量的方法。
- 分離Kafka表來禁用消息使用。不影響數據的生產
- 依次執行以下SQL命令來更改目標表和實例化視圖
注意:我們只是重新創建實例化視圖,而我們更改了目標表,該表保留了現有數據。
- 刪除並重新構建視圖表
- 重新連接
queue
表來再次啟用消息使用
- 查詢數據表信息
8.總結
Clickhouse消費kafka數據的過程中,通過kafka引擎表作為一個管道接收流入的數據,而物化視圖負責將kafka引擎表的數據實時同步到目標表中,我們通過不同sql語句封裝將kafka數據導入到不同目標表中。
另需注意:
在生產者發送數據后,當所有字段都非null時會寫入ch,當某個字段為null時,該條數據不能寫入ch,即使在創建物理表時設定了Nullable,但不會導致程序異常,只是不能寫入這條記錄。但是當創建物理表是設定了Nullable並且kafka引擎表在創建時也給定這個字段Nullable()時,此時這個字段為null值時,該條記錄才會被成功寫入,但是ch中該字段為null值。當缺失某個字段,這條記錄一樣會被同步到ch,缺失的字段value為null,當json消息多出一些字段,這條記錄一樣會被同步到ch,多余的字段會被忽略。ch物理表中的字段如果有Nullable修飾,則kafka引擎表中對應的字段也需要有Nullable修飾。如果不一致會停止接受數據。