讀文件
讀文件時內部工作機制參看下圖:
客戶端通過調用FileSystem對象(對應於HDFS文件系統,調用DistributedFileSystem對象)的open()方法來打開文件(也即圖中的第一步),DistributedFileSystem通過RPC(Remote Procedure Call)調用詢問NameNode來得到此文件最開始幾個block的文件位置(第二步)。對每一個block來說,namenode返回擁有此block備份的所有namenode的地址信息(按集群的拓撲網絡中與客戶端距離的遠近排序,關於在Hadoop集群中如何進行網絡拓撲請看下面介紹)。如果客戶端本身就是一個datanode(如客戶端是一個mapreduce任務)並且此datanode本身就有所需文件block的話,客戶端便從本地讀取文件。
以上步驟完成后,DistributedFileSystem會返回一個FSDataInputStream(支持文件seek),客戶端可以從FSDataInputStream中讀取數據。FSDataInputStream包裝了一個DFSInputSteam類,用來處理namenode和datanode的I/O操作。
客戶端然后執行read()方法(第三步),DFSInputStream(已經存儲了欲讀取文件的開始幾個block的位置信息)連接到第一個datanode(也即最近的datanode)來獲取數據。通過重復調用read()方法(第四、第五步),文件內的數據就被流式的送到了客戶端。當讀到該block的末尾時,DFSInputStream就會關閉指向該block的流,轉而找到下一個block的位置信息然后重復調用read()方法繼續對該block的流式讀取。這些過程對於用戶來說都是透明的,在用戶看來這就是不間斷的流式讀取整個文件。
當真個文件讀取完畢時,客戶端調用FSDataInputSteam中的close()方法關閉文件輸入流(第六步)。
如果在讀某個block是DFSInputStream檢測到錯誤,DFSInputSteam就會連接下一個datanode以獲取此block的其他備份,同時他會記錄下以前檢測到的壞掉的datanode以免以后再無用的重復讀取該datanode。DFSInputSteam也會檢查從datanode讀取來的數據的校驗和,如果發現有數據損壞,它會把壞掉的block報告給namenode同時重新讀取其他datanode上的其他block備份。
這種設計模式的一個好處是,文件讀取是遍布這個集群的datanode的,namenode只是提供文件block的位置信息,這些信息所需的帶寬是很少的,這樣便有效的避免了單點瓶頸問題從而可以更大的擴充集群的規模。
Hadoop中的網絡拓撲 在Hadoop集群中如何衡量兩個節點的遠近呢?要知道,在高速處理數據時,數據處理速率的唯一限制因素就是數據在不同節點間的傳輸速度:這是由帶寬的可怕匱乏引起的。所以我們把帶寬作為衡量兩個節點距離大小的標准。
|
寫文件
現在我們來看一下Hadoop中的寫文件機制解析,通過寫文件機制我們可以更好的了解一下Hadoop中的一致性模型。
上圖為我們展示了一個創建一個新文件並向其中寫數據的例子。
首先客戶端通過DistributedFileSystem上的create()方法指明一個欲創建的文件的文件名(第一步),DistributedFileSystem再通過RPC調用向NameNode申請創建一個新文件(第二步,這時該文件還沒有分配相應的block)。namenode檢查是否有同名文件存在以及用戶是否有相應的創建權限,如果檢查通過,namenode會為該文件創建一個新的記錄,否則的話文件創建失敗,客戶端得到一個IOException異常。DistributedFileSystem返回一個FSDataOutputStream以供客戶端寫入數據,與FSDataInputStream類似,FSDataOutputStream封裝了一個DFSOutputStream用於處理namenode與datanode之間的通信。
當客戶端開始寫數據時(第三步),DFSOutputStream把寫入的數據分成包(packet), 放入一個中間隊列——數據隊列(data queue)中去。DataStreamer從數據隊列中取數據,同時向namenode申請一個新的block來存放它已經取得的數據。namenode選擇一系列合適的datanode(個數由文件的replica數決定)構成一個管道線(pipeline),這里我們假設replica為3,所以管道線中就有三個datanode。DataSteamer把數據流式的寫入到管道線中的第一個datanode中(第四步),第一個datanode再把接收到的數據轉到第二個datanode中(第四步),以此類推。
DFSOutputStream同時也維護着另一個中間隊列——確認隊列(ack queue),確認隊列中的包只有在得到管道線中所有的datanode的確認以后才會被移出確認隊列(第五步)。
如果某個datanode在寫數據的時候當掉了,下面這些對用戶透明的步驟會被執行:
1)管道線關閉,所有確認隊列上的數據會被挪到數據隊列的首部重新發送,這樣可以確保管道線中當掉的datanode下流的datanode不會因為當掉的datanode而丟失數據包。
2)在還在正常運行的datanode上的當前block上做一個標志,這樣當當掉的datanode重新啟動以后namenode就會知道該datanode上哪個block是剛才當機時殘留下的局部損壞block,從而可以把它刪掉。
3)已經當掉的datanode從管道線中被移除,未寫完的block的其他數據繼續被寫入到其他兩個還在正常運行的datanode中去,namenode知道這個block還處在under-replicated狀態(也即備份數不足的狀態)下,然后他會安排一個新的replica從而達到要求的備份數,后續的block寫入方法同前面正常時候一樣。
有可能管道線中的多個datanode當掉(雖然不太經常發生),但只要dfs.replication.min(默認為1)個replica被創建,我們就認為該創建成功了。剩余的replica會在以后異步創建以達到指定的replica數。
當客戶端完成寫數據后,它會調用close()方法(第六步)。這個操作會沖洗(flush)所有剩下的package到pipeline中,等待這些package確認成功,然后通知namenode寫入文件成功(第七步)。這時候namenode就知道該文件由哪些block組成(因為DataStreamer向namenode請求分配新block,namenode當然會知道它分配過哪些blcok給給定文件),它會等待最少的replica數被創建,然后成功返回。
replica是如何分布的 Hadoop在創建新文件時是如何選擇block的位置的呢,綜合來說,要考慮以下因素:帶寬(包括寫帶寬和讀帶寬)和數據安全性。如果我們把三個備份全部放在一個datanode上,雖然可以避免了寫帶寬的消耗,但幾乎沒有提供數據冗余帶來的安全性,因為如果這個datanode當機,那么這個文件的所有數據就全部丟失了。另一個極端情況是,如果把三個冗余備份全部放在不同的機架,甚至數據中心里面,雖然這樣數據會安全,但寫數據會消耗很多的帶寬。Hadoop 0.17.0給我們提供了一個默認replica分配策略(Hadoop 1.X以后允許replica策略是可插拔的,也就是你可以自己制定自己需要的replica分配策略)。replica的默認分配策略是把第一個備份放在與客戶端相同的datanode上(如果客戶端在集群外運行,就隨機選取一個datanode來存放第一個replica),第二個replica放在與第一個replica不同機架的一個隨機datanode上,第三個replica放在與第二個replica相同機架的隨機datanode上。如果replica數大於三,則隨后的replica在集群中隨機存放,Hadoop會盡量避免過多的replica存放在同一個機架上。選取replica的放置位置后,管道線的網絡拓撲結構如下所示:
|
一致性模型
HDFS某些地方為了性能可能會不符合POSIX(是的,你沒有看錯,POSIX不僅僅只適用於linux/unix,Hadoop 使用了POSIX的設計來實現對文件系統文件流的讀取),所以它看起來可能與你所期望的不同,要注意。
創建了一個文件以后,它是可以在命名空間(namespace)中可以看到的:
Path p = new Path("p");
fs.create(p);
assertThat(fs.exists(p), is(true));
但是任何向此文件中寫入的數據並不能保證是可見的,即使你flush了已經寫入的數據,此文件的長度可能仍然為零:
Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
assertThat(fs.getFileStatus(p).getLen(), is(0L));
這是因為,在Hadoop中,只有滿一個block數據量的數據被寫入文件后,此文件中的內容才是可見的(即這些數據會被寫入到硬盤中去),所以當前正在寫的block中的內容總是不可見的。
Hadoop提供了一種強制使buffer中的內容沖洗到datanode的方法,那就是FSDataOutputStream的sync()方法。調用了sync()方法后,Hadoop保證所有已經被寫入的數據都被沖洗到了管道線中的datanode中,並且對所有讀者都可見了:
Path p = new Path("p");
FSDataOutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
out.sync();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));
這個方法就像POSIX中的fsync系統調用(它沖洗給定文件描述符中的所有緩沖數據到磁盤中)。例如,使用java API寫一個本地文件,我們可以保證在調用flush()和同步化后可以看到已寫入的內容:
FileOutputStream out = new FileOutputStream(localFile);
out.write("content".getBytes("UTF-8"));
out.flush(); // flush to operating system
out.getFD().sync(); // sync to disk(getFD()返回與該流所對應的文件描述符)
assertThat(localFile.length(), is(((long) "content".length())));
在HDFS中關閉一個流隱式的調用了sync()方法:
Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.close();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));
由於Hadoop中的一致性模型限制,如果我們不調用sync()方法的話,我們很可能會丟失多大一個block的數據。這是難以接受的,所以我們應該使用sync()方法來確保數據已經寫入磁盤。但頻繁調用sync()方法也是不好的,因為會造成很多額外開銷。我們可以再寫入一定量數據后調用sync()方法一次,至於這個具體的數據量大小就要根據你的應用程序而定了,在不影響你的應用程序的性能的情況下,這個數據量應越大越好。
轉載請注明出處:http://www.cnblogs.com/beanmoon/archive/2012/12/17/2821548.html