ClickHouse 的副本與分片


楔子

縱使單節點性能再強,也會有遇到瓶頸的那一天,業務量的持續增長、服務器的意外故障,都是 ClickHouse 需要面對的洪水猛獸。但常言道:一個好漢三個幫,一個籬笆三個樁,放在計算機領域就是,一個節點不夠,就多來幾個節點,下面就來介紹一下 ClickHouse 的集群、副本與分片。

概述

集群是副本和分片的基礎,它將 ClickHouse 的服務拓撲由單節點延伸到多個節點,但它並不像 Hadoop 生態的某些系統那樣,要求所有節點組成一個單一的大集群。ClickHouse 的集群配置非常靈活,用戶既可以將所有節點組成一個單一集群,也可以按照業務的訴求把節點划分為多個小的集群。在每個小的集群區域之間,它們的節點、分區和副本數量可以各不相同,如圖所示。

從作用來看,ClickHouse 集群的工作更多是針對邏輯層面的,集群定義了多個節點的拓撲關系,這些節點在后續服務過程中可能會協同工作,而執行層面的具體工作則交給了副本和分片來執行。

集群很好理解,那副本和分片又是什么呢?這對雙胞胎兄弟有時看起來涇渭分明,有時又讓人分辨不清。這里有兩種區分辦法,第一種是從數據層面區分,假設 ClickHouse 的 N 個節點組成了一個集群,在集群的各個節點上都有一張結構相同的數據表 Y,如果 Node1 和 Node2 上的 Y 的數據完全不同,則 Node1 和 Node2 互為分片;如果它們的數據完全相同,則它們互為副本。換言之,分片之間的數據是不同的,而副本之間的數據是完全相同的。所以拋開表引擎不同,但從數據層面來看,副本和分片有時候只有一線之隔。

另一種是從功能作用層面區分,使用副本的主要目的是防止數據丟失,增加數據存儲的冗余;而使用分片的主要目的是實現數據的水平切分,如圖所示:

下面我們就來逐步介紹副本、分片和集群的使用方法,我們當前是只有一個節點,也就是只有 1 個分片,並且數據只有一份,相當於沒有備份,也就是 0 副本。注意:關於副本,不同框架有着不同的定義,比如 HDFS 中的三副本存儲,那么數據總共有三份,如果是 1 副本,那么數據就只有 1 份。但是 ClickHouse 中 1 副本表示數據額外有一份備份,那么言下之意就是有兩份,但是為了表述方便,后續我們仍采用副本 1、副本 2 來稱呼。再比如 ClickHouse 中的 2 副本,那么說明數據有 3 份,但我們會用副本 1、副本 2、副本 3 來稱呼。當然這些東西理解就好,只是為了避免出現歧義,需要提前先說清楚。

而我們接下來會從數據表的初始形態 1 分片、0 副本開始介紹;然后再說如何為它添加副本,從而形成 1 分片、1 副本的狀態;然后接着再說如何引入分片,將其轉化為多分片、1 副本的形態(多副本的形態以此類推)。

這種形態的變化過程像極了企業內的業務發展過程,在業務初期我們會從單張數據表開始;在業務上線一段時間之后,可能會為它增加副本,以保證數據的安全,或者希望進行讀寫分離從而增加並發量;但隨着業務的發展,數據量越來越大,因此會進一步為其增加分片,從而實現數據的水平切分。

數據副本

我們之前介紹 MergeTree 的時候,介紹過它的命名規則,如果在 *MergeTree 的前面加上 Replicated 前綴,則能夠組合成一個新的變種引擎,如圖所示:

換言之,只有使用了 ReplicatedMergeTree 復制表系列引擎,才能應用副本的能力(后面會說另一種副本的實現方式);或者用一種更為直接的方式理解,即使用了 ReplicatedMergeTree 的數據表就是副本。

所以 ReplicatedMergeTree 是 MergeTree 的派生引擎,它在 MergeTree 的基礎之上加入了分布式協同的能力,如圖所示:

在 MergeTree 中,一個數據分區又開始創建到全部完成,會經歷兩個存儲區域。

  • 1. 內存:數據首先會被寫入內存緩沖區
  • 2. 本地磁盤:數據接着會被寫入 tmp 臨時目錄分區,待全部完成之后再將臨時目錄重命名為正式分區

而 ReplicatedMergeTree 在上述基礎之上增加了 ZooKeeper 的部分,它會進一步在 ZooKeeper 內創建一系列的監聽節點,並以此實現多個實例之間的通信。在整個通信過程中,ZooKeeper 並不會涉及表數據的傳輸。

副本的特點

作為數據副本的主要實現載體,ReplicatedMergeTree 在設計上有一些顯著特點。

  • 依賴 ZooKeeper:在執行 INSERT 和 ALTER 查詢的時候,ReplicatedMergeTree 需要借助 ZooKeeper 的分布式協同能力,來實現多個副本之間的同步。但是在查詢副本的時候,並不需要使用 ZooKeeper,關於這方面的信息,后續會詳細介紹。
  • 表級別的副本:副本是在表級別定義的,所以每張表的副本配置都可以按照它的實際需求進行個性化定義,包括副本的數量,以及副本在集群內的分布位置。
  • 多主架構(Multi Master):可以在任意一個副本上執行 INSERT 和 ALTER 查詢,它們的效果是相同的,這些操作會借助 ZooKeeper 的協同能力被分發至每個副本以本地形式執行。
  • Block 數據塊:在執行 INSERT 命令寫入數據時,會依據 max_insert_block_size 的大小(默認 1048576 行)將數據切分成若干個 Block 數據塊。因此 Block 數據塊是數據寫入的基本單元,並且具有寫入的原子性和唯一性。
  • 原子性:在數據寫入時,一個 Block 數據塊內的數據要么全部寫入成功,要么全部寫入失敗。
  • 唯一性:在寫入一個 Block 數據塊的時候,會按照當前 Block 數據塊的數據順序、數據行和數據大小等指標,計算 Hash 信息摘要並記錄。在此之后,如果某個待寫入的 Block 數據塊與先前已被寫入的 Block 數據塊擁有相同的 Hash 摘要(Block 數據塊內數據順序、數據大小和數據行均相同),則該 Block 數據塊會被忽略。這項設置可以預防由異常原因引起的 Block 數據塊重復寫入的問題。

如果光看上面這些文字介紹的話, 可能不夠直觀,那么下面就用示例逐步展開。

ZooKeeper 的配置方式

在正式開始之前,還需要做一些准備工作,那就是安裝並配置 ZooKeeper,因為 ReplicatedMergeTree 必須對接到它才能工作。關於 zk 的安裝,此處不再贅述,使用 3.4.5 以上的版本均可,我這里安裝完成,下面重點介紹如何在 ClickHouse 中增加 ZooKeeper 的配置。

ClickHouse 使用一組 zookeeper 標簽定義相關配置,默認情況下在 config.xml 中配置即可,將配置寫在 config.xml 的 <yandex> 標簽里面。

<zookeeper>   
    <node index="1">  <!-- ZooKeeper 所在節點配置,可以配置多個地址 -->
        <!-- 節點 IP,也是當前 ClickHouse Server 所在節點 -->
        <host>47.94.174.89</host>  
        <!-- ZooKeeper 監聽端口,默認 2181 -->
        <!-- 由於我們后面會搭建副本,並且只用這一個 ZooKeeper,所以要保證 2181 端口對外開放 -->
        <port>2181</port> 
    </node>
</zookeeper>

當然 config.xml 里面也提供了 zookeeper 這個標簽,不過是被注釋掉的,這里我們不管它,直接拷貝進去即可。

配置完之后,我們使用 clickhouse restart 重啟服務。然后 ClickHouse 在它的系統表中貼心地准備了一張名為 zookeeper 的代理表,可以使用 SQL 查詢的方式讀取遠端 ZooKeeper 的數據。

SELECT czxid, mzxid, name, value FROM system.zookeeper WHERE path = '/';

注意:查詢的時候必須指定 path 進行條件篩選。

這里需要先來簡單說一下 ZooKeeper 的數據模型,ZooKeeper 的數據模型和 UNIX 文件系統很類似,整體上可以看做是一棵樹。樹上的每一個節點都稱之為一個 ZNode,可以通過路徑進行唯一標識,因為每個節點都有一個名稱,我們隨便挑兩個節點畫張圖就清晰了。

根節點就是 /,每一個節點下面可以創建多個子節點,並層層遞歸下去,所以就像文件系統一樣。而我們在查找的時候也是如此,從根節點出發,層層組合,因為每一個節點都有自己的名稱,按照順序組合起來即可得到 path。因此我們指定 path 等於 / 即可查到根節點下面的所有節點,而圖中的 name 字段顯示的就是 ZNode 的名稱,注意:只顯示一層,不會遞歸顯示。

並且每個 ZNode 默認能夠存儲 1MB 的數據,而圖中 system.zookeeper 的 value 字段顯示的就是 ZNode 存儲的值,至於 ZNode 的其它屬性可以自己查閱一下,這里就不多說了。另外,由於我這 ZooKeeper 很早就存在了,所以里面包含了很多與 ClickHouse 無關的數據,如果你是新安裝的,那么信息不會像上面這么多。

-- 這里我們查詢 clickhouse 下面所有的 ZNode,那么將條件改成 path = '/clickhouse' 即可
-- 當前只有一個 task_queue,原因是我們還沒有建表,而建表之后,這里就會多出一個名為 tables 的 ZNode
-- 然后 /clickhouse/tables 下面又會存在名為 01、02... 的 ZNode,對應副本 1、副本 2....
SELECT czxid, mzxid, name, value FROM system.zookeeper
WHERE path = '/clickhouse';
/*
┌─czxid─┬─mzxid─┬─name───────┬─value─┐
│  1087 │  1087 │ task_queue │       │
└───────┴───────┴────────────┴───────┘
*/

副本的定義形式

正如前文所言,使用副本的好處甚多。首先,由於增加了數據的存儲冗余,所以降低了數據丟失的風險;其次,由於副本采用了多主架構,所以每個副本實例都可以作為數據讀、寫的入口,這無疑分攤了節點的負載。

在使用副本時,不需要依賴任何集群的配置(關於集群后面說),ReplicatedMergeTree 結合 ZooKeeper 就能完成全部工作。

ReplicatedMergeTree 的定義方式如下:

ENGINE = ReplicatedMergeTree('zk_path', 'replica_name')

在上述配置項中,有 zk_path 和 replica_name 兩項,首先介紹 zk_path 的作用。

zk_path 用於指定在 ZooKeeper 中創建的數據表的路徑,路徑名稱是自定義的,並沒有固定規則,用戶可以設置成自己希望的任何路徑。即便如此,ClickHouse 還是提供了一些約定俗成的配置模板以供參考,例如:

/clickhouse/tables/{shard}/table_name

其中:

  • /clickhouse/tables 是約定俗成的路徑固定前綴,表示存放數據表的根路徑。
  • {shard} 表示分片編號,通常用數值替代,例如 01、02、03,一張數據表可以有多個分片,而每個分片都擁有自己的副本。
  • table_name 表示數據表的名稱,為了方便維護,通常與物理表的名字相同(雖然 ClickHouse 並不要求路徑中的表名稱和物理表名必須一致);而 replica_name 的作用是定義在 ZooKeeper 中創建的副本名稱,該名稱是區分不同副本實例的唯一標識。一種約定俗成的方式是使用所在服務器的域名稱。

對於 zk_path 而言,同一張數據表的同一個分片的不同副本,應該定義相同的路徑;而對於 replica_name 而言,同一張數據表的同一個分片的不同副本,應該定義不同的名稱。讀起來很拗口,我們舉個栗子說明一下。

1 個分片、1 個副本的情形:

-- zk_path 相同,replica_name 不同
ReplicatedMergeTree('/clickhouse/tables/01/test_1', '192.168.0.1')
ReplicatedMergeTree('/clickhouse/tables/01/test_1', '192.168.0.2')

多個分片、1 個副本的情形:

-- 分片1(2 分片、1 副本),zk_path 相同,其中 shard = 01,replica_name 不同
ReplicatedMergeTree('/clickhouse/tables/01/test_1', '192.168.0.1')
ReplicatedMergeTree('/clickhouse/tables/01/test_1', '192.168.0.2')

-- 分片2(2 分片、1 副本),zk_path 相同,其中 shard = 02,replica_name 不同
ReplicatedMergeTree('/clickhouse/tables/02/test_1', '192.168.0.3')
ReplicatedMergeTree('/clickhouse/tables/02/test_1', '192.168.0.4')

首先是 zk_path,無論一張表有多少個分片、多少個副本,它們終歸屬於同一張表,所以 zk_path 的最后一部分、也就是表名稱是不變的,這里始終是 test_1。但問題是多個分片之間要如何區分呢?所以此時就依賴於 {shard},/clickhouse/tables/01/test_1 表示 test_1 的第 1 個分片,/clickhouse/tables/02/test_1 表示 test_1 的第 2 個分片,第 3、4、5..... 個分片依次類推,至於其它的表也是同理。

而一個分片不管有多少個副本,這些副本終歸都屬於同一個分片、同一張表。所以同一張數據表的同一個分片的不同副本,應該定義相同的路徑;例如表 test_2 的每個分片都有 3 個副本,那么以第 8 個分片為例,它的所有副本的 zk_path 就都應該配成:

/clickhouse/tables/08/test_2

然后是 replica_name,這個就比較簡單了,因為要區分同一個分區內多個副本,顯然它們要有不同的名稱。所以上面讀起來很拗口的第二個句話就解釋完了,對於 replica_name 而言,同一張數據表的同一個分片的不同副本,應該定義不同的名稱,這里我直接使用 IP 地址替代了。

ReplicatedMergeTree 原理解析

正如我們之前分析 MergeTree 一樣,ReplicatedMergeTree 作為復制表系列的基礎表引擎,涵蓋了數據副本最為核心的邏輯,我們很明顯要拿它入手。只要明白了 ReplicatedMergeTree 的核心原理,就能掌握整個 ReplicatedMergeTree 系列表引擎的使用方法。

數據結構

在 ReplicatedMergeTree 的核心邏輯中,大量運用了 ZooKeeper 的能力,以實現多個 ReplicatedMergeTree 副本實例之間的協同,包括主副本選舉、副本狀態感知、操作日志分發、任務隊列和 BlockID 去重判斷等等。在執行 INSERT 數據寫入、MERGE 分區和 MUTATION 操作的時候,都會涉及與 ZooKeeper 的通信。但是在通信的過程中,並不會涉及任何表數據的傳輸,在查詢數據的時候也不會訪問 ZooKeeper,所以無需擔心 ZooKeeper 的承載壓力。

因為 ZooKeeper 對 ReplicatedMergeTree 非常重要,所以下面首先從它的數據結構開始介紹。

ZooKeeper 內的節點結構

ReplicatedMergeTree 需要依靠 ZooKeeper 的時間監聽機制以實現各個副本之間的協調,所以在每張 ReplicatedMergeTree 表的創建過程中,它會以 zk_path 為根路徑,在 ZooKeeper 中為這張表創建一組監聽節點。而按照作用的不同,這些監聽節點可以分成如下幾類(先有個印象,后面會通過實際案例來體現):

1)元數據:

  • {zk_path}/metadata:保存元數據信息,包括主鍵、分區鍵、采樣表達式等
  • {zk_path}/columns:保存列字段信息,包括列名稱和數據類型
  • {zk_path}/replicas:保存副本名稱,對應設置參數中的 replica_name

2)判斷標識:

  • {zk_path}/leader_election:用於主副本的選舉工作,主副本會主導 MERGE 和 MUTATION 操作(ALTER DELETE 和 ALTER UPDATE,類似關系型數據庫中的 DELETE 和 UPDATE),這些任務在主副本完成之后再借助 ZooKeeper 將消息事件分發至其它副本
  • {zk_path}/blocks:記錄 Block 數據塊的 Hash 信息摘要,以及對應的 partition_id,通過 Hash 信息摘要能夠判斷 Block 塊是否重復;通過 partition_id,則能夠找到需要同步的數據分區
  • {zk_path}/block_numbers:按照分區的寫入順序,以相同的順序記錄 partition_id,各個副本在本地進行 MERGE 時,都會依照相同的 block_numbers 順序進行
  • {zk_path}/quorum:記錄 quorum 的數量,當至少有 quorum 數量的副本寫入成功后,整個寫入才算成功。quorum 的數量由 insert_quorum 參數控制,默認值為 0

3)操作日志:

  • {zk_path}/log:常規操作日志節點(INSERT、MERGE 和 DROP PARTITON),它是整個工作機制中最為重要的一環,保存了副本需要執行的任務指令。log 使用了 ZooKeeper 的持久順序型節點,每條指令的名稱以 log- 為前綴遞增,例如 log-0000000000、log-0000000001 等。每一個副本實例都會監聽 /log 節點,當有新的指令加入時,它們會把指令加入副本各自的任務隊列,並執行任務。關於這方面的邏輯,后續詳細介紹
  • {zk_path}/mutations:MUTATION 操作日志節點,作用與 log 日志類似,當執行 ALTER DELETE 和 ALTER UPDATE 查詢時,操作指令會被添加到這個點。mutations 同樣使用了 ZooKeeper 的持久順序節點,但是它的命名沒有前綴,每條指令直接以遞增數字的形式保存,例如 0000000000、0000000001 等。關於這方面的邏輯,同樣后續展開
  • {zk_path}/replicas/{replica_name}/*:每個副本各自的節點下的一組監聽節點,用於指導副本在本地執行具體的任務指令,其中較為重要的節點有如下幾個:
    • {zk_path}/replicas/{replica_name}/queue:任務隊列節點,用於執行具體的操作任務。當副本從 /log 或 /mutations 節點監聽到操作指令時,會將執行任務添加至該節點下,並基於隊列執行
    • {zk_path}/replicas/{replica_name}/log_pointer:log 日志指針節點,記錄了最后一次執行的 log 日志下標信息,例如 log_poniter:4 對應了 /log/log-0000000003(從 0 開始計數)
    • {zk_path}/replicas/{replica_name}/mutation_pointer:mutations 日志指針節點,記錄了最后一次執行的 mutations 日志名稱,例如 mutation_pointer:0000000000 對應了 {zk_path}/mutations/0000000000

Entry 日志對象的數據結構

從上面的介紹可知,ReplicatedMergeTree 在 ZooKeeper 中有兩組非常重要的父節點,那就是 /log 和 /mutations,為了簡便,接下來介紹路徑時候就不寫 {zk_path} 了。它們的作用猶如一座通信塔,是分發操作指令的信息通道,而分發操作之靈的方式,則是為這些父節點添加子節點。所有的副本實例,都會監聽父節點的變化,當有子節點被添加時,它們能夠實時感知。

當然這些被添加的子節點在 ZooKeeper 中就是相應的 ZNode,而 ZNode 的存儲的值在 ClickHouse 中統一被抽象為 Entry 對象,而具體實現則由 LogEntry 和 MutationEntry 對象承載,分別對應 /log 和 /mutations 的子節點的 value。

1)LogEntry:

LogEntry 用於封裝 /log 的子節點信息,大白話解釋的話,其實 /log 下面的 ZNode 存儲的 value 就是 LogEntry,它擁有如下幾個核心屬性。

  • source replica:發送這條 Log 指令的副本來源,對應 replica_name
  • block_id:當前分區的 BlockID,對應 /blocks 路徑下子節點的名稱
  • 操作指令類型,主要有 get、merge 和 mutate 三種,分別對應從遠程副本下載分區、合並分區以及 MUTATION 操作
  • 當前分區目錄的名稱

2)MutationEntry:

MutationEntry 用於封裝 /mutations 的子節點信息,它同樣擁有如下幾個核心屬性。

  • source replica:發送這條 MUTATION 指令的副本來源,對應replica_name
  • commands:操作指令,主要有 ALTER DELETE 和 ALTER UPDATE
  • mutation_id:MUTATION 操作的版本號
  • partition_id:當前分區目錄的 ID

以上就是 Entry 日志對象的數據結構信息,在接下來將要介紹的核心流程中,會看到它們的身影。

分區副本協同的核心流程

副本協同的核心流程主要有 INSERT、MERGE、MUTATION 和 ALTER 四種,分別對應了數據寫入、分區合並、數據修改和元數據修改。INSERT 和 ALTER 查詢是分布式執行的,借助 ZooKeeper 的事件通知機制,多個副本之間會自動進行有效協同,但是它們不會使用 ZooKeeper 存儲任何分區數據。至於其他查詢並不支持分布式執行,包括 SELECT、CREATE、DROP、RENAME 和 ATTACH。例如,為了創建多個副本,我們需要分別登錄每個 ClickHouse 節點,在它們本地執行各自的 CREATE 語句(后面將會介紹如何利用集器配置簡化這一操作)。接下來,會依次介紹上述流程的工作機理。為了便於理解,我們先來整體認識一下各個流程的介紹方法。

首先,擬定一個演示場景,即使用 ReplicatedMergeTree 實現一張擁有 1 分片、1 副本的數據表,並以此來貫穿整個講解過程(對於大於 1 個副本的場景,流程以此類推)。

接着,通過對 ReplicatedMergeTree 分別執行 INSERT、MERGE、MUTATION 和 ALTER 操作,以此來講解相應的工作原理。與此同時,通過實際案例,論證工作原理。

首先到這里我們只有一個節點就有些捉襟見肘了,因為我們要實現 1 分片、1 副本,那么至少要有兩個節點。而在我當前阿里雲上有三台服務器,相關信息如下:

  • 47.94.174.89,主機名為 satori,2 核心 8GB 內存
  • 47.93.39.238,主機名為 matsuri,2 核心 4GB 內存
  • 47.93.235.147,主機名為 aqua,2 核心 4GB 內存

主機 satori 就是我們當前一直使用的節點,然后再加上 matsuri 節點來實現我們的 1 分片、1 副本。

INSERT 的核心執行流程

當需要在 ReplicatedMergeTree 中執行 INSERT 查詢以寫入數據時,即會進入 INSERT 核心流程。而整個流程按照時間從上往下順序進行,我們大致將其分了 8 個步驟,那么下面就依次講解每一個過程。

 

1)創建第一個副本實例

使用副本之前,我們需要修改一下配置文件,我們的目的是為了讓 satori 節點和 matsuri 節點組合形成 1 分片 1 副本策略,所以需要修改配置文件 config.xml。首先對於 1 分片多副本而言,我們不需要配置集群,只需要配置好 ZooKeeper 即可。當然 1 分片多副本也是可以配置集群的,只不過該集群只有一個分片罷了,但我們說不配置也能實現 1 分片,所以這里我們就采用不配置集群的方式實現;而關於集群的配置,我們在介紹多分片的時候再說,因為如果是多分片,則必須要配置集群,具體內容后面說。下面來看看需要修改哪些配置:

<!-- 配置 ZooKeeper,我們之前就配置過了,可以設置多個 ZooKeeper,這里我們設置單個就行 -->
<zookeeper>   
    <node index="1">  
        <host>47.94.174.89</host>  
        <port>2181</port>
    </node>
</zookeeper>

<!-- 開放給外界訪問,我們在最開始的時候也改過了 -->
<listen_host>0.0.0.0</listen_host>

<!-- 節點之間進行通信的 IP,需要進行設置,讓節點之間可以互相訪問,否則副本無法同步 -->
<!-- 注意:我當前用的阿里雲服務器不在同一個內網中,所以指定的是公網 IP
     但如果你的多個阿里服務器雲在同一個內網,那么建議指定內網 IP,節點之間通信會比使用公網 IP 快很多 -->
<interserver_http_host>47.94.174.89</interserver_http_host>
<!-- 節點之間進行通信的端口,默認 9009,當然端口無所謂,只要保證彼此之間是開放的就行 -->
<interserver_http_port>9009</interserver_http_port>

修改完配置之后重啟 ClickHouse,然后從 satori 節點(47.94.174.89)開始,執行下面的語句,從而創建第一個副本實例:

CREATE TABLE replicated_sales_1 (
    id String,
    price Float64,
    create_time DateTime
) 
-- 這里 replica_name 我們就以 "主機名_replica" 的形式命名
ENGINE=ReplicatedMergeTree('/clickhouse/tables/01/replicated_sales_1', 'satori_replica') 
PARTITION BY toYYYYMM(create_time)
ORDER BY id;

在創建的過程中,ReplicatedMergeTree 會進行一些初始化動作,例如:

  • 根據 zk_path 初始化所有的 ZooKeeper 節點
  • 在 /replicas/ 節點下注冊自己的副本實例 satori_replica
  • 啟動監聽任務,監聽 /log 日志節點
  • 參與副本選舉,選舉出主副本,選舉的方式是向 /leader_election 插入子節點,第一個插入成功的副本就是主副本

此時在 satori 節點上,該表(副本)就創建完畢了。

 

2)創建第二個副本實例

接着在 matsuri 節點上創建第二個副本,當然要先安裝 ClickHouse,這里我已經安裝完畢了。然后我們也要修改配置文件。

<!-- 配置 ZooKeeper,ZooKeeper 在 satori 節點上,指定相關 IP 和端口 -->
<zookeeper>   
    <node index="1">  
        <host>47.94.174.89</host>  
        <port>2181</port>
    </node>
</zookeeper>

<!-- 同理,開放給外界 -->
<listen_host>0.0.0.0</listen_host>

<!-- 節點之間進行通信的 IP,這里改成 matsuri 節點的 IP -->
<interserver_http_host>47.93.39.238</interserver_http_host>
<interserver_http_port>9009</interserver_http_port>

修改完配置文件之后,啟動 ClickHouse,如果已經啟動,那么就重啟 ClickHouse 。然后創建第二個副本,表結構和 zk_path 和第一個副本相同,而 replica_name 則設置成 matsuri 節點的 IP。

CREATE TABLE replicated_sales_1 (
    id String,
    price Float64,
    create_time DateTime
) 
-- 除了 replica_name,其它不變
ENGINE=ReplicatedMergeTree('/clickhouse/tables/01/replicated_sales_1', 'matsuri_replica') 
PARTITION BY toYYYYMM(create_time)
ORDER BY id;

在創建的過程中,第二個 ReplicatedMergeTree 同樣會進行一些初始化動作,例如:

  • 在 /replicas/ 節點下注冊自己的副本實例 matsuri_replica
  • 啟動監聽任務,監聽 /log 日志節點
  • 參與副本選舉,選舉出主副本,顯然當前的主副本是 satori_replica 副本

兩個副本全部創建完了,接下來該干啥了,沒錯,寫入數據。

 

3)向第一個副本實例寫入數據

現在嘗試向第一個副本 satori_replica 寫入數據,執行如下命令:

INSERT INTO replicated_sales_1
VALUES ('A001', 100, '2019-05-10 00:00:00');

在執行上述命令后,首先會在本地完成分區目錄的寫入:

Renaming temporary part tmp_insert_201905_1_1_0 to 201905_0_0_0

接着向 /blocks 節點寫入該數據分區的 block_id:

Wrote block with ID '201905_11789774603085290207_1081647372304402844'

可能有人覺得這個 ID 是從哪里來的,答案是從 ZooKeeper 上面查的:

因為目前只有一個分區,所以 /blocks 節點下面只有一個子節點(ZNode),ZNode 的名字就是 block_id,存儲的值就是分區目錄名。當然這個 block_id 的計算過程我們就不演示了,它不是重點,只要知道它是作為后續去重操作的判斷依據即可。比如我們此時再執行一次剛才的 SQL 語句,試圖寫入重復數據,然后查詢時會發現沒有任何效果。

所以副本會自動忽略 block_id 重復的待寫入數據。

此外,如果設置了 insert_quorum 參數(默認值為 0)並且值大於等於 2,那么 satori_replica 副本會進一步監控已完成寫入操作的副本個數,只有當寫入副本個數大於等於 insert_quorum 時,整個寫入操作才算成功。

 

4)由第一個副本實例推送 Log 日志

在第 3 個步驟完成之后,會繼續由執行了 INSERT 的副本向 /log 節點推送操作日志,在這個栗子中,會由第一個副本 satori_replica 擔此重任。

顯然里面只有一個 ZNode,其 name 就是日志的編號:/log/log-0000000000,而 value 就是對應的 LogEntry。但這里我們沒有打印,原因是里面有特殊的換行,導致打印的時候看起來非常的丑,至於內容如下所示:

format version: 4
create_time: 2021-09-17 15:07:47
source replica: satori_replica
block_id: 201905_11789774603085290207_1081647372304402844
get
201905_0_0_0
part_type: Compact

信息不難理解,里面的 get 表示操作類型,這里是下載。而需要下載的分區是 201905_0_0_0,其余所有副本都會基於 Log 日志以相同的順序執行命令。

 

5)第二個副本實例拉取 Log 日志

matsuri_replica 副本會一直監聽 /log 節點變化,當 satori_replica 副本推送了 /log/log-0000000000 之后,matsuri_replica 副本便會觸發日志的拉取任務並更新 log_pointer,將其指向最新日志下標:

/replicas/matsuri_replica/log_pointer: 1

-- /replicas/matsuri_replica 下面的 ZNode 有很多,log_pointer 只是其中一個,所以這里再通過 name 過濾一下
-- 注意:我們查看的是 log_pointer,所以不要將 path 指定為 /replicas/matsuri_replica/log_pointer
-- 上面的做法不對的,因為這等於查看 log_pointer 下面所有的 ZNode,而不是 log_pointer 這個 ZNode
SELECT name, value FROM system.zookeeper
WHERE path = '/clickhouse/tables/01/replicated_sales_1/replicas/matsuri_replica'
AND name = 'log_pointer'

每拉取一次日志,log_pointer 對應的 value 就會自增 1,初始值是 0。

當 matsuri_replica 副本拉取了 satori_replica 副本推送的日志(這里是 /log/log-0000000000),便會根據其內容(LogEntry)進行執行。只不過這個動作並不是馬上就發生的,而是會將其轉成任務對象放在隊列中:

/replicas/matsuri_replica/queue

這里放入隊列是讓其成為 /replicas/matsuri_replica/queue 的子節點(ZNode),這么做的原因是在復雜的情況下,考慮到在同一個時間段內可能會連續收到許多個 LogEntry,所以通過隊列的方式消化任務是一種更為合理的設計。注意:拉取的 LogEntry 是一個區間,這同樣是因為可能會連續收到多個 LogEntry。不過當前只有一個 LogEntry,所以區間的開頭和結尾是同一個 LogEntry。

Pulling 1 entries to queue: log-0000000000 - log-0000000000

但如果我們此時查看 /replicas/matsuri_replica/queue 的話,會發現拿不到任何的信息,這是因為我們往 satori_replica 副本寫入數據的之后,很快就同步到 matsuri_replica 副本當中了,所以此時隊列中的任務已經被消費掉了,因此就什么也看不到了。我們可以查詢 matsuri_replica 副本,看看數據是否真的同步過來了。

可以看到數據已經在 matsuri_replica 節點上面了,所以創建數據表需要每個副本都要創建,但是插入數據只需要往一個副本中插入即可,剩余的副本會自動同步。不過數據雖然已經同步過來了,但我們的整個流程還沒有說完。當把 LogEntry 轉成任務對象放到 queue 之后,該做什么了呢?不用想,肯定是從 queue 取出依次執行。

 

6)第二個副本實例向其它副本發起下載請求

matsuri_replica 基於 queue 隊列發起下載任務,依次取出任務對象(封裝 LogEntry)進行執行,當看到類型為 get 的時候,ReplicatedMergeTree 就會明白此時在遠端的其它副本中已經成功寫入了數據分區,而自己需要同步這些數據。

format version: 4
create_time: 2021-09-17 15:07:47
source replica: satori_replica
block_id: 201905_11789774603085290207_1081647372304402844
get
201905_0_0_0
part_type: Compact

因此 matsuri_replica 副本會開始選擇遠端的某一個副本作為下載來源,那么要選擇哪一個呢?算法如下:

  • 1. 從 /replicas 中拿到所有的副本節點
  • 2. 遍歷這些副本,選取其中一個,選取的副本需要擁有最大的 log_pointer,因為 log_pointer 越大,執行的日志越多,數據也就越完整。以此同時還要 queue 下的 ZNode 少的,因為 ZNode 越少,說明任務執行負擔越小

當前遠端只有 satori_replica 一個副本實例,所以會從它這里下載,於是 matsuri_replica 向 satori_replica 發起了 HTTP 請求,希望下載分區 201905_0_0_0。

Fetching part 201905_0_0_0 from replicas/satori_replica
Sending request to http://47.94.174.89:9009/?endpoint=DataPartsExchange

如果第一次下載失敗,在默認情況下 matsuri_replica 會再嘗試 4 次,一共會嘗試 5 次。具體嘗試次數由 max_fetch_partition_retries_count 參數控制,默認為 5。

 

7)第一個副本實例向響應數據下載

satori_replica 的 DataPartsExchange 端口服務接收到調用請求,在得知對方來意之后,根據參數做出響應,再基於 DataPartsExchange 將本地分區 201905_0_0_0 響應給 matsuri_replica。

Sending part 201905_0_0_0

 

8)第二個副本實例下載數據並完成本地寫入

matsuri_replica 在接收到 satori_replica 的分區數據后,首先將其寫至臨時目錄:

tmp_fetch_201905_0_0_0

待全部數據接收完成之后,重命名該目錄:

Renaming temporary part tmp_fetch_201905_0_0_0 to 201905_0_0_0

至此整個寫入流程結束。

可以看到從頭到尾,在整個 INSERT 寫入的過程中,ZooKeeper 沒有進行任何表數據的傳輸,它只是起着一個分布式協調的作用。客戶端往一個副本里面寫入分區數據,然后該副本會往 ZooKeeper 里面推送相關日志,其它副本再拉取日志,根據日志內容(LogEntry)從該副本這里下載數據並寫入各自的本地文件系統中。

另外我們說如果設置了 insert_quorum 並且 insert_quorum >= 2,則還會由該副本監控完成寫入的副本數量。總之核心就是數據寫在哪一個副本,哪一個副本就向 ZooKeeper 里面推日志,然后其它副本拉日志,接着選擇一個最合適的遠端副本,進行分區數據的點對點下載。

 

流程圖總結:

下面再用一張圖總結一下上面的 8 個步驟:

MERGE 的核心執行流程

當前我們只寫入了一條數據,如果往 201905 這個分區中再寫入一條數據呢,顯然會創建一個新的分區目錄,然后后台線程會在一個合適的時機進行分區目錄的合並。這里我們就還往 satori_replica 中寫,然后在 matsuri_replica 中讀,順便再次驗證副本之間是否會正常同步。

之前我們說,每拉取一次日志,log_pointer 就會自增 1,既然這里發生了數據同步,那么肯定涉及日志的拉取。那么按照分析,matsuri_replica 的 log_pointer 應該會變成 2,因為之前是 1,這里又拉取了一次。

插入數據我們算是已經明白了,那接下來就是合並數據了,也就是 ReplicatedMergeTree 的分區合並動作:MERGE。

其實無論 MERGE 操作從哪個副本發起,最終合並計划都會由主副本決定。在之前的例子中,satori_replica 已經競選為主副本,所以為了論證,我們就從 matsuri_replica 開始。整個流程還是按照時間從上往下順序進行,我們大致將其分了 5 個步驟,那么下面就依次講解每一個過程。

 

1)創建遠程連接,嘗試與主副本通信

首先在 matsuri_replica 副本所在節點執行 OPTIMIZE,強制觸發分區合並,這個時候 matsuri_replica 會通過 /replicas 找到 satori_replica,並嘗試建立和它的遠程連接。

optimize table replicated_sales_1
Connection (47.94.174.89:9000) : Connetion. Database: default. User: default

 

2)主副本接收通信

主副本接收並建立來自遠端副本的連接。

 

3)由主副本制定 MERGE 計划並推送 Log 日志

由主副本 satori_replica 制定 MERGE 計划,並判斷哪些分區需要合並,在選定之后 satori_replica 將合並計划轉化為 Log 日志對象並推送,以通知所有副本開始合並。那么信息都有哪些呢,直接通過 ZooKeeper 查看即可。

這里我們使用 DataGrip 查詢,命令行查詢的話,輸出的內容不好看。我們注意到 /log 下面有三個 ZNode,log-0000000000 是第一次往 satori_replica 寫入數據所產生的日志,log-0000000001 是第二次往 satori_replica 寫入數據所產生的日志,而 log-0000000002 顯然就是 MEREG 操作產生的日志。

根據內容中的 merge 我們可以得知這是一個合並操作,將 201905_0_0_0 和 201905_1_1_1 合並成 201905_0_1_1。並且通過 source replica 我們也能得知,合並操作是從 matsuri_replica 副本發出的。與此同時,主副本還會鎖住執行線程,對日志的接收情況進行監聽。其監聽行為由 replication_alter_partitions_sync 參數控制,默認值為 1。

  • 如果 replication_alter_partitions_sync 設置為 0,那么不做任何等待
  • 如果 replication_alter_partitions_sync 設置為 1,只等待主副本自身完成
  • 如果 replication_alter_partitions_sync 設置為 2,會等待所有副本拉取完成

 

4)各個副本分別拉取 Log 日志

主副本將 MERGE 計划制定好之后會推到 ZooKeeper 中,然后所有副本會進行拉取並推送到任務隊列 /queue 中,也就是成為它的一個 ZNode。

Pulling 1 entries to queue: log-0000000002 - log-0000000002

 

5)各個副本分別在本地執行 MERGE

satori_replica 和 matsuri_replica 基於各自的 /queue 隊列執行任務:

Executing log entry to merge parts 201905_0_0_0, 201905_1_1_0 to 201905_0_1_1

各個副本開始在本地執行 MERGE:

Merged 2 parts: from 201905_0_0_0 to 201905_1_1_0

到此,整個合並流程結束。可以看到和插入數據一樣,在 MERGE 的過程中,ZooKeeper 也不會涉及任何表數據的傳輸,所有的合並操作都是由各個副本在本地完成的。並且無論合並動作在哪個副本被觸發,首先都會被轉交給主副本,再由主副本負責合並計划的制定、消息日志的推送以及日志接收情況的監控。

 

流程圖總結:

同樣可以用一張流程圖總結一下上面 5 個步驟:

MUTATION 的核心流程

對 ReplicatedMergeTree 數據表執行 ALTER DELETE 或 ALTER UPDATE 操作的時候,會進入 MUTATION 部分的邏輯。和 MERGE 類似,無論 MUTATION 操作從哪個副本發起,都會由主副本進行響應,所以為了方便論證,我們還是從不是主副本的 matsuri_replica 副本開始。整個流程按照時間從上往下順序進行,大致分為 5 個步驟,下面依次介紹。

 

1)推送 Mutation 日志

在 matsuri_replica 通過 ALTER DELETE 來刪除數據(ALTER UPDATE 與之同理),執行如下命令:

ALTER TABLE replicated_sales_1 DELETE WHERE id = '1';

執行之后,該副本會進行兩個重要動作。

  • 創建 MUTATION ID

created mutation with ID 0000000000

  • 將 MUTATION 操作轉換為 Mutation 日志,並推送到 /mutations/0000000000,也就是在 /mutations 下面新建一個名為 0000000000 的 ZNode。其中 0000000000 就是 Mutation 日志的 name,而對應的值、也就是 value 被稱為 MutationEntry,這里和 Log 日志類似。MutationEntry 內容如下:

由此也能知曉,MUTATION 的操作日志是經由 /mutations 節點分發至各個副本中的。

 

2)所有副本實例各自監聽 Mutation 日志

所有副本都會監聽 /mutations 節點,一旦有新的日志子節點加入,它們都能實時感知。

Loading 1 mutation entries: 0000000000 - 0000000000

當監聽到有新的 Mutation 日志加入時,並不是所有的副本都會立即響應,它們會先判斷自己是不是主副本。

 

3)由主副本實例響應 Mutation 日志並推送 Log 日志

只有主副本才會響應 Mutation 日志,在這個栗子中 satori_replica 為主副本,所以 satori_replica 將 Mutation 日志轉換為 Log 日志並推送至 /log 節點,已通知各個副本執行的具體操作。之前 /log 下面最后一個 ZNode 是 log-0000000002,所以接下來會寫入一個 log-0000000003。

從日志內容可以看出上述的操作類型為 mutate,而這次需要將 201905_0_1_1 分區修改成 201905_0_1_1_2,修改規則:"201905_0_1_1" + "_" + mutation_id。

 

4)各個副本實例分別拉取 Log 日志

satori_replica 副本和 matsuri_replica 副本分別監聽 /log/log-0000000003 日志的推送,它們也會分別拉取日志到本地,通推送到各自的 /queue 任務隊列。

Pulling 1 entries to queue: /log/log-0000000003 - /log/log-0000000003

 

5)各個副本分別在本地執行 MUTATION

satori_replica 副本和 matsuri_replica 副本基於各自的 /queue 隊列開始執行任務:

Executing log entry to mutate part 201905_0_1_1 to 201905_0_1_1_2

各個副本開始在本地執行 MUTATION 操作:

Cloning part 201905_0_1_1 to tmp_clone_201905_0_1_1_2
Renaming temporary part tmp_clone_201905_0_1_1_2 to 201905_0_1_1_2

至此,整個 MUTATION 流程結束。

可以看到在 MUTATION 的整個過程中,ZooKeeper 同樣不會進行任何實質性的數據傳輸,所有的 MUTATION 操作最終都是由各個副本在本地完成的。而 MUTATION 操作是經過 /mutations 節點實現分發的,本着誰執行誰負責的原則,當前是由 matsuri_replica 負責了消息的推送,在 /mutations 下面新建了 /mutations/0000000000。但無論 MUTATION 操作由哪個副本觸發,最終都會轉交給主副本,再由主副本負責推送到 Log 日志中,以通知各個副本執行最終的 MUTATION 邏輯,同時也由主副本對日志接收的情況進行監控。

 

流程圖總結:

同樣可以用一張流程圖總結一下上面 5 個步驟:

ALTER 的核心執行流程

對 ReplicatedMergeTree 執行 ALTER 操作進行元數據修改的時候,會進入 ALTER 部分的邏輯,例如增加、刪除表字段等等。但與之前的幾個流程相比,ALTER 的邏輯要顯得簡單很多,因為整個過程不涉及 /log 日志的分發。整個流程按照時間從上往下順序進行,大致可以分為 3 個步驟,下面依次介紹。

 

1)修改共享元數據

在 matsuri_replica 副本上修改,增加一個字段。

-- 增加一個字段 product,表示商品的名字,並且將該字段排在 id 后面
ALTER TABLE replicated_sales_1 ADD COLUMN product String AFTER id;

執行之后,matsuri_replica 副本會修改 ZooKeeper 內的共享元數據節點 /metadata、/columns:

Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes.

數據修改后,節點的版本號也會同時提升:

Version of metadata nodes in ZooKeeper changed. Waiting for structure write lock.

與此同時,matsuri_replica 還會負責監聽所有副本的修改完成情況:

Waiting for satori_replica to apply changes
Waiting for matsuri_replica to apply changes

 

2)監聽共享元數據變更並各自執行本地修改

satori_replica 和 matsuri_replica 各自監聽共享元數據的變更,之后它們會分別對本地的元數據版本號和共享版本號進行對比。在當前這個案例中,它們會發現本地版本號低於共享版本號,於是開始在各自本地執行更新操作:

Metadata changed in ZooKeeper. Applying changes locally.
Applied changes to the metadata of the table.

 

3)確認所有副本完成修改

matsuri_replica 會確認所有副本均已完成修改:

ALTER finished
Done processing query

至此整個 ALTER 流程結束,可以看出該過程 ZooKeeper 同樣不會涉及實質性的數據傳輸,所有的 ALTER 均使用各個副本在本地所完成的。本着誰執行誰負責的原則,在這個案例中由 matsuri_replica 負責對共享元數據進行修改以及對各個副本的修改進度進行監控。

具體執行案例如下,我們在 matsuri_replica 副本上增加一個 product 字段,然后在 satori_replica 副本上查詢到了新增加的字段,證明確實進行了同步。

至於 matsuri_replica 副本我們就不用看了,它里面肯定也是會多出一個 product 字段的。

 

流程圖總結:

同樣可以用一張流程圖總結一下上面 3 個步驟:

數據分片

通過引入數據副本,雖然能夠有效降低數據丟失的風險(多份存儲),並提升查詢的性能(分攤查詢、讀寫分離),但是仍然有一個問題沒有解決,那就是數據表的容量問題。因為到目前為止,每個副本自身仍然保存了數據表的全量數據,所以在業務量十分龐大的場景中,依靠副本並不能解決單表的性能瓶頸。想要從根本上解決這類問題,需要借助另外一種手段,即進一步將數據水平切分,也就是我們將要介紹的數據分片。

ClickHouse 中的每個服務節點都可以稱為一個 shard(分片),從理論上來講,假設有 N 張數據表 T,分布在 N 個 ClickHouse 服務節點,而這些數據表之前沒有重復數據,那么就可以說數據表 T 有 N 個分片。但是在工程實踐中,如果只有這些分片表,那么整個 Sharding 方案基本是不可用的。因為對於一個完整的方案來說,還需要考慮數據在寫入時,如何被均勻地寫入至各個 shard;以及數據在查詢時,如何路由到每個 shard,並組合形成結果集。所以 ClickHouse 的數據分片需要結合 Distributed 表引擎一同使用。

我們之前說過 Distributed 表引擎,Distributed 數據表本身不存儲任何數據,它只是作為分布式表的一層透明代理,在集群內部自動開展數據的寫入、分發、查詢、路由等工作。

集群的配置方式

在 ClickHouse 中,集群配置用 shard 代表分片、replica 代表副本,那么邏輯層面,表示 1 分片 0 副本語義的配置就是:

<shard>
    <replica>
        <host></host>
        <port></port>
    </replica>
</shard>

而表示 1 分片、1 副本的語義則是:

<shard>
    <replica>
        <host></host>
        <port></port>
    </replica>
    
    <replica>
        <host></host>
        <port></port>
    </replica>
</shard>

而表示 1 分片、2 副本的語義則是:

<shard>
    <replica>
        <host></host>
        <port></port>
    </replica>
    
    <replica>
        <host></host>
        <port></port>
    </replica>
    
    <replica>
        <host></host>
        <port></port>
    </replica>
</shard>

而表示 2 分片、0 副本的語義則是:

<shard>
    <replica>
        <host></host>
        <port></port>
    </replica>
</shard>

<shard>
    <replica>
        <host></host>
        <port></port>
    </replica>
</shard>

可以看到這種配置再次體現了,shard 只是邏輯層面的分組,最終的承載都是由副本來實現。

相信分片和副本的概念還是很容易區分的,你就可以理解為分片是把多個副本進行分組,每一組就是一個分片,一個分片下面可以有任意個副本。因此還是那句話,副本才是用來實際承載數據的,而分片只是一個起到一個邏輯組織的作用。同一分片下面的所有副本存放的數據相同,實現高可用,一個副本掛了,其它副本頂上去;不同分片下的副本存放的數據不同,從而實現數據的水平切分。

下面就來搭建分片集群,如果是多分片,那么就需要在 config.xml 中搭建集群配置了。

 

這里我們搭建 3 分片 0 副本,那么配置如下:

<!-- 集群的名稱,全局唯一,是后續引用集群配置的唯一標識 -->
<!-- 在一個配置文件內可以定義任意組集群,想用哪一個直接通過集群的名稱進行引用即可 -->
<ch_cluster_3shard_0replica>
	<shard> <!-- 分片 1 -->
    	<replica> <!-- 該分片只有一個副本,由 satori 節點承載 -->
            <!-- 注意:這里的 host 需要寫主機名,因此我們需要在 /etc/hosts 中配置其它節點的公網 IP 到主機名的映射 -->
            <host>satori</host>
            <port>9000</port>
        </replica>
    </shard>
    
    <shard> <!-- 分片 2 -->
    	<replica> <!-- 該分片只有一個副本,由 matsuri 節點承載 -->
            <host>matsuri</host>
            <port>9000</port>
        </replica>
    </shard>
    
    <shard> <!-- 分片 3 -->
    	<replica> <!-- 該分片只有一個副本,由 aqua 節點承載 -->
            <host>aqua</host>
            <port>9000</port>
        </replica>
    </shard>
</ch_cluster_3shard_0replica>

如果只是 1 分片,那么只需要配置 ZooKeeper 即可,但如果是多分片,那么除了 ZooKeeper 之外我們還要配置集群。這里首先要為集群起一個名字,然后自定義分片,當然我們說分片只是邏輯層面的分組,副本才是真正存儲數據的。所以還需要在 shard 標簽中定義 replica 標簽,replica 標簽中指定相應的 IP 和端口,從配置中我們可以看出一個 shard 可以有多個 replica,具體有多少個則由我們自己決定;此外也可以有多個 shard,每個 shard 內的 replica 也可以不同。我們上面相當於創建了 3 個 shard,每個 shard 都只有 1 個 replica,在 ClickHouse 中也就是 3 分片 0 副本。

那么問題來了,雖然我們這里只是演示多分片,但上面的配置如果真要放在生產上會有什么后果呢?沒錯,無法達到高可用,因為每個分片只有一個副本,如果一個節點掛了,那么整個服務就不可用了,所以每個 shard 應該配置多個 replica。這里我們不管那么多,先看看如何實現多分片。

我們關閉 satori、matsuri 節點上的 ClickHouse 服務,然后修改其 config.xml 文件,將上面的配置拷貝到 remote_servers 標簽下面即可。最后是 aqua 節點,我阿里雲上有三個 CentOS,主機名分別是 satori、matsuri、aqua,目前使用了前兩個。而 aqua 節點上還沒有 ClickHouse,所以我們需要先安裝,安裝之后將配置拷貝過去。注意:除了上面的關於集群的配置,還有下面這些配置也別忘記,也就是我們在前兩個節點中所做的配置,所有節點都應保持一致。

<zookeeper>   
    <node index="1">  
        <host>47.94.174.89</host>  
        <port>2181</port>
    </node>
</zookeeper>

<listen_host>0.0.0.0</listen_host>
<!-- 內部通信 IP,改成 aqua 節點 -->
<interserver_http_host>47.93.235.147</interserver_http_host>
<interserver_http_port>9009</interserver_http_port>

所有配置都完成之后,我們啟動三台節點上的 ClickHouse,然后驗證集群是否搭建成功。

我們在 aqua 節點上進行查詢,cluster 列表示集群的名字,這里我們選擇 cluster 為 ch_cluster_3shard_0replica 的記錄;然后 host_name、host_address 表示當前副本所在的節點的主機名、IP;replica_num 表示該副本屬於當前所在分片的第幾個副本,因為每個分片只有一個副本,所以它們都是 1;shard_num 表示該副本所在的分片是第幾個分片,顯然 satori 副本處於第一個分片,matsuri 副本處於第二個分片,aqua 處於第三個分片。

基於集群實現分布式 DDL

我們前面介紹副本的時候為了創建副本表,需要分別登錄到每個 ClickHouse 節點,在它們本地執行各自的 CREATE 語句,這是因為在默認情況下,CREATE、DROP、RENAME 等 DDL 語句不支持分布式執行。而在加入集群配置之后,就可以使用新的語法實現分布式 DDL 執行了,語法形式如下:

CREATE/DROP/RENAME TABLE table_name ON CLUSTER cluster_name ...

ON CLUSTER 位於 TABLE table_name 之后,表示集群的每個節點都執行該 DDL 語句,而 cluster_name 就是集群的名稱,ClickHouse 會通過 cluster_name 找到該集群的配置信息,然后順藤摸瓜,分別去各個節點中執行 DDL 語句。下面就用分布式 DLL 的形式建表:

CREATE TABLE distributed_test_1 ON CLUSTER ch_cluster_3shard_0replica(
    id UInt64
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/distributed_test_1', '{replica}')
ORDER BY id;

該語句先不要執行,我們還有一些細節要說,首先就是這里 ENGINE,可以指定其它任意的表引擎。然后是 ON CLUSTER,我們可以在任何一個節點執行這個建表語句,而 ClickHouse 在看到 ON CLUSTER 之后就知道這是一個分布式 DDL,會根據 ch_cluster_3shard_0replica 找到相關的集群配置,然后在其它節點上也執行這個 DDL。

最后是 ReplicatedMergeTree,首先如果在不同的節點上建表,那么 zk_path 和 replica_name 顯然是不同的,因此我們不可以寫死。否則 ClickHouse 在廣播 DDL 給其它節點執行之后,所有分片下的所有副本的 zk_path 和 replica_name 就都是一樣的了。因此 ClickHouse 提供了兩個動態宏變量 {shard} 和 {replica},用於替換之前的硬編碼方式,而動態宏變量的值可以通過系統表 system.macros 進行查看。

但問題來了,我們看到查詢的結果是空的,原因是我們沒有在 config.xml 中沒有進行配置。在 config.xml 下面有一個 macros 標簽,是被注釋掉的,我們配置一下就行了。

配置如下:

<!-- satori 節點 -->
<macros>
    <shard>01</shard>
    <replica>satori_name</replica>
</macros>

<!-- matsuri 節點 -->
<macros>
    <shard>02</shard>
    <replica>matsuri_name</replica>
</macros>

<!-- aqua 節點 -->
<macros>
    <shard>03</shard>
    <replica>aqua_name</replica>
</macros>

我們將注釋打開,分別進行修改,或者不打開注釋,直接在粘貼在 yandex 標簽下即可,然后重啟三台節點上的 ClickHouse 服務。

然后我們查看宏變量是否生效:

現在我們就能執行上面那條分布式建表語句了,然后 shard_num 和 replica_name 會用 {shard} 和 {replica} 兩個宏變量進行替代。我們下面隨便挑一個節點,就在 matsuri 節點上執行上面那條建表語句吧。

根據返回的內容,我們看到表在 satori、matsuri、aqua 三個節點上均已成功創建。同理,如果想刪除的話,那么也可以執行分布式 DROP。

DROP TABLE distributed_test_1 ON CLUSTER ch_cluster_3shard_0replica

通過搭建集群,我們便可以執行分布式 DDL,記得在上面介紹 1 分片、多副本的時候我們說過,只有 1 個分片的話,不需要搭建集群,只需要借助 ZooKeeper 即可。但很明顯,1 分片、多副本這種模式只是不需要搭建,但不是說不能搭建,如果搭建了集群(1 shard、多 replica),一樣可以執行分布式 DLL。

然后我們來介紹一下整個流程,當我們在一個節點上執行分布式 DDL,該 DDL 是如何被廣播到其它節點上的。

數據結構

和 ReplicatedMergeTree 類似,分布式 DDL 語句在執行的過程中也需要借助 ZooKeeper 的協同能力,以實現日志分發。

1)ZooKeeper 內的節點結構

在默認情況下,分布式 DDL 在 ZooKeeper 內使用的根路徑為:

/clickhouse/task_queue/ddl

該路徑由 config.xml 內的 distributed_ddl 配置指定:

在此根路徑下,還有一些其它的監聽節點,其中包括 DDL 操作日志 /query-[seq],每執行一次分布式 DDL 查詢,在該節點下就會新增一條操作日志,以記錄相應的操作指令。當各個節點監聽到有新日志加入的時候,便會響應執行。DDL 操作日志使用 ZooKeeper 的持久順序型節點,每條指令的名稱以 query- 為前綴,后面的序號遞增,例如 query-0000000000、query-0000000001 等等。

另外在每條 query-[seq] 操作日志之下,還有兩個狀態節點:

  • 1)/query-[seq]/active:用於狀態監控等用途,在任務執行的過程中,該節點下會臨時保存當前集群內狀態為 active 的節點
  • 2)/query-[seq]/finished:用於檢查任務的完成情況,在任務的執行過程中,每當集群內的某個 host 節點執行完畢之后,便會在該節點下寫入記錄

上述語句表示集群內的 satori、matsuri、aqua 三個節點已經完成任務。

 

2)DDLLogEntry 日志對象的數據結構

在 /query-[seq] 下記錄的日志信息由 DDLLogEntry 承載,它擁有如下幾個核心屬性:

query 記錄了 DDL 查詢的執行語句:

CREATE TABLE default.distributed_test_1
UUID \'d5d4c459-4e89-47b2-95d4-c4594e8957b2\'
ON CLUSTER ch_cluster_3shard_0replica
(
    `id` UInt64
) ENGINE = ReplicatedMergeTree(\'/clickhouse/tables/{shard}/distributed_test_1\', \'{replica}\') ORDER BY id

hosts 記錄了指定集群的 hosts 主機列表,集群由分布式 DDL 語句中的 ON CLUSTER 指定。在分布式 DDL 的執行過程中,會根據 hosts 列表逐個判斷它們的執行狀態。

initiator 記錄初始化 host 主機的名稱,說白就是分布式 DDL 最開始是在哪個節點執行的

分布式 DDL 的核心執行流程

與副本協同的核心流程類似,接下來就以上面創建 distributed_test_1 為例,解釋分布式 DDL 的核心執行流程。整個流程從上到下按照時間順序進行,大致可以分為三個步驟:

 

1)推送 DDL 日志

首先在 matsuri 節點執行 CREATE TABEL ON CLUSTER,本着誰執行誰負責的原則,在這個案例中也會由 matsuri 節點負責創建 DDLLogEntry 日志並將日志推送到 ZooKeeper,同時也會由這個節點負責監控任務的執行進度。

2)拉取日志並執行

matsuri、satori、aqua 三個節點分別監聽 /ddl/query-0000000000 日志的推送,於是它們分別拉取日志到本地。首先它們會判斷各自的 host 是否被包含在 DDLLogEntry 的 hosts 列表中,如果包含在內,則進入執行流程,執行完畢后寫入 finished 節點;如果不包含,則忽略這次日志的推送。

3)確認執行進度

在步驟 1 執行 DDL 語句之后,客戶端會阻塞 180 秒(由參數 distributed_ddl_task_timeout 指定),以期望所有 host 都執行完這個分布式 DDL。如果阻塞時間超過了 180 秒,則會轉入后台線程繼續等待。

流程圖總結:

我們這里有三個副本,但是為了畫圖方便,圖中只畫了兩個。因為 satori_replica 和 aqua_replica 的表現是一致的,所以 satori_replica 就不畫了。

Distributed 原理解析

Distributed 表引擎是分布式表的代名詞,它自身不存儲任何數據,而是作為數據分片的透明代理,能夠自動路由數據至集群中的各個節點,所以 Distributed 表引擎需要和其它數據表引擎一起協同工作,如圖所示:

從實體表的層面來看,一張分片表由兩部分組成:

  • 本地表:通常以 _local 為后綴進行命名,本地表是承接數據的載體,可以使用非 Distributed 的任意表引擎,一張本地表對應了一個數據分片
  • 分布式表:通常以 _all 為后綴進行命名,分布式表只能使用 Distributed 表引擎,它和本地表形成一對多的映射關系,后續將通過分布式表代理操作多張本地表

我們剛才在建表的時候,將表起名為 distributed_test_1 實際上是不符合規范的,應該以 _local 為后綴進行命名,不過無所謂,理解就好。

對於分布式表與本地表之間表結構的一致性檢查,Distributed 表引擎采用了讀時檢查的機制,這意味着如果它們的表結構不兼容,只有在查詢時才會拋出錯誤,而在創建表時並不會進行檢查。另外不同 ClickHouse 節點上本地表之間可以使用不同的表引擎,但是通常不建議這么做,保持它們的結構一致,有利於后期的維護、並避免造成不可預計的錯誤。

定義形式

Distributed 表引擎的定義形式如下:

ENGINE = Distributed(cluster, database, table [, sharding_key])

里面的參數也比較簡單,cluster 表示集群的名稱,也就是在 config.xml 中配置的集群名稱,在分布式表執行寫入和查詢的過程中,它會使用集群的配置信息來找到相應的 host 節點;database 和 table 則表示數據庫和數據表的名稱,分布式表使用這組配置映射到本地表;shard_key 表示分片鍵,在數據寫入的過程中,分布式表會依據分片鍵的規則,將數據分布到各個 host 節點的本地表。

我們之前在三個節點中創建了本地表 distributed_test_1,下面就來創建一張 Distributed 表來作為它們的代理表。

CREATE TABLE distributed_test_1_all ON CLUSTER ch_cluster_3shard_0replica(
    id UInt64
)ENGINE = Distributed(ch_cluster_3shard_0replica, default, distributed_test_1, rand());

上面的建表語句會創建一張 Distributed 表,名為 distributed_test_1_all,然后通過 ON CLUSTER 會將該建表語句廣播到集群的每一個分片節點上,確保它們都會創建一張 Distributed 表,這樣就可以在任意一個分片節點發起對所有分片的讀、寫請求。當然只在一個節點上創建分布式表也是可以的,但是后續的操作只能在該節點上進行。然后是表引擎參數,我們可以看出代理的本地表為 distributed_test_1,其分布在集群 ch_cluster_3shard_0replica 的各個 shard 中,並且在數據寫入的時候會根據 rand() 隨機函數的取值決定數據寫入哪個分片。

下面在 matsuri 節點執行上述 Distributed 表(分布式表)的建表語句,當然在哪個節點上執行都無所謂的,因為是分布式 DDL,每個分片節點上都會執行:

至此本地表和分布式表就都創建好了,這里再多提一句,即使沒有本地表,也是可以創建分布式表的。因為 Distributed 表運用的是讀時檢查的機制,所以對於分布式表和本地表的創建順序沒有要求。

查詢種類

Distributed 表的查詢操作可以分為如下幾類:

1)會作用於本地表的查詢:對於 INSERT 和 SELECT 查詢,Distributed 表將會以分布式的方式作用於 local 本地表,而對於這些查詢的具體規則,一會介紹。

2)只會影響 Distributed 表本身,不會作用於本地表的查詢:Distributed 表支持部分元數據的操作,包括 CREATE、DROP、RENAME 和 ALTER,其中 ALTER 並不包括分區的操作(ATTACH PARTITION、REPLACE PARTITION 等)。這些查詢只會修改 Distributed 表自身,並不會修改 local 本地表,所以如果想將分布式表和本地表都刪掉的話,那么要分別刪除。

-- 刪除分布式表
DROP TABLE distributed_test_1_all ON CLUSTER ch_cluster_3shard_0replica;

-- 刪除本地表
DROP TABLE distributed_test_1 ON CLUSTER ch_cluster_3shard_0replica;

3)不支持的查詢:Distributed 表不支持任何 MUTATION 類型的操作,包括 ALTER DELETE 和 ALTER UPDATE。

分片規則

這里再進一步說明一下分片的規則,我們在創建 Distributed 表的時候,表引擎中指定了 sharding_key,也就是分片鍵。而分片鍵是由要求的,它必須返回一個整型類型的取值,比如:

-- 按照用戶 id 的余數划分,userid 是整型
ENGINE = Distributed(cluster, database, table, userid)

-- 也可以是一個返回整數的表達式,比如按照隨機數划分
ENGINE = Distributed(cluster, database, table, rand())

-- 按照用戶 id 的散列值划分
ENGINE = Distributed(cluster, database, table, intHash64(userid))

如果不聲明分片鍵,那么分布式表只能包含一個分片,這意味着只能映射一張本地表,否則在寫入數據的時候會拋出異常。但如果一張分布式表只能包含一個分片,那還不如單機,顯然此時就沒意義了。因此,雖然 sharding_key 是選填參數,但是通常都會按照業務規則進行設置。

那么設置完分片鍵之后,數據又是如何被划分的呢?想要搞清楚這一點,需要先明確幾個概念。

1.分片權重

在集群的配置中,有一項 weight(分片權重)的設置:

<!-- 集群的名稱 -->
<ch_cluster_3shard_0replica>
	<shard> <!-- 分片 1 -->
        <weight>10</weight> <!-- 分片權重 -->
        ......
    </shard>
    
    <shard> <!-- 分片 2 -->
        <weight>20</weight> <!-- 分片權重 -->
        ......
    </shard>
......

shard 里面不僅僅可以指定 replica,還可以指定其它屬性,比如 weight(權重)。weight 默認為 1,雖然它可以設置成任意整數,但官方建議應該盡可能設置成較小的值,分片權重會影響數據在分片中的傾斜程度,一個分片的權重值越大,那么它被寫入的數據就越多。

slot(槽)

如果把數據比作是水的話,那么 slot 就可以理解為許多的小水槽,數據會順着這些水槽流進每個數據分片。slot 的數量等於所有分片的權重之和,假設每個集群有兩個 shard,第一個 shard 的 weight 為 10,第二個 shard 的 weight 為 20,那么 slot 的數量則等於 30。slot 按照權重元素的取值區間,與對應的分片形成映射關系,如果 slot 值落在 [0, 10) 區間,則對應第一個分片;如果 slot 值落在 [10, 30) 區間,則對應第二個分片。還是比較簡單粗暴好理解的,因此 weight 值越大,那么區間范圍也就越廣,slot 值落在該區間的概率也就越大。

選擇函數

選擇函數用於判斷一行待寫入的數據應該被寫入哪個分片,整個判斷過程可以分為兩步:

1)它會找出 slot 的取值,計算公式如下:

slot = shard_value % sum_weight

其中,shard_value 是分片鍵的取值,sum_weight 是所有分片的權重之和,slot 等於 shard_value 對 sum_weight 取余。假設某一行的數據的 shard_value 是 10,sum_weight 是 30(兩分片,第一個分片權重為 10,第二個分片權重為 20),那么 slot 值就等於 10(10 % 30);再比如 shard_value 如果為 90,那么 slot 值就為 0(90 % 30)。

2)基於 slot 值找到對應的數據分片,當 slot 值等於 10 的時候,它屬於 [10, 30) 區間,此時這行數據會對應到第二個 shard;當 slot 值為 0 的時候,它屬於 [0, 10) 區間,所以這行數據會對應到第一個分片。

分布式寫入的核心流程

在向集群內的分片寫入數據時,通常有兩種思路:一種是借助外部計算系統,事先根據分片數量將數據計算均勻,再借由計算系統直接將數據寫入 ClickHouse 集群的各個本地表。

上述這種方案通常擁有更好的寫入性能,因為分片數據是被並行點對點寫入的。但這種方案重度依賴於外部系統,而不在 ClickHouse 自身,所以這里主要會介紹第二種思路。

第二種思路是通過 Distributed 表引擎代理寫入分片數據,下面就來介紹整個流程。首先關於流程我們大致也能猜到,就是 Distributed 表將數據寫入到每個分片的一個副本中,然后獲得數據的副本再將數據同步到自己所在分片的其它副本中。因此核心可以分為兩個步驟,分別是 "分片寫入" 和 "副本復制(同步)",由於我們目前搭建的是 3 分片 0 副本,所以先介紹 "分片寫入";后續再通過 1 分片 2 副本來介紹 "副本復制",由於只有三台服務器,所以無法給 3 個分片都配置副本,因為這樣至少需要 6 台服務器,於是在介紹副本復制的時候我們會使用 1 分片。當然副本復制和分片數量無關,因為每個分片之間的副本復制所做的事情都是一樣的,所在在介紹副本復制的時候,1 分片 和 多分片之間沒有什么區別。下面先來看看分片寫入的整個過程是怎么樣的吧。

將數據寫入分片的核心流程

對 Distributed 表執行 INSERT 查詢的時候,會進入數據寫入分片的執行邏輯,我們按照時間順序從上往下,大致可以分為 5 個步驟,下面依次介紹。

 

1)在第一個分片節點寫入本地分片數據

首先在 aqua 節點,對分布式表 distributed_test_1_all 執行 INSERT 查詢:

INSERT INTO distributed_test_1_all 
VALUES (10), (30), (50), (200);

執行之后分布式表會做兩件事情:第一,根據分片規則划分數據,不過由於分片鍵是隨機函數,所以我們也不知道數據會如何划分;第二,將數據當前分片的數據直接寫入本地表 distributed_test_1。

因為每個節點上都有分布式表,所以我們在任意一個節點執行都是可以的。注意:此時不需要加 ON CLUSTER,我們只在一個節點上執行即可,如果加上 ON CLUSTER,那么每個節點就都會執行一遍,這樣可能導致數據的重復寫入。

對於當前的栗子來說,50 、10 和 100 、30 分別划分到了不同的分片中,注意:圖中打印的順序不代表分片的順序,所以它們分別划分到了哪一個分片中,我們是不知道的。

 

2)第一個分片節點建立遠端連接,准備發送數據

將發送給遠端分片節點的數據以分區為單位(我們創建本地表時沒有指定 PARTITION BY,所以當前只有一個分區),分別寫入 distributed_test_1_all 存儲目錄下的臨時 bin 文件,該數據文件的命名規則如下:

{database}@{host_name}:{port}/{increase_num}.bin

我們是在 aqua 節點中寫入的數據,所以對於 aqua 節點來說,它有兩個遠端分片節點:satori、matsuri,因此臨時數據文件如下所示:

distributed_test_1_all/default@satori/1.bin
distributed_test_1_all/default@matsuri/1.bin

然后和遠端的 satori 分片節點、matsuri 分片節點建立連接:

Connection (satori:9000): Connetced to ClickHouse server
Connection (matsuri:9000): Connetced to ClickHouse server

 

3)第一個分片節點向遠端發送數據

此時會有另一種監聽任務負責監聽 /distributed_test_1_all 目錄下的文件變化,這些任務負責將目錄數據發送至遠端分片節點:

distributed_test_1_all.Distributed.DirectoryMonitor:
Started processing /distributed_test_1_all/default@satori/1.bin
Started processing /distributed_test_1_all/default@matsuri/1.bin

其中每個目錄將會由獨立的線程負責發送,數據在傳輸之前會被壓縮。

 

4)其余的分片節點接收數據並寫入本地

satori、matsuri 節點會確認建立和 aqua 節點的連接,然后在收到來 aqua 發送的數據之后,將它們寫入到本地表。

 

5)由第一個分片確認完成寫入

最后,還是由 aqua 分片節點確認所有的數據發送完畢。至此,整個流程結束,因為過程比較簡單,所以流程圖就不畫了。可以看到,在整個過程中 Distributed 表負責所有分片是我寫入工作,本着誰執行誰負責的原則,在當前這個例子中,由 aqua 節點的分布式表負責切分數據,並向所有其它分片節點發送數據。

在由 Distributed 表負責向遠端分片節點發送數據時,有異步寫和同步寫兩種模式:如果是異步寫,則在 Distributed 表寫完本地分片之后,INSERT 查詢就會返回成功寫入的信息;如果是同步寫,則在執行 INSERT 查詢之后,會等待所有分片完成寫入。至於選擇何種模,由 insert_distributed_sync 參數控制,默認為 false,也就是異步寫;如果將其設置成 true,那么可以進一步通過 insert_distributed_timeout 參數控制同步等待的超時時間。

我們來測試一下,看看在 aqua 節點上寫的數據,有沒有被切分、並寫入到不同的分片節點中。

顯然輸出一切正常,並且根據輸出我們也可以看出:10 和 100 被划分到了 satori 分片節點(shard 1)、30 被划分到了 matsuri 分片節點(shard 2)、50 被划分到了 aqua 分片節點(shard 3)。

副本復制數據的核心流程

如果在集群的配置中包含了副本,那么除了剛才的分片寫入流程之外,還會觸發副本數據的復制流程。而數據在多個副本之間,有兩種復制方式:第一種是繼續借助 Distributed 表引擎,由它將數據寫入副本;第二種是借助於 ReplicatedMergeTree 表引擎實現副本數據的分發。

 

1)通過 Distributed 表復制數據

下面我們增加一下配置,實現 1 分片 2 副本。

<!-- 
還記得嗎,我們之前配置過 1 分片 1 副本,當時我們說,對於單分片而言,可以不配置集群,只配置 ZooKeeper 即可 
但很明顯,單分片也是可以配置集群的
-->
<ch_cluster_1shard_2replica>
    <shard>
    	<replica>
            <host>satori</host>
            <port>9000</port>
        </replica>
        
    	<replica>
            <host>matsuri</host>
            <port>9000</port>
        </replica>
        
    	<replica>
            <host>aqua</host>
            <port>9000</port>
        </replica>        
    </shard>
</ch_cluster_1shard_2replica>

我們將上面的配置增加到 config.xml 的 remote_server 標簽中,而我們之間的 ch_cluster_3shard_0replica 不需要動,因為 ClickHouse 支持多個集群,我們想用哪一個,直接通過集群的名稱指定即可。

修改完三個節點的配置文件之后,分別重啟 ClickHouse。

我們查看集群,發現已經生效了,並且 shard_num 都是 1,因為只有 1 個分片。然后 replica_num 分別為 1、2、3,因為該分片下有 3 個副本。

接下來我們在這個集群內創建數據表,首先創建本地表:

-- 我們之前不指定集群的時候,需要在每個副本上分別執行一遍建表語句
-- 當搭建了集群之后,通過 ON CLUSTER 的方式只需要在一個節點上執行即可
CREATE TABLE distributed_test_2 ON CLUSTER ch_cluster_1shard_2replica (
    id UInt64
) ENGINE = MergeTree() 
ORDER BY id;

這里我們指定的集群是 ch_cluster_1shard_2replica,那么  ClickHouse 會根據集群信息,自動將建表語句分發到集群中的所有節點上執行。並且,由於是通過 Distributed 表復制數據,所以對表引擎沒有任何要求。

顯然創建成功,那么接下來創建分布式表:

CREATE TABLE distributed_test_2_all ON CLUSTER ch_cluster_1shard_2replica (
    id UInt64
) ENGINE = Distributed(ch_cluster_1shard_2replica, default, distributed_test_2)

然后我們向 Distributed 表寫入數據,它會負責將數據寫入集群內的每個 replica。

當然,如果查詢 satori 節點和 aqua 節點的本地表,肯定也是有數據的。

如果我們配置的是多分片、多副本也是可以的,因為數據會寫入到所有分片的所有副本中。不過從這里也能看出,當前方案對 Distributed 表所在節點會造成很大壓力,因為它需要同時負責分片以及所有副本的數據寫入工作,因此它有可能成為寫入的單點瓶頸,所以就有了下面的第二種方案。

 

2)通過 ReplicatedMergeTree 表復制數據

如果在集群的 shard 配置中增加 internal_replication 參數並將其設置為 true(默認為 false),那么 Distributed 表在該 shard 中只會選擇一個合適的 replica 並對其寫入數據。此時,如果使用 ReplicatedMergeTree 作為本地表的引擎,該 shard 內的多個 replica 之間的數據則會交給 ReplicatedMergeTree 自己處理,不再由 Distributed 負責,從而為其減負。

在 shard 中選擇 replica 的算法大致如下:首先在 ClickHouse 的服務節點中,擁有一個全局技術器 errors_count,當服務出現任何異常時,該計數器都會加 1;然后當一個 shard 擁有多個 replica 時,選擇 errors_count 最少的那個。下面我們修改一下配置:

<ch_cluster_1shard_2replica>
    <!-- 增加該配置 -->
    <internal_replication>true</internal_replication>
    <shard>
    	<replica>
            <host>satori</host>
            <port>9000</port>
        </replica>
        
    	<replica>
            <host>matsuri</host>
            <port>9000</port>
        </replica>
        
    	<replica>
            <host>aqua</host>
            <port>9000</port>
        </replica>        
    </shard>
</ch_cluster_1shard_2replica>

修改三個節點的配置文件 config.xml,然后重啟 ClickHouse。

然后我們創建表,注意:由於指定了 internal_replication 為 true,那么 Distributed 表只會往一個副本中寫,這就要求副本之間是可以進行同步的,也就是必須指定 Replicated* 表引擎。我們測試一下,先以 MergeTree 為例,然后再以 ReplicatedMergeTree 為例,看看數據在兩者之間的同步情況。

-- 創建本地表,表引擎為 MergeTree
CREATE TABLE distributed_test_3 ON CLUSTER ch_cluster_1shard_2replica (
    id UInt64
) ENGINE = MergeTree() 
ORDER BY id;

-- 創建分布式表
CREATE TABLE distributed_test_3_all ON CLUSTER ch_cluster_1shard_2replica (
    id UInt64
) ENGINE = Distributed(ch_cluster_1shard_2replica, default, distributed_test_3)

指定表引擎為 MergeTree,副本之間無法發生同步。然后我們再創建一張表,將表引擎指定為 ReplicatedMergeTree,看看副本之間是否會發生同步。

-- 創建本地表,表引擎為 ReplicatedMergeTree
CREATE TABLE distributed_test_4 ON CLUSTER ch_cluster_1shard_2replica (
    id UInt64
-- 注意這里的 zk_path 和 replica_name,zk_path 顯然都是一樣的,但是 replica_name 不同
-- 我們可以不使用 ON CLUSTER 和宏變量,而是每個節點手動執行一遍,不同節點指定不同的 replica_name,像最開始介紹副本那樣
-- 當然也可以使用 ON CLUSTER,只不過此時不同副本的 replica_name 不同,所以我們需要使用宏變量 {replica},而在之前我們已經配好了
-- 然后我們知道還有一個宏變量 {shard},因為是單分片,所以我們不需要使用它,如果是多分片多副本,那么就需要時候用了
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/01/distributed_test_4', '{replica}') 
ORDER BY id;

-- 創建分布式表
CREATE TABLE distributed_test_4_all ON CLUSTER ch_cluster_1shard_2replica (
    id UInt64
) ENGINE = Distributed(ch_cluster_1shard_2replica, default, distributed_test_4)

我們測試一下:

分布式查詢的核心流程

當面臨數據查詢時,毫無疑問要通過分布式表實現,當分布式表接收到 SELECT 查詢的時候,會依次查詢每個分片的數據,再合並匯總返回。當然我們也可以手動將每個節點都查詢一遍,然后手動匯總,但顯然不會有人這么做,不然還要分布式表做什么呢。下面就來介紹分布式表的數據查詢。

多副本的路由規則

在查詢數據的時候,如果集群中的一個 shard 擁有多個 replica,那么 Distributed 表引擎需要面臨副本選擇的問題。它會使用負載均衡算法從眾多 replica 中選擇一個,而具體使用何種算法,則由 load_balancing 參數控制,其取值有如下幾種。

 

1)random

random 是默認的負載均衡算法,正如前文所述,在 ClickHouse 的服務節點中,擁有一個全局計數器 errors_count,當服務發生任何故障時,該計數器都會自增 1。而 randon 算法會選擇 errors_count 最小的 replica,如果擁有最小 errors_count 的 replica 有多個,那么就從中隨機選擇一個。

 

2)nearest_hostname

nearest_hostname 可以看做是 random 算法的變種,首先它也會選擇 errors_count 最小的 replica,但如果擁有最小 errors_count 的 replica 有多個,則比較集群配置中的 host(主機名)和當前 Distributed 表所在節點的主機名的相似度,選擇相似度最高的哪一個。

 

3)in_order

in_order 可以同樣可以看做是 random 算法的變種,首先它也會選擇 errors_count 最小的 replica,但如果擁有最小 errors_count 的 replica 有多個,則根據集群中 replica 的定義順序進行選擇。

 

4)first_or_random

first_or_random 可以看做是 in_order 算法的變種,首先它還是會選擇 errors_count 最小的 replica,但如果擁有最小 errors_count 的 replica 有多個,則根據集群中 replica 的定義順序進行選擇。但如果選擇的 replica 不可用,則進一步隨機選擇一個其它的 replica。

感覺都沒有什么太大區別,直接使用 random 就好。

多分片查詢的核心流程

分布式查詢與分布式寫入類似,同樣本着誰執行誰負責的原則,它會接收 SELECT 查詢的 Distributed 表,並負責串聯起整個過程。首先它會將針對分布式表的 SQL 語句,按照分片數量拆分成若干個針對本地表的子查詢,然后向各個分片發起查詢,最后再匯總各個分片的返回結果。以我們之前創建的 distributed_test_1_all 為例,如果對它發起如下查詢:

SELECT * FROM distributed_test_1_all

那么它會將其轉化為如下形式,然后再發送到遠端分片節點來執行:

SELECT * FROM distributed_test_1

再比如執行如下查詢:

SELECT count() FROM distributed_test_1_all

那么 Distributed 表引擎會將查詢計划轉換為多個分片 UNION 聯合查詢,如圖所示:

整個執行計划從上大小大致分為兩個步驟,下面進行介紹,我們當前是在 aqua 節點查詢的分布式表。

 

1)查詢各個分片數據

在上圖中,One 和 Remote 步驟是並行執行的,它們分別負責了本地和遠端分片的查詢動作。其中,在 One 這個步驟會將 SQL 轉成對本地表的查詢:

SELECT count() FROM default.distributed_test_1

而在 Remote 步驟中,會和其它兩個副本節點(satori、matsuri)建立連接,並向其發起遠程查詢:

Connection (satori:9000): Connecting. Database: ...
Connection (matsuri:9000): Connecting. Database: ...

satori 節點和 matsuri 節點在收到 aqua 的查詢請求后,會分別在本地開始執行,同樣,SQL 會轉換成對本地表的查詢。

 

2)合並返回結果

多個分片數據均查詢返回后,在 aqua 節點將其合並。

以上顯然是比較簡單的,即使執行一些復雜的語句也是沒有問題的。

所以即使 SQL 語句復雜,也是沒有問題的,當然我們這個 SQL 語句說復雜就明顯太過了。總之你對普通表所做的查詢,對分布式表同樣可以,像一般的 WHERE 子句、GROUP BY 子句、ORDER BY 子句、LIMIT OFFSET 子句,都沒有什么區別。

但我們需要注意的是子查詢,不像單表,分布式表的子查詢是由很多坑點的。

使用 Global 優化分布式子查詢

如果在分布式查詢中使用子查詢,可能會面臨兩難的局面。下面舉個栗子,首先使用之前的 3 分片 0 副本集群創建本地表和分布式表:

CREATE TABLE in_clause_test_1 ON CLUSTER ch_cluster_3shard_0replica (
    id UInt64,
    repo UInt64
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/in_clause_test_1', '{replica}')
ORDER BY id;

-- 創建分布式表
CREATE TABLE in_clause_test_1_all ON CLUSTER ch_cluster_3shard_0replica (
    id UInt64,
    repo UInt64
) ENGINE = Distributed(ch_cluster_3shard_0replica, default, in_clause_test_1)

然后寫入數據:

查詢三張本地表的數據如圖所示,而將圖中的三個結果集合在一塊顯然就是查詢分布式表所得到的結果。其中 id 代表用戶的編號,repo 代表倉庫的編號。

重點來了,如果我想查詢至少擁有兩個倉庫的用戶 id 以及對應的倉庫編號,這個時候該怎么做呢?如果是關系型數據庫、或者說不是分布式表的話,那么顯然一個子查詢就搞定了,我們上面已經通過 GROUP BY 查詢出了相應的 id,然后根據 id 去篩選即可。

SELECT
    id,
    repo
FROM in_clause_test_1_all
WHERE id IN (
    SELECT id
    FROM in_clause_test_1_all
    GROUP BY id
    HAVING count() >= 2
)

首先可以肯定這個語句本身沒有任何問題,但執行的時候 ClickHouse 會報出如下錯誤:

Code: 288. DB::Exception: Received from localhost:9000. DB::Exception: Double-distributed IN/JOIN subqueries is denied (distributed_product_mode = 'deny'). You may rewrite query to use local tables in subqueries, or use GLOBAL keyword, or set distributed_product_mode to suitable value.: While processing in_clause_test_1_all: While processing id IN (SELECT id FROM in_clause_test_1_all GROUP BY id HAVING count() >= 2).

其原因就在於 ClickHouse 是默認不支持分布式子查詢的,而解決辦法有兩種。

1)通過 distributed_product_mode 參數讓其支持分布式子查詢

根據報錯信息我們知道 distributed_product_mode 默認為 'deny',表示禁止分布式子查詢,我們在 users.xml 將其設置成 'allow' 即可,或者直接在命令行中設置也行。

將 distributed_product_mode 設置為 'allow' 之后就可以成功執行並獲取數據了,雖然結果是符合我們的預期了,但是還有一些需要思考🤔的東西。首先為什么 ClickHouse 會默認將分布式子查詢給禁止呢?子查詢這么常見,明顯不應該禁止啊。所以,我們目前這種使用分布式子查詢的方式一定是存在弊端的。

仔細思考一下會很容易發現存在效率上問題,我們知道查詢分布式表會依次查詢所有的本地表,而子查詢里面查詢的還是分布式表,這就導致了查詢請求會被放大 N 的平方倍,其中 N 是集群內分片節點數量。我們上面只有 3 個分片節點,會進行 9 次查詢,這還沒有什么;但如果有 100 個分片節點呢,那么最終會導致 10000 次查詢請求。而帶有一個子查詢的 SQL 會導致 10000 次的查詢請求,不用想,這絕對是無法接受的。並且這里只有一個子查詢,如果有多個子查詢呢?如果有的老鐵沒有對 SQL 進行優化,導致子查詢內部又嵌套了子查詢,那就不是按平方倍擴大了,而是按立方倍擴大。

而為了解決查詢放大的問題,可以使用 GLOBAL IN 進行優化,而從名字上可以看出,它和 IN 相比,在結果上沒有什么卻別,但是解決了查詢放大的問題。

那么 GLOBAL IN 是如何做的呢?大致可以分為以下幾步:

  • 將 IN 子句單獨提出,發起了一次分布式查詢
  • 將分布式表轉成本地表后,分別是在本地分片節點和遠端分片節點執行查詢
  • 將 IN 子句查詢的結果進行匯總,並放入一張臨時的內存表進行保存
  • 將內存表發送到遠端分片節點
  • 將分布式表轉為本地表后,開始執行完整的 SQL 語句,IN 子句直接使用臨時內存表的數據

至此,整個核心流程結束。可以看到在使用 GLOBAL 修飾符之后,ClickHouse 使用內存表臨時保存餓了 IN 子句查詢到的數據,並將其發送到遠端分片節點,以此達到了數據共享的目的,從而避免了查詢放大的問題。由於數據會在網絡間分發,所以需要特別注意臨時表的大小,IN 子句返回的數據不宜過大。如果表內存在重復數據,也可以事先在 IN 子句中加入 DISTINCT 進行去重,以減少內存表中的數據量,從而實現更快的傳輸。

除了 GLOBAL IN 之外,還有 GLOBAL JOIN 作用是相似的,但我們說能不用 JOIN 就不用 JOIN,將其存成一張寬表是最好的。

小結

以上就是副本、分片和集群的使用方法、作用,以及核心流程。

我們首先介紹了數據副本的特點,並詳細解釋了 ReplicatedMergeTree 表引擎,它是 MergeTree 表引擎的變種,同時也是數據副本的代名詞,接着又介紹了數據分片的特點和作用。同時在這個過程中引入了 ClickHouse 集群的概念,並講解了它的工作原理,最后介紹了 Distributed 表引擎的核心功能與工作流程,借助它的能力,可以實現分布式寫入與查詢。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM