具有分布式引擎的表不存儲自己的任何數據,但允許在多個服務器上進行分布式查詢處理。讀取是自動並行的。在讀取期間,將使用遠程服務器上的表索引(如果有的話)。
一、創建表
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] ( name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2], ... ) ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]]) [SETTINGS name=value, ...]
1.來源表
當Distributed
表指向當前服務器上的表時,可以采用該表的模式:
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] AS [db2.]name2 ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]]) [SETTINGS name=value, ...]
分布式參數
-
cluster
- 服務器配置文件中的集群名稱 -
database
- 遠程數據庫的名稱 -
table
- 遠程表的名稱 -
sharding_key
- (可選)分片鍵 -
policy_name
-(可選)策略名稱,它將用於存儲異步發送的臨時文件
分布式設置
-
fsync_after_insert
-fsync
異步插入分布式后的文件數據。保證操作系統將整個插入的數據刷新到啟動器節點磁盤上的文件中。 -
fsync_directories
-fsync
為目錄做。保證操作系統在分布式表上與異步插入相關的操作后(插入后、將數據發送到分片后等)刷新目錄元數據。 -
bytes_to_throw_insert
- 如果超過此數量的壓縮字節將等待異步 INSERT,則會引發異常。0 - 不扔。默認為 0。 -
bytes_to_delay_insert
- 如果超過此數量的壓縮字節將等待異步 INSERT,則查詢將被延遲。0 - 不延遲。默認為 0。 -
max_delay_to_insert
- 如果有大量用於異步發送的待處理字節,則以秒為單位將數據插入分布式表的最大延遲。默認 60。 -
monitor_batch_inserts
- 與Distributed_directory_monitor_batch_inserts相同 -
monitor_split_batch_on_failure
- 與distributed_directory_monitor_split_batch_on_failure相同 -
monitor_sleep_time_ms
- 與Distributed_directory_monitor_sleep_time_ms相同 -
monitor_max_sleep_time_ms
- 與Distributed_directory_monitor_max_sleep_time_ms相同
注意:
- 當數據首先存儲在啟動器節點磁盤上,然后異步發送到分片時,僅影響異步 INSERT(即 `insert_distributed_sync=false`)。
- 可能會顯着降低刀片的性能
- 影響將分布式表文件夾中存儲的數據寫入接受插入的**節點**。 如果需要保證將數據寫入底層 MergeTree 表
例子
CREATE TABLE hits_all AS hits ENGINE = Distributed(logs, default, hits[, sharding_key[, policy_name]]) SETTINGS fsync_after_insert=0, fsync_directories=0;
數據將從集群中的所有服務器讀取logs
,從default.hits
位於集群中每台服務器上的表中讀取。數據不僅在遠程服務器上被讀取,而且在遠程服務器上進行部分處理(在可能的范圍內)。例如,對於帶有 的查詢GROUP BY
,數據將在遠程服務器上聚合,聚合函數的中間狀態將被發送到請求服務器。然后將進一步匯總數據。
可以使用返回字符串的常量表達式來代替數據庫名稱。例如:currentDatabase()
。
二、集群
集群在服務器配置文件中配置:
<remote_servers> <logs> <!-- Inter-server per-cluster secret for Distributed queries default: no secret (no authentication will be performed) If set, then Distributed queries will be validated on shards, so at least: - such cluster should exist on the shard, - such cluster should have the same secret. And also (and which is more important), the initial_user will be used as current user for the query. --> <!-- <secret></secret> --> <shard> <!-- Optional. Shard weight when writing data. Default: 1. --> <weight>1</weight> <!-- Optional. Whether to write data to just one of the replicas. Default: false (write data to all replicas). --> <internal_replication>false</internal_replication> <replica> <!-- Optional. Priority of the replica for load balancing (see also load_balancing setting). Default: 1 (less value has more priority). --> <priority>1</priority> <host>example01-01-1</host> <port>9000</port> </replica> <replica> <host>example01-01-2</host> <port>9000</port> </replica> </shard> <shard> <weight>2</weight> <internal_replication>false</internal_replication> <replica> <host>example01-02-1</host> <port>9000</port> </replica> <replica> <host>example01-02-2</host> <secure>1</secure> <port>9440</port> </replica> </shard> </logs> </remote_servers>
這里定義了一個集群,其名稱logs
由兩個分片組成,每個分片包含兩個副本。分片是指包含不同部分數據的服務器(為了讀取所有數據,必須訪問所有分片)。副本是復制服務器(為了讀取所有數據,可以訪問任何一個副本上的數據)。
集群名稱不能包含點。
為每個服務器指定參數host
,port
和可選user
的 , password
, secure
:compression
host
– 遠程服務器的地址。可以使用域或 IPv4 或 IPv6 地址。如果指定域,則服務器在啟動時會發出 DNS 請求,只要服務器正在運行,就會存儲結果。如果 DNS 請求失敗,則服務器不會啟動。如果更改 DNS 記錄,請重新啟動服務器。port
– 信使活動的 TCP 端口(tcp_port
在配置中,通常設置為 9000)。不要與http_port
.user
– 連接遠程服務器的用戶名。默認值為default
用戶。此用戶必須有權連接到指定的服務器。訪問在users.xml
文件中配置。有關詳細信息,請參閱訪問權限部分。password
– 連接遠程服務器的密碼(未屏蔽)。默認值:空字符串。secure
- 是否使用安全的 SSL/TLS 連接。通常還需要指定端口(默認安全端口是9440
)。服務器應該監聽<tcp_port_secure>9440</tcp_port_secure>
並配置正確的證書。compression
- 使用數據壓縮。默認值:true
。
指定副本時,讀取時將為每個分片選擇一個可用副本。可以配置負載平衡算法(訪問哪個副本的首選項)。如果未建立與服務器的連接,則將嘗試連接一個短暫的超時。如果連接失敗,將選擇下一個副本,以此類推所有副本。如果所有副本的連接嘗試都失敗,則嘗試以相同的方式重復幾次。這有利於彈性,但不提供完整的容錯能力:遠程服務器可能接受連接,但可能無法正常工作,或者工作不佳。
可以僅指定一個分片(在這種情況下,查詢處理應稱為遠程,而不是分布式)或最多指定任意數量的分片。在每個分片中,可以指定從一個到任意數量的副本。可以為每個分片指定不同數量的副本。
可以在配置中指定任意數量的集群。
要查看的集群,請使用該system.clusters
表。
該Distributed
引擎允許使用像本地服務器這樣的集群。但是,集群的配置不能動態指定,必須在服務器配置文件中進行配置。通常,集群中的所有服務器都將具有相同的集群配置(盡管這不是必需的)。配置文件中的集群會即時更新,無需重新啟動服務器。
如果每次都需要向一組未知的分片和副本發送查詢,則無需創建Distributed
表 -remote
而是使用 table 函數。
其他注意點:
internal_replication配置用法
1)如果底層是非復制表,那么這個值設為false(默認)。表示insert分布式表時,會在分片的所有副本都寫入一份。
2)如果底層是復制表,那么這個值配置為true。表示分布式表不會往所有副本都寫入。僅寫入到一個副本。
internal_replication這個參數是控制寫入數據到分布式表時,分布式表會控制這個寫入是否的寫入到所有副本中。與復制表的同步是不一樣的。為什么<2>中要設置為true,這就是為了避免和復制表的同步復制機制出現沖突,導致數據重復或者不一致。
因為如果既是復制表、internal_replication又為false,那么寫入到分布式表時會寫入到同一分片的所有副本,而此時復制表的機制也會把不同副本之間的數據進行同步。而且分布式表寫入到所有副本並不是原子性的,也就是說,寫入到所有副本時,寫入某個副本失敗了,那這個副本就寫入失敗了,不會矯正。復制表的同步是會保證同步的。
三、寫入數據
1.將數據寫入集群有兩種方法:
1)可以定義將哪些數據寫入哪些服務器並直接在每個分片上執行寫入。換句話說,對表所指向INSERT
的集群中的遠程表執行直接語句。Distributed
這是最靈活的解決方案,因為可以使用任何分片方案,即使是由於主題領域的要求而並非微不足道的分片方案。這也是最優化的解決方案,因為數據可以完全獨立地寫入不同的分片。
2)可以在表上執行INSERT
語句Distributed
。在這種情況下,表將在服務器本身之間分配插入的數據。為了寫入Distributed
表,它必須sharding_key
配置參數(除非只有一個分片)。
2.注意:
每個分片都可以<weight>
在配置文件中定義。默認情況下,權重為1
。數據以與分片權重成比例的數量分布在分片上。將所有分片權重相加,然后將每個分片的權重除以總和,以確定每個分片的比例。例如,如果有兩個分片,第一個的權重為 1,而第二個的權重為 2,第一個將被發送三分之一 (1 / 3) 的插入行,第二個將被發送三分之二 (2 / 3)。
每個分片都可以internal_replication
在配置文件中定義參數。如果此參數設置為true
,則寫入操作會選擇第一個健康的副本並向其寫入數據。如果表基礎的Distributed
表是復制表(例如任何Replicated*MergeTree
表引擎),請使用此選項。其中一個表副本將接收寫入,並將自動復制到其他副本。
如果internal_replication
設置為false
(默認值),則將數據寫入所有副本。在這種情況下,Distributed
表本身會復制數據。這比使用復制表更糟糕,因為不檢查副本的一致性,並且隨着時間的推移,它們將包含稍微不同的數據。
為了選擇將一行數據發送到的分片,分析分片表達式,並將其除以分片的總權重得到余數。prev_weights
該行被發送到對應於余數從到的半區間的分片prev_weights + weight
,其中prev_weights
是編號最小的分片的總權重,並且weight
是該分片的權重。例如,如果有兩個分片,第一個的權重為 9,而第二個的權重為 10,則該行將發送到第一個分片以獲取范圍 [0, 9) 中的余數,並發送到第二個用於范圍 [9, 19) 的余數。
分片表達式可以是返回整數的常量和表列中的任何表達式。例如,可以使用表達式rand()
進行數據的隨機分布,或者UserID
通過除以用戶 ID 的余數進行分布(然后單個用戶的數據將駐留在單個分片上,這簡化了運行IN
和JOIN
按用戶)。如果其中一列分布不夠均勻,可以將其包裝在哈希函數中,例如intHash64(UserID)
.
除法的簡單余數是分片的有限解決方案,並不總是合適的。它適用於中型和大量數據(數十台服務器),但不適用於非常大量的數據(數百台或更多服務器)。在后一種情況下,使用主題區域所需的分片方案,而不是使用表中的條目Distributed
。
在以下情況下,應該關注分片方案:
- 使用需要通過特定鍵連接數據(
IN
或)的查詢。JOIN
如果數據被這個鍵分片,你可以使用 localIN
orJOIN
代替GLOBAL IN
orGLOBAL JOIN
,這樣效率更高。 - 使用大量服務器(數百台或更多)和大量小型查詢,例如,查詢單個客戶(例如網站、廣告商或合作伙伴)的數據。為了使小查詢不影響整個集群,將單個客戶端的數據定位在單個分片上是有意義的。或者,可以設置雙層分片:將整個集群划分為“層”,其中一層可能由多個分片組成。單個客戶端的數據位於單個層上,但可以根據需要將分片添加到層中,並且數據在其中隨機分布。
Distributed
為每一層創建表,並為全局查詢創建一個共享分布式表。
數據是異步寫入的。當插入表中時,數據塊只是寫入本地文件系統。數據會盡快在后台發送到遠程服務器。發送數據的周期由distributed_directory_monitor_sleep_time_ms和distributed_directory_monitor_max_sleep_time_ms設置管理。Distributed
引擎會單獨發送每個包含插入數據的文件,但可以使用 Distributed_directory_monitor_batch_inserts 設置啟用批量發送文件。此設置通過更好地利用本地服務器和網絡資源來提高集群性能。應該通過查看表目錄中的文件列表(等待發送的數據)來檢查數據是否發送成功:/var/lib/clickhouse/data/database/table/
. 執行后台任務的線程數可以通過background_distributed_schedule_pool_size設置來設置。
INSERT
如果服務器停止存在或在訪問表后粗略重新啟動(例如,由於硬件故障)Distributed
,則插入的數據可能會丟失。如果在表目錄中檢測到損壞的數據部分,則將其轉移到broken
子目錄中,不再使用。
3.應用
直接寫分布式表的優點自然是可以讓ClickHouse控制數據到分片的路由,而缺點:
- 數據是先寫到一個分布式表的實例中並緩存起來,再逐漸分發到各個分片上去,實際是雙寫了數據(寫入放大),浪費資源;
- 數據寫入默認是異步的,短時間內可能造成不一致;
- 目標表中會產生較多的小parts,使merge(即compaction)過程壓力增大。
相對而言,直接寫本地表是同步操作,更快,parts的大小也比較合適,但是就要求應用層額外實現sharding和路由邏輯,如輪詢或者隨機等。
以下為分布式表插入流程圖:
四、讀取數據
查詢Distributed
表時,SELECT
查詢被發送到所有分片並且無論數據如何在分片中分布(它們可以完全隨機分布)都可以工作。添加新分片時,不必將舊數據傳輸到其中。相反,可以通過使用更重的權重向其寫入新數據——數據將稍微不均勻地分布,但查詢將正確有效地工作。
啟用該max_parallel_replicas
選項后,查詢處理將在單個分片內的所有副本中並行處理。
在分布式表上執行查詢的流程簡圖如下所示。發出查詢后,各個實例之間會交換自己持有的分片的表數據,最終匯總到同一個實例上返回給用戶。
針對多分片多副本的情況
讀取數據分布式查詢遵循多副本的路由規則
該配置項為:load_balance=random/nearest_hostname/in_order/first_or_random
多副本的路由規則
查詢數據時,如果一個分片shard有多個副本repIica,那么Distributed表引擎就需要面對副本選擇的問題,選擇查詢究竟在哪個副本上執行。ck的負載均衡算法有以下四種:
- random
- nearest_hostname
- in_order
- first_or_random
1) random
這是默認的負載均衡算法。在ck的服務節點中,有一個errors_count全局計數器,當服務發生任何異常時,技術器加1。randdom算法會選擇errors_count最小的那個repIica,如果多個repIica的errors_count相同,則在這幾個里隨機選擇一個。
2) nearest_hostname
選擇errors_count最小的那個,如果多個errors_count相同,則選擇集群配置中host名稱和當前host名稱最相似的那個。相似比較的規則是與當前host的名稱,按字節進行逐位對比,找到不同字節最少的那個。 例如當前host是a.bc.de,那么,a.bc.df就比a.bf.hh要更加相似。 a.bc.de a.bc.df a.bf.hh
3)in_order
選擇errors_count最小的那個,如果多個errors_count相同,則按照集群配置順序選擇。
4)first_or_random
選擇errors_count最小的那個,如果多個errors_count相同,則按照集群配置順序選擇第一個,如果第一個不可用,則隨意選擇一個其他的。
總結起來,其實這4個負載算法中,都是優先選擇errors_count最小的那個,如果多個errors_count相同時,再根據不同的負載算法來選擇。
五、虛擬列
_shard_num
— 包含shard_num
表中的值system.clusters
。類型:UInt32。
注意
由於遠程和集群表功能在內部創建臨時分布式表,_shard_num
因此在那里也可用。
六、常用sql
1.查看集群: select * from system.clusters 2.查看數據庫容量、行數、壓縮率 SELECT sum(rows) AS `總行數`, formatReadableSize(sum(data_uncompressed_bytes)) AS `原始大小`, formatReadableSize(sum(data_compressed_bytes)) AS `壓縮大小`, round((sum(data_compressed_bytes) / sum(data_uncompressed_bytes)) * 100, 0) AS `壓縮率` FROM system.parts 3.查看數據表容量、行數、壓縮率 SELECT table AS `表名`, sum(rows) AS `總行數`, formatReadableSize(sum(data_uncompressed_bytes)) AS `原始大小`, formatReadableSize(sum(data_compressed_bytes)) AS `壓縮大小`, round((sum(data_compressed_bytes) / sum(data_uncompressed_bytes)) * 100, 0) AS `壓縮率` FROM system.parts WHERE table IN ('dm_order_dis') GROUP BY table 4.查看數據表分區信息 SELECT partition AS `分區`, sum(rows) AS `總行數`, formatReadableSize(sum(data_uncompressed_bytes)) AS `原始大小`, formatReadableSize(sum(data_compressed_bytes)) AS `壓縮大小`, round((sum(data_compressed_bytes) / sum(data_uncompressed_bytes)) * 100, 0) AS `壓縮率` FROM system.parts GROUP BY partition ORDER BY partition ASC 添加查詢條件 WHERE (database IN ('data_prod')) AND (table IN ('dm_order_dis')) AND (partition LIKE '2020%') 5.查看數據表字段的信息 SELECT column AS `字段名`, any(type) AS `類型`, formatReadableSize(sum(column_data_uncompressed_bytes)) AS `原始大小`, formatReadableSize(sum(column_data_compressed_bytes)) AS `壓縮大小`, sum(rows) AS `行數` FROM system.parts_columns WHERE (database = 'data_prod') AND (table = 'dm_order_dis') GROUP BY column ORDER BY column ASC
七、總結
ClickHouse分布式表的本質並不是一張表,而是一些本地物理表(分片)的分布式視圖,本身並不存儲數據。
在生產環境中總是推薦寫本地表、讀分布式表。