摘要:mapreduce中執行reduce(KEYIN key, Iterable<VALUEIN> values, Context context),調用一次reduce方法,迭代value集合時,發現key的值也是在不斷變化的,這是因為key的地址在內部會隨着value的迭代而不斷變化。
序:我們知道reduce方法每執行一次,里面我們會通過for循環迭代value的迭代器。如果key是bean的時候,for循環里面value值變化的同時我們的bean值也是會跟隨着變化,調用reduce方法時傳參數就傳了一次key的值,但是在方法內部迭代的時候,key值在變化,那他怎么變動的?
誤區:在map處理完成之后,將所有kv對緩存起來,進行分組,然后傳遞一個組<key,valus{}>,調用一次reduce方法傳入的key和value的迭代器如<hello,{1,1,1,1,1,1.....}>。
給一個需求來觀察現象
對日志數據中的上下行流量信息匯總,並輸出按照總流量倒序排序的結果,且該需求日志中手機號是不會重復的——即不會存在多條數據,手機號相同,且流量不同,還需要進行多條數據的匯總。
數據如下:
13888888801,1,9,10
13888888802,5,5,10
13888888803,2,7,9
13888888804,4,6,10
13888888805,6,4,10
13888888806,1,0,1
分析
基本思路:實現自定義的bean來封裝流量信息,並將bean作為map輸出的key來傳輸。
MR程序在處理數據的過程中會對數據排序(map輸出的kv對傳輸到reduce之前,會排序),排序的依據是map輸出的key,所以,我們如果要實現自己需要的排序規則,則可以考慮將排序因素放到key中,讓key實現接口:WritableComparable,然后重寫key的compareTo方法。
package cn.intsmaze.flowsum.SortBean; public class FlowBeanOne implements WritableComparable<FlowBeanOne> { private long upFlow; private long dFlow; private long sumFlow; private long phone; // 序列化框架在反序列化操作創建對象實例時會調用無參構造 public FlowBeanOne() { } // 序列化方法 @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(dFlow); out.writeLong(sumFlow); out.writeLong(phone); } // 反序列化方法,注意: 字段的反序列化順序與序列化時的順序保持一致 @Override public void readFields(DataInput in) throws IOException { this.upFlow = in.readLong(); this.dFlow = in.readLong(); this.sumFlow = in.readLong(); this.phone = in.readLong(); } public void set(long phone,long upFlow, long dFlow) { this.phone=phone; this.upFlow = upFlow; this.dFlow = dFlow; this.sumFlow = upFlow + dFlow; } @Override public String toString() { return upFlow + "\t" + dFlow + "\t" + sumFlow+ "\t" + phone; }//自定義倒序比較規則,總流量相同視為同一個key. @Override public int compareTo(FlowBeanOne o) { return (int)(o.getSumFlow() - this.sumFlow); } get,set...... }
代碼實現如下:
package cn.intsmaze.flowsum.SortBean;
/** * 實現流量匯總並且按照流量大小倒序排序 * 前提:處理的數據是已經匯總過的結果文件,然后再次對該文件進行排序 * @author */ public class FlowSumSort { public static class FlowSumSortMapperOne extends Mapper<LongWritable, Text, FlowBeanOne, Text> { FlowBeanOne k = new FlowBeanOne(); Text v = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split(","); long phoneNbr = Long.parseLong(fields[0]); long upFlowSum = Long.parseLong(fields[1]); long dFlowSum = Long.parseLong(fields[2]); k.set(phoneNbr,upFlowSum, dFlowSum);//這里對bean作為key。 context.write(k, v); } } public static class FlowSumSortReducerOne extends Reducer<FlowBeanOne, Text, Text, FlowBeanOne> {
@Override protected void reduce(FlowBeanOne bean, Iterable<Text> phoneNbrs, Context context) throws IOException, InterruptedException { System.out.println("-------------------"); for (Text text : phoneNbrs) { System.out.println(bean); context.write(text, bean); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FlowSumSort.class); // 告訴框架,我們的程序所用的mapper類和reducer類 job.setMapperClass(FlowSumSortMapperOne.class); job.setReducerClass(FlowSumSortReducerOne.class); job.setMapOutputKeyClass(FlowBeanOne.class); job.setMapOutputValueClass(Text.class); // 告訴框架,我們的mapperreducer輸出的數據類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBeanOne.class); // 告訴框架,我們要處理的文件在哪個路徑下 FileInputFormat.setInputPaths(job, new Path("d:/intsmaze/input/")); // 告訴框架,我們的處理結果要輸出到哪里去 FileOutputFormat.setOutputPath(job, new Path("d:/intsmaze/output/")); boolean res = job.waitForCompletion(true); System.exit(res ? 0 : 1); } }
這里要注意,因為是匯總排序,所以reduce的並行度必須為1,。除了使用框架的組件外,我們還可以通過使用reduce的cleanup方法,自己在reduce端對收集到的數據進行匯總排序。
6 4 10 13888888805
4 6 10 13888888804
5 5 10 13888888802
1 9 10 13888888801
2 7 9 13888888803
1 0 1 13888888806
-------------------
6 4 10 13888888805
4 6 10 13888888804
5 5 10 13888888802
1 9 10 13888888801
-------------------
2 7 9 13888888803
-------------------
1 0 1 13888888806
靈異現象
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context ) throws IOException, InterruptedException { for(VALUEIN value: values) { context.write((KEYOUT) key, (VALUEOUT) value); } }
來看看hadoop2.6.4源碼解析吧:
因為這個問題是一年前遇到的,看完源碼搞明白后,並沒有時間去整理,所以再次解析有所不足。
Reducer源碼解析
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { public abstract class Context implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { } /** * 這個方法我們不需要管,因為我們實現的類重寫了該方法。 */ protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context ) throws IOException, InterruptedException { for(VALUEIN value: values) { context.write((KEYOUT) key, (VALUEOUT) value); } } //通過debug我們可以看到,數據在結束map任務執行reduce任務的時候,reduce端會先調用這個方法,而調用這個 //方法的類是我們實現的reduce類,通過繼承調用該方法,然后在該方法里面調用我們實現類重寫的reduce方法。 public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKey()) {//這個地方調用ReduceContextImpl的方法進行判斷 reduce(context.getCurrentKey(), context.getValues(), context);//這個地方調用我們的實現類的reduce方法走我們的邏輯代碼了 // If a back up store is used, reset it Iterator<VALUEIN> iter = context.getValues().iterator(); if(iter instanceof ReduceContext.ValueIterator) { ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore(); } } } finally { cleanup(context); } } }
ReduceContextImpl源碼解析
(由於代碼太多,我只截取了部分主要的代碼)
public class ReduceContextImpl { private RawKeyValueIterator input;//這個迭代器里面存儲的key-value對元素。 private KEYIN key; // current key private VALUEIN value; // current value private boolean firstValue = false; // first value in key private boolean nextKeyIsSame = false; // more w/ this key private boolean hasMore; // more in file private ValueIterable iterable = new ValueIterable();//訪問自己的內部類 public ReduceContextImpl() throws InterruptedException, IOException{ hasMore = input.next();//對象創建的時候,就先判斷reduce接收的key-value迭代器是否有元素,並獲取下一個元素 } /** 創建完成就調用該方法 ,開始處理下一個唯一的key*/ public boolean nextKey() throws IOException,InterruptedException { while (hasMore && nextKeyIsSame) { //判斷迭代器是否還有下一個元素已經下一個元素是否和上一個已經遍歷出來的key-value元素的key是不是一樣 nextKeyValue(); } if (hasMore) { if (inputKeyCounter != null) { inputKeyCounter.increment(1); } return nextKeyValue(); } else { return false; } } /** * Advance to the next key/value pair. */ @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!hasMore) { key = null; value = null; return false; } firstValue = !nextKeyIsSame; //獲取迭代器下一個元素的key DataInputBuffer nextKey = input.getKey(); //設置當前key的坐標 currentRawKey.set(nextKey.getData(), nextKey.getPosition(), nextKey.getLength() - nextKey.getPosition()); buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength()); //反序列化得到當前key對象 key = keyDeserializer.deserialize(key); //獲取迭代器下一個元素的value DataInputBuffer nextVal = input.getValue(); buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength() - nextVal.getPosition()); //反序列化value value = valueDeserializer.deserialize(value); currentKeyLength = nextKey.getLength() - nextKey.getPosition(); currentValueLength = nextVal.getLength() - nextVal.getPosition(); if (isMarked) { //存儲下一個key和value backupStore.write(nextKey, nextVal); } //迭代器向下迭代一次 hasMore = input.next(); //如果還有元素,則進行比較,判斷key是否相同 if (hasMore) { nextKey = input.getKey(); //這個地方也是比較關鍵的: nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, currentRawKey.getLength(), nextKey.getData(), nextKey.getPosition(), nextKey.getLength() - nextKey.getPosition() ) == 0; } else { nextKeyIsSame = false; } inputValueCounter.increment(1); return true; } //一個迭代器模式的內部類 protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> { private boolean inReset = false; private boolean clearMarkFlag = false; @Override//它並不僅僅是判斷迭代器是否還有下一個元素,而且還要判斷下一個元素和上一個元素是不是相同的key public boolean hasNext() { if (inReset && backupStore.hasNext()) { return true; } return firstValue || nextKeyIsSame; } @Override //這個地方要注意了,其實在獲取下一個元素的時候主要調用的是nextKeyValue(); public VALUEIN next() { if (inReset) { if (backupStore.hasNext()) { backupStore.next(); DataInputBuffer next = backupStore.nextValue(); buffer.reset(next.getData(), next.getPosition(), next.getLength() - next.getPosition()); value = valueDeserializer.deserialize(value); return value; } else { inReset = false; backupStore.exitResetMode(); if (clearMarkFlag) { clearMarkFlag = false; isMarked = false; } } } // if this is the first record, we don't need to advance if (firstValue) { firstValue = false; return value; } // otherwise, go to the next key/value pair nextKeyValue();//該方法就是獲取下一個key,value對,key值的變化也就在這里表現出來了。 return value; } } //內部類,實現迭代器,具備迭代器功能 protected class ValueIterable implements Iterable<VALUEIN> { private ValueIterator iterator = new ValueIterator(); @Override public Iterator<VALUEIN> iterator() { return iterator; } } public Iterable<VALUEIN> getValues() throws IOException, InterruptedException { return iterable; } }
簡單一句話總結就是:ReduceContextImpl類的RawKeyValueIterator input迭代器對象里面存儲中着key-value對的元素, 以及一個只存儲value的迭代器,然后每調一次我們實現的reduce方法,就是傳入ValueIterable迭代器對象和當前的key。但是我們在方法里面調用迭代器的next方法時,其實調用了nextKeyValue,來獲取下一個key和value,並判斷下一個key是否和 上一個key是否相同,然后決定hashNext方法是否結束,同時對key進行了一次重新賦值。
這個方法獲取KV的迭代器的下一個KV值,然后把K值和V值放到之前傳入我們自己寫的Reduce類的方法中哪個輸入參數的地址上,白話說:框架調用我們寫的reduce方法時,傳入了三個參數,然后我們方法內部調用phoneNbrs.hashNext方法就是調用的ReduceContextImpl的內部類ValueIterator的hashNext方法,這個方法里面調用了ReduceContextImpl內的nextKeyValue方法,該方法內部又清除了之前調用用戶自定義reduce方法時傳入的k,v參數的內存地址的數據,然后獲取了RawKeyValueIterator input迭代器的下一個KV值,然后把k值和V值放入該數據。這就是原因了。
再看我們的reduce實現類
public static class FlowSumSortReducerOne extends Reducer<FlowBeanOne, Text, Text, FlowBeanOne> { @Override protected void reduce(FlowBeanOne bean, Iterable<Text> phoneNbrs, Context context) throws IOException, InterruptedException { System.out.println("-------------------"); for (Text text : phoneNbrs) {//這里就是迭代器,相當於調用ValueIterable.hashNext System.out.println(bean); context.write(text, bean); } } }
最近實在是不知道學點什么了呦,就把hadoop回顧一下,當初學時,為了快速上手,都是記各種理論以及結論,沒有時間去看源碼驗證,也不知道人家說的結論是否正確,這次回滾就是看源碼驗證當初結論的正確性。這也快一年沒有用了,最近一直從事分布式實時計算的研究。