MapReduce數據處理模型非常簡單:map和reduce函數的輸入和輸出是鍵/值對(key/value pair)
1.MapReduce的類型
Hadoop的MapReduce一般遵循如下常規格式:
map(K1, V1) –> list (K2, V2)
combine(K2, list(V2)) –> list(K2, V2)
partition(K2, V2) –> integer
reduce(K2, list(V2)) –> list(K3, V3)
map:對數據進行抽去過濾數據,組織key/value對等操作.
combine:為了減少reduce的輸入和Hadoop內部網絡數據傳輸負載,需要在map端對輸出進行預處理,類似reduce。combine不一定適用任何情況,選用
partition:將中間鍵值對划分到一個reduce分區,返回分區索引號。實際上,分區單獨由鍵決定(值是被忽略的),分區內的鍵會排序,相同的鍵的所有值會合成一個組(list(V2))
reduce:每個reduce會處理具有某些特性的鍵,每個鍵上都有值的序列,是通過對所有map輸出的值進行統計得來的,reduce根據所有map傳來的結果,最后進行統計合並操作,並輸出結果。
注:combine與reduce一樣時,K3與K2相同,V3與V2相同。
MapReduce的Java API代碼:一般Combine函數與reduce的一樣的
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { public class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { // ... } protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { // ... } } public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { public class Context extends ReducerContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { // ... } protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException { // ... } }
用於處理中間數據的partition函數 API代碼:
public abstract class Partitioner<KEY, VALUE> { public abstract int getPartition(KEY key, VALUE value, int numPartitions); }
關於默認的MapReduce作業
默認的map是Mapper,是一個泛型類型,簡單的將所有輸入的值和鍵原封不動的寫到輸出中,即輸入輸出類型相同。
Mapper的實現
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); } }
默認的 partitioner是HashPartitioner,對每天記錄的鍵進行哈希操作以決定該記錄屬於那個分區讓reduce處理,每個分區對應一個reducer任務,所以分區數等於Job的reduce的個數
HashPartitioner的實現
public class HashPartitioner<K, V> extends Partitioner<K, V> { public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
默認的reduce函數Reducer,也是泛型類型,簡單的將所有輸入寫到輸出中。記錄在發給reduce之前,會被排序,一般是按照鍵值的大小排序。reduce的默認輸出格式是TextOutputFormat----它將鍵和值轉換成字符串並用Tab進行分割,然后一條記錄一行地進行輸出。
Reducer 的實現
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException { for (VALUEIN value: values) { context.write((KEYOUT) key, (VALUEOUT) value); } } }
選擇reduce的個數:一般集群的總共的slot個數等於node的數目乘以每個node上的slot數目,而reduce的數目一般設置為比總slot數目少一些
默認MapReduce函數實例程序
public class MinimalMapReduceWithDefaults extends Configured implements Tool { @Override public int run(String[] args) throws Exception { Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) { return -1; } job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(Mapper.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); job.setPartitionerClass(HashPartitioner.class); job.setNumReduceTasks(1); job.setReducerClass(Reducer.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); job.setOutputFormatClass(TextOutputFormat.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MinimalMapReduceWithDefaults(), args); System.exit(exitCode); } }
關於默認的stream作業(Stream概念見第二章)
stream最簡單的形式:
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper /bin/cat
注意,必須提供一個mappe:默認的identity mapp不能在stream工作
這里再給出更多設置的stream形式,其他詳見權威指南:
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/sample.txt \
-output output \
-inputformat org.apache.hadoop.mapred.TextInputFormat \
-mapper /bin/cat \
-partitioner org.apache.hadoop.mapred.lib.HashPartitioner \
-numReduceTasks 1 \
-reducer org.apache.hadoop.mapred.lib.IdentityReducer \
-outputformat org.apache.hadoop.mapred.TextOutputFormat
關於Streaming中的鍵和值
Streaming用分隔符用於通過標准輸入把key/value對轉換為一串比特發送到map或reduce函數
默認時,用Tab分隔符,也可以根據需要,用配置的分隔符來進行分割,例如:來自輸出的鍵可以由一條記錄的前n個字段組成(stream.num.map.output.key.fields或stream.num.reduce.output.key.fields定義),剩下的就是值,eg,輸出的是"a,b,c",n=2,則鍵為"a、b",而值是"c"。Map和Reduce的分隔符是相互獨立進行配置的,參見下圖
2.輸入格式
1).輸入分片與記錄
一個輸入分片(input split)是由單個map處理的輸入塊,即每一個map只處理一個輸入分片,每個分片被划分為若干個記錄(records),每條記錄就是一個key/value對,map一個接一個的處理每條記錄,輸入分片和記錄都是邏輯的,不必將他們對應到文件上。注意,一個分片不包含數據本身,而是指向數據的引用和。
輸入分片在Java中被表示為InputSplit借口
public abstract class InputSplit { public abstract long getLength() throws IOException, InterruptedException; public abstract String[] getLocations() throws IOException,InterruptedException; }
InputFormat負責創建輸入分片並將它們分割成記錄,下面就是原型用法:
public abstract class InputFormat<K, V> { public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException; public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException; }
客戶端通過調用getSpilts()方法獲得分片數目,在TaskTracker或NM上,MapTask會將分片信息傳給InputFormat的createRecordReader()方法,進而這個方法來獲得這個分片的RecordReader,RecordReader基本就是記錄上的迭代器,MapTask用一個RecordReader來生成記錄的key/value對,然后再傳遞給map函數,如下代碼
public void run(Context context) throws IOException, InterruptedException { setup(context); while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } cleanup(context); }
此處的Context實現接口MapContextImpl,並且封裝調用RecordReader下面的經過實現的方法,包括nextKeyValue,getCurrentKey,getCurrentValue。nextKeyValue()方法反復被調用用來為mapper生成key/value對,然后把這些key/value傳遞給map()方法,直到獨到stream的末尾,此時nextKeyValue返回false
A.FileInputFormat類
FileInputFormat是所有使用文件為數據源的InputFormat實現的基類,它提供了兩個功能:一個定義哪些文件包含在一個作業的輸入中;一個為輸入文件生成分片的實現,把分片割成記錄的作業由其子類來完成。
下圖為InputFormat類的層次結構:
B.FileInputFormat類輸入路徑
FileInputFormat提供四種靜態方法來設定Job的輸入路徑,其中下面的addInputPath()方法addInputPaths()方法可以將一個或多個路徑加入路徑列表,setInputPaths()方法一次設定完整的路徑列表(可以替換前面所設路徑)
public static void addInputPath(Job job, Path path); public static void addInputPaths(Job job, String commaSeparatedPaths); public static void setInputPaths(Job job, Path... inputPaths); public static void setInputPaths(Job job, String commaSeparatedPaths);
如果需要排除特定文件,可以使用FileInputFormat的setInputPathFilter()設置一個過濾器:
public static void setInputPathFilter(Job job, Class<? extends PathFilter> filter);
C.FileInputFormat類的輸入分片
FileInputFormat類一般之分割超過HDFS塊大小的文件,通常分片與HDFS塊大小一樣,然后分片大小也可以改變的,下面展示了控制分片大小的屬性:
這里數據存儲在HDFS上的話,輸入分片大小不宜設置比HDFS塊更大,原因這樣會增加對MapTask來說不是本地文件的塊數。
最大的分片的大小默認是Java long類型的表示的最大值,這樣設置的效果:當它的值被設置成小於塊大小時,將強制分片比快小(?)
分片大小公式:
默認情況
max(minimumSize, min(maximumSize, blockSize))
下圖距離說明如何控制分片的大小
minimumSize < blockSize < maximumSize
D.小文件與CombineFileInputFormat
CombineFileInputFormat是針對小文件設計的,CombineFileInputFormat會把多個文件打包到一個分片中一邊每個mapper可以處理更多的數據;減少大量小文件的另一種方法可以使用SequenceFile將這些小文件合並成一個或者多個大文件。
CombineFileInputFormat不僅對於處理小文件實際上對於處理大文件也有好處,本質上,CombineFileInputFormat使map操作中處理的數據量與HDFS中文件的塊大小之間的耦合度降低了
CombineFileInputFormat是一個抽象類,沒有提供實體類,所以需要實現一個CombineFileInputFormat具體類和getRecordReader()方法(舊的接口是這個方法,新的接口InputFormat中則是createRecordReader(),似乎3rd權威指南在這個地方有些錯誤)
E.避免切分
有些應用程序可能不希望文件被切分,而是用一個mapper完整處理每一個輸入文件,有兩種方法可以保證輸入文件不被切分。第一種方法就是增加最小分片大小,將它設置成大於要處理的最大文件大小,eg:把這個值設置為long.MAXVALUE即可。第二種方法就是使用FileInputFormat具體子類,並且重載isSplitable()方法,把其返回值設置為false,如下所示
import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; public class NonSplittableTextInputFormat extends TextInputFormat { @Override protected boolean isSplitable(JobContext context, Path file) { return false; } }
F.mapper中的文件信息
處理文件輸入分片的mapper可以從文件配置對象的某些特定屬性中讀入輸入分片的有關信息,這可以通過在mapper實現中實現configure()方法來獲取作業配置對象JobConf,下圖顯示了文件輸入分片的屬性
G.把整個文件作為一條記錄處理
有時,mapper需要訪問問一個文件中的全部內容。即使不分割文件,仍然需要一個RecordReader來讀取文件內容為record的值,下面給出實現這個功能的完整程序,詳細解釋見權威指南
InputFormat的實現類WholeFileInputFormat
public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> { @Override protected boolean isSplitable(JobContext context, Path file) { return false; } @Override public RecordReader<NullWritable, BytesWritable> createRecordReader( InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { WholeFileRecordReader reader = new WholeFileRecordReader(); reader.initialize(split, context); return reader; } }
WholeFileRecordReader的實現:WholeFileInputFormat使用RecordReader將整個文件讀為一條記錄
class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> { private FileSplit fileSplit; private Configuration conf; private BytesWritable value = new BytesWritable(); private boolean processed = false; @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.fileSplit = (FileSplit) split; this.conf = context.getConfiguration(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!processed) { byte[] contents = new byte[(int) fileSplit.getLength()]; Path file = fileSplit.getPath(); FileSystem fs = file.getFileSystem(conf); FSDataInputStream in = null; try { in = fs.open(file); IOUtils.readFully(in, contents, 0, contents.length); value.set(contents, 0, contents.length); } finally { IOUtils.closeStream(in); } processed = true; return true; } return false; } @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get(); } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException { return processed ? 1.0f : 0.0f; } @Override public void close() throws IOException { // do nothing } }
將若干個小文件打包成順序文件的MapReduce程序
public class SmallFilesToSequenceFileConverter extends Configured implements Tool { static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> { private Text filenameKey; @Override protected void setup(Context context) throws IOException, InterruptedException { InputSplit split = context.getInputSplit(); Path path = ((FileSplit) split).getPath(); filenameKey = new Text(path.toString()); } @Override protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException { context.write(filenameKey, value); } } @Override public int run(String[] args) throws Exception { Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) { return -1; } job.setInputFormatClass(WholeFileInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class); job.setMapperClass(SequenceFileMapper.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), args); System.exit(exitCode); } }
由於輸入格式為WholeFileInputFormat,所以mapper只需要找到文件輸入分片的文件名。
2).文本輸入
A.TextInputFormat
TextInputFormat是默認的InputFormat。每條記錄是一行輸入。key是LongWritable類型,存儲該行在整個文件中的字節偏移量,value是這行的內容,不包括任何終止符(換行符和回車符),它是Text類型
如下例
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
每條記錄表示以下key/value對
(0, On the top of the Crumpetty Tree)
(33, The Quangle Wangle sat,)
(57, But his face you could not see,)
(89, On account of his Beaver Hat.)
上面的key明顯不是行號,因為每個分片需要單獨處理的原因,行號只是一個片內的順序標記,所以在分片內在的行號是可以的,而在文件中是很難辦到的。然而為了使key是唯一的,我們可以利用已知的上一個分片的大小,計算出當前位置在整個文件中的偏移量(不是行號),這樣加上文件名,就能確定唯一key,如果行固定長,就可以算出行號
PS:因為FileInputFormat定義的是邏輯結構,不能匹配HDFS塊大小,所以TextFileInputFormat的以行為單位的邏輯記錄中,很有可能某一行是跨文件塊存儲的,如下所示
B.KeyValueTextInputFormat
對下面的文本,KeyValueTextInputFormat比較適合處理,其中可以通過mapreduce.input.keyvaluelinerecordreader.key.value.separator屬性設置指定分隔符,默認值為制表符,以下指定"→"為分隔符
line1→On the top of the Crumpetty Tree
line2→The Quangle Wangle sat,
line3→But his face you could not see,
line4→On account of his Beaver Hat.
C.NLineInputFormat
如果希望mapper收到固定行數的輸入,需要使用NLineInputFormat作為InputFormat。與TextInputFormat一樣,key是文件中行的字節偏移量,值是行本身。
N是每個mapper收到的輸入行數,默認時N=1,每個mapper會正好收到一行輸入,mapreduce.input.lineinputformat.linespermap屬性控制N的值。以剛才的文本為例:
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
例如,如果N=2,則每個輸入分片包括兩行。第一個mapper會收到前兩行key/value對:
(0, On the top of the Crumpetty Tree)
(33, The Quangle Wangle sat,)
另一個mapper則收到:
(57, But his face you could not see,)
(89, On account of his Beaver Hat.)
3).二進制輸入
A.SequenceFileInputFormat
如果要用順序文件數據作為MapReduce的輸入,應用SequenceFileInputFormat。key和value順序文件,所以要保證map輸入的類型匹配
雖然從名稱看不出來,但是SequenceFileInputFormat可以讀MapFile和SequenceFile,如果在處理順序文件時遇到目錄,SequenceFileInputFormat類會認為值正在讀MapFile,使用的是其數據文件,因此沒有MapFileInputFormat類是自然的
B.SequenceFileAsTextInputFormat和SequenceFileAsBinaryInputFormat
兩者均是SequenceFileInputFormat的變體,前者將順序文件(其實就是SequenceFile)的key和value轉成Text對象,后者獲取順序文件的key和value作為二進制對象
4).多種輸入
對於不同格式,不同表示的文本文件輸出的處理,可以用MultipleInputs類里處理,它允許為每條輸入路徑指定InputFormat和Mapper,例如,下滿對Met Office和NCDC兩種不同格式的氣象數據放在一起進行處理:
MultipleInputs.addInputPath(job, ncdcInputPath,TextInputFormat.class, MaxTemperatureMapper.class); MultipleInputs.addInputPath(job, metOfficeInputPath,TextInputFormat.class, MetOfficeMaxTemperatureMapper.class);
兩個數據源的行格式不同,所以使用了兩個不同的mapper
MultipleInputs類有一個重載版本的addInputPath()方法:
public static void addInputPath(Job job, Path path,Class<? extends InputFormat> inputFormatClass)
在有多種輸入格式只有一個mapper時候(調用Job的setMapperClass()方法),這個方法會很有用
另外還有一個用於食用JDBC從關系數據庫中讀取數據的輸入格式--DBInputFormat(參見權威指南)
3.輸出格式
OutputFormat類的層次結構:
1).文本輸出
默認輸出格式是TextOutputFormat,它本每條記錄寫成文本行,key/value任意,這里key和value可以用制表符分割,用mapreduce.output.textoutputformat.separator書信可以改變制表符,與TextOutputFormat對應的輸入格式是KeyValueTextInputFormat
可以使用NullWritable來省略輸出的key和value。
2).二進制輸出
A.SequenceFileOutputFormat
SequenceFileOutputFormat將它的輸出寫為一個順序文件,因為它的格式緊湊,很容易被壓縮,所以易於作為MapReduce的輸入
B.SequenceFileAsBinaryOutputFormat和MapFileOutputFormat
前者把key/value對作為二進制格式寫到一個SequenceFile容器中,后者把MapFile作為輸出,MapFile中的key必需順序添加,所以必須確保reducer輸出的key已經排好序。
3).多個輸出--MultipleOutputs類
有時可能需要對輸出的把文件名進行控制,或讓每個reducer輸出多個文件。MapReduce為此提供了庫:MultipleOutputs類
MultipleOutputs允許我們依據輸出的key和value或者二進制string命名輸出的文件名,如果為map輸出的文件,則文件名的格式為"name-m-nnnnn", 如果reduce輸出的文件,則文件名的格式為"name-r-nnnnn",其中"name"由MapReduce程序決定,”nnnnn“為part從0開始的整數編號,part編號確保從不同分區的輸出(mapper或reducer)生成的文件名字不會沖突
下面的程序使用MultipleOutputs類將整個數據集切分為以氣象站ID命名的文件
public class PartitionByStationUsingMultipleOutputs extends Configured implements Tool { static class StationMapper extends Mapper<LongWritable, Text, Text, Text> { private NcdcRecordParser parser = new NcdcRecordParser(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { parser.parse(value); context.write(new Text(parser.getStationId()), value); } } static class MultipleOutputsReducer extends Reducer<Text, Text, NullWritable, Text> { private MultipleOutputs<NullWritable, Text> multipleOutputs; @Override protected void setup(Context context) throws IOException, InterruptedException { multipleOutputs = new MultipleOutputs<NullWritable, Text>(context); } @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value : values) { multipleOutputs .write(NullWritable.get(), value, key.toString()); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { multipleOutputs.close(); } } @Override public int run(String[] args) throws Exception { Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) { return -1; } job.setMapperClass(StationMapper.class); job.setMapOutputKeyClass(Text.class); job.setReducerClass(MultipleOutputsReducer.class); job.setOutputKeyClass(NullWritable.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run( new PartitionByStationUsingMultipleOutputs(), args); System.exit(exitCode); } }
程序解釋略,運行后有以下結果:
output/010010-99999-r-00027
output/010050-99999-r-00013
output/010100-99999-r-00015
output/010280-99999-r-00014
output/010550-99999-r-00000
output/010980-99999-r-00011
output/011060-99999-r-00025
output/012030-99999-r-00029
output/012350-99999-r-00018
output/012620-99999-r-00004
我們還可以適當 改變MultipleOutputs類中的write方法中的路徑名參數,來得到我們想要輸出文件名(例如:029070-99999/1901/part-r-00000),有下面程序:
@Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value : values) { parser.parse(value); String basePath = String.format("%s/%s/part", parser.getStationId(), parser.getYear()); multipleOutputs.write(NullWritable.get(), value, basePath); } }
4).延時輸出
有些文件應用傾向於不創建空文件,此時就可以利用LazyOutputFormat,它是一個封裝輸出格式,可以保證指定分區第一條記錄輸出時才真正的創建文件,要使用它,用JobConf和相關輸出格式作為參數來調用setOutputFormatClass()方法.