原來大數據 Hadoop 是這樣存儲數據的


HDFS概述

產生背景

隨着數據量越來越大,在一個操作系統中存不下所有的數據。需要將這些數據分配到更多的操作系統中,帶來的問題是多操作系統不方便管理和維護。需要一種系統來管理多台機器上的文件,這就是分布式文件管理系統。HDFS是分布式文件管理系統中的一種

定義

HDFS(Hadoop Distributed File System)它是一個文件系統,用於存儲文件,通過目錄樹來定位文件。其次,他是分布式的,由很多服務器聯合起來實現其功能,集群中的服務器有各自的角色

HDFS 的使用場景:適合一次寫,多次讀的場景,且不支持文件的修改。適合用來做數據分析

優缺點

優點

  • 高容錯
    • 數據自動保存多個副本,通過增加副本的形式,提高容錯性
    • 某一個副本丟失以后,可以自動恢復
  • 適合處理大數據
    • 數據規模:達到 GB、TB、甚至 PB 級別的數據
    • 文件規模:能夠處理百萬規模以上的文件數量
  • 可以構建在廉價機器上

缺點

  • 不適合低延時數據訪問。比如毫秒級的存儲數據,做不到
  • 無法高效的對大量小文件進行存儲
    • namenode 對每個文件至少需要消耗 150 字節去存儲目錄和塊信息,小文件相比大文件更消耗 namenode 服務器內存
    • 小文件尋址時間會超過讀取時間,違背 HDFS 設計初衷
  • 不支持並發寫入、文件隨機修改
    • 一個文件只能有一個寫,不允許多線程同時寫
    • 僅支持數據追加,不支持修改

組成架構

image-20201221211014820

image-20201221211229108

HDFS 文件塊大小

HDFS 中的文件在物理上是分塊存儲(Block),塊的大小可以手動配置參數 dfs.blocksize 來修改(Hadoop 2.x 是 128m,之前是 64m)

為什么取 128m 呢?

普遍認為,尋址時間(即查找目標 block 的時間)為 10ms,而尋址時間為傳輸時間的 1% 時,為 HDFS 運行的理想最佳狀態。此時傳輸時間為 10ms / 1% = 1000ms = 1s,而目前硬盤的傳輸速度普遍為 100m/s ,因此 block 的大小取 1s*100m/s = 100m。離它最近的 2 的次冪就是 128 了。這塊可以看出,影響 block 大小的主要因素就是硬盤的讀取速度。因此當采用固態硬盤的時候完全可以把數值調整到 256 m 甚至更多。

塊太小的時候,會增加尋址時間

但當塊變得很大時,就要想辦法避免熱點數據的頻繁讀取了。這一點在 Google 的論文中有提到,論文中給到的解決思路是客戶端緩存,但是並沒有提具體實現 https://www.cnblogs.com/zzjhn/p/3834729.html

HDFS 的 Shell 操作

基本語法

bin/hadoop fs 具體命令 OR bin/hdfs dfs 具體命令

dfs是fs的實現類

常用命令

命令 解釋 示例 備注
-ls 顯示目錄信息
-mkdir 在HDFS上創建目錄 hadoop fs -mkdir -p /user/keats/love -p 創建多級目錄
-moveFromLocal 從本地剪切粘貼到HDFS hadoop fs -moveFromLocal ./yaya.txt /user/keats/love/ 前面是來源路徑 后面是目標路徑,下同
-appendToFile 追加一個文件到已經存在的文件末尾
-cat 顯示文件內容
-chgrp 、-chmod、-chown Linux文件系統中的用法一樣,修改文件所屬權限
-copyFromLocal 從本地文件系統中拷貝文件到HDFS路徑去 同 -put
-copyToLocal 從HDFS拷貝到本地 同 -get
-getmerge 合並下載多個文件 hadoop fs -getmerge /user/keats/love/* ./yaya.txt
-tail 顯示一個文件的末尾
-rm 刪除文件或文件夾
-rmdir 刪除空目錄
-du 統計文件夾的大小信息 可以理解為 disk use
-setrep 設置HDFS中文件的副本數量 里設置的副本數只是記錄在NameNode的元數據中,是否真的會有這么多副本,還得看DataNode的數量。因為目前只有3台設備,最多也就3個副本,只有節點數的增加到10台時,副本數才能達到10

HDFS 客戶端操作

環境准備

  1. 把之前下載的 hadoop 安裝包解壓,復制到一個不含中文路徑的目錄下(建議所有開發相關東西放在一個目錄下,方便管理)
  2. 配置環境變量 HADOOP_HOME 和 Path

項目准備

項目地址 https://github.com/keatsCoder/HdfsClientDemo

創建 maven 項目,引入依賴

 <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.10.1</version>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
    </dependencies>

創建 Java 類,HdfsClient 主要進行了三步操作

  1. 創建 FileSystem 對象
  2. 通過 FileSystem 對象執行命令
  3. 關閉 FileSystem
public class HdfsClient {
    static FileSystem fs;

    static {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://linux102:9000");

        try {
            fs = FileSystem.get(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        // 執行操作
        mkDir();

        // 釋放資源
        fs.close();
    }

    private static void mkDir() throws IOException {
        fs.mkdirs(new Path("/john/keats"));
    }
}

嘗試運行,會得到第一個錯誤,大意是權限被拒絕,這個時候就需要配置 JVM 參數 -DHADOOP_USER_NAME=root 來告訴集群,使用 root 用戶進行操作

image-20201222213812660

配置好之后再運行,會遇到第二個錯誤

Could not locate Hadoop executable: D:\develop\hadoop\bin\winutils.exe -see https://wiki.apache.org/hadoop/WindowsProblems

這是因為我們之前配置環境的時候,解壓的 hadoop 文件 bin 目錄下沒有 winutils.exe 這個文件,根據后面地址 wiki 百科的指示,可以下載該文件放在 bin 目錄下。但是目前那個文件的最新版本是 2.8.1,也許會存在某些方面不兼容的問題,目前還暫時沒有發現。因此可以直接下載該版本使用 https://github.com/steveloughran/winutils/releases

簡化配置用戶名和訪問路徑

FileSystem.get() 有一個重載方法,三個參數,第一個是 hadoop namenode 地址,第二個是 conf 對象,第三個是用戶名。可以一次配好

測試方法詳見示例代碼 HdfsClient2.java 類

fs = FileSystem.get(new URI("hdfs://linux102:9000"), conf, "root");

上傳文件

在項目 resource 目錄下創建文件 zhangsan.txt

調用 copyFromLocalFile 方法上傳文件

private static void uploadFile() throws IOException {
    URL systemResource = ClassLoader.getSystemResource("zhangsan.txt");
    String path = systemResource.getPath();

    fs.copyFromLocalFile(new Path(path), new Path("/john/keats/love"));
}

copyFromLocalFile 還有三個重載方法,分別提供以下功能

  • 是否刪除源文件
  • 當目標文件存在時,是否覆蓋目標文件
  • 多文件批量上傳,但目標路徑必須是文件夾路徑

配置文件優先級

之前我們在 hadoop 集群配置的副本數量是 3 ,而 hadoop client 也支持兩種方式配置參數

  1. conf 對象通過 key-value 形式配置
  2. resources 目錄下放置 xml 配置文件配置

加上默認的 default-xxxx.xml 一共四種配置的方式。他們的優先級是

conf > resources 下的配置文件 > hadoop 集群配置文件 > default

ConfigFileTest.java 類對此處的配置進行的說明與測試,讀者可以運行體驗

image-20201222223559044

下載文件

fs.copyToLocalFile(new Path("/three.txt"), new Path("D://zhangsan.txt"));

copyToLocalFile 還有兩個重載方法,分別添加了。具體代碼可參考 DownLoadFileTest.java

// 是否刪除源文件
boolean delSrc
// 是否使用RawLocalFileSystem作為本地文件系統
// 默認是 false,目前比較直觀的就是 false 狀態下下載文件會同時生成 .crc 格式的校驗文件,設置為 true 時不會生成
boolean useRawLocalFileSystem

刪除文件

刪除文件的API,第二個參數表示是否遞歸刪除。如果要刪除的 Path 對應的是文件夾,recursive 需要設置為 true ,否則會拋異常。其實就相當於 rm -r 中的參數 -r

public abstract boolean delete(Path f, boolean recursive) throws IOException;
private static void deleteFile() throws IOException {
    // /john/keats 是文件夾目錄,遞歸設置為 false 會報錯 PathIsNotEmptyDirectoryException: ``/john/keats is non empty': Directory is not empty
    //  fs.delete(new Path("/john/keats"), false);

    // 先上傳,再刪除
    HdfsClient2.uploadFile(true);
    fs.delete(new Path("/john/keats/love/zhangsan.txt"), true);
}

這塊我在刪除之前添加了上傳操作,目的是為了防止文件不存在。而上傳又存在一種可能就是目標文件已存在。這是個薛定諤的文件- -,因此我查看了 FileSystem 的上傳 API,他是提供 overwrite 開關的,默認是 true 即覆蓋目標文件

文件重命名

private static void renameFile() throws IOException {

    String dstFileName = "wangwu.txt";
    HdfsClient2.uploadFile();
    deleteFile(dstFileName);
    // 目標文件不存在,則更名成功
    boolean rename = fs.rename(new Path("/john/keats/love/zhangsan.txt"), new Path("/john/keats/love/", dstFileName));
    Assert.assertTrue(rename);

    // 目標文件存在,則更名失敗
    boolean renameButDstIsExist = fs.rename(new Path("/john/keats/love/zhangsan.txt"), new Path("/john/keats/love/", dstFileName));
    Assert.assertFalse(renameButDstIsExist);
}

讀取文件詳細信息

public static void listFiles() throws IOException {
    // 獲取文件詳情
    RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);

    while (listFiles.hasNext()) {
        LocatedFileStatus status = listFiles.next();

        // 輸出詳情
        // 文件名稱
        System.out.println(status.getPath().getName());
        // 長度
        System.out.println(status.getLen());
        // 權限
        System.out.println(status.getPermission());
        // 分組
        System.out.println(status.getGroup());

        // 獲取存儲的塊信息
        BlockLocation[] blockLocations = status.getBlockLocations();

        for (BlockLocation blockLocation : blockLocations) {

            // 獲取塊存儲的主機節點
            String[] hosts = blockLocation.getHosts();

            for (String host : hosts) {
                System.out.println(host);
            }
        }

        System.out.println("-----------分割線----------");
    }
}

判斷某路徑下的內容是文件還是文件夾

public static void isFile() throws IOException {
    // FileStatus[] listStatus = fs.listStatus(new Path("/"));
    FileStatus[] listStatus = fs.listStatus(new Path("/three.txt"));

    for (FileStatus fileStatus : listStatus) {
        // 如果是文件
        if (fileStatus.isFile()) {
            System.out.println("f:"+fileStatus.getPath().getName());
        }else {
            System.out.println("d:"+fileStatus.getPath().getName());
        }
    }
}

HDFS的I/O流操作

// 從本地上傳到HDFS
public static void copyFileFromDiskByIO() throws IOException {
    // 2 創建輸入流
    FileInputStream fis = new FileInputStream(new File("D:/zhangsan.txt"));

    // 3 獲取輸出流
    FSDataOutputStream fos = fs.create(new Path("/zhangsan.txt"));

    // 4 流對拷
    IOUtils.copyBytes(fis, fos, conf);
}

// 從HDFS拷貝到本地
public static void copyFileFromHDFSByIO() throws IOException {
    FSDataInputStream fis = fs.open(new Path("/zhangsan.txt"));

    // 3 獲取輸出流
    FileOutputStream fos = new FileOutputStream(new File("D:/zhangsan1.txt"));

    // 4 流的對拷
    IOUtils.copyBytes(fis, fos, conf);
}

文件的定位讀取

/**
 * 從某個位置開始拷貝文件,用於讀取某個完整文件的部分內容
 */
public static void copyFileSeek() throws Exception{
    // 2 打開輸入流
    FSDataInputStream fis = fs.open(new Path("/hadoop-2.10.1.tar.gz"));

    // 3 定位輸入數據位置
    fis.seek(1024*1024*128);

    // 4 創建輸出流
    FileOutputStream fos = new FileOutputStream(new File("D:/hadoop-2.7.2.tar.gz.part2"));

    // 5 流的對拷
    IOUtils.copyBytes(fis, fos, conf);
}

HDFS 原理

HDFS的讀寫數據流程

寫數據

image-20201229221340913

1)客戶端通過Distributed FileSystem模塊向NameNode請求上傳文件,NameNode檢查目標文件是否已存在,父目錄是否存在

2)NameNode返回是否可以上傳

3)客戶端請求第一個 Block上傳到哪幾個DataNode服務器上(根據服務器距離以及負載排序,取前副本數個服務器返回)

4)NameNode返回3個DataNode節點,分別為dn1、dn2、dn3

5)客戶端通過FSDataOutputStream模塊請求dn1上傳數據,dn1收到請求會繼續調用dn2,然后dn2調用dn3,將這個通信管道建立完成

6)dn1、dn2、dn3逐級應答客戶端

7)客戶端開始往dn1上傳第一個Block(先從磁盤讀取數據放到一個本地內存緩存),以Packet為單位,dn1收到一個Packet就會傳給dn2,dn2傳給dn3;dn1每傳一個packet會放入一個應答隊列等待應答

8)當一個Block傳輸完成之后,客戶端再次請求NameNode上傳第二個Block的服務器。(重復執行3-7步)

讀數據

image-20210104210100229

1)客戶端通過Distributed FileSystem向NameNode請求下載文件,NameNode通過查詢元數據,找到文件塊所在的DataNode地址

2)挑選一台DataNode(就近原則,然后隨機)服務器,請求讀取數據

3)DataNode開始傳輸數據給客戶端(從磁盤里面讀取數據輸入流,以Packet為單位來做校驗)

4)客戶端以Packet為單位接收,先在本地緩存,然后寫入目標文件

網絡拓撲圖,節點距離計算

在HDFS寫數據的過程中,NameNode會選擇距離待上傳數據最近距離的DataNode接收數據。那么這個最近距離怎么計算呢?

節點距離:兩個節點到達最近的共同祖先的距離總和

image-20201229222511164

機架感知,副本存儲節點選擇

機架感知說明

http://hadoop.apache.org/docs/r2.10.1/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html#Data_Replication

For the common case, when the replication factor is three, HDFS’s placement policy is to put one replica on one node in the local rack, another on a different node in the local rack, and the last on a different node in a different rack.

image-20201229223021321

這樣布置,第一考慮的是速度,也兼顧了容災的要求

NameNode和SecondaryNameNode

NameNode中的元數據是存儲在哪里的?

首先,我們做個假設,如果存儲在NameNode節點的磁盤中,因為經常需要進行隨機訪問,還有響應客戶請求,必然是效率過低。因此,元數據需要存放在內存中。但如果只存在內存中,一旦斷電,元數據丟失,整個集群就無法工作了。因此產生在磁盤中備份元數據的FsImage

這樣又會帶來新的問題,當在內存中的元數據更新時,如果同時更新FsImage,就會導致效率過低,但如果不更新,就會發生一致性問題,一旦NameNode節點斷電,就會產生數據丟失。因此,引入Edits文件(只進行追加操作,效率很高)。每當元數據有更新或者添加元數據時,修改內存中的元數據並追加到Edits中。這樣,一旦NameNode節點斷電,可以通過FsImage和Edits的合並,合成元數據。

但是,如果長時間添加數據到Edits中,會導致該文件數據過大,效率降低,而且一旦斷電,恢復元數據需要的時間過長。因此,需要定期進行FsImage和Edits的合並,如果這個操作由NameNode節點完成,又會效率過低。因此,引入一個新的節點SecondaryNamenode,專門用於FsImage和Edits的合並

整體的操作機制和 Redis 差不多,FsImage 相當於 Redis 中的 RDB 快照,Edits 相當於 Redis 中的 AOF 日志,兩者結合。而 Redis 合並兩個文件是采用的 Fork 進程的方式

image-20210104211308789

第一階段:NameNode啟動

(1)第一次啟動NameNode格式化后,創建Fsimage和Edits文件。如果不是第一次啟動,直接加載編輯日志和鏡像文件到內存

(2)客戶端對元數據進行增刪改的請求

(3)NameNode記錄操作日志,更新滾動日志

(4)NameNode在內存中對數據進行增刪改

第二階段:Secondary NameNode工作

​ (1)Secondary NameNode詢問NameNode是否需要CheckPoint。直接帶回NameNode是否檢查結果

​ (2)Secondary NameNode請求執行CheckPoint

​ (3)NameNode滾動正在寫的Edits日志

​ (4)將滾動前的編輯日志和鏡像文件拷貝到Secondary NameNode

​ (5)Secondary NameNode加載編輯日志和鏡像文件到內存,並合並

​ (6)生成新的鏡像文件fsimage.chkpoint

​ (7)拷貝fsimage.chkpoint到NameNode

​ (8)NameNode將fsimage.chkpoint重新命名成fsimage

DataNode 工作機制

image-20210104220225348

1)一個數據塊在DataNode上以文件形式存儲在磁盤上,包括兩個文件,一個是數據本身,一個是元數據包括數據塊的長度,塊數據的校驗和,以及時間戳

2)DataNode啟動后向NameNode注冊,通過后,周期性(1小時)的向NameNode上報所有的塊信息

3)心跳是每3秒一次,心跳返回結果帶有NameNode給該DataNode的命令如復制塊數據到另一台機器,或刪除某個數據塊。如果超過10分鍾沒有收到某個DataNode的心跳,則認為該節點不可用

4)集群運行中可以安全加入和退出一些機器

DataNode 多目錄配置

DataNode 也可以配置成多個目錄,每個目錄存儲的數據不一樣。不同於 NameNode 多目錄配置,NameNode 多個目錄直接的數據是一樣的,僅做備份和容災用。我想是因為 DataNode 已經使用副本來做備份了,如果還繼續在本機復制多份,不是很有必要。而 NameNode 在未做高可用之前並沒有足夠的備份,因此產生了差異

hdfs-site.xml

<property>
	<name>dfs.datanode.data.dir</name>
	<value>file:///${hadoop.tmp.dir}/dfs/data1,file:///${hadoop.tmp.dir}/dfs/data2</value>
</property>

添加新數據節點

(1)在hadoop104主機上再克隆一台hadoop105主機

(2)修改IP地址和主機名稱

(3)刪除原來HDFS文件系統留存的文件(/opt/module/hadoop/data和log)

(4)source一下配置文件

(5)直接啟動DataNode,即可關聯到集群

如果數據不均衡,可以使用 ./start-balancer.sh 命令實現集群的再均衡

但是這樣存在一個問題:如果某些惡意分子知道了 NameNode 的地址,便可以連接集群並克隆出集群的數據,這樣是極不安全的

添加白名單

只允許白名單內的地址連接 NameNode

在 NameNode 的 /opt/module/hadoop/etc/hadoop目錄下創建 dfs.hosts 文件,並添加如下主機名稱

linux102
linux103
linux104

在 NameNode 的 hdfs-site.xml 配置文件中增加 dfs.hosts 屬性

<property>
    <name>dfs.hosts</name>
    <value>/opt/module/hadoop/etc/hadoop/dfs.hosts</value>
</property>

文件分發

xsync hdfs-site.xml

刷新NameNode

hdfs dfsadmin -refreshNodes

打開 Web 頁面,可以看到不在白名單的 DataNode 會被下線

黑名單退役

在黑名單上的節點會被強制退出

黑名單的配置 key 如下

<property>
	<name>dfs.hosts.exclude</name>
	<value>/opt/module/hadoop/etc/hadoop/dfs.hosts.exclude</value>
</property>

需要注意

  1. 退役節點時,需要等待退役節點狀態為 decommissioned(所有塊已經復制完成),停止該節點及節點資源管理器
  2. 如果副本數是3,服役的節點小於等於3,是不能退役成功的,需要修改副本數后才能退役
  3. 不允許黑白名單同時出現一個主機名

HDFS 2.X 新特性

集群間數據拷貝

bin/hadoop distcp
hdfs://linux102:9000/user/keats/hello.txt hdfs://linux103:9000/user/keats/hello.txt

小文件存檔

image-20210104231711569

回收站

開啟回收站功能,可以將刪除的文件在不超時的情況下,恢復原數據,起到防止誤刪除、備份等作用

快照管理

快照相當於對目錄做一個備份,並不會立刻復制所有文件。而是記錄文件變化

參考內容

B站尚硅谷大數據課程


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM