HADOOP學習之HDFS
HDFS介紹
HDFS是hadoop自帶的分布式文件系統,英文名為:Hadoop Distributed Filesystem,HDFS以流式數據訪問模式來存儲超大文件。根據設計,HDFS具有如下特點
HDFS特點
-
支持超大文件
一般來說,HDFS存儲的文件可以支持TB和PB級別的數據
-
檢測和快速應對硬件故障
在集群環境中,硬件故障是常見性問題。因為有上千台服務器連在一起,故障率很高,因此故障檢測和自動恢復hdfs文件系統的一個設計目標。假設某一個datanode掛掉之后,因為數據是有備份的,還可以從其他節點里找到。namenode通過心跳機制來檢測datanode是否還存活
-
流式數據訪問
HDFS的訪問模式是一次寫入,多次讀取,數據集通常都是由數據源生成或者從數據源復制而來,接着長時間在此數據集上進行各種分析,因此讀取整個數據集的時間延遲比讀取第一條記錄的時間延遲更重要
-
高容錯性
數據自動保存多個副本,副本丟失后自動恢復。 可構建在廉價的機器上,實現線性擴展。當集群增加新節點之后,namenode也可以感知,將數據分發和備份到相應的節點上
-
商用硬件
Hadoop並不需要運行在昂貴且高可靠的硬件上。它是設計運行在商用硬件(在各種零售店都能買到的普通硬件)的集群上的,因此至少對於龐大的集群來說,節點故障的幾率還是非常搞得。HDFS遇到上述故障時,被設計成能夠繼續運行且不讓用戶察覺到明顯的中斷。
HDFS不適用場景
-
不能做到低延遲數據訪問
由於hadoop針對高數據吞吐量做了優化,犧牲了獲取數據的延遲,所以對於低延遲數據訪問,不適合hadoop。對於低延遲的訪問需求,HBase是更好的選擇。
-
不適合大量的小文件存儲
由於namenode將文件系統的元數據存儲在內存中,因此該文件系統所能存儲的文件總數受限於namenode的內存容量。根據經驗,每個文件、目錄和數據塊的存儲信息大約占150字節。因此,如果有一百萬個小文件,每個小文件都會占一個數據塊,那至少需要300MB內存。如果是上億級別的,就會超出當前硬件的能力。
-
不適合多用戶寫入文件、修改文件
對於上傳到HDFS上的文件,不支持修改文件。Hadoop2.0雖然支持了文件的追加功能,但是還是不建議對HDFS上的文件進行修改。因為效率低下。HDFS適合一次寫入,然后多次讀取的場景。
HDFS概念
HDFS架構圖
-
namenode,名字節點。要管理元數據信息(Metadata),注意,只存儲元數據信息。
namenode對於元數據信息的管理,放在內存一份,供訪問查詢,也會通過fsimage和edits文件,將元數據信息持久化到磁盤上。Hadoop1.0版本利用了SecondaryNamenode做fsimage和edits文件的合並,但是這種機制達不到熱備的效果。Hadoop1.0的namenode存在單點故障問題。 -
datanode,數據節點。用於存儲文件塊。為了防止datanode掛掉造成的數據丟失,對於文件塊要有備份,一個文件塊有三個副本。
-
rack 機架
-
client 客戶端,凡是通過API或指令操作的一端都可以看做是客戶端
-
blockSize,Hadoop1.0:64MB。Hadoop2.0 :128MB。
數據塊
每個磁盤都有默認的數據塊大小,這是磁盤進行數據讀/寫的最小單位,一般磁盤塊為512字節。而HDFS的塊默認是128MB(hadoop2.*),和單一磁盤的文件系統相似,HDFS上的文件也被划分為塊大小的多個分塊,作為獨立的存儲單元。不同的是HDFS中一個小於塊大小的文件不會占據整個塊的空間。
HDFS的塊比磁盤的塊大,目的是為了最小化尋址開銷,如果塊足夠大,從磁盤傳輸數據的時間會明顯大於定位這個塊開始位置所需的時間,因而,傳輸一個由多個塊組成的大文件的時間取決於磁盤傳輸速率。
塊緩存
通常datanode從磁盤中讀取塊,但對於訪問頻繁的文件,其對應的塊可能被顯式的緩存在datanode的內存中,以堆外塊緩存的形式存在。默認情況下,一個塊緩存在一個datanode的內存中。MapReduce,Spark和其他框架通過在緩存塊的datanode上運行任務,可以提高讀操作的性能,
HDFS讀寫流程
讀流程圖
- 客戶端發出讀數據請求,Open File指定讀取的文件路徑,去找namenode要元數據信息。
- namenode將文件的元數據信息返回給客戶端。
- 客戶端根據返回的元數據信息,去對應的datanode去讀塊數據。
假如一個文件特別大,比如1TB,會分成好多塊,此時,namenode並是不一次性把所有的元數據信息返回給客戶端。客戶端讀完此部分后,再去想namenode要下一部分的元數據信息,再接着讀。 - 讀完之后,通知namenode關閉流。
寫流程圖
- 發起一個寫數據請求,並指定上傳文件的路徑,然后去找namenode。namenode首先會判斷路徑合法性,然后會判斷此客戶端是否有寫權限。然后都滿足,namenode會給客戶端返回一個輸出流。此外,namenode會為文件分配塊存儲信息。namenode也是分配塊的存儲信息,但不做物理切塊工作。
- 客戶端拿到輸出流以及塊存儲信息之后,就開始向datanode寫數據。因為一個塊數據,有三個副本,所以圖里有三個datanode。
pipeLine:[bl1,datanode01-datanode03-datanode-07]
- 數據塊的發送,先發給第一台datanode,然后再有第一台datanode發往第二台datanode,……。實際這里,用到了pipeLine 數據流管道的思想。
- 通過ack確認機制,向上游節點發送確認,這么做的目的是確保塊數據復制的完整性。
- 通過最上游節點,向客戶端發送ack,如果塊數據沒有發送完,就繼續發送下一塊。如果所有塊數據都已發完,就可以關流了。
- 所有塊數據都寫完后,關流。
HDFS相關API
@Test
public void testConnect() throws Exception{
Configuration conf=new Configuration();
//conf.set("dfs.replication","1");
//FileSystem是Hadoop的文件系統的抽象類,HDFS分布式文件系統只是Hadoop文件系統中的一 種,對應的實現類:
//org.apache.hadoop.hdfs.DistributedFileSystem。HDFS是Hadoop開發者最常用的文件 系統,
//因為HDFS可以和MapReduce結合,從而達到處理海量數據的目的
//Hftp:基於HTTP方式去訪問HDFS文件提供(只讀)
//本地文件系統、存檔文件系統、基於安全模式的HTTP訪問HDFS的文件系統。
FileSystem fs=FileSystem.get(new URI("hdfs://localhost:9000"),conf);
fs.close();
}
@Test
public void testMkdir() throws Exception{
Configuration conf=new Configuration();
FileSystem fs=FileSystem.get(new URI("hdfs://localhost:9000"),conf);
//創建目錄
fs.mkdirs(new Path("/park07"));
fs.close();
}
@Test
public void testCopyToLocal() throws Exception{
Configuration conf=new Configuration();
FileSystem fs=FileSystem.get(new URI("hdfs://localhost:9000"),conf);
//fs.copyToLocalFile(new Path("/park01/helloworld.txt"),new Path("1.txt"));
I nputStream in=fs.open(new Path("/park01/helloworld.txt"));
OutputStream out=new FileOutputStream(new File("2.txt"));
IOUtils.copyBytes(in, out, conf);
fs.close();
}
@Test
public void testCopyFromLocal()throws Exception{
Configuration conf=new Configuration();
FileSystem fs=FileSystem.get(new URI("hdfs://localhost:9000"),conf);
//fs.copyFromLocalFile(new Path("3.txt"),new Path("/park01"));
InputStream in=new FileInputStream(new File("3.txt"));
//流的對接,主要要寫到文件名
OutputStream out=fs.create(new Path("/park01/3.txt"));
IOUtils.copyBytes(in, out, conf);
fs.close();
}
@Test
public void testDelete() throws Exception{
Configuration conf=new Configuration();
FileSystem fs=FileSystem.get(new URI("hdfs://localhost:9000"),conf);
//fs.delete(new Path("/park07"));
//true 遞歸刪除
//false 只能刪除為空的目錄
fs.delete(new Path("/park02"),true);
fs.close();
}
@Test
public void testStatus() throws Exception{
Configuration conf=new Configuration();
FileSystem fs=FileSystem.get(new URI("hdfs://localhost:9000"),conf);
FileStatus[] s=fs.listStatus(new Path("/park01"));
for(FileStatus status:s){
System.out.println(status);
}
}
@Test
public void testBlockLocation() throws Exception{
Configuration conf=new Configuration();
FileSystem fs=FileSystem.get(new URI("hdfs://localhost:9000"),conf);
BlockLocation[] bl=fs.getFileBlockLocations(new Path("/park01/ hadoop-2.7.1_64bit.tar.gz"),0,Integer.MAX_VALUE);
for(BlockLocation location:bl){
System.out.println(location);
}
}