WordCount是一個入門的MapReduce程序(從src\examples\org\apache\hadoop\examples粘貼過來的):
package org.apache.hadoop.examples; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
MapReduce即將一個計算任務分為兩個階段:Map、Reduce。為什么要這么分解?
為了理解其含義,我們先不管MapReduce這一套框架,從一個簡單的問題來看,如果對於100T的日志文件,需要統計其中出現的"ERROR"這個單詞的次數,怎么辦?
最簡單的方法:單機處理,逐行讀入每一行文本,統計並累加,則得到其值。問題是:因為數據量太大,速度太慢,怎么辦?自然,多機並行處理就是一個自然的選擇。
那么,這個文件怎么切分到多個機器呢?假定有100台機器,可以寫一個主程序,將這個100T大文件按照每個機器存儲1T的原則,在100台機器上分布存儲,再把原來單機上的程序拷貝100份(無需修改)至100台機器上運行得到結果,此時得到的結果只是一個中間結果,最后需要寫一個匯總程序,將統計結果進行累加,則完成計算。
將大文件分解后,對單機1T文件計算的過程就相當於Map,而Map的結果就相當於"ERROR"這個單詞在本機1T文件中出現的次數,而最后的匯總程序就相當於Reduce,Reduce的輸入來源於100台機器。在這個簡單的例子中,有100個Map任務,1個Reduce任務。
100台機器計算后的中間結果需要傳遞到Reduce任務所在機器上,這個過程就是Shuffle,Shuffle這個單詞的含義是”洗牌“,也就是將中間結果從Map所在機器傳輸到Reduce所在機器,在這個過程中,存在網絡傳輸。
此時,我們利用上面的例子已經理解了Map-Shuffle-Reduce的基本含義,在上面的例子中,如果還需要對”WARNING“這個單詞進行統計,那么怎么辦呢?此時,每個Map任務就不僅需要統計本機1T文件中ERROR的個數,還需要統計WARNING的次數,然后在Reduce程序中分別進行統計。如果需要對所有單詞進行統計呢?一個道理,每個Map任務對1T文件中所有單詞進行統計計數,然后 Reduce對所有結果進行匯總,得到所有單詞在100T大文件中出現的次數。此時,問題可能出現了,因為單詞數量可能很多,Reduce用單機處理也可能存在瓶頸了,於是我們需要考慮用多台機器並行計算Reduce,假如用2台機器,因為Reduce只是對單詞進行計數累加,所有可以按照這樣簡單的規則進行:大寫字母A-Z開頭的單詞由Reduce 1累加;小寫字母a-z開頭的單詞由Reduce 2累加。
在這種情況下,100個Map任務執行后的結果,都需要分為兩部分,一部分准備送到Reduce 1統計,一部分准備送到Reduce 2統計,這個功能稱為Partitioner,即將Map后的結果(比如一個文本文件,記錄了各個單詞在本機文件出現的次數)分解為兩部分(比如兩個文本 文件),准備送到兩個Reduce任務。
因此,Shuffle在這里就是從100個Map任務和2個Reduce任務之間傳輸中間結果的過程。
我們繼續考慮幾個問題:
1、 如果Map后的中間結果數據量較大,Shuffle過程對網絡帶寬要求較高,因此需要將Map后的結果盡可能減小,這個功能當然可以在Map內自己搞 定,不過MapReduce將這個功能單獨拎出來,稱為Combiner,即合並,這個Combiner,指的是Map任務后中間結果的合並,相比於 Reduce的最終合並,這里相當於先進行一下局部匯總,減小中間結果,進而減小網絡傳輸量。所以,在上面的例子中,假如Map並不計數,只是記錄單詞出現這個信息,輸出結果是<ERROR,1>,<WARNING,1>,<WARNING,1>.....這樣一個Key-Value序列,Combiner可以進行局部匯總,將Key相同的Value進行累加,形成一個新的Key-Value序列:<ERROR,14>,<WARNING,27>,.....,這樣就大大減小了Shuffle需要的網絡帶寬,要知道現在數據中心一般使用千兆以太網,好些的使用萬兆以太網,TCP/IP傳輸的效率不太高。這里Combiner匯總函數實際上可以與Reduce的匯總函數一致,只是輸入數據不同。
2、 來自100個Map任務后的結果分別送到兩個Reduce任務處理。對於任何一個Reduce任務,輸入是一堆<ERROR,14>這樣的 Key-Value序列,因為100個Map任務都有可能統計到ERROR的次數,因此這里會先進行一個歸並,即將相同單詞的歸並到一起,形 成<ERROR, <14,36,.....>>,<WARNING,<27,45,...>>這樣一個仍然是Key-Value的 序列,14、36、。。。分別表示第1、2、。。。台機器中ERROR的統計次數,這個歸並過程在MapReduce中稱為Merge。如果merge后 再進行Reduce,那么就只需要統計即可;如果事先沒有merge,那么Reduce自己完成這一功能也行,只是兩種情況下Reduce的輸入Key- Value形式不同。
3、如果要求最后的單詞統計結果還要形成字典序怎么辦呢?可以自己在 Reduce中進行全排序,也可以100個Map任務中分別進行局部排序,然后將結果發到Reduce任務時,再進行歸並排序。這個過程 MapReduce也內建支持,因此不需要用戶自己去寫排序程序,這個過程在MapReduce中稱為Sort。
到這里,我們理解了MapReduce中的幾個典型步驟:Map、Sort、Partitioner、 Combiner、Shuffle、Merge、Reduce。MapReduce程序之所以稱為MapReduce,就說明Map、Reduce這兩個 步驟對於一個並行計算來說幾乎是必須的,你總得先分開算吧,所以必須有Map;你總得匯總吧,所以有Reduce。當然,理論上也可以不需要 Reduce,如果Map后就得到你要的結果的話。
Sort對於不需要順序的程序里沒意義(但MapReduce默認做了排序);
Partitioner對於Reduce只有一個的時候沒意義,如果有多個Reduce,則需要,至於怎么分,用戶可以繼承Partitioner標准類,自己實現分解函數。控制中間結果如何傳輸。MapReduce提供的標准的Partitioner是 一個接口,用戶可以自己實現getPartition()函數,MapReduce也提供了幾個基本的實現,最典型的HashPartitioner是根 據用戶設定的Reduce任務數量(注意,MapReduce中,Map任務的個數最終取決於數據分布,Reduce則是用戶直接指定),按照哈希進行計算的:
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> { public void configure(JobConf job) {} /** Use {@link Object#hashCode()} to partition. */
public int getPartition(K2 key, V2 value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
這里,numReduceTasks就是用戶設定的Reduce任務數量;
K2 key, V2 value 就是Map計算后的中間結果。
Combiner可以選擇性放棄,但考慮到網絡帶寬,可以自己寫相應的函數實現局部合並功能。很多情況下,直接利用Reduce那個程序即可,WordCount這個標准程序里就是這么用的。
Shuffle自然是必須的,不用寫,根據Partitioner邏輯,框架自己去執行結果傳輸。
Merge也不是必須的,可以揉到Reduce里面實現等等也可以。因為這些操作的數據結構都是Key-Value,Reduce的輸入只要是一個Key-Value即可,相當靈活。
我們再來看WordCount,這個MapReduce程序中定義了一個類:
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
而Mapper是Hadoop中的一個接口,其定義為:
public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable { /** * Maps a single input key/value pair into an intermediate key/value pair. * * <p>Output pairs need not be of the same types as input pairs. A given * input pair may map to zero or many output pairs. Output pairs are * collected with calls to * {@link OutputCollector#collect(Object,Object)}.</p> * * <p>Applications can use the {@link Reporter} provided to report progress * or just indicate that they are alive. In scenarios where the application * takes an insignificant amount of time to process individual key/value * pairs, this is crucial since the framework might assume that the task has * timed-out and kill that task. The other way of avoiding this is to set * <a href="{@docRoot}/../mapred-default.html#mapred.task.timeout"> * mapred.task.timeout</a> to a high-enough value (or even zero for no * time-outs).</p> * * @param key the input key. * @param value the input value. * @param output collects mapped keys and values. * @param reporter facility to report progress. */
void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter) throws IOException; }
因此,Mapper里面並沒有規定輸入輸出的類型是什么,只要是KeyValue的即可,K1、V1、K2、V2是什么由用戶指定,反正只是實現K1、V1到K2、V2的映射即可。
在WordCount中實現了繼承於Mapper<Object, Text, Text, IntWritable>的一個TokenizerMapper類,實現了map函數:map(Object key, Text value, Context context ) ;
TokenizerMapper中,輸入的Key-Value是<Object, Text>,輸出是<Text, IntWritable>,在WordCount程序里,K1代表一行文本的起始位置,V1代表這一行文本;
K2代表單詞,V2代表"1",用於后面的累和。
同樣,在MapReduce中,Reducer也是一個接口,其聲明為:
public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable { /** * <i>Reduces</i> values for a given key. * * <p>The framework calls this method for each * <code><key, (list of values)></code> pair in the grouped inputs. * Output values must be of the same type as input values. Input keys must * not be altered. The framework will <b>reuse</b> the key and value objects * that are passed into the reduce, therefore the application should clone * the objects they want to keep a copy of. In many cases, all values are * combined into zero or one value. * </p> * * <p>Output pairs are collected with calls to * {@link OutputCollector#collect(Object,Object)}.</p> * * <p>Applications can use the {@link Reporter} provided to report progress * or just indicate that they are alive. In scenarios where the application * takes an insignificant amount of time to process individual key/value * pairs, this is crucial since the framework might assume that the task has * timed-out and kill that task. The other way of avoiding this is to set * <a href="{@docRoot}/../mapred-default.html#mapred.task.timeout"> * mapred.task.timeout</a> to a high-enough value (or even zero for no * time-outs).</p> * * @param key the key. * @param values the list of values to reduce. * @param output to collect keys and combined values. * @param reporter facility to report progress. */
void reduce(K2 key, Iterator<V2> values, OutputCollector<K3, V3> output, Reporter reporter) throws IOException; }
Reducer的輸入為K2, V2(這個對應於Mapper輸出的經過Shuffle到達Reducer端的K2,V2,), 輸出為K3, V3。
在WordCount中,K2為單詞,V2為1這個固定值(或者為局部出現次數,取決於是否有Combiner);K3還是單詞,V3就是累和值。
而WordCount里存在繼承於Reducer<Text, IntWritable, Text, IntWritable>的IntSumReducer類,完成單詞計數累加功能。
對於Combiner,實際上MapReduce沒有Combiner這個基類(WordCount自然也沒有實現),從任務的提交函數來看:
public void setCombinerClass(Class<? extends Reducer> cls ) throws IllegalStateException { ensureState(JobState.DEFINE); conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class); }
可以看出,Combiner使用的類實際上符合Reducer。兩者是一樣的。
再來看作業提交代碼:
在之前先說一下Job和Task的區別,一個MapReduce運行流程稱為一個Job,中文稱“作業”。
在傳統的分布式計算領域,一個Job分為多個Task運行。Task中文一般稱為任務,在Hadoop中,這種任務有兩種:Map和Reduce
所以下面說到Map和Reduce時,指的是任務;說到整個流程時,指的是作業。不過由於疏忽,可能會將作業稱為任務的情況。
根據上下文容易區分出來。
1 Job job = new Job(conf, "word count"); 2 job.setJarByClass(WordCount.class); 3 job.setMapperClass(TokenizerMapper.class); 4 job.setCombinerClass(IntSumReducer.class); 5 job.setReducerClass(IntSumReducer.class); 6 job.setOutputKeyClass(Text.class); 7 job.setOutputValueClass(IntWritable.class); 8 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 9 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 10 System.exit(job.waitForCompletion(true) ? 0 : 1);
第1行創建一個Job對象,Job是MapReduce中提供的一個作業類,其聲明為:
public class Job extends JobContext { public static enum JobState {DEFINE, RUNNING}; private JobState state = JobState.DEFINE; private JobClient jobClient; private RunningJob info; .......
之后,設置該作業的運行類,也就是WordCount這個類;
然后設置Map、Combiner、Reduce三個實現類;
之后,設置輸出Key和Value的類,這兩個類表明了MapReduce作業完畢后的結果。
Key即單詞,為一個Text對象,Text是Hadoop提供的一個可以序列化的文本類;
Value為計數,為一個IntWritable對象,IntWritable是Hadoop提供的一個可以序列化的整數類。
之所以不用普通的String和int,是因為輸出Key、 Value需要寫入HDFS,因此Key和Value都要可寫,這種可寫能力在Hadoop中使用一個接口Writable表示,其實就相當於序列化,換句話說,Key、Value必須得有可序列化的能力。Writable的聲明為:
public interface Writable { /** * Serialize the fields of this object to <code>out</code>. * * @param out <code>DataOuput</code> to serialize this object into. * @throws IOException */
void write(DataOutput out) throws IOException; /** * Deserialize the fields of this object from <code>in</code>. * * <p>For efficiency, implementations should attempt to re-use storage in the * existing object where possible.</p> * * @param in <code>DataInput</code> to deseriablize this object from. * @throws IOException */
void readFields(DataInput in) throws IOException; }
在第8、9行,還設置了要計算的文件在HDFS中的路徑,設定好這些配置和參數后,執行作業提交:job.waitForCompletion(true)
waitForCompletion是Job類中實現的一個方法:
public boolean waitForCompletion(boolean verbose ) throws IOException, InterruptedException, ClassNotFoundException { if (state == JobState.DEFINE) { submit(); } if (verbose) { jobClient.monitorAndPrintJob(conf, info); } else { info.waitForCompletion(); } return isSuccessful(); }
即執行submit函數:
public void submit() throws IOException, InterruptedException, ClassNotFoundException { ensureState(JobState.DEFINE); setUseNewAPI(); // Connect to the JobTracker and submit the job
connect(); info = jobClient.submitJobInternal(conf); super.setJobID(info.getID()); state = JobState.RUNNING; }
其中,調用jobClient對象的submitJobInternal方法進行作業提交。jobClient是 JobClient對象,在執行connect()的時候即創建出來:
private void connect() throws IOException, InterruptedException { ugi.doAs(new PrivilegedExceptionAction<Object>() { public Object run() throws IOException { jobClient = new JobClient((JobConf) getConfiguration()); return null; } }); }
創建JobClient的參數是這個作業的配置信息,JobClient是MapReduce作業的客戶端部分,主要用於提交作業等等。而具體的作業提交在submitJobInternal方法中實現,關於submitJobInternal的具體實現,包括MapReduce的作業執行流程,較為復雜,留作下一節描述。
關於MapReduce的這一流程,我們也可以看出一些特點:
1、 Map任務之間是不通信的,這與傳統的MPI(Message Passing Interface)存在本質區別,這就要求划分后的任務具有獨立性。這個要求一方面限制了MapReduce的應用場合,但另一方面對於任務執行出錯后的處理十分方便,比如執行某個Map任務的機器掛掉了,可以不管其他Map任務,重新在另一台機器上執行一遍即可。因為底層的數據在HDFS里面,有3 份備份,所以數據冗余搭配上Map的重執行這一能力,可以將集群計算的容錯性相比MPI而言大大增強。后續博文會對MPI進行剖析,也會對 MapReduce與傳統高性能計算中的並行計算框架進行比較。
2、Map任務的分配與數據的分布關系十分密切,對於上面的例子,這個100T的大文件分布在多台機器上,MapReduce框架會根據文件的實際存儲位置分配Map任務,這一過程需要對HDFS有好的理解,在后續博文中會對HDFS中進行剖析。到時候,能更好滴理解MapReduce框架。因為兩者是搭配起來使用的。
3、 MapReduce的輸入數據來自於HDFS,輸出結果也寫到HDFS。如果一個事情很復雜,需要分成很多個MapReduce作業反復運行,那么就需要來來回回地從磁盤中搬移數據的過程,速度很慢,后續博文會對Spark這一內存計算框架進行剖析,到時候,能更好滴理解MapReduce性能。
4、MapReduce的輸入數據和輸出結果也可以來自於HBase,HBase本身搭建於HDFS之上(理論上也可以搭建於其他文件系統),這種應用場合大多需要MapReduce處理一些海量結構化數據。后續博文會對HBase進行剖析。