ES bulk source code analysis: ES 5.0


How a bulk request is processed:

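1. Iterate over all the requests and preprocess each one. This mainly means resolving the routing (if the mapping defines one) and the specified timestamp (the current time is used if the request carries none). If no id is specified and action.bulk.action.allow_id_generation is set to true, a base64 UUID is generated automatically as the id, and the request's opType is set to CREATE, because a document with an ES-generated id is by default a create rather than an update. (Note: annoyingly, in the latest ES code I pulled from GitHub, the id auto-generation path no longer sets opType, so it appears to follow the same logic as a request with an explicit id; see https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java.)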
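The id handling in this step can be sketched in a few lines. This is a minimal illustration, not the ES implementation: the Doc class and the process and base64Uuid names are hypothetical, and a random java.util.UUID stands in for whatever UUID generator ES actually uses. Setting opType to CREATE reflects the older behaviour described above.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

class IdGenerationSketch {

    /** Hypothetical stand-in for an index request: only the fields this step touches. */
    static final class Doc {
        String id;               // null when the client did not supply an id
        String opType = "INDEX";
    }

    /** URL-safe base64 of a random 128-bit UUID, without padding (22 characters). */
    static String base64Uuid() {
        UUID uuid = UUID.randomUUID();
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(uuid.getMostSignificantBits());
        buf.putLong(uuid.getLeastSignificantBits());
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
    }

    /** Fills in a generated id only when none was given and generation is allowed. */
    static void process(Doc doc, boolean allowIdGeneration) {
        if (doc.id == null && allowIdGeneration) {
            doc.id = base64Uuid();
            doc.opType = "CREATE"; // older behaviour; newer code reportedly leaves opType alone
        }
    }

    public static void main(String[] args) {
        Doc doc = new Doc();
        process(doc, true);
        System.out.println(doc.id + " " + doc.opType);
    }
}
```

A request that already carries an id passes through process unchanged, which is the property the rest of the bulk flow relies on.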

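2. Create a ShardId --> Operation map, then iterate over all the requests again to work out which shard each request should be sent to. The lookup works like this: if the request has a routing value, that value is used directly; otherwise the id is used. The value is hashed (Murmur3 by default, although you can choose the hash function through the index.legacy.routing.hash.type setting) to decide which shard receives the request: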

    return MathUtils.mod(hash, indexMetaData.getNumberOfShards());

Note: this line has changed in the latest ES code!

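In other words, the shardId is the hash modulo the total number of shards. The shardId becomes the map key, and the value is a list of BulkItemRequest objects, each built from the request and its index in the iteration. (The index is kept because the bulk response reports the result of each individual request.) All of this boils down to grouping the requests by shard (for load balancing).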
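The grouping described in this step can be sketched as follows. This is only an illustration under stated assumptions: the Item class and the shardFor and groupByShard names are hypothetical, shards are identified by a plain int, and String.hashCode() stands in for the Murmur3 hash that ES uses by default, so the shard numbers will not match what a real cluster computes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ShardGroupingSketch {

    /** Hypothetical item: position in the bulk request plus the fields used for routing. */
    static final class Item {
        final int slot;
        final String id;
        final String routing; // may be null

        Item(int slot, String id, String routing) {
            this.slot = slot;
            this.id = id;
            this.routing = routing;
        }
    }

    /** Picks the routing value if present, else the id, then takes hash mod shard count. */
    static int shardFor(String id, String routing, int numberOfShards) {
        String effectiveRouting = routing != null ? routing : id;
        int hash = effectiveRouting.hashCode(); // stand-in hash, NOT Murmur3
        return Math.floorMod(hash, numberOfShards); // floorMod keeps the result non-negative
    }

    /** Groups items by target shard while remembering each item's original slot. */
    static Map<Integer, List<Item>> groupByShard(List<Item> items, int numberOfShards) {
        Map<Integer, List<Item>> byShard = new HashMap<>();
        for (Item item : items) {
            if (item == null) {
                continue; // slots that already failed are skipped
            }
            int shard = shardFor(item.id, item.routing, numberOfShards);
            byShard.computeIfAbsent(shard, s -> new ArrayList<>()).add(item);
        }
        return byShard;
    }

    public static void main(String[] args) {
        List<Item> items = Arrays.asList(
                new Item(0, "doc-1", null),
                new Item(1, "doc-2", "user-42"),
                new Item(2, "doc-3", "user-42"));
        groupByShard(items, 5).forEach((shard, group) ->
                System.out.println("shard " + shard + " -> " + group.size() + " item(s)"));
    }
}
```

Because each Item keeps its slot, the per-shard results can later be written back into the right position of the overall response array.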

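3. Iterate over the map built above and create one BulkShardRequest per group, carrying the consistencyLevel and timeout settings (in the 5.0 code below, the former appears as waitForActiveShards). Then look up the primary shard in the cluster state: if the primary is on the local node the request is executed directly, otherwise it is forwarded to the node that holds that shard.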

Source location: https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

    void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses ) {
        final ClusterState clusterState = clusterService.state();
        // TODO use timeout to wait here if its blocked...
        clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE);

        final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
        MetaData metaData = clusterState.metaData();
        for (int i = 0; i < bulkRequest.requests.size(); i++) {
            DocWriteRequest docWriteRequest = bulkRequest.requests.get(i);
            //the request can only be null because we set it to null in the previous step, so it gets ignored
            if (docWriteRequest == null) {
                continue;
            }
            if (addFailureIfIndexIsUnavailable(docWriteRequest, bulkRequest, responses, i, concreteIndices, metaData)) {
                continue;
            }
            Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest);
            try {
                switch (docWriteRequest.opType()) {
                    case CREATE:
                    case INDEX:
                        IndexRequest indexRequest = (IndexRequest) docWriteRequest;
                        MappingMetaData mappingMd = null;
                        final IndexMetaData indexMetaData = metaData.index(concreteIndex);
                        if (indexMetaData != null) {
                            mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());
                        }
                        indexRequest.resolveRouting(metaData);
                        indexRequest.process(mappingMd, allowIdGeneration, concreteIndex.getName());
                        break;
                    case UPDATE:
                        TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest);
                        break;
                    case DELETE:
                        TransportDeleteAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (DeleteRequest) docWriteRequest);
                        break;
                    default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
                }
            } catch (ElasticsearchParseException | RoutingMissingException e) {
                BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id(), e);
                BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
                responses.set(i, bulkItemResponse);
                // make sure the request gets never processed again
                bulkRequest.requests.set(i, null);
            }
        }

        // first, go over all the requests and create a ShardId -> Operations mapping
        Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
        for (int i = 0; i < bulkRequest.requests.size(); i++) {
            DocWriteRequest request = bulkRequest.requests.get(i);
            if (request == null) {
                continue;
            }
            String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
            ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
            List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
            shardRequests.add(new BulkItemRequest(i, request));
        }

        if (requestsByShard.isEmpty()) {
            listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
            return;
        }

        final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
        String nodeId = clusterService.localNode().getId();
        for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
            final ShardId shardId = entry.getKey();
            final List<BulkItemRequest> requests = entry.getValue();
            BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),
                    requests.toArray(new BulkItemRequest[requests.size()]));
            bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
            bulkShardRequest.timeout(bulkRequest.timeout());
            if (task != null) {
                bulkShardRequest.setParentTask(nodeId, task.getId());
            }
            shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
                @Override
                public void onResponse(BulkShardResponse bulkShardResponse) {
                    for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
                        // we may have no response if item failed
                        if (bulkItemResponse.getResponse() != null) {
                            bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
                        }
                        responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
                    }
                    if (counter.decrementAndGet() == 0) {
                        finishHim();
                    }
                }
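                // Note: the listener's onFailure(...) callback and the finishHim() helper are omitted from this excerpt; see the source file linked above.
            });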
        }
    }
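The last loop of executeBulk fans one request out per shard and uses an AtomicInteger to detect when the final response has arrived. The sketch below isolates that pattern with plain JDK classes. It is an assumption-laden illustration: the FanOutFanInSketch class, the execute method and the "shard-N:ok" strings are hypothetical, a thread pool stands in for the transport layer, and failure handling is left out.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

class FanOutFanInSketch {

    /**
     * Runs one task per shard group. Each task writes a result into the slots of
     * its own items; the task that brings the counter to zero completes the future.
     */
    static CompletableFuture<String[]> execute(Map<Integer, List<Integer>> slotsByShard,
                                               int totalSlots, ExecutorService pool) {
        AtomicReferenceArray<String> responses = new AtomicReferenceArray<>(totalSlots);
        CompletableFuture<String[]> done = new CompletableFuture<>();
        if (slotsByShard.isEmpty()) {
            done.complete(toArray(responses)); // nothing to send, answer right away
            return done;
        }
        AtomicInteger counter = new AtomicInteger(slotsByShard.size());
        for (Map.Entry<Integer, List<Integer>> entry : slotsByShard.entrySet()) {
            int shard = entry.getKey();
            List<Integer> slots = entry.getValue();
            pool.execute(() -> {
                for (int slot : slots) {
                    responses.set(slot, "shard-" + shard + ":ok");
                }
                if (counter.decrementAndGet() == 0) {
                    done.complete(toArray(responses)); // last group to finish reports back
                }
            });
        }
        return done;
    }

    static String[] toArray(AtomicReferenceArray<String> array) {
        String[] out = new String[array.length()];
        for (int i = 0; i < out.length; i++) {
            out[i] = array.get(i);
        }
        return out;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Map<Integer, List<Integer>> groups = new HashMap<>();
        groups.put(0, Arrays.asList(0, 2));
        groups.put(1, Arrays.asList(1));
        System.out.println(Arrays.toString(execute(groups, 3, pool).join()));
        pool.shutdown();
    }
}
```

The counter starts at the number of shard groups rather than the number of items, so the combined response is built exactly once, by whichever group finishes last.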

Routing code:

ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();

 

