hadoop核心組件(一)


  hadoop的核心組件:hdfs(分布式文件系統)、mapreduce(分布式計算框架)、Hive(基於hadoop的數據倉庫)、HBase(分布式列存數據庫)、Zookeeper(分布式協作服務)、Sqoop(數據同步工具)和Flume(日志手機工具)
 
hdfs(分布式文件系統):
由client、NameNode、DataNode組成
  • client負責切分文件,並與NameNode交互,獲取文件位置;與DataNode交互,讀取和寫入數據
  • NameNode是Master節點,管理HDFS的名稱空間和數據塊映射信息,配置副本策略,處理客戶端請求
  • DataNode是Slave節點,存儲實際數據,匯報存儲信息給NameNode
  • DataNode與NameNode保持心跳,提交block列表
在hadoop1.x的時候還有Secondary NameNode,負責輔助NameNode,分擔其工作量;定期合並fsimage和fsedits,推送給NameNode;緊急情況下,可輔助恢復NameNode
 
存儲模型
(1)文件線性切割成Block  offset
(2)Block分散存儲在集群節點中,Block是HDFS的基本存儲單元,默認大小是64M
(3)單一文件Block大小一致,文件與文件可以不一致
(4)Block可設置副本數(小於節點數),分散在不同節點
(5)文件上傳可以設置Block大小和副本數
(6)已上傳的文件Block副本數可以調整,大小不變
(7)只支持一次寫入多次讀取,同一時刻只有一個寫入者
(8)可以append追加數據
 
架構模型
(1)NameNode節點保存文件元數據
(2)DataNode節點保存文件Block數據
(3)DataNode與NameNode保持心跳,提交Block列表
(4)HdfsClient與NameNode交互元數據信息
(5)HdfsClient與DataNode交互文件Block數據
 
hdfs結構
一、NameNode(不會與磁盤發生交換)
    (1)基於內存存儲
  • 只存在內存中
  • 持久化
    • 啟動后, 元數據(metadate)信息加載到內存
    • metadata的磁盤文件名為”fsimage”
    • Block的位置信息不會保存到fsimage
    • (journalNode的作用是存放EditLog的)edits記錄對metadata的操作日志
 
    (2)功能
  • 接收客戶端讀寫
  • 收集DataNode匯報的block列表信息
    (3) metadata
  • 文件ownership, permissions(文件所有權、權限)
  • 文件大小, 時間
  • (block列表,block偏移量)--->會持久化, 位置信息--->不會持久化(啟動時候由DataNode匯報過來)
  • block每個副本位置(dataNode上報)
二、DataNode
    (1)本地文件形式存儲block
    (2)存儲Block的元數據信息文件
    (3)啟動DN時會向NN匯報block信息
    (4)通過向NameNode發送心跳(3秒一次),如果NameNode 10分鍾沒有收到,則認為已經lost,並copy其上的block到其它DN
三、SecondaryNameNode/Qurom Journal Manager
    合並時機
        fs.checkpoint.period  3600s
        fs.checkpoint.size  64MB
四、ZooKeeper Failover Controller(HDFS 2.0 HA)
    (1)監控NameNode健康狀態
    (2)向Zookeeper注冊NameNode
    (3)NameNode掛掉后,ZKFC為NameNode競爭鎖,獲得ZKFC 鎖的NameNode變為active
 
5、Block副本放置位置
(1)第一個副本:放置在上傳文件的DN;如果是集群外提交,則隨機挑選一台磁盤不太滿,CPU不太忙的節點
(2)第二個副本:放置在於第一個副本不同的機架的節點上
(3)第三個副本:與第二個副本相同機架的節點
(4)更多副本:隨機節點
 
6、安全模式
(1)NameNode啟動, fsimage載入內存, 執行edits
(2)成功建立元數據映射后, 創建新的fsimage文件(無需SNN)和空的edits
(3)檢查副本數, 數量正常后,過若干時間, 解除安全模式
 
7、優缺點
優點:
    高容錯性(多副本, 自動恢復)
    適合批處理(計算移動, 數據位置暴露給計算框架(block))
    適合大數據處理(GB TB PB級數據)
    可構建在廉價機器上
    高吞吐
缺點:
    高延遲
    小文件存取(占用namenode內存, 尋道時間超過讀取時間)
    並發寫入、文件隨機修改(一個文件一個寫入者, 只能append)
 
hdfs寫流程

 

client切分文件與NanmeNode交互,獲取DataNode列表,驗證DataNode后連接DataNode,各節點之間兩兩交互,確定可用后,
client以更小單位流式傳輸數據;
Block傳輸數據結束后,DataNode向NameNode匯報Block信息,DataNode向Client匯報完成,Client向NameNode匯報完成,獲
取去下一個Block存放的DataNode列表,循環以上步驟,最終client匯報完成,NameNode會在寫流程更新文件狀態。
 
hdfs讀流程
 
client與NameNode交互,獲取Block存放的DataNode列表(Block副本的位置信息),線性和DataNode交互,獲取Block,最終
合並為一個文件,其中,在Block副本列表中按距離擇優選取DataNode節點獲取Block塊。
 
mapreduce(分布式計算框架)

 MR運行原理:

    1、客戶端提交作業之前,檢查輸入輸出路徑,首先創建切片列表
            反射出作業中設置的input對象,默認是TextInputFormat類
            通過input類得到切片列表(getSpilits()方法)
                        最小值 minSize 默認為1,如果設置就取設置的值
                        最大值 maxSize 默認為long的最大值
                        根據輸入路徑取出文件,獲取每個文件的所有block列表,接着創建splits列表(包含文件名,偏移量,長度和位置信息)
                        切片大小根據最大最小值取,默認為block的大小
                   一個split對應一個map
        提交作業到集群(submitJob()方法)
    2、mapInput:
            input.initialize    輸入初始化
                    拿到taskContext(上下文)
                    創建mapper(默認為Mapper類,一般取用戶設置的)
                    獲取InputFomat類(輸入格式化的類)
                    獲取split
                    根據以上信息創建input(NewTrackingRecordReader)
                        input初始化
                           獲取split的開始和結束位置和文件,開啟對文件的IO流,將起始偏移量個IO設置一下
                               如果不是第一個切片(split),每次讀取放棄第一行(跳過第一行數據),只有第一個切片才會讀取第一行數據
            mapper.run
    3、output:
            MapOutputBuffer初始化
                    環形緩沖區的閾值0.8、大小(100M)    默認值
                    sorter :QuickSort算法
                    反射獲取比較器 OutputKeyComparator
                    排序,溢寫,一些一次觸發一次combiner
                    溢寫達到3次的時候還會觸發一次combiner
            通過反射獲取Partitioner類,默認為HashPartitoner
            write(k,v)
                collector.collect(key,value,partition)
            output.close()
                merger
                如果numSplits<minSpillsForCombiner 判斷溢寫的次數是不是小於設置的合並的溢寫次數(默認是3),成立的話combiner
    4、reduce:
            shuffle:copy
            sort:SecondarySort
            reduce
 
1、mapreduce shuffle
(1)maptask的輸入是hdfs上的block塊,maptask只讀取split,block與split的對應關系默認是一對一
(2)進過map端的運行后,輸出的格式為key/value,Mapreduce提供接口partition,他的作用是根據maptask輸出的key hash后與
reduce數量取模,來決定當前的輸出對應到哪個reduce處理,也可以自定義partition
(3)map運行后的數據序列化到緩沖區,默認這個緩沖區大小為100M,作用是收集這個map的結果,當數據達到溢寫比例
(默認是spill.percent=0.8)后,所定這80M的內存,對這80M內存中的key做排序(sort),maptask的輸出結果還可以往剩下的20M內
存中寫,互不影響。之后執行溢寫的線程會往磁盤中寫數據。每次溢寫都會產生一個溢寫小文件,map執行完后,會合並這些溢寫小文件,
這個過程叫Merge。
(4)如果客戶端設置了Combiner,那么會優化MapReduce的中間結果,合並map端的數據(相當於reduce端的預處理),Combiner
不能改變最終的計算結果。
(5)reduce在執行之前就是從各個maptask執行完后的溢寫文件中拿到所對應的數據,然后做合並(Merge),最終形成的文件作為
reduce的輸入文件,這個過程是歸並排序。最后就是reduce計算,把結果放到hdfs上面。
 
hdfs參數調優
io.file.buffer.size:4096 (core-default.xml) SequenceFiles在讀寫中可以使用緩存大小,可減少I/O次數;在大型Hadoop cluster,建議可設定為65536-131072
dfs.blockes:134217728( hdfs-default.xml ) hdfs中一個文件的Block塊的大小,CDH5中默認為128M;設置太大影響map同時計算的數量,設置較少會浪費map個數資源
mapred.reduce.tasks(mapreduce.job.reduces):1 默認啟動的reduce數
mapreduce.task.io.sort.factor:10 reduce task中合並文件時,一次合並的文件數據
mapred.child.java.opts:-Xmx200m jvm啟動子線程可以使用的最大內存
mapred.reduce.parallel.copies:5 Reduce copy數據的線程數量,默認值是5
 
mapreduce.tasktracker.http.threads:40 map和reduce是通過http進行傳輸的,這個設置傳輸的並行線程數
mapreduce.map.output.compress:flase map輸出是否進行壓縮,如果壓縮就會多耗cpu,但是減少傳輸時間,如果不壓縮,就需要較多的傳輸帶寬。配合 mapreduce.map.output.compress.codec使用,默認是 org.apache.hadoop.io.compress.DefaultCodec,可以根據需要設定數據壓縮方式。
mapreduce.tasktracker.tasks.reduce.maximum:2 一個tasktracker並發執行的reduce數,建議為cpu核數
mapreduce.map.sort.spill.percent:0.8 溢寫比例
min.num.spill.for.combine:3 spill的文件達到設置的參數進行combiner
 
避免推測執行
mapred.map.tasks.speculative.execution=true
mapred.reduce.tasks.speculative.execution=true
 
自定義partition
適當添加combiner
 
自定義reduce端的grouping Comparator
- mapred.reduce.tasks:手動設置reduce個數
- mapreduce.map.output.compress:map輸出結果是否壓縮
  - mapreduce.map.output.compress.codec
- mapreduce.output.fileoutputformat.compress:job輸出結果是否壓縮
  - mapreduce.output.fileoutputformat.compress.type
  - mapreduce.output.fileoutputformat.compress.codec
 
9、調優文件以及參數
 
一、調優的目的
    充分的利用機器的性能,更快的完成mr程序的計算任務。甚至是在有限的機器條件下,能夠支持運行足夠多的mr程序。
 
二、調優的總體概述
    從mr程序的內部運行機制,我們可以了解到一個mr程序由mapper和reducer兩個階段組成,其中mapper階段包括數據的讀取、map處理以及寫出操作(排序和合並/sort&merge),而reducer階段包含mapper輸出數據的獲取、數據合並(sort&merge)、reduce處理以及寫出操作。那么在這七個子階段中,能夠進行較大力度的進行調優的就是map輸出、reducer數據合並以及reducer個數這三個方面的調優操作。也就是說雖然性能調優包括cpu、內存、磁盤io以及網絡這四個大方面,但是從mr程序的執行流程中,我們可以知道主要有調優的是內存、磁盤io以及網絡。在mr程序中調優,主要考慮的就是減少網絡傳輸和減少磁盤IO操作,故本次課程的mr調優主要包括服務器調優、代碼調優、mapper調優、reducer調優以及runner調優這五個方面。
 
三、服務器調優
    服務器調優主要包括服務器參數調優和jvm調優。在本次項目中,由於我們使用hbase作為我們分析數據的原始數據存儲表,所以對於hbase我們也需要進行一些調優操作。除了參數調優之外,和其他一般的java程序一樣,還需要進行一些jvm調優。
    hdfs調優
    1. dfs.datanode.failed.volumes.tolerated: 允許發生磁盤錯誤的磁盤數量,默認為0,表示不允許datanode發生磁盤異常。當掛載多個磁盤的時候,可以修改該值。
    2. dfs.replication: 復制因子,默認3
    3. dfs.namenode.handler.count: namenode節點並發線程量,默認10
    4. dfs.datanode.handler.count:datanode之間的並發線程量,默認10。
    5. dfs.datanode.max.transfer.threads:datanode提供的數據流操作的並發線程量,默認4096。
        一般將其設置為linux系統的文件句柄數的85%~90%之間,查看文件句柄數語句ulimit -a,修改vim /etc/security/limits.conf, 不能設置太大文件末尾,添加
            * soft nofile 65535
            * hard nofile 65535
            注意:句柄數不能夠太大,可以設置為1000000以下的所有數值,一般不設置為-1。
            異常處理:當設置句柄數較大的時候,重新登錄可能出現unable load session的提示信息,這個時候采用單用戶模式進行修改操作即可。
                單用戶模式:
                    啟動的時候按'a'鍵,進入選擇界面,然后按'e'鍵進入kernel修改界面,然后選擇第二行'kernel...',按'e'鍵進行修改,在最后添加空格+single即可,按回車鍵回到修改界面,最后按'b'鍵進行單用戶模式啟動,當啟動成功后,還原文件后保存,最后退出(exit)重啟系統即可。
    6. io.file.buffer.size: 讀取/寫出數據的buffer大小,默認4096,一般不用設置,推薦設置為4096的整數倍(物理頁面的整數倍大小)。
    mapreduce調優
    1. mapreduce.task.io.sort.factor: mr程序進行合並排序的時候,打開的文件數量,默認為10個.
    2. mapreduce.task.io.sort.mb: mr程序進行合並排序操作的時候或者mapper寫數據的時候,內存大小,默認100M
    3. mapreduce.map.sort.spill.percent: mr程序進行flush操作的閥值,默認0.80。
    4. mapreduce.reduce.shuffle.parallelcopies:mr程序reducer copy數據的線程數,默認5。
    5. mapreduce.reduce.shuffle.input.buffer.percent: reduce復制map數據的時候指定的內存堆大小百分比,默認為0.70,適當的增加該值可以減少map數據的磁盤溢出,能夠提高系統性能。
    6. mapreduce.reduce.shuffle.merge.percent:reduce進行shuffle的時候,用於啟動合並輸出和磁盤溢寫的過程的閥值,默認為0.66。如果允許,適當增大其比例能夠減少磁盤溢寫次數,提高系統性能。同mapreduce.reduce.shuffle.input.buffer.percent一起使用。
    7. mapreduce.task.timeout:mr程序的task執行情況匯報過期時間,默認600000(10分鍾),設置為0表示不進行該值的判斷。
    
四、代碼調優
    代碼調優,主要是mapper和reducer中,針對多次創建的對象,進行代碼提出操作。這個和一般的java程序的代碼調優一樣。
    
五、mapper調優
    mapper調優主要就是就一個目標:減少輸出量。我們可以通過增加combine階段以及對輸出進行壓縮設置進行mapper調優。
    combine介紹:
        實現自定義combine要求繼承reducer類,特點:
        以map的輸出key/value鍵值對作為輸入輸出鍵值對,作用是減少網絡輸出,在map節點上就合並一部分數據。
        比較適合,map的輸出是數值型的,方便進行統計。
    壓縮設置:
        在提交job的時候分別設置啟動壓縮和指定壓縮方式。
 
六、reducer調優
    reducer調優主要是通過參數調優和設置reducer的個數來完成。
    reducer個數調優:
        要求:一個reducer和多個reducer的執行結果一致,不能因為多個reducer導致執行結果異常。
        規則:一般要求在hadoop集群中的執行mr程序,map執行完成100%后,盡量早的看到reducer執行到33%,可以通過命令hadoop job -status job_id或者web頁面來查看。
            原因: map的執行process數是通過inputformat返回recordread來定義的;而reducer是有三部分構成的,分別為讀取mapper輸出數據、合並所有輸出數據以及reduce處理,其中第一步要依賴map的執行,所以在數據量比較大的情況下,一個reducer無法滿足性能要求的情況下,我們可以通過調高reducer的個數來解決該問題。
        優點:充分利用集群的優勢。
        缺點:有些mr程序沒法利用多reducer的優點,比如獲取top n的mr程序。
 
七、runner調優
    runner調優其實就是在提交job的時候設置job參數,一般都可以通過代碼和xml文件兩種方式進行設置。
    1~8詳見ActiveUserRunner(before和configure方法),9詳解TransformerBaseRunner(initScans方法)
    1. mapred.child.java.opts: 修改childyard進程執行的jvm參數,針對map和reducer均有效,默認:-Xmx200m
    2. mapreduce.map.java.opts: 需改map階段的childyard進程執行jvm參數,默認為空,當為空的時候,使用mapred.child.java.opts。
    3. mapreduce.reduce.java.opts:修改reducer階段的childyard進程執行jvm參數,默認為空,當為空的時候,使用mapred.child.java.opts。
    4. mapreduce.job.reduces: 修改reducer的個數,默認為1。可以通過job.setNumReduceTasks方法來進行更改。
    5. mapreduce.map.speculative:是否啟動map階段的推測執行,默認為true。其實一般情況設置為false比較好。可通過方法job.setMapSpeculativeExecution來設置。
    6. mapreduce.reduce.speculative:是否需要啟動reduce階段的推測執行,默認為true,其實一般情況設置為fase比較好。可通過方法job.setReduceSpeculativeExecution來設置。
    7. mapreduce.map.output.compress:設置是否啟動map輸出的壓縮機制,默認為false。在需要減少網絡傳輸的時候,可以設置為true。
    8. mapreduce.map.output.compress.codec:設置map輸出壓縮機制,默認為org.apache.hadoop.io.compress.DefaultCodec,推薦使用SnappyCodec
    9. hbase參數設置
        由於hbase默認是一條一條數據拿取的,在mapper節點上執行的時候是每處理一條數據后就從hbase中獲取下一條數據,通過設置cache值可以一次獲取多條數據,減少網絡數據傳輸。
 
源碼:
1、設置map端的數量:mapreduce.input.fileinputformat.split.minsize
位置FileInputFormat.getSplits()方法
(1)輸入文件size巨大,但不是小文件
減小map的數量:增大mapred.min.split.size的值
(2)輸入文件數量巨大,且都是小文件
使用FileInputFormat衍生的CombineFileInputFormat將多個input path合並成一個InputSplit送給mapper處理,從而減少mapper的數量
 
2、增加Map-Reduce job 啟動時創建的Mapper數量
可以通過減小每個mapper的輸入做到,即減小blockSize或者減小mapred.min.split.size的值,設置blockSize一般不可行


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM