A file in HDFS is made up of blocks: each file contains one or more blocks. When a file is created, a block is created for it, and then, according to the configured replication factor (3 by default), 3 Datanodes are requested to store that block.
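For reference, a file with an explicit replication factor can be created through the Java client API. This is a minimal sketch, assuming a reachable cluster and the Hadoop client libraries on the classpath; the path, buffer size, and block size are placeholder values:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateReplicatedFile {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // overwrite=true, bufferSize=4096, replication=3, blockSize=128 MB
    try (FSDataOutputStream out = fs.create(
        new Path("/tmp/test.sql"), true, 4096, (short) 3, 128L * 1024 * 1024)) {
      out.writeBytes("select 1;\n");
    }
  }
}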
The hdfs fsck command shows the exact block, Datanode, and rack information for a file, for example:
hdfs fsck /tmp/test.sql -files -blocks -locations -racks
Connecting to namenode via http://name_node:50070
FSCK started by hadoop (auth:SIMPLE) from /client for path /tmp/test.sql at Thu Dec 13 15:44:12 CST 2018
/tmp/test.sql 16 bytes, 1 block(s): OK
0. BP-436366437-name_node-1493982655699:blk_1449692331_378721485 len=16 repl=3 [/DEFAULT/server111:50010, /DEFAULT/server121:50010, /DEFAULT/server43:50010]

Status: HEALTHY
Total size: 16 B
Total dirs: 0
Total files: 1
Total symlinks: 0
Total blocks (validated): 1 (avg. block size 16 B)
Minimally replicated blocks: 1 (100.0 %)
Over-replicated blocks: 0 (0.0 %)
Under-replicated blocks: 0 (0.0 %)
Mis-replicated blocks: 0 (0.0 %)
Default replication factor: 3
Average block replication: 3.0
Corrupt blocks: 0
Missing replicas: 0 (0.0 %)
Number of data-nodes: 193
Number of racks: 1
FSCK ended at Thu Dec 13 15:44:12 CST 2018 in 1 milliseconds
The filesystem under path '/tmp/test.sql' is HEALTHY
So how are those 3 Datanodes chosen? They are picked in priority order (a sketch for inspecting the resulting placement follows the list):
1 The local rack (relative to the HDFS client)
2 A remote rack (relative to the HDFS client)
3 Another rack
4 Fully random
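The resulting placement can be inspected from the client side as well as with fsck. A minimal sketch using getFileBlockLocations, assuming the same /tmp/test.sql file as above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ShowBlockLocations {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    FileStatus st = fs.getFileStatus(new Path("/tmp/test.sql"));
    // one BlockLocation per block, covering the whole file
    for (BlockLocation loc : fs.getFileBlockLocations(st, 0, st.getLen())) {
      System.out.println("hosts    = " + String.join(",", loc.getHosts()));
      // rack-qualified paths, e.g. /DEFAULT/server111:50010
      System.out.println("topology = " + String.join(",", loc.getTopologyPaths()));
    }
  }
}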
And there is a formula for how many Datanodes may be chosen from each rack (namely maxNodesPerRack); the code walkthrough below traces the call path down to that formula.
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer
private int findNewDatanode(final DatanodeInfo[] original
    ) throws IOException {
  if (nodes.length != original.length + 1) {
    throw new IOException(
        new StringBuilder()
        .append("Failed to replace a bad datanode on the existing pipeline ")
        .append("due to no more good datanodes being available to try. ")
        .append("(Nodes: current=").append(Arrays.asList(nodes))
        .append(", original=").append(Arrays.asList(original)).append("). ")
        .append("The current failed datanode replacement policy is ")
        .append(dfsClient.dtpReplaceDatanodeOnFailure).append(", and ")
        .append("a client may configure this via '")
        .append(DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY)
        .append("' in its configuration.")
        .toString());
  }
Note: when no new datanode can be found, an exception is thrown; the error looks like this:
Caused by: java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[server82:50010], original=[server.82:50010]).
The current failed datanode replacement policy is ALWAYS, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
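The policy named in that message is a client-side setting. A sketch of how it might be set programmatically (the same keys can also go into hdfs-site.xml); the documented policy values are NEVER, DEFAULT, and ALWAYS:

Configuration conf = new Configuration();
// turn the replace-datanode-on-failure feature on or off as a whole
conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true);
// NEVER: never replace a failed datanode;
// DEFAULT: replace only under certain conditions (roughly, when the pipeline
//          has lost too many of its nodes);
// ALWAYS: always try to replace, failing the write if no new node is found
conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "DEFAULT");

ALWAYS is the strictest choice; on a small or heavily loaded cluster it makes the error above more likely, since a fresh datanode must always be found.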
private void addDatanode2ExistingPipeline() throws IOException {
  ...
  final DatanodeInfo[] original = nodes;
  final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
      src, fileId, block, nodes, storageIDs,
      failed.toArray(new DatanodeInfo[failed.size()]),
      1, dfsClient.clientName);
  setPipeline(lb);

  //find the new datanode
  final int d = findNewDatanode(original);
Note: getAdditionalDatanode is called to obtain 1 new datanode; many layers of the call stack are omitted here.
org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault
private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
    Node writer,
    List<DatanodeStorageInfo> chosenStorage,
    boolean returnChosenNodes,
    Set<Node> excludedNodes,
    long blocksize,
    final BlockStoragePolicy storagePolicy) {
  ...
  int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
  numOfReplicas = result[0];
  int maxNodesPerRack = result[1];
  ...
  final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
      blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy,
      EnumSet.noneOf(StorageType.class), results.isEmpty());
Note: here maxNodesPerRack is the maximum number of datanodes that may be allocated on any single rack.
private Node chooseTarget(int numOfReplicas,
    Node writer,
    final Set<Node> excludedNodes,
    final long blocksize,
    final int maxNodesPerRack,
    final List<DatanodeStorageInfo> results,
    final boolean avoidStaleNodes,
    final BlockStoragePolicy storagePolicy,
    final EnumSet<StorageType> unavailableStorages,
    final boolean newBlock) {
  ...
  if (numOfResults <= 1) {
    chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
        results, avoidStaleNodes, storageTypes);
    if (--numOfReplicas == 0) {
      return writer;
    }
  }
Note: here the code tries to obtain a new datanode on a remote rack (i.e., a rack different from the datanodes already chosen).
protected void chooseRemoteRack(int numOfReplicas,
    DatanodeDescriptor localMachine,
    Set<Node> excludedNodes,
    long blocksize,
    int maxReplicasPerRack,
    List<DatanodeStorageInfo> results,
    boolean avoidStaleNodes,
    EnumMap<StorageType, Integer> storageTypes)
    throws NotEnoughReplicasException {
  ...
  chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
      excludedNodes, blocksize, maxReplicasPerRack, results,
      avoidStaleNodes, storageTypes);
Note: here one datanode is chosen at random from all eligible datanodes; a scope of "~" + localMachine.getNetworkLocation() means "anywhere outside localMachine's rack" in NetworkTopology terms.
protected DatanodeStorageInfo chooseRandom(int numOfReplicas,
    String scope,
    Set<Node> excludedNodes,
    long blocksize,
    int maxNodesPerRack,
    List<DatanodeStorageInfo> results,
    boolean avoidStaleNodes,
    EnumMap<StorageType, Integer> storageTypes)
    throws NotEnoughReplicasException {
  ...
  int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(
      scope, excludedNodes);
  ...
  if (numOfReplicas > 0) {
    String detail = enableDebugLogging;
    if (LOG.isDebugEnabled()) {
      if (badTarget && builder != null) {
        detail = builder.toString();
        builder.setLength(0);
      } else {
        detail = "";
      }
    }
    throw new NotEnoughReplicasException(detail);
  }
Note: if for some reason (for example, datanode disks being full, or nodes being decommissioned) numOfAvailableNodes comes out as 0, a NotEnoughReplicasException is thrown.
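When this exception shows up, one quick check (assuming shell access on a client node) is the datanode report, which lists live and dead datanodes together with their capacity usage:

hdfs dfsadmin -report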
The calculation logic for maxNodesPerRack is as follows:
org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault
/**
* Calculate the maximum number of replicas to allocate per rack. It also
* limits the total number of replicas to the total number of nodes in the
* cluster. Caller should adjust the replica count to the return value.
*
* @param numOfChosen The number of already chosen nodes.
* @param numOfReplicas The number of additional nodes to allocate.
* @return integer array. Index 0: The number of nodes allowed to allocate
* in addition to already chosen nodes.
* Index 1: The maximum allowed number of nodes per rack. This
* is independent of the number of chosen nodes, as it is calculated
* using the target number of replicas.
*/
private int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
  int clusterSize = clusterMap.getNumOfLeaves();
  int totalNumOfReplicas = numOfChosen + numOfReplicas;
  if (totalNumOfReplicas > clusterSize) {
    numOfReplicas -= (totalNumOfReplicas-clusterSize);
    totalNumOfReplicas = clusterSize;
  }

  // No calculation needed when there is only one rack or picking one node.
  int numOfRacks = clusterMap.getNumOfRacks();
  if (numOfRacks == 1 || totalNumOfReplicas <= 1) {
    return new int[] {numOfReplicas, totalNumOfReplicas};
  }

  int maxNodesPerRack = (totalNumOfReplicas-1)/numOfRacks + 2;
  // At this point, there are more than one racks and more than one replicas
  // to store. Avoid all replicas being in the same rack.
  //
  // maxNodesPerRack has the following properties at this stage.
  // 1) maxNodesPerRack >= 2
  // 2) (maxNodesPerRack-1) * numOfRacks > totalNumOfReplicas
  //      when numOfRacks > 1
  //
  // Thus, the following adjustment will still result in a value that forces
  // multi-rack allocation and gives enough number of total nodes.
  if (maxNodesPerRack == totalNumOfReplicas) {
    maxNodesPerRack--;
  }
  return new int[] {numOfReplicas, maxNodesPerRack};
}
Note: the two key lines are:

int maxNodesPerRack = (totalNumOfReplicas-1)/numOfRacks + 2;
if (maxNodesPerRack == totalNumOfReplicas) {
  maxNodesPerRack--;
}
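To make the formula concrete, here is a standalone re-implementation (a hypothetical helper for illustration, not Hadoop code), assuming numOfChosen = 0 and a cluster with more nodes than replicas:

public class MaxNodesPerRackDemo {
  // same arithmetic as getMaxNodesPerRack, with the cluster shape passed in
  static int maxNodesPerRack(int totalNumOfReplicas, int numOfRacks) {
    if (numOfRacks == 1 || totalNumOfReplicas <= 1) {
      return totalNumOfReplicas;  // single-rack shortcut: no per-rack limit
    }
    int maxNodesPerRack = (totalNumOfReplicas - 1) / numOfRacks + 2;
    if (maxNodesPerRack == totalNumOfReplicas) {
      maxNodesPerRack--;  // avoid all replicas landing on one rack
    }
    return maxNodesPerRack;
  }

  public static void main(String[] args) {
    System.out.println(maxNodesPerRack(3, 1)); // 3: one rack, no constraint
    System.out.println(maxNodesPerRack(3, 2)); // 2: (3-1)/2+2 = 3 == 3, so 3-1
    System.out.println(maxNodesPerRack(3, 3)); // 2: (3-1)/3+2 = 2
  }
}

So with the default replication factor of 3 and more than one rack, at most 2 replicas may land on any single rack, which is what pushes a replica onto a remote rack in chooseTarget above. In the fsck output earlier, Number of racks: 1, so the single-rack shortcut applies and no per-rack limit is enforced.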