The ES recovery process is broadly divided into six stages: INIT, INDEX, VERIFY_INDEX, TRANSLOG, FINALIZE, and DONE.
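For orientation, here is a minimal sketch of that stage progression, modeled on the RecoveryState.Stage values used in the code below (illustrative only, not the actual Elasticsearch class):

// Simplified, illustrative view of the recovery stages (mirrors RecoveryState.Stage).
// INIT          -> recovery has been scheduled, nothing has run yet
// INDEX         -> segment files are obtained (reused locally or copied from a peer/snapshot)
// VERIFY_INDEX  -> optional Lucene index check (index.shard.check_on_startup)
// TRANSLOG      -> translog operations are replayed into the engine
// FINALIZE      -> cleanup; translog garbage collection is re-enabled
// DONE          -> recovery finished, the shard moves on to POST_RECOVERY / started
enum RecoveryStage { INIT, INDEX, VERIFY_INDEX, TRANSLOG, FINALIZE, DONE }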
Recovery involves several modules:
gateway: reads the cluster and index metadata, updates the cluster state, and then calls reroute.
allocation: after reroute is called, runs shard allocation; once shards are assigned it updates the cluster state and sets the recovery source type for each shard.
recovery: performs the actual data recovery.
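To make the allocation step concrete, here is a rough sketch of how the recovery source type ends up being chosen. chooseRecoverySource is a hypothetical helper for illustration (the real decision is spread across the allocators); RecoverySource.Type is the enum that appears in IndexShard.startRecovery below:

// Hypothetical helper: which RecoverySource.Type a shard copy gets during allocation.
// Primary with existing on-disk data -> EXISTING_STORE (recover from the local store)
// Brand-new primary                  -> EMPTY_STORE
// Replica                            -> PEER (recover from the primary)
// (SNAPSHOT and LOCAL_SHARDS cover restore and shrink/split cases, not shown here.)
static RecoverySource.Type chooseRecoverySource(boolean primary, boolean hasLocalData) {
    if (primary == false) {
        return RecoverySource.Type.PEER;
    }
    return hasLocalData ? RecoverySource.Type.EXISTING_STORE : RecoverySource.Type.EMPTY_STORE;
}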
phase1 takes a snapshot of the shard (a Lucene commit) and ships the segment files to the replica.
phase2 takes a snapshot of the translog from startingSeqNo to endingSeqNo and replays it on the replica. Once phase1 finishes, the source tells the replica to start its engine, after which the replica can accept writes; because indexing can continue on the primary while the files are being copied and the engine is being started, phase2 is needed to replay those operations.
Every translog write returns a Location; during recovery the source uses the shard's current maximum sequence number as endingSeqNo (see recoverToTarget below).
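The phase1/phase2 split also interacts with sequence numbers: when the target's local checkpoint is known and the source translog still retains every operation above it, the file copy of phase1 can be skipped entirely and only an operation range is replayed. Below is a minimal sketch of that decision, mirroring the checks in recoverToTarget shown later (opsBasedRecoveryPossible is a hypothetical helper; SequenceNumbers.UNASSIGNED_SEQ_NO is the constant used in the source):

// Illustrative: can we do a sequence-number-based (ops-only) recovery?
// startingSeqNo = target's local checkpoint + 1, carried in the StartRecoveryRequest.
// endingSeqNo   = the primary's current max sequence number.
boolean opsBasedRecoveryPossible(long startingSeqNo, boolean sameHistory, boolean translogCoversRange) {
    return startingSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO // target reported a usable checkpoint
            && sameHistory              // both copies share the same history
            && translogCoversRange;     // source translog still holds all ops >= startingSeqNo
}
// true  -> skip phase1 and replay translog ops [startingSeqNo, endingSeqNo] in phase2
// false -> phase1 copies segment files first, then phase2 replays ops above the commit's checkpoint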
当节点启动或者集群启动时发现节点信息发生变化 private void applyChanges(UpdateTask task, ClusterState previousClusterState, ClusterState newClusterState) { ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(task.source, newClusterState, previousClusterState); // new cluster state, notify all listeners final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();//新集群信息的node信息 if (nodesDelta.hasChanges() && logger.isInfoEnabled()) { String summary = nodesDelta.shortSummary(); if (summary.length() > 0) { logger.info("{}, reason: {}", summary, task.source); } } nodeConnectionsService.connectToNodes(newClusterState.nodes()); logger.debug("applying cluster state version {}", newClusterState.version()); try { // nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) { final Settings incomingSettings = clusterChangedEvent.state().metaData().settings(); clusterSettings.applySettings(incomingSettings);//如果集群信息已经发生变化 更新集群信息 } } catch (Exception ex) { logger.warn("failed to apply cluster settings", ex); } logger.debug("apply cluster state with version {}", newClusterState.version()); callClusterStateAppliers(clusterChangedEvent);//通知所有的applier nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes()); logger.debug("set locally applied cluster state to version {}", newClusterState.version()); state.set(newClusterState);//更新集群状态 callClusterStateListeners(clusterChangedEvent);//通知所有的listener task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState);//回调 } 接收到change的消息后 @Override public synchronized void applyClusterState(final ClusterChangedEvent event) { if (!lifecycle.started()) { return; } final ClusterState state = event.state(); // we need to clean the shards and indices we have on this node, since we // are going to recover them again once state persistence is disabled (no master / not recovered) // TODO: feels hacky, a block disables state persistence, and then we clean the allocated shards, maybe another flag in blocks? if (state.blocks().disableStatePersistence()) { for (AllocatedIndex<? extends Shard> indexService : indicesService) { indicesService.removeIndex(indexService.index(), NO_LONGER_ASSIGNED, "cleaning index (disabled block persistence)"); // also cleans shards } return; } updateFailedShardsCache(state); deleteIndices(event); // also deletes shards of deleted indices removeUnallocatedIndices(event); // also removes shards of removed indices failMissingShards(state); removeShards(state); // removes any local shards that doesn't match what the master expects updateIndices(event); // can also fail shards, but these are then guaranteed to be in failedShardsCache createIndices(state); createOrUpdateShards(state);//恢复模块 node启动后会获取集群元信息 从集群元信息中可以获取到 当前的集群状态 路由表 索引 等等信息 } private void createOrUpdateShards(final ClusterState state) { RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); if (localRoutingNode == null) { return; } DiscoveryNodes nodes = state.nodes(); RoutingTable routingTable = state.routingTable(); for (final ShardRouting shardRouting : localRoutingNode) { ShardId shardId = shardRouting.shardId(); if (failedShardsCache.containsKey(shardId) == false) { AllocatedIndex<? 
extends Shard> indexService = indicesService.indexService(shardId.getIndex());//分片服务 assert indexService != null : "index " + shardId.getIndex() + " should have been created by createIndices"; Shard shard = indexService.getShardOrNull(shardId.id()); if (shard == null) { assert shardRouting.initializing() : shardRouting + " should have been removed by failMissingShards"; createShard(nodes, routingTable, shardRouting, state);//新加入的节点需要创建新的shard } else { updateShard(nodes, shardRouting, shard, routingTable, state); } } } } private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardRouting shardRouting, ClusterState state) { assert shardRouting.initializing() : "only allow shard creation for initializing shard but was " + shardRouting; DiscoveryNode sourceNode = null; if (shardRouting.recoverySource().getType() == Type.PEER) { sourceNode = findSourceNodeForPeerRecovery(logger, routingTable, nodes, shardRouting); if (sourceNode == null) { logger.trace("ignoring initializing shard {} - no source node can be found.", shardRouting.shardId()); return; } } try { logger.debug("{} creating shard", shardRouting.shardId()); RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);//设置恢复阶段为INIT 此时还未恢复 indicesService.createShard(shardRouting, recoveryState, recoveryTargetService, new RecoveryListener(shardRouting), repositoriesService, failedShardHandler, globalCheckpointSyncer);//创建分片 } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); } } //shard的create 和 recovery 放在一起了 @Override public IndexShard createShard(ShardRouting shardRouting, RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService, PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService, Consumer<IndexShard.ShardFailure> onShardFailure, Consumer<ShardId> globalCheckpointSyncer) throws IOException { ensureChangesAllowed(); IndexService indexService = indexService(shardRouting.index()); IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer);//创建shard indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, (type, mapping) -> { assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS: "mapping update consumer only required by local shards recovery"; try { client.admin().indices().preparePutMapping() .setConcreteIndex(shardRouting.index()) // concrete index - no name clash, it uses uuid .setType(type) .setSource(mapping.source().string(), XContentType.JSON) .get(); } catch (IOException ex) { throw new ElasticsearchException("failed to stringify mapping source", ex); } }, this); return indexShard; } public synchronized IndexShard createShard(ShardRouting routing, Consumer<ShardId> globalCheckpointSyncer) throws IOException { /* * TODO: we execute this in parallel but it's a synced method. Yet, we might * be able to serialize the execution via the cluster state in the future. for now we just * keep it synced. 
*/ if (closed.get()) { throw new IllegalStateException("Can't create shard " + routing.shardId() + ", closed"); } final Settings indexSettings = this.indexSettings.getSettings(); final ShardId shardId = routing.shardId();//获取需要恢复的shardId boolean success = false; Store store = null; IndexShard indexShard = null; ShardLock lock = null; try { lock = nodeEnv.shardLock(shardId, TimeUnit.SECONDS.toMillis(5));//shard 锁 eventListener.beforeIndexShardCreated(shardId, indexSettings);//分片创建前 会把该有的服务先注册进去 ShardPath path; try { path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings);//如果已经存在shard路径 } catch (IllegalStateException ex) { logger.warn("{} failed to load shard path, trying to remove leftover", shardId); try { ShardPath.deleteLeftoverShardDirectory(logger, nodeEnv, lock, this.indexSettings); path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings); } catch (Exception inner) { ex.addSuppressed(inner); throw ex; } } if (path == null) { // TODO: we should, instead, hold a "bytes reserved" of how large we anticipate this shard will be, e.g. for a shard // that's being relocated/replicated we know how large it will become once it's done copying: // Count up how many shards are currently on each data path: Map<Path, Integer> dataPathToShardCount = new HashMap<>(); for (IndexShard shard : this) { Path dataPath = shard.shardPath().getRootStatePath(); Integer curCount = dataPathToShardCount.get(dataPath); if (curCount == null) { curCount = 0; } dataPathToShardCount.put(dataPath, curCount + 1); } path = ShardPath.selectNewPathForShard(nodeEnv, shardId, this.indexSettings, routing.getExpectedShardSize() == ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE ? getAvgShardSizeInBytes() : routing.getExpectedShardSize(), dataPathToShardCount);//计算主分片的位置 和分片ID logger.debug("{} creating using a new path [{}]", shardId, path); } else { logger.debug("{} creating using an existing path [{}]", shardId, path); } if (shards.containsKey(shardId.id())) { throw new IllegalStateException(shardId + " already exists"); } logger.debug("creating shard_id {}", shardId); // if we are on a shared FS we only own the shard (ie. we can safely delete it) if we are the primary. 
final Engine.Warmer engineWarmer = (searcher) -> { IndexShard shard = getShardOrNull(shardId.getId()); if (shard != null) { warmer.warm(searcher, shard, IndexService.this.indexSettings); } }; store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock, new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId))); indexShard = new IndexShard(routing, this.indexSettings, path, store, indexSortSupplier, indexCache, mapperService, similarityService, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer, searchOperationListeners, indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId)); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");//更新shard信息 因为state文件夹已经创建 eventListener.afterIndexShardCreated(indexShard); shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); //把shard放进shard 集合中 通过shards.get()可以获取已存在的shard success = true; return indexShard; } catch (ShardLockObtainFailedException e) { throw new IOException("failed to obtain in-memory shard lock", e); } finally { if (success == false) { if (lock != null) { IOUtils.closeWhileHandlingException(lock); } closeShard("initialization failed", shardId, indexShard, store, eventListener); } } } public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService, PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService, BiConsumer<String, MappingMetaData> mappingUpdateConsumer, IndicesService indicesService) { // TODO: Create a proper object to encapsulate the recovery context // all of the current methods here follow a pattern of: // resolve context which isn't really dependent on the local shards and then async // call some external method with this pointer. 
// with a proper recovery context object we can simply change this to: // startRecovery(RecoveryState recoveryState, ShardRecoverySource source ) { // markAsRecovery("from " + source.getShortDescription(), recoveryState); // threadPool.generic().execute() { // onFailure () { listener.failure() }; // doRun() { // if (source.recover(this)) { // recoveryListener.onRecoveryDone(recoveryState); // } // } // }} // } assert recoveryState.getRecoverySource().equals(shardRouting.recoverySource());//恢复的类型 在allcoation分片的时候设置 大致时先读集群元信息,如果有的话 主分片设置为EXISTING_STORE 副本分片设置为PEER,如果没有的话 先分片 在分片过程中设置 switch (recoveryState.getRecoverySource().getType()) { case EMPTY_STORE: case EXISTING_STORE://主分片从本地恢复 markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread threadPool.generic().execute(() -> { try { if (recoverFromStore()) { //主分片 recoveryListener.onRecoveryDone(recoveryState); } } catch (Exception e) {//如果恢复失败 关闭并移除shard 同时向master发送internal:cluster/shard/failure的消息 recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true); } }); break; case PEER: //副本分片从主分片恢复 sourceservice 是master target是本地需要恢复的 try { markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState); recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener); } catch (Exception e) { failShard("corrupted preexisting index", e); recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true); } break; case SNAPSHOT://快照恢复 markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) recoveryState.getRecoverySource(); threadPool.generic().execute(() -> { try { final Repository repository = repositoriesService.repository(recoverySource.snapshot().getRepository()); if (restoreFromRepository(repository)) { recoveryListener.onRecoveryDone(recoveryState); //主分片恢复完成 然后发送internal:cluster/shard/started ShardStateAction的messageReceive接收到消息后 提交一个更新的task } } catch (Exception e) { recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true); } }); break; case LOCAL_SHARDS://从本地节点的其他分片恢复 final IndexMetaData indexMetaData = indexSettings().getIndexMetaData(); final Index mergeSourceIndex = indexMetaData.getMergeSourceIndex(); final List<IndexShard> startedShards = new ArrayList<>(); final IndexService sourceIndexService = indicesService.indexService(mergeSourceIndex); final int numShards = sourceIndexService != null ? 
sourceIndexService.getIndexSettings().getNumberOfShards() : -1; if (sourceIndexService != null) { for (IndexShard shard : sourceIndexService) { if (shard.state() == IndexShardState.STARTED) { startedShards.add(shard); } } } if (numShards == startedShards.size()) { markAsRecovering("from local shards", recoveryState); // mark the shard as recovering on the cluster state thread threadPool.generic().execute(() -> { try { final Set<ShardId> shards = IndexMetaData.selectShrinkShards(shardId().id(), sourceIndexService.getMetaData(), +indexMetaData.getNumberOfShards()); if (recoverFromLocalShards(mappingUpdateConsumer, startedShards.stream() .filter((s) -> shards.contains(s.shardId())).collect(Collectors.toList()))) { recoveryListener.onRecoveryDone(recoveryState); } } catch (Exception e) { recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true); } }); } else { final RuntimeException e; if (numShards == -1) { e = new IndexNotFoundException(mergeSourceIndex); } else { e = new IllegalStateException("not all shards from index " + mergeSourceIndex + " are started yet, expected " + numShards + " found " + startedShards.size() + " can't recover shard " + shardId()); } throw e; } break; default: throw new IllegalArgumentException("Unknown recovery source " + recoveryState.getRecoverySource()); } } 主分片会调此方法 副本分片不会 boolean recoverFromStore(final IndexShard indexShard) { if (canRecover(indexShard)) { RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType(); assert recoveryType == RecoverySource.Type.EMPTY_STORE || recoveryType == RecoverySource.Type.EXISTING_STORE : "expected store recovery type but was: " + recoveryType; return executeRecovery(indexShard, () -> { logger.debug("starting recovery from store ..."); internalRecoverFromStore(indexShard); }); } return false; } /** * Recovers the state of the shard from the store. 
*/ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRecoveryException { final RecoveryState recoveryState = indexShard.recoveryState(); final boolean indexShouldExists = recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE; indexShard.prepareForIndexRecovery();//设置为index 阶段 long version = -1; SegmentInfos si = null; final Store store = indexShard.store(); store.incRef(); try {//获取本地需要恢复节点的版本号 通过最后提交的segment中获取 lucene 提交一次就是一个segment try { store.failIfCorrupted(); try { si = store.readLastCommittedSegmentsInfo(); } catch (Exception e) { String files = "_unknown_"; try { files = Arrays.toString(store.directory().listAll()); } catch (Exception inner) { inner.addSuppressed(e); files += " (failure=" + ExceptionsHelper.detailedMessage(inner) + ")"; } if (indexShouldExists) { throw new IndexShardRecoveryException(shardId, "shard allocated for local recovery (post api), should exist, but doesn't, current files: " + files, e); } } if (si != null) { if (indexShouldExists) { version = si.getVersion(); } else { // it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling) // its a "new index create" API, we have to do something, so better to clean it than use same data logger.trace("cleaning existing shard, shouldn't exists"); Lucene.cleanLuceneIndex(store.directory()); si = null; } } } catch (Exception e) { throw new IndexShardRecoveryException(shardId, "failed to fetch index version after copying it over", e); } recoveryState.getIndex().updateVersion(version);//更新当前索引的版本号 if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {//如果恢复的数据源在本地 assert indexShouldExists; indexShard.skipTranslogRecovery();//跳过使用translog恢复 因为translog得从master上下载 } else { // since we recover from local, just fill the files and size try { final RecoveryState.Index index = recoveryState.getIndex(); if (si != null) { addRecoveredFileDetails(si, store, index);//把需要获取的文件名称 长度等放到集合中 } } catch (IOException e) { logger.debug("failed to list file details", e); } indexShard.performTranslogRecovery(indexShouldExists); assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm()); } indexShard.finalizeRecovery();//设置当前的状态为finalizeRecovery 然后把数据写到cache里面 此时translog可以被删除了 设置setEnableGcDeletes(true) indexShard.postRecovery("post recovery from shard_store");//更新集群信息 恢复已经完成 同时设置当前阶段为DONE } catch (EngineException | IOException e) { throw new IndexShardRecoveryException(shardId, "failed to recover from gateway", e); } finally { store.decRef(); } } /** * After the store has been recovered, we need to start the engine in order to apply operations */ public void performTranslogRecovery(boolean indexExists) throws IOException { if (indexExists == false) { // note: these are set when recovering from the translog final RecoveryState.Translog translogStats = recoveryState().getTranslog(); translogStats.totalOperations(0); translogStats.totalOperationsOnStart(0); } internalPerformTranslogRecovery(false, indexExists); assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage(); } private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) throws IOException { if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);//找到最后一次lucene提价的信息后 
进入索引的检验 verify_index 阶段 // also check here, before we apply the translog //执行索引检查 可以在配置文件中设置 默认为false if (Booleans.isTrue(checkIndexOnStartup)) { try { checkIndex(); } catch (IOException ex) { throw new RecoveryFailedException(recoveryState, "check index failed", ex); } } //translog阶段 recoveryState.setStage(RecoveryState.Stage.TRANSLOG);//检查完成之后 开始进入translog阶段 设置阶段translog final EngineConfig.OpenMode openMode; /* by default we recover and index and replay the translog but if the index * doesn't exist we create everything from the scratch. Yet, if the index * doesn't exist we don't need to worry about the skipTranslogRecovery since * there is no translog on a non-existing index. * The skipTranslogRecovery invariant is used if we do remote recovery since * there the translog isn't local but on the remote host, hence we can skip it. */ if (indexExists == false) { openMode = EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG; } else if (skipTranslogRecovery) { //跳过translog恢复 openMode = EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG; } else { //从translog恢复 openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; } assert indexExists == false || assertMaxUnsafeAutoIdInCommit(); final EngineConfig config = newEngineConfig(openMode); // we disable deletes since we allow for operations to be executed against the shard while recovering // but we need to make sure we don't loose deletes until we are done recovering config.setEnableGcDeletes(false);//恢复阶段 不允许translog被删除 因为恢复过程会使用translog 如果删除了就恢复不了了 Engine newEngine = createNewEngine(config);//创建engine 打开后就可以进行写操作了 verifyNotClosed(); if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive, // we still give sync'd flush a chance to run: active.set(true); //主分片恢复流程 newEngine.recoverFromTranslog(); } assertSequenceNumbersInCommit(); } 之后调用recoverFromTranslog 此过程如果失败的话 会关闭 释放锁 关闭searchManage translog (他们都是读文件操作) 索引回滚 里面主要调用的是recoverFromTranslogInternal private void recoverFromTranslogInternal() throws IOException { Translog.TranslogGeneration translogGeneration = translog.getGeneration(); final int opsRecovered; //查找到最后一次的提交点版本 lucene中每提交一次 gen会变 //根据gen找到最后一次的快照 final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGen)) {//对最后一次提交做快照 opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot);//读translog数据 进行操作 } catch (Exception e) { throw new EngineException(shardId, "failed to recover from translog", e); } // flush if we recovered something or if we have references to older translogs // note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length. assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be"; pendingTranslogRecovery.set(false); // we are good - now we can commit if (opsRecovered > 0) {//opsRecoveryId为从translog的快照中 每执行一次该id+1 如果大于0 说明有操作 执行刷盘操作 logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]", opsRecovered, translogGeneration == null ? 
null : translogGeneration.translogFileGeneration, translog.currentFileGeneration()); //刷盘 flush(true, true); } else if (translog.isCurrent(translogGeneration) == false) { commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID)); refreshLastCommittedSegmentInfos(); } else if (lastCommittedSegmentInfos.getUserData().containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) == false) { assert engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1) : "index was created on version " + engineConfig.getIndexSettings().getIndexVersionCreated() + "but has " + "no sequence numbers info in commit"; commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID)); refreshLastCommittedSegmentInfos(); } // clean up what's not needed translog.trimUnreferencedReaders(); } // IndexShard 读快照 并做相应的索引操作 int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException { recoveryState.getTranslog().totalOperations(snapshot.totalOperations()); recoveryState.getTranslog().totalOperationsOnStart(snapshot.totalOperations()); int opsRecovered = 0; Translog.Operation operation; while ((operation = snapshot.next()) != null) {//如果快照中有数据 try { logger.trace("[translog] recover op {}", operation); Engine.Result result = applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, update -> { throw new IllegalArgumentException("unexpected mapping update: " + update); //根据操作类型 对index进行写 删除等操作 ,此过程不会写translog 仍然调用的时写操作时的 IntarvlEngine.index() 方法 }); ExceptionsHelper.reThrowIfNotNull(result.getFailure()); opsRecovered++; recoveryState.getTranslog().incrementRecoveredOperations(); } catch (Exception e) { if (ExceptionsHelper.status(e) == RestStatus.BAD_REQUEST) { // mainly for MapperParsingException and Failure to detect xcontent logger.info("ignoring recovery of a corrupt translog entry", e); } else { throw e; } } } return opsRecovered; } 以上为主分片的恢复流程
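Before diving into the replica-side code, here is a condensed roadmap of what the recovery source does after it receives the start_recovery request; it summarizes the RecoverySourceHandler.recoverToTarget walkthrough further below:

// Source-side flow (RecoverySourceHandler.recoverToTarget), condensed for orientation:
// 1. acquire a translog retention lock so translog generations cannot be trimmed mid-recovery
// 2. if an ops-only (sequence-number-based) recovery is possible -> skip phase1
//    otherwise -> phase1: snapshot the Lucene commit and send missing/different segment files
// 3. prepareTargetForTranslog -> the target opens its engine and can start receiving operations
// 4. phase2: snapshot the translog from startingSeqNo and replay those operations on the target
// 5. finalizeRecovery: wait for the target's local checkpoint to catch up and mark recovery done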
private void retryRecovery(final long recoveryId, final TimeValue retryAfter, final TimeValue activityTimeout) { RecoveryTarget newTarget = onGoingRecoveries.resetRecovery(recoveryId, activityTimeout); if (newTarget != null) { threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(newTarget.recoveryId())); } } 副本分片恢复流程 也是 init index verify translog finalize done 不同的是 副本需要从主分片上复制translog 和 索引 private void doRecovery(final long recoveryId) { final StartRecoveryRequest request; final CancellableThreads cancellableThreads; final RecoveryState.Timer timer; try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) { if (recoveryRef == null) { logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId); return; } final RecoveryTarget recoveryTarget = recoveryRef.target(); cancellableThreads = recoveryTarget.cancellableThreads(); timer = recoveryTarget.state().getTimer(); try { assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node"; request = getStartRecoveryRequest(recoveryTarget);//创建需要恢复的request对象 logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); recoveryTarget.indexShard().prepareForIndexRecovery();//设置为index阶段 } catch (final Exception e) { // this will be logged as warning later on... logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e); onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), true); return; } } try { logger.trace("{} starting recovery from {}", request.shardId(), request.sourceNode()); final AtomicReference<RecoveryResponse> responseHolder = new AtomicReference<>(); cancellableThreads.execute(() -> responseHolder.set( transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request,//发送start_recovery的rpc请求 线程阻塞 等待响应 new FutureTransportResponseHandler<RecoveryResponse>() { @Override public RecoveryResponse newInstance() { return new RecoveryResponse(); } }).txGet()));//阻塞线程等待相应 final RecoveryResponse recoveryResponse = responseHolder.get(); final TimeValue recoveryTime = new TimeValue(timer.time()); // do this through ongoing recoveries to remove it from the collection onGoingRecoveries.markRecoveryAsDone(recoveryId);//恢复完成后 从集合中去除已恢复的id if (logger.isTraceEnabled()) { StringBuilder sb = new StringBuilder(); sb.append('[').append(request.shardId().getIndex().getName()).append(']').append('[').append(request.shardId().id()) .append("] "); sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(recoveryTime).append("]\n"); sb.append(" phase1: recovered_files [").append(recoveryResponse.phase1FileNames.size()).append("]").append(" with " + "total_size of [").append(new ByteSizeValue(recoveryResponse.phase1TotalSize)).append("]") .append(", took [").append(timeValueMillis(recoveryResponse.phase1Time)).append("], throttling_wait [").append (timeValueMillis(recoveryResponse.phase1ThrottlingWaitTime)).append(']') .append("\n"); sb.append(" : reusing_files [").append(recoveryResponse.phase1ExistingFileNames.size()).append("] with " + "total_size of [").append(new ByteSizeValue(recoveryResponse.phase1ExistingTotalSize)).append("]\n"); sb.append(" phase2: start took [").append(timeValueMillis(recoveryResponse.startTime)).append("]\n"); sb.append(" : recovered [").append(recoveryResponse.phase2Operations).append("]").append(" transaction log " + 
"operations") .append(", took [").append(timeValueMillis(recoveryResponse.phase2Time)).append("]") .append("\n"); logger.trace("{}", sb); } else { logger.debug("{} recovery done from [{}], took [{}]", request.shardId(), request.sourceNode(), recoveryTime); } } catch (CancellableThreads.ExecutionCancelledException e) { logger.trace("recovery cancelled", e); } catch (Exception e) { if (logger.isTraceEnabled()) { logger.trace( (Supplier<?>) () -> new ParameterizedMessage( "[{}][{}] Got exception on recovery", request.shardId().getIndex().getName(), request.shardId().id()), e); } Throwable cause = ExceptionsHelper.unwrapCause(e); if (cause instanceof CancellableThreads.ExecutionCancelledException) { // this can also come from the source wrapped in a RemoteTransportException onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, "source has canceled the recovery", cause), false); return; } if (cause instanceof RecoveryEngineException) { // unwrap an exception that was thrown as part of the recovery cause = cause.getCause(); } // do it twice, in case we have double transport exception cause = ExceptionsHelper.unwrapCause(cause); if (cause instanceof RecoveryEngineException) { // unwrap an exception that was thrown as part of the recovery cause = cause.getCause(); } // here, we would add checks against exception that need to be retried (and not removeAndClean in this case) if (cause instanceof IllegalIndexShardStateException || cause instanceof IndexNotFoundException || cause instanceof ShardNotFoundException) { // if the target is not ready yet, retry retryRecovery( recoveryId, "remote shard not ready", recoverySettings.retryDelayStateSync(), recoverySettings.activityTimeout()); return; } if (cause instanceof DelayRecoveryException) { retryRecovery(recoveryId, cause, recoverySettings.retryDelayStateSync(), recoverySettings.activityTimeout()); return; } if (cause instanceof ConnectTransportException) { logger.debug("delaying recovery of {} for [{}] due to networking error [{}]", request.shardId(), recoverySettings.retryDelayNetwork(), cause.getMessage()); retryRecovery(recoveryId, cause.getMessage(), recoverySettings.retryDelayNetwork(), recoverySettings.activityTimeout()); return; } if (cause instanceof AlreadyClosedException) { onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, "source shard is closed", cause), false); return; } onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, e), true); } } //主分片接收到 internal:index/shard/recovery/start_recovery 请求后 private RecoveryResponse recover(final StartRecoveryRequest request) throws IOException { final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); final IndexShard shard = indexService.getShard(request.shardId().id()); final ShardRouting routingEntry = shard.routingEntry(); if (routingEntry.primary() == false || routingEntry.active() == false) { throw new DelayRecoveryException("source shard [" + routingEntry + "] is not an active primary"); } if (request.isPrimaryRelocation() && (routingEntry.relocating() == false || routingEntry.relocatingNodeId().equals(request.targetNode().getId()) == false)) { logger.debug("delaying recovery of {} as source shard is not marked yet as relocating to {}", request.shardId(), request.targetNode()); throw new DelayRecoveryException("source shard is not marked yet as relocating to [" + request.targetNode() + "]"); } RecoverySourceHandler handler = ongoingRecoveries.addNewRecovery(request, shard); 
logger.trace("[{}][{}] starting recovery to {}", request.shardId().getIndex().getName(), request.shardId().id(), request.targetNode()); try { return handler.recoverToTarget(); //向副本分片发送数据 } finally { ongoingRecoveries.remove(shard, handler); } } /** * performs the recovery from the local engine to the target */ public RecoveryResponse recoverToTarget() throws IOException { runUnderPrimaryPermit(() -> { final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId()); if (targetShardRouting == null) { logger.debug("delaying recovery of {} as it is not listed as assigned to target node {}", request.shardId(), request.targetNode()); throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); } assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; }); try (Closeable ignored = shard.acquireTranslogRetentionLock()) {// 获取保留锁,作用是防止teanslog被 清理 final Translog translog = shard.getTranslog(); final long startingSeqNo; final long requiredSeqNoRangeStart; //检测是否可以从sequenceNumber恢复 /** * es中有sequenceNo的概念,正常情况下主分片和副本分片会保持一个相同的序列号,每次主分片操作后sequenceNo会+1然后从主分片 会同步到副本分片 * 如果某种情况下 副本分片的sequenceNo小于主分片的 恢复的时候只需要恢复副本分片sequenceNo之后的操作 不用全部恢复 有时候translog可能比较大 *判断seqNo 和 本地检查点 如果seqNo不是未标记状态 且小于本地检查点 只发送 startSeqNo到endSeqNo的数据 跳过全部发送 */ final boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery(); if (isSequenceNumberBasedRecoveryPossible) { logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo()); startingSeqNo = request.startingSeqNo(); requiredSeqNoRangeStart = startingSeqNo; } else { final Engine.IndexCommitRef phase1Snapshot; try { phase1Snapshot = shard.acquireIndexCommit(false); } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e); } // we set this to 0 to create a translog roughly according to the retention policy // on the target. Note that it will still filter out legacy operations with no sequence numbers startingSeqNo = 0; // but we must have everything above the local checkpoint in the commit requiredSeqNoRangeStart = Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1; try { phase1(phase1Snapshot.getIndexCommit(), translog::totalOperations);//快照恢复 } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e); } finally { try { IOUtils.close(phase1Snapshot); } catch (final IOException ex) { logger.warn("releasing snapshot caused exception", ex); } } } assert startingSeqNo >= 0 : "startingSeqNo must be non negative. 
got: " + startingSeqNo; assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than [" + startingSeqNo + "]"; runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId())); try { //通知副本节点启动engine prepareTargetForTranslog(translog.estimateTotalOperationsFromMinSeq(startingSeqNo)); } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e); } final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); /* * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all * operations in the required range will be available for replaying from the translog of the source. */ cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo)); logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo); logger.trace("snapshot translog for recovery; current size is [{}]", translog.estimateTotalOperationsFromMinSeq(startingSeqNo)); final long targetLocalCheckpoint; //以startNo为起点对translog做快照 try(Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(startingSeqNo)) { targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot); } catch (Exception e) { throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e); } finalizeRecovery(targetLocalCheckpoint); } return response; } /** * Determines if the source translog is ready for a sequence-number-based peer recovery. The main condition here is that the source * translog contains all operations above the local checkpoint on the target. We already know the that translog contains or will contain * all ops above the source local checkpoint, so we can stop check there. * * @return {@code true} if the source is ready for a sequence-number-based recovery * @throws IOException if an I/O exception occurred reading the translog snapshot * * 判断sequenceNo是否小于主分片的seqNo * */ boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException { final long startingSeqNo = request.startingSeqNo(); assert startingSeqNo >= 0; final long localCheckpoint = shard.getLocalCheckpoint(); logger.trace("testing sequence numbers in range: [{}, {}]", startingSeqNo, localCheckpoint); // the start recovery request is initialized with the starting sequence number set to the target shard's local checkpoint plus one //开始恢复的seqNo应该小于检查节点的 if (startingSeqNo - 1 <= localCheckpoint) { final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); //检查translog中的数据是否够 try (Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { Translog.Operation operation; while ((operation = snapshot.next()) != null) { if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { tracker.markSeqNoAsCompleted(operation.seqNo()); } } } return tracker.getCheckpoint() >= localCheckpoint; } else { return false; } } /** * Perform phase1 of the recovery operations. Once this {@link IndexCommit} * snapshot has been performed no commit operations (files being fsync'd) * are effectively allowed on this index until all recovery phases are done * <p> * Phase1 examines the segment files on the target node and copies over the * segments that are missing. 
Only segments that have the same size and * checksum can be reused * 对shard 做快照 然后发送 如果数据量大会占用带宽 * 当SyncId 一样且文档数也一样 跳过发送文件 * */ public void phase1(final IndexCommit snapshot, final Supplier<Integer> translogOps) { cancellableThreads.checkForCancel(); // Total size of segment files that are recovered long totalSize = 0; // Total size of segment files that were able to be re-used long existingTotalSize = 0; final Store store = shard.store(); store.incRef(); try { StopWatch stopWatch = new StopWatch().start(); final Store.MetadataSnapshot recoverySourceMetadata; try { recoverySourceMetadata = store.getMetadata(snapshot); } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { shard.failShard("recovery", ex); throw ex; } for (String name : snapshot.getFileNames()) { final StoreFileMetaData md = recoverySourceMetadata.get(name); if (md == null) { logger.info("Snapshot differs from actual index for file: {} meta: {}", name, recoverySourceMetadata.asMap()); throw new CorruptIndexException("Snapshot differs from actual index - maybe index was removed metadata has " + recoverySourceMetadata.asMap().size() + " files", name); } } // Generate a "diff" of all the identical, different, and missing // segment files on the target node, using the existing files on // the source node //如果两者同步点相同并且文档数也相同 String recoverySourceSyncId = recoverySourceMetadata.getSyncId(); String recoveryTargetSyncId = request.metadataSnapshot().getSyncId(); final boolean recoverWithSyncId = recoverySourceSyncId != null && recoverySourceSyncId.equals(recoveryTargetSyncId); if (recoverWithSyncId) {//如果一个索引五分钟内没有写操作 es会往索引里面写一个syncid 正常情况下主分片和副本的SyncId是一样 但每次提交索引后 syncId会被删除 所以用在冷启动 相同的话 会跳过索引复制过程 final long numDocsTarget = request.metadataSnapshot().getNumDocs(); final long numDocsSource = recoverySourceMetadata.getNumDocs(); if (numDocsTarget != numDocsSource) { throw new IllegalStateException("try to recover " + request.shardId() + " from primary shard with sync id but number " + "of docs differ: " + numDocsSource + " (" + request.sourceNode().getName() + ", primary) vs " + numDocsTarget + "(" + request.targetNode().getName() + ")"); } // we shortcut recovery here because we have nothing to copy. but we must still start the engine on the target. 
// so we don't return here logger.trace("skipping [phase1]- identical sync id [{}] found on both source and target", recoverySourceSyncId); } else {//不同的话 先比较哪些不通 然后把不同的文件发送到target节点上 final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot()); for (StoreFileMetaData md : diff.identical) { response.phase1ExistingFileNames.add(md.name()); response.phase1ExistingFileSizes.add(md.length()); existingTotalSize += md.length(); if (logger.isTraceEnabled()) { logger.trace("recovery [phase1]: not recovering [{}], exist in local store and has checksum [{}]," + " size [{}]", md.name(), md.checksum(), md.length()); } totalSize += md.length(); } List<StoreFileMetaData> phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size()); phase1Files.addAll(diff.different); phase1Files.addAll(diff.missing); for (StoreFileMetaData md : phase1Files) { if (request.metadataSnapshot().asMap().containsKey(md.name())) { logger.trace("recovery [phase1]: recovering [{}], exists in local store, but is different: remote [{}], local [{}]", md.name(), request.metadataSnapshot().asMap().get(md.name()), md); } else { logger.trace("recovery [phase1]: recovering [{}], does not exist in remote", md.name()); } response.phase1FileNames.add(md.name()); response.phase1FileSizes.add(md.length()); totalSize += md.length(); } response.phase1TotalSize = totalSize; response.phase1ExistingTotalSize = existingTotalSize; logger.trace("recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", response.phase1FileNames.size(), new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize)); cancellableThreads.execute(() ->//target 接收文件 recoveryTarget.receiveFileInfo(response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes, translogOps.get())); // How many bytes we've copied since we last called RateLimiter.pause final Function<StoreFileMetaData, OutputStream> outputStreamFactories = md -> new BufferedOutputStream(new RecoveryOutputStream(md, translogOps), chunkSizeInBytes); //比较不同后 发送文件 chunkSizeInBytes默认为512k sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories); // Send the CLEAN_FILES request, which takes all of the files that // were transferred and renames them from their temporary file // names to the actual file names. It also writes checksums for // the files after they have been renamed. 
// // Once the files have been renamed, any other files that are not // related to this recovery (out of date segments, for example) // are deleted try { cancellableThreads.executeIO(() -> recoveryTarget.cleanFiles(translogOps.get(), recoverySourceMetadata)); } catch (RemoteTransportException | IOException targetException) { final IOException corruptIndexException; // we realized that after the index was copied and we wanted to finalize the recovery // the index was corrupted: // - maybe due to a broken segments file on an empty index (transferred with no checksum) // - maybe due to old segments without checksums or length only checks if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(targetException)) != null) { try { final Store.MetadataSnapshot recoverySourceMetadata1 = store.getMetadata(snapshot); StoreFileMetaData[] metadata = StreamSupport.stream(recoverySourceMetadata1.spliterator(), false).toArray(StoreFileMetaData[]::new); ArrayUtil.timSort(metadata, Comparator.comparingLong(StoreFileMetaData::length)); // check small files first for (StoreFileMetaData md : metadata) { cancellableThreads.checkForCancel(); logger.debug("checking integrity for file {} after remove corruption exception", md); if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail! shard.failShard("recovery", corruptIndexException); logger.warn("Corrupted file detected {} checksum mismatch", md); throw corruptIndexException; } } } catch (IOException ex) { targetException.addSuppressed(ex); throw targetException; } // corruption has happened on the way to replica RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but " + "checksums are ok", null); exception.addSuppressed(targetException); logger.warn( (org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage( "{} Remote file corruption during finalization of recovery on node {}. local checksum OK", shard.shardId(), request.targetNode()), corruptIndexException); throw exception; } else { throw targetException; } } } logger.trace("recovery [phase1]: took [{}]", stopWatch.totalTime()); response.phase1Time = stopWatch.totalTime().millis(); } catch (Exception e) { throw new RecoverFilesRecoveryException(request.shardId(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), e); } finally { store.decRef(); } } void prepareTargetForTranslog(final int totalTranslogOps) throws IOException { StopWatch stopWatch = new StopWatch().start(); logger.trace("recovery [phase1]: prepare remote engine for translog"); final long startEngineStart = stopWatch.totalTime().millis(); // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables // garbage collection (not the JVM's GC!) of tombstone deletes. cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps)); stopWatch.stop(); response.startTime = stopWatch.totalTime().millis() - startEngineStart; logger.trace("recovery [phase1]: remote engine start took [{}]", stopWatch.totalTime()); } /** * Perform phase two of the recovery process. * <p> * Phase two uses a snapshot of the current translog *without* acquiring the write lock (however, the translog snapshot is * point-in-time view of the translog). It then sends each translog operation to the target node so it can be replayed into the new * shard. 
* * @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all * ops should be sent * @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo) * @param endingSeqNo the highest sequence number that should be sent * @param snapshot a snapshot of the translog * @return the local checkpoint on the target */ long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot) throws IOException { if (shard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); } cancellableThreads.checkForCancel(); final StopWatch stopWatch = new StopWatch().start(); logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " + "required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]"); // send all the snapshot's translog operations to the target final SendSnapshotResult result = sendSnapshot(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot);//对translog做快照然后发送给target stopWatch.stop(); logger.trace("recovery [phase2]: took [{}]", stopWatch.totalTime()); response.phase2Time = stopWatch.totalTime().millis(); response.phase2Operations = result.totalOperations; return result.targetLocalCheckpoint; }