Overall, this involves heartbeat detection, a thread that removes replicas, and a thread that recovers replicas. When a datanode goes down, or one of its storages (for example a disk) fails, the namenode detects it through the heartbeats sent by the datanode. The namenode does not react immediately while handling the heartbeat report; it only records the heartbeat information, and a separate periodic monitor thread removes the corresponding block information from DatanodeManager and BlockManager and records the data that needs to be recovered. For the recovery itself, yet another thread periodically scans the pending work and assigns the source and target datanodes needed to restore each replica; on the datanode's next heartbeat these tasks are converted into commands and returned to the datanode.
Heartbeat detection of dead datanodes
A datanode periodically sends heartbeat packets to the namenode to report its current state. If the namenode receives no heartbeat from a datanode within a certain period, it marks the node as stale (and eventually dead), after which the blocks on that node are re-replicated to other datanodes.
Several related parameters in the HDFS configuration:
dfs.heartbeat.interval: the datanode heartbeat interval, 3 s by default.
dfs.namenode.stale.datanode.minimum.interval: the minimum number of missed heartbeats before a datanode can be marked stale, 3 by default.
dfs.namenode.stale.datanode.interval: the datanode timeout; a datanode that has sent no heartbeat for this long is marked stale, 30 s by default. Its value must exceed the product of the two parameters above.
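As a quick illustration of how these values interact, here is a standalone sketch (not the HDFS source; names and structure are simplified assumptions) showing the constraint between the three parameters and the kind of timestamp comparison the namenode performs when deciding whether a node is stale:
public class StaleCheckSketch {
  // defaults as listed above
  static final long HEARTBEAT_INTERVAL_MS = 3_000;   // dfs.heartbeat.interval
  static final int  MIN_MISSED_HEARTBEATS = 3;       // dfs.namenode.stale.datanode.minimum.interval
  static final long STALE_INTERVAL_MS     = 30_000;  // dfs.namenode.stale.datanode.interval

  public static void main(String[] args) {
    // the stale interval must exceed the product of the other two values
    if (STALE_INTERVAL_MS <= HEARTBEAT_INTERVAL_MS * MIN_MISSED_HEARTBEATS) {
      throw new IllegalStateException("stale interval is too small");
    }
    long lastHeartbeat = 0;   // monotonic time of the last received heartbeat
    long now = 31_000;        // 31 s later, more than STALE_INTERVAL_MS
    System.out.println("stale: " + isStale(lastHeartbeat, now));
  }

  // a node is considered stale once no heartbeat has been seen for STALE_INTERVAL_MS
  static boolean isStale(long lastHeartbeatMs, long nowMs) {
    return nowMs - lastHeartbeatMs >= STALE_INTERVAL_MS;
  }
}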
Receiving the heartbeat message
The datanode heartbeat is sent to the namenode as an RPC call: the RPC parameters carry the datanode's statistics, and the return value carries the commands the namenode has for the datanode.
When the datanode sends the heartbeat over RPC, it is first handled by the namenode's RPC server and then handed to the namesystem. The handling in NameNodeRpcServer:
@Override // DatanodeProtocol
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
    int xmitsInProgress, int xceiverCount,
    int failedVolumes, VolumeFailureSummary volumeFailureSummary,
    boolean requestFullBlockReportLease,
    @Nonnull SlowPeerReports slowPeers,
    @Nonnull SlowDiskReports slowDisks) throws IOException {
  checkNNStartup();
  verifyRequest(nodeReg);
  return namesystem.handleHeartbeat(nodeReg, report,
      dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
      failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
      slowPeers, slowDisks);
}
namesystem is of type FSNamesystem, which manages the name-space state (it "is a container of both transient and persisted name-space state, and does all the book-keeping work on a NameNode") and acts as the container for services such as BlockManager, DatanodeManager, DelegationTokens and LeaseManager. In handleHeartbeat, the DatanodeManager obtained through the blockManager does the processing:
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary,
slowPeers, slowDisks);
The DatanodeManager then delegates the handling to the HeartbeatManager:
heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary);
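To make the bookkeeping concrete, here is a minimal self-contained sketch (not the HDFS source; the types and fields are illustrative) of what updateHeartbeat amounts to: the node's previous contribution is subtracted from the aggregate statistics, the freshly reported values and the heartbeat timestamp are stored, and the new contribution is added back. The refreshed timestamp is what the stale/dead checks below compare against.
class HeartbeatBookkeepingSketch {
  static class Node {
    long capacity, used;
    long lastUpdateMonotonicMs;
  }

  long totalCapacity, totalUsed;   // aggregate "Stats" over all live nodes

  synchronized void updateHeartbeat(Node node, long capacity, long used, long nowMs) {
    // subtract the node's previous contribution from the aggregates
    totalCapacity -= node.capacity;
    totalUsed     -= node.used;
    // store the freshly reported values and refresh the heartbeat timestamp
    node.capacity = capacity;
    node.used     = used;
    node.lastUpdateMonotonicMs = nowMs;
    // add the new contribution back
    totalCapacity += node.capacity;
    totalUsed     += node.used;
  }
}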
Heartbeat handling in HeartbeatManager
The HeartbeatManager class handles the heartbeats. The failure handling is not done at the moment a heartbeat message is received; instead a separate thread performs a periodic status scan, every 5 minutes by default. Presumably some of the processing needs information from several datanodes, so it cannot be answered inline for a single datanode's message. Datanodes and storages that the DatanodeManager has recorded as failed are simply removed. This step only removes the datanode's information from the namenode (BlockManager, DatanodeManager and so on); it does not perform the recovery of the lost replicas.
An inner class Monitor implements Runnable and is the body of the monitoring thread: private final Daemon heartbeatThread = new Daemon(new Monitor());. Whenever the time since the last check exceeds heartbeatRecheckInterval, it calls heartbeatCheck().
Inside heartbeatCheck(), each iteration first walks over the state of every datanode in the DatanodeManager, and of every storage of each datanode, collecting the failed datanodes and storages (a datanode may hold several storages, which covers the case where the datanode itself runs fine but one of its storages has failed). It then lets the DatanodeManager and BlockManager handle the first failed datanode and the first failed storage, and repeats until every failed datanode and storage has been processed.
public void run() {
  while(namesystem.isRunning()) {
    restartHeartbeatStopWatch();
    try {
      final long now = Time.monotonicNow();
      if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
        heartbeatCheck();
        lastHeartbeatCheck = now;
      }
      // ....
    } catch (Exception e) {
      // ....
    }
  }
}
void heartbeatCheck() {
  final DatanodeManager dm = blockManager.getDatanodeManager();
  // It's OK to check safe mode w/o taking the lock here, we re-check
  // for safe mode after taking the lock before removing a datanode.
  if (namesystem.isInStartupSafeMode()) {
    return;
  }
  boolean allAlive = false;
  while (!allAlive) {
    // locate the first dead node.
    DatanodeDescriptor dead = null;
    // locate the first failed storage that isn't on a dead node.
    DatanodeStorageInfo failedStorage = null;
    // check the number of stale nodes
    int numOfStaleNodes = 0;
    int numOfStaleStorages = 0;
    synchronized(this) {
      for (DatanodeDescriptor d : datanodes) {
        // check if an excessive GC pause has occurred
        if (shouldAbortHeartbeatCheck(0)) {
          return;
        }
        if (dead == null && dm.isDatanodeDead(d)) {
          stats.incrExpiredHeartbeats();
          dead = d;
        }
        if (d.isStale(dm.getStaleInterval())) {
          numOfStaleNodes++;
        }
        DatanodeStorageInfo[] storageInfos = d.getStorageInfos();
        for(DatanodeStorageInfo storageInfo : storageInfos) {
          if (storageInfo.areBlockContentsStale()) {
            numOfStaleStorages++;
          }
          if (failedStorage == null &&
              storageInfo.areBlocksOnFailedStorage() &&
              d != dead) {
            failedStorage = storageInfo;
          }
        }
      }
      // Set the number of stale nodes in the DatanodeManager
      dm.setNumStaleNodes(numOfStaleNodes);
      dm.setNumStaleStorages(numOfStaleStorages);
    }
    allAlive = (dead == null && failedStorage == null);
    if (!allAlive && namesystem.isInStartupSafeMode()) {
      return;
    }
    if (dead != null) {
      // acquire the fsnamesystem lock, and then remove the dead node.
      namesystem.writeLock();
      try {
        dm.removeDeadDatanode(dead, !dead.isMaintenance());
      } finally {
        namesystem.writeUnlock();
      }
    }
    if (failedStorage != null) {
      // acquire the fsnamesystem lock, and remove blocks on the storage.
      namesystem.writeLock();
      try {
        blockManager.removeBlocksAssociatedTo(failedStorage);
      } finally {
        namesystem.writeUnlock();
      }
    }
  }
}
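dm.isDatanodeDead(d) boils down to a timestamp comparison against a "heartbeat expiry" interval that is much longer than the stale interval. The sketch below is standalone and the exact constants are an assumption (in the HDFS code the expiry interval is derived from dfs.namenode.heartbeat.recheck-interval and dfs.heartbeat.interval, roughly 10.5 minutes with the default settings):
class DeadNodeCheckSketch {
  static final long HEARTBEAT_RECHECK_INTERVAL_MS = 5 * 60 * 1000; // 5 min default
  static final long HEARTBEAT_INTERVAL_MS = 3 * 1000;              // 3 s default
  // assumed derivation of the expiry interval, about 10.5 minutes with defaults
  static final long HEARTBEAT_EXPIRE_INTERVAL_MS =
      2 * HEARTBEAT_RECHECK_INTERVAL_MS + 10 * HEARTBEAT_INTERVAL_MS;

  // a node is declared dead once its last heartbeat is older than the expiry interval
  static boolean isDatanodeDead(long lastUpdateMonotonicMs, long nowMonotonicMs) {
    return lastUpdateMonotonicMs < nowMonotonicMs - HEARTBEAT_EXPIRE_INTERVAL_MS;
  }
}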
dm.removeDeadDatanode(dead, !dead.isMaintenance()). removeDeadDatanode in turn calls removeDatanode, which carries out the deletion: it removes the datanode recorded in the heartbeatManager, the related blocks in the blockManager, the datanode's entry in the DatanodeManager's network topology (networktopology), decrements the software-version count, and unregisters the node from the blockManager's block-report lease manager.
private void removeDatanode(DatanodeDescriptor nodeInfo,
    boolean removeBlocksFromBlocksMap) {
  assert namesystem.hasWriteLock();
  heartbeatManager.removeDatanode(nodeInfo);
  if (removeBlocksFromBlocksMap) {
    blockManager.removeBlocksAssociatedTo(nodeInfo);
  }
  networktopology.remove(nodeInfo);
  decrementVersionCount(nodeInfo.getSoftwareVersion());
  blockManager.getBlockReportLeaseManager().unregister(nodeInfo);
  if (LOG.isDebugEnabled()) {
    LOG.debug("remove datanode " + nodeInfo);
  }
  blockManager.checkSafeMode();
}
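removeBlocksAssociatedTo is also the bridge to the recovery path described next: dropping the replicas on a dead node or failed storage leaves some blocks below their target redundancy, and those blocks are queued for reconstruction. The following is a simplified, self-contained sketch of that idea (not the HDFS implementation; in HDFS the queue is the BlockManager's neededReconstruction structure):
class RemoveReplicasSketch {
  interface Block { int liveReplicas(); int targetReplication(); }

  final java.util.Queue<Block> neededReconstruction = new java.util.ArrayDeque<>();

  void removeBlocksAssociatedTo(Iterable<Block> blocksOnDeadNode) {
    for (Block b : blocksOnDeadNode) {
      // (removal of the replica from the block map elided)
      if (b.liveReplicas() < b.targetReplication()) {
        neededReconstruction.add(b);   // picked up later by the RedundancyMonitor thread
      }
    }
  }
}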
Recovering the data
When a datanode is judged to be lost, the blockManager, while deleting the block information for that node, also adds the affected blocks to the neededReconstruction queue. Another thread in the BlockManager, the RedundancyMonitor, periodically (every 3 s by default) processes the blocks queued there.
The work is done in three main steps: 1. split the at-risk blocks into EC (erasure-coding) tasks and replication tasks; 2. choose the target nodes for each task; 3. put the tasks into the replicateBlocks queue of the corresponding DatanodeDescriptor.
/**
 * Periodically calls computeBlockRecoveryWork().
 * By default the block recovery work is computed every 3 s.
 */
private class RedundancyMonitor implements Runnable {
  @Override
  public void run() {
    while (namesystem.isRunning()) {
      try {
        // Process recovery work only when active NN is out of safe mode.
        if (isPopulatingReplQueues()) {
          // Scan the blocks in neededReconstruction and, for each block, choose
          // the datanodes to recover to and the datanode to copy the data from.
          computeDatanodeWork();
          processPendingReconstructions();
          rescanPostponedMisreplicatedBlocks();
        }
        TimeUnit.MILLISECONDS.sleep(redundancyRecheckIntervalMs); // 3 s by default
      } catch (Throwable t) {
        // exception handling omitted
      }
    }
  }
}
/**
 * Reconstruct a set of blocks to full strength through replication or
 * erasure coding
 *
 * @param blocksToReconstruct blocks to be reconstructed, for each priority
 * @return the number of blocks scheduled for replication
 */
@VisibleForTesting
int computeReconstructionWorkForBlocks(
    List<List<BlockInfo>> blocksToReconstruct) {
  int scheduledWork = 0;
  List<BlockReconstructionWork> reconWork = new LinkedList<>();

  // Step 1: categorize at-risk blocks into replication and EC tasks
  namesystem.writeLock();
  try {
    synchronized (neededReconstruction) {
      for (int priority = 0; priority < blocksToReconstruct.size(); priority++) {
        for (BlockInfo block : blocksToReconstruct.get(priority)) {
          BlockReconstructionWork rw = scheduleReconstruction(block,
              priority);
          if (rw != null) {
            reconWork.add(rw);
          }
        }
      }
    }
  } finally {
    namesystem.writeUnlock();
  }

  // Step 2: choose target nodes for each reconstruction task
  final Set<Node> excludedNodes = new HashSet<>();
  for (BlockReconstructionWork rw : reconWork) {
    // Exclude all of the containing nodes from being targets.
    // This list includes decommissioning or corrupt nodes.
    excludedNodes.clear();
    for (DatanodeDescriptor dn : rw.getContainingNodes()) {
      excludedNodes.add(dn);
    }

    // choose replication targets: NOT HOLDING THE GLOBAL LOCK
    final BlockPlacementPolicy placementPolicy =
        placementPolicies.getPolicy(rw.getBlock().getBlockType());
    rw.chooseTargets(placementPolicy, storagePolicySuite, excludedNodes);
  }

  // Step 3: add tasks to the DN
  namesystem.writeLock();
  try {
    for (BlockReconstructionWork rw : reconWork) {
      final DatanodeStorageInfo[] targets = rw.getTargets();
      if (targets == null || targets.length == 0) {
        rw.resetTargets();
        continue;
      }
      synchronized (neededReconstruction) {
        if (validateReconstructionWork(rw)) {
          scheduledWork++;
        }
      }
    }
  } finally {
    namesystem.writeUnlock();
  }

  // debug logging omitted
  return scheduledWork;
}
When a new transfer task is created for a block, a source datanode that already holds a replica and target datanodes to copy to must be chosen. For the source, the default preference is a datanode in DECOMMISSION_INPROGRESS state, since such nodes carry no write traffic; otherwise a node that has not yet reached its replication limit is chosen at random (each node tracks how many replicas it has been scheduled to send, and dfs.namenode.replication.max-streams caps that number per node, 2 by default); if no such node exists either, another node that still meets the requirements is chosen at random. For the target nodes, the corresponding block placement policy is invoked, just as when a file is first written. A condensed sketch of this source-selection order follows the method signature below.
/**
 * Parse the data-nodes the block belongs to and choose a certain number
 * from them to be the recovery sources.
 *
 * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes
 * since the former do not have write traffic and hence are less busy.
 * We do not use already decommissioned nodes as a source, unless there is
 * no other choice.
 * Otherwise we randomly choose nodes among those that did not reach their
 * replication limits. However, if the recovery work is of the highest
 * priority and all nodes have reached their replication limits, we will
 * randomly choose the desired number of nodes despite the replication limit.
 *
 * In addition form a list of all nodes containing the block
 * and calculate its replication numbers.
 *
 * @return the array of DatanodeDescriptor of the chosen nodes from which to
 *         recover the given block
 */
@VisibleForTesting
DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
    List<DatanodeDescriptor> containingNodes,
    List<DatanodeStorageInfo> nodesContainingLiveReplicas,
    NumberReplicas numReplicas,
    List<Byte> liveBlockIndices, int priority) // ...
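The preference order described above can be condensed into the following standalone sketch (not the HDFS implementation; HDFS additionally randomizes the choice, handles erasure-coded groups, and may pick multiple sources). Type and field names here are illustrative:
class ChooseSourceSketch {
  static class Candidate {
    boolean decommissionInProgress;   // no write traffic, preferred as a source
    boolean decommissioned;           // only a last resort; skipped in this sketch
    int scheduledReplicationWork;     // replicas this node is already asked to send
  }

  static Candidate chooseSource(java.util.List<Candidate> candidates,
                                int maxReplicationStreams,   // dfs.namenode.replication.max-streams
                                boolean highestPriority) {
    Candidate underLimit = null;
    Candidate overLimit = null;
    for (Candidate c : candidates) {
      if (c.decommissioned) {
        continue;                     // not used as a source in this sketch
      }
      if (c.decommissionInProgress) {
        return c;                     // preferred: decommissioning nodes take no writes
      }
      if (c.scheduledReplicationWork < maxReplicationStreams) {
        underLimit = c;               // eligible: below its replication-stream limit
      } else if (highestPriority) {
        overLimit = c;                // allowed only for the highest-priority work
      }
    }
    return underLimit != null ? underLimit : overLimit;
  }
}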
The logic for the datanode to execute the recovery
The steps above added block replication tasks to the DatanodeDescriptor obtained from the blockManager. When the datanode sends its next heartbeat RPC, the namenode, while handling the heartbeat, converts the queued replication tasks into commands and returns them to the datanode.
// The RPC method through which the datanode sends its heartbeat.
@Override // DatanodeProtocol
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
    int xmitsInProgress, int xceiverCount,
    int failedVolumes, VolumeFailureSummary volumeFailureSummary,
    boolean requestFullBlockReportLease,
    @Nonnull SlowPeerReports slowPeers,
    @Nonnull SlowDiskReports slowDisks) throws IOException {
  checkNNStartup();
  verifyRequest(nodeReg);
  return namesystem.handleHeartbeat(nodeReg, report,
      dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
      failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
      slowPeers, slowDisks);
}

// FSNamesystem#handleHeartbeat delegates to the DatanodeManager via the blockManager.
{
  // ...
  DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
      nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
      xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary,
      slowPeers, slowDisks);
  // ...
}

// DatanodeManager#handleHeartbeat takes the previously queued tasks and turns
// them into a BlockCommand.
{
  // ...
  List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
      numReplicationTasks);
  if (pendingList != null && !pendingList.isEmpty()) {
    cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
        pendingList));
  }
  // ...
}