(二)HDFS數據流
作為一個文件系統,文件的讀和寫是最基本的需求,這一部分我們來了解客戶端是如何與HDFS進行交互的,也就是客戶端與HDFS,以及構成HDFS的兩類節點(namenode和datanode)之間的數據流是怎樣的。
1、剖析文件讀取過程
客戶端從HDFS讀取文件,其內部的讀取過程實際是比較復雜的,可以用下圖來表示讀取文件的基本流程。
對於客戶端來說,首先是調用FileSystem對象的open()方法來打開希望讀取的文件,然后DFS會返回一個文件輸入流FSDataInputStream ,客戶端對這個輸入流調用read()方法,讀取數據,一旦完成讀取,就對這個輸入流調用close()方法關閉,這三個過程對應圖中的步驟1、3、6。
以上三個步驟是從客戶端的角度來分析的,實際上,要實現文件讀取,HDFS內部還需要比較復雜的機制來支持,而這些過程都是對客戶端透明的,所以客戶端感受不到,在客戶看來就像是在讀取一個連續的流。

具體的,從HDFS的角度來說,客戶端調用的FileSystem對象的open()方法,這個FileSystem對象實際上是分布式文件系統DistributedFileSystem的一個實例,DistributedFileSystem通過遠程過程調用(RPC)來調用namenode,以獲得文件起始塊的位置(步驟2,namenode返回存有該數據塊副本的datanode的地址)。當然,由於HDFS保存了一個數據塊的多個副本(默認是3),所以滿足請求的datanode地址不止一個,此時會根據它們與客戶端的距離來排序,優先選擇距離近的datanode,如果該客戶端本身就是一個datanode,該客戶端就可以從本地讀取數據(比如:mapReduce就利用了這里的數據本地化優勢)。
open方法完成后,DistributedFileSystem類返回一個FSDataInputStream文件輸入流對象給客戶端。這個類轉而封裝為DFSInputStream對象,該對象管理着datanode和namenode的I/O。
這個DFSInputStream存儲着文件起始幾個塊的datanode地址,因此,客戶端對這個輸入流調用read()方法就可以知道到哪個datanode(網絡拓撲中距離最近的)去讀取數據,這樣,反復調用read方法就可以將數據從datanode傳輸到客戶端(步驟4)。到達一個塊的末端時,會關閉和這個datanode的連接,尋找下一個塊的最佳datanode,重復這個過程。
當然,上面我們說DFSInputStream只存儲着文件起始的幾個塊,在讀取過程中,也會根據需要再次詢問namenode來獲取下一批數據塊的datanode地址。一旦客戶端讀取完成,就調用close方法關閉數據流。
如果在讀取過程中,datanode遇到故障,很明顯,輸入流只需要從另外一個保存了該數據塊副本的最近datanode讀取即可,同時記住那個故障datanode,以后避免從那里讀取數據。
總結:
以上就是HDFS的文件讀取過程,從這個過程的分析中我們可以看出:其優勢在於客戶端可以直接連接到datanode進行數據的讀取,這樣由於數據分散在不同的datanode上,就可以同時為大量並發的客戶端提供服務。而namenode作為管理節點,只需要響應數據塊位置的請求,告知客戶端每個數據塊所在的最佳datanode即可(datanode的位置信息存儲在內存中,非常高效的可以獲取)。這樣使得namenode無需進行具體的數據傳輸任務,否則namenode在客戶端數量多的情況下會成為瓶頸。
2、剖析文件寫入過程
接下來我們分析文件寫入的過程,重點考慮的情況是如何新建一個文件、如何將數據寫入文件並最后關閉該文件。
同樣的道理,從客戶端的角度來說,這個過程是比較簡單的,首先通過對DistributedFileSystem對象調用create()方法來新建文件,然后會返回一個FSDataOutputStream的文件輸出流對象,由此客戶端便可以調用這個輸出流的write()方法寫入數據,完成寫入后,調用close()方法關閉輸出流(下圖中的步驟1、3、6)。

然而,具體的,從HDFS的角度來看,這個寫數據的過程就相當復雜了。客戶端在調用create方法新建文件時,DistributedFileSystem會對namenode創建一個RPC調用,在文件系統的命名空間中新建一個文件,此時還沒有相應的數據塊(步驟2)。namedata接收到這個RPC調用后,會進行一系列的檢查,確保這個文件不存在,並且這個客戶端有新建文件的權限,然后再通過檢查后就會為這個新文件在命名空間中加入一條記錄(如果未通過檢查則會返回異常),最后給客戶端返回一個FSDataOutputStream對象。
類似於文件讀的過程,這個FSDataOutputStream對象轉而封裝成為一個DFSOutputStream對象,用於處理datanode和namenode之間的I/O。
接下來,客戶端就可以調用輸出流的write()方法進行數據寫入,而在寫入時,DFSOutputStream將數據分為一個一個的數據包,先寫入內部隊列,稱為“數據隊列”。然后有一個單獨的DataStreamer來處理數據隊列,它的職責是挑選出適合存儲數據副本的一組datanode,並要求namenode分配新的數據塊。假設副本數為3,那么選出來的datanode就是3個,這3個dadanode會構成一個數據管線。DataStreamer將數據包流式傳輸到管線中的第一個datanode,第一個存儲並發到第二個,第二個存儲並發到第三個(步驟4)。
然后DFSOutputStream對象內部還有一個數據包隊列用於接收datanode的確認回執,稱為“確認隊列”,收到所有datanode的確認消息后,該數據包才會從隊列中刪除。
在客戶端完成數據的寫入后,對數據流調用close()方法(步驟6),該操作將剩余的所有數據包寫入數據管線,並聯系namenode告知文件寫入完成之前,等待確認(步驟7)。
3、一致模型
HDFS的一致模型描述了文件讀、寫的數據可見性。
基於以上對文件讀寫過程的分析,我們知道新建一個文件之后,它可以在命名空間中立即可見,但是即使數據流已經刷新並存儲,寫入文件的內容並不保證能立即可見。當寫入的數據超過一個數據塊后,第一個數據塊對新的reader就是可見的,也就是說:當前正在寫的塊對其他reader不可見。
HDFS提供了一種將所有緩存刷新到datanode中的方法,即對FSDataOutputStream調用hflush()方法,當hflush方法調用成功后,到目前為止寫入的數據都到達了datanode的寫入管道並且對所有reader可見。
但是,hflush()並不保證數據已經都寫到磁盤上,為確保數據都寫入磁盤,可以使用hsync()操作代替。
在HDFS中,close方法實際上隱含了執行hflush()方法。
4、通過distcp並行復制
當我們想從Hadoop文件系統中復制大量數據或者將大量數據復制到HDFS中時,可以采用Hadoop自帶的一個程序distcp,它用來並行復制。
distcp的一個用法是代替hadoop fs -cp,也可以用來在兩個HDFS集群之間傳輸數據。
$ hadoop distcp file1 file2
$ hadoop distcp dir1 dir2
$ hadoop distco -update -delete -p hdfs://namenode1/foo hdfs://namenode2/foo
總結
以上主要對HDFS文件系統的文件讀寫進行了詳細的介紹,重點是掌握HDFS的文件讀寫流程,體會這種機制對整個分布式系統性能提升帶來的好處。