- HDFS集群分為兩大角色:NameNode、DataNode
- NameNode負責管理整個文件系統的元數據
- DataNode 負責管理用戶的文件數據塊
- 文件會按照固定的大小(blocksize)切成若干塊后分布式存儲在若干台datanode上
- 每一個文件塊可以有多個副本,並存放在不同的datanode上
- Datanode會定期向Namenode匯報自身所保存的文件block信息,而namenode則會負責保持文件的副本數量
- HDFS的內部工作機制對客戶端保持透明,客戶端請求訪問HDFS都是通過向namenode申請來進行
HDFS寫文件過程分析
HDFS是一個分布式文件系統,在HDFS上寫文件的過程與我們平時使用的單機文件系統非常不同,從宏觀上來看,在HDFS文件系統上創建並寫一個文件,流程如下圖(來自《Hadoop:The Definitive Guide》一書)所示:
具體過程描述如下:
- Client調用DistributedFileSystem對象的create方法,創建一個文件輸出流(FSDataOutputStream)對象
- 通過DistributedFileSystem對象與Hadoop集群的NameNode進行一次RPC遠程調用,在HDFS的Namespace中創建一個文件條目(Entry),該條目沒有任何的Block
- 通過FSDataOutputStream對象,向DataNode寫入數據,數據首先被寫入FSDataOutputStream對象內部的Buffer中,然后數據被分割成一個個Packet數據包
- 以Packet最小單位,基於Socket連接發送到按特定算法選擇的HDFS集群中一組DataNode(正常是3個,可能大於等於1)中的一個節點上,在這組DataNode組成的Pipeline上依次傳輸Packet
- 這組DataNode組成的Pipeline反方向上,發送ack,最終由Pipeline中第一個DataNode節點將Pipeline ack發送給Client
- 完成向文件寫入數據,Client在文件輸出流(FSDataOutputStream)對象上調用close方法,關閉流
- 調用DistributedFileSystem對象的complete方法,通知NameNode文件寫入成功
更詳細的流程:
- client發起文件上傳請求,通過RPC與NameNode建立連接,NameNode檢查目標文件是否已經存在,父目錄是否存在,並檢查用戶是否有相應的權限,若檢查通過,會為該文件創建一個新的記錄,否則的話文件創建失敗,客戶端得到異常信息
- client通過請求NameNode,第一個block應該傳輸到哪些DataNode服務器上
- NameNode根據配置文件中指定的備份(replica)數量及機架感知原理進行文件分配,返回可用的DataNode的地址 以三台DataNode為例:A B C。注: Hadoop在設計時考慮到數據的安全與高效,數據文件默認在HDFS上存放三份,存儲策略為:第一個備份放在客戶端相同的datanode上(若客戶端在集群外運行,就隨機選取一個datanode來存放第一個replica),第二個replica放在與第一個replica不同機架的一個隨機datanode上,第三個replica放在與第二個replica相同機架的隨機datanode上,如果replica數大於三,則隨后的replica在集群中隨機存放,Hadoop會盡量避免過多的replica存放在同一個機架上.選取replica存放在同一個機架上.(Hadoop 1.x以后允許replica是可插拔的,意思是說可以定制自己需要的replica分配策略)
- client請求3台的DataNode的一台A上傳數據,(本質是一個RPC調用,建立pipeline),A收到請求會繼續調用B,然后B調用C,將整個pipeline建立完成后,逐級返回client
- client開始往A上傳第一個block(先從磁盤讀取數據放到一個本地內存緩存),以packet為單位(默認 64K),A收到一個packet就會傳給B,B傳遞給C;A每傳一個packet會放入一個應答隊列等待應答。注: 如果某個datanode在寫數據的時候宕掉了下面這些對用戶透明的步驟會被執行:數據被分割成一個個packet數據包在pipeline上一次傳輸,在pipeline反方向上,逐個發送ack(命令正確應答),最終由pipeline中第一個DataNode節點A將pipeline ack發送給client
- 管道線關閉,所有確認隊列上的數據會被挪到數據隊列的首部重新發送,這樣也就確保管道線中宕掉的datanode下流的datanode不會因為宕掉的datanode而丟失數據包
- 在還在正常運行datanode上的當前block上做一個標志,這樣當宕掉的datanode重新啟動以后namenode就會知道該datanode上哪個block是剛才宕機殘留下的局部損壞block,從而把他刪除掉
- 已經宕掉的datanode從管道線中被移除,未寫完的block的其他數據繼續唄寫入到其他兩個還在正常運行的datanode中,namenode知道這個block還處在under-replicated狀態(即備份數不足的狀態)下,然后它會安排一個新的replica從而達到要求的備份數,后續的block寫入方法同前面正常時候一樣
- 有可能管道線中的多個datanode宕掉(一般這種情況很少),但只要dfs.relication.min(默認值為1)個replica被創建,我么就認為該創建成功了,剩余的relica會在以后異步創建以達到指定的replica數
- 當一個block傳輸完成后,client再次發送請求NameNode上傳第二個block到服務器
機架感知(副本節點選擇):
- 第一個副本在client所處的節點上。如果客戶端在集群外,隨機選一個
- 第二個副本和第一個副本位於相同機架,隨機節點。
- 第三個副本位於不同機架,隨機節點
下面代碼使用Hadoop的API來實現向HDFS的文件寫入數據,同樣也包括創建一個文件和寫數據兩個主要過程,代碼如下所示:
static String[] contents = new String[] { "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccc", "dddddddddddddddddddddddddddddddd", "eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee", }; public static void main(String[] args) { String file = "hdfs://h1:8020/data/test/test.log"; Path path = new Path(file); Configuration conf = new Configuration(); FileSystem fs = null; FSDataOutputStream output = null; try { fs = path.getFileSystem(conf); output = fs.create(path); // 創建文件 for(String line : contents) { // 寫入數據 output.write(line.getBytes("UTF-8")); output.flush(); } } catch (IOException e) { e.printStackTrace(); } finally { try { output.close(); } catch (IOException e) { e.printStackTrace(); } } }
結合上面的示例代碼,我們先從fs.create(path);開始,可以看到FileSystem的實現DistributedFileSystem中給出了最終返回FSDataOutputStream對象的抽象邏輯,代碼如下所示:
public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { statistics.incrementWriteOps(1); return new FSDataOutputStream (dfs.create(getPathName(f), permission, overwrite, true, replication, blockSize, progress, bufferSize), statistics); }
上面,DFSClient dfs的create方法中創建了一個OutputStream對象,在DFSClient的create方法:
public OutputStream create(String src, FsPermission permission, boolean overwrite, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize ) throws IOException { ... ... }
final DFSOutputStream result = new DFSOutputStream(src, masked, overwrite, createParent, replication, blockSize, progress, buffersize, conf.getInt("io.bytes.per.checksum", 512));
下面,我們從DFSOutputStream類開始,說明其內部實現原理。

- 創建Packet,Client寫數據時,會將字節流數據緩存到內部的緩沖區中,當長度滿足一個Chunk大小(512B)時,便會創建一個Packet對象,然后向該Packet對象中寫Chunk Checksum校驗和數據,以及實際數據塊Chunk Data,校驗和數據是基於實際數據塊計算得到的。每次滿足一個Chunk大小時,都會向Packet中寫上述數據內容,直到達到一個Packet對象大小(64K),就會將該Packet對象放入到dataQueue隊列中,等待DataStreamer線程取出並發送到DataNode節點。
- 發送Packet,DataStreamer線程從dataQueue隊列中取出Packet對象,放到ackQueue隊列中,然后向DataNode節點發送這個Packet對象所對應的數據
- 接收ack,發送一個Packet數據包以后,會有一個用來接收ack的ResponseProcessor線程,如果收到成功的ack,則表示一個Packet發送成功。如果成功,則ResponseProcessor線程會將ackQueue隊列中對應的Packet刪除
DFSOutputStream(String src, FsPermission masked, boolean overwrite, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, int bytesPerChecksum) throws IOException { this(src, blockSize, progress, bytesPerChecksum, replication); computePacketChunkSize(writePacketSize, bytesPerChecksum); // 默認 writePacketSize=64*1024(即64K),bytesPerChecksum=512(沒512個字節計算一個校驗和), try { if (createParent) { // createParent為true表示,如果待創建的文件的父級目錄不存在,則自動創建 namenode.create(src, masked, clientName, overwrite, replication, blockSize); } else { namenode.create(src, masked, clientName, overwrite, false, replication, blockSize); } } catch(RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, FileAlreadyExistsException.class, FileNotFoundException.class, NSQuotaExceededException.class, DSQuotaExceededException.class); } streamer.start(); // 啟動一個DataStreamer線程,用來將寫入的字節流打包成packet,然后發送到對應的Datanode節點上 } 上面computePacketChunkSize方法計算了一個packet的相關參數,我們結合代碼來查看,如下所示: int chunkSize = csize + checksum.getChecksumSize(); int n = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER; chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1); packetSize = n + chunkSize*chunksPerPacket;
我們用默認的參數值替換上面的參數,得到:
int chunkSize = 512 + 4; int n = 21 + 4; chunksPerPacket = Math.max((64*1024 - 25 + 516-1)/516, 1); // 127 packetSize = 25 + 516*127;
上面對應的參數,說明如下表所示:
參數名稱 | 參數值 | 參數含義 |
chunkSize | 512+4=516 | 每個chunk的字節數(數據+校驗和) |
csize | 512 | 每個chunk數據的字節數 |
psize | 64*1024 | 每個packet的最大字節數(不包含header) |
DataNode.PKT_HEADER_LEN | 21 | 每個packet的header的字節數 |
chunksPerPacket | 127 | 組成每個packet的chunk的個數 |
packetSize | 25+516*127=65557 | 每個packet的字節數(一個header+一組chunk) |
在計算好一個packet相關的參數以后,調用create方法與Namenode進行RPC請求,請求創建文件:
if (createParent) { // createParent為true表示,如果待創建的文件的父級目錄不存在,則自動創建 namenode.create(src, masked, clientName, overwrite, replication, blockSize); } else { namenode.create(src, masked, clientName, overwrite, false, replication, blockSize); }
遠程調用上面方法,會在FSNamesystem中創建對應的文件路徑,並初始化與該創建的文件相關的一些信息,如租約(向Datanode節點寫數據的憑據)。文件在FSNamesystem中創建成功,就要初始化並啟動一個DataStreamer線程,用來向Datanode寫數據,后面我們詳細說明具體處理邏輯。
Packet結構與定義

字段名稱 | 字段類型 | 字段長度 | 字段含義 |
pktLen | int | 4 | 4 + dataLen + checksumLen |
offsetInBlock | long | 8 | Packet在Block中偏移量 |
seqNo | long | 8 | Packet序列號,在同一個Block唯一 |
lastPacketInBlock | boolean | 1 | 是否是一個Block的最后一個Packet |
dataLen | int | 4 | dataPos – dataStart,不包含Header和Checksum的長度 |
ByteBuffer buffer; // only one of buf and buffer is non-null byte[] buf; long seqno; // sequencenumber of buffer in block long offsetInBlock; // 該packet在block中的偏移量 boolean lastPacketInBlock; // is this the last packet in block? int numChunks; // number of chunks currently in packet int maxChunks; // 一個packet中包含的chunk的個數 int dataStart; int dataPos; int checksumStart; int checksumPos;
Packet類有一個默認的沒有參數的構造方法,它是用來做heatbeat的,如下所示:
Packet() { this.lastPacketInBlock = false; this.numChunks = 0; this.offsetInBlock = 0; this.seqno = HEART_BEAT_SEQNO; // 值為-1 buffer = null; int packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER; // 21+4=25 buf = new byte[packetSize]; checksumStart = dataStart = packetSize; checksumPos = checksumStart; dataPos = dataStart; maxChunks = 0; }
通過代碼可以看到,一個heatbeat的內容,實際上只有一個長度為25字節的header數據。通過this.seqno = HEART_BEAT_SEQNO;的值可以判斷一個packet是否是heatbeat包,如果seqno為-1表示這是一個heatbeat包。
Client發送Packet數據

字段名稱 | 字段類型 | 字段長度 | 字段含義 |
Transfer Version | short | 2 | Client與DataNode之間數據傳輸版本號,由常量DataTransferProtocol.DATA_TRANSFER_VERSION定義,值為17 |
OP | int | 4 | 操作類型,由常量DataTransferProtocol.OP_WRITE_BLOCK定義,值為80 |
blkId | long | 8 | Block的ID值,由NameNode分配 |
GS | long | 8 | 時間戳(Generation Stamp),NameNode分配blkId的時候生成的時間戳 |
DNCnt | int | 4 | DataNode復制Pipeline中DataNode節點的數量 |
Recovery Flag | boolean | 1 | Recover標志 |
Client | Text | Client主機的名稱,在使用Text進行序列化的時候,實際包含長度len與主機名稱字符串ClientHost | |
srcNode | boolean | 1 | 是否發送src node的信息,默認值為false,不發送src node的信息 |
nonSrcDNCnt | int | 4 | 由Client寫的該Header數據,該數不包含Pipeline中第一個節點(即為DNCnt-1) |
DN2 | DatanodeInfo | DataNode信息,包括StorageID、InfoPort、IpcPort、capacity、DfsUsed、remaining、LastUpdate、XceiverCount、Location、HostName、AdminState | |
DN3 | DatanodeInfo | DataNode信息,包括StorageID、InfoPort、IpcPort、capacity、DfsUsed、remaining、LastUpdate、XceiverCount、Location、HostName、AdminState | |
Access Token | Token | 訪問令牌信息,包括IdentifierLength、Identifier、PwdLength、Pwd、KindLength、Kind、ServiceLength、Service | |
CheckSum Header | DataChecksum | 1+4 | 校驗和Header信息,包括type、bytesPerChecksum |
Header數據包發送成功,Client會收到一個成功響應碼(DataTransferProtocol.OP_STATUS_SUCCESS = 0),接着將Packet數據發送到Pipeline中第一個DataNode上,如下所示:
Packet one = null; one = dataQueue.getFirst(); // regular data packet ByteBuffer buf = one.getBuffer(); // write out data to remote datanode blockStream.write(buf.array(), buf.position(), buf.remaining()); if (one.lastPacketInBlock) { // 如果是Block中的最后一個Packet,還要寫入一個0標識該Block已經寫入完成 blockStream.writeInt(0); // indicate end-of-block }
if (!success) { LOG.info("Abandoning " + block); namenode.abandonBlock(block, src, clientName); if (errorIndex < nodes.length) { LOG.info("Excluding datanode " + nodes[errorIndex]); excludedNodes.add(nodes[errorIndex]); } // Connection failed. Let's wait a little bit and retry retry = true; }


Block block = new Block(in.readLong(), dataXceiverServer.estimateBlockSize, in.readLong()); LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: " + localAddress); int pipelineSize = in.readInt(); // num of datanodes in entire pipeline boolean isRecovery = in.readBoolean(); // is this part of recovery? String client = Text.readString(in); // working on behalf of this client boolean hasSrcDataNode = in.readBoolean(); // is src node info present if (hasSrcDataNode) { srcDataNode = new DatanodeInfo(); srcDataNode.readFields(in); } int numTargets = in.readInt(); if (numTargets < 0) { throw new IOException("Mislabelled incoming datastream."); } DatanodeInfo targets[] = new DatanodeInfo[numTargets]; for (int i = 0; i < targets.length; i++) { DatanodeInfo tmp = new DatanodeInfo(); tmp.readFields(in); targets[i] = tmp; } Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>(); accessToken.readFields(in);

- 寫文件開始時創建文件:Client調用create在NameNode節點的Namespace中創建一個標識該文件的條目
- 在Client連接Pipeline中第一個DataNode節點之前,Client調用addBlock分配一個Block(blkId+DataNode列表+租約)
- 如果與Pipeline中第一個DataNode節點連接失敗,Client調用abandonBlock放棄一個已經分配的Block
- 一個Block已經寫入到DataNode節點磁盤,Client調用fsync讓NameNode持久化Block的位置信息數據
- 文件寫完以后,Client調用complete方法通知NameNode寫入文件成功
- DataNode節點接收到並成功持久化一個Block的數據后,DataNode調用blockReceived方法通知NameNode已經接收到Block