本文分享主要是ClickHouse的數據導入方式,本文主要介紹如何使用Flink、Spark、Kafka、MySQL、Hive將數據導入ClickHouse,具體內容包括:
- 使用Flink導入數據
- 使用Spark導入數據
- 從Kafka中導入數據
- 從MySQL中導入數據
- 從Hive中導入數據
使用Flink導入數據
本文介紹使用 flink-jdbc將數據導入ClickHouse,Maven依賴為:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_${scala.binary.version}</artifactId>
<version>1.10.1</version>
</dependency>
示例
本示例使用Kafka connector,通過Flink將Kafka數據實時導入到ClickHouse
public class FlinkSinkClickHouse {
public static void main(String[] args) throws Exception {
String url = "jdbc:clickhouse://192.168.10.203:8123/default";
String user = "default";
String passwd = "hOn0d9HT";
String driver = "ru.yandex.clickhouse.ClickHouseDriver";
int batchsize = 500; // 設置batch size,測試的話可以設置小一點,這樣可以立刻看到數據被寫入
// 創建執行環境
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
String kafkaSource11 = "" +
"CREATE TABLE user_behavior ( " +
" `user_id` BIGINT, -- 用戶id\n" +
" `item_id` BIGINT, -- 商品id\n" +
" `cat_id` BIGINT, -- 品類id\n" +
" `action` STRING, -- 用戶行為\n" +
" `province` INT, -- 用戶所在的省份\n" +
" `ts` BIGINT, -- 用戶行為發生的時間戳\n" +
" `proctime` AS PROCTIME(), -- 通過計算列產生一個處理時間列\n" +
" `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件時間\n" +
" WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 在eventTime上定義watermark\n" +
") WITH ( 'connector' = 'kafka', -- 使用 kafka connector\n" +
" 'topic' = 'user_behavior', -- kafka主題\n" +
" 'scan.startup.mode' = 'earliest-offset', -- 偏移量,從起始 offset 開始讀取\n" +
" 'properties.group.id' = 'group1', -- 消費者組\n" +
" 'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', -- kafka broker 地址\n" +
" 'format' = 'json', -- 數據源格式為 json\n" +
" 'json.fail-on-missing-field' = 'true',\n" +
" 'json.ignore-parse-errors' = 'false'" +
")";
// Kafka Source
tEnv.executeSql(kafkaSource11);
String query = "SELECT user_id,item_id,cat_id,action,province,ts FROM user_behavior";
Table table = tEnv.sqlQuery(query);
String insertIntoCkSql = "INSERT INTO behavior_mergetree(user_id,item_id,cat_id,action,province,ts)\n" +
"VALUES(?,?,?,?,?,?)";
//將數據寫入 ClickHouse Sink
JDBCAppendTableSink sink = JDBCAppendTableSink
.builder()
.setDrivername(driver)
.setDBUrl(url)
.setUsername(user)
.setPassword(passwd)
.setQuery(insertIntoCkSql)
.setBatchSize(batchsize)
.setParameterTypes(Types.LONG, Types.LONG,Types.LONG, Types.STRING,Types.INT,Types.LONG)
.build();
String[] arr = {"user_id","item_id","cat_id","action","province","ts"};
TypeInformation[] type = {Types.LONG, Types.LONG,Types.LONG, Types.STRING,Types.INT,Types.LONG};
tEnv.registerTableSink(
"sink",
arr,
type,
sink
);
tEnv.insertInto(table, "sink");
tEnv.execute("Flink Table API to ClickHouse Example");
}
}
Note:
- 由於 ClickHouse 單次插入的延遲比較高,我們需要設置
BatchSize
來批量插入數據,提高性能。- 在 JDBCAppendTableSink 的實現中,若最后一批數據的數目不足
BatchSize
,則不會插入剩余數據。
使用Spark導入數據
本文主要介紹如何通過Spark程序寫入數據到Clickhouse中。
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
<!-- 如果報錯:Caused by: java.lang.ClassNotFoundException: com.google.common.escape.Escapers,則添加下面的依賴 -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.0-jre</version>
</dependency>
示例
object Spark2ClickHouseExample {
val properties = new Properties()
properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
properties.put("user", "default")
properties.put("password", "hOn0d9HT")
properties.put("batchsize", "1000")
properties.put("socket_timeout", "300000")
properties.put("numPartitions", "8")
properties.put("rewriteBatchedStatements", "true")
case class Person(name: String, age: Long)
private def runDatasetCreationExample(spark: SparkSession): Dataset[Person] = {
import spark.implicits._
// DataFrames轉成DataSet
val path = "file:///e:/people.json"
val peopleDS = spark.read.json(path)
peopleDS.createOrReplaceTempView("people")
val ds = spark.sql("SELECT name,age FROM people").as[Person]
ds.show()
ds
}
def main(args: Array[String]) {
val url = "jdbc:clickhouse://kms-1:8123/default"
val table = "people"
val spark = SparkSession
.builder()
.appName("Spark Example")
.master("local") //設置為本地運行
.getOrCreate()
val ds = runDatasetCreationExample(spark)
ds.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, properties)
spark.stop()
}
}
從Kafka中導入數據
主要是使用ClickHouse的表引擎。
使用方式
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_row_delimiter = 'delimiter_symbol',]
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_thread_per_consumer = 0]
kafka_broker_list
:逗號分隔的brokers地址 (localhost:9092).kafka_topic_list
:Kafka 主題列表,多個主題用逗號分隔.kafka_group_name
:消費者組.kafka_format
– Message format. 比如JSONEachRow
、JSON、CSV等等
使用示例
在kafka中創建user_behavior主題,並向該主題寫入數據,數據示例為:
{"user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919}
{"user_id":9164,"item_id":2817,"cat_id":611,"action":"fav","province":28,"ts":1573420486}
{"user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919}
在ClickHouse中創建表,選擇表引擎為Kafka(),如下:
CREATE TABLE kafka_user_behavior (
user_id UInt64 COMMENT '用戶id',
item_id UInt64 COMMENT '商品id',
cat_id UInt16 COMMENT '品類id',
action String COMMENT '行為',
province UInt8 COMMENT '省份id',
ts UInt64 COMMENT '時間戳'
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'cdh04:9092',
kafka_topic_list = 'user_behavior',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow'
;
-- 查詢
cdh04 :) select * from kafka_user_behavior ;
-- 再次查看數據,發現數據為空
cdh04 :) select count(*) from kafka_user_behavior;
SELECT count(*)
FROM kafka_user_behavior
┌─count()─┐
│ 0 │
└─────────┘
通過物化視圖將kafka數據導入ClickHouse
當我們一旦查詢完畢之后,ClickHouse會刪除表內的數據,其實Kafka表引擎只是一個數據管道,我們可以通過物化視圖的方式訪問Kafka中的數據。
- 首先創建一張Kafka表引擎的表,用於從Kafka中讀取數據
- 然后再創建一張普通表引擎的表,比如MergeTree,面向終端用戶使用
- 最后創建物化視圖,用於將Kafka引擎表實時同步到終端用戶所使用的表中
-- 創建Kafka引擎表
CREATE TABLE kafka_user_behavior_src (
user_id UInt64 COMMENT '用戶id',
item_id UInt64 COMMENT '商品id',
cat_id UInt16 COMMENT '品類id',
action String COMMENT '行為',
province UInt8 COMMENT '省份id',
ts UInt64 COMMENT '時間戳'
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'cdh04:9092',
kafka_topic_list = 'user_behavior',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow'
;
-- 創建一張終端用戶使用的表
CREATE TABLE kafka_user_behavior (
user_id UInt64 COMMENT '用戶id',
item_id UInt64 COMMENT '商品id',
cat_id UInt16 COMMENT '品類id',
action String COMMENT '行為',
province UInt8 COMMENT '省份id',
ts UInt64 COMMENT '時間戳'
) ENGINE = MergeTree()
ORDER BY user_id
;
-- 創建物化視圖,同步數據
CREATE MATERIALIZED VIEW user_behavior_consumer TO kafka_user_behavior
AS SELECT * FROM kafka_user_behavior_src ;
-- 查詢,多次查詢,已經被查詢的數據依然會被輸出
cdh04 :) select * from kafka_user_behavior;
Note:
Kafka消費表不能直接作為結果表使用。Kafka消費表只是用來消費Kafka數據,沒有真正的存儲所有數據。
從MySQL中導入數據
同kafka中導入數據類似,ClickHouse同樣支持MySQL表引擎,即映射一張MySQL中的表到ClickHouse中。
數據類型對應關系
MySQL中數據類型與ClickHouse類型映射關系如下表。
MySQL | ClickHouse |
---|---|
UNSIGNED TINYINT | UInt8 |
TINYINT | Int8 |
UNSIGNED SMALLINT | UInt16 |
SMALLINT | Int16 |
UNSIGNED INT, UNSIGNED MEDIUMINT | UInt32 |
INT, MEDIUMINT | Int32 |
UNSIGNED BIGINT | UInt64 |
BIGINT | Int64 |
FLOAT | Float32 |
DOUBLE | Float64 |
DATE | Date |
DATETIME, TIMESTAMP | DateTime |
BINARY | FixedString |
使用方式
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
...
) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);
使用示例
-- 連接MySQL中clickhouse數據庫的test表
CREATE TABLE mysql_users(
id Int32,
name String
) ENGINE = MySQL(
'192.168.10.203:3306',
'clickhouse',
'users',
'root',
'123qwe');
-- 查詢數據
cdh04 :) SELECT * FROM mysql_users;
SELECT *
FROM mysql_users
┌─id─┬─name──┐
│ 1 │ tom │
│ 2 │ jack │
│ 3 │ lihua │
└────┴───────┘
-- 插入數據,會將數據插入MySQL對應的表中
-- 所以當查詢MySQL數據時,會發現新增了一條數據
INSERT INTO users VALUES(4,'robin');
-- 再次查詢
cdh04 :) select * from mysql_users;
SELECT *
FROM mysql_users
┌─id─┬─name──┐
│ 1 │ tom │
│ 2 │ jack │
│ 3 │ lihua │
│ 4 │ robin │
└────┴───────┘
注意:對於MySQL表引擎,不支持UPDATE和DELETE操作,比如執行下面命令時,會報錯:
-- 執行更新
ALTER TABLE mysql_users UPDATE name = 'hanmeimei' WHERE id = 1;
-- 執行刪除
ALTER TABLE mysql_users DELETE WHERE id = 1;
-- 報錯
DB::Exception: Mutations are not supported by storage MySQL.
從Hive中導入數據
本文使用Waterdrop進行數據導入,Waterdrop是一個非常易用,高性能,能夠應對海量數據的實時數據處理產品,它構建在Spark之上。Waterdrop擁有着非常豐富的插件,支持從Kafka、HDFS、Kudu中讀取數據,進行各種各樣的數據處理,並將結果寫入ClickHouse、Elasticsearch或者Kafka中。
我們僅需要編寫一個Waterdrop Pipeline的配置文件即可完成數據的導入。配置文件包括四個部分,分別是Spark、Input、filter和Output。
關於Waterdrop的安裝,十分簡單,只需要下載ZIP文件,解壓即可。使用Waterdrop需要安裝Spark。
-
在Waterdrop安裝目錄的config/文件夾下創建配置文件:hive_table_batch.conf,內容如下。主要包括四部分:Spark、Input、filter和Output。
-
Spark部分是Spark的相關配置,主要配置Spark執行時所需的資源大小。
-
Input部分是定義數據源,其中
pre_sql
是從Hive中讀取數據SQL,table_name
是將讀取后的數據,注冊成為Spark中臨時表的表名,可為任意字段。 -
filter部分配置一系列的轉化,比如過濾字段
-
Output部分是將處理好的結構化數據寫入ClickHouse,ClickHouse的連接配置。
需要注意的是,必須保證hive的metastore是在服務狀態。
-
spark {
spark.app.name = "Waterdrop_Hive2ClickHouse"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
// 這個配置必需填寫
spark.sql.catalogImplementation = "hive"
}
input {
hive {
pre_sql = "select * from default.users"
table_name = "hive_users"
}
}
filter {}
output {
clickhouse {
host = "kms-1:8123"
database = "default"
table = "users"
fields = ["id", "name"]
username = "default"
password = "hOn0d9HT"
}
}
- 執行任務
[kms@kms-1 waterdrop-1.5.1]$ bin/start-waterdrop.sh --config config/hive_table_batch.conf --master yarn --deploy-mode cluster
這樣就會啟動一個Spark作業執行數據的抽取,等執行完成之后,查看ClickHouse的數據。
總結
本文主要介紹了如何通過Flink、Spark、Kafka、MySQL以及Hive,將數據導入到ClickHouse,對每一種方式都出了詳細的示例,希望對你有所幫助。
公眾號『大數據技術與數倉』,回復『資料』領取大數據資料包