本文發表於本人博客。
在上一篇文章我寫了個簡單的WordCount程序,也大致了解了下關於mapreduce運行原來,其中說到還可以自定義分區、排序、分組這些,那今天我就接上一次的代碼繼續完善實現自定義分區。
首先我們明確一下關於中這個分區到底是怎么樣,有什么用處?回答這個問題先看看上次代碼執行的結果,我們知道結果中有個文件(part-r-00000),這個文件就是所有的詞的數量記錄,這個時候有沒什么想法比如如果我想把一些包含特殊的詞放置單獨的一個文件,其他我不關心的放置在另一個文件這樣我就好查看方便多了,又比如如果是統計關於人的某些愛好那我是不是可以把童年的放置在一個文件,成年的放置在一個文件等等這樣輸出結果。是,這個倒是非常有用哦輸出的結果就是最直接的了,那現在我們就來分析一下應該怎么搞怎么實現:
我們清楚,這個輸出文件是由reduce端輸出的,reduce端的數據是由map函數處理完通過shufflecopy至reduce端的,然而map端的輸出數量會對於reduce輸入的數量,那么map端會負責划分數據,在shuffle過程中有個步驟就是分區,我們先來看看上次代碼中使用的分區類HashPartitioner,看代碼:
public class HashPartitioner<K, V> extends Partitioner<K, V> { /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
這里出現了個numReduceTasks變量,這個是由哪里過來的呢,那就得看誰調用了這個方法了,看:MapTask.java就可以看到其write方法調用了,然而這個方法的partitions參數是由:
jobContext.getNumReduceTasks();
覺得,那我們繼續找下去這個變量是由mapred.reduce.tasks配置節點決定的默認是1。那現在我們雖然不知道(key.hashCode() & Integer.MAX_VALUE)值是多少但是%1我們可以知道結果就是0;現在我來繼承這個類(也可繼承其父類Partitioner<K, V>)重寫其getPartition方法來實現分區,看下面自定義分區MyPartition代碼:
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; /** * 自定義分區類 * @author Liang * */ public class MyPartition extends HashPartitioner<Text, LongWritable> { @Override public int getPartition(Text key, LongWritable value, int numReduceTasks) { return key.toString().contains("luoliang") ? 0 : 1; } }
上面重寫getPartition函數,其中如果鍵中有字符串"luoliang"的鍵值就返回0否則其它返回1。執行后在(hdfs://hadoop-master:9000/mapreduce/output/
)會有2個文件,一個是part-r-00000,一個是part-r-00001。part-r-00000對應的是條件key.toString().contains("luoliang")為真的!
注意先要在mian函數中加入:
job.setJarByClass(Test.class);
還需要更改:
job.setPartitionerClass(MyPartition.class); job.setNumReduceTasks(2);
再把程序打包成jar.jar文件上傳至服務器使用命令運行:
hadoop jar jar.jar
如果本地調試或者運行會報錯必須打包至服務器運行,結果會生成有那下面2個文件如下:
part-r-00000 part-r-00001
這次先到這里。堅持記錄點點滴滴!