HDFS periodically checks whether any files are missing replicas and, if so, triggers the replication logic to bring them back up to the configured replication factor:
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
Concretely, this is done by the ReplicationMonitor thread started inside BlockManager:
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager
/**
 * Periodically calls computeReplicationWork().
 */
private class ReplicationMonitor implements Runnable {
  @Override
  public void run() {
    while (namesystem.isRunning()) {
      try {
        // Process replication work only when active NN is out of safe mode.
        if (namesystem.isPopulatingReplQueues()) {
          computeDatanodeWork();
          processPendingReplications();
        }
        Thread.sleep(replicationRecheckInterval);
      } catch (Throwable t) {
        ...
Note: the sleep interval replicationRecheckInterval is taken from the configuration dfs.namenode.replication.interval, which defaults to 3, i.e. 3 seconds.
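If the default 3-second recheck cadence does not suit a cluster, it can be overridden in hdfs-site.xml just like dfs.replication above; the value is in seconds, and 30 here is only an illustrative choice, not a recommendation:

```xml
<property>
  <name>dfs.namenode.replication.interval</name>
  <value>30</value>
</property>
```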
/**
 * Compute block replication and block invalidation work that can be scheduled
 * on data-nodes. The datanode will be informed of this work at the next
 * heartbeat.
 *
 * @return number of blocks scheduled for replication or removal.
 */
int computeDatanodeWork() {
  // Blocks should not be replicated or removed if in safe mode.
  // It's OK to check safe mode here w/o holding lock, in the worst
  // case extra replications will be scheduled, and these will get
  // fixed up later.
  if (namesystem.isInSafeMode()) {
    return 0;
  }

  final int numlive = heartbeatManager.getLiveDatanodeCount();
  final int blocksToProcess = numlive * this.blocksReplWorkMultiplier;
  final int nodesToProcess = (int) Math.ceil(numlive
      * this.blocksInvalidateWorkPct);

  int workFound = this.computeReplicationWork(blocksToProcess);
Note: the multiplier blocksReplWorkMultiplier is taken from dfs.namenode.replication.work.multiplier.per.iteration, which defaults to 2, i.e. each iteration processes at most (number of live datanodes) × 2 blocks;
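The sizing arithmetic in computeDatanodeWork() can be checked in isolation. This sketch mirrors the two defaults from the walkthrough (replication multiplier 2), plus an assumed invalidate-work percentage of 0.32, matching the stock default of dfs.namenode.invalidate.work.pct.per.iteration; the class itself is hypothetical:

```java
public class ReplicationWorkSizing {
    // Default for dfs.namenode.replication.work.multiplier.per.iteration.
    static final int BLOCKS_REPL_WORK_MULTIPLIER = 2;
    // Assumed default for dfs.namenode.invalidate.work.pct.per.iteration.
    static final float BLOCKS_INVALIDATE_WORK_PCT = 0.32f;

    /** Blocks considered for replication in one iteration. */
    static int blocksToProcess(int liveDatanodes) {
        return liveDatanodes * BLOCKS_REPL_WORK_MULTIPLIER;
    }

    /** Datanodes considered for block invalidation in one iteration. */
    static int nodesToProcess(int liveDatanodes) {
        return (int) Math.ceil(liveDatanodes * BLOCKS_INVALIDATE_WORK_PCT);
    }

    public static void main(String[] args) {
        // With 100 live datanodes: 200 blocks considered for replication,
        // and ceil(100 * 0.32) = 32 nodes considered for invalidation.
        System.out.println(blocksToProcess(100));
        System.out.println(nodesToProcess(100));
    }
}
```

So the amount of replication work scheduled per 3-second iteration scales linearly with cluster size rather than with the backlog of under-replicated blocks.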
/**
 * Scan blocks in {@link #neededReplications} and assign replication
 * work to data-nodes they belong to.
 *
 * The number of process blocks equals either twice the number of live
 * data-nodes or the number of under-replicated blocks whichever is less.
 *
 * @return number of blocks scheduled for replication during this iteration.
 */
int computeReplicationWork(int blocksToProcess) {
  List<List<Block>> blocksToReplicate = null;
  namesystem.writeLock();
  try {
    // Choose the blocks to be replicated
    blocksToReplicate = neededReplications
        .chooseUnderReplicatedBlocks(blocksToProcess);
  } finally {
    namesystem.writeUnlock();
  }
  return computeReplicationWorkForBlocks(blocksToReplicate);
}

int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
  ...
  // Add block to the to be replicated list
  rw.srcNode.addBlockToBeReplicated(block, targets);
  scheduledWork++;
Note: the actual scheduling step adds each block to be replicated to the pending list of its chosen source datanode;
Next, look at the DatanodeManager code:
org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] reports, final String blockPoolId,
    long cacheCapacity, long cacheUsed, int xceiverCount,
    int maxTransfers, int failedVolumes) throws IOException {
  ...
  final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
  // check pending replication
  List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
      maxTransfers);
  if (pendingList != null) {
    cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
        pendingList));
  }
Note: when DatanodeManager processes a heartbeat, it sends the pending replication commands to the corresponding source datanode; maxTransfer is computed as
final int maxTransfer = blockManager.getMaxReplicationStreams() - xmitsInProgress;
getMaxReplicationStreams is taken from dfs.namenode.replication.max-streams, which defaults to 2, i.e. a single datanode replicates at most 2 blocks concurrently.
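The per-heartbeat throttle can likewise be sketched on its own: the max-streams cap, minus the transfers the datanode reports already in flight (xmitsInProgress), gives how many new replication commands it will receive. The field names mirror the source above, but the standalone class is hypothetical:

```java
public class HeartbeatTransferBudget {
    // Default for dfs.namenode.replication.max-streams, per the walkthrough.
    static final int MAX_REPLICATION_STREAMS = 2;

    /** Remaining replication slots for a datanode with xmitsInProgress transfers. */
    static int maxTransfer(int xmitsInProgress) {
        return MAX_REPLICATION_STREAMS - xmitsInProgress;
    }

    public static void main(String[] args) {
        System.out.println(maxTransfer(0)); // idle datanode: 2 slots available
        System.out.println(maxTransfer(2)); // saturated: no new transfers
    }
}
```

This is why raising dfs.namenode.replication.max-streams is a common knob for speeding up re-replication after a datanode failure, at the cost of more transfer load per node.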