1. 背景
shard allocation 意思是分片分配, 是一個將分片分配到節點的過程; 可能發生該操作的過程包括:
- 初始恢復(
initial recovery) - 副本分配(
replica allocation) - 重新平衡(
rebalance) - 節點的新增和刪除
分片的分配操作, 是由 master 角色的節點來決定什么時候移動分片, 以及移動到哪個節點上, 以達到集群的均衡;
說明
本文基於 Elasticsearch 7.4.0 版本
2. 機制分析
2.1. Allocation 觸發條件
- 新增或刪除
index索引 node節點的新增或刪除- 執行
reroute命令 - 修改
replica副本數量 - 集群重啟
具體對應源碼解釋:來源
| 序號 | 調用函數 | 說明 |
|---|---|---|
| 1 | AllocationService.applyStartedShards | Shard 啟動狀態修改 |
| 2 | AllocationService.applyFailedShards | Shard 失效狀態修改 |
| 3 | AllocationService.deassociateDeadNodes | Node 節點離開集群 |
| 4 | AllocationService.reroute(AllocationCommands) | 執行 relocation 命令 |
| 5 | TransportClusterUpdateSettingsAction.masterOperation | 集群配置修改操作 |
| 6 | MetaDataCreateIndexService.onlyCreateIndex | 創建新索引 index 請求 |
| 7 | MetaDataDeleteIndexService.deleteIndexs | 刪除索引 index 操作 |
| 8 | MetaDataIndexStateService.closeIndex | 關閉 index 操作 |
| 9 | MetaDataIndexStateService.openIndex | 打開 index操作 |
| 10 | NodeJoinController.JoinTaskExecutor | 通過集群發現的節點加入集群 |
| 11 | GatewayService.GatewayRecoveryListener | 通過 GatewayRecovery 恢復的節點加入集群 |
| 12 | LocalAllocateDangledIndices.submitStateUpdateTask | 恢復磁盤內存而在 MateDate 內不存在的 index |
| 13 | RestoreService.restoreSnapshot | 從 snapshot 中恢復的 index |
2.2. Rebalance 的觸發條件
在 rebalance 之前會經過 2.3.2 中介紹的所有策略里實現的 canRebalance 方法, 全部通過后才會執行下面的 Rebalance 過程;
Rebalance 過程是通過調用 balanceByWeights() 方法, 計算 shard 所在的每個 node 的 weight 值,
其中:
numAdditionalShards一般為 0, 調用weightShardAdded,weightShardRemoved方法時分別取值為1和-1;- theta0 =
cluster.routing.allocation.balance.shard系統動態配置項, 默認值為0.45f; - theta1 =
cluster.routing.allocation.balance.index系統動態配置項, 默認值為0.55f;

源碼如下:
private static class WeightFunction {
private final float indexBalance;
private final float shardBalance;
private final float theta0;
private final float theta1;
WeightFunction(float indexBalance, float shardBalance) {
float sum = indexBalance + shardBalance;
if (sum <= 0.0f) {
throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
}
theta0 = shardBalance / sum;
theta1 = indexBalance / sum;
this.indexBalance = indexBalance;
this.shardBalance = shardBalance;
}
float weight(Balancer balancer, ModelNode node, String index) {
final float weightShard = node.numShards() - balancer.avgShardsPerNode();
final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
return theta0 * weightShard + theta1 * weightIndex;
}
}
2.3. 源碼分析
分片分配就是把一個分片分配到集群中某個節點的過程, 其中分配決策包含了兩個方面:
- 哪些分片應該分配到哪些節點上
- 哪個分片作為主分片, 哪個作為副本分片
Elasticsearch 主要通過兩個基礎組件來完成分片分配這個過程的: allocator 和 deciders;
allocator尋找最優的節點來分配分片;deciders負責判斷並決定是否要進行分配;
- 新建的索引
allocator 負責找出擁有分片數量最少的節點列表, 按分片數量遞增排序, 分片數量較少的會被優先選擇; 對於新建索引, allocator 的目標是以更為均衡的方式把新索引的分片分配到集群的節點中;
deciders 依次遍歷 allocator 給出的節點列表, 判斷是否要把分片分配給該節點, 比如是否滿足分配過濾規則, 分片是否將超出節點磁盤容量閾值等等;
- 已有的索引
allocator 對於主分片, 只允許把主分片指定在已經擁有該分片完整數據的節點上; 對於副本分片, 則是先判斷其他節點上是否已有該分片的數據的拷貝, 如果有這樣的節點, allocator 則優先把分片分配到這其中一個節點上;
2.3.1. Allocator

PrimaryShardAllocator找到擁有某Shard最新數據(主分片)的節點;ReplicaShardAllocator找到磁盤上擁有這個Shard數據(副本分片)的節點;BalancedShardsAllocator找到擁有最少Shard個數的節點;
public class BalancedShardsAllocator implements ShardsAllocator {
public static final Setting<Float> INDEX_BALANCE_FACTOR_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.index", 0.55f, 0.0f, Property.Dynamic, Property.NodeScope);
public static final Setting<Float> SHARD_BALANCE_FACTOR_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.shard", 0.45f, 0.0f, Property.Dynamic, Property.NodeScope);
public static final Setting<Float> THRESHOLD_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.threshold", 1.0f, 0.0f, Property.Dynamic, Property.NodeScope);
private volatile WeightFunction weightFunction;
private volatile float threshold;
}
2.3.2. Deciders
Deciders 決策期基礎組件的抽象類為 AllocationDecider:
public abstract class AllocationDecider {
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canRebalance(RoutingAllocation allocation) {
return Decision.ALWAYS;
}
}
ES 7.4.0 中的 Decider 決策器包括以下所示, 他們均實現上面的 AllocationDecider 抽象類, 並重寫 canRebalance, canAllocate, canRemain, canForceAllocatePrimary 等方法;

決策器比較多, 大致分類如下, 並列舉決策器對應的配置項:
2.3.2.1. 負載均衡類
-
SameShardAllocationDecider: 避免主副分片分配到同一個節點; -
AwarenessAllocationDecider: 感知分配器, 感知服務器, 機架等, 盡量分散存儲 Shard;對應的配置參數有:
cluster.routing.allocation.awareness.attributes: rack_idcluster.routing.allocation.awareness.attributes: zone -
ShardsLimitAllocationDecider: 同一個節點上允許存在同一個index的shard數目;index.routing.allocation.total_shards_per_node: 表示該索引每個節點上允許最多的shard數量; 默認值=-1, 表示無限制;cluster.routing.allocation.total_shards_per_node:cluster級別, 表示集群范圍內每個節點上允許最多的shard數量, 默認值=-1, 表示無限制;index級別會覆蓋cluster級別;
2.3.2.2. 並發控制類
-
ThrottlingAllocationDecider:recovery階段的限速配置, 避免過多的recovering allocation導致該節點的負載過高;cluster.routing.allocation.node_initial_primaries_recoveries: 當前節點在進行主分片恢復時的數量, 默認值=4;cluster.routing.allocation.node_concurrent_incoming_recoveries: 默認值=2, 通常是其他節點上的副本 shard 恢復到該節點上;cluster.routing.allocation.node_concurrent_outgoing_recoveries: 默認值=2, 通常是當前節點上的主分片 shard 恢復副本分片到其他節點上;cluster.routing.allocation.node_concurrent_recoveries: 統一配置上面兩個配置項; -
ConcurrentRebalanceAllocationDecider:rebalace並發控制, 表示集群同時允許進行rebalance操作的並發數量;cluster.routing.allocation.cluster_concurrent_rebalance, 默認值=2通過檢查
RoutingNodes類中維護的reloadingShard計數器, 看是否超過配置的並發數; -
DiskThresholdDecider: 根據節點的磁盤剩余量來決定是否分配到該節點上;cluster.routing.allocation.disk.threshold_enabled, 默認值=true;cluster.routing.allocation.disk.watermark.low: 默認值=85%, 達到這個值后, 新索引的分片不會分配到該節點上;cluster.routing.allocation.disk.watermark.high: 默認值=90%, 達到這個值后, 會觸發已分配到該節點上的Shard會rebalance到其他節點上去;
2.3.2.3. 條件限制類
-
RebalanceOnlyWhenActiveAllocationDecider: 所有Shard都處於active狀態下才可以執行rebalance操作; -
FilterAllocationDecider: 通過接口動態設置的過濾器;cluster級別會覆蓋index級別;index.routing.allocation.require.{attribute}
index.routing.allocation.include.{attribute}
index.routing.allocation.exclude.{attribute}
cluster.routing.allocation.require.{attribute}
cluster.routing.allocation.include.{attribute}
cluster.routing.allocation.exclude.{attribute}- require 表示必須滿足, include 表示可以分配到指定節點, exclude 表示不允許分配到指定節點;
- {attribute} 還有 ES 內置的幾個選擇, _name, _ip, _host;
-
ReplicaAfterPrimaryActiveAllocationDecider: 保證只在主分片分配完成后(active狀態)才開始分配副本分片; -
ClusterRebalanceAllocationDecider: 通過集群中active的shard狀態來決定是否可以執行rebalance;cluster.routing.allocation.allow_rebalanceindices_all_active(默認): 當集群所有的節點分配完成, 才可以執行rebalance操作;
indices_primaries_active: 只要所有主分片分配完成, 才可以執行rebalance操作;
always: 任何情況下都允許 rebalance 操作; -
MaxRetryAllocationDecider: 防止 shard 在失敗次數達到上限后繼續分配;index.allocation.max_retries: 設置分配的最大失敗重試次數, 默認值=5;
2.3.2.4. 其他決策類
-
EnableAllocationDecider: 設置允許分配的分片類型;index級別配置會覆蓋cluster級別配置;all(默認): 允許所有類型的分片;primaries: 僅允許主分片;new_primaries: 僅允許新建索引的主分片;none: 禁止分片分配操作; -
NodeVersionAllocationDecider: 檢查分片所在 Node 的版本是否高於目標 Node 的 ES 版本; -
SnapshotInProgressAllocationDecider: 決定snapshot期間是否允許allocation, 因為snapshot只會發生在主分片上, 所以該配置只會限制主分片的allocation;cluster.routing.allocation.snapshot.relocation_enabled
接下來介紹一下在 Elasticsearch 中涉及到 Allocation 和 Rebalance 的相關配置項;
3. cluster-level 配置
3.1. Shard allocation 配置
控制分片的分配和恢復;
| 配置 | 默認值 | 說明 |
|---|---|---|
| cluster.routing.allocation.enable | all | 啟用或禁用針對特定類型分片的分配; 1. all: 允許分配所有類型的分片; 2. primaries: 只允許分配主分片(primary shard); 3. new_primaries: 只允許分配新索引的主分片(primary shard);4. none: 禁用分片分配;該設置不會影響重啟節點時本地主分片的恢復; |
| cluster.routing.allocation.node_concurrent_incoming_recoveries | 2 | 一個節點允許並發的傳入分片(incoming shard)數量 |
| cluster.routing.allocation.node_concurrent_outgoing_recoveries | 2 | 一個節點允許並發的傳出分片(incoming shard)數量 |
| cluster.routing.allocation.node_concurrent_recoveries | 上面兩者的合並配置 | |
| cluster.routing.allocation.node_initial_primaries_recoveries | 4 | 單個節點上同時初始化的主分片數量 |
| cluster.routing.allocation.same_shard.host | false | 是否執行檢查, 以防止基於host name和host address, 在單個主機上分配同一分片的多個實例; 該設置僅用於在同一台計算機上啟動多個節點的情況; |
3.2. Shard rebalancing 配置
控制集群之間的分片平衡;
| 配置 | 默認值 | 說明 |
|---|---|---|
| cluster.routing.rebalance.enable | all | 啟用或禁用針對特定類型分片的rebalancing;1. all: 允許rebalancing所有類型的分片;2. primaries: 只允許rebalancing主分片;3. replicas: 只允許rebalancing副本分片;4. none: 禁用rebalancing; |
| cluster.routing.allocation.allow_rebalance | indices_all_active | 指定何時允許執行rebalancing;1. always: 總是允許;2. indices_primaries_active: 當集群中所有主分片已分配時才允許rebalancing;3. indices_all_active: 當集群中所有分片(包括主分片和副本分片)都已分配時才允許rebalancing; |
| cluster.routing.allocation.cluster_concurrent_rebalance | 2 | 指定整個集群中允許同時在節點間移動的分片數量; 該配置僅控制由於集群不平衡引起的並發分片分配數量, 對分配過濾(allocation filtering)或強制感知(forced awareness)的分片分配不做限制; |
3.3. 分片平衡啟發式
以下配置用於決定每個分片的存放位置; 當rebalancing操作不再使任何節點的權重超過balance.threshold時, 集群即達到平衡;
| 配置 | 默認值 | 說明 |
|---|---|---|
| cluster.routing.allocation.balance.shard | 0.45f | 定義節點上分配的分片總數的權重因子; 提升該值會導致集群中所有節點趨向於分片數量相等; |
| cluster.routing.allocation.balance.index | 0.55f | 定義節點上分配的每個索引的分片數量的權重因子; 提升該值會導致集群中所有節點上每個索引的分片數量趨向於相等; |
| cluster.routing.allocation.balance.threshold | 1.0f | 定義應當執行操作的最小優化值(非負浮點數); 提升該值會導致集群在優化分片平衡方面不太積極; |
4. Index-level 配置
以下配置控制每個索引中的分片分配;
4.1. index-level 分片分配過濾(來源)
配置需要分兩步:
- 在每個 Elasticsearch 節點的
elasticsearch.yml配置文件中添加自定義節點屬性, 比如以small,medium,big區分節點類型, 則配置文件中可添加:
node.attr.size: medium
或者在啟動 Elasticsearch 服務時, 在命令行里添加 ./bin/elasticsearch -Enode.attr.size=medium;
- 在新建索引的
mapping時, 添加index.routing.allocation.include/exclude/require.size: medium的過濾配置即可;
PUT <index_name>/_settings
{
"index.routing.allocation.include.size": "medium"
}
可以配置多個自定義節點屬性, 並且必須同時滿足索引里配置的多個過濾條件;
- index.routing.allocation.include.{attribute}: {values}
- index.routing.allocation.require.{attribute}: {values}
- index.routing.allocation.exclude.{attribute}: {values}
其中 {attribute} 可以是上面提到的自定義節點屬性, ES 自己也有一些內置的節點屬性:
| attribute | 說明 |
|---|---|
| _name | 通過節點名稱進行匹配 |
| _host_ip | 通過節點 IP 地址進行匹配 |
| _publish_ip | 通過節點的發布 IP 地址進行匹配 |
| _ip | 通過 _host_ip 或 _publish_ip 進行匹配 |
| _host | 通過節點的hostname進行匹配 |
| _id | 通過節點的 id 進行匹配 |
其中 {values} 可以是單個值, 也可以是逗號分隔的多個值, 也可以使用通配符 * 進行模糊匹配;
4.2. 設置延遲分配, 當節點離開時(來源)
當某個節點由於突發原因, 比如網絡中斷, 人為操作重啟等, 需要暫時離開集群時, 集群會立刻新建副本分片以替換丟失的副本, 然后在剩余的所有節點之間進行rebalancing, 這樣導致在短時間內該突發節點又恢復過來后, 原先的副本就無法再使用, 集群會將剛才新建的副本分片再拷貝回到該節點上; 這樣就會造成不必要的資源浪費, 以及節點分片rebalancing帶來的波動;
可以使用 index.unassigned.node_left.delayed_timeout 動態設置來延遲由於節點離開而導致未分配的副本分片的分配問題; 該配置默認值 1m;
PUT _all/_settings
{
"settings": {
"index.unassigned.node_left.delayed_timeout": "5m"
}
}
修改成以上配置后, 如果在 5m 內, 該節點可以恢復重新加入集群, 則集群會自動恢復該節點的副本分片分配, 恢復速度很快;
注意
- 此設置不影響將副本分片升級為主分片;
- 此設置不影響之前未分配的副本分片;
- 在整個集群重新啟動后, 該延遲分配不會生效;
4.3. 索引恢復的優先級(來源)
索引分片恢復的優先級按照:
- 可選的
index.priority配置, 值越大優先級越高; index索引的創建日期, 越新的索引優先級越高;index索引的名稱;
4.4. 每個節點的分片總數(來源)
| 配置 | 默認值 | 說明 |
|---|---|---|
| index.routing.allocation.total_shards_per_node | unbounded(-1) | 指定單個節點上最多分配的分片數量, 包括主分片和副本分片;(具體某個索引) |
| cluster.routing.allocation.total_shards_per_node | unbounded(-1) | 指定單個節點上最多分配的分片數量, 包括主分片和副本分片;(與索引無關, 全局設置) |
這些配置是硬性配置, 可能會導致一些分片無法分配, 需要慎重配置;
