HDFS Lease Recovery and Block Recovery


This post analyzes HDFS lease recovery and block recovery.

Once HDFS supports hflush, hflushed data must remain readable: a datanode that restarts cannot simply discard the last block of a file; it has to preserve the hflushed bytes. Likewise, to support append, a block that has already been finalized must be reopenable so that more data can be written to it. Both features make crash recovery considerably harder. Before hflush/append, HDFS could simply delete every replica of the last block of a file that was never closed.

In HDFS's design, the lease is what guarantees that a file is written by only one client at a time. Before writing or appending to a file, a client must acquire the file's lease from the namenode, and while the client writes, a background thread keeps renewing the lease, extending its exclusive write access. A lease actually has two limits: a soft limit (60 s by default) and a hard limit (1 hour by default). They differ as follows:

Before the soft limit expires, the client has exclusive write access to the file, and no other client can take that right away from it.

After the soft limit expires, any client may reclaim the lease, acquire it for itself, and thereby gain exclusive write access to the file.

After the hard limit expires, the namenode forcibly closes the file and revokes the lease.
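
In the 2.4.x code base these two defaults are hard-coded constants rather than ordinary hdfs-site.xml settings; for reference, roughly as they appear in org.apache.hadoop.hdfs.protocol.HdfsConstants:

    // Hard-coded lease limits in HdfsConstants (2.4.x), shown for reference.
    public static final long LEASE_SOFTLIMIT_PERIOD = 60 * 1000;                   // 60 s
    public static final long LEASE_HARDLIMIT_PERIOD = 60 * LEASE_SOFTLIMIT_PERIOD; // 1 hour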

Now consider a client that crashes while writing a file. Until the soft limit expires, no other client can write that file; once it has expired, another client may. Before writing, the new client first checks whether the file was left open. If so, it triggers the lease recovery and block recovery phases, whose goal is to bring all replicas of the file's last block to a consistent state; since a block's replicas are written through a pipeline, it is entirely normal for them to diverge.

This post walks through the scenario where the writing client crashes and another client subsequently appends to the same file.

A client appends to a file with code like this:

FileSystem fs = FileSystem.get(configuration);
FSDataOutputStream out = fs.append(path);
out.write(data); // data is a byte[]
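
If the previous writer died, the append can keep failing until lease recovery completes. A common client-side pattern is therefore to trigger recovery explicitly through DistributedFileSystem#recoverLease and poll until the file is closed. A minimal sketch (the polling loop and one-second sleep are illustrative choices, not an HDFS requirement; exception handling omitted):

    // recoverLease() asks the namenode to start lease recovery immediately,
    // without waiting for the soft limit to expire; it returns true once the
    // file has been closed and is safe to append to.
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(configuration);
    boolean closed = dfs.recoverLease(path);
    while (!closed) {
      Thread.sleep(1000);               // illustrative back-off
      closed = dfs.recoverLease(path);
    }
    FSDataOutputStream out = dfs.append(path);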

On the namenode side, the main append logic lives in FSNamesystem#appendFileInternal, which internally calls

 // Opening an existing file for write - may need to recover lease.
 recoverLeaseInternal(myFile, src, holder, clientMachine, false);

to check whether the file needs lease recovery first. Let's look closely at this function.

 private void recoverLeaseInternal(INodeFile fileInode, 
      String src, String holder, String clientMachine, boolean force)
      throws IOException {
    // holder is the client name performing this append
    assert hasWriteLock();
    if (fileInode != null && fileInode.isUnderConstruction()) {
      //
      // If the file is under construction , then it must be in our
      // leases. Find the appropriate lease record.
      //
      Lease lease = leaseManager.getLease(holder);
      //
      // We found the lease for this file. And surprisingly the original
      // holder is trying to recreate this file. This should never occur.
      //
      if (!force && lease != null) {
        Lease leaseFile = leaseManager.getLeaseByPath(src);
        if ((leaseFile != null && leaseFile.equals(lease)) ||
            lease.getHolder().equals(holder)) { 
          throw new AlreadyBeingCreatedException(
            "failed to create file " + src + " for " + holder +
            " for client " + clientMachine +
            " because current leaseholder is trying to recreate file.");
        }
      }
      //
      // Find the original holder.
      //
      FileUnderConstructionFeature uc = fileInode.getFileUnderConstructionFeature();
      String clientName = uc.getClientName();
      lease = leaseManager.getLease(clientName);
      if (lease == null) {
        throw new AlreadyBeingCreatedException(
          "failed to create file " + src + " for " + holder +
          " for client " + clientMachine +
          " because pendingCreates is non-null but no leases found.");
      }
      if (force) {
        // close now: no need to wait for soft lease expiration and 
        // close only the file src
        LOG.info("recoverLease: " + lease + ", src=" + src +
          " from client " + clientName);
        internalReleaseLease(lease, src, holder);
      } else {
        assert lease.getHolder().equals(clientName) :
          "Current lease holder " + lease.getHolder() +
          " does not match file creator " + clientName;
        //
        // If the original holder has not renewed in the last SOFTLIMIT 
        // period, then start lease recovery.
        //
        if (lease.expiredSoftLimit()) {
          LOG.info("startFile: recover " + lease + ", src=" + src + " client "
              + clientName);
          boolean isClosed = internalReleaseLease(lease, src, null);
          if(!isClosed)
            throw new RecoveryInProgressException(
                "Failed to close file " + src +
                ". Lease recovery is in progress. Try again later.");
        } else {
          final BlockInfo lastBlock = fileInode.getLastBlock();
          if (lastBlock != null
              && lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY) {
            throw new RecoveryInProgressException("Recovery in progress, file ["
                + src + "], " + "lease owner [" + lease.getHolder() + "]");
          } else {
            throw new AlreadyBeingCreatedException("Failed to create file ["
                + src + "] for [" + holder + "] for client [" + clientMachine
                + "], because this file is already being created by ["
                + clientName + "] on ["
                + uc.getClientMachine() + "]");
          }
        }
      }
    }
  }

  1. Check the file's state via its INode. If the file is under construction, it was never closed, so it may well need the lease recovery and block recovery phases to bring the replicas of its last block into agreement.

  2. Look up the lease held by holder (the client name issuing this append) in the lease manager. If it is non-null, this client still holds a lease; check whether that lease covers the file being appended. If it does, the calling client itself still holds this file's lease and the append fails, because append requires the file to be closed. If the lease does not cover this file, fall through.

  3. From the INode's under-construction feature, find the file's previous lease holder, which in our scenario is the crashed client. Fetch that client's lease from the lease manager and check whether its soft limit has expired. If it has, call

boolean isClosed = internalReleaseLease(lease, src, null);

This function decides whether a real block recovery phase, which involves the datanodes, is necessary. Its main logic is as follows.

3.1. If all of the file's blocks are in the COMPLETE state, no block recovery is needed and the file is closed: its lease is removed from the lease manager, the INode is marked complete, and a close-file edit log entry is written.

3.2. If the last block is in the COMMITTED state, examine the file's last two blocks: if both the second-to-last and the last block meet the minimum replication (1 by default), close the file; otherwise throw an exception back to the client.

3.3. If the last block is UNDER_CONSTRUCTION or UNDER_RECOVERY and no datanode has ever reported a replica for it, the client most likely crashed before the pipeline was established; in this case simply remove the last block from the INode and close the file.

3.4. Otherwise, enter the block recovery phase:

  1. Allocate a block recovery id to identify this recovery round; the recovery id is in fact a freshly allocated generation stamp.

  2. Mark the block UNDER_RECOVERY, choose the datanode of one replica as the primary datanode (the selection logic is sketched after the code below), and put the block on that datanode's recoverBlocks list. Afterwards, while handling that datanode's periodic heartbeat, the namenode ships all of its recoverBlocks back in the heartbeat reply as a BlockRecoveryCommand. The code:

    // DatanodeManager#handleHeartbeat
    // check lease recovery
        BlockInfoUnderConstruction[] blocks = nodeinfo
            .getLeaseRecoveryCommand(Integer.MAX_VALUE);
        if (blocks != null) {
          BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
              blocks.length);
          for (BlockInfoUnderConstruction b : blocks) {
            final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations();
            // Skip stale nodes during recovery - not heart beated for some time (30s by default).
            final List<DatanodeStorageInfo> recoveryLocations =
                new ArrayList<DatanodeStorageInfo>(storages.length);
            for (int i = 0; i < storages.length; i++) {
              if (!storages[i].getDatanodeDescriptor().isStale(staleInterval)) {
                recoveryLocations.add(storages[i]);
              }
            }
            // If we only get 1 replica after eliminating stale nodes, then choose all
            // replicas for recovery and let the primary data node handle failures.
            if (recoveryLocations.size() > 1) {
              if (recoveryLocations.size() != storages.length) {
                LOG.info("Skipped stale nodes for recovery : " +
                    (storages.length - recoveryLocations.size()));
              }
              brCommand.add(new RecoveringBlock(
                  new ExtendedBlock(blockPoolId, b),
                  DatanodeStorageInfo.toDatanodeInfos(recoveryLocations),
                  b.getBlockRecoveryId()));
            } else {
              // If too many replicas are stale, then choose all replicas to participate
              // in block recovery.
              brCommand.add(new RecoveringBlock(
                  new ExtendedBlock(blockPoolId, b),
                  DatanodeStorageInfo.toDatanodeInfos(storages),
                  b.getBlockRecoveryId()));
            }
          }
          return new DatanodeCommand[] { brCommand };
        }
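
For completeness: the primary-datanode selection mentioned in step 2 happens in BlockInfoUnderConstruction#initializeBlockRecovery. A condensed sketch, paraphrased from the 2.4.x source (the bookkeeping that resets the chosen-as-primary flags once every live replica has been tried is omitted):

    // Among live replicas not yet tried as primary, pick the one whose
    // datanode heartbeated most recently, then queue this block on it.
    ReplicaUnderConstruction primary = null;
    long mostRecentLastUpdate = 0;
    for (ReplicaUnderConstruction r : replicas) {
      DatanodeDescriptor dn = r.getExpectedStorageLocation().getDatanodeDescriptor();
      if (r.isAlive() && !r.getChosenAsPrimary()
          && dn.getLastUpdate() > mostRecentLastUpdate) {
        primary = r;
        mostRecentLastUpdate = dn.getLastUpdate();
      }
    }
    if (primary != null) {
      // The block will surface in getLeaseRecoveryCommand() at the next heartbeat.
      primary.getExpectedStorageLocation().getDatanodeDescriptor()
          .addBlockToBeRecovered(this);
      primary.setChosenAsPrimary(true);
    }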
    
    

Now for the datanode side.
The BPServiceActor handles the heartbeat reply: in offerService() it pulls every DatanodeCommand out of the reply and processes it. In processCommandFromActive(), a command of type DNA_RECOVERBLOCK is a block recovery command and is handed to DataNode#recoverBlocks:

    case DatanodeProtocol.DNA_RECOVERBLOCK:
      String who = "NameNode at " + actor.getNNSocketAddress();
      dn.recoverBlocks(who, ((BlockRecoveryCommand)cmd).getRecoveringBlocks());
      break;
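
recoverBlocks itself is short; condensed from DataNode.java in 2.4.x (logging details trimmed), it simply pushes the work onto a daemon thread so that heartbeat processing is never blocked:

    // Condensed sketch of DataNode#recoverBlocks (2.4.x).
    public Daemon recoverBlocks(final String who, final Collection<RecoveringBlock> blocks) {
      Daemon d = new Daemon(threadGroup, new Runnable() {
        @Override
        public void run() {
          for (RecoveringBlock b : blocks) {
            try {
              logRecoverBlock(who, b);
              recoverBlock(b);   // drives the whole recovery for this block
            } catch (IOException e) {
              LOG.warn("recoverBlocks FAILED: " + b, e);
            }
          }
        }
      });
      d.start();
      return d;
    }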

As the sketch above shows, dn.recoverBlocks runs the recovery on a dedicated background thread. For each block to be recovered, that thread does the following:

  1. It reads the replica locations out of the block and connects to the other replicas' datanodes (the datanode-to-datanode interface is defined in InterDatanodeProtocol), calling initReplicaRecovery(rBlock) on every datanode, including itself. Each DataNode ultimately calls FsDatasetImpl's initReplicaRecovery to initialize the to-be-recovered replica on that datanode. Let's look at this function:

    static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
        Block block, long recoveryId, long xceiverStopTimeout) throws IOException {
      final ReplicaInfo replica = map.get(bpid, block.getBlockId());
      LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
          + ", replica=" + replica);

      //check replica
      if (replica == null) {
        return null;
      }

      //stop writer if there is any
      if (replica instanceof ReplicaInPipeline) {
        final ReplicaInPipeline rip = (ReplicaInPipeline)replica;
        rip.stopWriter(xceiverStopTimeout);

        //check replica bytes on disk.
        if (rip.getBytesOnDisk() < rip.getVisibleLength()) {
          throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
              + " getBytesOnDisk() < getVisibleLength(), rip=" + rip);
        }

        //check the replica's files
        checkReplicaFiles(rip);
      }

      //check generation stamp
      if (replica.getGenerationStamp() < block.getGenerationStamp()) {
        throw new IOException(
            "replica.getGenerationStamp() < block.getGenerationStamp(), block="
            + block + ", replica=" + replica);
      }

      //check recovery id
      if (replica.getGenerationStamp() >= recoveryId) {
        throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
            + " replica.getGenerationStamp() >= recoveryId = " + recoveryId
            + ", block=" + block + ", replica=" + replica);
      }

      //check RUR
      final ReplicaUnderRecovery rur;
      if (replica.getState() == ReplicaState.RUR) {
        rur = (ReplicaUnderRecovery)replica;
        if (rur.getRecoveryID() >= recoveryId) {
          throw new RecoveryInProgressException(
              "rur.getRecoveryID() >= recoveryId = " + recoveryId
              + ", block=" + block + ", rur=" + rur);
        }
        final long oldRecoveryID = rur.getRecoveryID();
        rur.setRecoveryID(recoveryId);
        LOG.info("initReplicaRecovery: update recovery id for " + block
            + " from " + oldRecoveryID + " to " + recoveryId);
      } else {
        rur = new ReplicaUnderRecovery(replica, recoveryId);
        map.add(bpid, rur);
        LOG.info("initReplicaRecovery: changing replica state for "
            + block + " from " + replica.getState()
            + " to " + rur.getState());
      }
      return rur.createInfo();
    }

First, the replica's state is checked. If the replica is currently being written, its stopWriter is called to stop the writer thread, simply by interrupting it (when a datanode creates a replica for a write pipeline, it stores the writer thread's handle in the replica); this shows how high a priority block recovery has. Then come some sanity checks, such as whether the replica's data file and meta file actually exist on disk, followed by the generation stamp checks: the stamp the namenode recorded must not be larger than the replica's actual stamp, and the recovery id must be strictly larger than the replica's generation stamp. Finally a ReplicaUnderRecovery is created and put into the replica map; if the replica is already under recovery, the new recovery id is compared with the existing one, and if it is larger, the new recovery round preempts the old one.
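
For reference, these are the replica states involved, as defined in HdfsServerConstants.ReplicaState in 2.4.x (the one-line summaries are mine, not the original javadoc):

    public enum ReplicaState {
      FINALIZED, // write finished, length fixed; may later be reopened for append
      RBW,       // "replica being written": the tail of an active write pipeline
      RWR,       // "replica waiting to be recovered": datanode restarted mid-write
      RUR,       // "replica under recovery": claimed by a block recovery round
      TEMPORARY  // transient copy (e.g. replication transfer), never visible to readers
    }
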
Next, the recovery info returned for each replica (including its pre-recovery state) is collected into a sync list, and the sync step begins: based on the replicas' original states, it decides the recovered block's length and which replicas participate. The rules, simplified, are sketched below; the complete state-by-state decision table, including all the two-replica cases, is in the Append/Hflush/Read design document listed in the references.
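
A simplified sketch of the length decision, paraphrasing DataNode#syncBlock in 2.4.x (RPC plumbing, participant bookkeeping, and the zero-replica/forced-close paths are omitted; BlockRecord and rInfo are the names used there):

    // Pick the "best" original state among the replicas in the sync list;
    // ReplicaState values are ordered FINALIZED(0) < RBW(1) < RWR(2).
    ReplicaState bestState = ReplicaState.RWR;
    for (BlockRecord r : syncList) {
      ReplicaState s = r.rInfo.getOriginalReplicaState();
      if (s.getValue() < bestState.getValue()) {
        bestState = s;
      }
    }

    long newLength;
    if (bestState == ReplicaState.FINALIZED) {
      // A finalized replica fixes the length: all finalized replicas must
      // agree, and only replicas with exactly this length participate.
      newLength = -1;
      for (BlockRecord r : syncList) {
        if (r.rInfo.getOriginalReplicaState() == ReplicaState.FINALIZED) {
          newLength = r.rInfo.getNumBytes();
        }
      }
    } else {
      // Only RBW/RWR replicas survive: truncate all participants to the
      // shortest replica in the best state; worse-state replicas drop out.
      newLength = Long.MAX_VALUE;
      for (BlockRecord r : syncList) {
        if (r.rInfo.getOriginalReplicaState() == bestState) {
          newLength = Math.min(newLength, r.rInfo.getNumBytes());
        }
      }
    }
    // The primary then asks each participating datanode to update its replica
    // to (block recovery id, newLength) and finally reports
    // commitBlockSynchronization back to the namenode.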

References

hadoop-hdfs-2.4.1.jar

Append/Hflush/Read Design

