This article was first published on my own blog.
This post follows on from my previous article, 【Hadoop mapreduce自定義排序WritableComparable】. Continuing in order, this time the topic is how to implement custom grouping. I won't go over the overall processing order again here; if you need that background, see my comments on my 博客園 post. Let's get started.
First, let's look at the Job class. It has a setGroupingComparatorClass() method, whose source is as follows:
/**
 * Define the comparator that controls which keys are grouped together
 * for a single call to
 * {@link Reducer#reduce(Object, Iterable,
 *                       org.apache.hadoop.mapreduce.Reducer.Context)}
 * @param cls the raw comparator to use
 * @throws IllegalStateException if the job is submitted
 */
public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                       ) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  conf.setOutputValueGroupingComparator(cls);
}
As the source and its Javadoc show, this method defines which keys are grouped together for a single reduce() call, which is exactly the custom grouping we want. The class we pass in must implement RawComparator, so let's look at that interface's source:
/**
 * <p>
 * A {@link Comparator} that operates directly on byte representations of
 * objects.
 * </p>
 * @param <T>
 * @see DeserializerComparator
 */
public interface RawComparator<T> extends Comparator<T> {
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
So RawComparator is a generic interface that extends Comparator. With that in mind, let's write our own class implementing RawComparator:
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

public class MyGrouper implements RawComparator<SortAPI> {

    // Object-level comparison: group by the "first" field only
    @Override
    public int compare(SortAPI o1, SortAPI o2) {
        return (int) (o1.first - o2.first);
    }

    // Byte-level comparison used during the shuffle: SortAPI is serialized as
    // two longs (16 bytes), so comparing only the leading 8 bytes of each key
    // compares only the "first" field
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
    }
}
Here SortAPI is the key class defined in the previous article on custom sorting. The first compare() works on two deserialized key objects and, like any Comparator, returns a negative number, zero, or a positive number. The second compare() is called on the raw serialized bytes before the keys are deserialized, which is why it has to compare byte ranges directly. The small sketch below shows why comparing only the first 8 bytes is enough here.
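SortAPI.write() serializes the key as two longs, 8 bytes each, so the leading 8 bytes of a serialized key hold exactly the first field. The following minimal sketch of mine (the class name GrouperByteDemo is just for illustration, it is not part of the job) serializes two keys with Hadoop's DataOutputBuffer and compares them the same way MyGrouper does:

import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.WritableComparator;

public class GrouperByteDemo {

    public static void main(String[] args) throws Exception {
        // Serialize two keys that differ only in their "second" field
        DataOutputBuffer b1 = new DataOutputBuffer();
        new SortAPI(1, 2).write(b1);   // 16 bytes: first (8) + second (8)
        DataOutputBuffer b2 = new DataOutputBuffer();
        new SortAPI(1, 1).write(b2);

        // Compare only the leading 8 bytes, i.e. only the "first" field
        int cmp = WritableComparator.compareBytes(
                b1.getData(), 0, 8, b2.getData(), 0, 8);
        System.out.println(cmp);       // prints 0: the two keys fall into one group
    }
}

Because the comparator returns 0 for both keys, the framework treats them as one group and hands them to a single reduce() call. With that in mind, here is the custom MyMapper class: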
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, SortAPI, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line holds two tab-separated numbers
        String[] splied = value.toString().split("\t");
        try {
            long first = Long.parseLong(splied[0]);
            long second = Long.parseLong(splied[1]);
            // Emit the pair as the key, with a count of 1 as the value
            context.write(new SortAPI(first, second), new LongWritable(1));
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}
The custom MyReduce class:
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReduce extends Reducer<SortAPI, LongWritable, LongWritable, LongWritable> {

    @Override
    protected void reduce(SortAPI key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // One call per group; only the group's key fields are written out
        context.write(new LongWritable(key.first), new LongWritable(key.second));
    }
}
The custom SortAPI class:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class SortAPI implements WritableComparable<SortAPI> {

    public Long first;
    public Long second;

    public SortAPI() {
    }

    public SortAPI(long first, long second) {
        this.first = first;
        this.second = second;
    }

    // Default sort: ascending by the first field
    @Override
    public int compareTo(SortAPI o) {
        return (int) (this.first - o.first);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(first);
        out.writeLong(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readLong();
        this.second = in.readLong();
    }

    @Override
    public int hashCode() {
        return this.first.hashCode() + this.second.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof SortAPI) {
            SortAPI o = (SortAPI) obj;
            // Compare the long values, not the Long object references
            return this.first.longValue() == o.first.longValue()
                    && this.second.longValue() == o.second.longValue();
        }
        return false;
    }

    @Override
    public String toString() {
        return "輸出:" + this.first + ";" + this.second;
    }
}
Next, prepare some test data:
1	2
1	1
3	0
3	2
2	2
1	2
Upload it to hdfs://hadoop-master:9000/grouper/input/test.txt. The driver (main) code is as follows:
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class Test {
    static final String OUTPUT_DIR = "hdfs://hadoop-master:9000/grouper/output/";
    static final String INPUT_DIR = "hdfs://hadoop-master:9000/grouper/input/test.txt";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, Test.class.getSimpleName());
        job.setJarByClass(Test.class);
        deleteOutputFile(OUTPUT_DIR);
        // 1. input path
        FileInputFormat.setInputPaths(job, INPUT_DIR);
        // 2. input format
        job.setInputFormatClass(TextInputFormat.class);
        // 3. custom Mapper and its output key/value types
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(SortAPI.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 4. partitioning
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);
        // 5. custom grouping comparator
        job.setGroupingComparatorClass(MyGrouper.class);
        // 6. custom Reducer and its output key/value types
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);
        // 7. output path
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_DIR));
        // 8. submit the job and wait for it to finish
        job.waitForCompletion(true);
    }

    static void deleteOutputFile(String path) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI(INPUT_DIR), conf);
        if (fs.exists(new Path(path))) {
            fs.delete(new Path(path), true);
        }
    }
}
Run the job, then on a cluster node check the result from a terminal with hadoop fs -text /grouper/output/part-r-00000:
1	2
2	2
3	0
Next, let's modify the compareTo() method of the SortAPI class:
@Override
public int compareTo(SortAPI o) {
    // Sort descending by first; break ties ascending by second
    long mis = (this.first - o.first) * -1;
    if (mis != 0) {
        return (int) mis;
    } else {
        return (int) (this.second - o.second);
    }
}
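A side note of my own (not from the original article): subtracting two longs and casting the result to int can overflow and flip the sign for very large values. Assuming Java 7 or later, the same descending-then-ascending order can be expressed more safely with Long.compare; for the small values in this example both versions behave identically.

@Override
public int compareTo(SortAPI o) {
    int cmp = -Long.compare(this.first, o.first);                  // descending by first
    return cmp != 0 ? cmp : Long.compare(this.second, o.second);   // then ascending by second
}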
Run the job again and check /grouper/output/part-r-00000:
3	0
2	2
1	1
From this we can conclude that, for the same data, the grouping result is affected by the sort order: if the sort is descending, grouping also walks the data in descending order and emits the groups in that order. By printing records inside the map and reduce functions (steps omitted here), the comparison also shows what the grouping phase does: key/value pairs whose keys compare as equal (that is, compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) returns 0) form one group, and only the first pair of each group, in sort order, is written out to the buffer (and possibly spilled to disk); the remaining pairs with the same key are not written out again. I also found an article via Baidu in which grouping means that all the values emitted by the map function are iterated under a single key, i.e. something like {key, values}: {1, {2, 1, 2}}; that result is the same as what you get without any custom grouping at all, and you can verify it by printing the Iterable&lt;LongWritable&gt; values in the reduce function. Personally I think that is what really deserves to be called grouping, much like a GROUP BY in a data query. A small sketch of such a debugging reducer follows below.
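As a concrete way to do that check, here is a minimal sketch of my own (the class name DebugReduce is just an illustration, not from the article): a reducer that prints every value it receives for a group, so you can see how many map outputs were merged into one reduce() call under the custom grouping comparator.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class DebugReduce extends Reducer<SortAPI, LongWritable, LongWritable, LongWritable> {

    @Override
    protected void reduce(SortAPI key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (LongWritable v : values) {
            sb.append(v.get()).append(' ');
        }
        // With MyGrouper in place, every input record whose "first" field matches
        // this group contributes one value (a 1 from MyMapper) to the iterator
        System.out.println("key=" + key + " values=[" + sb.toString().trim() + "]");
        context.write(new LongWritable(key.first), new LongWritable(key.second));
    }
}

Swap it in with job.setReducerClass(DebugReduce.class) and watch the task logs (or standard output in local mode) to compare the runs with and without setGroupingComparatorClass.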
Here we should also be clear about the difference between grouping and partitioning. Partitioning splits the output into separate result files so they are easier to inspect; for example, if one output file contained HTTP requests of every status, partitioning by status would split them into several result files. Grouping, on the other hand, merges key/value pairs with equal keys during the computation so that less is emitted: after partitioning, every record still reaches the reduce side as before, whereas grouping reduces what comes out. The two also run at different stages. The small partitioner sketch below illustrates the partitioning side.
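For illustration only (my own sketch, not code from the article; the class name FirstFieldPartitioner is hypothetical): a custom partitioner that routes records by the "first" field. The partitioner decides which reduce task, and therefore which part-r-xxxxx file, a record goes to, while the grouping comparator decides which records share a single reduce() call once they get there.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstFieldPartitioner extends Partitioner<SortAPI, LongWritable> {

    @Override
    public int getPartition(SortAPI key, LongWritable value, int numPartitions) {
        // Records with the same "first" value always go to the same reduce task,
        // so they also end up in the same output file
        return (int) (Math.abs(key.first) % numPartitions);
    }
}

To try it, replace HashPartitioner in the driver with job.setPartitionerClass(FirstFieldPartitioner.class) and set job.setNumReduceTasks(2) or more; you would then get several output files, while the grouping inside each reduce task still follows MyGrouper.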
That's all for this time. Keep on recording the little things, bit by bit!