Source-code walkthrough: how a Hadoop cluster handles a datanode failure


This article was published automatically via MetaWeblog; the original and any updates are at: https://extendswind.top/posts/technical/hadoop_datanode_failure_processing

Overall, three pieces are involved: heartbeat detection, a replica-removal thread, and a replica-recovery thread. When a datanode goes down, or one of its storages (e.g. a single disk) fails, the namenode detects it from the heartbeats the datanode sends. The namenode does not react immediately while processing the heartbeat report; it only records the heartbeat information, and a separate periodic checking thread removes the corresponding block information from DatanodeManager and BlockManager and records the data that needs to be recovered. For the recovery itself, yet another thread periodically scans the blocks to be recovered and assigns the source datanode and target datanode for each new replica; on the datanode's next heartbeat these assignments are turned into commands returned to the datanode.

Heartbeat detection of a failed datanode

A datanode periodically sends heartbeat packets to the namenode to report its current state. When the namenode has not received a heartbeat from a datanode within a certain time, it marks the node as stale and, once the node is eventually considered dead, transfers the blocks on that datanode to other datanodes.

A few relevant parameters in the HDFS configuration (a small validation sketch follows the list):

  • dfs.heartbeat.interval: the heartbeat interval, 3s by default.
  • dfs.namenode.stale.datanode.minimum.interval: the minimum number of missed heartbeats before a datanode may be marked stale, 3 by default.
  • dfs.namenode.stale.datanode.interval: the staleness timeout; a datanode from which no heartbeat has been received within this interval is marked stale. 30s by default, and it must be larger than the product of the two parameters above.
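
The constraint between these three values can be checked with a small standalone sketch. This is illustrative only (the class name and the check itself are assumptions for the example, not code from DatanodeManager); the configuration keys and defaults are the ones listed above.

// Illustrative check of the relation between the three intervals; the class
// and method here are assumptions for the example, not Hadoop code.
import org.apache.hadoop.conf.Configuration;

public class StaleIntervalCheck {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    long heartbeatIntervalSec = conf.getLong("dfs.heartbeat.interval", 3);
    int minMissedHeartbeats = conf.getInt(
        "dfs.namenode.stale.datanode.minimum.interval", 3);
    long staleIntervalMs = conf.getLong(
        "dfs.namenode.stale.datanode.interval", 30 * 1000L);

    // The staleness timeout must cover at least the minimum number of missed
    // heartbeats, otherwise a single delayed heartbeat would mark the node stale.
    long lowerBoundMs = minMissedHeartbeats * heartbeatIntervalSec * 1000L;
    if (staleIntervalMs < lowerBoundMs) {
      System.err.println("dfs.namenode.stale.datanode.interval ("
          + staleIntervalMs + " ms) must be >= " + lowerBoundMs + " ms");
    }
  }
}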

Receiving the heartbeat message

The datanode heartbeat is sent as an RPC call; the RPC method carries the datanode's statistics as parameters and returns the commands the namenode wants the datanode to execute.

When the datanode sends the heartbeat over RPC, the namenode first handles it in the RPC server and then hands it to the namesystem. The handling in NameNodeRpcServer:

@Override // DatanodeProtocol
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
    int xmitsInProgress, int xceiverCount,
    int failedVolumes, VolumeFailureSummary volumeFailureSummary,
    boolean requestFullBlockReportLease,
    @Nonnull SlowPeerReports slowPeers,
    @Nonnull SlowDiskReports slowDisks) throws IOException {
  checkNNStartup();
  verifyRequest(nodeReg);
  return namesystem.handleHeartbeat(nodeReg, report,
      dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
      failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
      slowPeers, slowDisks);
}

The namesystem is an FSNamesystem, which manages the name-space state ("is a container of both transient and persisted name-space state, and does all the book-keeping work on a NameNode") and acts as the container for services such as BlockManager, DatanodeManager, DelegationTokens and LeaseManager. In handleHeartbeat the heartbeat is passed on to the DatanodeManager obtained from the blockManager:

DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
  nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
  xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary,
  slowPeers, slowDisks);

DatanodeManager then delegates to HeartbeatManager:

heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
        cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary);
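
Inside HeartbeatManager.updateHeartbeat the per-node usage statistics are refreshed from the new storage reports. A simplified sketch of the idea (not the verbatim Hadoop code):

  // Simplified sketch: refresh the node's statistics with the new reports.
  synchronized void updateHeartbeat(DatanodeDescriptor node, StorageReport[] reports,
      long cacheCapacity, long cacheUsed, int xceiverCount, int failedVolumes,
      VolumeFailureSummary volumeFailureSummary) {
    stats.subtract(node);                       // drop the node's old usage numbers
    node.updateHeartbeat(reports, cacheCapacity, cacheUsed, xceiverCount,
        failedVolumes, volumeFailureSummary);   // record new reports and the lastUpdate time
    stats.add(node);                            // add the refreshed numbers back
  }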

Heartbeat handling in HeartbeatManager

The HeartbeatManager class is responsible for acting on the heartbeats. The handling does not happen when a heartbeat message is received; instead an extra thread performs a status scan periodically, every 5 minutes by default. A likely reason is that some decisions need information from multiple datanodes, so the namenode does not react in the reply to a single datanode's message. Datanodes and storages that DatanodeManager has recorded as failed are removed directly. This step only removes the datanode information from the namenode side (BlockManager, DatanodeManager, etc.); recovering the lost replicas is not handled here.

A Monitor inner class implements the Runnable interface and is run by the monitoring thread: private final Daemon heartbeatThread = new Daemon(new Monitor());. When the time elapsed since the last check exceeds heartbeatRecheckInterval, heartbeatCheck() is called.

Inside heartbeatCheck(), each iteration first walks over all datanodes known to the DatanodeManager and all storages of each datanode, collecting the failed datanodes and storages (a datanode may hold several storages, so the datanode itself can be healthy while one of its storages has failed); it then lets DatanodeManager and BlockManager handle the first dead datanode and the first failed storage found, and repeats until no problematic datanode or storage remains.

  public void run() {
    while (namesystem.isRunning()) {
      restartHeartbeatStopWatch();
      try {
        final long now = Time.monotonicNow();
        if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
          heartbeatCheck();
          lastHeartbeatCheck = now;
        }
        // .... (block key update omitted)
      } catch (Exception e) {
        // .... (logging omitted)
      }
      // .... (5s sleep and GC-pause handling omitted)
    }
  }
	
	
  void heartbeatCheck() {
    final DatanodeManager dm = blockManager.getDatanodeManager();
    // It's OK to check safe mode w/o taking the lock here, we re-check
    // for safe mode after taking the lock before removing a datanode.
    if (namesystem.isInStartupSafeMode()) {
      return;
    }
    boolean allAlive = false;
    while (!allAlive) {
      // locate the first dead node.
      DatanodeDescriptor dead = null;

      // locate the first failed storage that isn't on a dead node.
      DatanodeStorageInfo failedStorage = null;

      // check the number of stale nodes
      int numOfStaleNodes = 0;
      int numOfStaleStorages = 0;
      synchronized(this) {
        for (DatanodeDescriptor d : datanodes) {
          // check if an excessive GC pause has occurred
          if (shouldAbortHeartbeatCheck(0)) {
            return;
          }
          if (dead == null && dm.isDatanodeDead(d)) {
            stats.incrExpiredHeartbeats();
            dead = d;
          }
          if (d.isStale(dm.getStaleInterval())) {
            numOfStaleNodes++;
          }
          DatanodeStorageInfo[] storageInfos = d.getStorageInfos();
          for(DatanodeStorageInfo storageInfo : storageInfos) {
            if (storageInfo.areBlockContentsStale()) {
              numOfStaleStorages++;
            }
            if (failedStorage == null &&
                storageInfo.areBlocksOnFailedStorage() &&
                d != dead) {
              failedStorage = storageInfo;
            }
          }
        }
        
        // Set the number of stale nodes in the DatanodeManager
        dm.setNumStaleNodes(numOfStaleNodes);
        dm.setNumStaleStorages(numOfStaleStorages);
      }

      allAlive = (dead == null && failedStorage == null);
      if (!allAlive && namesystem.isInStartupSafeMode()) {
        return;
      }
      if (dead != null) {
        // acquire the fsnamesystem lock, and then remove the dead node.
        namesystem.writeLock();
        try {
          dm.removeDeadDatanode(dead, !dead.isMaintenance());
        } finally {
          namesystem.writeUnlock();
        }
      }
      if (failedStorage != null) {
        // acquire the fsnamesystem lock, and remove blocks on the storage.
        namesystem.writeLock();
        try {
          blockManager.removeBlocksAssociatedTo(failedStorage);
        } finally {
          namesystem.writeUnlock();
        }
      }
    }
  }
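
For reference, dm.isDatanodeDead(d) in the loop above compares the node's last heartbeat time against an expiry window of 2 × heartbeat-recheck-interval + 10 × heartbeat-interval (10min30s with the default settings). A minimal sketch of that check, written here as an illustration rather than the verbatim DatanodeManager code:

  // Illustration of the dead-node test; the real check lives in DatanodeManager.
  boolean isDatanodeDead(DatanodeDescriptor node,
      long heartbeatRecheckIntervalMs, long heartbeatIntervalSec) {
    long heartbeatExpireIntervalMs =
        2 * heartbeatRecheckIntervalMs + 10 * 1000 * heartbeatIntervalSec;
    return node.getLastUpdateMonotonic() <
        Time.monotonicNow() - heartbeatExpireIntervalMs;
  }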

dm.removeDeadDatanode(dead, !dead.isMaintenance()). removeDeadDatanode in turn calls removeDatanode, which carries out the actual deletion: it removes the datanode recorded in heartbeatManager, the related blocks in blockManager, the datanode entry in DatanodeManager's network topology (networktopology), updates the software-version counters, and unregisters the node from blockManager's block-report lease manager.

  private void removeDatanode(DatanodeDescriptor nodeInfo,
      boolean removeBlocksFromBlocksMap) {
    assert namesystem.hasWriteLock();
    heartbeatManager.removeDatanode(nodeInfo);
    if (removeBlocksFromBlocksMap) {
      blockManager.removeBlocksAssociatedTo(nodeInfo);
    }
    networktopology.remove(nodeInfo);
    decrementVersionCount(nodeInfo.getSoftwareVersion());
    blockManager.getBlockReportLeaseManager().unregister(nodeInfo);

    if (LOG.isDebugEnabled()) {
      LOG.debug("remove datanode " + nodeInfo);
    }
    blockManager.checkSafeMode();
  }

Recovering the data

When a datanode is judged lost, blockManager, while deleting the block information held on that datanode, adds the affected blocks to the neededReconstruction list of blocks awaiting reconstruction (once work has been scheduled for a block it is tracked in pendingReconstruction). Another thread in BlockManager, the RedundancyMonitor, processes this queue periodically, every 3s by default.

This happens in three main steps: 1. divide the blocks into erasure-coded blocks and replicated blocks and build a reconstruction task for each; 2. choose the target nodes for each task; 3. put the task into the replicateBlocks queue of the corresponding DatanodeDescriptor.

  /**
   * Periodically calls computeBlockRecoveryWork().
   * By default the block recovery work is triggered every 3s.
   */
  private class RedundancyMonitor implements Runnable {

    @Override
    public void run() {
      while (namesystem.isRunning()) {
        try {
          // Process recovery work only when active NN is out of safe mode.
          if (isPopulatingReplQueues()) {
            // Scan the blocks in neededReconstruction and, for each block,
            // choose the node to copy the data from and the node to
            // reconstruct the replica on.
            computeDatanodeWork();
            processPendingReconstructions();
            rescanPostponedMisreplicatedBlocks();
          }
          TimeUnit.MILLISECONDS.sleep(redundancyRecheckIntervalMs); // 3s by default
        } catch (Throwable t) {
          // exception handling omitted
        }
      }
    }
  }
  
  /**
   * Reconstruct a set of blocks to full strength through replication or
   * erasure coding
   *
   * @param blocksToReconstruct blocks to be reconstructed, for each priority
   * @return the number of blocks scheduled for replication
   */
  @VisibleForTesting
  int computeReconstructionWorkForBlocks(
      List<List<BlockInfo>> blocksToReconstruct) {
    int scheduledWork = 0;
    List<BlockReconstructionWork> reconWork = new LinkedList<>();

    // Step 1: categorize at-risk blocks into replication and EC tasks
    namesystem.writeLock();
    try {
      synchronized (neededReconstruction) {
        for (int priority = 0; priority < blocksToReconstruct.size(); priority++) {
          for (BlockInfo block : blocksToReconstruct.get(priority)) {
            BlockReconstructionWork rw = scheduleReconstruction(block,
                priority);
            if (rw != null) {
              reconWork.add(rw);
            }
          }
        }
      }
    } finally {
      namesystem.writeUnlock();
    }

    // Step 2: choose target nodes for each reconstruction task
    final Set<Node> excludedNodes = new HashSet<>();
    for(BlockReconstructionWork rw : reconWork){
      // Exclude all of the containing nodes from being targets.
      // This list includes decommissioning or corrupt nodes.
      excludedNodes.clear();
      for (DatanodeDescriptor dn : rw.getContainingNodes()) {
        excludedNodes.add(dn);
      }

      // choose replication targets: NOT HOLDING THE GLOBAL LOCK
      final BlockPlacementPolicy placementPolicy =
          placementPolicies.getPolicy(rw.getBlock().getBlockType());
      rw.chooseTargets(placementPolicy, storagePolicySuite, excludedNodes);
    }

    // Step 3: add tasks to the DN
    namesystem.writeLock();
    try {
      for(BlockReconstructionWork rw : reconWork){
        final DatanodeStorageInfo[] targets = rw.getTargets();
        if(targets == null || targets.length == 0){
          rw.resetTargets();
          continue;
        }

        synchronized (neededReconstruction) {
          if (validateReconstructionWork(rw)) {
            scheduledWork++;
          }
        }
      }
    } finally {
      namesystem.writeUnlock();
    }
    // debug logging omitted
    return scheduledWork;
  }

When a new transfer task is created for a block, a source datanode that already holds a replica and a target datanode to copy to must be chosen. For the source, the default preference is a datanode in DECOMMISSION_INPROGRESS state, since such nodes carry no write traffic; otherwise a node that has not yet reached its replication limit is picked at random (each node tracks the number of replicas it is scheduled to send, and dfs.namenode.replication.max-streams, 2 by default, caps this per-node number); if no such node exists either, another node that satisfies the requirements is chosen at random (a simplified sketch of this source selection follows the method signature below). The target datanode is chosen by the configured block placement policy, just as when a file is first written.

  /**
   * Parse the data-nodes the block belongs to and choose a certain number
   * from them to be the recovery sources.
   *
   * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes
   * since the former do not have write traffic and hence are less busy.
   * We do not use already decommissioned nodes as a source, unless there is
   * no other choice.
   * Otherwise we randomly choose nodes among those that did not reach their
   * replication limits. However, if the recovery work is of the highest
   * priority and all nodes have reached their replication limits, we will
   * randomly choose the desired number of nodes despite the replication limit.
   *
   * In addition form a list of all nodes containing the block
   * and calculate its replication numbers.
   *
   * @return the array of DatanodeDescriptor of the chosen nodes from which to
   * recover the given block
   */
  @VisibleForTesting
  DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
      List<DatanodeDescriptor> containingNodes,
      List<DatanodeStorageInfo> nodesContainingLiveReplicas,
      NumberReplicas numReplicas,
      List<Byte> liveBlockIndices, int priority) {
    // ...
  }
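
A simplified sketch of the source-selection preference described above. This is an illustration under the stated assumptions (the method name and condensed decision flow are mine, not the real chooseSourceDatanodes body); maxReplicationStreams corresponds to dfs.namenode.replication.max-streams.

  // Illustration only: condensed version of the source preference order.
  DatanodeDescriptor pickSource(List<DatanodeDescriptor> candidates,
      int maxReplicationStreams, boolean highestPriority) {
    DatanodeDescriptor belowLimit = null;
    for (DatanodeDescriptor node : candidates) {
      if (node.isDecommissionInProgress()) {
        return node;                              // no write traffic, preferred source
      }
      if (belowLimit == null
          && node.getNumberOfBlocksToBeReplicated() < maxReplicationStreams) {
        belowLimit = node;                        // has not reached its replication limit
      }
    }
    if (belowLimit != null) {
      return belowLimit;
    }
    // every candidate is at its limit: only the highest-priority work may ignore it
    return (highestPriority && !candidates.isEmpty()) ? candidates.get(0) : null;
  }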

The logic for the datanode to execute the recovery

The previous step added the replication tasks to the DatanodeDescriptor objects held by blockManager. When the datanode sends its next heartbeat over RPC, the namenode, while handling the heartbeat, converts the pending replication tasks into commands and returns them to the datanode.


// The RPC method the datanode uses to send heartbeats
@Override // DatanodeProtocol
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
    int xmitsInProgress, int xceiverCount,
    int failedVolumes, VolumeFailureSummary volumeFailureSummary,
    boolean requestFullBlockReportLease,
    @Nonnull SlowPeerReports slowPeers,
    @Nonnull SlowDiskReports slowDisks) throws IOException {
  checkNNStartup();
  verifyRequest(nodeReg);
  return namesystem.handleHeartbeat(nodeReg, report,
      dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
      failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
      slowPeers, slowDisks);
}

// FSNamesystem.handleHeartbeat: hands the heartbeat to DatanodeManager via blockManager
{
  // ...
  DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
          nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
          xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary,
          slowPeers, slowDisks);
  // ...
}

// DatanodeManager.handleHeartbeat: takes the previously stored tasks and turns them into a BlockCommand
{
  // ...
  List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
      numReplicationTasks);
  if (pendingList != null && !pendingList.isEmpty()) {
    cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
        pendingList));
  }
  // ...
}

