HDFS寫文件過程分析


轉自http://shiyanjun.cn/archives/942.html

HDFS是一個分布式文件系統,在HDFS上寫文件的過程與我們平時使用的單機文件系統非常不同,從宏觀上來看,在HDFS文件系統上創建並寫一個文件,流程如下圖(來自《Hadoop:The Definitive Guide》一書)所示:
hdfs-write-flow
具體過程描述如下:

  1. Client調用DistributedFileSystem對象的create方法,創建一個文件輸出流(FSDataOutputStream)對象
  2. 通過DistributedFileSystem對象與Hadoop集群的NameNode進行一次RPC遠程調用,在HDFS的Namespace中創建一個文件條目(Entry),該條目沒有任何的Block
  3. 通過FSDataOutputStream對象,向DataNode寫入數據,數據首先被寫入FSDataOutputStream對象內部的Buffer中,然后數據被分割成一個個Packet數據包
  4. 以Packet最小單位,基於Socket連接發送到按特定算法選擇的HDFS集群中一組DataNode(正常是3個,可能大於等於1)中的一個節點上,在這組DataNode組成的Pipeline上依次傳輸Packet
  5. 這組DataNode組成的Pipeline反方向上,發送ack,最終由Pipeline中第一個DataNode節點將Pipeline ack發送給Client
  6. 完成向文件寫入數據,Client在文件輸出流(FSDataOutputStream)對象上調用close方法,關閉流
  7. 調用DistributedFileSystem對象的complete方法,通知NameNode文件寫入成功

下面代碼使用Hadoop的API來實現向HDFS的文件寫入數據,同樣也包括創建一個文件和寫數據兩個主要過程,代碼如下所示:

01 static String[] contents = new String[] {
02      "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
03      "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
04      "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
05      "dddddddddddddddddddddddddddddddd",
06      "eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee",
07 };
08  
09 public static void main(String[] args) {
10      String file = "hdfs://h1:8020/data/test/test.log";
11    Path path = new Path(file);
12    Configuration conf = new Configuration();
13    FileSystem fs = null;
14    FSDataOutputStream output = null;
15    try {
16           fs = path.getFileSystem(conf);
17           output = fs.create(path); // 創建文件
18           for(String line : contents) { // 寫入數據
19                output.write(line.getBytes("UTF-8"));
20                output.flush();
21           }
22      catch (IOException e) {
23           e.printStackTrace();
24      finally {
25           try {
26                output.close();
27           catch (IOException e) {
28                e.printStackTrace();
29           }
30      }
31 }

結合上面的示例代碼,我們先從fs.create(path);開始,可以看到FileSystem的實現DistributedFileSystem中給出了最終返回FSDataOutputStream對象的抽象邏輯,代碼如下所示:

1 public FSDataOutputStream create(Path f, FsPermission permission,
2   boolean overwrite,
3   int bufferSize, short replication, long blockSize,
4   Progressable progress) throws IOException {
5  
6   statistics.incrementWriteOps(1);
7   return new FSDataOutputStream
8      (dfs.create(getPathName(f), permission, overwrite, true, replication, blockSize, progress, bufferSize), statistics);
9 }

上面,DFSClient dfs的create方法中創建了一個OutputStream對象,在DFSClient的create方法:

01   public OutputStream create(String src,
02                              FsPermission permission,
03                              boolean overwrite,
04                              boolean createParent,
05                              short replication,
06                              long blockSize,
07                              Progressable progress,
08                              int buffersize
09                              throws IOException {
10    ... ...
11 }

創建了一個DFSOutputStream對象,如下所示:

1 final DFSOutputStream result = new DFSOutputStream(src, masked,
2     overwrite, createParent, replication, blockSize, progress, buffersize,
3     conf.getInt("io.bytes.per.checksum"512));

下面,我們從DFSOutputStream類開始,說明其內部實現原理。

DFSOutputStream內部原理

打開一個DFSOutputStream流,Client會寫數據到流內部的一個緩沖區中,然后數據被分解成多個Packet,每個Packet大小為64k字節,每個Packet又由一組chunk和這組chunk對應的checksum數據組成,默認chunk大小為512字節,每個checksum是對512字節數據計算的校驗和數據。
當Client寫入的字節流數據達到一個Packet的長度,這個Packet會被構建出來,然后會被放到隊列dataQueue中,接着DataStreamer線程會不斷地從dataQueue隊列中取出Packet,發送到復制Pipeline中的第一個DataNode上,並將該Packet從dataQueue隊列中移到ackQueue隊列中。ResponseProcessor線程接收從Datanode發送過來的ack,如果是一個成功的ack,表示復制Pipeline中的所有Datanode都已經接收到這個Packet,ResponseProcessor線程將packet從隊列ackQueue中刪除。
在發送過程中,如果發生錯誤,所有未完成的Packet都會從ackQueue隊列中移除掉,然后重新創建一個新的Pipeline,排除掉出錯的那些DataNode節點,接着DataStreamer線程繼續從dataQueue隊列中發送Packet。
下面是DFSOutputStream的結構及其原理,如圖所示:
hdfs-write-internal
我們從下面3個方面來描述內部流程:

  • 創建Packet

Client寫數據時,會將字節流數據緩存到內部的緩沖區中,當長度滿足一個Chunk大小(512B)時,便會創建一個Packet對象,然后向該Packet對象中寫Chunk Checksum校驗和數據,以及實際數據塊Chunk Data,校驗和數據是基於實際數據塊計算得到的。每次滿足一個Chunk大小時,都會向Packet中寫上述數據內容,直到達到一個Packet對象大小(64K),就會將該Packet對象放入到dataQueue隊列中,等待DataStreamer線程取出並發送到DataNode節點。

  • 發送Packet

DataStreamer線程從dataQueue隊列中取出Packet對象,放到ackQueue隊列中,然后向DataNode節點發送這個Packet對象所對應的數據。

  • 接收ack

發送一個Packet數據包以后,會有一個用來接收ack的ResponseProcessor線程,如果收到成功的ack,則表示一個Packet發送成功。如果成功,則ResponseProcessor線程會將ackQueue隊列中對應的Packet刪除。

DFSOutputStream初始化

首先看一下,DFSOutputStream的初始化過程,構造方法如下所示:

01     DFSOutputStream(String src, FsPermission masked, boolean overwrite,
02         boolean createParent, short replication, long blockSize, Progressable progress,
03         int buffersize, int bytesPerChecksum) throws IOException {
04       this(src, blockSize, progress, bytesPerChecksum, replication);
05  
06       computePacketChunkSize(writePacketSize, bytesPerChecksum); // 默認 writePacketSize=64*1024(即64K),bytesPerChecksum=512(沒512個字節計算一個校驗和),
07  
08       try {
09         if (createParent) { // createParent為true表示,如果待創建的文件的父級目錄不存在,則自動創建
10           namenode.create(src, masked, clientName, overwrite, replication, blockSize);
11         else {
12           namenode.create(src, masked, clientName, overwrite, false, replication, blockSize);
13         }
14       catch(RemoteException re) {
15         throw re.unwrapRemoteException(AccessControlException.class,
16                                        FileAlreadyExistsException.class,
17                                        FileNotFoundException.class,
18                                        NSQuotaExceededException.class,
19                                        DSQuotaExceededException.class);
20       }
21       streamer.start(); // 啟動一個DataStreamer線程,用來將寫入的字節流打包成packet,然后發送到對應的Datanode節點上
22     }
23 上面computePacketChunkSize方法計算了一個packet的相關參數,我們結合代碼來查看,如下所示:
24       int chunkSize = csize + checksum.getChecksumSize();
25       int n = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
26       chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1);
27       packetSize = n + chunkSize*chunksPerPacket;

我們用默認的參數值替換上面的參數,得到:

1 int chunkSize = 512 4;
2 int n = 21 4;
3 chunksPerPacket = Math.max((64*1024 25 516-1)/5161);  // 127
4 packetSize = 25 516*127;

上面對應的參數,說明如下表所示:

參數名稱 參數值 參數含義
chunkSize 512+4=516 每個chunk的字節數(數據+校驗和)
csize 512 每個chunk數據的字節數
psize 64*1024 每個packet的最大字節數(不包含header)
DataNode.PKT_HEADER_LEN 21 每個packet的header的字節數
chunksPerPacket 127 組成每個packet的chunk的個數
packetSize 25+516*127=65557 每個packet的字節數(一個header+一組chunk)

在計算好一個packet相關的參數以后,調用create方法與Namenode進行RPC請求,請求創建文件:

1 if (createParent) { // createParent為true表示,如果待創建的文件的父級目錄不存在,則自動創建
2   namenode.create(src, masked, clientName, overwrite, replication, blockSize);
3 else {
4   namenode.create(src, masked, clientName, overwrite, false, replication, blockSize);
5 }

遠程調用上面方法,會在FSNamesystem中創建對應的文件路徑,並初始化與該創建的文件相關的一些信息,如租約(向Datanode節點寫數據的憑據)。文件在FSNamesystem中創建成功,就要初始化並啟動一個DataStreamer線程,用來向Datanode寫數據,后面我們詳細說明具體處理邏輯。

Packet結構與定義

Client向HDFS寫數據,數據會被組裝成Packet,然后發送到Datanode節點。Packet分為兩類,一類是實際數據包,另一類是heatbeat包。一個Packet數據包的組成結構,如圖所示:
hdfs-write-packet-structure
上圖中,一個Packet是由Header和Data兩部分組成,其中Header部分包含了一個Packet的概要屬性信息,如下表所示:

字段名稱 字段類型 字段長度 字段含義
pktLen int 4 4 + dataLen + checksumLen
offsetInBlock long 8 Packet在Block中偏移量
seqNo long 8 Packet序列號,在同一個Block唯一
lastPacketInBlock boolean 1 是否是一個Block的最后一個Packet
dataLen int 4 dataPos – dataStart,不包含Header和Checksum的長度

Data部分是一個Packet的實際數據部分,主要包括一個4字節校驗和(Checksum)與一個Chunk部分,Chunk部分最大為512字節。
在構建一個Packet的過程中,首先將字節流數據寫入一個buffer緩沖區中,也就是從偏移量為25的位置(checksumStart)開始寫Packet數據的Chunk Checksum部分,從偏移量為533的位置(dataStart)開始寫Packet數據的Chunk Data部分,直到一個Packet創建完成為止。如果一個Packet的大小未能達到最大長度,也就是上圖對應的緩沖區中,Chunk Checksum與Chunk Data之間還保留了一段未被寫過的緩沖區位置,這種情況說明,已經在寫一個文件的最后一個Block的最后一個Packet。在發送這個Packet之前,會檢查Chunksum與Chunk Data之間的緩沖區是否為空白緩沖區(gap),如果有則將Chunk Data部分向前移動,使得Chunk Data 1與Chunk Checksum N相鄰,然后才會被發送到DataNode節點。
我們看一下Packet對應的Packet類定義,定義了如下一些字段:

01 ByteBuffer buffer;           // only one of buf and buffer is non-null
02 byte[]  buf;
03 long    seqno;               // sequencenumber of buffer in block
04 long    offsetInBlock;       // 該packet在block中的偏移量
05 boolean lastPacketInBlock;   // is this the last packet in block?
06 int     numChunks;           // number of chunks currently in packet
07 int     maxChunks;           // 一個packet中包含的chunk的個數
08 int     dataStart;
09 int     dataPos;
10 int     checksumStart;
11 int     checksumPos;

Packet類有一個默認的沒有參數的構造方法,它是用來做heatbeat的,如下所示:

01 Packet() {
02   this.lastPacketInBlock = false;
03   this.numChunks = 0;
04   this.offsetInBlock = 0;
05   this.seqno = HEART_BEAT_SEQNO; // 值為-1
06  
07   buffer = null;
08   int packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER; // 21+4=25
09   buf = new byte[packetSize];
10  
11   checksumStart = dataStart = packetSize;
12   checksumPos = checksumStart;
13   dataPos = dataStart;
14   maxChunks = 0;
15 }

通過代碼可以看到,一個heatbeat的內容,實際上只有一個長度為25字節的header數據。通過this.seqno = HEART_BEAT_SEQNO;的值可以判斷一個packet是否是heatbeat包,如果seqno為-1表示這是一個heatbeat包。

Client發送Packet數據

可以DFSClient類中看到,發送一個Packet之前,首先需要向選定的DataNode發送一個Header數據包,表明要向DataNode寫數據,該Header的數據結構,如圖所示:
hdfs-write-transfer-header
上圖顯示的是Client發送Packet到第一個DataNode節點的Header數據結構,主要包括待發送的Packet所在的Block(先向NameNode分配Block ID等信息)的相關信息、Pipeline中另外2個DataNode的信息、訪問令牌(Access Token)和校驗和信息,Header中各個字段及其類型,詳見下表:

字段名稱 字段類型 字段長度 字段含義
Transfer Version short 2 Client與DataNode之間數據傳輸版本號,由常量DataTransferProtocol.DATA_TRANSFER_VERSION定義,值為17
OP int 4 操作類型,由常量DataTransferProtocol.OP_WRITE_BLOCK定義,值為80
blkId long 8 Block的ID值,由NameNode分配
GS long 8 時間戳(Generation Stamp),NameNode分配blkId的時候生成的時間戳
DNCnt int 4 DataNode復制Pipeline中DataNode節點的數量
Recovery Flag boolean 1 Recover標志
Client Text   Client主機的名稱,在使用Text進行序列化的時候,實際包含長度len與主機名稱字符串ClientHost
srcNode boolean 1 是否發送src node的信息,默認值為false,不發送src node的信息
nonSrcDNCnt int 4 由Client寫的該Header數據,該數不包含Pipeline中第一個節點(即為DNCnt-1)
DN2 DatanodeInfo   DataNode信息,包括StorageID、InfoPort、IpcPort、capacity、DfsUsed、remaining、LastUpdate、XceiverCount、Location、HostName、AdminState
DN3 DatanodeInfo   DataNode信息,包括StorageID、InfoPort、IpcPort、capacity、DfsUsed、remaining、LastUpdate、XceiverCount、Location、HostName、AdminState
Access Token Token   訪問令牌信息,包括IdentifierLength、Identifier、PwdLength、Pwd、KindLength、Kind、ServiceLength、Service
CheckSum Header DataChecksum 1+4 校驗和Header信息,包括type、bytesPerChecksum

Header數據包發送成功,Client會收到一個成功響應碼(DataTransferProtocol.OP_STATUS_SUCCESS = 0),接着將Packet數據發送到Pipeline中第一個DataNode上,如下所示:

1 Packet one = null;
2 one = dataQueue.getFirst(); // regular data packet
3 ByteBuffer buf = one.getBuffer();
4 // write out data to remote datanode
5 blockStream.write(buf.array(), buf.position(), buf.remaining());
6  
7 if (one.lastPacketInBlock) { // 如果是Block中的最后一個Packet,還要寫入一個0標識該Block已經寫入完成
8     blockStream.writeInt(0); // indicate end-of-block
9 }

否則,如果失敗,則會與NameNode進行RPC調用,刪除該Block,並把該Pipeline中第一個DataNode加入到excludedNodes列表中,代碼如下所示:

01 if (!success) {
02   LOG.info("Abandoning " + block);
03   namenode.abandonBlock(block, src, clientName);
04  
05   if (errorIndex < nodes.length) {
06     LOG.info("Excluding datanode " + nodes[errorIndex]);
07     excludedNodes.add(nodes[errorIndex]);
08   }
09  
10   // Connection failed.  Let's wait a little bit and retry
11   retry = true;
12 }

DataNode端服務組件

數據最終會發送到DataNode節點上,在一個DataNode上,數據在各個組件之間流動,流程如下圖所示:
hdfs-write-pipeline-single
DataNode服務中創建一個后台線程DataXceiverServer,它是一個SocketServer,用來接收來自Client(或者DataNode Pipeline中的非最后一個DataNode節點)的寫數據請求,然后在DataXceiverServer中將連接過來的Socket直接派發給一個獨立的后台線程DataXceiver進行處理。所以,Client寫數據時連接一個DataNode Pipeline的結構,實際流程如圖所示:
hdfs-write-pipeline-datanodes
每個DataNode服務中的DataXceiver后台線程接收到來自前一個節點(Client/DataNode)的Socket連接,首先讀取Header數據:

01 Block block = new Block(in.readLong(), dataXceiverServer.estimateBlockSize, in.readLong());
02 LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: " + localAddress);
03 int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
04 boolean isRecovery = in.readBoolean(); // is this part of recovery?
05 String client = Text.readString(in); // working on behalf of this client
06 boolean hasSrcDataNode = in.readBoolean(); // is src node info present
07 if (hasSrcDataNode) {
08   srcDataNode = new DatanodeInfo();
09   srcDataNode.readFields(in);
10 }
11 int numTargets = in.readInt();
12 if (numTargets < 0) {
13   throw new IOException("Mislabelled incoming datastream.");
14 }
15 DatanodeInfo targets[] = new DatanodeInfo[numTargets];
16 for (int i = 0; i < targets.length; i++) {
17   DatanodeInfo tmp = new DatanodeInfo();
18   tmp.readFields(in);
19   targets[i] = tmp;
20 }
21 Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
22 accessToken.readFields(in);

上面代碼中,讀取Header的數據,與前一個Client/DataNode寫入Header字段的順序相對應,不再累述。在完成讀取Header數據后,當前DataNode會首先將Header數據再發送到Pipeline中下一個DataNode結點,當然該DataNode肯定不是Pipeline中最后一個DataNode節點。接着,該DataNode會接收來自前一個Client/DataNode節點發送的Packet數據,接收Packet數據的邏輯實際上在BlockReceiver中完成,包括將來自前一個Client/DataNode節點發送的Packet數據寫入本地磁盤。在BlockReceiver中,首先會將接收到的Packet數據發送寫入到Pipeline中下一個DataNode節點,然后再將接收到的數據寫入到本地磁盤的Block文件中。

DataNode持久化Packet數據

在DataNode節點的BlockReceiver中進行Packet數據的持久化,一個Packet是一個Block中一個數據分組,我們首先看一下,一個Block在持久化到磁盤上的物理存儲結構,如下圖所示:
hdfs-write-block-physical
每個Block文件(如上圖中blk_1084013198文件)都對應一個meta文件(如上圖中blk_1084013198_10273532.meta文件),Block文件是一個一個Chunk的二進制數據(每個Chunk的大小是512字節),而meta文件是與每一個Chunk對應的Checksum數據,是序列化形式存儲。

寫文件過程中Client/DataNode與NameNode進行RPC調用

Client在HDFS文件系統中寫文件過程中,會發生多次與NameNode節點進行RPC調用來完成寫數據相關操作,主要是在如下時機進行RPC調用:

  • 寫文件開始時創建文件:Client調用create在NameNode節點的Namespace中創建一個標識該文件的條目
  • 在Client連接Pipeline中第一個DataNode節點之前,Client調用addBlock分配一個Block(blkId+DataNode列表+租約)
  • 如果與Pipeline中第一個DataNode節點連接失敗,Client調用abandonBlock放棄一個已經分配的Block
  • 一個Block已經寫入到DataNode節點磁盤,Client調用fsync讓NameNode持久化Block的位置信息數據
  • 文件寫完以后,Client調用complete方法通知NameNode寫入文件成功
  • DataNode節點接收到並成功持久化一個Block的數據后,DataNode調用blockReceived方法通知NameNode已經接收到Block

具體RPC調用的詳細過程,可以參考源碼。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM