The processing flow for a bulk request:
1. Iterate over all the requests and pre-process each one. This mainly means resolving the routing (if the mapping defines one) and the timestamp (if the request carries none, the current time is used). If no id field was specified and action.bulk.action.allow_id_generation is set to true, an auto-generated base64 UUID is used as the id and the request's opType is set to CREATE, because a document indexed with an ES-generated id is by definition a create, not an update. (Note: annoyingly, in the latest ES code pulled from GitHub the auto-generated-id path no longer sets opType, so it now goes through the same logic as a request with an explicit id; see https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java.)
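As a rough illustration of the auto-id branch, here is a minimal, hypothetical sketch. The real logic lives in IndexRequest.process(), and ES uses a time-based UUID generator rather than java.util.UUID; the names below are illustrative only:

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.UUID;

// Hypothetical stand-in for what IndexRequest.process() does when no id is
// given; ES's real generator is time-based, not java.util.UUID.
public class AutoIdSketch {
    static String idOrGenerated(String id) {
        if (id != null) {
            return id; // caller-supplied id is kept as-is
        }
        byte[] raw = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
        // URL-safe base64 without padding, matching the "base64 UUID" shape of ES ids
        return Base64.getUrlEncoder().withoutPadding().encodeToString(raw);
    }

    public static void main(String[] args) {
        System.out.println(idOrGenerated(null));    // auto-generated; opType would become CREATE
        System.out.println(idOrGenerated("doc-1")); // unchanged
    }
}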
2. Build a shardId -> operations map. Iterate over all the requests again and determine which shard each request should be sent to. The resolution works like this: if the request carries a routing value, it is used directly; otherwise the id is hashed, by default with Murmur3 (the hash function can be chosen via the index.legacy.routing.hash.type setting), and the target shard is computed by:
return MathUtils.mod(hash, indexMetaData.getNumberOfShards()); // note: this has changed in the latest ES code!
That is, the shardId is the hash modulo the total number of shards. With that shardId as the key, each request is wrapped together with its position in the bulk into a BulkItemRequest and appended to the list stored in the map (the position is kept so that the bulk response can report a per-request result in the original order). All of this boils down to grouping the requests by shard, which also spreads the work across shards.
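A self-contained sketch of this grouping step, with Item as a hypothetical stand-in for BulkItemRequest and String.hashCode() standing in for Murmur3:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupByShardSketch {
    // Hypothetical stand-in for BulkItemRequest: a request plus its position in the bulk
    record Item(int positionInBulk, String id) {}

    public static void main(String[] args) {
        List<String> ids = List.of("a", "b", "c", "d");
        int numberOfShards = 3;

        // shardId -> items; the original position is kept so per-item results
        // can be written back into the response array in request order
        Map<Integer, List<Item>> requestsByShard = new HashMap<>();
        for (int i = 0; i < ids.size(); i++) {
            String id = ids.get(i);
            int shardId = Math.floorMod(id.hashCode(), numberOfShards); // stand-in for Murmur3
            requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>())
                           .add(new Item(i, id));
        }
        System.out.println(requestsByShard);
    }
}

Note that computeIfAbsent keeps the loop free of explicit containsKey checks; the real code builds requestsByShard the same way, as the listing below shows.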
3. Iterate over the map built above and create one bulkShardRequest per group, carrying the configured consistencyLevel and timeout. The primary shard is then looked up in the cluster state: if the primary is on the local node the request is executed directly, otherwise it is sent on to the node that holds the shard (a minimal sketch of this decision follows the source listing below).
Source: https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos,
                 final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses) {
    final ClusterState clusterState = clusterService.state();
    // TODO use timeout to wait here if its blocked...
    clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE);
    final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
    MetaData metaData = clusterState.metaData();
    for (int i = 0; i < bulkRequest.requests.size(); i++) {
        DocWriteRequest docWriteRequest = bulkRequest.requests.get(i);
        // the request can only be null because we set it to null in the previous step, so it gets ignored
        if (docWriteRequest == null) {
            continue;
        }
        if (addFailureIfIndexIsUnavailable(docWriteRequest, bulkRequest, responses, i, concreteIndices, metaData)) {
            continue;
        }
        Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest);
        try {
            switch (docWriteRequest.opType()) {
                case CREATE:
                case INDEX:
                    IndexRequest indexRequest = (IndexRequest) docWriteRequest;
                    MappingMetaData mappingMd = null;
                    final IndexMetaData indexMetaData = metaData.index(concreteIndex);
                    if (indexMetaData != null) {
                        mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());
                    }
                    indexRequest.resolveRouting(metaData);
                    indexRequest.process(mappingMd, allowIdGeneration, concreteIndex.getName());
                    break;
                case UPDATE:
                    TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest);
                    break;
                case DELETE:
                    TransportDeleteAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (DeleteRequest) docWriteRequest);
                    break;
                default:
                    throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
            }
        } catch (ElasticsearchParseException | RoutingMissingException e) {
            BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id(), e);
            BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
            responses.set(i, bulkItemResponse);
            // make sure the request gets never processed again
            bulkRequest.requests.set(i, null);
        }
    }

    // first, go over all the requests and create a ShardId -> Operations mapping
    Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
    for (int i = 0; i < bulkRequest.requests.size(); i++) {
        DocWriteRequest request = bulkRequest.requests.get(i);
        if (request == null) {
            continue;
        }
        String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
        ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
        List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
        shardRequests.add(new BulkItemRequest(i, request));
    }

    if (requestsByShard.isEmpty()) {
        listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
            buildTookInMillis(startTimeNanos)));
        return;
    }

    final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
    String nodeId = clusterService.localNode().getId();
    for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
        final ShardId shardId = entry.getKey();
        final List<BulkItemRequest> requests = entry.getValue();
        BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),
            requests.toArray(new BulkItemRequest[requests.size()]));
        bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
        bulkShardRequest.timeout(bulkRequest.timeout());
        if (task != null) {
            bulkShardRequest.setParentTask(nodeId, task.getId());
        }
        shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
            @Override
            public void onResponse(BulkShardResponse bulkShardResponse) {
                for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
                    // we may have no response if item failed
                    if (bulkItemResponse.getResponse() != null) {
                        bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
                    }
                    responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
                }
                if (counter.decrementAndGet() == 0) {
                    finishHim();
                }
            }
            // the onFailure(Exception) override and the finishHim() helper are not shown in this excerpt
        });
    }
}
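The local-versus-remote decision described in step 3 is not visible in the listing above; it happens in the replication layer (TransportReplicationAction). A minimal sketch of the idea, with all names hypothetical:

// Self-contained sketch of the per-group routing decision; PrimaryShard,
// dispatch and the node ids are illustrative, not ES's actual API.
public class PrimaryDispatchSketch {
    record PrimaryShard(int shardId, String nodeId) {}

    static void dispatch(PrimaryShard primary, String localNodeId) {
        if (primary.nodeId().equals(localNodeId)) {
            System.out.println("shard " + primary.shardId() + ": execute on the local primary");
        } else {
            System.out.println("shard " + primary.shardId() + ": forward to node " + primary.nodeId());
        }
    }

    public static void main(String[] args) {
        dispatch(new PrimaryShard(0, "node-1"), "node-1"); // primary is local
        dispatch(new PrimaryShard(1, "node-2"), "node-1"); // forwarded over transport
    }
}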
The routing code:
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
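For reference, a self-contained sketch of the hash-mod computation this call boils down to, with String.hashCode() as a stand-in for ES's Murmur3HashFunction and Math.floorMod playing the role of MathUtils.mod (which likewise never returns a negative value):

public class ShardIdSketch {
    static int shardId(String id, String routing, int numberOfShards) {
        String effectiveRouting = (routing != null) ? routing : id; // explicit routing wins over the id
        return Math.floorMod(effectiveRouting.hashCode(), numberOfShards);
    }

    public static void main(String[] args) {
        System.out.println(shardId("doc-1", null, 5));      // routed by id
        System.out.println(shardId("doc-1", "user-42", 5)); // routed by explicit routing value
    }
}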