Hadoop 學習總結


Hadoop 關於

大數據概念

  • 不能使用一台機器進行處理數據
  • 大數據的核心是樣本=總體

大數據特性

  • 大量性(volume): 一般在大數據里,單個文件的級別至少為幾十,幾百GB以上
  • 快速性(velocity): 反映在數據的快速產生及數據變更的頻率上
  • 多樣性(variety): 泛指數據類型及其來源的多樣化,進一步可以把數據結構歸納為結構化(structured),半結構化(semi-structured),和非結構化(unstructured)
  • 易變性: 伴隨數據快速性的特征,數據流還呈現一種波動的特征。不穩定的數據流會隨着日,季節,特定事件的觸發出現周期性峰值
  • 准確性: 又稱為數據保證(data assurance)。不同方式,渠道收集到的數據在質量上會有很大差異。數據分析和輸出結果的錯誤程度和可信度在很大程度上取決於收集到的數據質量的高低
  • 復雜性: 體現在數據的管理和操作上。如何抽取,轉換,加載,連接,關聯以把握數據內蘊的有用信息已經變得越來越有挑戰性

關鍵技術

  1. 數據分布在多台機器上

    • 可靠性:每個數據塊都復制到多個節點
    • 性能:多個節點同時處理數據
  2. 計算隨數據走

    • 網絡IO速度 << 本地磁盤 IO 速度,大數據系統會盡量地將任務分配到離數據最近的機器上運行(程序運行時,將程序及其依賴包都復制到數據所在的機器運行)
    • 代碼向數據遷移,避免大規模數據時,造成大量數據遷移的情況,盡量讓一段數據的計算發生在同一台機器上
  3. 串行 IO 取代隨機 IO

    傳輸時間 << 尋道時間,一般數據寫入后不在修改

Hadoop 簡介

概念

Hadoop 可運行與一般的商用機器上,具有高容錯,高可靠性,高擴展等特點

特別適合寫一次,讀多次的場景

適用場景

  • 大規模數據
  • 流式數據(寫一次,讀多次)
  • 商用硬件(一般硬件)

不適用場景

  • 低延時的數據訪問
  • 大量的小文件
  • 頻繁修改文件(基本就是寫1次)

Hadoop 架構

img

  • HDFS:分布式文件存儲
  • YARN:分布式資源管理
  • MapReduce:分布式計算
  • Others:利用YARN的資源管理功能實現其他的數據處理方式

內部各個節點基本都是采用 Master-Worker 架構

Hadoop HDFS

Hadoop Distributed File System,分布式文件系統

HDFS 架構

hdfs-architecture

  • Block 數據塊

    1. 基本存儲單位,一般大小為 128M,配置大的塊主要因為:
      • 減少搜索時間,一般硬盤傳輸速率比尋道時間要快,大的塊可以減少尋道時間;
      • 減少管理塊的數據開銷,每個塊都需要在 NameNode 上有對應的記錄;
      • 對數據塊進行讀寫,減少建立網絡的連接成本。
    2. 一個大文件會被拆分為一個個的塊,然后存儲於不同的機器上。如果一個文件小於 Block 大小,那么實際占用空間為其文件的大小。
    3. 基本的讀寫單位,類似磁盤的頁,每次都是讀寫一個塊。
    4. 每個塊都會被復制到多台機器,默認復制3份。
  • NameNode

    1. 存儲文件的 metadata,運行時所有數據都保存到內存,整個HDFS可存儲的文件數受限於 NameNode 的內存大小。
    2. 一個Block在 NameNode 中對應一條記錄(一般一個Block占用150字節),如果是大量的小文件,會消耗大量內存。同時 map task 的數量使用 splits 來決定的,所以用 MapReduce 處理大量的小文件時,就會產生過多的 map task,線程管理開銷將會增加作業時間。處理大量小文件的速度遠遠小於處理同等大小的大文件的速度。因此 Hadoop 建議存儲大文件。
    3. 數據會定時保存到本地磁盤,但不保存 Block 的位置信息,而是由 DataNode 注冊時上報和運行時維護(NameNode 中與 DataNode 相關的信息並不保存到 NameNode 的文件系統中,而是 NameNode 每次重啟后,動態創建)。
    4. NameNode 失效則整個HDFS都失效了,所以要保證 NameNode 的可用性。
  • Secondary NameNode

    定時與 NameNode 進行同步(定期合並文件系統鏡像和編輯日志,然后把合並后的結果傳給 NameNode,替換其鏡像,並清空編輯日志,類似於 CheckPoint 機制),但 NameNode 失效后仍需要手工將其設置成主機。

  • DataNode

    1. 保存具體的 Block 數據。
    2. 負責數據的讀寫操作和復制操作。
    3. DataNode 啟動時會向 NameNode 報告當前存儲的數據塊信息,后續也會定時報告修改信息。
    4. DataNode 之間會進行通信,復制數據塊,保證數據的冗余性。

HDFS 寫文件

img

  1. 客戶端將文件寫入本地磁盤的文件中

  2. 當臨時文件大小達到一個Block大小時,HDFS Client 通知 NameNode,申請寫入文件

  3. NameNode 在HDFS的文件系統中創建一個文件,並把該 Block ID 和要寫入的 DataNode 的列表返回給客戶端

  4. 客戶端收到這些消息后,將臨時文件寫入 DataNodes

    1. 客戶端將文件內容寫入第一個 DataNode(一般以 4kb 單位進行傳輸)。
    2. 第一個 DataNode 接收后,將數據寫入本地磁盤,同時也傳輸給第二個 DataNode。
    3. 以此類推到最后一個 DataNode,數據在 DataNode 之間是通過 pipeline 的方式進行復制的。
    4. 后面的 DataNode 接受完數據后,都會發送一個確認給前一個 DataNode,最終第一個 DataNode 返回確認給客戶端。
    5. 當客戶端接收到整個 Block 的確認后,會向 NameNode 發送一個最終的確認信息。
    6. 如果寫入某個 DataNode 失敗,數據會繼續寫入其他的 DataNode。然后 NameNode 會找另一個好的 DataNode 繼續復制,以保證冗余性。
    7. 每個Block 都會有一個校驗碼,並存放到獨立的文件中,以便讀的時候驗證其完整性。
  5. 文件寫完后(客戶端關閉),NameNode 提交文件(這時文件才可見,如果提交前,NameNode 掛掉,那文件也就丟失了。

    fsync:只保證數據的信息寫到 NameNode 上,但並不保證數據已經被寫道 DataNode 中)

Rack Aware(機架感知)

  • 通過配置文件制定機架名和DNS的對應關系

  • 假設復制參數是3,在寫入文件時,會在本地的機架保存一份數據,然后再另一個機架內保存兩份數據(同機架內的傳輸速度快,從而提高性能)

  • 整個HDFS的集群,最好是負載均衡的,這樣才能盡量利用集群的優勢

HDFS 讀文件

img

  1. 客戶端向 NameNode 發送讀取請求。
  2. NameNode 返回文件的所有 Block 和這些 Block 所在的 DataNodes(包括復制節點)。
  3. 客戶端直接從 DataNode 中讀取數據,如果該 DataNode 讀取失敗(DataNode 失效或校驗碼不對),則從復制節點中讀取(如果讀取的數據就在本機,則直接讀取,否則通過網絡讀取)。

HDFS 可靠性

  1. DataNode 可以失效

    DataNode 會定時發送心跳到 NameNode。如果在一段時間內 NameNode 沒有收到 DataNode 的心跳信息,則認為其失效。此時 NameNode 就會將該節點的數據(從該節點的復制節點中獲取)復制到另外的 DataNode 中。

  2. 數據可以損壞

    無論是寫入時還是硬盤本身的問題,只要數據有問題(讀取時通過校驗碼來檢測),都可以通過其他的復制節點讀取,同時還會再復制一份到健康的節點中。

  3. NameNode 不可靠

HDFS 命令工具

  • fsck:檢查文件的完整性
  • start-balancer.sh:重新平衡 HDFS
  • hdfs dfs -copyFromLocal:從本地磁盤復制文件到 HDFS

Hadoop YARN

舊版架構

yarn-old-mapreduce

  • JobTracker:負責資源管理,跟蹤資源消耗和可用性,作業生命周期管理(調度作業任務,跟蹤進度,為任務提供容錯)
  • TaskTracker:加載或關閉任務,定時報告任務狀態

架構存在的問題:

  1. JobTracker 是 MapReduce 的集中式處理點,存在單點故障。
  2. JobTracker 完成了太多的任務,造成了過多的資源消耗,當 MapReduce Job 非常多的時候,會造成很大的內存開銷。這也是業界普遍總結出來 Hadoop 的 MapReduce 只能支持 4000 節點主機的上限。
  3. 在 TaskTracker 端,以 map/reduce task 的數目作為資源的表示過於簡單,沒有考慮到 cpu/內存 的占用情況。如果兩個大內存消耗的 task 被調度到一塊,很容易出現 OOM。
  4. 在 TaskTracker 端,把資源強制划分為 Map task slot 和 Reduce task slot,如果當系統中只有 Map task 或者只有 Reduce task 的時候,會造成資源的浪費,也就是集群資源利用的問題。

總的來說,就是 單點問題資源利用率問題

新版架構 YARN

yarn-architecture

yarn-architecture-physical

YARN 就是將 JobTracker 的職責進行拆分,將資源管理和任務調度拆分成獨立的進程:一個全局的資源管理(ResourceManager)和一個單個作業的管理(ApplicationMaster)。ResourceManager 和 NodeManager 提供計算資源的分配和管理,為 ApplicationMaster 完成應用程序的運行。

  • ResourceManager:全局資源管理和任務調度
  • NodeManager:單個節點的資源管理和監控
  • ApplicationMaster:單個作業的資源管理和任務監控
  • Container:資源申請的單位和任務運行的容器

新舊架構對比

hadoop-different

YARN 架構下形成了一個通用的資源管理平台和一個通用的應用計算平台,避免了舊架構的單點問題和資源利用率問題,同時也讓在其上在運行的應用不再局限於MapReduce形式。

YARN 基本流程

yarn-process

yarn-process-status-update

  1. Job Submission

    從 ResourceManager 中獲取一個 Application ID 檢查作業輸出配置,計算輸入分片、拷貝作業資源(Job jar、配置文件、分片信息)到 HDFS,以便后面任務的執行。

  2. Job Initialization

    • ResourceManager 將作業遞交給 Scheduler(有很多調度算法,一般是根據優先級)。

    • Scheduler為作業分配一個 Container,ResourceManager 就加載一個 application master process 並交給 NodeManager 管理。

    • ApplicationMaster 主要是創建一系列的監控進程來跟蹤作業的進度,同時獲取輸入分片,為每一個分片創建一個Map task 和響應的 Reduce task。

      ApplicationMaster 還決定如何運行作業,如果作業很小(可配置),則直接在同一個 JVM 下運行。

  3. Task Assignment

    ApplicationMaster 向 ResourceManager 申請資源(一個個的 Container,指定任務分配的資源要求),一般是根據 data locality 來分配資源。

  4. Task Execution

    ApplicationMaster 根據 ResourceManager 的分配情況,在對應的 NodeManager 中啟動 Container,從HDFS中讀取任務所需的資源(job jar,配置文件等),然后執行該任務。

  5. Progress and Status Update

    定時將任務的進度和狀態報告給 ApplicationMaster Client,定時向 ApplicationMaster 獲取整個任務的進度和狀態。

  6. Job Completion

    Client 定時檢查整個作業是否完成。作業完成后,會清空臨時文件、目錄等。

YARN ResourceManager

負責全局的資源管理和任務調度,把整個集群當成計算資源池,只關注分配,不管應用,且不負責容錯。

資源管理

  1. 以前資源是每個節點分成一個個的 Map slot 和 Reduce slot,現在是一個個 Container,每個 Container 可以根據需要運行 ApplicationMaster、Map、Reduce 或者任意程序
  2. 以前資源分配是靜態的,目的是動態的,資源利用率更高
  3. Container 是資源申請的單位,一個資源申請格式:<resource-name,priority,resource-requirement,number-of-containers>
    • resource-name:主機名、機架名或*(代表任意機器)
    • resource-requirement:目前只支持CPU和內存
  4. 用戶提交作業到 ResourceManager,然后再某個 NodeManager 上分配一個 Container 來運行 ApplicationMaster,ApplicationMaster 再根據自身程序需要向 ResourceManager 申請資源。
  5. YARN有一套 Container 的生命周期管理機制,而 ApplicationMaster 和其 Container 之間的管理是應用程序自己定義的。

任務調度

  1. 只關注資源的使用情況,根據需求合理分配資源。
  2. Scheduler 可以考慮申請的需要,在特定的機器上申請特定的資源(ApplicationMaster 負責申請資源時的數據本地化的考慮,ResourceManager將盡量滿足其申請需求,在制定的機器上分配 Container,從而減少數據移動)。

內部結構

yarn-resource-manager

  • Client Service:應用提交、終止、輸出信息(應用、隊列、集群等的狀態信息)。
  • Admin Service:隊列、節點、Client權限管理。
  • ApplicationMasterService:注冊、終止ApplicationMaster,獲取ApplicationMaster 的資源申請或取消的請求,並將其異步地傳給 Scheduler,單線程處理。
  • ApplicationMaster Liveliness Monitor:接收ApplicationMaster 的心跳消息,如果某個 ApplicationMaster 在一定時間內沒有發送心跳,則被認為任務失敗,其資源將會被回收,然后 ResourceManager 會重新分配一個 ApplicationMaster 運行該應用(默認嘗試 2次)。
  • Resource Tracker Service:注冊節點,接收各注冊節點的心跳消息。
  • NodeManagers Liveliness Monitor:監控每個節點的心跳消息,如果長時間沒有接收到心跳消息,則認為該節點無效,同時所有在該節點上的 Container 都標記成無效,也不會調度任務到該節點運行。
  • ApplicationManager:管理應用程序,記錄和管理已完成的應用。
  • ApplicationMaster Launcher:一個應用提交后,負責與 NodeManager 交互,分配 Container 並加載 ApplicationMaster,也負責終止或銷毀。
  • YarnScheduler:資源調度分配,有 FIFO,Fair,Capacity 方式
  • ContainerAllocationExpirer:管理已分配但沒有啟用的 Container,超過一定時間則將其回收。

YARN NodeManager

Container 管理

  1. 啟動時向 ResourceManager 注冊並定時發送心跳信息,等待 ResourceManager的指令。
  2. 監控 Container 的運行,維護 Container 的生命周期,監控 Container 的資源使用情況。
  3. 啟動或停止 Container,管理任務運行時的依賴包(根據 ApplicationMaster 的需要,啟動 Container 之前將需要的程序機器依賴包、配置文件等拷貝到本地)。

內部結構

yarn-node-manager

  • NodeStatusUpdater:啟動時向 ResourceManager 注冊,報告該節點的可用資源情況,通信的端口和后續狀態的維護。
  • ContainerManager:接收 RPC 請求(啟動、停止),資源本地換(下載應用需要的資源到本地,根據需要共享這些資源)
    • PUBLIC:/filecache
    • PRIVATE:/usercache//filecache
    • APPLICATION:/usercache//appcache//(在程序完成后會被刪除)
  • ContainersLauncher:加載或終止 Container
  • ContainerMonitor:監控 Container 的運行和資源使用情況
  • ContainerExecutor:和底層操作系統交互,加載要運行的程序

YARN ApplicationMaster

單個作業的資源管理和任務監控。

功能描述

  1. 計算應用的資源需求,資源可以是靜態或動態計算的,靜態的一般是 Client 申請時就指定了,動態則需要ApplicationMaster根據應用的運行狀態來決定。
  2. 根據數據來申請對應位置的資源 (Data Locality)
  3. 向 ResourceManager 申請資源,與 NodeManager 交互進行程序的運行和監控,監控申請資源的使用情況,監控作業進度。
  4. 跟蹤任務狀態和進度,定時向 ResourceManager 發送心跳消息,報告資源的使用情況和應用的進度信息
  5. 負責本作業內任務的容錯

ApplicationMaster 可以是用任何語言編寫的程序,它和 ResourceManager 和 NodeManager 之間是通過 ProtocolBuf 交互,以前是一個全局 JobTracker 負責的,現在每個作業都有一個,可伸縮性更強,至少不會應為作業太多,造成 JobTracker 瓶頸。同時將作業的邏輯放到一個獨立的 ApplicationMaster 中,使得靈活性更高,每個作業都可以有自己的處理方式,不用綁定到 MapReduce 的處理模式上。

如何計算資源需求

一般的 MapReduce 是根據 Block 數量來決定 Map 和 Reduce 的計算數量,然后一般的 Map 或 Reduce 就占用一個 Container。

如何發現數據的本地化

數據的本地化是通過 HDFS 的 Block 分片信息獲取的。

YARN Container

  1. 基本的資源單位(CPU、內存等)
  2. Container 可以加載任意程序,而且不限於 Java
  3. 一個 Node 可以包含多個 Container,也可以是一個大的 Container
  4. ApplicationMaster 可以根據需要,動態申請和釋放 Container

YARN Failover

失敗類型

  1. 程序問題
  2. 進程奔潰
  3. 硬件問題

失敗處理

任務失敗

  1. 運行時異常或者 JVM 退出都會報告給 ApplicationMaster。
  2. 通過心跳來檢查掛住的任務(Timeout),會檢查多次(可配置)才判斷任務是否失效。
  3. 一個作業的任務失敗率超過配置,則認為改作業失敗。
  4. 失敗的任務或作業都會有 ApplicationMaster 重新運行。

ApplicationMaster 失敗

  1. ApplicationMaster 定時發送心跳信息到 ResourceManager,通常一旦 ApplicationMaster 失敗,則認為失敗,但也可以通過配置多次后才失敗。
  2. 一旦失敗,ResourceManager 會啟動一個新的 ApplicationMaster。
  3. 新的 ApplicationMaster 負責恢復之前錯誤的 ApplicationMaster 狀態(yarn.app.mapreduce.am.job.recovery.enable=true),這一步是通過將應用運行狀態保存到共享的存儲上來實現的,ResourceManager不會負責任務狀態的保存和恢復。
  4. Client 也會定時向 ApplicationMaster 查詢進度和狀態,一旦發現其失敗,則向 ResourceManager 詢問新的 ApplicationMaster

NodeManager 失敗

  1. NodeManager 定時發送心跳到 ResourceManager,如果超過一段時間沒有收到心跳信息,ResourceManager 就會將其移除。
  2. 任何運行在 NodeManager 上的任務和 ApplicationMaster 都會在其他 NodeManager 上進行恢復。
  3. 如果某個 NodeManager 失敗的次數太多,ApplicationMaster 會將其加入黑名單(ResourceManager 沒有),任務調度時不再其上運行。

ResourceManager 失敗

  1. 通過 checkpoint 機制,定時將其狀態保存到磁盤,然后失敗的時候,重新運行。
  2. 通過 Zookeeper 同步狀態和實現透明的高可用(HA)

可以看出,一般的錯誤處理都是由當前模塊的父模塊進行監控(心跳)和恢復。而最頂端的模塊則通過定時保存,同步狀態和Zookeeper來實現 HA

Hadoop MapReduce

簡介

一種分布式的計算方式,指定一個 Map(映射) 函數,用來把一組鍵值對映射成一組新的鍵值對,指定並發的 Reduce(歸約) 函數,用來保證所有映射的鍵值對中的每一個共享相同的鍵組。

img

Map 輸出格式和 Reduce 輸入格式一定是相同的。

基本流程

  1. 讀取文件數據
  2. 進行 Map 處理
  3. 進行 Reduce 處理
  4. 把處理結果寫到文件中

img

詳細流程

img

多節點下的流程

img

主要過程

img

Map Side

  • Record Reader

    記錄閱讀器會翻譯由輸入格式生成的記錄,記錄閱讀器用於數據解析給記錄,並不分析記錄本身。記錄讀取器的目的是將數據解析成記錄,但不分析記錄本身。它將數據以鍵值對的形式傳輸給 Mapper。通常鍵是位置信息,值是構成記錄的數據存儲塊(自定義的記錄不再本文的討論范圍內)。

  • Map

    在映射器中用戶提供的代碼稱為中間對。對於鍵值的具體定義是慎重的,因為定義對於分布式任務的完成具有重要意義。鍵決定可數據分類的依據,而值決定了處理器中的分析信息。

  • Shuffle and Sort

    Reduce 任務以隨機和排序步驟開始,此步驟寫入輸出文件並下載到本地計算機。這些數據采用鍵進行排序以把等價秘鑰組合到一起。

  • Reduce

    Reduce 采用分組數據作為輸入,該功能傳遞鍵和此鍵相關值的迭代器。可以采用多種方式來匯總、過濾或者合並數據。當 Reduce 功能完成,就會發送0個或多個鍵值對。

  • 輸出格式

    輸出格式會轉換最終的鍵值對並寫入文件。默認情況下鍵和值以 tab 分割,各記錄以換行符分割。因此可以自定義更多輸出格式,最終數據會寫入 HDFS。類似記錄讀取(自定義的輸出格式不再本文的討論范圍內)。

MapReduce 讀取數據

通過 InputFormat 決定讀取的數據的類型,然后拆分成一個個 InputSplit,每個 InputSplit 對應一個 Map 處理,RecordReader 讀取 InputSplit 的內容給 Map。

InputFormat

決定讀取數據的格式,可以使文件或者數據庫等。

功能

  1. 驗證作業輸入的正確性,如格式等。
  2. 將輸入文件切割成邏輯分片(InputSplit),一個 InputSplit 將會被分配給一個獨立的 Map 任務。
  3. 提供 RecorderReader 實現,讀取 InputSplit 中的 K-V對 供 Mapper 使用。

方法

  • List getSplits():獲取由輸入文件計算出輸入分片(InputSplit),解決數據或文件分割成片問題。
  • RecordReader <K,V> createRecordReader():創建 RecordReader,從InputSplit中讀取數據,解決讀取分片中數據問題。

類結構

img

  • TextInputFormat:輸入文件中的每一行就是一個記錄,Key 是這一行的 byte offset,而 Value 是這一行的內容。
  • KeyValueTextFormat:輸入文件中每一行就是一個記錄,第一個分隔符字符切分每行。在分隔符字符之前的內容為 Key,在之后的為 Value。分隔符變量通過 key.value.separator.in.input.line 變量設置,默認為 \t 字符。
  • NLineInputFormat:與 TextInputFormat 一樣,但每個數據塊必須保證有且只有 N 行,mapred.line.input.format.linespermap 屬性,默認為 1。
  • SequenceFileInputFormat:一個用來讀取字符流數據的 InputFormat,<Key,Value> 為用戶自定義的。字符流數據是 Hadoop自定義的壓縮的二進制數據格式。它用來優化一個從 MapReduce 任務輸出到另一個 MapReduce 任務的輸入之間的數據傳輸過程。

InputSplit

代表一個個邏輯分片,並沒有真正存儲數據,只是提供了一個如何將數據分片的方法。

Split內有 Location 信息,有利於數據局部化。一個InputSplit交給一個單獨的 Map 處理。

public abstract class InputSplit {
	/**
	 * 獲取 Split 的大小,支持根據 size 對 InputSplit 排序。
	 */
    public abstract long getLength() throws IOException, InterruptedException;
    
    /**
	 * 獲取存儲該分片的數據所在的節點位置。
	 */
    public abstract String[] getLocations() throws IOException, InterruptedException;
}

RecordReader

將 InputSplit 拆分成一個個 <Key,Value>對給 Map 處理,也是實際的文件讀取分割對象。

常見問題

大文件如何處理

CombineFileInputFormat 可以將若干個 Split 打包成一個,目的是避免過多的 Map 任務(因為 Split 的數目決定了 Map 的數目,大量的 Mapper Task 創建銷毀開銷將是巨大的)。

如何計算 split

通常一個 split 就是一個Block(FileInputFormat 僅僅拆分比Block大的文件),這樣做的好處使得 Map 可以存儲有當前數據的節點上運行的本地的任務,而不需要通過網絡進行跨節點的任務調度。

通過mapred.min.split.size, mapred.max.split.sizeblock.size 來控制拆分的大小。

如果mapred.min.split.size 大於 block size,則會將兩個 Block 合成到一個 split,這樣有部分 Block 數據需要通過網絡讀取。

如果mapred.max.split.size 小於 block size,則會將一個 Block 拆成多個 split,增加了 Map 任務數(Map 對 split 進行計算,且上報結果,關閉當前計算打開新的 split 均需要耗費資源)。

先獲取文件在 HDFS 上的路徑和 Block 信息,然后根據 splitSize 對文件進行切分( splitSize = computeSplitSize(blockSize,minSize,maxSize) ),默認 splitSize 就等於 blockSize 的默認值( 128M)。

public List<InputSplit> getSplits(JobContext job) throws IOException {
    // 首先計算分片的最大和最小值。這兩個值將會用來計算分片的大小
	long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);
    
    // 拆分 splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for( FileStatus file : files ){
        Path path = file.getPath();
        long length = file.getLen();
        if( length != 0 ){
            FileSystem fs = path.getFileSystem(job.getConfiguration());
            // 獲取該文件所有的 Block 信息列表[hostname, offset, length]
            BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
            // 判斷文件是否可分割,通常是可分割的,但如果文件是壓縮的,將不可分割
            if( isSplitable(job, path) ){
                long blockSize = file.getBlockSize();
                // 計算分片大小,即 Math.max(minSize, Math.min(maxSize, blockSize));
                long splitSize = computeSplitSize(blockSize, minSize, maxSize);
                
                long bytesRemaining = length;
                // 循環分片:當剩余數據與分片大小比值大於 Split_Slot 時,繼續分片;
                // 小於等於時,停止分片。
                while( ((double) bytesRemaining) / splitSize > SPLIT_SLOP ){
                    int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                    splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()));
                    bytesRemaining -= splitSize;
                }
                // 處理剩下的數據,不足一個 Block 大小的
                if(bytesRemaining != 0){
                    splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts()));
                }
            } else {
                // 不可拆分成 splits,整塊返回
                splits.add(makeSplit(path, 0, length, blkLocation[0].getHosts()));
            }
        } else {
            // 對於長度為 0 的文件,創建空 Hosts 列表,返回
            splits.add(makeSplit(path, 0, length, new String[0]));
        }
    }
    // 設置輸入文件的數量
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    LOG.debug("Total  of Splits:" + splits.size);
    return splits;
}

分片間的數據如何處理

Split 是根據文件大小分割的,而一般處理是根據分隔符進行分割的,這樣勢必存在一條記錄橫跨兩個 split。

img

解決辦法:只要不是第一個 split,都會遠程讀取一條記錄,忽略掉第一條記錄。

public class LineRecordReader extends RecordReader<LongWritable, Text> {
    public static final String MAX_LINE_LENGTH = "mapreduce.input.linerecordreader.line.maxlength";
    private CompressionCodecFactory compressionCodecs = null;
   
    private long start;
    private long pos;
    private long end;
    private LineReader in;
    private int maxLineLength;
    private LongWritable key = null;
    private Text value = null;
    
    // initialize 函數即對 LineRecordReader 的一個初始化
    // 主要是計算分片的始末位置,打開輸入流以供讀取 K-V對,處理經過分片壓縮的情況等
    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throw IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.Max_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        
        // 打開文件,並定位到分片讀取的起始位置
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());
        
        boolean skipFirstLine = false;
        if(codec != null){
            // 文件是壓縮文件的話,直接打開文件
            in = new LineReader(codec.createInputStream(fileIn),job);
            end = Lone.MAX_VALUE;
        } else {
            // 只要不是第一個 split,則忽略本 split 的第一行數據
            if(start != 0){
                skipFirstLine = true;
                --start;
                // 定位到偏移位置,下面的讀取就會從偏移位置開始
                fileIn.seek(start);
            }
            in = new LineReader(fileIn, job);
        }
        
        if(skipFirstLine) {
            // 忽略第一行數據,重新定位 start
            start += in.readLine(new Text(), 0, (int)Math.min((long) Integer.MAX_VALUE, end-start));
        }
        this.pos = start;
    }
    
    public boolean nextKeyValue() throws IOException {
        if(key == null) {
            key = new LongWritable();
        }
        // key 即為偏移量
        key.set(pos);
        if(value == null){
            value = new Text();
        }
        int newSize = 0;
        while(pos < end){
            newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end-pos), maxLineLength));
            // 讀取數據長度為 0,則說明已經讀完
            if(newSize == 0){
                break;
            }
            pos += newSize;
            // 讀取的數據長度小於最大行長度,也說明已經讀取完畢
            if(newSize < maxLineLength){
                break;
            }
            // 執行到此處,說明改行數據沒讀完,繼續讀入
        }
        
        if(newSize == 0){
            key = null;
            value = null;
            return false;
        } else {
            return true;
        }
    }
}

MapReduce Mapper

主要是讀取InputSplit 的每一個Key-Value對,並進行處理。

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    /**
     * 預處理,僅在 map task 啟動時運行一次
     */
    protected void setup(Context context) throws IOException, InterruptedException{
    }
    
    /**
     * 對於InputSplit中的每一對<key, value>都會運行一次
     */
    @SuppressWarnings("unchecked")
    protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
        context.write((KEYOUT) key, (VALUEOUT) value);
    }

    /**
     * 掃尾工作,比如關閉流等
     */
    protected void cleanup(Context context) throws IOException, InterruptedException {
    }
    
    /**
     * map task的驅動器
     */
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
        cleanup(context);
    }
}

public class MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    private RecordReader<KEYIN, VALUEIN> reader;
    private InputSplit split;
    
    /**
     * Get the input split for this map.
     */
    public InputSplit getInputSplit() {
        return split;
    }
    
    @Override
    public KEYIN getCurrentKey() throws IOException, InterruptedException {
        return reader.getCurrentKey();
    }
    
    @Override
    public VALUEIN getCurrentValue() throws IOException, InterruptedException {
        return reader.getCurrentValue();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        return reader.nextKeyValue();
    }
}

Hadoop Shuffle

對 Map 的結果進行排序並傳輸到 Reduce 進行處理。Map 的結果並不直接存放到硬盤,而是利用緩存做一些預排序處理。Map 會調用 Combiner 進行壓縮,按 Key 進行分區、排序等,盡量減少結果的大小。每個 Map 完成后都會通知 Task,然后 Reduce 就可以進行處理。

img

Map 端

  • 當 Map 程序開始產生結果的時候,並不是直接寫到文件的,而是利用緩存做一些排序方面的預處理操作。每個 Map 任務都有一個循環內存緩沖區(默認 100M),當緩存的內容達到80%時,后台線程開始將內容寫到文件,此時 Map 任務可以繼續輸出結果;但如果緩存區滿了,Map 任務則需要等待。
  • 寫文件使用 round-robin 方式。在寫入文件之前,現將數據按照 Reduce 進行分區。對於每一個分區,都會在內存中根據 Key 進行排序,如果配置了 Combiner,則排序后執行 Combiner(Combiner之后可以減少寫入文件和傳輸的數據)。
  • 每次結果到達緩存區的閾值時,都會創建一個文件,當 Map 結束后,可能會產生大量的文件。在 Map 完成前,會將這些文件進行合並和排序。如果文件的數量超過 3 個,則合並后會再次運行 Combiner(1/2 個文件就沒有必要了)。
  • 如果配置壓縮,則最終寫入的文件會先進行壓縮,這樣可以減少寫入和傳輸的數據。一旦 Map 完成,則通知任務管理器,此時 Reduce 就可以開始復制結果數據。

Reduce 端

  • Map 的結果文件存放到運行 Map 任務的機器的本地硬盤中。如果 Map 的結果很少,則直接放到內存中,否則寫入文件中。
  • 同時后台線程將這些文件進行合並和排序到一個更大的文件中(如果文件是壓縮的,則需要先解壓文件)
  • 當所有的 Map 結果都被復制和合並后,就會調用 Reduce 方法,Reduce 結果會寫入到 HDFS 中

調優

  • 一般的原則是給 Shuffle 分配盡可能多的內存,但前提要保證 Map、Reduce 任務有足夠的內存。
  • 對於 Map,主要就是避免把文件寫入磁盤,例如使用 Combiner,增大io.sort.mb 的值
  • 對於 Reduce,主要是把 Map 的結果盡可能地保存到內存中,同樣也要避免中間結果寫入磁盤。默認情況下,所有的內存都是分配給 Reduce 方法的,如果 Reduce 方法不怎么消耗內存,可以將 mapred.inmem.merge.threshold 設置為 0,mapred.job.reduce.input.buffer.percent 設成 1.0。
  • 在任務監控中可以通過 Spilled Records Counter 來監控寫入磁盤的數,但這個值是包括 Map 和 Reduce 的。對於 I/O 方面,可以 Map 的結果可以使用壓縮,同時增大 Buffer Size(io.file.buffer.size,默認4kb)。

配置

屬性 默認值 描述
io.sort.mb 100 映射輸出分類時所使用緩沖區的大小.
io.sort.record.percent 0.05 剩余空間用於映射輸出自身記錄.在1.X發布后去除此屬性.隨機代碼用於使用映射所有內存並記錄信息.
io.sort.spill.percent 0.80 針對映射輸出內存緩沖和記錄索引的閾值使用比例.
io.sort.factor 10 文件分類時合並流的最大數量。此屬性也用於reduce。通常把數字設為100.
min.num.spills.for.combine 3 組合運行所需最小溢出文件數目.
mapred.compress.map.output false 壓縮映射輸出.
mapred.map.output.compression.codec DefaultCodec 映射輸出所需的壓縮解編碼器.
mapred.reduce.parallel.copies 5 用於向reducer傳送映射輸出的線程數目.
mapred.reduce.copy.backoff 300 時間的最大數量,以秒為單位,這段時間內若reducer失敗則會反復嘗試傳輸
io.sort.factor 10 組合運行所需最大溢出文件數目.
mapred.job.shuffle.input.buffer.percent 0.70 隨機復制階段映射輸出緩沖器的堆棧大小比例
mapred.job.shuffle.merge.percent 0.66 用於啟動合並輸出進程和磁盤傳輸的映射輸出緩沖器的閥值使用比例
mapred.inmem.merge.threshold 1000 用於啟動合並輸出和磁盤傳輸進程的映射輸出的閥值數目。小於等於0意味着沒有門檻,而溢出行為由 mapred.job.shuffle.merge.percent單獨管理.
mapred.job.reduce.input.buffer.percent 0.0 用於減少內存映射輸出的堆棧大小比例,內存中映射大小不得超出此值。若reducer需要較少內存則可以提高該值.

Hadoop 編程

處理

  1. select:直接分析輸入數據,取出需要的字段數據即可
  2. where:也是對輸入數據處理的過程進行處理,判斷是否需要該數據
  3. aggregation:min,max,sum
  4. group by:通過 Reduce 實現
  5. sort
  6. join:Map join,Reduce join

Third-Party Libraries

第一種:指定依賴可以利用Public Cache

export LIBJARS=$MYLIB/commons-lang-2.3.jar, hadoop jar prohadoop-0.0.1-SNAPSHOT.jar org.aspress.prohadoop.c3. WordCountUsingToolRunner -libjars $LIBJARS

第二種:包含依賴,則每次都需要拷貝

hadoop jar prohadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar org.aspress.prohadoop.c3. WordCountUsingToolRunner The dependent libraries are now included inside the application JAR file

Hadoop IO

  1. 輸入文件從 HDFS 進行讀取。
  2. 輸出文件會存入本地磁盤。
  3. Reducer 和 Mapper 間的網絡I/O,從 Mapper 節點得到 Reducer 的檢索文件。
  4. 使用 Reducer 實例從本地磁盤會讀數據。
  5. Reducer 輸出-回傳到 HDFS。

串行化

  • 傳輸、存儲都需要
  • Writable 接口
  • Avro 框架:IDL,版本支持,跨語言,JSON-linke

壓縮

  • 能夠減少磁盤的占用空間和網絡傳輸的量
  • Compressed Size,Speed,Splittable
  • gzip,bzip,LZO,LZ4,Snappy
  • 要比較各種壓縮算法的壓縮比和性能

重點:壓縮和拆分一般是沖突的(壓縮后的文件的 Block 是不能很好地拆分獨立運行,很多時候某個文件的拆分點是被拆分成兩個壓縮文件中,這是 Map 任務就無法處理,所以對於這些壓縮,Hadoop 往往是直接使用一個 Map 任務處理整個文件的分析。Map 的輸出結果也可以進行壓縮,這樣可以減少 Map 結果到 Reduce 的傳輸的數據量,加快傳輸速率

完整性

  • 磁盤和網絡很容易出錯,保證數據傳輸的完整性一般是通過CRC32這種校驗法
  • 每次寫數據到磁盤前都驗證一下,同時保存校驗碼
  • 每次讀取數據時,也驗證校驗碼
  • 同時每個 DataNode 都會定時檢查每一個 Block 的完整性
  • 當發現某個 Block 數據有問題時,也不是立刻報錯,而是先去 NameNode 找一塊該數據的完整備份進行恢復,不能恢復才報錯

Hadoop 安裝

安裝方式

  • 單節點安裝

    所有服務都運行在一個 JVM 中,適合調試、單元測試

  • 偽集群

    所有服務運行在一台機器中,每個服務都在獨立的 JVM 中,適合做簡單、抽樣測試

  • 多節點集群

    服務運行在不同的機器中,適合生產環境

配置公共賬號

方便主從服務器進行無密鑰通信,主要使用公鑰/私鑰機制。

  • 所有節點的賬號都一樣,在主節點上執行 ssh-keygen -t rsa 生成密鑰對。
  • 復制公鑰到每台目標節點中。

Hadoop 配置

配置文件

  • xxx-default.xml:只讀,默認的配置
  • xxx-site.xml:替換 default 中的配置
    • core-site.xml:配置公共屬性
    • hdfs-site.xml:配置HDFS
    • yarn-site.xml:配置YARN
    • mapred-site.xml:配置MapReduce

配置文件的順序

  1. 在 JobConf 中指定的
  2. 客戶端機器上的 xxx-site.xml中的配置
  3. Slave 節點上的 xxx-site.xml 中的配置
  4. xxx-default.xml 中的配置

如果某個屬性不想被覆蓋,可以將其設置為 final

<property>
	<name>{PROPERTY_NAME}</name>
 	<value>{PROPERTY_VALUE}</value>
	<final>true</final>
</property>

本文整理自 W3Cschool Hadoop 教程 (https://www.w3cschool.cn/hadoop/)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM