有必要了解的大數據知識(一) Hadoop


前言

之前工作中,有接觸到大數據的需求,雖然當時我們體系有專門的大數據部門,但是由於當時我們中台重構,整個體系的開發量巨大,共用一個大數據部門,人手已經忙不過來,沒法辦,為了趕時間,我自己負責的系統的大數據相關操作,由我們自己承擔了。此前對大數據的知識了解的很少,於是晚上回去花時間突擊大數據知識,白天就開始上手干,一邊學一邊做,總算在部門規定的時間,跟系統一起上線了。后來的維護迭代就交給大數據去了,雖然接觸大數據的時間不長,但是對我來說,確是很有意思的一段經歷,覺得把當時匆匆學的知識點,再仔細回顧回顧,整理下。

大數據概述

​ 大數據: 就是對海量數據進行分析處理,得到一些有價值的信息,然后幫助企業做出判斷和決策.

​ 處理流程:

​ 1:獲取數據

​ 2:處理數據

​ 3:展示結果

Hadoop介紹

Hadoop是一個分布式系基礎框架,它允許使用簡單的編程模型跨大型計算機的大型數據集進行分布式處理.它主要解決兩個問題

大數據存儲問題: HDFS

大數據計算問題:MapReduce

問題一: 大文件怎么存儲?

假設一個文件非常非常大,大小為1PB/a.txt, 大到世界上所有的高級計算機都存儲不下, 怎么辦?

  • 為了保存大文件, 需要把文件放在多個機器上
    • 文件要分塊 block(128M)
    • 不同的塊放在不同的 HDFS 節點
  • 同時為了對外提供統一的訪問, 讓外部可以像是訪問本機一樣訪問分布式文件系統
    • 有一個統一的 HDFS Master
    • 它保存整個系統的文件信息
    • 所有的文件元數據的修改都從 Master 開始

問題二: 大數據怎么計算?

從一個網絡日志文件中計算獨立 IP, 以及其出現的次數
如果數據量特別大,我們可以將,整個任務拆開, 划分為比較小的任務, 從而進行計算呢。

問題三: 如何將這些計算任務跑在集群中?

如果能夠在不同的節點上並行執行, 更有更大的提升, 如何把這些任務跑在集群中?

  • 可以設置一個集群的管理者, 這個地方叫做 Yarn
    • 這個集群管理者有一個 Master, 用於接收和分配任務
    • 這個集群管理者有多個 Slave, 用於運行任務

Hadoop 的組成

  • Hadoop分布式文件系統(HDFS) 提供對應用程序數據的高吞吐量訪問的分布式文件系統
  • Hadoop Common 其他Hadoop模塊所需的Java庫和實用程序。這些庫提供文件系統和操作系統級抽象,並包含啟動Hadoop所需的必要Java文件和腳本
  • Hadoop MapReduce 基於YARN的大型數據集並行處理系統
  • Hadoop YARN 作業調度和集群資源管理的框架

Hadoop前生今世

  1. Hadoop最早起源於Nutch。Nutch的設計目標是構建一個大型的全網搜索引擎,包括網頁抓取、索引、查詢等功能,但隨着抓取網頁數量的增加,遇到了嚴重的可擴展性問題——如何解決數十億網頁的存儲和索引問題。
  2. 2003年、2004年谷歌發表的兩篇論文為該問題提供了可行的解決方案。

——分布式文件系統(GFS),可用於處理海量網頁的存儲

——分布式計算框架MAPREDUCE,可用於處理海量網頁的索引計算問題。

  1. Nutch的開發人員完成了相應的開源實現HDFSMAPREDUCE,並從Nutch中剝離成為獨立項目HADOOP,到2008年1月,HADOOP成為Apache頂級項目.

狹義上來說,hadoop就是單獨指代hadoop這個軟件,

HDFS :分布式文件系統

MapReduce : 分布式計算系統

廣義上來說,hadoop指代大數據的一個生態圈,包括很多其他的軟件

hadoop的架構模型

1.x的版本架構模型介紹

文件系統核心模塊:

NameNode:集群當中的主節點,管理元數據(文件的大小,文件的位置,文件的權限),主要用於管理集群當中的各種數據

secondaryNameNode:主要能用於hadoop當中元數據信息的輔助管理

DataNode:集群當中的從節點,主要用於存儲集群當中的各種數據

數據計算核心模塊:

JobTracker:接收用戶的計算請求任務,並分配任務給從節點

TaskTracker:負責執行主節點JobTracker分配的任務

2.x的版本架構模型介紹

第一種:NameNode與ResourceManager單節點架構模型

文件系統核心模塊:

NameNode:集群當中的主節點,主要用於管理集群當中的各種數據

secondaryNameNode:主要能用於hadoop當中元數據信息的輔助管理

DataNode:集群當中的從節點,主要用於存儲集群當中的各種數據

數據計算核心模塊:

ResourceManager:接收用戶的計算請求任務,並負責集群的資源分配

NodeManager:負責執行主節點APPmaster分配的任務

第二種:NameNode單節點與ResourceManager高可用架構模型

文件系統核心模塊:

NameNode:集群當中的主節點,主要用於管理集群當中的各種數據

secondaryNameNode:主要能用於hadoop當中元數據信息的輔助管理

DataNode:集群當中的從節點,主要用於存儲集群當中的各種數據

數據計算核心模塊:

ResourceManager:接收用戶的計算請求任務,並負責集群的資源分配,以及計算任務的划分,通過zookeeper實現ResourceManager的高可用

NodeManager:負責執行主節點ResourceManager分配的任務

第三種:NameNode高可用與ResourceManager單節點架構模型

文件系統核心模塊:

NameNode:集群當中的主節點,主要用於管理集群當中的各種數據,其中nameNode可以有兩個,形成高可用狀態

DataNode:集群當中的從節點,主要用於存儲集群當中的各種數據

JournalNode:文件系統元數據信息管理

數據計算核心模塊:

ResourceManager:接收用戶的計算請求任務,並負責集群的資源分配,以及計算任務的划分

NodeManager:負責執行主節點ResourceManager分配的任務

第四種:NameNode與ResourceManager高可用架構模型

文件系統核心模塊:

NameNode:集群當中的主節點,主要用於管理集群當中的各種數據,一般都是使用兩個,實現HA高可用

JournalNode:元數據信息管理進程,一般都是奇數個

DataNode:從節點,用於數據的存儲

數據計算核心模塊:

ResourceManager:Yarn平台的主節點,主要用於接收各種任務,通過兩個,構建成高可用

NodeManager:Yarn平台的從節點,主要用於處理ResourceManager分配的任務

Hadoop 核心介紹

1. HDFS

HDFS(Hadoop Distributed File System) 是一個 Apache Software Foundation 項目, 是 Apache Hadoop 項目的一個子項目. Hadoop 非常適於存儲大型數據 (比如 TB 和 PB), 其就是使用 HDFS 作為存儲系統. HDFS 使用多台計算機存儲文件, 並且提供統一的訪問接口, 像是訪問一個普通文件系統一樣使用分布式文件系統. HDFS 對數據文件的訪問通過流的方式進行處理, 這意味着通過命令和 MapReduce 程序的方式可以直接使用 HDFS. HDFS 是容錯的, 且提供對大數據集的高吞吐量訪問.

HDFS 的一個非常重要的特點就是一次寫入、多次讀取, 該模型降低了對並發控制的要求, 簡化了數據聚合性, 支持高吞吐量訪問. 而吞吐量是大數據系統的一個非常重要的指標, 吞吐量高意味着能處理的數據量就大.

1.1. 設計目標

  • 通過跨多個廉價計算機集群分布數據和處理來節約成本
  • 通過自動維護多個數據副本和在故障發生時來實現可靠性
  • 它們為存儲和處理超大規模數據提供所需的擴展能力。

1.2. HDFS 的歷史

  1. Doug Cutting 在做 Lucene 的時候, 需要編寫一個爬蟲服務, 這個爬蟲寫的並不順利, 遇到了一些問題, 諸如: 如何存儲大規模的數據, 如何保證集群的可伸縮性, 如何動態容錯等
  2. 2013年的時候, Google 發布了三篇論文, 被稱作為三駕馬車, 其中有一篇叫做 GFS, 是描述了 Google 內部的一個叫做 GFS 的分布式大規模文件系統, 具有強大的可伸縮性和容錯性
  3. Doug Cutting 后來根據 GFS 的論文, 創造了一個新的文件系統, 叫做 HDFS

1.3. HDFS 的架構

  1. NameNode 是一個中心服務器, 單一節點(簡化系統的設計和實現), 負責管理文件系統的名字空間(NameSpace)以及客戶端對文件的訪問
  2. 文件操作, NameNode 是負責文件元數據的操作, DataNode 負責處理文件內容的讀寫請求, 跟文件內容相關的數據流不經過 NameNode, 只詢問它跟哪個 DataNode聯系, 否則 NameNode 會成為系統的瓶頸
  3. 副本存放在哪些 DataNode 上由 NameNode 來控制, 根據全局情況作出塊放置決定, 讀取文件時 NameNode 盡量讓用戶先讀取最近的副本, 降低讀取網絡開銷和讀取延時
  4. NameNode 全權管理數據庫的復制, 它周期性的從集群中的每個DataNode 接收心跳信合和狀態報告, 接收到心跳信號意味着 DataNode 節點工作正常, 塊狀態報告包含了一個該 DataNode 上所有的數據列表
NameNode DataNode
存儲元數據 存儲文件內容
元數據保存在內存中 文件內容保存在磁盤
保存文件, block, DataNode 之間的關系 維護了 block id 到 DataNode 文件之間的關系

1.4. HDFS 文件副本和 Block 塊存儲

所有的文件都是以 block 塊的方式存放在 HDFS 文件系統當中, 在 Hadoop1 當中, 文件的 block 塊默認大小是64M, hadoop2 當中, 文件的 block 塊大小默認是 128M, block 塊的大小可以通過 hdfs-site.xml 當中的配置文件進行指定

<property>
    <name>dfs.block.size</name>
    <value>塊大小 以字節為單位</value>
</property>

1.4.1. 引入塊機制的好處

  1. 一個文件有可能大於集群中任意一個磁盤
  2. 使用塊抽象而不是文件可以簡化存儲子系統
  3. 塊非常適合用於數據備份進而提供數據容錯能力和可用性

1.4.2. 塊緩存

通常 DataNode 從磁盤中讀取塊, 但對於訪問頻繁的文件, 其對應的塊可能被顯式的緩存在 DataNode 的內存中, 以堆外塊緩存的形式存在. 默認情況下,一個塊僅緩存在一個 DataNode 的內存中,當然可以針對每個文件配置 DataNode 的數量. 作業調度器通過在緩存塊的 DataNode 上運行任務, 可以利用塊緩存的優勢提高讀操作的性能.

例如:

連接(join) 操作中使用的一個小的查詢表就是塊緩存的一個很好的候選

用戶或應用通過在緩存池中增加一個 Cache Directive 來告訴 NameNode 需要緩存哪些文件及存多久. 緩存池(Cache Pool) 是一個擁有管理緩存權限和資源使用的管理性分組.

例如一個文件 130M, 會被切分成 2 個 block 塊, 保存在兩個 block 塊里面, 實際占用磁盤 130M 空間, 而不是占用256M的磁盤空間

1.4.3. HDFS 文件權限驗證

HDFS 的文件權限機制與 Linux 系統的文件權限機制類似

r:read  w:write  x:execute

權限 x 對於文件表示忽略, 對於文件夾表示是否有權限訪問其內容 如果 Linux 系統用戶 zhangsan 使用 Hadoop 命令創建一個文件, 那么這個文件在 HDFS 當中的 Owner 就是 zhangsan HDFS 文件權限的目的, 防止好人做錯事, 而不是阻止壞人做壞事. HDFS相信你告訴我你是誰, 你就是誰

1.5. HDFS 的元信息和 SecondaryNameNode

當 Hadoop 的集群當中, 只有一個 NameNode 的時候, 所有的元數據信息都保存在了 FsImage 與 Eidts 文件當中, 這兩個文件就記錄了所有的數據的元數據信息, 元數據信息的保存目錄配置在了 hdfs-site.xml 當中

<property>
  <name>dfs.namenode.name.dir</name>
  <value>file:///export/servers/hadoop-3.1.1/datas/namenode/namenodedatas</value>
</property>
<property>
  <name>dfs.namenode.edits.dir</name>
  <value>file:///export/servers/hadoop-3.1.1/datas/dfs/nn/edits</value>
</property>

1.5.1. FsImage 和 Edits 詳解

  • edits
    • edits 存放了客戶端最近一段時間的操作日志
    • 客戶端對 HDFS 進行寫文件時會首先被記錄在 edits 文件中
    • edits 修改時元數據也會更新
    • 每次 HDFS 更新時 edits 先更新后客戶端才會看到最新信息
  • fsimage
    • NameNode 中關於元數據的鏡像, 一般稱為檢查點, fsimage 存放了一份比較完整的元數據信息
    • 因為 fsimage 是 NameNode 的完整的鏡像, 如果每次都加載到內存生成樹狀拓撲結構,這是非常耗內存和CPU, 所以一般開始時對 NameNode 的操作都放在 edits 中
    • fsimage 內容包含了 NameNode 管理下的所有 DataNode 文件及文件 block 及 block 所在的 DataNode 的元數據信息.
    • 隨着 edits 內容增大, 就需要在一定時間點和 fsimage 合並

1.5.2. fsimage 中的文件信息查看

官方查看文檔

使用命令 hdfs oiv

cd /export/servers/hadoop-3.1.1/datas/namenode/namenodedatas
hdfs oiv -i fsimage_0000000000000000864 -p XML -o hello.xml

1.5.3. edits 中的文件信息查看

官方查看文檔

使用命令 hdfs oev

cd /export/servers/hadoop-3.1.1/datas/dfs/nn/edits
hdfs oev -i  edits_0000000000000000865-0000000000000000866 -o myedit.xml -p XML

1.5.4. SecondaryNameNode 如何輔助管理 fsimage 與 edits 文件?

  • SecondaryNameNode 定期合並 fsimage 和 edits, 把 edits 控制在一個范圍內

  • 配置 SecondaryNameNode

    • SecondaryNameNode 在 conf/masters 中指定

    • 在 masters 指定的機器上, 修改 hdfs-site.xml

      <property>
        <name>dfs.http.address</name>
        <value>host:50070</value>
      </property>
      
    • 修改 core-site.xml, 這一步不做配置保持默認也可以

      <!-- 多久記錄一次 HDFS 鏡像, 默認 1小時 -->
      <property>
        <name>fs.checkpoint.period</name>
        <value>3600</value>
      </property>
      
      <!-- 一次記錄多大, 默認 64M -->
      <property>
        <name>fs.checkpoint.size</name>
        <value>67108864</value>
      </property>
      
  1. SecondaryNameNode 通知 NameNode 切換 editlog
  2. SecondaryNameNode 從 NameNode 中獲得 fsimage 和 editlog(通過http方式)
  3. SecondaryNameNode 將 fsimage 載入內存, 然后開始合並 editlog, 合並之后成為新的 fsimage
  4. SecondaryNameNode 將新的 fsimage 發回給 NameNode
  5. NameNode 用新的 fsimage 替換舊的 fsimage
特點
  • 完成合並的是 SecondaryNameNode, 會請求 NameNode 停止使用 edits, 暫時將新寫操作放入一個新的文件中 edits.new
  • SecondaryNameNode 從 NameNode 中通過 Http GET 獲得 edits, 因為要和 fsimage 合並, 所以也是通過 Http Get 的方式把 fsimage 加載到內存, 然后逐一執行具體對文件系統的操作, 與 fsimage 合並, 生成新的 fsimage, 然后通過 Http POST 的方式把 fsimage 發送給 NameNode. NameNode 從 SecondaryNameNode 獲得了 fsimage 后會把原有的 fsimage 替換為新的 fsimage, 把 edits.new 變成 edits. 同時會更新 fstime
  • Hadoop 進入安全模式時需要管理員使用 dfsadmin 的 save namespace 來創建新的檢查點
  • SecondaryNameNode 在合並 edits 和 fsimage 時需要消耗的內存和 NameNode 差不多, 所以一般把 NameNode 和 SecondaryNameNode 放在不同的機器上

1.6 HDFS 文件寫入過程

  1. Client 發起文件上傳請求, 通過 RPC 與 NameNode 建立通訊, NameNode 檢查目標文件是否已存在, 父目錄是否存在, 返回是否可以上傳
  2. Client 請求第一個 block 該傳輸到哪些 DataNode 服務器上
  3. NameNode 根據配置文件中指定的備份數量及機架感知原理進行文件分配, 返回可用的 DataNode 的地址如: A, B, C
    • Hadoop 在設計時考慮到數據的安全與高效, 數據文件默認在 HDFS 上存放三份, 存儲策略為本地一份, 同機架內其它某一節點上一份, 不同機架的某一節點上一份。
  4. Client 請求 3 台 DataNode 中的一台 A 上傳數據(本質上是一個 RPC 調用,建立 pipeline ), A 收到請求會繼續調用 B, 然后 B 調用 C, 將整個 pipeline 建立完成, 后逐級返回 client
  5. Client 開始往 A 上傳第一個 block(先從磁盤讀取數據放到一個本地內存緩存), 以 packet 為單位(默認64K), A 收到一個 packet 就會傳給 B, B 傳給 C. A 每傳一個 packet 會放入一個應答隊列等待應答
  6. 數據被分割成一個個 packet 數據包在 pipeline 上依次傳輸, 在 pipeline 反方向上, 逐個發送 ack(命令正確應答), 最終由 pipeline 中第一個 DataNode 節點 A 將 pipelineack 發送給 Client
  7. 當一個 block 傳輸完成之后, Client 再次請求 NameNode 上傳第二個 block 到服務 1

1.7. HDFS 文件讀取過程

  1. Client向NameNode發起RPC請求,來確定請求文件block所在的位置;
  2. NameNode會視情況返回文件的部分或者全部block列表,對於每個block,NameNode 都會返回含有該 block 副本的 DataNode 地址; 這些返回的 DN 地址,會按照集群拓撲結構得出 DataNode 與客戶端的距離,然后進行排序,排序兩個規則:網絡拓撲結構中距離 Client 近的排靠前;心跳機制中超時匯報的 DN 狀態為 STALE,這樣的排靠后;
  3. Client 選取排序靠前的 DataNode 來讀取 block,如果客戶端本身就是DataNode,那么將從本地直接獲取數據(短路讀取特性);
  4. 底層上本質是建立 Socket Stream(FSDataInputStream),重復的調用父類 DataInputStream 的 read 方法,直到這個塊上的數據讀取完畢;
  5. 當讀完列表的 block 后,若文件讀取還沒有結束,客戶端會繼續向NameNode 獲取下一批的 block 列表;
  6. 讀取完一個 block 都會進行 checksum 驗證,如果讀取 DataNode 時出現錯誤,客戶端會通知 NameNode,然后再從下一個擁有該 block 副本的DataNode 繼續讀。
  7. read 方法是並行的讀取 block 信息,不是一塊一塊的讀取;NameNode 只是返回Client請求包含塊的DataNode地址,並不是返回請求塊的數據;
  8. 最終讀取來所有的 block 會合並成一個完整的最終文件。

1.9. HDFS 的 API 操作

1.9.1. 導入 Maven 依賴

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>
<dependencies>
  <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.8</version>
      <scope>system</scope>
      <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
  </dependency>
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.0.0</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>3.0.0</version>
  </dependency>
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs-client</artifactId>
      <version>3.0.0</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.0.0</version>
  </dependency>
  <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
  </dependency>
</dependencies>

1.9.2. 概述

在 Java 中操作 HDFS, 主要涉及以下 Class:

  • Configuration

    • 該類的對象封轉了客戶端或者服務器的配置
  • FileSystem

    • 該類的對象是一個文件系統對象, 可以用該對象的一些方法來對文件進行操作, 通過 FileSystem 的靜態方法 get 獲得該對象

      FileSystem fs = FileSystem.get(conf)
      
      • get 方法從 conf 中的一個參數 fs.defaultFS 的配置值判斷具體是什么類型的文件系統
      • 如果我們的代碼中沒有指定 fs.defaultFS, 並且工程 ClassPath 下也沒有給定相應的配置, conf 中的默認值就來自於 Hadoop 的 Jar 包中的 core-default.xml
      • 默認值為 file:///, 則獲取的不是一個 DistributedFileSystem 的實例, 而是一個本地文件系統的客戶端對象

1.9.3. 獲取 FileSystem 的幾種方式

  • 第一種方式
@Test
public void getFileSystem() throws URISyntaxException, IOException {
    Configuration configuration = new Configuration();
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), configuration);
    System.out.println(fileSystem.toString());
}
  • 第二種方式
@Test
public void getFileSystem2() throws URISyntaxException, IOException {
    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS","hdfs://192.168.52.250:8020");
    FileSystem fileSystem = FileSystem.get(new URI("/"), configuration);
    System.out.println(fileSystem.toString());
}
  • 第三種方式
@Test
public void getFileSystem3() throws URISyntaxException, IOException {
    Configuration configuration = new Configuration();
    FileSystem fileSystem = FileSystem.newInstance(new URI("hdfs://192.168.52.250:8020"), configuration);
    System.out.println(fileSystem.toString());
}
  • 第四種方式
@Test
public void getFileSystem4() throws  Exception{
    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS","hdfs://192.168.52.250:8020");
    FileSystem fileSystem = FileSystem.newInstance(configuration);
    System.out.println(fileSystem.toString());
}

1.9.4. 遍歷 HDFS 中所有文件

  • 遞歸遍歷
@Test
public void listFile() throws Exception{
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.100:8020"), new Configuration());
    FileStatus[] fileStatuses = fileSystem.listStatus(new Path("/"));
    for (FileStatus fileStatus : fileStatuses) {
        if(fileStatus.isDirectory()){
            Path path = fileStatus.getPath();
            listAllFiles(fileSystem,path);
        }else{
            System.out.println("文件路徑為"+fileStatus.getPath().toString());

        }
    }
}
public void listAllFiles(FileSystem fileSystem,Path path) throws  Exception{
    FileStatus[] fileStatuses = fileSystem.listStatus(path);
    for (FileStatus fileStatus : fileStatuses) {
        if(fileStatus.isDirectory()){
            listAllFiles(fileSystem,fileStatus.getPath());
        }else{
            Path path1 = fileStatus.getPath();
            System.out.println("文件路徑為"+path1);
        }
    }
}
  • 使用 API 遍歷
@Test
public void listMyFiles()throws Exception{
    //獲取fileSystem類
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration());
    //獲取RemoteIterator 得到所有的文件或者文件夾,第一個參數指定遍歷的路徑,第二個參數表示是否要遞歸遍歷
    RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(new Path("/"), true);
    while (locatedFileStatusRemoteIterator.hasNext()){
        LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
        System.out.println(next.getPath().toString());
    }
    fileSystem.close();
}

1.9.5. 下載文件到本地

@Test
public void getFileToLocal()throws  Exception{
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration());
    FSDataInputStream open = fileSystem.open(new Path("/test/input/install.log"));
    FileOutputStream fileOutputStream = new FileOutputStream(new File("c:\\install.log"));
    IOUtils.copy(open,fileOutputStream );
    IOUtils.closeQuietly(open);
    IOUtils.closeQuietly(fileOutputStream);
    fileSystem.close();
}

1.9.6. HDFS 上創建文件夾

@Test
public void mkdirs() throws  Exception{
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration());
    boolean mkdirs = fileSystem.mkdirs(new Path("/hello/mydir/test"));
    fileSystem.close();
}

1.9.7. HDFS 文件上傳

@Test
public void putData() throws  Exception{
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration());
    fileSystem.copyFromLocalFile(new Path("file:///c:\\install.log"),new Path("/hello/mydir/test"));
    fileSystem.close();
}

1.9.8. 偽造用戶

  1. 停止hdfs集群,在node01機器上執行以下命令
cd /export/servers/hadoop-3.1.1
sbin/stop-dfs.sh
  1. 修改node01機器上的hdfs-site.xml當中的配置文件
cd /export/servers/hadoop-3.1.1/etc/hadoop
vim hdfs-site.xml
<property>
    <name>dfs.permissions.enabled</name>
    <value>true</value>
</property>
  1. 修改完成之后配置文件發送到其他機器上面去
scp hdfs-site.xml node02:$PWD
scp hdfs-site.xml node03:$PWD
  1. 重啟hdfs集群
cd /export/servers/hadoop-3.1.1
sbin/start-dfs.sh
  1. 隨意上傳一些文件到我們hadoop集群當中准備測試使用
cd /export/servers/hadoop-3.1.1/etc/hadoop
hdfs dfs -mkdir /config
hdfs dfs -put *.xml /config
hdfs dfs -chmod 600 /config/core-site.xml
  1. 使用代碼准備下載文件
@Test
public void getConfig()throws  Exception{
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration(),"hadoop");
    fileSystem.copyToLocalFile(new Path("/config/core-site.xml"),new Path("file:///c:/core-site.xml"));
    fileSystem.close();
}

1.9.9. 小文件合並

由於 Hadoop 擅長存儲大文件,因為大文件的元數據信息比較少,如果 Hadoop 集群當中有大量的小文件,那么每個小文件都需要維護一份元數據信息,會大大的增加集群管理元數據的內存壓力,所以在實際工作當中,如果有必要一定要將小文件合並成大文件進行一起處理

在我們的 HDFS 的 Shell 命令模式下,可以通過命令行將很多的 hdfs 文件合並成一個大文件下載到本地

cd /export/servers
hdfs dfs -getmerge /config/*.xml ./hello.xml

既然可以在下載的時候將這些小文件合並成一個大文件一起下載,那么肯定就可以在上傳的時候將小文件合並到一個大文件里面去

@Test
public void mergeFile() throws  Exception{
    //獲取分布式文件系統
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration(),"hadoop");
    FSDataOutputStream outputStream = fileSystem.create(new Path("/bigfile.xml"));
    //獲取本地文件系統
    LocalFileSystem local = FileSystem.getLocal(new Configuration());
    //通過本地文件系統獲取文件列表,為一個集合
    FileStatus[] fileStatuses = local.listStatus(new Path("file:///F:\\傳智播客大數據離線階段課程資料\\3、大數據離線第三天\\上傳小文件合並"));
    for (FileStatus fileStatus : fileStatuses) {
        FSDataInputStream inputStream = local.open(fileStatus.getPath());
       IOUtils.copy(inputStream,outputStream);
        IOUtils.closeQuietly(inputStream);
    }
    IOUtils.closeQuietly(outputStream);
    local.close();
    fileSystem.close();
}

下一遍將介紹 MapReduce


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM