1.本文目的
通過解析客戶端創建文件流程,認知hadoop的HDFS系統的一些功能和概念。
2.主要概念
2.1 NameNode(NN):
HDFS系統核心組件,負責分布式文件系統的名字空間管理、INode表的文件映射管理。如果不開啟備份/故障恢復/Federation模式,一般的HDFS系統就只有1個NameNode,當然這樣是存在單點故障隱患的。
NN管理兩個核心的表:文件到塊序列的映射、塊到機器序列的映射。
第一個表存儲在磁盤中,第二表在NN每次啟動后重建。
2.2 NameNodeServer(NNS):
負責NN和其它組件的通信接口的開放(IPC、http)等。
NN通過客戶端協議(ClientProtocol)和客戶端通信,通過數據節點協議(DataNodeProtocol)和DN通信。
2.3 FSNameSystem:
管理文件系統相關,承擔了NN的主要職責。
2.4 DataNode(DN):
分布式文件系統中存放實際數據的節點,存儲了一系列的文件塊,一個DFS部署中通常有許多DN。
DN和NN,DN和DN,DN和客戶端都通過不同的IPC協議進行交互。
通常,DN接受來自NN的指令,比如拷貝、刪除文件塊。
客戶端在通過NN獲取了文件塊的位置信息后,就可以和DN直接交互,比如讀取塊、寫入塊數據等。
DN節點只管理一個核心表:文件塊到字節流的映射。
在DN的生命周期中,不斷地和NN通信,報告自己所存儲的文件塊的狀態,NN不直接向DN通信,而是應答DN的請求,比如在DN的心跳請求后,回復一些關於復制、刪除、恢復文件塊的命令(comands)。
DN和外界通信的接口<host:port>會報告給NN,想和此DN交互的客戶端或其它DN可以通過和NN通信來獲取這一信息。
2.5 Block
文件塊,hadoop文件系統的原語,hadoop分布式文件系統中存儲的最小單位。一個hadoop文件就是由一系列分散在不同的DataNode上的block組成。
2.6 BlockLocation
文件塊在分布式網絡中的位置<host:port>,也包括一些塊的元數據,比如塊是否損壞、塊的大小、塊在文件中的偏移等。
2.7 DFSClient
分布式文件系統的客戶端,用戶可以獲取一個客戶端實例和NameNode及DataNode交互,DFSClient通過客戶端協議和hadoop文件系統交互。
2.8 Lease
租約,當客戶端創建或打開一個文件並准備進行寫操作,NameNode會維護一個文件租約,以標記誰正在對此文件進行寫操作。客戶端需要定時更新租約,否則當租約過期,NN會關閉文件或者將文件的租約交給其它客戶端。
2.9 LeaseRenewer
續約管控線程,當一個DFSClient調用申請租約后,如果此線程尚未啟動,則啟動,並定期向NameNode續約。
三.創建一個文件
當hadoop的分布式集群啟動之后,可以通過FS或Shell來創建文件,FS創建文件的命令如下:
//cluser是hadoop集群,通過fs和集群文件系統交互
final DistributedFileSystem fs = cluster.getFileSystem();
// 要創建的文件名
final Path tmpFile1 = new Path("/tmpfile1.dat");
//創建文件
public static void createFile(FileSystem fs, Path fileName, long fileLen,
short replFactor, long seed) throws IOException {
if (!fs.mkdirs(fileName.getParent())) {
throw new IOException("Mkdirs failed to create " +
fileName.getParent().toString());
}
FSDataOutputStream out = null;
try {
out = fs.create(fileName, replFactor);
byte[] toWrite = new byte[1024];
Random rb = new Random(seed);
long bytesToWrite = fileLen;
while (bytesToWrite>0) {
rb.nextBytes(toWrite);
int bytesToWriteNext = (1024<bytesToWrite)?1024:(int)bytesToWrite;
out.write(toWrite, 0, bytesToWriteNext);
bytesToWrite -= bytesToWriteNext;
}
out.close();
out = null;
} finally {
IOUtils.closeStream(out);
}
}
四、流程分析
創建一個名為tmpfile1.dat的文件,主要流程如下:

4.1 發送創建文件的請求(CreateFile)
客戶端向NN發起請求,獲取文件信息,NN會在緩存中查找是否存在請求創建的文件項(file entry),如果沒找到,就在NameSystem中創建一個新的文件項:
塊管理器(BlockManager)檢查復制因子是否在范圍內,如果復制因子過小或過大就會異常。
同時會進行權限驗證、加密、安全模式檢測(如果在安全模式不能創建文件)等,並記錄操作日志和事件日志,然后向客戶端返回文件狀態。
4.2 申請文件租用權(beginFileLease)
客戶端取得文件狀態后,對文件申請租用(lease),如果租用過期,客戶端將無法再繼續對文件進行訪問,除非進行續租。
4.3 數據流管控線程啟動(DataStreamer & ResponseProcessor)
DataStreamer線程負責數據的實際發送:
當數據隊列(Data Queue)為空時,會睡眠,並定期蘇醒以檢測數據隊列是否有新的數據需要發送、Socket套接字是否超時、是否繼續睡眠等狀態。
ResponseProcessor負責接收和處理pipeline下游傳回的數據接收確認信息pipelineACK。
4.4 發送添加塊申請並初始化數據管道(AddBlock & Setup Pipeline)
當有新的數據需要發送,並且塊創建階段處於PIPELINE_SETUP_CREATE,DataStreamer會和NameNode通信,調用AddBlock方法,通知NN創建、分配新的塊及位置,NN返回后,初始化Pipeline和發送流。
4.5 DataNode數據接收服務線程啟動(DataXceiverServer & DataXceiver)
當DataNode啟動后,其內部的DataXceiverServer組件啟動,此線程管理向其所屬的DN發送數據的連接建立工作,新連接來時,DataXceiverServer會啟動一個DataXceiver線程,此線程負責流向DN的數據接收工作。
4.6 在Pipeline中處理數據的發送和接收
客戶端在獲取了NameNode分配的文件塊的網絡位置之后,就可以和存放此塊的DataNode交互。
客戶端通過SASL加密方式和DN建立連接,並通過pipeline來發送數據。
4.6.1 從pipeline接收數據
pipeline由數據源節點、多個數據目的節點組成,請參考上面的流程圖。
位於pipeline中的第一個DataNode會接收到來自客戶端的數據流,其內部DataXceiver組件,通過讀取操作類型(OP),來區分進行何種操作,如下所示:
protected final void processOp(Op op) throws IOException {
switch(op) {
case READ_BLOCK:
opReadBlock();
break;
//本例中將會使用WRITE_BLOCK指令
case WRITE_BLOCK:
opWriteBlock(in);
break;
//略...
default:
throw new IOException("Unknown op " + op + " in data stream");
}
}
如果OP是WRITE_BLOCK,調用寫數據塊的方法,此方法會根據數據源是客戶端還是其他DataNode、塊創建的階段等條件進行不同的邏輯。
4.6.2 數據在pipeline中流動
在本例中,第一個收到數據的DN會再啟動一個blockReceiver線程,以接收實際的塊數據,在本地保存了塊數據后,其負責向pipeline中的后續DN繼續發送塊數據。
每次向下游DN節點發送數據,標志着數據目的節點的targets數組都會排除自身,這樣,就控制了pipeline的長度。
下游收到塊數據的DN會向上游DN或者客戶端報告數據接收狀態。
這種鏈式或者序列化的數據轉移方式,就像數據在管道中從上游流向下游,所以這種方式稱作
pipeline。
4.6.3 pipeline的生命周期

在本例中:
DataStreamer線程啟動后,pipeline進入PIPELINE_SETUP_CREATE階段;
數據流初始化后,pipeline進入DATA_STREAMING階段;
數據發送完畢后,pipeline進入PIPELINE_CLOSE階段。
客戶端在DataStreamer線程啟動后,同時啟動了一個ResponseProcessor線程,此線程用於接收pipeline中來自下游節點的數據接收狀態報告pipelineACK,同時此線程和DataStreamer線程協調管理pipeline狀態。
當DataStreamer向pipeline發送數據時,會將發送的數據包(packet)從數據隊列(Data Queue)中移除,並加入數據確認隊列(Ack Queue):
//DataStreamer發送數據后,將dataQueue的第一個元素出隊,並加入ackQueue
one = dataQueue.getFirst();
dataQueue.removeFirst();
ackQueue.addLast(one);
而當ResponseProcessor收到下游的pipelineAck后,據此確認信息來判斷pipeline狀態,是否需要重置和重新調整。如果確認信息是下游節點數據接收成功了,就將確認隊列(AckQueue)的第一個數據包刪除。
//ResponseProcessor收到成功的Ack,就將ackQueue的第一個包移除
lastAckedSeqno = seqno;
ackQueue.removeFirst();
dataQueue.notifyAll();
通過這樣的方式,DataStreamer可以確認數據包是否發送成功,也可以確認全部的數據包是否已經發送完畢。
顯然,當AckQueue空了,並且已經發送的數據包是塊里的最后一個包,數據就發送完畢了。
發送完畢的判斷如下所示:
if (one.lastPacketInBlock) {
// wait for all data packets have been successfully acked
synchronized (dataQueue) {
while (!streamerClosed && !hasError &&
ackQueue.size() != 0 && dfsClient.clientRunning) {
try {
// wait for acks to arrive from datanodes
dataQueue.wait(1000);
} catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e);
}
}
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
continue;
}
//在沒有錯誤的情況下,AckQueue為空,並且包one是block的最后一個包,數據就發送完了
stage = BlockConstructionStage.PIPELINE_CLOSE;
}
4.7 發送文件操作完成請求(completeFile)
客戶端向NameNode發送completeFile請求:
NN收到請求后,驗證塊的BlockPoolId是否正確,接着對操作權限、文件寫鎖(write lock)、安全模式、租約、INode是否存在、INode類型等等進行驗證,最后記錄操作日志並返回給客戶端。
4.8 停止文件租約(endFileLease)
客戶端在完成文件寫操作后,調用leaseRenewer(LR)實例,從LR管理的續約文件表中刪除此文件,表明不再更新租約,一段時間后,租約在NN端自然失效。
