1. We know that the number of map tasks is determined by the number of input files, their sizes, the block size, and the split size. So what determines the number of reduce tasks?
Setting mapred.tasktracker.reduce.tasks.maximum controls how many reduce tasks a single tasktracker may run concurrently; it does not determine the total number of reduce tasks for a job.
The total number of reduce tasks is set with the JobConf method conf.setNumReduceTasks(4); a driver sketch showing where the call belongs comes next.
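For context, here is a minimal old-API driver sketch. This is not the original post's job: the class name, job name, and I/O paths are placeholders, and the mapper/reducer are left at the old-API identity defaults.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class ReduceCountDemo {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(ReduceCountDemo.class);
        conf.setJobName("reduce-count-demo");
        // IdentityMapper/IdentityReducer are the old-API defaults, so none
        // are set here; a real job would call conf.setMapperClass(...) and
        // conf.setReducerClass(...).
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        // The total number of reduce tasks for the job:
        conf.setNumReduceTasks(4);
        JobClient.runJob(conf);
    }
}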
With 4 reduces requested this way, the Job Counters of the actual run read:

Job Counters
    Data-local map tasks=2
    Total time spent by all maps waiting after reserving slots (ms)=0
    Total time spent by all reduces waiting after reserving slots (ms)=0
    SLOTS_MILLIS_MAPS=10695
    SLOTS_MILLIS_REDUCES=29502
    Launched map tasks=2
    Launched reduce tasks=4
Four reduce tasks were indeed launched. Now look at the output directory:
diegoball@diegoball:~/IdeaProjects/test/build/classes$ hadoop fs -ls /user/diegoball/join_ou1123
11/03/25 15:28:45 INFO security.Groups: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping; cacheTimeout=300000
11/03/25 15:28:45 WARN conf.Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
Found 5 items
-rw-r--r--   1 diegoball supergroup      0 2011-03-25 15:28 /user/diegoball/join_ou1123/_SUCCESS
-rw-r--r--   1 diegoball supergroup    124 2011-03-25 15:27 /user/diegoball/join_ou1123/part-00000
-rw-r--r--   1 diegoball supergroup      0 2011-03-25 15:27 /user/diegoball/join_ou1123/part-00001
-rw-r--r--   1 diegoball supergroup    214 2011-03-25 15:28 /user/diegoball/join_ou1123/part-00002
-rw-r--r--   1 diegoball supergroup      0 2011-03-25 15:28 /user/diegoball/join_ou1123/part-00003
Only 2 of the 4 reduces actually produced output (part-00001 and part-00003 are empty). Why?
During the shuffle, the framework must decide, for every <K,V> pair emitted by a map, which reduce to send it to. By default that decision is made by calling the getPartition() method of org.apache.hadoop.mapred.lib.HashPartitioner.
The HashPartitioner class:
package org.apache.hadoop.mapred.lib;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.JobConf;

/** Partition keys by their {@link Object#hashCode()}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {

  public void configure(JobConf job) {}

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K2 key, V2 value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
The value of numReduceTasks comes from the JobConf. Its default is 1, which is clearly too small, and it is why a job with default settings always launches exactly one reduce.
getPartition() masks the key's hashCode() with Integer.MAX_VALUE to make it non-negative, then takes the remainder modulo numReduceTasks; MapReduce sends the <K,V> pair to the reduce whose index equals that remainder.
Here the key passed in is of type LongWritable, so look at LongWritable's hashCode() method:
public int hashCode() {
  return (int) value;
}
It simply casts the underlying long value to an int.
Because getPartition(K2 key, V2 value, int numReduceTasks) returned only 2 distinct values for the keys in this job, only 2 reduces ended up with any work. A standalone sketch of the arithmetic follows.
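The sample keys below are simply the dates that appear elsewhere in this post, not necessarily the job's real keys; since all of them are even, (key.hashCode() & Integer.MAX_VALUE) % 4 can only come out 0 or 2, and two of the four partitions stay empty:

import org.apache.hadoop.io.LongWritable;

public class HashPartitionDemo {
    public static void main(String[] args) {
        // All-even sample keys: LongWritable.hashCode() is (int) value,
        // so value % 4 is always 0 or 2 here.
        long[] keys = {20051210L, 20061210L, 20081210L, 20110116L};
        int numReduceTasks = 4;
        for (long k : keys) {
            LongWritable key = new LongWritable(k);
            int partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
            // Prints partitions 2, 2, 2, 0 -- only two of the four are used.
            System.out.println(k + " -> partition " + partition);
        }
    }
}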
HashPartitioner is the default partitioner class, but we can also define our own:
package com.alipay.dw.test;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

public class MyPartitioner implements Partitioner<IntWritable, IntWritable> {

    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        /* Pretty ugly hard-coded partitioning function. Don't do this in
         * practice; it is just for the sake of understanding. */
        int nbOccurences = key.get();
        if (nbOccurences > 20051210)
            return 0;
        else
            return 1;
    }

    public void configure(JobConf arg0) {
    }
}
Overriding getPartition() is all that is needed. The custom class is then registered with:

conf.setPartitionerClass(MyPartitioner.class);
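To avoid idle reduces, the reduce count should match the number of distinct values the partitioner can return; a short wiring sketch (MyJob is a placeholder driver class, not from the original post):

JobConf conf = new JobConf(MyJob.class);
// MyPartitioner returns only the values 0 and 1 (b = 2),
// so two reduce tasks is the matching choice (a = b).
conf.setPartitionerClass(MyPartitioner.class);
conf.setNumReduceTasks(2);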
Likewise, because this getPartition() returns only the 2 distinct values 0 and 1, only 2 reduces will receive data no matter how many are requested with conf.setNumReduceTasks(4).
And since each reduce's output is sorted by key, this custom Partitioner can also be used to order the overall result set:
11/03/25 15:24:49 WARN conf.Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
Found 5 items
-rw-r--r--   1 diegoball supergroup      0 2011-03-25 15:23 /user/diegoball/opt.del/_SUCCESS
-rw-r--r--   1 diegoball supergroup  24546 2011-03-25 15:23 /user/diegoball/opt.del/part-00000
-rw-r--r--   1 diegoball supergroup  10241 2011-03-25 15:23 /user/diegoball/opt.del/part-00001
-rw-r--r--   1 diegoball supergroup      0 2011-03-25 15:23 /user/diegoball/opt.del/part-00002
-rw-r--r--   1 diegoball supergroup      0 2011-03-25 15:23 /user/diegoball/opt.del/part-00003
part-00000 and part-00001 are the outputs of the 2 active reduces. With the custom MyPartitioner above, every <K,V> whose key is greater than 20051210 is handled by the first reduce (part-00000), and the remaining keys by the second (part-00001). Each reduce's output is itself sorted by key, so reading the part files in order yields the key ranges from largest to smallest.
But what happens if the custom partition class above is combined with conf.setNumReduceTasks(1)? Look at the Job Counters:
Job Counters
    Data-local map tasks=2
    Total time spent by all maps waiting after reserving slots (ms)=0
    Total time spent by all reduces waiting after reserving slots (ms)=0
    SLOTS_MILLIS_MAPS=16395
    SLOTS_MILLIS_REDUCES=3512
    Launched map tasks=2
    Launched reduce tasks=1
Only one reduce was launched, and the job ran without error.
(1) When setNumReduceTasks(int a) is called with a=1 (the default), only 1 reduce is launched regardless of how many distinct values b the Partitioner can return, and the custom Partitioner class has no effect at all; the framework never even consults it, as the source excerpt below shows.
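This single-reduce short circuit lives in the map-side output collector of this Hadoop generation (the same MapTask$OldOutputCollector appears in the stack trace further down). Condensed and lightly paraphrased from the MapTask source, with field declarations omitted, the constructor reads roughly:

// Condensed from org.apache.hadoop.mapred.MapTask.OldOutputCollector;
// paraphrased, not a verbatim copy.
OldOutputCollector(MapOutputCollector<K, V> collector, JobConf conf) {
    numPartitions = conf.getNumReduceTasks();
    if (numPartitions > 1) {
        // Only with 2+ reduces is the user-configured class instantiated.
        partitioner = (Partitioner<K, V>)
            ReflectionUtils.newInstance(conf.getPartitionerClass(), conf);
    } else {
        // With a single reduce, every record goes to partition 0 and the
        // configured Partitioner class is never consulted at all.
        partitioner = new Partitioner<K, V>() {
            public void configure(JobConf job) { }
            public int getPartition(K key, V value, int numPartitions) {
                return numPartitions - 1; // always 0 here
            }
        };
    }
    this.collector = collector;
}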
(2) When a != 1:
a. If a in setNumReduceTasks(int a) is set smaller than the number of distinct values b the Partitioner can return, for example with this getPartition():
public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
    /* Pretty ugly hard-coded partitioning function. Don't do this in
     * practice; it is just for the sake of understanding. */
    int nbOccurences = key.get();
    if (nbOccurences < 20051210)
        return 0;
    if (nbOccurences >= 20051210 && nbOccurences < 20061210)
        return 1;
    if (nbOccurences >= 20061210 && nbOccurences < 20081210)
        return 2;
    else
        return 3;
}
while at the same time calling setNumReduceTasks(2), an exception is thrown:
11/03/25 17:03:41 INFO mapreduce.Job: Task Id : attempt_201103241018_0023_m_000000_1, Status : FAILED
java.io.IOException: Illegal partition for 20110116 (3)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:900)
    at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:508)
    at com.alipay.dw.test.KpiMapper.map(Unknown Source)
    at com.alipay.dw.test.KpiMapper.map(Unknown Source)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:397)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:217)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:742)
    at org.apache.hadoop.mapred.Child.main(Child.java:211)
Some keys are assigned a partition index for which no reduce exists, because only a reduces were launched.
b. If a in setNumReduceTasks(int a) is set larger than the number of distinct values b the Partitioner returns, a reduces are still launched, but only b of them receive any data; the other a-b reduces are launched for nothing.
c. Ideally a=b, which uses the reduce slots efficiently and balances the load best. If b cannot be guaranteed in advance, a defensive partitioner such as the sketch below avoids the failure in case a.
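One defensive option (my own sketch, not from the original post) is to clamp the bucket index to the configured partition count, trading the IOException of case a for uneven buckets:

package com.alipay.dw.test;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

public class SafePartitioner implements Partitioner<IntWritable, IntWritable> {

    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        int nbOccurences = key.get();
        int bucket;
        if (nbOccurences < 20051210)      bucket = 0;
        else if (nbOccurences < 20061210) bucket = 1;
        else if (nbOccurences < 20081210) bucket = 2;
        else                              bucket = 3;
        // Never return an index >= numPartitions: if fewer reduces are
        // configured than buckets, the top buckets fold into the last reduce.
        return Math.min(bucket, numPartitions - 1);
    }

    public void configure(JobConf conf) {
    }
}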