MapReduce 中如何處理HBase中的數據?如何讀取HBase數據給Map?如何將結果存儲到HBase中?
Mapper類:包括一個內部類(Context)和四個方法(setup,map,cleanup,run);
setup,cleanup用於管理Mapper生命周期中的資源。setup -> map -> cleanup , run方法執行了這個過程;
map方法用於對一次輸入的key/value對進行map動作,對應HBase操作也就是一行的處理;
job的配置:
1. TableInputFormat完成了什么功能?
(1)通過設置conf.set(TableInputFormat.INPUT_TABLE,"udc_sell");設定HBase的輸入表;
設置conf.set(TableInputFormat.SCAN, TableMRUtil.convertScanToString(scan));設定對HBase輸入表的scan方式;
(2)通過TableInputFormat.setConf(Configration conf)方法初始化scan對象;
scan對象是從job中設置的對象,以字符串的形式傳給TableInputFormat,在TableInputFormat內部將scan字符創轉換為scan對象
* TableMapReduceUtily有兩個方法:convertScanToString和convertStringToScan作用?
將scan實例轉換為Base64字符串 和將Base64字符串還原為scan實例;
(3)TableInputFormat繼承了TableInputFormatBase實現了InputFormat抽象類的兩個抽象方法:
getSplits()和createRecordReader()方法:
A:getSplits()斷定輸入對象的切分原則:對於TableInputFormatBase,會遍歷HBase相應表的所有HRegion,每一 個HRegion都會被分成一個split,
所以切分的塊數是與表中HRegion的數目是相同的;
InputSplit split = new TableSplit(table.getTableName(),splitStart, splitStop, regionLocation);
在split中只會記載HRegion的其實rowkey和終止rowkey,具體的去讀取這篇區域的數據是createRecordReader()實現的。
計算出來的每一個分塊都將被作為一個map Task的輸入;
Q:但是分出的塊分給那台機器的那個task去執行Map,即jobTracker如何調度任務給taskTracker?
A: 需要進一步了解Map的本地化運行機制和jobTracker的調度算法;(可能是就近原則)
對於一個map任務,jobtracker會考慮tasktracker的網絡位置,並選取一個距離其輸入分片文件最近的tasktracker。在最理 想 的情況下,任務是數據本地化的(data- local),也就是任務運行在輸入分片所在的節點上。同樣,任務也可能是機器本地化的:任務和輸入分片在同一個機架,但不在同 一個節點上。
reduce任務,jobtracker簡單滴從待運行的reduce任務列表中選取下一個來運行,用不着考慮數據段餓本地化。
B:createRecordReader()按照必然格式讀取響應數據:接收split塊,返回讀取記錄的結果;
public RecordReader<ImmutableBytesWritable, Result> createRecordReader(InputSplit split, TaskAttemptContext context){
}
trr.init()返回的是這個分塊的起始rowkey的記錄;
RecordReader將一個split解析成<key,value>對的形式提供給map函數,key就是rowkey,value就是對應的一行數據;
RecordReader用於在划分中讀取<Key,Value>對。RecordReader有五個虛方法,分別是:
initialize:初始化,輸入參數包括該Reader工作的數據划分InputSplit和Job的上下文context;
nextKey:得到輸入的下一個Key,如果數據划分已經沒有新的記錄,返回空;
nextValue:得到Key對應的Value,必須在調用nextKey后調用;
getProgress:得到現在的進度;
close:來自java.io的Closeable接口,用於清理RecordReader。
2. job.setInputFormatClass(TableInputFormat.class);
3. TableMapReduceUtil.initTableReducerJob("daily_result", DailyReduce.class, job);
使用了該方法就不需要再單獨定義
initTableReducerJob()方法完成了一系列操作:
(1). job.setOutputFormatClass(TableOutputFormat.class); 設置輸出格式;
(2). conf.set(TableOutputFormat.OUTPUT_TABLE, table); 設置輸出表;
(3). 初始化partition