一、初步探索Partitioner
1.1 再次回顧Map階段五大步驟
在第四篇博文《初識MapReduce》中,我們認識了MapReduce的八大步湊,其中在Map階段總共五個步驟,如下圖所示:
其中,step1.3就是一個分區操作。通過前面的學習我們知道Mapper最終處理的鍵值對<key, value>,是需要送到Reducer去合並的,合並的時候,有相同key的鍵/值對會送到同一個Reducer節點中進行歸並。哪個key到哪個Reducer的分配過程,是由Partitioner規定的。在一些集群應用中,例如分布式緩存集群中,緩存的數據大多都是靠哈希函數來進行數據的均勻分布的,在Hadoop中也不例外。
1.2 Hadoop內置Partitioner
MapReduce的使用者通常會指定Reduce任務和Reduce任務輸出文件的數量(R)。用戶在中間key上使用分區函數來對數據進行分區,之后在輸入到后續任務執行進程。一個默認的分區函數式使用hash方法(比如常見的:hash(key) mod R)進行分區。hash方法能夠產生非常平衡的分區,鑒於此,Hadoop中自帶了一個默認的分區類HashPartitioner,它繼承了Partitioner類,提供了一個getPartition的方法,它的定義如下所示:
/** Partition keys by their {@link Object#hashCode()}. */ public class HashPartitioner<K, V> extends Partitioner<K, V> { /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
現在我們來看看HashPartitoner所做的事情,其關鍵代碼就一句:(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
這段代碼實現的目的是將key均勻分布在Reduce Tasks上,例如:如果Key為Text的話,Text的hashcode方法跟String的基本一致,都是采用的Horner公式計算,得到一個int整數。但是,如果string太大的話這個int整數值可能會溢出變成負數,所以和整數的上限值Integer.MAX_VALUE(即0111111111111111)進行與運算,然后再對reduce任務個數取余,這樣就可以讓key均勻分布在reduce上。
二、自己定制Partitioner
大部分情況下,我們都會使用默認的分區函數HashPartitioner。但有時我們又有一些特殊的應用需求,所以我們需要定制Partitioner來完成我們的業務。這里以第五篇—自定義數據類型處理手機上網日志為例,來對其中的日志內容做一個特殊的分區:
從上圖中我們可以發現,在第二列上並不是所有的數據都是手機號(例如:84138413並不是一個手機號),我們任務就是在統計手機流量時,將手機號碼和非手機號輸出到不同的文件中。
2.1 自定義KpiPartitioner
/* * 自定義Partitioner類 */ public static class KpiPartitioner extends Partitioner<Text, KpiWritable> { @Override public int getPartition(Text key, KpiWritable value, int numPartitions) { // 實現不同的長度不同的號碼分配到不同的reduce task中 int numLength = key.toString().length(); if (numLength == 11) { return 0; } else { return 1; } } }
這里按手機和非手機號碼的區分是按該字段的長度來划分,如果是11位則為手機號。接下來,就是重新修改run方法中的代碼:設置為打包運行,設置Partitioner為KpiPartitioner,設置ReducerTask的個數為2;
public int run(String[] args) throws Exception { // 首先刪除輸出目錄已生成的文件 FileSystem fs = FileSystem.get(new URI(INPUT_PATH), getConf()); Path outPath = new Path(OUTPUT_PATH); if (fs.exists(outPath)) { fs.delete(outPath, true); } // 定義一個作業 Job job = new Job(getConf(), "MyKpiJob"); // 分區需要設置為打包運行 job.setJarByClass(MyKpiJob.class); // 設置輸入目錄 FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); // 設置自定義Mapper類 job.setMapperClass(MyMapper.class); // 指定<k2,v2>的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(KpiWritable.class); // 設置Partitioner job.setPartitionerClass(KpiPartitioner.class); job.setNumReduceTasks(2); // 設置Combiner job.setCombinerClass(MyReducer.class); // 設置自定義Reducer類 job.setReducerClass(MyReducer.class); // 指定<k3,v3>的類型 job.setOutputKeyClass(Text.class); job.setOutputKeyClass(KpiWritable.class); // 設置輸出目錄 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); // 提交作業 System.exit(job.waitForCompletion(true) ? 0 : 1); return 0; }
注意:分區的例子必須要設置為打成jar包運行!
2.2 打成jar包並在Hadoop中運行
(1)通過Eclipse導出jar包
(2)通過FTP上傳到Linux中,可以使用各種FTP工具,我一般使用XFtp。
(3)通過Hadoop Shell執行jar包中的程序
(4)查看執行結果文件:
首先是part-r-00000,它展示了手機號碼的統計結果
然后是part-r-00001,它展示了非手機號碼的統計結果
(5)通過Web接口驗證Partitioner的運行:通過訪問http://hadoop-master:50030
①是否有2個Reduce任務?
從圖中可以看出,總共有2個Reduce任務;
②Reduce輸出結果是否一致?
手機號碼有20條記錄,一致!
非手機號碼只有1條記錄,一致!
總結:分區Partitioner主要作用在於以下兩點
(1)根據業務需要,產生多個輸出文件;
(2)多個reduce任務並發運行,提高整體job的運行效率
參考資料
(1)吳超,《深入淺出Hadoop》:http://115.28.208.222/
(2)萬川梅、謝正蘭,《Hadoop應用開發實戰詳解(修訂版)》:http://item.jd.com/11508248.html
(3)Suddenly,《Hadoop日記Day17-分區》:http://www.cnblogs.com/sunddenly/p/4009568.html
(4)三劫散仙,《如何使用Hadoop中的Partitioner》:http://qindongliang.iteye.com/blog/2043136