篇五|ClickHouse數據導入(Flink、Spark、Kafka、MySQL、Hive)


本文分享主要是ClickHouse的數據導入方式,本文主要介紹如何使用Flink、Spark、Kafka、MySQL、Hive將數據導入ClickHouse,具體內容包括:

  • 使用Flink導入數據
  • 使用Spark導入數據
  • 從Kafka中導入數據
  • 從MySQL中導入數據
  • 從Hive中導入數據
    在這里插入圖片描述

使用Flink導入數據

本文介紹使用 flink-jdbc將數據導入ClickHouse,Maven依賴為:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
    <version>1.10.1</version>
</dependency>

示例

本示例使用Kafka connector,通過Flink將Kafka數據實時導入到ClickHouse

public class FlinkSinkClickHouse {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:clickhouse://192.168.10.203:8123/default";
        String user = "default";
        String passwd = "hOn0d9HT";
        String driver = "ru.yandex.clickhouse.ClickHouseDriver";
        int batchsize = 500; // 設置batch size,測試的話可以設置小一點,這樣可以立刻看到數據被寫入

        // 創建執行環境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        String kafkaSource11 = "" +
                "CREATE TABLE user_behavior ( " +
                " `user_id` BIGINT, -- 用戶id\n" +
                " `item_id` BIGINT, -- 商品id\n" +
                " `cat_id` BIGINT, -- 品類id\n" +
                " `action` STRING, -- 用戶行為\n" +
                " `province` INT, -- 用戶所在的省份\n" +
                " `ts` BIGINT, -- 用戶行為發生的時間戳\n" +
                " `proctime` AS PROCTIME(), -- 通過計算列產生一個處理時間列\n" +
                " `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件時間\n" +
                " WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 在eventTime上定義watermark\n" +
                ") WITH ( 'connector' = 'kafka', -- 使用 kafka connector\n" +
                " 'topic' = 'user_behavior', -- kafka主題\n" +
                " 'scan.startup.mode' = 'earliest-offset', -- 偏移量,從起始 offset 開始讀取\n" +
                " 'properties.group.id' = 'group1', -- 消費者組\n" +
                " 'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', -- kafka broker 地址\n" +
                " 'format' = 'json', -- 數據源格式為 json\n" +
                " 'json.fail-on-missing-field' = 'true',\n" +
                " 'json.ignore-parse-errors' = 'false'" +
                ")";

        // Kafka Source
        tEnv.executeSql(kafkaSource11);
        String query = "SELECT user_id,item_id,cat_id,action,province,ts FROM user_behavior";
        Table table = tEnv.sqlQuery(query);

        String insertIntoCkSql = "INSERT INTO behavior_mergetree(user_id,item_id,cat_id,action,province,ts)\n" +
                "VALUES(?,?,?,?,?,?)";

        //將數據寫入 ClickHouse Sink
        JDBCAppendTableSink sink = JDBCAppendTableSink
                .builder()
                .setDrivername(driver)
                .setDBUrl(url)
                .setUsername(user)
                .setPassword(passwd)
                .setQuery(insertIntoCkSql)
                .setBatchSize(batchsize)
                .setParameterTypes(Types.LONG, Types.LONG,Types.LONG, Types.STRING,Types.INT,Types.LONG)
                .build();

        String[] arr = {"user_id","item_id","cat_id","action","province","ts"};
        TypeInformation[] type = {Types.LONG, Types.LONG,Types.LONG, Types.STRING,Types.INT,Types.LONG};

        tEnv.registerTableSink(
                "sink",
                arr,
                type,
                sink
        );

        tEnv.insertInto(table, "sink");

        tEnv.execute("Flink Table API to ClickHouse Example");
    }

}

Note:

  • 由於 ClickHouse 單次插入的延遲比較高,我們需要設置 BatchSize 來批量插入數據,提高性能。
  • 在 JDBCAppendTableSink 的實現中,若最后一批數據的數目不足 BatchSize,則不會插入剩余數據。

使用Spark導入數據

本文主要介紹如何通過Spark程序寫入數據到Clickhouse中。

<dependency>
      <groupId>ru.yandex.clickhouse</groupId>
      <artifactId>clickhouse-jdbc</artifactId>
      <version>0.2.4</version>
</dependency>
<!-- 如果報錯:Caused by: java.lang.ClassNotFoundException: com.google.common.escape.Escapers,則添加下面的依賴 -->
<dependency>
          <groupId>com.google.guava</groupId>
          <artifactId>guava</artifactId>
          <version>28.0-jre</version>
</dependency>

示例

object Spark2ClickHouseExample {

  val properties = new Properties()
  properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
  properties.put("user", "default")
  properties.put("password", "hOn0d9HT")
  properties.put("batchsize", "1000")
  properties.put("socket_timeout", "300000")
  properties.put("numPartitions", "8")
  properties.put("rewriteBatchedStatements", "true")

  case class Person(name: String, age: Long)

  private def runDatasetCreationExample(spark: SparkSession): Dataset[Person] = {
    import spark.implicits._
    // DataFrames轉成DataSet
    val path = "file:///e:/people.json"
    val peopleDS = spark.read.json(path)
    peopleDS.createOrReplaceTempView("people")
    val ds = spark.sql("SELECT name,age FROM people").as[Person]
    ds.show()
    ds
  }

  def main(args: Array[String]) {


    val url = "jdbc:clickhouse://kms-1:8123/default"
    val table = "people"

    val spark = SparkSession
      .builder()
      .appName("Spark Example")
      .master("local") //設置為本地運行
      .getOrCreate()
    val ds = runDatasetCreationExample(spark)

    ds.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, properties)
    spark.stop()
  }
}

從Kafka中導入數據

主要是使用ClickHouse的表引擎。

使用方式

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol',]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_thread_per_consumer = 0]
  • kafka_broker_list :逗號分隔的brokers地址 (localhost:9092).
  • kafka_topic_list :Kafka 主題列表,多個主題用逗號分隔.
  • kafka_group_name :消費者組.
  • kafka_format – Message format. 比如JSONEachRow、JSON、CSV等等

使用示例

在kafka中創建user_behavior主題,並向該主題寫入數據,數據示例為:

{"user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919}
{"user_id":9164,"item_id":2817,"cat_id":611,"action":"fav","province":28,"ts":1573420486}
{"user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919}

在ClickHouse中創建表,選擇表引擎為Kafka(),如下:

 CREATE TABLE kafka_user_behavior (
    user_id UInt64 COMMENT '用戶id',
    item_id UInt64 COMMENT '商品id',
    cat_id UInt16  COMMENT '品類id',
    action String  COMMENT '行為',
    province UInt8 COMMENT '省份id',
    ts UInt64      COMMENT '時間戳'
  ) ENGINE = Kafka()
    SETTINGS
    kafka_broker_list = 'cdh04:9092',
    kafka_topic_list = 'user_behavior',
    kafka_group_name = 'group1',
    kafka_format = 'JSONEachRow'
;
-- 查詢
cdh04 :) select * from kafka_user_behavior ;

-- 再次查看數據,發現數據為空
cdh04 :) select count(*) from kafka_user_behavior;

SELECT count(*)
FROM kafka_user_behavior

┌─count()─┐
│       0 │
└─────────┘

通過物化視圖將kafka數據導入ClickHouse

當我們一旦查詢完畢之后,ClickHouse會刪除表內的數據,其實Kafka表引擎只是一個數據管道,我們可以通過物化視圖的方式訪問Kafka中的數據。

  • 首先創建一張Kafka表引擎的表,用於從Kafka中讀取數據
  • 然后再創建一張普通表引擎的表,比如MergeTree,面向終端用戶使用
  • 最后創建物化視圖,用於將Kafka引擎表實時同步到終端用戶所使用的表中
-- 創建Kafka引擎表
 CREATE TABLE kafka_user_behavior_src (
    user_id UInt64 COMMENT '用戶id',
    item_id UInt64 COMMENT '商品id',
    cat_id UInt16  COMMENT '品類id',
    action String  COMMENT '行為',
    province UInt8 COMMENT '省份id',
    ts UInt64      COMMENT '時間戳'
  ) ENGINE = Kafka()
    SETTINGS
    kafka_broker_list = 'cdh04:9092',
    kafka_topic_list = 'user_behavior',
    kafka_group_name = 'group1',
    kafka_format = 'JSONEachRow'
;

-- 創建一張終端用戶使用的表
 CREATE TABLE kafka_user_behavior (
    user_id UInt64 COMMENT '用戶id',
    item_id UInt64 COMMENT '商品id',
    cat_id UInt16  COMMENT '品類id',
    action String  COMMENT '行為',
    province UInt8 COMMENT '省份id',
    ts UInt64      COMMENT '時間戳'
  ) ENGINE = MergeTree()
    ORDER BY user_id
;
-- 創建物化視圖,同步數據
CREATE MATERIALIZED VIEW user_behavior_consumer TO kafka_user_behavior
    AS SELECT * FROM kafka_user_behavior_src ;
-- 查詢,多次查詢,已經被查詢的數據依然會被輸出
cdh04 :) select * from kafka_user_behavior;

Note:

Kafka消費表不能直接作為結果表使用。Kafka消費表只是用來消費Kafka數據,沒有真正的存儲所有數據。

從MySQL中導入數據

同kafka中導入數據類似,ClickHouse同樣支持MySQL表引擎,即映射一張MySQL中的表到ClickHouse中。

數據類型對應關系

MySQL中數據類型與ClickHouse類型映射關系如下表。

MySQL ClickHouse
UNSIGNED TINYINT UInt8
TINYINT Int8
UNSIGNED SMALLINT UInt16
SMALLINT Int16
UNSIGNED INT, UNSIGNED MEDIUMINT UInt32
INT, MEDIUMINT Int32
UNSIGNED BIGINT UInt64
BIGINT Int64
FLOAT Float32
DOUBLE Float64
DATE Date
DATETIME, TIMESTAMP DateTime
BINARY FixedString

使用方式

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
    ...
) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);

使用示例

-- 連接MySQL中clickhouse數據庫的test表
CREATE TABLE mysql_users(
    id Int32,
    name String
) ENGINE = MySQL(
 '192.168.10.203:3306',
 'clickhouse',
 'users', 
 'root', 
 '123qwe');
-- 查詢數據
cdh04 :) SELECT * FROM mysql_users;

SELECT *
FROM mysql_users

┌─id─┬─name──┐
│  1 │ tom   │
│  2 │ jack  │
│  3 │ lihua │
└────┴───────┘
-- 插入數據,會將數據插入MySQL對應的表中
-- 所以當查詢MySQL數據時,會發現新增了一條數據
INSERT INTO users VALUES(4,'robin');
-- 再次查詢
cdh04 :) select * from mysql_users;                

SELECT *
FROM mysql_users

┌─id─┬─name──┐
│  1 │ tom   │
│  2 │ jack  │
│  3 │ lihua │
│  4 │ robin │
└────┴───────┘

注意:對於MySQL表引擎,不支持UPDATE和DELETE操作,比如執行下面命令時,會報錯:

-- 執行更新
ALTER TABLE mysql_users UPDATE name = 'hanmeimei' WHERE id = 1;
-- 執行刪除
ALTER TABLE mysql_users DELETE WHERE id = 1;
-- 報錯
DB::Exception: Mutations are not supported by storage MySQL.

從Hive中導入數據

本文使用Waterdrop進行數據導入,Waterdrop是一個非常易用,高性能,能夠應對海量數據的實時數據處理產品,它構建在Spark之上。Waterdrop擁有着非常豐富的插件,支持從Kafka、HDFS、Kudu中讀取數據,進行各種各樣的數據處理,並將結果寫入ClickHouse、Elasticsearch或者Kafka中。

我們僅需要編寫一個Waterdrop Pipeline的配置文件即可完成數據的導入。配置文件包括四個部分,分別是Spark、Input、filter和Output。

關於Waterdrop的安裝,十分簡單,只需要下載ZIP文件,解壓即可。使用Waterdrop需要安裝Spark。

  • 在Waterdrop安裝目錄的config/文件夾下創建配置文件:hive_table_batch.conf,內容如下。主要包括四部分:Spark、Input、filter和Output。

    • Spark部分是Spark的相關配置,主要配置Spark執行時所需的資源大小。

    • Input部分是定義數據源,其中pre_sql是從Hive中讀取數據SQL,table_name是將讀取后的數據,注冊成為Spark中臨時表的表名,可為任意字段。

    • filter部分配置一系列的轉化,比如過濾字段

    • Output部分是將處理好的結構化數據寫入ClickHouse,ClickHouse的連接配置。

      需要注意的是,必須保證hive的metastore是在服務狀態。

spark {
  spark.app.name = "Waterdrop_Hive2ClickHouse"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  // 這個配置必需填寫
  spark.sql.catalogImplementation = "hive"
}
input {
    hive {
        pre_sql = "select * from default.users"
        table_name = "hive_users"
    }
}
filter {}
output {
    clickhouse {
        host = "kms-1:8123"
        database = "default"
        table = "users"
        fields = ["id", "name"]
        username = "default"
        password = "hOn0d9HT"
    }
}
  • 執行任務
[kms@kms-1 waterdrop-1.5.1]$ bin/start-waterdrop.sh  --config config/hive_table_batch.conf --master yarn --deploy-mode cluster

這樣就會啟動一個Spark作業執行數據的抽取,等執行完成之后,查看ClickHouse的數據。

總結

本文主要介紹了如何通過Flink、Spark、Kafka、MySQL以及Hive,將數據導入到ClickHouse,對每一種方式都出了詳細的示例,希望對你有所幫助。

公眾號『大數據技術與數倉』,回復『資料』領取大數據資料包


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM