clickhouse的分布式Distributed表引擎


  具有分布式引擎的表不存儲自己的任何數據,但允許在多個服務器上進行分布式查詢處理。讀取是自動並行的。在讀取期間,將使用遠程服務器上的表索引(如果有的話)。

一、創建表 

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
[SETTINGS name=value, ...]

 

  1.來源表

  當Distributed表指向當前服務器上的表時,可以采用該表的模式:

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] AS [db2.]name2 ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]]) [SETTINGS name=value, ...]
  分布式參數
  • cluster- 服務器配置文件中的集群名稱

  • database- 遠程數據庫的名稱

  • table- 遠程表的名稱

  • sharding_key- (可選)分片鍵

  • policy_name-(可選)策略名稱,它將用於存儲異步發送的臨時文件

  分布式設置

  • fsync_after_insert-fsync異步插入分布式后的文件數據。保證操作系統將整個插入的數據刷新到啟動器節點磁盤上的文件中。

  • fsync_directories-fsync為目錄做。保證操作系統在分布式表上與異步插入相關的操作后(插入后、將數據發送到分片后等)刷新目錄元數據。

  • bytes_to_throw_insert- 如果超過此數量的壓縮字節將等待異步 INSERT,則會引發異常。0 - 不扔。默認為 0。

  • bytes_to_delay_insert- 如果超過此數量的壓縮字節將等待異步 INSERT,則查詢將被延遲。0 - 不延遲。默認為 0。

  • max_delay_to_insert- 如果有大量用於異步發送的待處理字節,則以秒為單位將數據插入分布式表的最大延遲。默認 60。

  • monitor_batch_inserts- 與Distributed_directory_monitor_batch_inserts相同

  • monitor_split_batch_on_failure- 與distributed_directory_monitor_split_batch_on_failure相同

  • monitor_sleep_time_ms- 與Distributed_directory_monitor_sleep_time_ms相同

  • monitor_max_sleep_time_ms- 與Distributed_directory_monitor_max_sleep_time_ms相同

    注意:

  - 當數據首先存儲在啟動器節點磁盤上,然后異步發送到分片時,僅影響異步 INSERT(即 `insert_distributed_sync=false`)。
  - 可能會顯着降低刀片的性能
  - 影響將分布式表文件夾中存儲的數據寫入接受插入的**節點**。 如果需要保證將數據寫入底層 MergeTree 表 

例子

CREATE TABLE hits_all AS hits
ENGINE = Distributed(logs, default, hits[, sharding_key[, policy_name]])
SETTINGS
    fsync_after_insert=0,
    fsync_directories=0;

 

  數據將從集群中的所有服務器讀取logs,從default.hits位於集群中每台服務器上的表中讀取。數據不僅在遠程服務器上被讀取,而且在遠程服務器上進行部分處理(在可能的范圍內)。例如,對於帶有 的查詢GROUP BY,數據將在遠程服務器上聚合,聚合函數的中間狀態將被發送到請求服務器。然后將進一步匯總數據。

  可以使用返回字符串的常量表達式來代替數據庫名稱。例如:currentDatabase()

二、集群 

  集群在服務器配置文件中配置:

<remote_servers>
    <logs>
        <!-- Inter-server per-cluster secret for Distributed queries
             default: no secret (no authentication will be performed)

             If set, then Distributed queries will be validated on shards, so at least:
             - such cluster should exist on the shard,
             - such cluster should have the same secret.

             And also (and which is more important), the initial_user will
             be used as current user for the query.
        -->
        <!-- <secret></secret> -->
        <shard>
            <!-- Optional. Shard weight when writing data. Default: 1. -->
            <weight>1</weight>
            <!-- Optional. Whether to write data to just one of the replicas. Default: false (write data to all replicas). -->
            <internal_replication>false</internal_replication>
            <replica>
                <!-- Optional. Priority of the replica for load balancing (see also load_balancing setting). Default: 1 (less value has more priority). -->
                <priority>1</priority>
                <host>example01-01-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-01-2</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <weight>2</weight>
            <internal_replication>false</internal_replication>
            <replica>
                <host>example01-02-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-02-2</host>
                <secure>1</secure>
                <port>9440</port>
            </replica>
        </shard>
    </logs>
</remote_servers>

 

  這里定義了一個集群,其名稱logs由兩個分片組成,每個分片包含兩個副本。分片是指包含不同部分數據的服務器(為了讀取所有數據,必須訪問所有分片)。副本是復制服務器(為了讀取所有數據,可以訪問任何一個副本上的數據)。

  集群名稱不能包含點。

  為每個服務器指定參數host,port和可選user的 , passwordsecure:compression

  • host– 遠程服務器的地址。可以使用域或 IPv4 或 IPv6 地址。如果指定域,則服務器在啟動時會發出 DNS 請求,只要服務器正在運行,就會存儲結果。如果 DNS 請求失敗,則服務器不會啟動。如果更改 DNS 記錄,請重新啟動服務器。
  • port– 信使活動的 TCP 端口(tcp_port在配置中,通常設置為 9000)。不要與http_port.
  • user– 連接遠程服務器的用戶名。默認值為default用戶。此用戶必須有權連接到指定的服務器。訪問在users.xml文件中配置。有關詳細信息,請參閱訪問權限部分。
  • password– 連接遠程服務器的密碼(未屏蔽)。默認值:空字符串。
  • secure- 是否使用安全的 SSL/TLS 連接。通常還需要指定端口(默認安全端口是9440)。服務器應該監聽<tcp_port_secure>9440</tcp_port_secure>並配置正確的證書。
  • compression- 使用數據壓縮。默認值:true

  指定副本時,讀取時將為每個分片選擇一個可用副本。可以配置負載平衡算法(訪問哪個副本的首選項)如果未建立與服務器的連接,則將嘗試連接一個短暫的超時。如果連接失敗,將選擇下一個副本,以此類推所有副本。如果所有副本的連接嘗試都失敗,則嘗試以相同的方式重復幾次。這有利於彈性,但不提供完整的容錯能力:遠程服務器可能接受連接,但可能無法正常工作,或者工作不佳。

  可以僅指定一個分片(在這種情況下,查詢處理應稱為遠程,而不是分布式)或最多指定任意數量的分片。在每個分片中,可以指定從一個到任意數量的副本。可以為每個分片指定不同數量的副本。

  可以在配置中指定任意數量的集群。

  要查看的集群,請使用該system.clusters表。

  該Distributed引擎允許使用像本地服務器這樣的集群。但是,集群的配置不能動態指定,必須在服務器配置文件中進行配置。通常,集群中的所有服務器都將具有相同的集群配置(盡管這不是必需的)。配置文件中的集群會即時更新,無需重新啟動服務器。

  如果每次都需要向一組未知的分片和副本發送查詢,則無需創建Distributed表 -remote而是使用 table 函數。

  其他注意點:

  internal_replication配置用法

  1)如果底層是非復制表,那么這個值設為false(默認)。表示insert分布式表時,會在分片的所有副本都寫入一份。
  2)如果底層是復制表,那么這個值配置為true。表示分布式表不會往所有副本都寫入。僅寫入到一個副本。
  internal_replication這個參數是控制寫入數據到分布式表時,分布式表會控制這個寫入是否的寫入到所有副本中。與復制表的同步是不一樣的。為什么<2>中要設置為true,這就是為了避免和復制表的同步復制機制出現沖突,導致數據重復或者不一致。

  因為如果既是復制表、internal_replication又為false,那么寫入到分布式表時會寫入到同一分片的所有副本,而此時復制表的機制也會把不同副本之間的數據進行同步。而且分布式表寫入到所有副本並不是原子性的,也就是說,寫入到所有副本時,寫入某個副本失敗了,那這個副本就寫入失敗了,不會矯正。復制表的同步是會保證同步的。

三、寫入數據 

  1.將數據寫入集群有兩種方法:

  1)可以定義將哪些數據寫入哪些服務器並直接在每個分片上執行寫入。換句話說,對表所指向INSERT的集群中的遠程表執行直接語句。Distributed這是最靈活的解決方案,因為可以使用任何分片方案,即使是由於主題領域的要求而並非微不足道的分片方案。這也是最優化的解決方案,因為數據可以完全獨立地寫入不同的分片。

  2)可以在表上執行INSERT語句Distributed在這種情況下,表將在服務器本身之間分配插入的數據。為了寫入Distributed表,它必須sharding_key配置參數(除非只有一個分片)。

       2.注意:

  每個分片都可以<weight>在配置文件中定義。默認情況下,權重為1數據以與分片權重成比例的數量分布在分片上。將所有分片權重相加,然后將每個分片的權重除以總和,以確定每個分片的比例。例如,如果有兩個分片,第一個的權重為 1,而第二個的權重為 2,第一個將被發送三分之一 (1 / 3) 的插入行,第二個將被發送三分之二 (2 / 3)。

  每個分片都可以internal_replication在配置文件中定義參數。如果此參數設置為true,則寫入操作會選擇第一個健康的副本並向其寫入數據。如果表基礎的Distributed表是復制表(例如任何Replicated*MergeTree表引擎),請使用此選項。其中一個表副本將接收寫入,並將自動復制到其他副本。

  如果internal_replication設置為false(默認值),則將數據寫入所有副本。在這種情況下,Distributed表本身會復制數據。這比使用復制表更糟糕,因為不檢查副本的一致性,並且隨着時間的推移,它們將包含稍微不同的數據。

  為了選擇將一行數據發送到的分片,分析分片表達式,並將其除以分片的總權重得到余數。prev_weights該行被發送到對應於余數從到的半區間的分片prev_weights + weight,其中prev_weights是編號最小的分片的總權重,並且weight是該分片的權重。例如,如果有兩個分片,第一個的權重為 9,而第二個的權重為 10,則該行將發送到第一個分片以獲取范圍 [0, 9) 中的余數,並發送到第二個用於范圍 [9, 19) 的余數。

  分片表達式可以是返回整數的常量和表列中的任何表達式。例如,可以使用表達式rand()進行數據的隨機分布,或者UserID通過除以用戶 ID 的余數進行分布(然后單個用戶的數據將駐留在單個分片上,這簡化了運行INJOIN按用戶)。如果其中一列分布不夠均勻,可以將其包裝在哈希函數中,例如intHash64(UserID).

  除法的簡單余數是分片的有限解決方案,並不總是合適的。它適用於中型和大量數據(數十台服務器),但不適用於非常大量的數據(數百台或更多服務器)。在后一種情況下,使用主題區域所需的分片方案,而不是使用表中的條目Distributed

在以下情況下,應該關注分片方案:

  • 使用需要通過特定鍵連接數據(IN或)的查詢。JOIN如果數據被這個鍵分片,你可以使用 local INorJOIN代替GLOBAL INor GLOBAL JOIN,這樣效率更高。
  • 使用大量服務器(數百台或更多)和大量小型查詢,例如,查詢單個客戶(例如網站、廣告商或合作伙伴)的數據。為了使小查詢不影響整個集群,將單個客戶端的數據定位在單個分片上是有意義的。或者,可以設置雙層分片:將整個集群划分為“層”,其中一層可能由多個分片組成。單個客戶端的數據位於單個層上,但可以根據需要將分片添加到層中,並且數據在其中隨機分布。Distributed為每一層創建表,並為全局查詢創建一個共享分布式表。

  數據是異步寫入的。當插入表中時,數據塊只是寫入本地文件系統。數據會盡快在后台發送到遠程服務器。發送數據的周期由distributed_directory_monitor_sleep_time_msdistributed_directory_monitor_max_sleep_time_ms設置管理。Distributed引擎會單獨發送每個包含插入數據的文件,但可以使用 Distributed_directory_monitor_batch_inserts 設置啟用批量發送文件此設置通過更好地利用本地服務器和網絡資源來提高集群性能。應該通過查看表目錄中的文件列表(等待發送的數據)來檢查數據是否發送成功:/var/lib/clickhouse/data/database/table/執行后台任務的線程數可以通過background_distributed_schedule_pool_size設置來設置。

  INSERT如果服務器停止存在或在訪問表后粗略重新啟動(例如,由於硬件故障)Distributed,則插入的數據可能會丟失。如果在表目錄中檢測到損壞的數據部分,則將其轉移到broken子目錄中,不再使用。

  3.應用

  直接寫分布式表的優點自然是可以讓ClickHouse控制數據到分片的路由,而缺點:

  • 數據是先寫到一個分布式表的實例中並緩存起來,再逐漸分發到各個分片上去,實際是雙寫了數據(寫入放大),浪費資源;
  • 數據寫入默認是異步的,短時間內可能造成不一致;
  • 目標表中會產生較多的小parts,使merge(即compaction)過程壓力增大。

  相對而言,直接寫本地表是同步操作,更快,parts的大小也比較合適,但是就要求應用層額外實現sharding和路由邏輯,如輪詢或者隨機等。

  以下為分布式表插入流程圖:

 

 

四、讀取數據

  查詢Distributed表時,SELECT查詢被發送到所有分片並且無論數據如何在分片中分布(它們可以完全隨機分布)都可以工作。添加新分片時,不必將舊數據傳輸到其中。相反,可以通過使用更重的權重向其寫入新數據——數據將稍微不均勻地分布,但查詢將正確有效地工作。

  啟用該max_parallel_replicas選項后,查詢處理將在單個分片內的所有副本中並行處理。

  在分布式表上執行查詢的流程簡圖如下所示。發出查詢后,各個實例之間會交換自己持有的分片的表數據,最終匯總到同一個實例上返回給用戶。

 

       針對多分片多副本的情況

  讀取數據分布式查詢遵循多副本的路由規則

       該配置項為:load_balance=random/nearest_hostname/in_order/first_or_random

  多副本的路由規則

  查詢數據時,如果一個分片shard有多個副本repIica,那么Distributed表引擎就需要面對副本選擇的問題,選擇查詢究竟在哪個副本上執行。ck的負載均衡算法有以下四種:

  • random
  • nearest_hostname
  • in_order
  • first_or_random

  1) random

  這是默認的負載均衡算法。在ck的服務節點中,有一個errors_count全局計數器,當服務發生任何異常時,技術器加1。randdom算法會選擇errors_count最小的那個repIica,如果多個repIica的errors_count相同,則在這幾個里隨機選擇一個。

  2) nearest_hostname

  選擇errors_count最小的那個,如果多個errors_count相同,則選擇集群配置中host名稱和當前host名稱最相似的那個。相似比較的規則是與當前host的名稱,按字節進行逐位對比,找到不同字節最少的那個。 例如當前host是a.bc.de,那么,a.bc.df就比a.bf.hh要更加相似。 a.bc.de a.bc.df a.bf.hh

  3)in_order

  選擇errors_count最小的那個,如果多個errors_count相同,則按照集群配置順序選擇。

  4)first_or_random

  選擇errors_count最小的那個,如果多個errors_count相同,則按照集群配置順序選擇第一個,如果第一個不可用,則隨意選擇一個其他的。

  總結起來,其實這4個負載算法中,都是優先選擇errors_count最小的那個,如果多個errors_count相同時,再根據不同的負載算法來選擇。

 

五、虛擬列

  _shard_num— 包含shard_num表中的值system.clusters。類型:UInt32。

  注意

  由於遠程集群表功能在內部創建臨時分布式表,_shard_num因此在那里也可用。

六、常用sql

1.查看集群:
select * from system.clusters
 
2.查看數據庫容量、行數、壓縮率
SELECT
    sum(rows) AS `總行數`,
    formatReadableSize(sum(data_uncompressed_bytes)) AS `原始大小`,
    formatReadableSize(sum(data_compressed_bytes)) AS `壓縮大小`,
    round((sum(data_compressed_bytes) / sum(data_uncompressed_bytes)) * 100, 0) AS `壓縮率`
FROM system.parts
 
 
3.查看數據表容量、行數、壓縮率
SELECT
    table AS `表名`,
    sum(rows) AS `總行數`,
    formatReadableSize(sum(data_uncompressed_bytes)) AS `原始大小`,
    formatReadableSize(sum(data_compressed_bytes)) AS `壓縮大小`,
    round((sum(data_compressed_bytes) / sum(data_uncompressed_bytes)) * 100, 0) AS `壓縮率`
FROM system.parts
WHERE table IN ('dm_order_dis')
GROUP BY table
 
 
 
4.查看數據表分區信息
SELECT
    partition AS `分區`,
    sum(rows) AS `總行數`,
    formatReadableSize(sum(data_uncompressed_bytes)) AS `原始大小`,
    formatReadableSize(sum(data_compressed_bytes)) AS `壓縮大小`,
    round((sum(data_compressed_bytes) / sum(data_uncompressed_bytes)) * 100, 0) AS `壓縮率`
FROM system.parts
GROUP BY partition
ORDER BY partition ASC
 
添加查詢條件
WHERE (database IN ('data_prod')) AND (table IN ('dm_order_dis')) AND (partition LIKE '2020%')
 
 
 
5.查看數據表字段的信息
SELECT
    column AS `字段名`,
    any(type) AS `類型`,
    formatReadableSize(sum(column_data_uncompressed_bytes)) AS `原始大小`,
    formatReadableSize(sum(column_data_compressed_bytes)) AS `壓縮大小`,
    sum(rows) AS `行數`
FROM system.parts_columns
WHERE (database = 'data_prod') AND (table = 'dm_order_dis')
GROUP BY column
ORDER BY column ASC

 

七、總結

  ClickHouse分布式表的本質並不是一張表,而是一些本地物理表(分片)的分布式視圖,本身並不存儲數據。

  在生產環境中總是推薦寫本地表、讀分布式表。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM