clickhouse從Kafka中導入數據


操作步驟

  1. 在ClickHouse集群中新建Kafka消費表。
     
    CREATE TABLE default.kafka_src_table ON CLUSTER default ( //定義表結構的字段 id Int32, age Int32, msg String ) ENGINE = Kafka() SETTINGS kafka_broker_list = '*', kafka_topic_list = 'test', kafka_group_name = 'test', kafka_format = 'JSONEachRow';
    參數說明
    • kafka_broker_list:對應的Kafka集群地址
    • kafka_topic_list:對應消費的topic。
    • kafka_group_name:消費topic的group,需要先在Kafka中創建
    • kafka_format:ClickHouse可以處理的數據類型。
      • JSONEachRow表示每行一條數據的json格式。一般如果是json格式的話,設置JSONEachRow即可。
      • 如果需要輸入嵌套的json,請設置input_format_import_nested_json=1。
      關於ClickHouse支持的各種格式,可以參考官網:Formats for Input and Output Data

    更多屬性設置,可以參考:ClickHouse集成kafka

  2. 創建ClickHouse目的表。
    1. 創建本地表。
       
      create table default.kafka_table_local ON CLUSTER default ( id Int32, age UInt32, msg String ) ENGINE = ReplicatedMergeTree( '/clickhouse/tables/kafka_sink_table/{shard}', '{replica}') ORDER BY (id);
    2. 創建分布式表。
       
      CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local ENGINE = Distributed(default, default, kafka_table_local, id);
  3. 創建view把Kafka消費表消費到的數據導入ClickHouse目的表。
     
    CREATE MATERIALIZED VIEW consumer TO kafka_table_distributed AS SELECT * FROM kafka_src_table;
     
    說明 Kafka消費表不能直接作為結果表使用。Kafka消費表只是用來消費Kafka數據,沒有真正的存儲所有數據。
     
     
     
     


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM