Elasticsearch Recovery Flow


The Elasticsearch recovery process is roughly divided into six stages:

  INIT, INDEX, VERIFY_INDEX, TRANSLOG, FINALIZE, DONE
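These names correspond to the RecoveryState.Stage values referenced throughout the code below. As a quick reference, here is a minimal illustrative enum (not the actual Elasticsearch class) summarizing what each stage covers:

    // Illustrative only: a simplified mirror of RecoveryState.Stage and what each stage covers.
    enum RecoveryStage {
        INIT,          // recovery object created, nothing recovered yet
        INDEX,         // segment files are validated/copied (from the local store or from the primary)
        VERIFY_INDEX,  // optional Lucene index check (disabled by default)
        TRANSLOG,      // the engine is started and translog operations are replayed
        FINALIZE,      // cleanup; translog garbage collection is re-enabled
        DONE           // recovery finished, the shard is reported as started
    }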

  The recovery flow involves several modules:

    gateway: reads the cluster and index metadata, updates it, and then calls reroute

    allocation: after reroute is called, allocates the shards, updates the cluster state, and sets each shard's recovery source type

    recovery: performs the actual data recovery

 

 

phase1 takes a snapshot (a Lucene commit) of the shard and sends the segment files to the replica.

phase2 takes a snapshot of the translog from startingSeqNo to endingSeqNo. After phase1 completes, the replica is told to start its engine, and once the engine is started the shard can accept writes. Because writes may arrive between sending the files and starting the engine, phase2 is needed to replay those operations.

Each translog write returns a Location; from the operations recorded there the endingSeqNo for phase2 is determined (in the code below it is the primary's maximum sequence number, shard.seqNoStats().getMaxSeqNo()).
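As a rough illustration of the phase2 range (a standalone sketch with made-up numbers, not Elasticsearch code):

    // Illustrative sketch only: how the phase2 replay range relates to the checkpoints.
    public class Phase2RangeExample {
        public static void main(String[] args) {
            long targetLocalCheckpoint = 41;                // highest seq no the replica already has
            long startingSeqNo = targetLocalCheckpoint + 1; // first operation that must be replayed
            long endingSeqNo = 57;                          // primary's max seq no once phase1 is done
            System.out.printf("phase2 replays translog ops with seq no in [%d, %d]%n", startingSeqNo, endingSeqNo);
        }
    }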

 

 

 

 

    When a node starts, or the cluster starts and the node information has changed, the new cluster state is applied:
     private void applyChanges(UpdateTask task, ClusterState previousClusterState, ClusterState newClusterState) {
        ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(task.source, newClusterState, previousClusterState);
        // new cluster state, notify all listeners
        final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta(); // node changes in the new cluster state
        if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
            String summary = nodesDelta.shortSummary();
            if (summary.length() > 0) {
                logger.info("{}, reason: {}", summary, task.source);
            }
        }

        nodeConnectionsService.connectToNodes(newClusterState.nodes());

        logger.debug("applying cluster state version {}", newClusterState.version());
        try {
            // nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
            if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) {
                final Settings incomingSettings = clusterChangedEvent.state().metaData().settings();
                clusterSettings.applySettings(incomingSettings); // the cluster metadata changed, apply the updated settings
            }
        } catch (Exception ex) {
            logger.warn("failed to apply cluster settings", ex);
        }

        logger.debug("apply cluster state with version {}", newClusterState.version());
        callClusterStateAppliers(clusterChangedEvent); // notify all appliers

        nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes());

        logger.debug("set locally applied cluster state to version {}", newClusterState.version());
        state.set(newClusterState); // update the locally applied cluster state

        callClusterStateListeners(clusterChangedEvent); // notify all listeners

        task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState); // completion callback
    }

    After the cluster state change event is received:
    @Override
    public synchronized void applyClusterState(final ClusterChangedEvent event) {
        if (!lifecycle.started()) {
            return;
        }

        final ClusterState state = event.state();

        // we need to clean the shards and indices we have on this node, since we
        // are going to recover them again once state persistence is disabled (no master / not recovered)
        // TODO: feels hacky, a block disables state persistence, and then we clean the allocated shards, maybe another flag in blocks?
        if (state.blocks().disableStatePersistence()) {
            for (AllocatedIndex<? extends Shard> indexService : indicesService) {
                indicesService.removeIndex(indexService.index(), NO_LONGER_ASSIGNED,
                    "cleaning index (disabled block persistence)"); // also cleans shards
            }
            return;
        }

        updateFailedShardsCache(state);

        deleteIndices(event); // also deletes shards of deleted indices

        removeUnallocatedIndices(event); // also removes shards of removed indices

        failMissingShards(state);

        removeShards(state);   // removes any local shards that doesn't match what the master expects

        updateIndices(event); // can also fail shards, but these are then guaranteed to be in failedShardsCache

        createIndices(state);

        createOrUpdateShards(state); // entry point into recovery: after startup the node has the cluster metadata, which provides the current cluster state, routing table, indices, and so on
    }


    private void createOrUpdateShards(final ClusterState state) {
        RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
        if (localRoutingNode == null) {
            return;
        }

        DiscoveryNodes nodes = state.nodes();
        RoutingTable routingTable = state.routingTable();

        for (final ShardRouting shardRouting : localRoutingNode) {
            ShardId shardId = shardRouting.shardId();
            if (failedShardsCache.containsKey(shardId) == false) {
                AllocatedIndex<? extends Shard> indexService = indicesService.indexService(shardId.getIndex()); // index service that owns this shard
                assert indexService != null : "index " + shardId.getIndex() + " should have been created by createIndices";
                Shard shard = indexService.getShardOrNull(shardId.id());
                if (shard == null) {
                    assert shardRouting.initializing() : shardRouting + " should have been removed by failMissingShards";
                    createShard(nodes, routingTable, shardRouting, state); // a newly assigned shard has to be created
                } else {
                    updateShard(nodes, shardRouting, shard, routingTable, state);
                }
            }
        }
    }

    private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardRouting shardRouting, ClusterState state) {
        assert shardRouting.initializing() : "only allow shard creation for initializing shard but was " + shardRouting;

        DiscoveryNode sourceNode = null;
        if (shardRouting.recoverySource().getType() == Type.PEER)  {
            sourceNode = findSourceNodeForPeerRecovery(logger, routingTable, nodes, shardRouting);
            if (sourceNode == null) {
                logger.trace("ignoring initializing shard {} - no source node can be found.", shardRouting.shardId());
                return;
            }
        }

        try {
            logger.debug("{} creating shard", shardRouting.shardId());
            RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode); // recovery stage is set to INIT; nothing has been recovered yet
            indicesService.createShard(shardRouting, recoveryState, recoveryTargetService, new RecoveryListener(shardRouting),
                repositoriesService, failedShardHandler, globalCheckpointSyncer); // create the shard
        } catch (Exception e) {
            failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);
        }
    }


    // shard creation and the start of recovery are combined here
  @Override
    public IndexShard createShard(ShardRouting shardRouting, RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,
                                  PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
                                  Consumer<IndexShard.ShardFailure> onShardFailure,
                                  Consumer<ShardId> globalCheckpointSyncer) throws IOException {
        ensureChangesAllowed();
        IndexService indexService = indexService(shardRouting.index());
        IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer); // create the shard
        indexShard.addShardFailureCallback(onShardFailure);
        indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService,
            (type, mapping) -> {
                assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS:
                    "mapping update consumer only required by local shards recovery";
                try {
                    client.admin().indices().preparePutMapping()
                        .setConcreteIndex(shardRouting.index()) // concrete index - no name clash, it uses uuid
                        .setType(type)
                        .setSource(mapping.source().string(), XContentType.JSON)
                        .get();
                } catch (IOException ex) {
                    throw new ElasticsearchException("failed to stringify mapping source", ex);
                }
            }, this);
        return indexShard;
    }




    public synchronized IndexShard createShard(ShardRouting routing, Consumer<ShardId> globalCheckpointSyncer) throws IOException {
        /*
         * TODO: we execute this in parallel but it's a synced method. Yet, we might
         * be able to serialize the execution via the cluster state in the future. for now we just
         * keep it synced.
         */
        if (closed.get()) {
            throw new IllegalStateException("Can't create shard " + routing.shardId() + ", closed");
        }
        final Settings indexSettings = this.indexSettings.getSettings();
        final ShardId shardId = routing.shardId(); // id of the shard to recover
        boolean success = false;
        Store store = null;
        IndexShard indexShard = null;
        ShardLock lock = null;
        try {
            lock = nodeEnv.shardLock(shardId, TimeUnit.SECONDS.toMillis(5)); // acquire the shard lock
            eventListener.beforeIndexShardCreated(shardId, indexSettings); // before the shard is created, the required services are registered
            ShardPath path;
            try {
                path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings); // load the shard path if it already exists
            } catch (IllegalStateException ex) {
                logger.warn("{} failed to load shard path, trying to remove leftover", shardId);
                try {
                    ShardPath.deleteLeftoverShardDirectory(logger, nodeEnv, lock, this.indexSettings);
                    path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings);
                } catch (Exception inner) {
                    ex.addSuppressed(inner);
                    throw ex;
                }
            }

            if (path == null) {
                // TODO: we should, instead, hold a "bytes reserved" of how large we anticipate this shard will be, e.g. for a shard
                // that's being relocated/replicated we know how large it will become once it's done copying:
                // Count up how many shards are currently on each data path:
                Map<Path, Integer> dataPathToShardCount = new HashMap<>();
                for (IndexShard shard : this) {
                    Path dataPath = shard.shardPath().getRootStatePath();
                    Integer curCount = dataPathToShardCount.get(dataPath);
                    if (curCount == null) {
                        curCount = 0;
                    }
                    dataPathToShardCount.put(dataPath, curCount + 1);
                }
                path = ShardPath.selectNewPathForShard(nodeEnv, shardId, this.indexSettings,
                    routing.getExpectedShardSize() == ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE
                        ? getAvgShardSizeInBytes() : routing.getExpectedShardSize(),
                    dataPathToShardCount); // select a new data path for this shard
                logger.debug("{} creating using a new path [{}]", shardId, path);
            } else {
                logger.debug("{} creating using an existing path [{}]", shardId, path);
            }

            if (shards.containsKey(shardId.id())) {
                throw new IllegalStateException(shardId + " already exists");
            }

            logger.debug("creating shard_id {}", shardId);
            // if we are on a shared FS we only own the shard (ie. we can safely delete it) if we are the primary.
            final Engine.Warmer engineWarmer = (searcher) -> {
                IndexShard shard =  getShardOrNull(shardId.getId());
                if (shard != null) {
                    warmer.warm(searcher, shard, IndexService.this.indexSettings);
                }
            };
            store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock,
                    new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)));
            indexShard = new IndexShard(routing, this.indexSettings, path, store, indexSortSupplier,
                indexCache, mapperService, similarityService, engineFactory,
                eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
                    searchOperationListeners, indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId));
            eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); // publish the shard state change now that the shard state directory exists
            eventListener.afterIndexShardCreated(indexShard);
            shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); // register the shard so it can later be looked up via shards.get()
            success = true;
            return indexShard;
        } catch (ShardLockObtainFailedException e) {
            throw new IOException("failed to obtain in-memory shard lock", e);
        } finally {
            if (success == false) {
                if (lock != null) {
                    IOUtils.closeWhileHandlingException(lock);
                }
                closeShard("initialization failed", shardId, indexShard, store, eventListener);
            }
        }
    }

     public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,
                              PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
                              BiConsumer<String, MappingMetaData> mappingUpdateConsumer,
                              IndicesService indicesService) {
        // TODO: Create a proper object to encapsulate the recovery context
        // all of the current methods here follow a pattern of:
        // resolve context which isn't really dependent on the local shards and then async
        // call some external method with this pointer.
        // with a proper recovery context object we can simply change this to:
        // startRecovery(RecoveryState recoveryState, ShardRecoverySource source ) {
        //     markAsRecovery("from " + source.getShortDescription(), recoveryState);
        //     threadPool.generic().execute()  {
        //           onFailure () { listener.failure() };
        //           doRun() {
        //                if (source.recover(this)) {
        //                  recoveryListener.onRecoveryDone(recoveryState);
        //                }
        //           }
        //     }}
        // }
        assert recoveryState.getRecoverySource().equals(shardRouting.recoverySource()); // the recovery source type is set during allocation: roughly, the cluster metadata is read first; if it exists, the primary is set to EXISTING_STORE and replicas to PEER; otherwise the type is assigned while the shards are being allocated
        switch (recoveryState.getRecoverySource().getType()) {
            case EMPTY_STORE:
            case EXISTING_STORE: // primary shard recovers from the local store
                markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread
                threadPool.generic().execute(() -> {
                    try {
                        if (recoverFromStore()) { // primary shard
                            recoveryListener.onRecoveryDone(recoveryState);
                        }
                    } catch (Exception e) { // on failure, close and remove the shard and send an internal:cluster/shard/failure message to the master
                        recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true);
                    }
                });
                break;
            case PEER: // replica recovers from the primary: the source is the node holding the primary shard, the target is the local shard being recovered
                try {
                    markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
                    recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);
                } catch (Exception e) {
                    failShard("corrupted preexisting index", e);
                    recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true);
                }
                break;
            case SNAPSHOT: // recover from a snapshot
                markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread
                SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) recoveryState.getRecoverySource();
                threadPool.generic().execute(() -> {
                    try {
                        final Repository repository = repositoriesService.repository(recoverySource.snapshot().getRepository());
                        if (restoreFromRepository(repository)) {
                            recoveryListener.onRecoveryDone(recoveryState);
                            // when recovery completes, an internal:cluster/shard/started message is sent; ShardStateAction receives it and submits a cluster state update task
                        }
                    } catch (Exception e) {
                        recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true);
                    }
                });
                break;
            case LOCAL_SHARDS: // recover from other shards on the local node (e.g. shrink)
                final IndexMetaData indexMetaData = indexSettings().getIndexMetaData();
                final Index mergeSourceIndex = indexMetaData.getMergeSourceIndex();
                final List<IndexShard> startedShards = new ArrayList<>();
                final IndexService sourceIndexService = indicesService.indexService(mergeSourceIndex);
                final int numShards = sourceIndexService != null ? sourceIndexService.getIndexSettings().getNumberOfShards() : -1;
                if (sourceIndexService != null) {
                    for (IndexShard shard : sourceIndexService) {
                        if (shard.state() == IndexShardState.STARTED) {
                            startedShards.add(shard);
                        }
                    }
                }
                if (numShards == startedShards.size()) {
                    markAsRecovering("from local shards", recoveryState); // mark the shard as recovering on the cluster state thread
                    threadPool.generic().execute(() -> {
                        try {
                            final Set<ShardId> shards = IndexMetaData.selectShrinkShards(shardId().id(), sourceIndexService.getMetaData(),
                                +indexMetaData.getNumberOfShards());
                            if (recoverFromLocalShards(mappingUpdateConsumer, startedShards.stream()
                                .filter((s) -> shards.contains(s.shardId())).collect(Collectors.toList()))) {
                                recoveryListener.onRecoveryDone(recoveryState);
                            }
                        } catch (Exception e) {
                            recoveryListener.onRecoveryFailure(recoveryState,
                                new RecoveryFailedException(recoveryState, null, e), true);
                        }
                    });
                } else {
                    final RuntimeException e;
                    if (numShards == -1) {
                        e = new IndexNotFoundException(mergeSourceIndex);
                    } else {
                        e = new IllegalStateException("not all shards from index " + mergeSourceIndex
                            + " are started yet, expected " + numShards + " found " + startedShards.size() + " can't recover shard "
                            + shardId());
                    }
                    throw e;
                }
                break;
            default:
                throw new IllegalArgumentException("Unknown recovery source " + recoveryState.getRecoverySource());
        }
    }
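
As a quick summary of the dispatch above, the recovery source types map to the following paths (an illustrative sketch using a local enum mirroring RecoverySource.Type, not Elasticsearch code):

    // Illustrative summary of the switch above (hypothetical names; the enum mirrors RecoverySource.Type).
    class RecoveryPathSummary {
        enum RecoverySourceType { EMPTY_STORE, EXISTING_STORE, PEER, SNAPSHOT, LOCAL_SHARDS }

        static String describe(RecoverySourceType type) {
            switch (type) {
                case EMPTY_STORE:
                case EXISTING_STORE: return "primary recovers from the local store (recoverFromStore)";
                case PEER:           return "replica recovers from the node holding the primary (recoveryTargetService.startRecovery)";
                case SNAPSHOT:       return "recover from a snapshot repository (restoreFromRepository)";
                case LOCAL_SHARDS:   return "recover from other shards on the same node, e.g. shrink (recoverFromLocalShards)";
                default:             return "unknown recovery source";
            }
        }
    }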

 Only primary shards go through this method; replicas do not.
    boolean recoverFromStore(final IndexShard indexShard) {
        if (canRecover(indexShard)) {
            RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
            assert recoveryType == RecoverySource.Type.EMPTY_STORE || recoveryType == RecoverySource.Type.EXISTING_STORE :
                "expected store recovery type but was: " + recoveryType;
            return executeRecovery(indexShard, () -> {
                logger.debug("starting recovery from store ...");
                internalRecoverFromStore(indexShard);
            });
        }
        return false;
    }



    /**
     * Recovers the state of the shard from the store.
     */
    private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRecoveryException {
        final RecoveryState recoveryState = indexShard.recoveryState();
        final boolean indexShouldExists = recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE;
        indexShard.prepareForIndexRecovery(); // moves the recovery to the INDEX stage
        long version = -1;
        SegmentInfos si = null;
        final Store store = indexShard.store();
        store.incRef();
        try { // read the version of the shard being recovered from the last committed SegmentInfos (each Lucene commit produces a new commit point)
            try {
                store.failIfCorrupted();
                try {
                    si = store.readLastCommittedSegmentsInfo();
                } catch (Exception e) {
                    String files = "_unknown_";
                    try {
                        files = Arrays.toString(store.directory().listAll());
                    } catch (Exception inner) {
                        inner.addSuppressed(e);
                        files += " (failure=" + ExceptionsHelper.detailedMessage(inner) + ")";
                    }
                    if (indexShouldExists) {
                        throw new IndexShardRecoveryException(shardId, "shard allocated for local recovery (post api), should exist, but doesn't, current files: " + files, e);
                    }
                }
                if (si != null) {
                    if (indexShouldExists) {
                        version = si.getVersion();
                    } else {
                        // it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling)
                        // its a "new index create" API, we have to do something, so better to clean it than use same data
                        logger.trace("cleaning existing shard, shouldn't exists");
                        Lucene.cleanLuceneIndex(store.directory());
                        si = null;
                    }
                }
            } catch (Exception e) {
                throw new IndexShardRecoveryException(shardId, "failed to fetch index version after copying it over", e);
            }
            recoveryState.getIndex().updateVersion(version); // record the current index version
            if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) { // recovering from other local shards
                assert indexShouldExists;
                indexShard.skipTranslogRecovery(); // skip translog replay: LOCAL_SHARDS recovery copies segments from the source shards, so there is no translog to replay
            } else {
                // since we recover from local, just fill the files and size
                try {
                    final RecoveryState.Index index = recoveryState.getIndex();
                    if (si != null) {
                        addRecoveredFileDetails(si, store, index); // record the names and sizes of the files to be recovered
                    }
                } catch (IOException e) {
                    logger.debug("failed to list file details", e);
                }
                indexShard.performTranslogRecovery(indexShouldExists);
                assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
                indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());
            }
            indexShard.finalizeRecovery(); // move to the FINALIZE stage and refresh so the data becomes searchable; the translog may be cleaned up from now on, so setEnableGcDeletes(true) is set
            indexShard.postRecovery("post recovery from shard_store"); // recovery is finished: update the cluster state and set the stage to DONE
        } catch (EngineException | IOException e) {
            throw new IndexShardRecoveryException(shardId, "failed to recover from gateway", e);
        } finally {
            store.decRef();
        }
    }



    /**
     * After the store has been recovered, we need to start the engine in order to apply operations
     */
    public void performTranslogRecovery(boolean indexExists) throws IOException {
        if (indexExists == false) {
            // note: these are set when recovering from the translog
            final RecoveryState.Translog translogStats = recoveryState().getTranslog();
            translogStats.totalOperations(0);
            translogStats.totalOperationsOnStart(0);
        }
        internalPerformTranslogRecovery(false, indexExists);
        assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
    }


 private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) throws IOException {
        if (state != IndexShardState.RECOVERING) {
            throw new IndexShardNotRecoveringException(shardId, state);
        } 
        recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX); // after locating the last Lucene commit, enter the VERIFY_INDEX stage
        // also check here, before we apply the translog
        // run the index check; controlled by a setting and disabled by default
        if (Booleans.isTrue(checkIndexOnStartup)) {
            try {
                checkIndex();
            } catch (IOException ex) {
                throw new RecoveryFailedException(recoveryState, "check index failed", ex);
            }
        }
        // TRANSLOG stage
        recoveryState.setStage(RecoveryState.Stage.TRANSLOG); // once verification is done, enter the TRANSLOG stage
        final EngineConfig.OpenMode openMode;
        /* by default we recover and index and replay the translog but if the index
         * doesn't exist we create everything from the scratch. Yet, if the index
         * doesn't exist we don't need to worry about the skipTranslogRecovery since
         * there is no translog on a non-existing index.
         * The skipTranslogRecovery invariant is used if we do remote recovery since
         * there the translog isn't local but on the remote host, hence we can skip it.
         */
        if (indexExists == false) {
            openMode = EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG;
        } else if (skipTranslogRecovery) {
            // skip translog recovery
            openMode = EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG;
        } else {
            // recover from the translog
            openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
        }

        assert indexExists == false || assertMaxUnsafeAutoIdInCommit();

        final EngineConfig config = newEngineConfig(openMode);
        // we disable deletes since we allow for operations to be executed against the shard while recovering
        // but we need to make sure we don't loose deletes until we are done recovering
        config.setEnableGcDeletes(false); // disable GC of deletes while recovering, so delete operations are not lost before recovery finishes
        Engine newEngine = createNewEngine(config); // create the engine; once it is open the shard can accept writes
        verifyNotClosed();
        if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
            // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
            // we still give sync'd flush a chance to run:
            active.set(true);
            // primary shard recovery path: replay the local translog
            newEngine.recoverFromTranslog();
        }
        assertSequenceNumbersInCommit();
    }



    recoverFromTranslog is called next. If this step fails, the engine is closed: the lock is released, the searcher manager and the translog are closed (both hold open files), and the index writer is rolled back.
    Internally it mainly delegates to recoverFromTranslogInternal.


    private void recoverFromTranslogInternal() throws IOException {
        Translog.TranslogGeneration translogGeneration = translog.getGeneration();
        final int opsRecovered;
        // find the translog generation recorded in the last commit point (the generation changes with every Lucene commit)
        // and use that generation to snapshot the translog from the last commit onwards
        final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
        try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGen)) { // snapshot the translog starting at the last committed generation
            opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot); // read the translog operations and replay them
        } catch (Exception e) {
            throw new EngineException(shardId, "failed to recover from translog", e);
        }
        // flush if we recovered something or if we have references to older translogs
        // note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
        assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
        pendingTranslogRecovery.set(false); // we are good - now we can commit
        if (opsRecovered > 0) { // opsRecovered is incremented once per replayed translog operation; if it is greater than zero there were operations to apply, so flush
            logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
                opsRecovered, translogGeneration == null ? null : translogGeneration.translogFileGeneration, translog.currentFileGeneration());
            // flush to disk
            flush(true, true);
        } else if (translog.isCurrent(translogGeneration) == false) {
            commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
            refreshLastCommittedSegmentInfos();
        } else if (lastCommittedSegmentInfos.getUserData().containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) == false)  {
            assert engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1) :
                "index was created on version " + engineConfig.getIndexSettings().getIndexVersionCreated() + "but has "
                    + "no sequence numbers info in commit";

            commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
            refreshLastCommittedSegmentInfos();
        }
        // clean up what's not needed
        translog.trimUnreferencedReaders();
    }

// IndexShard: read the translog snapshot and apply the corresponding index operations
   int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException {
        recoveryState.getTranslog().totalOperations(snapshot.totalOperations());
        recoveryState.getTranslog().totalOperationsOnStart(snapshot.totalOperations());
        int opsRecovered = 0;
        Translog.Operation operation;
        while ((operation = snapshot.next()) != null) { // while the snapshot still has operations
            try {
                logger.trace("[translog] recover op {}", operation);
                Engine.Result result = applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, update -> {
                    throw new IllegalArgumentException("unexpected mapping update: " + update); // depending on the operation type an index or delete is applied; this does not write to the translog, but it still goes through the same InternalEngine.index() path as a normal write
                });
                ExceptionsHelper.reThrowIfNotNull(result.getFailure());
                opsRecovered++;
                recoveryState.getTranslog().incrementRecoveredOperations();
            } catch (Exception e) {
                if (ExceptionsHelper.status(e) == RestStatus.BAD_REQUEST) {
                    // mainly for MapperParsingException and Failure to detect xcontent
                    logger.info("ignoring recovery of a corrupt translog entry", e);
                } else {
                    throw e;
                }
            }
        }
        return opsRecovered;
    }




The above covers the primary shard recovery flow.

    private void retryRecovery(final long recoveryId, final TimeValue retryAfter, final TimeValue activityTimeout) {
        RecoveryTarget newTarget = onGoingRecoveries.resetRecovery(recoveryId, activityTimeout);
        if (newTarget != null) {
            threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(newTarget.recoveryId()));
        }
    }

 The replica recovery flow goes through the same stages (INIT, INDEX, VERIFY_INDEX, TRANSLOG, FINALIZE, DONE); the difference is that the replica has to copy both the index files and the translog operations from the primary.

    private void doRecovery(final long recoveryId) {
        final StartRecoveryRequest request;
        final CancellableThreads cancellableThreads;
        final RecoveryState.Timer timer;
        try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) {
            if (recoveryRef == null) {
                logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId);
                return;
            }
            final RecoveryTarget recoveryTarget = recoveryRef.target();
            cancellableThreads = recoveryTarget.cancellableThreads();
            timer = recoveryTarget.state().getTimer();
            try {
                assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
                request = getStartRecoveryRequest(recoveryTarget); // build the start-recovery request
                logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
                recoveryTarget.indexShard().prepareForIndexRecovery(); // move to the INDEX stage
            } catch (final Exception e) {
                // this will be logged as warning later on...
                logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e);
                onGoingRecoveries.failRecovery(recoveryId,
                    new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), true);
                return;
            }
        }

        try {
            logger.trace("{} starting recovery from {}", request.shardId(), request.sourceNode());
            final AtomicReference<RecoveryResponse> responseHolder = new AtomicReference<>();
            // send the start_recovery RPC to the source node; the thread blocks until the response arrives
            cancellableThreads.execute(() -> responseHolder.set(
                transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request,
                    new FutureTransportResponseHandler<RecoveryResponse>() {
                        @Override
                        public RecoveryResponse newInstance() {
                            return new RecoveryResponse();
                        }
                    }).txGet())); // block waiting for the response
            final RecoveryResponse recoveryResponse = responseHolder.get();
            final TimeValue recoveryTime = new TimeValue(timer.time());
            // do this through ongoing recoveries to remove it from the collection
            onGoingRecoveries.markRecoveryAsDone(recoveryId); // recovery is done, remove its id from the collection
            if (logger.isTraceEnabled()) {
                StringBuilder sb = new StringBuilder();
                sb.append('[').append(request.shardId().getIndex().getName()).append(']').append('[').append(request.shardId().id())
                    .append("] ");
                sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(recoveryTime).append("]\n");
                sb.append(" phase1: recovered_files [").append(recoveryResponse.phase1FileNames.size()).append("]").append(" with " +
                    "total_size of [").append(new ByteSizeValue(recoveryResponse.phase1TotalSize)).append("]")
                    .append(", took [").append(timeValueMillis(recoveryResponse.phase1Time)).append("], throttling_wait [")
                    .append(timeValueMillis(recoveryResponse.phase1ThrottlingWaitTime)).append(']')
                    .append("\n");
                sb.append(" : reusing_files [").append(recoveryResponse.phase1ExistingFileNames.size()).append("] with " +
                    "total_size of [").append(new ByteSizeValue(recoveryResponse.phase1ExistingTotalSize)).append("]\n");
                sb.append(" phase2: start took [").append(timeValueMillis(recoveryResponse.startTime)).append("]\n");
                sb.append(" : recovered [").append(recoveryResponse.phase2Operations).append("]").append(" transaction log " +
                    "operations")
                    .append(", took [").append(timeValueMillis(recoveryResponse.phase2Time)).append("]")
                    .append("\n");
                logger.trace("{}", sb);
            } else {
                logger.debug("{} recovery done from [{}], took [{}]", request.shardId(), request.sourceNode(), recoveryTime);
            }
        } catch (CancellableThreads.ExecutionCancelledException e) {
            logger.trace("recovery cancelled", e);
        } catch (Exception e) {
            if (logger.isTraceEnabled()) {
                logger.trace(
                    (Supplier<?>) () -> new ParameterizedMessage(
                        "[{}][{}] Got exception on recovery", request.shardId().getIndex().getName(), request.shardId().id()), e);
            }
            Throwable cause = ExceptionsHelper.unwrapCause(e);
            if (cause instanceof CancellableThreads.ExecutionCancelledException) {
                // this can also come from the source wrapped in a RemoteTransportException
                onGoingRecoveries.failRecovery(recoveryId,
                    new RecoveryFailedException(request, "source has canceled the recovery", cause), false);
                return;
            }
            if (cause instanceof RecoveryEngineException) {
                // unwrap an exception that was thrown as part of the recovery
                cause = cause.getCause();
            }
            // do it twice, in case we have double transport exception
            cause = ExceptionsHelper.unwrapCause(cause);
            if (cause instanceof RecoveryEngineException) {
                // unwrap an exception that was thrown as part of the recovery
                cause = cause.getCause();
            }
            // here, we would add checks against exception that need to be retried (and not removeAndClean in this case)
            if (cause instanceof IllegalIndexShardStateException || cause instanceof IndexNotFoundException
                || cause instanceof ShardNotFoundException) {
                // if the target is not ready yet, retry
                retryRecovery(recoveryId, "remote shard not ready", recoverySettings.retryDelayStateSync(),
                    recoverySettings.activityTimeout());
                return;
            }
            if (cause instanceof DelayRecoveryException) {
                retryRecovery(recoveryId, cause, recoverySettings.retryDelayStateSync(), recoverySettings.activityTimeout());
                return;
            }
            if (cause instanceof ConnectTransportException) {
                logger.debug("delaying recovery of {} for [{}] due to networking error [{}]", request.shardId(),
                    recoverySettings.retryDelayNetwork(), cause.getMessage());
                retryRecovery(recoveryId, cause.getMessage(), recoverySettings.retryDelayNetwork(), recoverySettings.activityTimeout());
                return;
            }
            if (cause instanceof AlreadyClosedException) {
                onGoingRecoveries.failRecovery(recoveryId,
                    new RecoveryFailedException(request, "source shard is closed", cause), false);
                return;
            }
            onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, e), true);
        }
    }

    // handler on the primary after it receives the internal:index/shard/recovery/start_recovery request
    private RecoveryResponse recover(final StartRecoveryRequest request) throws IOException {
        final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
        final IndexShard shard = indexService.getShard(request.shardId().id());
        final ShardRouting routingEntry = shard.routingEntry();

        if (routingEntry.primary() == false || routingEntry.active() == false) {
            throw new DelayRecoveryException("source shard [" + routingEntry + "] is not an active primary");
        }

        if (request.isPrimaryRelocation() && (routingEntry.relocating() == false
            || routingEntry.relocatingNodeId().equals(request.targetNode().getId()) == false)) {
            logger.debug("delaying recovery of {} as source shard is not marked yet as relocating to {}",
                request.shardId(), request.targetNode());
            throw new DelayRecoveryException("source shard is not marked yet as relocating to [" + request.targetNode() + "]");
        }

        RecoverySourceHandler handler = ongoingRecoveries.addNewRecovery(request, shard);
        logger.trace("[{}][{}] starting recovery to {}", request.shardId().getIndex().getName(), request.shardId().id(),
            request.targetNode());
        try {
            return handler.recoverToTarget(); // send the data to the replica shard
        } finally {
            ongoingRecoveries.remove(shard, handler);
        }
    }

    /**
     * performs the recovery from the local engine to the target
     */
    public RecoveryResponse recoverToTarget() throws IOException {
        runUnderPrimaryPermit(() -> {
            final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
            ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId());
            if (targetShardRouting == null) {
                logger.debug("delaying recovery of {} as it is not listed as assigned to target node {}",
                    request.shardId(), request.targetNode());
                throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
            }
            assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
        });

        try (Closeable ignored = shard.acquireTranslogRetentionLock()) { // hold the retention lock so the translog is not trimmed away
            final Translog translog = shard.getTranslog();

            final long startingSeqNo;
            final long requiredSeqNoRangeStart;
            // Check whether a sequence-number-based recovery is possible.
            // Normally the primary and the replica track the same sequence numbers: every operation on the primary increments the
            // sequence number and is then replicated to the replica. If the replica's sequence number falls behind the primary's,
            // recovery only needs to replay the operations after the replica's sequence number instead of copying everything
            // (the translog can be large). If the starting seq no is assigned and not above the local checkpoint, only the
            // operations from startingSeqNo to endingSeqNo are sent and the full file copy is skipped.
            final boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
                && isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery();
            if (isSequenceNumberBasedRecoveryPossible) {
                logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
                startingSeqNo = request.startingSeqNo();
                requiredSeqNoRangeStart = startingSeqNo;
            } else {
                final Engine.IndexCommitRef phase1Snapshot;
                try {
                    phase1Snapshot = shard.acquireIndexCommit(false);
                } catch (final Exception e) {
                    throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
                }
                // we set this to 0 to create a translog roughly according to the retention policy
                // on the target. Note that it will still filter out legacy operations with no sequence numbers
                startingSeqNo = 0;
                // but we must have everything above the local checkpoint in the commit
                requiredSeqNoRangeStart =
                    Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
                try {
                    phase1(phase1Snapshot.getIndexCommit(), translog::totalOperations); // phase1: copy segment files from the snapshot
                } catch (final Exception e) {
                    throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
                } finally {
                    try {
                        IOUtils.close(phase1Snapshot);
                    } catch (final IOException ex) {
                        logger.warn("releasing snapshot caused exception", ex);
                    }
                }
            }
            assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;
            assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart
                + "] is lower than [" + startingSeqNo + "]";

            runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()));

            try {
                // tell the replica to start its engine
                prepareTargetForTranslog(translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
            } catch (final Exception e) {
                throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
            }

            final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
            /*
             * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
             * operations in the required range will be available for replaying from the translog of the source.
             */
            cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));

            logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);

            logger.trace("snapshot translog for recovery; current size is [{}]",
                translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
            final long targetLocalCheckpoint;
            // snapshot the translog starting at startingSeqNo
            try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(startingSeqNo)) {
                targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot);
            } catch (Exception e) {
                throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
            }

            finalizeRecovery(targetLocalCheckpoint);
        }
        return response;
    }

    /**
     * Determines if the source translog is ready for a sequence-number-based peer recovery. The main condition here is that the
     * source translog contains all operations above the local checkpoint on the target. We already know that the translog contains
     * or will contain all ops above the source local checkpoint, so we can stop checking there.
     * (In other words: is the target's sequence number low enough, and does the primary's translog still cover the gap?)
     *
     * @return {@code true} if the source is ready for a sequence-number-based recovery
     * @throws IOException if an I/O exception occurred reading the translog snapshot
     */
    boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException {
        final long startingSeqNo = request.startingSeqNo();
        assert startingSeqNo >= 0;
        final long localCheckpoint = shard.getLocalCheckpoint();
        logger.trace("testing sequence numbers in range: [{}, {}]", startingSeqNo, localCheckpoint);
        // the start recovery request is initialized with the starting sequence number set to the target shard's local checkpoint plus one
        // the starting seq no of the recovery must not exceed the primary's local checkpoint
        if (startingSeqNo - 1 <= localCheckpoint) {
            final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
            // verify that the translog actually contains all the required operations
            try (Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
                Translog.Operation operation;
                while ((operation = snapshot.next()) != null) {
                    if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                        tracker.markSeqNoAsCompleted(operation.seqNo());
                    }
                }
            }
            return tracker.getCheckpoint() >= localCheckpoint;
        } else {
            return false;
        }
    }

    /**
     * Perform phase1 of the recovery operations. Once this {@link IndexCommit}
     * snapshot has been performed no commit operations (files being fsync'd)
     * are effectively allowed on this index until all recovery phases are done.
     * <p>
     * Phase1 examines the segment files on the target node and copies over the
     * segments that are missing. Only segments that have the same size and
     * checksum can be reused.
     * (It snapshots the shard and ships the files, which can consume a lot of bandwidth on large shards;
     * when the sync ids and the doc counts match, shipping files is skipped entirely.)
     */
    public void phase1(final IndexCommit snapshot, final Supplier<Integer> translogOps) {
        cancellableThreads.checkForCancel();
        // Total size of segment files that are recovered
        long totalSize = 0;
        // Total size of segment files that were able to be re-used
        long existingTotalSize = 0;
        final Store store = shard.store();
        store.incRef();
        try {
            StopWatch stopWatch = new StopWatch().start();
            final Store.MetadataSnapshot recoverySourceMetadata;
            try {
                recoverySourceMetadata = store.getMetadata(snapshot);
            } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
                shard.failShard("recovery", ex);
                throw ex;
            }
            for (String name : snapshot.getFileNames()) {
                final StoreFileMetaData md = recoverySourceMetadata.get(name);
                if (md == null) {
                    logger.info("Snapshot differs from actual index for file: {} meta: {}", name, recoverySourceMetadata.asMap());
                    throw new CorruptIndexException("Snapshot differs from actual index - maybe index was removed metadata has " +
                        recoverySourceMetadata.asMap().size() + " files", name);
                }
            }
            // Generate a "diff" of all the identical, different, and missing
            // segment files on the target node, using the existing files on
            // the source node
            // first check whether both sides carry the same sync id and the same doc count
            String recoverySourceSyncId = recoverySourceMetadata.getSyncId();
            String recoveryTargetSyncId = request.metadataSnapshot().getSyncId();
            final boolean recoverWithSyncId = recoverySourceSyncId != null && recoverySourceSyncId.equals(recoveryTargetSyncId);
            if (recoverWithSyncId) {
                // if an index has had no writes for five minutes, ES writes a sync id into it (synced flush); normally the primary
                // and replica carry the same sync id, but it is removed on the next commit, so this mostly helps cold restarts.
                // When the sync ids match, the file copy phase is skipped.
                final long numDocsTarget = request.metadataSnapshot().getNumDocs();
                final long numDocsSource = recoverySourceMetadata.getNumDocs();
                if (numDocsTarget != numDocsSource) {
                    throw new IllegalStateException("try to recover " + request.shardId() + " from primary shard with sync id but number " +
                        "of docs differ: " + numDocsSource + " (" + request.sourceNode().getName() + ", primary) vs " + numDocsTarget
                        + "(" + request.targetNode().getName() + ")");
                }
                // we shortcut recovery here because we have nothing to copy. but we must still start the engine on the target.
                // so we don't return here
                logger.trace("skipping [phase1]- identical sync id [{}] found on both source and target", recoverySourceSyncId);
            } else {
                // otherwise diff the file lists and send the files that are different or missing to the target node
                final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot());
                for (StoreFileMetaData md : diff.identical) {
                    response.phase1ExistingFileNames.add(md.name());
                    response.phase1ExistingFileSizes.add(md.length());
                    existingTotalSize += md.length();
                    if (logger.isTraceEnabled()) {
                        logger.trace("recovery [phase1]: not recovering [{}], exist in local store and has checksum [{}]," +
                            " size [{}]", md.name(), md.checksum(), md.length());
                    }
                    totalSize += md.length();
                }
                List<StoreFileMetaData> phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size());
                phase1Files.addAll(diff.different);
                phase1Files.addAll(diff.missing);
                for (StoreFileMetaData md : phase1Files) {
                    if (request.metadataSnapshot().asMap().containsKey(md.name())) {
                        logger.trace("recovery [phase1]: recovering [{}], exists in local store, but is different: remote [{}], local [{}]",
                            md.name(), request.metadataSnapshot().asMap().get(md.name()), md);
                    } else {
                        logger.trace("recovery [phase1]: recovering [{}], does not exist in remote", md.name());
                    }
                    response.phase1FileNames.add(md.name());
                    response.phase1FileSizes.add(md.length());
                    totalSize += md.length();
                }

                response.phase1TotalSize = totalSize;
                response.phase1ExistingTotalSize = existingTotalSize;

                logger.trace("recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]",
                    response.phase1FileNames.size(), new ByteSizeValue(totalSize),
                    response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
                cancellableThreads.execute(() -> // tell the target which files it is about to receive
                    recoveryTarget.receiveFileInfo(response.phase1FileNames, response.phase1FileSizes,
                        response.phase1ExistingFileNames, response.phase1ExistingFileSizes, translogOps.get()));
                // How many bytes we've copied since we last called RateLimiter.pause
                final Function<StoreFileMetaData, OutputStream> outputStreamFactories =
                    md -> new BufferedOutputStream(new RecoveryOutputStream(md, translogOps), chunkSizeInBytes);
                // after the diff, send the files; chunkSizeInBytes defaults to 512kb
                sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories);
                // Send the CLEAN_FILES request, which takes all of the files that
                // were transferred and renames them from their temporary file
                // names to the actual file names. It also writes checksums for
                // the files after they have been renamed.
                //
                // Once the files have been renamed, any other files that are not
                // related to this recovery (out of date segments, for example)
                // are deleted
                try {
                    cancellableThreads.executeIO(() -> recoveryTarget.cleanFiles(translogOps.get(), recoverySourceMetadata));
                } catch (RemoteTransportException | IOException targetException) {
                    final IOException corruptIndexException;
                    // we realized that after the index was copied and we wanted to finalize the recovery
                    // the index was corrupted:
                    //   - maybe due to a broken segments file on an empty index (transferred with no checksum)
                    //   - maybe due to old segments without checksums or length only checks
                    if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(targetException)) != null) {
                        try {
                            final Store.MetadataSnapshot recoverySourceMetadata1 = store.getMetadata(snapshot);
                            StoreFileMetaData[] metadata =
                                StreamSupport.stream(recoverySourceMetadata1.spliterator(), false).toArray(StoreFileMetaData[]::new);
                            ArrayUtil.timSort(metadata, Comparator.comparingLong(StoreFileMetaData::length)); // check small files first
                            for (StoreFileMetaData md : metadata) {
                                cancellableThreads.checkForCancel();
                                logger.debug("checking integrity for file {} after remove corruption exception", md);
                                if (store.checkIntegrityNoException(md) == false) {
                                    // we are corrupted on the primary -- fail!
                                    shard.failShard("recovery", corruptIndexException);
                                    logger.warn("Corrupted file detected {} checksum mismatch", md);
                                    throw corruptIndexException;
                                }
                            }
                        } catch (IOException ex) {
                            targetException.addSuppressed(ex);
                            throw targetException;
                        }
                        // corruption has happened on the way to replica
                        RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but " +
                            "checksums are ok", null);
                        exception.addSuppressed(targetException);
                        logger.warn(
                            (org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
                                "{} Remote file corruption during finalization of recovery on node {}. local checksum OK",
                                shard.shardId(), request.targetNode()), corruptIndexException);
                        throw exception;
                    } else {
                        throw targetException;
                    }
                }
            }

            logger.trace("recovery [phase1]: took [{}]", stopWatch.totalTime());
            response.phase1Time = stopWatch.totalTime().millis();
        } catch (Exception e) {
            throw new RecoverFilesRecoveryException(request.shardId(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), e);
        } finally {
            store.decRef();
        }
    }

    void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
        StopWatch stopWatch = new StopWatch().start();
        logger.trace("recovery [phase1]: prepare remote engine for translog");
        final long startEngineStart = stopWatch.totalTime().millis();
        // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
        // garbage collection (not the JVM's GC!) of tombstone deletes.
        cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps));
        stopWatch.stop();
        response.startTime = stopWatch.totalTime().millis() - startEngineStart;
        logger.trace("recovery [phase1]: remote engine start took [{}]", stopWatch.totalTime());
    }

    /**
     * Perform phase two of the recovery process.
     * <p>
     * Phase two uses a snapshot of the current translog *without* acquiring the write lock (however, the translog snapshot is
     * a point-in-time view of the translog). It then sends each translog operation to the target node so it can be replayed into
     * the new shard.
     *
     * @param startingSeqNo           the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all
     *                                ops should be sent
     * @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo)
     * @param endingSeqNo             the highest sequence number that should be sent
     * @param snapshot                a snapshot of the translog
     * @return the local checkpoint on the target
     */
    long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot)
        throws IOException {
        if (shard.state() == IndexShardState.CLOSED) {
            throw new IndexShardClosedException(request.shardId());
        }
        cancellableThreads.checkForCancel();

        final StopWatch stopWatch = new StopWatch().start();

        logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " +
            "required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]");

        // send all the snapshot's translog operations to the target
        final SendSnapshotResult result = sendSnapshot(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot);

        stopWatch.stop();
        logger.trace("recovery [phase2]: took [{}]", stopWatch.totalTime());
        response.phase2Time = stopWatch.totalTime().millis();
        response.phase2Operations = result.totalOperations;
        return result.targetLocalCheckpoint;
    }
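
The decision in recoverToTarget about whether phase1 (file copy) can be skipped in favor of a pure translog replay can be summarized as follows (a simplified sketch under the assumptions described above, with hypothetical parameter names, not the actual RecoverySourceHandler logic):

    // Simplified sketch of the sequence-number-based recovery decision (hypothetical helper, not ES code).
    class SeqNoRecoveryCheck {
        /**
         * @param targetHasSeqNos        request.startingSeqNo() != UNASSIGNED_SEQ_NO
         * @param startingSeqNo          the target's local checkpoint + 1
         * @param primaryLocalCheckpoint the primary's local checkpoint
         * @param translogCoversRange    the primary's translog still holds every op from startingSeqNo up to its checkpoint
         */
        static boolean canSkipPhase1(boolean targetHasSeqNos, long startingSeqNo,
                                     long primaryLocalCheckpoint, boolean translogCoversRange) {
            if (targetHasSeqNos == false) {
                return false;                       // no usable seq-no info: fall back to copying segment files
            }
            if (startingSeqNo - 1 > primaryLocalCheckpoint) {
                return false;                       // the target is ahead of what the primary can prove: copy files
            }
            return translogCoversRange;             // otherwise replay translog operations only and skip phase1
        }
    }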

 

