clickhouse同步kafka數據方案


1.步驟

  kafka作為消息隊列通常用來收集各個服務產生的數據,而下游各種數據服務訂閱消費數據,本文通過使用clickhouse 自帶的kafka 引擎,來同步消費數據。

  同步步驟:

  kafka中創建topic,創建消費者並消費該topic(查看消費情況)
  建立目標表(通常是MergeTree引擎系列),用來存儲kafka中的數據;
  建立kafka引擎表,用於接入kafka數據源;
  創建Materialized View(物化視圖), 監聽kafka中的數據並將數據同步到clickhouse的目標表中;

 

2.創建測試數據源

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka-reader

# 創建消費者指定topic
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic kafka-reader --group kafka-reader-group

 

3.創建數據儲存目標表

CREATE TABLE target(
    day Date,
    level String,
    message String
) ENGINE = SummingMergeTree(day, (day, level), 8192);

 

4.創建kafka消費表

  1 )使用kafka引擎創建queue表來連接kafka並讀取topic中的數據。該數據表訂閱了名為kafka-reader的消息主題,且消費組的名稱為kafka-reader-group,⽽消息的格式采⽤了JSONEachRow。
  2 )在此之后,查詢這張數據表就能夠看到Kafka的數據了。但是再次查詢這張便就會沒有數據了,這是因為Kafka表引擎在執⾏查詢之后就會刪除表內的數據。

CREATE TABLE queue (
timestamp DateTime,
level String,
message String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '192.168.9.226:9092',
kafka_topic_list = 'kafka-reader',
kafka_row_delimiter = '\n',
kafka_group_name = 'kafka-reader-group',
kafka_format = 'JSONEachRow'


  參數解析–必要參數:

  kafka_broker_list – 以逗號分隔的kafka的brokers 列表 (192.168.9.226:9092)。
  kafka_topic_list – topic 列表 (kafka-reader)。
  kafka_group_name – Kafka 消費組名稱 (kafka-reader-group)。如果不希望消息在集群中重復,請在每個分片中使用相同的組名。
  kafka_format – 消息體格式。JSONEachRow也就是普通的json格式,使用與 SQL 部分的 FORMAT 函數相同表示方法。
  參數解析–可選參數:

  kafka_row_delimiter - 每個消息體之間的分隔符。
  kafka_schema – 如果解析格式需要一個 schema 時,此參數必填。例如,普羅托船長 需要 schema - 文件路徑以及根對象 schema.capnp:Message 的名字。
  kafka_num_consumers – 單個表的消費者數量。默認值是:1,如果一個消費者的吞吐量不足,則指定更多的消費者。消費者的總數不應該超過 topic 中分區的數量,因為每個分區只能分配一個消費者。

5.創建Materialized View(物化視圖)傳輸數據

  創建好的物化視圖,它將會在后台收集數據。可以持續不斷地從 Kafka 收集數據並通過 SELECT 將數據轉換為所需要的格式。

CREATE MATERIALIZED VIEW consumer TO target
AS SELECT toDate(toDateTime(timestamp)) AS day, level,message
FROM queue;

 

6.測試

  生產者添加數據:

  查詢目標表,查看消費數據

SELECT *
FROM target

┌────────day─┬─level─┬─message─┐
│ 2020-12-0111 │ 不開心 │
│ 2020-12-3013 │ 寫博客 │
│ 2020-12-3115 │ 買可樂 │
│ 2020-12-3117 │ 真好喝 │
└────────────┴───────┴─────────┘


--查詢consumer物化視圖表,一般得到的數據和目標表差不多,除非實時數據很多,停止接收topic數據或更改轉換邏輯需要停用物化視圖,更改完之后再啟用物化視圖

-- 停用
DETACH TABLE consumer;
-- 啟用
ATTACH TABLE consumer;

7.重載數據以及增添數據列

  1)重讀Kafka數默認從Kafka Topic的開始位置開始,並在到達消息時對其進行讀取。這是正常的方式,但是有時重新讀取消息很有用。

  例如,您可能想在修復架構中的錯誤或重新加載備份后重新讀取消息。幸運的是,這很容易做到。我們只是在消費者組中重置偏移量。

  • 假設我們丟失了讀數表中的所有消息,並希望從Kafka重新加載它們。首先,讓我們使用TRUNCATE命令重載數據。
TRUNCATE TABLE queue;
  • 在重置分區上的偏移之前,我們需要關閉消息使用。通過在ClickHouse中分離queue表來執行此操作,如下所示。

DETACH TABLE queue;
  • 接下來,使用以下Kafka命令在用於queue表的使用者組中重置分區偏移量。

注意:改命令需要在Kafka中進行操作。

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092  --topic kafka-reader --group kafka-reader-group  --reset-offsets --to-earliest --execute
  • 登錄到ClickHouse,重新連接queue表
ATTACH TABLE queue;

  等待幾秒鍾,丟失的記錄將被恢復。此時可以使用SELECT進行查詢。

  2)添加數據列

  顯示原始Kafka信息作為行通常很有用,Kafka表引擎也定義了虛擬列,以下更改數據表以顯示Topic分區和偏移量的方法。

  • 分離Kafka表來禁用消息使用。不影響數據的生產
DETACH TABLE queue;
  • 依次執行以下SQL命令來更改目標表和實例化視圖

注意:我們只是重新創建實例化視圖,而我們更改了目標表,該表保留了現有數據。

ALTER TABLE target
ADD COLUMN name String;
  • 刪除並重新構建視圖表
DROP TABLE consumer;

CREATE MATERIALIZED VIEW consumer TO target
AS SELECT toDate(toDateTime(timestamp)) AS day, level,message,name
FROM queue;
  • 重新連接queue表來再次啟用消息使用
ATTACH TABLE consumer;
  • 查詢數據表信息
select * from consumer;

8.總結

  Clickhouse消費kafka數據的過程中,通過kafka引擎表作為一個管道接收流入的數據,而物化視圖負責將kafka引擎表的數據實時同步到目標表中,我們通過不同sql語句封裝將kafka數據導入到不同目標表中。
  另需注意:

  在生產者發送數據后,當所有字段都非null時會寫入ch,當某個字段為null時,該條數據不能寫入ch,即使在創建物理表時設定了Nullable,但不會導致程序異常,只是不能寫入這條記錄。但是當創建物理表是設定了Nullable並且kafka引擎表在創建時也給定這個字段Nullable()時,此時這個字段為null值時,該條記錄才會被成功寫入,但是ch中該字段為null值。當缺失某個字段,這條記錄一樣會被同步到ch,缺失的字段value為null,當json消息多出一些字段,這條記錄一樣會被同步到ch,多余的字段會被忽略。ch物理表中的字段如果有Nullable修飾,則kafka引擎表中對應的字段也需要有Nullable修飾。如果不一致會停止接受數據。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM