MRJobConfig
Field:
public static final String COMBINE_CLASS_ATTR = "mapreduce.job.combine.class"
———— sub-interface JobContext
method getCombinerClass
———— implementing class JobContextImpl
implements getCombinerClass
Method:
public Class<? extends Reducer<?,?,?,?>> getCombinerClass()
    throws ClassNotFoundException {
  return (Class<? extends Reducer<?,?,?,?>>)
    conf.getClass(COMBINE_CLASS_ATTR, null);
}
Because JobContextImpl is a subtype of MRJobConfig (it implements JobContext, which extends MRJobConfig),
it can read the COMBINE_CLASS_ATTR constant defined in MRJobConfig.
———— subclass Job
public void setCombinerClass(Class<? extends Reducer> cls
                             ) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
}
Because JobContextImpl is a subtype of MRJobConfig,
and Job is a subclass of JobContextImpl,
Job also sees the COMBINE_CLASS_ATTR constant,
and setCombinerClass writes the configuration property named by that constant from MRJobConfig.
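To make the set/get round trip concrete, here is a minimal sketch (not the Hadoop source; it only uses the public Configuration API, and WCReduce stands in for whatever combiner class you register):

// Minimal sketch of the round trip behind setCombinerClass()/getCombinerClass().
// Assumes a user class WCReduce that extends org.apache.hadoop.mapreduce.Reducer.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Reducer;

public class CombineAttrRoundTrip {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // What Job.setCombinerClass(cls) does underneath:
    conf.setClass("mapreduce.job.combine.class", WCReduce.class, Reducer.class);

    // What JobContextImpl.getCombinerClass() does underneath:
    Class<?> cls = conf.getClass("mapreduce.job.combine.class", null);
    System.out.println(cls); // class WCReduce, or null if nothing was set
  }
}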
MRJobConfig
———— sub-interface JobContext
method getCombinerClass
———— implementing class JobContextImpl
———— subclass Job
———— sub-interface TaskAttemptContext
inherits the method getCombinerClass
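Put together, the relationships above can be paraphrased as the following simplified declarations (these are not the real Hadoop signatures; bodies are stubbed so the sketch compiles):

// Paraphrased, heavily simplified view of the type hierarchy around COMBINE_CLASS_ATTR.
interface MRJobConfig {
  String COMBINE_CLASS_ATTR = "mapreduce.job.combine.class";
}

interface JobContext extends MRJobConfig {
  Class<?> getCombinerClass() throws ClassNotFoundException;
}

interface TaskAttemptContext extends JobContext {
  // inherits getCombinerClass()
}

class JobContextImpl implements JobContext {
  public Class<?> getCombinerClass() {
    return null; // real code: conf.getClass(COMBINE_CLASS_ATTR, null)
  }
}

class Job extends JobContextImpl {
  // real code: setCombinerClass(cls) calls conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class)
}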
Task
$CombinerRunner
(an inner class of Task; $ denotes an inner class)
This inner class has a static method create:
public static <K,V> CombinerRunner<K,V> create(JobConf job,
                   TaskAttemptID taskId,
                   Counters.Counter inputCounter,
                   TaskReporter reporter,
                   org.apache.hadoop.mapreduce.OutputCommitter committer)
    throws ClassNotFoundException {
  Class<? extends Reducer<K,V,K,V>> cls =
    (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
  if (cls != null) {
    return new OldCombinerRunner(cls, job, inputCounter, reporter);
  }
  // make a task context so we can get the classes
  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
    new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId, reporter);
  Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls =
    (Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)
      taskContext.getCombinerClass();
  if (newcls != null) {
    return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext,
                                      inputCounter, reporter, committer);
  }
  return null;
}
This part handles the old (org.apache.hadoop.mapred) API:
Class<? extends Reducer<K,V,K,V>> cls =
  (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
if (cls != null) {
  return new OldCombinerRunner(cls, job, inputCounter, reporter);
}
and this part handles the new (org.apache.hadoop.mapreduce) API:
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
  new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId, reporter);
Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls =
  (Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)
    taskContext.getCombinerClass();
if (newcls != null) {
  return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext,
                                    inputCounter, reporter, committer);
}
return null;
(I am not sure why the fully qualified names are used; stripping the package prefixes, the up/down casts, and all the generics would make this much easier to read. A simplified paraphrase follows below.)
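As a readability aid only, here is that same create() logic with the package prefixes, casts, and generics stripped; it is a paraphrase, not the actual Hadoop source:

// Paraphrase of CombinerRunner.create(): the old-API combiner wins if one is set,
// otherwise the new-API combiner is looked up through a TaskAttemptContext.
static CombinerRunner create(JobConf job, TaskAttemptID taskId,
                             Counters.Counter inputCounter, TaskReporter reporter,
                             OutputCommitter committer) throws ClassNotFoundException {
  Class oldCls = job.getCombinerClass();          // old API: set via JobConf.setCombinerClass
  if (oldCls != null) {
    return new OldCombinerRunner(oldCls, job, inputCounter, reporter);
  }
  TaskAttemptContext taskContext = new TaskAttemptContextImpl(job, taskId, reporter);
  Class newCls = taskContext.getCombinerClass();  // new API: set via Job.setCombinerClass
  if (newCls != null) {
    return new NewCombinerRunner(newCls, job, taskId, taskContext,
                                 inputCounter, reporter, committer);
  }
  return null;                                    // no combiner configured at all
}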
TaskAttemptContext is a sub-interface of JobContext, so it inherits the getCombinerClass method.
Moreover, polymorphism is at work here: the call actually dispatches to getCombinerClass of the implementing class TaskAttemptContextImpl
(TaskAttemptContextImpl extends JobContextImpl, and JobContextImpl provides the implementation).
So in the end it reads the COMBINE_CLASS_ATTR property, i.e. it retrieves the class xxxC that we registered via job.setCombinerClass.
That xxxC is assigned to newcls, and newcls is passed as the reducerClass parameter of the NewCombinerRunner constructor:
NewCombinerRunner(Class reducerClass,
                  JobConf job,
                  org.apache.hadoop.mapreduce.TaskAttemptID taskId,
                  org.apache.hadoop.mapreduce.TaskAttemptContext context,
                  Counters.Counter inputCounter,
                  TaskReporter reporter,
                  org.apache.hadoop.mapreduce.OutputCommitter committer) {
  super(inputCounter, job, reporter);
  this.reducerClass = reducerClass;
  this.taskId = taskId;
  keyClass = (Class<K>) context.getMapOutputKeyClass();
  valueClass = (Class<V>) context.getMapOutputValueClass();
  comparator = (RawComparator<K>) context.getCombinerKeyGroupingComparator();
  this.committer = committer;
}
Task
MapTask
$MapOutputBuffer
private CombinerRunner<K,V> combinerRunner;
$SpillThread class ($ denotes an inner class)
combinerRunner = CombinerRunner.create(job, getTaskID(),
                                       combineInputCounter,
                                       reporter, null);
// At this point we have obtained the configured combiner class
if (combinerRunner == null) {
  // spill directly
  DataInputBuffer key = new DataInputBuffer();
  while (spindex < mend &&
         kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
    final int kvoff = offsetFor(spindex % maxRec);
    int keystart = kvmeta.get(kvoff + KEYSTART);
    int valstart = kvmeta.get(kvoff + VALSTART);
    key.reset(kvbuffer, keystart, valstart - keystart);
    getVBytesForOffset(kvoff, value);
    writer.append(key, value);
    ++spindex;
  }
} else {
  int spstart = spindex;
  while (spindex < mend &&
         kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
    ++spindex;
  }
  // Note: we would like to avoid the combiner if we've fewer
  // than some threshold of records for a partition
  if (spstart != spindex) {
    combineCollector.setWriter(writer);
    RawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex);
    combinerRunner.combine(kvIter, combineCollector);
  }
}
Next, look at the combine method,
defined in Task's inner class NewCombinerRunner:
public void combine(RawKeyValueIterator iterator,
                    OutputCollector<K,V> collector)
    throws IOException, InterruptedException, ClassNotFoundException {
  // make a reducer
  org.apache.hadoop.mapreduce.Reducer<K,V,K,V> reducer =
    (org.apache.hadoop.mapreduce.Reducer<K,V,K,V>)
      ReflectionUtils.newInstance(reducerClass, job);
  org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
    createReduceContext(reducer, job, taskId,
                        iterator, null, inputCounter,
                        new OutputConverter(collector),
                        committer,
                        reporter, comparator, keyClass,
                        valueClass);
  reducer.run(reducerContext);
}
The reducerClass above is exactly the xxxC we passed in.
In the end an xxxC instance is created via reflection and upcast to a Reducer,
and run is then called on that instance (our xxxC does not override run, so the parent class Reducer's run is what executes).
In the Reducer class, the run method looks like this:
/**
 * Advanced application writers can use the
 * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
 * control how the reduce task works.
 */
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context);
      // If a back up store is used, reset it
      Iterator<VALUEIN> iter = context.getValues().iterator();
      if (iter instanceof ReduceContext.ValueIterator) {
        ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
      }
    }
  } finally {
    cleanup(context);
  }
}
Again because of polymorphism, the reduce that gets called here is the reduce method of the subclass xxxC
(if a subclass overrides a method, it is the subclass's version that actually runs).
So when we write our own class for combining, it should extend Reducer and override reduce,
and its input has the following shape (using WordCount as the example):
reduce(Text key, Iterable<IntWritable> values,
       Context context)
where key is the word and values is the list of its counts, i.e. value1, value2, ...
Note that by this point the input has already been grouped into a list, i.e. <key, list<value1, value2, value3, ...>>.
(I reached this conclusion because the combiner class I was using was WCReduce,
i.e. the reduce and combine stages used the same class.
Analyzing the code shows that if the input were plain <key, value> pairs, combining would be impossible: merging identical keys and accumulating their counts are really two separate steps. Pairs with the same key are first merged on the map side into <key, value list>, and only then does the combiner receive input in that form and process it; our processing is the summation, after which the result is written to the context and enters the reduce-side shuffle.
I then iterated over the values again in reduce and printed them with System.out.println,
and the result was 0, which is really because after one pass
the iterator was no longer positioned at the start; a small sketch of this single-pass behaviour follows below.
)
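A minimal sketch of that single-pass behaviour (the class name DoubleIterateReduce is hypothetical, and the usual Hadoop imports for Text, IntWritable, Reducer and IOException are assumed): inside reduce(), the values Iterable is backed by a single streaming iterator, so a second loop over it normally sees nothing unless the backup store is reset, which is what Reducer.run above arranges between keys.

// Illustration only: iterating the grouped values twice inside reduce().
public static class DoubleIterateReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int first = 0;
    for (IntWritable v : values) {
      first += v.get();    // first pass consumes the underlying stream
    }
    int second = 0;
    for (IntWritable v : values) {
      second += v.get();   // second pass: usually stays 0, the iterator is exhausted
    }
    System.out.println(key + ": first=" + first + ", second=" + second);
  }
}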
Try the code below yourself: keep recombining and commenting lines in and out and testing, and you will arrive at the same conclusion.
// reduce
public static class WCReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
  private final IntWritable ValueOut = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values,
                        Context context) throws IOException, InterruptedException {
    for (IntWritable value : values) {
      System.out.println(value.get() + "--");
    }
    // int total = 0;
    // for (IntWritable value : values) {
    //   total += value.get();
    // }
    // ValueOut.set(total);
    // context.write(key, ValueOut);
  }
}
job.setCombinerClass(WCReduce.class);
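With the commented-out lines above restored (i.e. the summation enabled), the same class becomes a working WordCount combiner, and the reduce side then receives the partial sums:

// Same class with the accumulation enabled: sums the partial counts for each word
// and re-emits <word, partial sum>, which is what the reduce side then receives.
public static class WCReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
  private final IntWritable ValueOut = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values,
                        Context context) throws IOException, InterruptedException {
    int total = 0;
    for (IntWritable value : values) {
      total += value.get();   // accumulate the counts emitted by the mapper (or earlier combines)
    }
    ValueOut.set(total);
    context.write(key, ValueOut);
  }
}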