ClickHouse 中其它常見的表引擎


楔子

Everything is table(萬物皆為表)是 ClickHouse 的一個非常有意思的設計思路,正因為 ClickHouse 是一款數據庫,所以自然而然數據表就是它的武器,是它與外部進行交互的接口層。在數據表背后無論連接的是本地文件、HDFS、zookeeper,還是其它服務,終端用戶只需要面對數據表,只需要使用 SQL 查詢語言。

下面就來介紹一下其它類型的表引擎,它們以表為接口,極大地豐富了 ClickHouse 的查詢能力。這些表引擎各自特點突出,或是獨立地應用於特定場景,或是能夠與 MergeTree 搭配使用。例如外部存儲系列的表引擎,能夠直接讀取其它系統的數據,ClickHouse 自身只負責元數據的管理,類似使用外部表的形式;內存系列的表引擎,能夠充當數據分發的臨時存儲載體或消息通道;日志文件系列的表引擎,擁有簡單易用的特點;接口系列的表引擎,能夠串聯已有數據表,起到粘合劑的作用。那么下面我們就來分門別類的介紹一下,這些表引擎各自的使用特點。

外部存儲類型

顧名思義,外部存儲表引擎能夠直接從其它的存儲系統讀取數據,例如直接讀取 HDFS 的文件或者 MySQL 數據庫的表,這些表引擎只負責元數據管理和數據查詢,而它們自身通常不負責數據的寫入,數據文件直接由外部系統提供。

HDFS

HDFS 是一款分布式文件存儲系統,可以說是 Hadoop 生態的基石,而 ClickHouse 提供的 HDFS 表引擎則可以與之對接,讀取 HDFS 內的文件。關於 HDFS 的安裝這里不贅述了,這里假設已經安裝完畢。但是注意,我們需要關閉 HDFS 的 Kerberos 認證,因為 HDFS 表引擎還不支持 Kerberos,然后在 HDFS 上創建用於存放文件的目錄。

hdfs dfs -mkdir /clickhouse

最后在 HDFS 上給 clickhouse 用戶授權:

hdfs dfs -chown -R clickhouse:clickhouse /clickhouse

然后我們創建 HDFS 數據表,而 ClickHouse 的一張 HDFS 數據表,對應 HDFS 文件系統上的一個文件:

CREATE TABLE hdfs_table1 (
    id UInt32,
    code String,
    name String
) ENGINE = HDFS('hdfs://localhost:6666/clickhouse/hdfs_table1', 'CSV')
-- HDFS('HDFS 的文件存儲路徑', '文件格式,如 CSV、TSV、JSON 等等')
-- 注:數據表的名字和 HDFS 文件系統上的文件名可以不一致

注意:我們這里雖然創建了一張表 hdfs_table1,但 HDFS 文件系統上還並沒有 hdfs_table1 這個文件,而當我們往表中插入數據時,表對應的文件就會在 HDFS 文件系統上創建、同時將數據寫進去。因此我們寫入數據雖然表面上是通過 HDFS 數據表,但實際上數據是存儲在 HDFS 文件系統上的,而 ClickHouse 在這里只負責元數據的管理。可能有人發現了,這不就是 Hive 嘛,是的,ClickHouse 在這里所干的事情和 Hive 是一樣的。下面寫入一批數據:

INSERT INTO hdfs_table1 
SELECT number, concat('code', toString(number)), concat('n', toString(number))
FROM numbers(5)

此時在 HDFS 文件系統的 /clickhouse 下面會創建一個文件,也叫 hdfs_table1,同時將數據寫進去。然后我們就可以通過數據表查詢,注意:因為數據存在 HDFS 文件系統上,所以查詢實際上就是 ClickHouse 讀取 HDFS 文件系統的一個過程。

然后我們再來看看 HDFS 上文件:

可以發現通過 HDFS 表引擎,ClickHouse 在 HDFS 的指定目錄下創建了一個名為 hdfs_table1 的文件,並且按照 CSV 格式寫入了數據。注意:這里創建的數據表類似於 hive 中的外部表,也就是說將 HDFS 數據表刪除(刪除元數據),並不會影響 HDFS 文件系統上的文件。

以上就是 ClickHouse 和 HDFS 之間的交互,不過我們知道 ClickHouse 具有分片功能(后面說),所以它完全不需要借助於 HDFS 存儲系統來存儲數據,而且使用 HDFS 的話,那么 ClickHouse 的列式存儲、數據壓縮、索引等一系列高級特性就都用不上了,反而會嚴重拖慢 ClickHouse 的效率。但 ClickHouse 之所以還提供和 HDFS 的交互,主要是考慮到 Hadoop 生態圈已經存在多年了,在 HDFS 之上已經存儲了大量的數據,所以提供了和 HDFS 交互的接口。通過 HDFS 數據表將 HDFS 文件系統上的數據讀取出來之后,導入到 MergeTree 數據表中,然后進行數據分析。

所以 HDFS 數據表雖然既負責寫又負責讀,就像我們上面演示的那樣,但很明顯我們基本不會用 HDFS 數據表寫數據。因此當涉及 ClickHouse 和 HDFS 的交互時,都是數據已經存在於 HDFS 文件系統之上,我們只是創建一個 HDFS 數據表將數據從 HDFS 文件系統上讀取出來罷了。所以此時創建 HDFS 數據表就需要根據文件內容來創建了。

比如 HDFS 上存在一個 CSV 文件,這個文件里面有 4 列,那么我們創建的數據表就應該有 4 個字段。舉個栗子:

此時 HDFS 上有一個 TSV 格式的文件(CSV 文件的分隔符為逗號,TSV 文件的分隔符為 \t),這個時候我們需要使用 ClickHouse 將其讀取出來。具體做法顯然是創建一張 HDFS 數據表,然后指定數據文件在 HDFS 上存儲路徑即可,但問題是表字段要如何設計呢?沒錯,顯然要根據文件的存儲內容來進行設計,比如這里有 4 個列,那么 HDFS 數據表就應該要有 4 個字段,然后再根據存儲的內容指定字段的類型,那么這個 HDFS 數據表就可以這么定義:

CREATE TABLE hdfs_table2 (
    a UInt32,
    b String,
    c UInt32,
    d String
) ENGINE = HDFS('hdfs://localhost:6666/clickhouse/hdfs_table2', 'TSV');
-- 文件類型要指定 TSV,因為分隔符是 \t
-- 注:這里字段名叫什么完全由我們自己定義,我們也可以起一個有意義的名字
CREATE TABLE hdfs_table2_new (
    id UInt32,
    name String,
    age UInt32,
    place String
) ENGINE = HDFS('hdfs://localhost:6666/clickhouse/hdfs_table2', 'TSV');

這里我們的兩張 HDFS 數據表都指向 HDFS 文件系統上的同一個文件:

還是比較簡單的,這里的 ClickHouse 完全就充當了 Hive 的角色,甚至比 Hive 還要好用不少。不過 ClickHouse 支持的還不止這些,在指定 HDFS 文件路徑的時候 ClickHouse 支持多種方式:

  • 絕對路徑:會讀取指定路徑的單個文件,比如HDFS('hdfs://localhost:6666/clickhouse/hdfs_table2', 'TSV'),會讀取 clickhouse 目錄下的 hdfs_table2 文件
  • * 通配符:匹配任意數量的任意字符,比如 HDFS('hdfs://localhost:6666/clickhouse/*', 'TSV'),會讀取 clickhouse 目錄下的所有文件
  • ? 通配符:匹配單個任意字符,比如 ENGINE = HDFS('hdfs://localhost:6666/clickhouse/hdfs_table?', 'TSV'),會讀取 clickhouse 目錄下所有匹配 hdfs_table? 的文件
  • {M..N} 數字區間:匹配指定數字的文件,例如 HDFS('hdfs://localhost:6666/clickhouse/hdfs_table{1..3}', 'TSV'),會讀取 clickhouse 目錄下的 hdfs_table1、hdfs_table2、hdfs_table3

我們來測試一下,我們上面的文件都沒有后綴名,但有后綴名也是可以的。

這里我們將之前的 hdfs_table2 拷貝 3 份,並上傳至 HDFS,然后創建數據表:

CREATE TABLE girls (
    id UInt32,
    name String,
    age UInt32,
    place String
) ENGINE = HDFS('hdfs://localhost:6666/clickhouse/girls_{1..3}.tsv', 'TSV');
-- 這里寫成 girls_?.tsv 也是可以的

CREATE TABLE girls_new (
    id UInt32,
    name String,
    age UInt32,
    place String
) ENGINE = HDFS('hdfs://localhost:6666/clickhouse/girls_?.tsv', 'TSV');

然后進行查詢:

顯然使用 girls 和 girls_new 都是可以查詢到數據的,由於是 3 個文件,因此會以 3 個分區的形式合並返回。

以上就是 HDFS 數據表的相關內容,可以看到使用起來還是非常方便的,但還是像我們之前說的那樣,ClickHouse 完全獨立於 Hadoop 生態圈,並不需要借助 HDFS 存儲數據。但之所以還提供 HDFS 數據表,主要是為了讀取 HDFS 文件系統上已存在的數據,不然的話我們需要先手動將數據從 HDFS 上下載下來,然后再導入到 ClickHouse 中,會比較麻煩,因此 ClickHouse 通過表引擎的形式直接支持我們訪問 HDFS 文件系統。

當然不光是 HDFS,ClickHouse 還支持很多其它常見的外部存儲系統,當然支持的目的都是為了讀取這些存儲系統中已存在的數據。

MySQL

MySQL 表引擎可以和 MySQL 數據庫中的數據表建立映射,並通過 SQL 向其發起遠程查詢,包括 SELECT 和 INSERT,聲明方式如下:

ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, on_duplicate_clause])

假設我們要訪問 MySQL 的 default 庫下的 trade_info 表,那么可以這么做:

CREATE TABLE clickhouse_trade_info (
    id UInt32,
    column1 type,
    column2 type,
    ......
) ENGINE = MySQL('localhost:3306', 'default', 'trade_info', 'root', '123456')
-- 顯然這幾個參數的含義不需要多說,但還有兩個可選參數 replace_query 和 on_duplicate_clause
-- replace_query 默認為 0,如果設置為 1,會用 REPLACE INTO 代替 INSERT INTO
-- on_duplicate_clause 默認為 0,對應 MySQL 的 ON DUPLICATE KEY 語法,如果想啟用該設置,那么需要設置為 1

創建成功之后,我們就可以通過 ClickHouse 的數據表來讀取 MySQL 數據表的數據了,當然插入數據也是可以的。由於 MySQL 還是比較簡單的,這里就不實際演示了,可以自己測試一下。

當然重點是,我們可以搭配物化視圖一起使用:

CREATE MATERIALIZED VIEW trade_info_view
ENGINE = MergeTree()
ORDER BY id
AS SELECT * FROM clickhouse_trade_info
-- 這里指定數據表的時候一定要指定 ClickHouse 的數據表,不是 MySQL 的
-- 所以這里我們刻意將數據表其名為 clickhouse_trade_info

不過遺憾的是,目前 MySQL 表引擎不支持 UPDATE 和 DELETE 操作,如果需要數據更新的話,可以考慮使用 CollapsingMergeTree 作為視圖的表引擎。不過還是之前所說,使用外部存儲系統基本上都是為了讀數據,很少會有插入、更新和刪除之類的場景出現。

JDBC

相比 MySQL 表引擎,JDBC 表引擎不僅可以讀取 MySQL 數據庫,還能讀取 PostgreSQL、SQLite 和 H2 數據庫。但是光有 JDBC 表引擎還不夠,它還需要依賴一個基於 Java 語言實現的 SQL 代理服務,名為 clickhouse-jdbc-bridge,它可以為 ClickHouse 代理訪問數據庫。

但 clickhouse-jdbc-bridge 需要使用 Maven 進行構建,而我本人不是 Java 方向的,只知道 Java 如何安裝,甚至不知道如何用 Java 寫一個 Hello World,所以更別提使用 Maven 構建項目了,因此這部分內容有興趣可以自己了解一下。總之創建 JDBC 表引擎和 MySQL 表引擎是類似的:

ENGINE = JDBC('jdbc:url', 'database', 'table')

不同的數據庫使用不同的 url,可以自己測試一下。

Kafka

Kafka 是大數據領域非常流行的一款分布式消息系統,而 ClickHouse 也提供了 Kafka 表引擎與之對接,進而訂閱 Kafka 中的主題並實時接收消息數據。而總所周知,在消息系統中存在三層語義:

  • 最多一次(At Most Once):可能出現消息丟失的情況,因為在這種情形下,一條消息在消費端最多被接收一次
  • 最少一次(At Least Once):可能出現消息重復的情況,因為在這種情形下,一條消息在消費端允許被接收多次
  • 精確一次(Exactly Once):數據不多不少,一條消息在消費端恰好被消費一次,這也是最理想的情況,因為消息不可能百分之百不丟失

雖然 Kafka 本身能夠支持上述三種語義,但是目前 ClickHouse 還不支持精確一次語義,因為這需要應用端和 Kafka 深度配合才可以實現。kafka 使用 Offset 標志位來記錄主題數據被消費的位置信息,當應用端接收到消息之后,通過自動提交或手動提交當前的位移信息,以保障消息的語義,但 ClickHouse 在這方面還有進步的空間。

Kafka 表引擎的聲明方式如下:

ENGINE = Kafka()
SETTINGS kafka_broker_list = 'host:port,...',
         kafka_topic_list = 'topic1,topic2',
         kafka_group_name = 'group_name',
         kafka_format = 'data_format[,]',
         [kafka_row_delimiter = 'delimiter_symbol',]
         [kafka_schema = '',]
         [kafka_num_consumers = N,]
         [kafka_skip_broken_message = N,]
         [kafka_commit_every_batch = N]

其中帶有方括號的表示選填項,下面依次介紹這些參數的作用:

  • kafka_broker_list:表示 Broker 服務的地址列表,多個地址之間使用逗號分割
  • kafka_topic_list:表示訂閱的消息主題的名稱列表,多個主題之間使用逗號分割,多個主題中的數據均被消費
  • kafka_group_name:表示消費者組的名稱,表引擎會依據此名稱創建消費者組
  • kafka_format:表示用於解析消息的數據格式,在消息的發送端,必須按照此格式發送消息。而數據格式也必須是 ClickHouse 提供的格式之一,例如 TSV、JSONEachRow 和 CSV 等
  • kafka_row_delimiter:表示判定一行數據的結束符,默認為 '\0'
  • kafka_schema:對應 Kafka 的 schema 參數
  • kafka_num_consumers:表示消費者的數據量,默認值為 1,表引擎會依據此參數在消費者組中開啟相應數量的消費者線程,當然線程數不要超過分區數,否則沒有意義。因為在 kafka 的主題中,一個分區只能被某個消費者組里面的一個消費者消費(如果想被多個消費者消費,那么這些消費者一定要隸屬於不同的消費者組)
  • kafka_skip_broken_message:當表引擎按照預定格式解析數據出現錯誤時,允許跳過失敗的數據的行數,默認值為 0,即不允許任何格式錯誤的情形發生。在此種情形下,只要 kafka 主題中存在無法解析的數據,數據表都將不會接收任何數據。如果將其設置成非 0 的正整數,例如設置為 10,則表示只要 kafka 主題中存在無法解析的數據的總數小於 10,數據表就能正常接收消息數據,而解析錯誤的數據會被自動跳過
  • kafka_commit_every_batch:表示執行 kafka commit 的頻率,因此這里提交偏移量的方式是手動提交,默認值為 0,即當一整個 Block 塊完全寫入數據表后才執行一次 commit。如果設置為 1,則每寫完一個 Batch 批次的數據就會執行一次 kakfa commit(一次 Block 寫入操作,由多次 Batch 寫入操作而成)

因此 ClickHouse 在對接 Kakfa 的時候是會將消息寫入到數據表中的,所以還有一些配置參數可以調整表引擎的行為,比如 stream_poll_timeout_ms,它表示拉取數據的間隔時間。默認值為 500 毫秒,所以 Kafka 表引擎每隔 500 毫秒拉取一次數據,而拉取的數據會先被放入緩存當中,在時機成熟的時候,會被刷新到數據表。

而觸發 Kakfa 表引擎刷新緩存的條件有兩個,當滿足其中任何一個時,便會觸發刷新動作:

  • 當一個數據塊寫入完成的時候,一個數據塊的大小由 kafka_max_block_size 參數控制,默認情況下大小為 65536
  • 等待間隔超過 7500 毫秒,由 stream_fush_interval_ms 控制

Kafka 表引擎底層負責和 Kafka 通信的部分是基於 librdkafka 實現的,這是一個由 C++ 實現的 Kafka 庫,項目地址為 https://github.com/edenhill/librdkafka 。librdkafka 提供了許多自定義的配置參數,例如在默認情況下,每次只會讀取 kafka 中最新的數據,如果將 auto.offset.reset 改成 earliest(默認是 latest),數據將從會從最近一次提交的偏移位置開始讀取。當然里面還支持很多其它的參數,可以通過項目中的 CONFIGURATION.md 進行查看。

ClickHouse 對 librdkafka 的自定義參數也提供了良好的擴展支持,在 ClickHouse 的全局設置中,提供了一組 Kafka 標簽,專門用於定義 librdkafka 的自定義參數。不過需要注意的是,librdkafka 的原生參數中使用了點連接符,而在 ClickHouse 中需要改成下划線的形式,例如:

<kafka>
    <!-- librdkafka 中,參數名是 auto.offset.reset,在這里需要使用下划線進行分割 -->
    <auto_offset_reset>earliest</auto_offset_reset>
</kafka>

下面我們就來測試一下,首先使用 Go 來連接 kafka,創建一個主題,並寫入幾條數據:

package main

import (
    "encoding/json"
    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    cluster, _ := sarama.NewClusterAdmin([]string{"47.94.174.89:9092"}, config)
    // 創建主題,該主題有三個分區    
    _ = cluster.CreateTopic("heroes", &sarama.TopicDetail{NumPartitions: 3, ReplicationFactor: 1}, false)
    // 寫入消息,每個分區寫入兩條    
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    config.Producer.Partitioner = sarama.NewManualPartitioner
    producer, _ := sarama.NewAsyncProducer([]string{"47.94.174.89:9092"}, config)
    messages := []map[string]interface{}{
        {"id": 1, "name": "麥克雷", "age": 37, "weapon": "維和者", "ultimate": "午時已到"},
        {"id": 2, "name": "源氏", "age": 35, "weapon": "鏢", "ultimate": "斬"},
        {"id": 3, "name": "半藏", "age": 38, "weapon": "弓", "ultimate": "龍"},
        {"id": 4, "name": "士兵76", "age": 55, "weapon": "脈沖步槍", "ultimate": "戰術目鏡"},
        {"id": 5, "name": "死神(諧星)", "age": 57, "weapon": "*彈槍", "ultimate": "死亡綻放"},
        {"id": 6, "name": "路霸", "age": 48, "weapon": "爆裂槍", "ultimate": "雞飛狗跳"},
    }
    for i, message := range messages {
        value, _ := json.Marshal(message)
        // 將 map 轉成 json        
        if i < 2 {
            producer.Input() <- &sarama.ProducerMessage{Topic: "heroes", Partition: 0,
                Value: sarama.StringEncoder(value)}
        } else if i < 4 {
            producer.Input() <- &sarama.ProducerMessage{Topic: "heroes", Partition: 1,
                Value: sarama.StringEncoder(value)}
        } else {
            producer.Input() <- &sarama.ProducerMessage{Topic: "heroes", Partition: 2,
                Value: sarama.StringEncoder(value)}
        }
        select {
        case <-producer.Successes():
        case <-producer.Errors():
        }
    }
}

以上我們就創建一個主題叫 heroes,該主題有三個分區,每個分區寫入了兩條數據。當然你也可以使用其它語言提供的 API 實現,下面我們通過 kafka 控制台查看一下數據有沒有寫入成功。

顯然寫入成功了,上面的 172.24.60.6 是我的內網 IP,然后我們就來創建 kafka 數據表獲取數據。由於數據已經寫入了,所以在讀取的時候必須指定 auto.offset.reset 為 earliest。

<kafka>    
    <auto_offset_reset>earliest</auto_offset_reset>
</kafka>

我們修改 config.xml,然后 clickhouse restart 重啟服務。下面開始創建 Kakfa 數據表:

CREATE TABLE kafka_test (
    id UInt32,    
    name String,    
    age UInt8,   
    weapon String,   
    ultimate String) 
ENGINE = Kafka() -- 由於當前 ClickHouse 和 Kakfa 在同一個節點上,所以這里用內網 IP 也是可以的
SETTINGS kafka_broker_list = '47.94.174.89:9092',         
         kafka_topic_list = 'heroes',         
         kafka_group_name = 'my_group',         
         kafka_format = 'JSONEachRow',         
         kafka_num_consumers = 3

創建成功之后,我們來查詢數據,看看能不能讀取:

整體都很順利,但問題是第二次查詢的時候發現數據沒了,原因就是 kafka 表引擎在執行完查詢之后就會刪除表內的數據。注意這里刪除的數據是 Kakfa 表引擎從 kafka 中拖下來寫入表中的數據,至於 kafka 上面的數據還在。不過很明顯這不是我們期望的,因為不能每次查詢都臨時從 kafka 上拖吧。

所以真正的使用方式如下:

  • 首先創建 Kafka 數據表 A,它充當的是數據管道,負責從 kafka 上拖數據
  • 然后是另外一張任意引擎的數據表 B,它充當的角色是面向終端用戶的查詢表,在生產環境中通常是 MergeTree 系列
  • 最后是一張物化視圖 C,它負責將表 A 的數據實時同步到表 B

下面具體操作一波:

CREATE TABLE kafka_queue (
    id UInt32,
    name String,
    age UInt8,
    weapon String,
    ultimate String
) ENGINE = Kafka()
SETTINGS kafka_broker_list = '47.94.174.89:9092',
         kafka_topic_list = 'heroes',
         kafka_group_name = 'my_group',
         kafka_format = 'JSONEachRow',
         kafka_num_consumers = 3;
         
-- 然后是面向終端用戶的查詢表,這里使用 MergeTree 引擎
CREATE TABLE kafka_table (
    id UInt32,
    name String,
    age UInt8,
    weapon String,
    ultimate String
) ENGINE = MergeTree()
ORDER BY id;

-- 最后是一張物化視圖,用於將數據從 kafka_queue 同步到 kafka_table
CREATE MATERIALIZED VIEW queue_to_table_view TO kafka_table
AS SELECT id, name, age, weapon, ultimate FROM kafka_queue

至此全部的工作就完成了,當數據進入 kafka_queue 的時候,物化視圖 queue_to_table_view 會將數據從 kafka_queue 同步到 kafka_table,即使 kafka_queue 中的數據被刪掉也不影響,因為數據已經進入了 kafka_table,而 kafka_table 才是負責面向數據查詢的表。

說白了數據刪除的問題並沒有得到本質上的解決,只是換了一種曲線救國的方式,通過物化視圖將數據放在了另一張表中。至於 kafka 數據表在查詢之后數據為什么被刪除,我們就不深究了。

注意:從 kafka 上面拖數據是有一定過程的,如果往 kafka 寫完數據之后就立刻查詢 kafka_table,不一定能查詢得到數據,這之間會有一定的延遲。

如果想停止數據同步,可以刪除視圖:DROP TABEL queue_to_table_view,或者卸載視圖:DETACH TABLE queue_to_table_view。這里我們將視圖卸載掉,然后再將之前的數據重新寫入一次,並進行查詢:

我們發現數據並沒有被同步過來,這是理所當然的,因為視圖被卸載了。如果想繼續同步,那么將卸載之后的視圖重新裝載進來即可:

ATTACH MATERIALIZED VIEW queue_to_table_view TO kafka_table
AS SELECT id, name, age, weapon, ultimate FROM kafka_queue

和創建視圖類似,只需要將 CREATE 換成 ATTACH 即可,然后再進行查詢:

此時數據就又同步過來了(如果沒有同步過來就等一小會兒),但是切記:在重新裝載物化視圖之前一定不要查詢 kakfa_queue,因為一旦查詢數據就沒了,物化視圖就沒法同步了。

File

File 表引擎能夠直接讀取本地文件的數據,通常被作為一種擴展手段來使用,例如它可以讀取由其它系統生成的數據文件,如果外部系統直接修改了文件,則變相達到了數據更新的目的;還可以將 ClickHouse 數據導出為本地文件;以及用於數據格式轉換等場景。除此之外,File 表引擎也被應用於 clickhouse-local 工具,之前介紹過。

File 表引擎的聲明方式如下:

ENGINE = File(format)

其中 format 表示文件的數據格式,同樣必須是 ClickHouse 支持的格式,例如 TSV、CSV、JSONEachRow 等。可以發現在 File 表引擎的定義參數中,並沒有包含文件路徑這一選項,因此 File 表引擎的數據文件只能保存在 config.xml 配置中由 path 指定的路徑下,也就是和其它的數據表在同一個路徑下。

每張 File 數據表均由目錄和文件組成,其中目錄以表的名稱命名,而數據文件則以 data.<format> 命名,比如 data.CSV、data.TSV 等等。而創建 File 表的方式有自動和手動兩種,首先介紹自動創建的方式,即由 File 表引擎全權負責表目錄和數據文件的創建:

CREATE TABLE file_table (  
    name String,    
    value UInt32
) ENGINE = File('CSV')

和其它表引擎一樣,當執行完上面的語句后,會在 /var/lib/clickhouse/data/default 中創建相應的目錄,但是里面還沒有數據文件,我們接着寫入數據:

INSERT INTO file_table VALUES ('one', 1), ('two', 2), ('three', 3)

在數據寫入之后,file_table 下面便會生成一個 data.CSV 數據文件:

[root@satori ~]# cat /var/lib/clickhouse/data/default/file_table/data.CSV "one",1"two",2"three",3
[root@satori ~]# 

可以看到數據被寫入了文件之中,但這種情況比較少見,因為寫入數據我們基本上都不會使用外部存儲系列的表引擎,它們存在的目的更多是為了讀取現有的數據。所以接下來介紹手動創建的方式,也就是目錄和里面的文件都已經存在了,它們是由 ClickHouse 之外的其它系統創建的,而我們需要使用 ClickHouse 讀取它。

以上是一個文本文件,如果我們想要讀取它該怎么做呢?首先要根據內部數據創建和合適表結構,這里我們應該選擇 JSONEachRow:

CREATE TABLE file_table_new (
    id UInt32,    
    name String,   
    place String
) 
ENGINE = File('JSONEachRow')

創建完之后,可以查詢一下試試,不出意外是會報錯的,原因就是 file_table_new 下面沒有 data.JSONEachRow 文件。不同於 MergeTree,MergeTree 數據表創建完之后如果不寫入數據,那么查詢結果是空,並不會報錯。但 File 表引擎,它要求目錄下必須有相應的 data.format 文件,所以我們將剛才的 girls.txt 拷貝過去:

[root@satori ~]# cp girls.txt /var/lib/clickhouse/data/default/file_table_new/data.JSONEachRow

拷貝的時候記得重命名,文件必須叫 data.<format>,拷貝之后再執行一下查詢:

當然我們也可以繼續向表中追加數據,都是沒有問題的。這里可能會有人好奇,如果我們不建表,而是手動創建一個 file_table_new 目錄,然后將文件拷貝過去可不可以呢。答案是不可以,因為一張表除了對應一個物理目錄之外,還有部分的元信息,這些元信息是在創建表的時候產生的。所以一定要先建表,然后自動生成對應的目錄之后,再將文件拷貝過去。

以上就是 File 表引擎的基礎用法,可以看到 ClickHouse 想的還是比較周全的,為了已經存在的數據存儲也提供了相應的接口。

內存類型

之前介紹的表引擎,它們都有一個共同的特點:數據是在磁盤中被訪問的,而接下來我們會介紹幾種內存類型的表引擎,數據會從內存中被直接訪問。當然,雖然它們是內存表引擎,但並不意味着不支持物理存儲(落盤),事實上除了 Memory 表引擎之外,其余的幾款內存表引擎都會將數據寫入磁盤,因為為了防止數據丟失,所以也提供了這種故障恢復手段。而在數據表被加載時,它們會將數據全部加載至內存,以供查詢。而將數據全量放在內存中,顯然是一把雙刃劍,因為在提升查詢性能的同時增大了內存消耗。

Memory

Memory 表引擎直接將數據保存在內存中,數據既不會被壓縮也不會被格式轉換,數據在內存中保存的形態與查詢時看到的如出一轍。正因為如此,當 ClickHouse 服務重啟的時候,Memory 表內的數據會全部丟失。所以在一些場合,會將 Memory 作為測試表使用。由於不需要磁盤讀取、序列化以及反序列化操作,所以 Memory 表引擎支持並行查詢,並且在簡單的查詢場景中能夠達到與 MergeTree 旗鼓相當的查詢性能(一億行數據以內)。Memory 表創建方法如下:

CREATE TABLE sweet_memory_1 (id UInt64) ENGINE = Memory()

當數據被寫入之后,磁盤上不會創建任何數據文件,如果服務重啟,那么這張表就沒了。比較簡單,這里就不測試了,但最后需要說明的是,Memory 數據表不單單被用作測試,它還被廣泛應用在 ClickHouse 的內部,它會作為集群間分發數據的存儲載體來使用。例如在分布式 IN 查詢的場合中,會利用 Memory 臨時表保存 IN 字句的查詢結果,並通過網絡將它傳輸到遠端節點,關於這部分內容后續介紹。

Set

Set 表引擎是擁有物理存儲的,數據首先會被寫入內存,然后被同步到磁盤文件中。所以服務重啟之后它的數據不會丟失,當數據表被重新狀態時,文件數據會再次被全量加載到內存。而 Set 我們知道它內部的數據是唯一的,對於有 Python 經驗的人應該再熟悉不過了,也就是說 Set 表引擎具有數據去重的功能。在數據寫入的過程中,重復的數據會被自動忽略。然而 Set 表引擎的使用場景即特殊又有限,它雖然支持正常的 INSERT 寫入,但並不能直接使用 SELECT 進行查詢,Set 表只能間接作為 IN 查詢的右側條件被查詢使用。

Set 表引擎的存儲結構由兩部分組成,它們分別是:

  • [num].bin 數據文件:保存了所有列字段的數據,其中 num 是一個自增 id,從 1 開始。伴隨着每一批數據的寫入(每一次 INSERT),都會生成一個新的 .bin 文件,num 也會隨之加 1
  • tmp 臨時目錄:數據文件首先會被寫到這個目錄,當一批數據寫入完畢之后,數據文件會被移出此目錄

下面就來創建一個 Set 數據表測試一下:

正確的做法是將 Set 數據表作為 IN 查詢的右側條件,例如:

再來查詢一下 set_table 的物理目錄結構:

[root@satori set_table]# ls
1.bin  tmp
[root@satori set_table]#

結果和我們分析的是一樣的。

Join

Join 表引擎顯然是為 JOIN 查詢而生的,它等同於將 JOIN 查詢進行了一層簡單的封裝。在 Join 表引擎的底層實現中,它與 Set 表引擎共用了大部分的處理邏輯,所以 Join 和 Set 表引擎擁有眾多相似之處。例如 Join 表引擎的物理存儲也由 [num].bin 數據文件和 tmp 臨時目錄兩部分組成;數據首先會被寫入內存,然后被同步到磁盤文件,但相比 Set 表引擎,Join 表引擎有着更加廣泛的使用場景,它既能夠作為 JOIN 查詢的連接表,也能夠被直接查詢使用。

Join 表引擎的聲明方式如下所示:

ENGINE = Join(join_strictness, join_type, key1[, key2, ...])

其中各參數的含義如下:

  • join_strictness:連接精度,它決定了 JOIN 查詢在連接數據時所使用的策略,目前支持 ALL、ANY、SEMI、ANTI 四種類型
  • join_type:連接類型,它決定了 JOIN 查詢在組合左右兩個數據集合的策略,目前支持 INNER、LEFT、RIGHT 和 FULL 四種類型
  • join_key:連接鍵,它決定了使用哪個列字段進行關聯

上面這些參數,每一條都對應了 JOIN 查詢字句的語法規則,關於 JOIN 查詢后續展開,我們首先創建相關數據表測試一下:

-- 首先創建主表,引擎為 Log,關於 Log 數據表一會說
CREATE TABLE join_table (
    id UInt8,
    name String,
    time DateTime
) ENGINE = Log();
-- 寫入數據
INSERT INTO join_table
VALUES (1, '古明地覺', '2020-05-01 12:00:00'),
       (2, '霧雨魔理沙', '2020-05-01 12:30:00'),
       (3, '琪露諾', '2020-05-01 13:00:00');
       
-- 接着創建 Join 表
CREATE TABLE id_join_table (
    id UInt8,
    score UInt8,
    time DateTime
) ENGINE = Join(ANY, LEFT, id);
-- 如果 join_strictness 為 ANY,那么 join_key 重復的數據會自動被忽略
-- 所以下面雖然寫了兩條 id 為 1 數據,但只有第一條會保留,因為寫入第二條的時候發現 id 為 1 的數據已經存在了,所以會停止寫入
INSERT INTO TABLE id_join_table
VALUES (1, 100, '2020-05-01 11:55:00'),
       (1, 105, '2020-05-01 11:10:00'),
       (2, 90, '2020-05-01 12:01:00'),
       (3, 80, '2020-05-01 13:10:00'),
       (5, 70, '2020-05-01 14:00:00'),
       (6, 60, '2020-05-01 13:50:00');

我們查詢一下:

我們看到是可以查詢成功的,Join 數據表支持查詢,但這種查詢方式並不是 Join 數據表的主戰場,它的主戰場應該是 Join 查詢,例如:

當然 Join 數據表除了可以直接使用 SELECT 和 JOIN 之外,還可以通過 join 函數訪問:

目前還沒有涉及到 JOIN 查詢,所以一些細節我們還沒有解釋,目前只需要知道有這么個引擎即可,具體內容在后面介紹查詢的時候再詳細說。

Buffer

Buffer 表引擎完全使用內存裝載數據,不支持文件的持久化機制,所以當服務重啟之后,表內的數據會被清空。Buffer 表引擎不是為了面向查詢場景而設計的,它的作用是充當緩沖區的角色。假設有這樣一種場景,我們需要將數據寫入目標 MergeTree 表 A,由於寫入的並發數很高,這可能導致表 A 的合並速度慢於寫入速度,因為每次 INSERT 都會生成一個新的分區目錄。此時便可引入 Buffer 數據表來緩解這類問題,將 Buffer 表作為數據寫入的緩沖區,數據首先會被寫入 Buffer 表,當滿足預設條件時,Buffer 表會自動將數據刷新到目標表。

Buffer 表引擎的聲明方式如下:

ENGINE = Buffer(database, table, num_layers, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes)

里面參數的作用如下:

  • database:目標表的數據庫
  • table:目標表,Buffer 表內的數據會自動刷新到目標表
  • num_layers:可以理解為線程數,Buffer 表會按照 num_layers 的數量開啟線程,以並行的方式將數據刷新到目標表,官方建議設置為 16

Buffer 表並不是實時刷新數據的,只有在閾值條件滿足時才會刷新,閾值條件由三個最小和最大值組成,含義如下:

  • min_time 和 max_time:時間條件的最小值和最大值,單位為秒,從第一次向表內寫入數據時開始計算
  • min_rows 和 max_rows:數據行數條件的最小值和最大值
  • min_bytes 和 max_bytes:數據大小的最小值和最大值,單位為字節

針對以上條件,Buffer 表刷新數據的判斷依據有三個,滿足其中任意一個就會刷新數據:

  • 三組條件中所有最小閾值都已滿足,則觸發刷新動作
  • 三組條件中有一個最大閾值滿足(這里是超過最大值),則觸發刷新動作
  • 如果寫入一批數據的行數大運 max_rows 或者數據大小大於 max_bytes,則數據直接寫入目標表

還有一點需要注意,上述三組條件在每一個 layer 中都是單獨的計算的,假設 num_layers 為 16,則 Buffer 表最多開啟 16 個線程來響應數據的寫入,它們以輪訓的方式接收請求,在每個線程內會獨立進行上述判斷的過程。也就是說,假設一張 Buffer 表的 max_bytes 為 100 MB,num_layers 為 16,那么這張 Buffer 表能夠同時處理的最大數據量約為 1600 MB。

下面來測試一下它的用法,首先創建一個 Memory 數據表,再創建一張 Buffer 數數據表:

CREATE TABLE memory_table (id UInt64) ENGINE = Memory();
-- 創建 Buffer 表,用於往 Memory 表里面寫入數據
-- 注意這里 Buffer 表的創建語法,后面必須要有 AS,並且這里的 AS 還不是別名的意思
-- 因為創建 Buffer 數據表的時候我們沒有指定字段,那么這張表的結構長什么樣呢?不用想肯定和 Memory 數據表一樣
-- 因為數據就是要往它里面導,所以 AS memory_table 表示將表 memory_table 的結構作為表 buffer_to_memory_table 的結構
-- 當然除了 memory_table 還可以是其它的數據表,不過一般和目標表保持一致,因為數據就是要刷到目標表里面的
CREATE TABLE buffer_to_memory_table AS memory_table
ENGINE = Buffer(default, memory_table, 16, 10, 100, 10000, 1000000, 10000000, 100000000);
-- 接下來向 Buffer 表里面寫入 100 萬行數據
INSERT INTO buffer_to_memory_table SELECT number FROM numbers(1000000);

此時 buffer_to_memory_table 內部有數據,但 memory_table 里面沒有,因為三個條件,沒有一個達到最大閾值(准確的說是超過)。而在 100 秒之后才會有數據,可以驗證一下:

然后我們再寫入一批數據,此時數據量改為 100 萬零 1 條:

可以看到此時不會等待,Buffer 會立即將數據寫入目標表。

日志類型

如果使用的數據量很小(例如 100 萬以下),面對的數據查詢場景也比較簡單,並且是一次寫入多次查詢的模式,那么使用日志家族系列的表引擎將會是一種不錯的選擇。與合並樹家族表引擎類似,日志家族系列的表引擎也有一些共性特征:比如均不支持索引、分區等高級特性,不支持並發讀寫等等。當針對一張日志表寫入數據時,針對這張表的查詢會被阻塞,直至寫入動作結束。但它們同時也擁有切實的物理存儲,數據會被保存到本地文件中,當然除了這些共同的特征職位啊啊,日志家族系列的表引擎也有這各自的特點。接下來就從性能由低到高的順序,一依次介紹這些表引擎的使用方法。

TinyLog

TinyLog 是日志家族中性能最低的表引擎,它的存儲結構由數據文件和元數據兩部分組成。其中數據文件是按列存儲的,也就是說每個字段都有與之對應的 .bin 文件,這種結構和 MergeTree 有些相似,但 TinyLog 既不支持分區,也沒有 .mrk 標記文件。由於沒有標記文件,它自然無法支持 .bin 文件的並行讀取操作,所以它只適合在非常簡單的場景下使用。下面就來創建一張 TinyLog 數據表:

CREATE TABLE tiny_log_table (
    id UInt64,   
    code UInt64
) ENGINE = TinyLog(); 
-- 接着寫入數據
INSERT INTO tiny_log_table SELECT number, number + 1 FROM numbers(100)

數據寫入之后就能通過 SELECT 語句對它進行查詢了,這里就不展示查詢結果了,都能想到是什么,我們來看一下物理存儲:

可以看到 id 和 code 各自生成了對應的 .bin 數據文件,然后還有一個 sizes.json,里面通過 JSON 格式記錄了每個 .bin 文件內對應的數據大小信息。

StripeLog

StripeLog 表引擎的存儲結構由固定的三個文件組成,分別是:

  • data.bin:數據文件,所有的列字段使用同一個文件保存,所有數據均會寫入 data.bin,類似於數據量沒超過閾值的 MergeTree 表
  • index.mrk:數據標記,保存了數據在 data.bin 文件中的位置信息,利用數據標記能夠使用多個線程以並行的方式讀取 data.bin 內的壓縮數據塊,從而提升數據查詢的性能
  • sizes.json:元數據文件,記錄了 data.bin 和 index.mrk 大小的信息

從上述信息能夠得知,相比 TinyLog 而言,StripeLog 擁有更高的查詢性能(因為具有 .mrk 文件,支持並行查詢),同時其使用了更少的文件描述符(所有列都使用同一個文件保存)。下面來創建 StripeLog 數據表:

CREATE TABLE stripe_log_table ( 
    id UInt64,    
    code UInt64
) ENGINE = StripeLog();
-- 然后寫入數據
INSERT INTO stripe_log_table SELECT number, number + 100 FROM numbers(1000)

數據寫入之后即可進行查詢,這里我們還是直接查看一下物理存儲目錄:

里面只有三個文件,其代表的含義顯然無需解釋了。

Log

Log 表引擎結合了 TinyLog 和 StripeLog 兩個表引擎的長處,是日志家族系列中性能最高的表引擎,Log 表引擎的存儲結構由 3 個部分組成:

  • [column].bin:數據文件,數據文件按列獨立存儲,每一個列字段都擁有一個與之對應的 .bin 文件
  • __marks.mrk:數據標記,統一保存了數據在各個 [column].bin 文件中的位置信息,利用數據標記能夠使用多個線程以並行的方式讀取 [column].bin 內的壓縮數據塊,從而提升數據查詢的性能
  • sizes.json:元數據文件,記錄了 [column].bin 和 __marks.mrk 大小的信息

下面創建 Log 數據表:

CREATE TABLE log_table (  
    id UInt64,    
    code UInt64
) ENGINE = Log();
-- 然后寫入數據
INSERT INTO log_table SELECT number, number + 100 FROM numbers(1000)

數據寫入之后即可進行查詢,相信都能看出 TinyLog、StripeLog、Log 之間是高度相似的,我們還是只看一下目錄結構:

以上就是日志類型的表引擎,個人覺得算是最簡單的了,甚至比內存類型的表引擎還要簡單。

接口類型

有這么一類表引擎,它們自身並不存儲任何數據,而是像粘合劑一樣可以整合其它的數據表。在使用這類表引擎的時候,我們不用擔心底層的復雜性,它們就像接口一樣,為用戶提供了統一的訪問界面,所以將它們歸為接口類表引擎。

Merge

假設有這樣一種場景:在數據倉庫的設計中,數據按年分表存儲,例如 test_table_2018、test_table_2019 和 test_table_2020,但是現在需要跨年度查詢這些數據,應該如何實現呢?在這種情形下,使用 Merge 表引擎就是一種很合適的選擇了。

Merge 表引擎就如同一層使用了門面模式的代理,它本身不存儲任何數據,也不支持數據寫入,它的作用就如同它的名字,即負責合並多個查詢結果集。Merge 表引擎可以代理查詢任意數量的數據表,這些查詢會異步且並行執行,並最終合並成一個結果集返回。被代理查詢的數據表被要求處於同一個數據庫內,且擁有相同的表結構,但它們可以使用不同的表引擎以及不同的分區定義(對於 MergeTree 而言)。

Merge 表引擎的聲明方式如下:

ENGINE = Merge(database, table_name)

其中 database 為數據庫的名稱,table_name 為數據表的名稱,它支持使用正式則表達式,比如 ^ 表示合並所有以 test 為前綴的數據表。下面我們來簡單說明一下 Merge 的使用方法:

-- test_table_2018 保存了 2018 年的數據
CREATE TABLE test_table_2018 (
    id String,
    create_time DateTime,
    code String
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_time)
ORDER BY id;

-- 然后是 test_table_2019,它的結構和 test_table_2018 相同,但使用了不同的表引擎
CREATE TABLE test_table_2019 (
    id String,
    create_time DateTime,
    code String
) ENGINE = Log();

-- 然后創建 Merge 表將上面兩張表結合,這里的 AS 相當於為 merge_test_table_2018_and_2019 指定表結構
-- 顯然這里會復制 test_table_2018 的表結構
CREATE TABLE merge_test_table_2018_and_2019 AS test_table_2018
ENGINE = Merge(currentDatabase(), '^test_table_201')
-- currentDatabase() 表示獲取當前的數據庫

然后我們來寫入一些數據,然后進行查詢:

通過返回的結果集可以印證,所有以 test_table_201 開頭的數據表都被分別查詢,然后合並返回了。值得一提的是,對於 Merge 數據表而言,會有一個虛擬字段 _table,它表示某行數據的來源表。所以通過 _table 我們可以實現表的過濾,比如我們新創建了 test_table_2017 表示 2017 的數據,但當前我們並且不需要 2017 的數據,那么就可以將其作為查詢條件給過濾掉,比如:

SELECT * FROM merge_test_table_2018_and_2019 WHERE _table != 'test_table_2017'

還是蠻方便的,此時 _table 將等同於索引,Merge 表忽略那些被排除在外的表,不會向他們發起查詢請求。

Dictionary

這里涉及到了數據字典,關於數據字典我們會專門放在后面說,目前可以先了解一下。Dictionary 表引擎是數據字典的一層代理封裝,它可以取代字典函數,讓用戶通過數據表查詢字典。字典內的數據被加載后,會全部保存到內存中,所以使用 Dictionary 對字典性能沒有任何影響。聲明 Dictionary 數據表的方式如下:

ENGINE = Dictionary(dict_name)

其中 dict_name 對應一個已被加載的字典名稱,舉個栗子:

CREATE TABLE tb_test_flat_dict ( 
    id UInt64,    
    code String,  
    name String
) ENGINE = Dictionary(test_flat_dict)  

tb_test_flat_dict 等同於數據字典 test_flat_dict 的代理表,對它進行 SELECT 查詢即可獲取內部的數據。

Distributed

在數據庫領域,當面對海量業務數據的時候,一種主流的做法是實施 Sharding 方案,即將一張數據表橫向擴展到多個數據庫實例。其中每個數據庫實例稱為一個 Shard 分片,數據在寫入時,需要按照預定的業務規則均勻地寫至各個 Shard 分片;而在數據查詢時,則需要在每個 Shard 分片上分別查詢,最后歸並結果集。所以為了實現 Sharding 方案,一款支持分布式數據庫的中間件是必不可少的,例如 Apache ShardingSphere。

ClickHouse 作為一款性能卓越的分布式數據庫,自然也是支持 Sharding 方案的,而 Distributed 表引擎就等同於 Sharding 方案中的數據庫中間件。Distributed 表引擎自身不存儲任何數據,它能夠作為分布式表的一層透明代理,在集群內部自動開展數據的寫入分發以及查詢路由工作。關於 Distributed 表引擎的詳細介紹,將會在后續展開。

其它類型

接下來將要介紹的幾款表引擎,由於各自用途迥異,所以只好把它們歸為其它類型。最然這些表引擎的使用場景並不廣泛,但仍建議了解它們的特性和使用方法,因為這些表引擎擴充了 ClickHouse 的能力邊界。在一些特殊的場合,它們也能夠發揮重要作用。

Live View

雖然 ClickHouse 已經提供了准實時的數據處理手段,例如 Kafka 表引擎和物化視圖,但是在應用層面,一直缺乏開放給用戶的事件監聽機制。所以從 19.14 版本開始,ClickHouse 提供了一種全新的視圖:Live View。

Live View 是一種特殊的視圖,雖然它並不屬於表引擎,但是因為它與數據表息息相關,所以還是把 LiveView 歸類到了這里。Live View 的作用類似事件監聽器,它能夠將一條 SQL 查詢結果作為監控目標,當目標數據增加時,LiveView 可以及時發出響應。若要使用 Live View,首先需要將 allow_experimental_live_view 參數設置為 1,可以執行如下語句確認參數是否設置正確:

現在來舉例說明,首先創建一張數據表,它將作為 Live View 的監聽目標:

CREATE TABLE origin_table (
    id UInt64
) ENGINE = Log();
-- 緊接着創建一個 Live View
CREATE LIVE VIEW lv_origin AS SELECT COUNT(*) FROM origin_table;
-- 然后執行 watch 命令開啟監聽模式
-- 以后每當 origin_table 中的數據發生變化,就會執行 SELECT COUNT(*)
WATCH lv_origin;

如此一來 Live View 就進入監聽模式了,首先 origin_table 里面是沒有數據的,所以顯示結果為 0:

然后再開啟一個客戶端,向 origin_table 里面寫入數據,假設寫入 10 條。數據寫入之后會發現 Live View 做出了實時響應,查詢的值變成了 10,並且虛擬字段 _version 會伴隨着每一次的響應增加 1。

Null

Null 表引擎的功能與作用,與 Unix 系統的空設備 /dev/null 很相似,如果用戶向 Null 表寫入數據,系統會正確返回,但數據會被 Null 表自動忽略,永遠不會將它們保存。如果用戶向 Null 表發起查詢,那么它將返回空。在使用物化視圖的時候,如果不希望保留源表的數據,那么將源表設置成 Null 引擎將會是非常好的選擇。下面就來舉個栗子:

-- 首先創建一張 Null 表
CREATE TABLE null_table (id UInt8) ENGINE = Null();
-- 接着以 null_table 為源表,建立一張物化視圖
CREATE MATERIALIZED VIEW view_table
ENGINE = TinyLog() AS SELECT * FROM null_table

如果往 null_table 里面寫數據,那么數據會被順利同步到 view_table 中,但是 null_table 中是查詢不到數據的。

URL

URL 表引擎的作用等價於 HTTP 客戶端,它可以通過 HTTP/HTTPS 協議,直接訪問遠端的 REST 服務。當執行 SELECT 查詢的時候,底層會將其轉換為 GET 請求的遠程調用;而執行 INSERT 查詢的時候,會將其轉成 POST 請求的遠程調用,並將數據以字節流的形式傳遞。URL 表引擎的聲明方式如下所示:

ENGINE = URL('url', format)

其中 url 表示遠端的服務地址,而 format 則是 ClickHouse 支持的數據格式,如 TSV、CSV 和 JSON 等。

這里我們用 Python 的 FastAPI 編寫一個 web 服務:

import re
from fastapi import FastAPI, Request, Response
import orjson
import uvicorn

app = FastAPI()
table = []

@app.get("/girls")
async def get():
    response = Response(orjson.dumps(table),
                        media_type="application/json")
    return response

@app.post("/girls")
async def post(request: Request):
    # 如果插入多行數據,那么這些數據之間會以 \n 進行分割
    data = re.split(rb"(?<=})\n(?={)", await request.body())
    rows = [orjson.loads(_) for _ in data]
    if isinstance(rows, dict):
        table.append(rows)
    else:
        table.extend(rows)
    return True
    
if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=5555)

啟動之后我們來創建表:

CREATE TABLE url_table (
    id UInt32,
    name String,
    place String
) ENGINE = URL('http://localhost:5555/girls', JSONEachRow)

以后每執行一次 SELECT 都相當於發起了一次 GET 請求,執行一次 INSERT 相當於發起了一次 POST 請求,我們來測試一下:

可以看出 ClickHouse 想的真是無比周到,考慮了大量的數據源,以上就是其它表引擎的全部內容。至於 Dictionary 和 Distributed 兩個表引擎我們后面再說,因為涉及了還沒介紹的內容。

到目前為止我們知道了除了 MergeTree 家族表引擎之外還有另外 5 種表引擎,這些表引擎豐富了 ClickHouse 的使用場景,擴充了 ClickHouse 的使用界限。下面再總結一下:

  • 外部存儲類型的表引擎和 Hive 的外部表很類似,它們只負責元數據的管理和數據查詢,自身並不負責數據的生成,數據文件直接由外部系統維護。它們可以直接讀取 HDFS、本地文件、常見關系型數據庫以及 Kafka 的數據。
  • 內存類型的表引擎中的數據是常駐內存的,所以它們擁有堪比 MergeTree 的查詢性能(1 億數據量以內),其中 Set 和 Join 表引擎擁有物理存儲,數據在寫入內存的同時也會被刷到磁盤;而 Memory 和 Buffer 表引擎在服務重啟之后,數據便會被清空。內存類表引擎是一把雙刃劍,在數據大於 1 億的場景下不建議使用內存類型的表引擎。
  • 接口類型的表引擎自身並不存儲任何數據,而是像粘合劑一樣可以整合其它的數據表,其中 Merge 表引擎能夠合並查詢任意兩張表結構相同的數據表;Dictionary 表引擎能夠代理查詢數據字典;而 Distributed 表引擎的作用類似分布式數據庫的分表中間件,能夠幫助用戶簡化數據的分發和路由工作。
  • 其它類型的表引擎用途各不相同,其中 Live View 是一種特殊的視圖,能夠對 SQL 查詢進行實時監聽;Null 表引擎類似於 Linux 系統的空設備 /dev/null,通常和物化視圖一起搭配使用;而 URL 表引擎類似於 HTTP 客戶端,能夠代理調用遠端的 REST 服務。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM