第一部分:重要的組件
Combiner
•什么是Combiner
•combine函數把一個map函數產生的<key,value>對(多個key, value)合並成一個新的<key2,value2>. 將新的<key2,value2>作為輸入到reduce函數中,其格式與reduce函數相同。
•這樣可以有效的較少中間結果,減少網絡傳輸負荷。
•什么情況下可以使用Combiner
•可以對記錄進行匯總統計的場景,如求和。
•求平均數的場景就不可以使用了
Combiner執行時機
•運行combiner函數的時機有可能會是merge完成之前,或者之后,這個時機可以由一個參數控制,即
min.num.spill.for.combine(default 3)
•當job中設定了combiner,並且spill數最少有3個的時候,那么combiner函數就會在merge產生結果文件之前運行
•通過這樣的方式,就可以在spill非常多需要merge,並且很多數據需要做conbine的時候,減少寫入到磁盤文件的數據數量,同樣是為了減少對磁盤的讀寫頻率,有可能達到優化作業的目的。
•Combiner也有可能不執行, Combiner會考慮當時集群的負載情況。
Combiner如何使用
•代碼示例
•繼承Reducer類
public static class Combiner extends MapReduceBase implements
Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
}
}
•配置作業時加入conf.setCombinerClass(Combiner.class)
Partitioner
•什么是Partitioner
•Mapreduce 通過Partitioner 對Key 進行分區,進而把數據按我們自己的需求來分發。
•什么情況下使用Partitioner
•如果你需要key按照自己意願分發,那么你需要這樣的組件。
•例如:數據文件內包含省份,而輸出要求每個省份輸出一個文件。
•框架默認的HashPartitioner
•public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
Partitioner如何使用
•實現Partitioner接口覆蓋getPartition()方法
•配置作業時加入conf.setPartitionerClass(MyPartitioner.class);
•Partitioner示例
public static class MyPartitioner implements Partitioner<Text, Text> {
@Override
public int getPartition(Text key, Text value, int numPartitions) {
public int getPartition(Text key, Text value, int numPartitions) {
}
}
Partitioner需求示例
•需求描述
•數據文件中含有省份
•需要相同的省份送到相同的Reduce里
•從而產生不同的文件
•數據樣例
•1 liaoning
•1 代表該省份有多少個直轄市
•步驟
•實現Partitioner,覆蓋getPartition
•根據省份字段進行切分
RecordReader
•什么是RecordReader
•用於在分塊中讀取<Key,Value>對,也就是說每一次我們讀取一條記錄都會調用該類。
•主要是處理經過InputFormat分片完的數據
•什么時候使用RecordReader
•需要對輸入的數據按自己的需求處理
•如:要求輸入的key不是文件的偏移量而是文件的路徑或者名字
•系統默認為LineRecordReader
•按照每行的偏移量做為map輸出時的key值,每行的內容作為map的value值,默認的分隔符是回車和換行。
RecordReader需求示例
•需求
•更改map對應的輸入的<key,value>值,key對應的文件的路徑(或者是文件名),value對應的是文件的內容(content)。
•步驟
•重寫InputFormat不對文件切分
•重寫RecordReader
•在配置作業時使用自定義的組件進行數據處理
第二部分:Join
案例分析
•輸入為2個文件,文件一內容如下
•空格分割:用戶名 手機號 年齡
•內容樣例
•Tom 1314567890 14
•文件二內容
•空格分割:手機號 地市
•內容樣例
•13124567890 hubei
•需要統計出的匯總信息為 用戶名 手機號 年齡 地市
Map端Join
•設計思路
•使用DistributedCache.addCacheFile()將地市的文件加入到所有Map的緩存里
•在Map函數里讀取該文件,進行Join
• 將結果輸出到reduce
•需要注意的是
•DistributedCache需要在生成Job作業前使用
Reduce端Join
•設計思路
•Map端讀取所有文件,並在輸出的內容里加上標識代表數據時從哪個文件里來的
•在reduce對按照標識對數據進行保存
•然后根據Key的Join來求出結果直接輸出
第三部分:排序
普通排序
•Mapreduce本身自帶排序功能
•Text對象是不適合排序的,如果內容為整型不會安照編碼順序去排序
•一般情況下我們可以考慮以IntWritable做為Key,同時將Reduce設置成0 ,進行排序
部分排序
•即輸出的每個文件都是排過序的
•如果我們不需要全局排序,那么這是個不錯的選擇。
全局排序
•產生背景
•Hadoop平台沒有提供全局數據排序,而在大規模數據處理中進行數據的全局排序是非常普遍的需求。
•使用hadoop進行大量的數據排序排序最直觀的方法是把文件所有內容給map之后,map不做任何處理,直接輸出給一個reduce,利用hadoop的自己的shuffle機制,對所有數據進行排序,而后由reduce直接輸出。
•快速排序基本步驟就是需要現在所有數據中選取一個作為支點。然后將大於這個支點的放在一邊,小於這個支點的放在另一邊。
設想如果我們有
N
個支點(這里可以稱為標尺),就可以把所有的數據分成
N+1
個
part
,將這
N+1
個
part
丟給
reduce
,由
hadoop
自動排序,最后輸出
N+1
個內部有序的文件,再把這
N+1
個文件首尾相連合並成一個文件,收工
。
由此我們可以歸納出這樣一個用
hadoop
對大量數據排序的步驟:
1
)
對待排序數據進行抽樣;
2
)
對抽樣數據進行排序,產生標尺;
3
)
Map
對輸入的每條數據計算其處於哪兩個標尺之間;將數據發給對應區間
ID
的
reduce
4
)
Reduce
將獲得數據直接輸出。
•Hadoop 提供了Sampler接口可以返回一組樣本,該接口為Hadoop的采樣器。
public interface Sampler<K, V> {
K[] getSample(InputFormat<K, V> inf, Job job)
throws IOException, InterruptedException;
}
•Hadoop提供了一個TotalOrderPartitioner,可以使我們來實現全局排序。
二次排序
•產生背景
•MapReduce默認會對key進行排序
•將輸出到Reduce的values也進行預先的排序
•實現方式
•重寫Partitioner,完成key分區,進行第一次排序
•實現WritableComparator,完成自己的排序邏輯,完成key的第2次排序
•原理
•Map之前的數據
key1 1
key2 2
key2 3
key3 4
key1 2
•Mapduce只能排序key,所以為了二次排序我們要重新定義自己的key 簡單說來就是<key value> value ,組合完后
<key1 1 > 1
<key2 2 > 2
<key2 3 > 3
<key3 4> 4
<key1 2 > 2
•原理
•接下來實現自定義的排序類,分組類,數據變成
<key1 1 > 1
<key1 2 > 2
<key2 2 > 2
<key2 3 > 3
<key3 4> 4
•最后 reduce處理后輸出結果
key1 1
key1 2
key2 2
key2 3
key3 4
第四部分:計數器
•什么是計數器
計數器主要用來收集系統信息和作業運行信息,用於知道作業成功、失敗等情況,比日志更便利進行分析。
•內置計數器
•Hadoop內置的計數器,記錄作業執行情況和記錄情況。包括MapReduce框架、文件系統、作業計數三大類。
•計數器由關聯任務維護,定期傳遞給tasktracker,再由tasktracker傳給jobtracker。
•計數器可以被全局聚集。內置的作業計數器實際上由jobtracker維護,不必在整個網絡中傳遞。
•當一個作業執行成功后,計數器的值才是完整可靠的。
用戶自定義Java計數器
•MapReduce框架允許用戶自定義計數器
•計數器是全局使用的
•計數器有組的概念,可以由一個Java枚舉類型來定義
•如何配置
•0.20.2以下的版本使用Reporter,
•0.20.2以上的版本使用context.getCounter(groupName, counterName) 來獲取計數器配置並設置。
•動態計數器
•所謂動態計數器即不采用Java枚舉的方式來定義
•Reporter中的獲取動態計數器的方法
•public void incrCounter(String group,String counter,long amount)
組名稱,計數器名稱,計數值
•一些原則
•創建計數器時,盡量讓名稱易讀
•獲取計數器
•Web UI
•命令行 hadoop job-counter
•Java API
•Java API
•在作業運行完成后,計數器穩定后獲取。 使用job.getCounters()得到Counters
第五部分:合並小文件示例
•產生背景
•Hadoop不適合處理小文件
•會占用大量的內存空間
•解決方案
•文件內容讀取到SequenceFile內