http://www.cnblogs.com/bonelee/p/6078947.html analyzes the ES bulk implementation. The routing code in it is:
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
Its implementation is in https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java
public ShardIterator indexShards(ClusterState clusterState, String index, String id, @Nullable String routing) {
    return shards(clusterState, index, id, routing).shardsIt();
}

protected IndexShardRoutingTable shards(ClusterState clusterState, String index, String id, String routing) {
    int shardId = generateShardId(indexMetaData(clusterState, index), id, routing);
    return clusterState.getRoutingTable().shardRoutingTable(index, shardId);
}

static int generateShardId(IndexMetaData indexMetaData, String id, @Nullable String routing) {
    final int hash;
    if (routing == null) {
        hash = Murmur3HashFunction.hash(id);
    } else {
        hash = Murmur3HashFunction.hash(routing);
    }
    // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
    // of original index to hash documents
    return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
}
As you can see, the ES code on master (at the time of writing) computes the target shard as:
Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
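To make the expression concrete, here is a minimal sketch that evaluates it for a few hash values. The class name ShardRoutingSketch and the method shardId are hypothetical, and the hash is passed in directly as an int instead of coming from Murmur3HashFunction, so no Elasticsearch class is needed. The example numbers (an index created with 8 shards and shrunk to 2) are an assumption chosen for illustration.

```java
public class ShardRoutingSketch {

    // Same arithmetic as the return statement of generateShardId,
    // with the two IndexMetaData getters replaced by plain parameters.
    static int shardId(int hash, int routingNumShards, int routingFactor) {
        return Math.floorMod(hash, routingNumShards) / routingFactor;
    }

    public static void main(String[] args) {
        // Hypothetical index: created with 8 shards, later shrunk to 2,
        // so routingNumShards = 8 and routingFactor = 8 / 2 = 4.
        int routingNumShards = 8;
        int routingFactor = 4;

        for (int hash : new int[] {13, -7, 3}) {
            System.out.println(hash + " -> shard " + shardId(hash, routingNumShards, routingFactor));
        }
    }
}
```

Math.floorMod keeps the intermediate result non-negative even for a negative hash, and the integer division then folds every group of routingFactor consecutive values onto one shard of the shrunk index.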
The implementation of getRoutingFactor can be found in https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java :
/**
* Returns the routing factor for this index. The default is <tt>1</tt>.
*
* @see #getRoutingFactor(IndexMetaData, int) for details
*/
public int getRoutingFactor() {
    return routingFactor;
}
The constructor contains:
assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
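According to the Javadoc, the default routing factor is 1. In that case the assert implies routingNumShards == numberOfShards, so the expression reduces to Math.floorMod(hash, numberOfShards) and documents are routed across all shards of the index.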
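The following sketch checks that claim by brute force: it derives routingNumShards from the invariant in the constructor assert and collects every shard id that a range of hash values can reach. The class DefaultRoutingFactorSketch, its methods, and the hash range of -100 to 100 are all hypothetical choices for illustration, not Elasticsearch code.

```java
import java.util.TreeSet;

public class DefaultRoutingFactorSketch {

    // Same arithmetic as the return statement of generateShardId.
    static int shardId(int hash, int routingNumShards, int routingFactor) {
        return Math.floorMod(hash, routingNumShards) / routingFactor;
    }

    // Collects every shard id that some hash value in [-100, 100] maps to.
    static TreeSet<Integer> reachableShards(int numberOfShards, int routingFactor) {
        // Invariant from the constructor assert:
        // numberOfShards * routingFactor == routingNumShards
        int routingNumShards = numberOfShards * routingFactor;
        TreeSet<Integer> shards = new TreeSet<>();
        for (int hash = -100; hash <= 100; hash++) {
            shards.add(shardId(hash, routingNumShards, routingFactor));
        }
        return shards;
    }

    public static void main(String[] args) {
        System.out.println(reachableShards(5, 1)); // routing factor 1
        System.out.println(reachableShards(2, 4)); // routing factor 4
    }
}
```

In both cases the reachable ids are exactly 0 through numberOfShards - 1, so every shard of the index can receive documents regardless of the routing factor.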
Note that ES 2.4 implements routing differently; see https://github.com/elastic/elasticsearch/blob/2.4/core/src/main/java/org/elasticsearch/cluster/routing/
@SuppressForbidden(reason = "Math#abs is trappy")
private int generateShardId(ClusterState clusterState, String index, String type, String id, @Nullable String routing) {
    IndexMetaData indexMetaData = clusterState.metaData().index(index);
    if (indexMetaData == null) {
        throw new IndexNotFoundException(index);
    }
    final Version createdVersion = indexMetaData.getCreationVersion();
    final HashFunction hashFunction = indexMetaData.getRoutingHashFunction();
    final boolean useType = indexMetaData.getRoutingUseType();

    final int hash;
    if (routing == null) {
        if (!useType) {
            hash = hash(hashFunction, id);
        } else {
            hash = hash(hashFunction, type, id);
        }
    } else {
        hash = hash(hashFunction, routing);
    }
    if (createdVersion.onOrAfter(Version.V_2_0_0_beta1)) {
        return MathUtils.mod(hash, indexMetaData.getNumberOfShards());
    } else {
        return Math.abs(hash % indexMetaData.getNumberOfShards());
    }
}

@Deprecated
protected int hash(HashFunction hashFunction, String type, String id) {
    if (type == null || "_all".equals(type)) {
        throw new IllegalArgumentException("Can't route an operation with no type and having type part of the routing (for backward comp)");
    }
    return hashFunction.hash(type, id);
}
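The two branches on createdVersion do not always agree, which is presumably why the code keeps both. The sketch below compares them. It assumes that MathUtils.mod behaves like a floor modulo and uses Math.floorMod as a stand-in for it; the class LegacyModSketch and its method names are hypothetical.

```java
public class LegacyModSketch {

    // Branch used for indices created before 2.0.0-beta1 in the snippet above.
    static int legacyShard(int hash, int numberOfShards) {
        return Math.abs(hash % numberOfShards);
    }

    // Stand-in for MathUtils.mod, ASSUMED here to be a floor modulo.
    static int currentShard(int hash, int numberOfShards) {
        return Math.floorMod(hash, numberOfShards);
    }

    public static void main(String[] args) {
        int numberOfShards = 5;
        for (int hash : new int[] {7, -7}) {
            System.out.println("hash " + hash
                    + ": legacy=" + legacyShard(hash, numberOfShards)
                    + ", current=" + currentShard(hash, numberOfShards));
        }
    }
}
```

For a non-negative hash both formulas pick the same shard, but for a negative hash they can differ (here -7 goes to shard 2 under the legacy formula and shard 3 under the floor modulo). Under that assumption, an index created on an old version has to keep the old formula, otherwise existing documents would no longer be found on the shard they were written to.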
The hash function comes in three implementations:
DjbHashFunction.java
SimpleHashFunction.java
Murmur3HashFunction.java
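To give a feel for how such hash functions differ, here is a sketch of two textbook algorithms that the class names suggest: the classic DJB2 string hash and Java's built-in String.hashCode. This is not the Elasticsearch source, and whether DjbHashFunction and SimpleHashFunction match these definitions exactly is an assumption. Murmur3 is left out because it is too long for a short example. The class HashFunctionSketch and its methods are hypothetical.

```java
public class HashFunctionSketch {

    // Classic DJB2: start at 5381, then hash = hash * 33 + c for each character.
    // (hash << 5) + hash is the usual way of writing hash * 33.
    static int djb2(String value) {
        long hash = 5381;
        for (int i = 0; i < value.length(); i++) {
            hash = ((hash << 5) + hash) + value.charAt(i);
        }
        return (int) hash; // truncate to 32 bits
    }

    // The simplest option: delegate to String.hashCode().
    static int simple(String value) {
        return value.hashCode();
    }

    public static void main(String[] args) {
        String id = "a";
        System.out.println("djb2:   " + djb2(id));
        System.out.println("simple: " + simple(id));
    }
}
```

Either value would then be fed into the modulo step shown earlier to pick a shard, so switching the hash function on an existing index would change where documents are looked up.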
The hash-related settings are as follows:
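# Number of shards
index.number_of_shards
# Number of replicas
index.number_of_replicas
# Routing rule for the index: which hash function to use. The default is Murmur3; a plain hash algorithm is also available
index.legacy.routing.hash.type
# Whether the routing computation includes the type. The internal method that computes the shard id this way is deprecated, so it is best left unset, which keeps the default of false
index.legacy.routing.use_type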