Colocation Join 是在 Doris 0.9 版本中引入的新功能。旨在為某些 Join 查詢提供本地性優化,來減少數據在節點間的傳輸耗時,加速查詢。
1、基本理論
Join 的常見連接類型分為以下幾種:
- INNER JOIN
- OUTER JOIN
- CROSS JOIN
- SEMI JOIN
- ANTI JOIN
Join 的常見算法實現包含以下幾種:
- Nested Loop Join
- Sort Merge Join
- Hash Join
分布式系統實現 Join 數據分布的常見策略有:
- Shuffle Join
- Broadcast Join
- Colocate/Local Join
Colocate/Local Join 就是指多個節點 Join 時沒有數據移動和網絡傳輸,每個節點只在本地進行 Join,能夠本地進行 Join 的前提是相同 Join Key 的數據分布在相同的節點。
shuffle join 和 broadcast join 中, 參與join的兩張表的數據行, 若滿足join條件, 則需要將它們匯合在一台節點上, 完成join; 這兩種join方式, 都無法避免節點間數據網絡傳輸帶來額外的延遲和其他開銷。
2、名詞解釋
- Colocation Group(CG):一個 CG 中會包含一張及以上的 Table。一個CG內的Table 按相同的分桶方式和副本放置方式, 使用Colocation Group Schema描述. -- (組合分組)
- Colocation Group Schema(CGS): 包含CG的分桶鍵,分桶數以及副本數等信息。
- Colocate Parent Table:將決定一個 Group 數據分布的 Table 稱為 Parent Table。(當創建表時, 通過表的 PROPERTIES 的屬性 "colocate_with" = "group_name" 指定表歸屬的CG; 如果CG不存在, 說明該表為CG的第一張表, 稱之為Parent Table, Parent Table的數據分布(分桶鍵的類型,數量和順序, 副本數和分桶數)決定了CGS; 如果CG存在, 則檢查表的數據分布是否和CGS一致.)
- Colocate Child Table:將一個 Group 中除 Parent Table 之外的 Table 稱為 Child Table。
- Bucket Seq:如下圖,如果一個表有 N 個 Partition, 則每個 Partition 的第 M 個 bucket 的 Bucket Seq 是 M。
3、基本原理
Colocation Join 功能,是將一組擁有相同 CGS 的 Table 組成一個 CG。並保證這些 Table 對應的分桶副本會落在相同一組BE 節點上。使得當 CG 內的表進行分桶列上的 Join 操作時,可以直接進行本地數據 Join,減少數據在節點間的傳輸耗時。
為了使得 Table 能夠有相同的數據分布,同一 CG 內的 Table 必須保證下列約束:
- 1、同一CG 內的 Table 的分桶鍵的類型, 數量和順序完全一致,並且桶數一致; 這樣才能保證多張表的數據分片能夠一一對應的進行分布控制。
- 2、同一個 CG 內所有表的所有分區(Partition)的副本數必須一致。如果不一致,可能出現某一個 Tablet 的某一個副本,在同一個 BE 上沒有其他的表分片的副本對應。
- 分桶鍵:即在建表語句中 DISTRIBUTED BY HASH(col1, col2, ...) 中指定一組列。分桶鍵決定了一張表的數據通過哪些列的值進行 Hash 划分到不同的 Bucket Seq中。
- 同CG的table的分桶鍵的名字可以不相同, 分桶列的定義在建表語句中的出現次序可以不一致, 但是在 DISTRIBUTED BY HASH(col1, col2, ...) 的對應數據類型的順序要完全一致.
- 3、同一個 CG 內所有表的分區鍵, 分區數量可以不同.
所以我們在數據導入時保證本地性的核心思想就是兩次映射,對於 colocate tables,我們保證相同 Distributed Key 的數據映射到相同的 Bucket Seq,再保證相同 Bucket Seq 的 buckets 映射到相同的 BE
具體實現:
第一步:我們計算 Distributed Key 的 hash 值,並對 bucket num 取模,保證相同 Distributed Key 的數據映射到相同的 Bucket Seq。
第二步:將同一個 Colocate Group 下所有相同 Bucket Seq 的 Bucket 映射到相同的 BE,方法如下:
- 1、Group 中所有 Table 的 Bucket Seq 和 BE 節點的映射關系和 Parent Table 一致
- 2、Parent Table 中所有 Partition 的 Bucket Seq 和 BE 節點的映射關系和第一個 Partition 一致
- 3、Parent Table 第一個 Partition 的 Bucket Seq 和 BE 節點的映射關系利用原生的 Round Robin 算法決定
4、使用方式
1、建表
建表時,可以在 PROPERTIES 中指定屬性 "colocate_with" = "group_name",表示這個表是一個 Colocation Join 表,並且歸屬於一個指定的 Colocation Group。如下圖:
說明:
- 如果指定的 Group 不存在,則 DorisDB 會自動創建一個只包含當前這張表的 Group。
- 如果 Group 已存在,則 DorisDB 會檢查當前表是否滿足 Colocation Group Schema。如果滿足,則會創建該表,並將該表加入 Group。同時,表會根據已存在的 Group 中的數據分布規則創建分片和副本。
- Group 歸屬於一個 Database,Group 的名字在一個 Database 內唯一。在內部存儲是 Group 的全名為 dbId_groupName,但用戶只感知 groupName。
2、刪除
當 Group 中最后一張表徹底刪除后(徹底刪除是指從回收站中刪除。通常,一張表通過 DROP TABLE 命令刪除后,會在回收站默認停留一天的時間后,再刪除),該 Group 也會被自動刪除。
3、查看 Group 信息
以下命令可以查看集群內已存在的 Group 信息
- SHOW PROC '/colocation_group';
結果示例:
其中:
- GroupId:一個 Group 的全集群唯一標識,前半部分為 db id,后半部分為 group id。
- GroupName:Group 的全名。
- TabletIds:該 Group 包含的 Table 的 id 列表。
- BucketsNum:分桶數。
- ReplicationNum:副本數。
- DistCols:Distribution columns,即分桶列類型。
- IsStable:該 Group 是否穩定(穩定的定義,見 下面 Colocation 副本均衡和修復)。
通過以下命令可以進一步查看一個 Group 的數據分布情況:
- SHOW PROC '/colocation_group/10005.10008';
結果示例:
其中:
- BucketIndex: 分桶序列的下標。
- BackendIds: 分桶中數據分片所在的 BE 節點 id 列表。
注意:以上命令需要 AMDIN 權限。暫不支持普通用戶查看。
4、修改表 Group 屬性
可以對一個已經創建的表,修改其 Colocation Group 屬性。示例:
- ALTER TABLE tbl SET ("colocate_with" = "group2");
如果該表之前沒有指定過 Group,則該命令檢查 Schema,並將該表加入到該 Group(Group 不存在則會創建)。如果該表之前有指定其他 Group,則該命令會先將該表從原有 Group 中移除,並加入新 Group(Group 不存在則會創建)。
也可以通過以下命令,刪除一個表的 Colocation 屬性:
- ALTER TABLE tbl SET ("colocate_with" = "");
5、其他相關操作
當對一個具有 Colocation 屬性的表進行增加分區(ADD PARTITION)、修改副本數時,DorisDB 會檢查修改是否會違反 Colocation Group Schema,如果違反則會拒絕。
5、Colocation 副本均衡和修復
Colocation 表的副本分布需要遵循 Group 中指定的分布,所以在副本修復和均衡方面和普通分片有所區別。
Group 自身有一個 Stable 屬性:
- 當 Stable 為 true 時,表示當前 Group 內的表的所有分片沒有正在進行變動,Colocation 特性可以正常使用。
- 當 Stable 為 false 時(Unstable),表示當前 Group 內有部分表的分片正在做修復或遷移,此時,相關表的 Colocation Join 將退化為普通 Join。
1、副本修復
- 副本只能存儲在指定的 BE 節點上。所以當某個 BE 不可用時(宕機、Decommission 等),需要尋找一個新的 BE 進行替換。
- DorisDB 會優先尋找負載最低的 BE 進行替換。替換后,該 Bucket 內的所有在舊 BE 上的數據分片都要做修復。遷移過程中,Group 被標記為 Unstable。
2、副本均衡
DorisDB 會盡力將 Colocation 表的分片均勻分布在所有 BE 節點上。
對於普通表的副本均衡,是以單副本為粒度的,即單獨為每一個副本尋找負載較低的 BE 節點即可。
而 Colocation 表的均衡必須保證同一個 colocate group 下所有 bucket 的數據本地性,所以我們 balance 的單位是 colocate group。
- 注1:當前的 Colocation 副本均衡和修復算法,對於異構部署的 DorisDB 集群效果可能不佳。所謂異構部署,即 BE 節點的磁盤容量、數量、磁盤類型(SSD 和 HDD)不一致。在異構部署情況下,可能出現小容量的 BE 節點和大容量的 BE 節點存儲了相同的副本數量。
- 注2:當一個 Group 處於 Unstable 狀態時,其中的表的 Join 將退化為普通 Join。此時可能會極大降低集群的查詢性能。如果不希望系統自動均衡,可以設置 FE 的配置項 disable_colocate_balance 來禁止自動均衡。然后在合適的時間打開即可。
3、核心思路:
- 新增一個 daemon 線程專門處理 colocate table 的 balance,並讓正常的 balance 線程不處理 colocate table 的 balance。
4、何時 balance
- 有 BE 節點新增,刪除,down 掉時。
5、balance 的粒度:
- 正常 balance 的粒度是 bucket,但是對於 colocate table,我們必須保證同一個 colocate group 下所有 bucket 的數據本地性,所以我們 balance 的單位是 colocate group。
6、balance 對查詢的影響:
- 當一個 colocate group 正在 balance 時,colocate join 會退化為原始的 shuffle join 或 broadcast join。
7、balance 流程:
- 1、為需要復制或遷移的 Bucket 選擇目標 BE
- 2、標記 colocate group 的轉態為 balancing
- 3、對於需要復制或遷移的 Bucket,發起 Clone Job,Clone Job 會從 Bucket 的現有副本復制一個新副本目標 BE
- 4、更新 backendsPerBucketSeq(維護 Bucket Seq 到 BE 映射關系的元數據)
- 5、當一個 colocate group 下的所有 Clone Job 都完成時,標記 colocate group 的轉態為 stable
- 6、刪除冗余的副本
6、FE 配置項
1、disable_colocate_relocate: 是否關閉 DorisDB 的自動 Colocation 副本修復。默認為 false,即不關閉。該參數只影響 Colocation 表的副本修復,不影響普通表。
2、disable_colocate_balance: 是否關閉 DorisDB 的自動 Colocation 副本均衡。默認為 false,即不關閉。該參數只影響 Colocation 表的副本均衡,不影響普通表。
3、disable_colocate_join:可以通過改該變量在 session 粒度關閉 colocate join功能
7、FAQ
一句話總結,凡是不能進行 Colocate Join 的場景都會自動退化為原始的 Shuffle Join 或者 Broadcast Join。
1、支持多張表進行 Colocate Join 嗎?
- 支持
2、支持 Colocate 表和正常表 Join 嗎?
- 支持
3、Colocate 表支持用非分桶的 Key 進行 Join 嗎?
- 支持:不符合 Colocate Join 條件的 Join 會使用 Shuffle Join 或 Broadcast Join
4、如何確定 Join 是按照 Colocate Join 執行的?
- explain 的結果中 Hash Join 的孩子節點如果直接是 OlapScanNode, 沒有 Exchange Node,就說明是 Colocate Join
5、如何修改 colocate_with 屬性?
- ALTER TABLE example_db.my_table set ("colocate_with"="target_table");
6、如何禁用 colocate join?
- set disable_colocate_join = true; 就可以禁用 Colocate Join,查詢時就會使用 Shuffle Join 或 Broadcast Join
參考資料