Hadoop:The Definitive Guid 總結 Chapter 7 MapReduce的類型與格式


MapReduce數據處理模型非常簡單:map和reduce函數的輸入和輸出是鍵/值對(key/value pair)

 

1.MapReduce的類型

Hadoop的MapReduce一般遵循如下常規格式:

map(K1, V1) –> list (K2, V2)              

combine(K2, list(V2)) –> list(K2, V2)

partition(K2, V2) –> integer              

reduce(K2, list(V2)) –> list(K3, V3) 

map:對數據進行抽去過濾數據,組織key/value對等操作.

combine:為了減少reduce的輸入和Hadoop內部網絡數據傳輸負載,需要在map端對輸出進行預處理,類似reduce。combine不一定適用任何情況,選用

partition:將中間鍵值對划分到一個reduce分區,返回分區索引號。實際上,分區單獨由鍵決定(值是被忽略的),分區內的鍵會排序,相同的鍵的所有值會合成一個組(list(V2))

reduce:每個reduce會處理具有某些特性的鍵,每個鍵上都有值的序列,是通過對所有map輸出的值進行統計得來的,reduce根據所有map傳來的結果,最后進行統計合並操作,並輸出結果。

注:combine與reduce一樣時,K3與K2相同,V3與V2相同。

 

MapReduce的Java API代碼:一般Combine函數與reduce的一樣的

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        // ...
    }

    protected void map(KEYIN key, VALUEIN value, Context context)
            throws IOException, InterruptedException {
        // ...
    }
}

public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public class Context extends
            ReducerContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        // ...
    }

    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
            throws IOException, InterruptedException {
        // ...
    }
}

用於處理中間數據的partition函數 API代碼:

public abstract class Partitioner<KEY, VALUE> {
    public abstract int getPartition(KEY key, VALUE value, int numPartitions);
}

 

關於默認的MapReduce作業

默認的map是Mapper,是一個泛型類型,簡單的將所有輸入的值和鍵原封不動的寫到輸出中,即輸入輸出類型相同。

Mapper的實現

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    protected void map(KEYIN key, VALUEIN value, Context context)
            throws IOException, InterruptedException {
        context.write((KEYOUT) key, (VALUEOUT) value);
    }
}

 

默認的 partitioner是HashPartitioner,對每天記錄的鍵進行哈希操作以決定該記錄屬於那個分區讓reduce處理,每個分區對應一個reducer任務,所以分區數等於Job的reduce的個數

HashPartitioner的實現

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

 

默認的reduce函數Reducer,也是泛型類型,簡單的將所有輸入寫到輸出中。記錄在發給reduce之前,會被排序,一般是按照鍵值的大小排序。reduce的默認輸出格式是TextOutputFormat----它將鍵和值轉換成字符串並用Tab進行分割,然后一條記錄一行地進行輸出。

Reducer 的實現

public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) 
            throws IOException, InterruptedException {
        for (VALUEIN value: values) {
            context.write((KEYOUT) key, (VALUEOUT) value);
        }
    }
}

選擇reduce的個數:一般集群的總共的slot個數等於node的數目乘以每個node上的slot數目,而reduce的數目一般設置為比總slot數目少一些

默認MapReduce函數實例程序

public class MinimalMapReduceWithDefaults extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
        if (job == null) {
            return -1;
            }
        
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(Mapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);
        job.setReducerClass(Reducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        return job.waitForCompletion(true) ? 0 : 1;
        }
    
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MinimalMapReduceWithDefaults(), args);
        System.exit(exitCode);
        }
}


 

 

關於默認的stream作業(Stream概念見第二章)

stream最簡單的形式:

% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
     -input input/ncdc/sample.txt \
     -output output \
     -mapper /bin/cat

注意,必須提供一個mappe:默認的identity mapp不能在stream工作

這里再給出更多設置的stream形式,其他詳見權威指南:

% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
    -input input/ncdc/sample.txt \
    -output output \
    -inputformat org.apache.hadoop.mapred.TextInputFormat \
    -mapper /bin/cat \
    -partitioner org.apache.hadoop.mapred.lib.HashPartitioner \
    -numReduceTasks 1 \
    -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
    -outputformat org.apache.hadoop.mapred.TextOutputFormat

 

關於Streaming中的鍵和值

Streaming用分隔符用於通過標准輸入把key/value對轉換為一串比特發送到map或reduce函數

默認時,用Tab分隔符,也可以根據需要,用配置的分隔符來進行分割,例如:來自輸出的鍵可以由一條記錄的前n個字段組成(stream.num.map.output.key.fields或stream.num.reduce.output.key.fields定義),剩下的就是值,eg,輸出的是"a,b,c",n=2,則鍵為"a、b",而值是"c"。Map和Reduce的分隔符是相互獨立進行配置的,參見下圖

 

 

 

2.輸入格式

1).輸入分片與記錄

一個輸入分片(input split)是由單個map處理的輸入塊,即每一個map只處理一個輸入分片每個分片被划分為若干個記錄(records),每條記錄就是一個key/value對,map一個接一個的處理每條記錄,輸入分片和記錄都是邏輯的,不必將他們對應到文件上。注意,一個分片不包含數據本身,而是指向數據的引用和。

輸入分片在Java中被表示為InputSplit借口

public abstract class InputSplit {
    public abstract long getLength() throws IOException, InterruptedException;
    public abstract String[] getLocations() throws IOException,InterruptedException;
}

InputFormat負責創建輸入分片並將它們分割成記錄,下面就是原型用法:

public abstract class InputFormat<K, V> {
    public abstract List<InputSplit> getSplits(JobContext context)
            throws IOException, InterruptedException;

    public abstract RecordReader<K, V> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException,
            InterruptedException;
}

 

客戶端通過調用getSpilts()方法獲得分片數目,在TaskTracker或NM上,MapTask會將分片信息傳給InputFormat的createRecordReader()方法,進而這個方法來獲得這個分片的RecordReader,RecordReader基本就是記錄上的迭代器,MapTask用一個RecordReader來生成記錄的key/value對,然后再傳遞給map函數,如下代碼

public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    cleanup(context);
}

此處的Context實現接口MapContextImpl,並且封裝調用RecordReader下面的經過實現的方法,包括nextKeyValue,getCurrentKey,getCurrentValue。nextKeyValue()方法反復被調用用來為mapper生成key/value對,然后把這些key/value傳遞給map()方法,直到獨到stream的末尾,此時nextKeyValue返回false

 

A.FileInputFormat類

FileInputFormat是所有使用文件為數據源的InputFormat實現的基類,它提供了兩個功能:一個定義哪些文件包含在一個作業的輸入中;一個為輸入文件生成分片的實現,把分片割成記錄的作業由其子類來完成。

下圖為InputFormat類的層次結構:

 

B.FileInputFormat類輸入路徑

FileInputFormat提供四種靜態方法來設定Job的輸入路徑,其中下面的addInputPath()方法addInputPaths()方法可以將一個或多個路徑加入路徑列表,setInputPaths()方法一次設定完整的路徑列表(可以替換前面所設路徑)

public static void addInputPath(Job job, Path path);
public static void addInputPaths(Job job, String commaSeparatedPaths);
public static void setInputPaths(Job job, Path... inputPaths);
public static void setInputPaths(Job job, String commaSeparatedPaths);

如果需要排除特定文件,可以使用FileInputFormat的setInputPathFilter()設置一個過濾器:

public static void setInputPathFilter(Job job, Class<? extends PathFilter> filter);


 

C.FileInputFormat類的輸入分片

FileInputFormat類一般之分割超過HDFS塊大小的文件,通常分片與HDFS塊大小一樣,然后分片大小也可以改變的,下面展示了控制分片大小的屬性:

這里數據存儲在HDFS上的話,輸入分片大小不宜設置比HDFS塊更大,原因這樣會增加對MapTask來說不是本地文件的塊數。

最大的分片的大小默認是Java long類型的表示的最大值,這樣設置的效果:當它的值被設置成小於塊大小時,將強制分片比快小(?)

分片大小公式:

默認情況

 max(minimumSize, min(maximumSize, blockSize))

下圖距離說明如何控制分片的大小

 minimumSize < blockSize < maximumSize

 

D.小文件與CombineFileInputFormat

CombineFileInputFormat是針對小文件設計的,CombineFileInputFormat會把多個文件打包到一個分片中一邊每個mapper可以處理更多的數據;減少大量小文件的另一種方法可以使用SequenceFile將這些小文件合並成一個或者多個大文件。

CombineFileInputFormat不僅對於處理小文件實際上對於處理大文件也有好處,本質上,CombineFileInputFormat使map操作中處理的數據量與HDFS中文件的塊大小之間的耦合度降低了

CombineFileInputFormat是一個抽象類,沒有提供實體類,所以需要實現一個CombineFileInputFormat具體類和getRecordReader()方法(舊的接口是這個方法,新的接口InputFormat中則是createRecordReader(),似乎3rd權威指南在這個地方有些錯誤)

 

E.避免切分

有些應用程序可能不希望文件被切分,而是用一個mapper完整處理每一個輸入文件,有兩種方法可以保證輸入文件不被切分。第一種方法就是增加最小分片大小,將它設置成大於要處理的最大文件大小,eg:把這個值設置為long.MAXVALUE即可。第二種方法就是使用FileInputFormat具體子類,並且重載isSplitable()方法,把其返回值設置為false,如下所示

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class NonSplittableTextInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
}

 

 

F.mapper中的文件信息

處理文件輸入分片的mapper可以從文件配置對象的某些特定屬性中讀入輸入分片的有關信息,這可以通過在mapper實現中實現configure()方法來獲取作業配置對象JobConf,下圖顯示了文件輸入分片的屬性

 

G.把整個文件作為一條記錄處理

有時,mapper需要訪問問一個文件中的全部內容。即使不分割文件,仍然需要一個RecordReader來讀取文件內容為record的值,下面給出實現這個功能的完整程序,詳細解釋見權威指南

InputFormat的實現類WholeFileInputFormat

public class WholeFileInputFormat extends
        FileInputFormat<NullWritable, BytesWritable> {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException,
            InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}

WholeFileRecordReader的實現:WholeFileInputFormat使用RecordReader將整個文件讀為一條記錄

class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException,
            InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException,
            InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // do nothing
    }
}

將若干個小文件打包成順序文件的MapReduce程序

public class SmallFilesToSequenceFileConverter extends Configured implements
        Tool {
    static class SequenceFileMapper extends
            Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
        private Text filenameKey;

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            filenameKey = new Text(path.toString());
        }

        @Override
        protected void map(NullWritable key, BytesWritable value,
                Context context) throws IOException, InterruptedException {
            context.write(filenameKey, value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
        if (job == null) {
            return -1;
        }
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setMapperClass(SequenceFileMapper.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(),
                args);
        System.exit(exitCode);
    }
}

由於輸入格式為WholeFileInputFormat,所以mapper只需要找到文件輸入分片的文件名。

 

2).文本輸入

A.TextInputFormat

TextInputFormat是默認的InputFormat。每條記錄是一行輸入。key是LongWritable類型,存儲該行在整個文件中的字節偏移量,value是這行的內容,不包括任何終止符(換行符和回車符),它是Text類型

如下例

On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

每條記錄表示以下key/value對

(0, On the top of the Crumpetty Tree)
(33, The Quangle Wangle sat,)
(57, But his face you could not see,)
(89, On account of his Beaver Hat.)

上面的key明顯不是行號,因為每個分片需要單獨處理的原因,行號只是一個片內的順序標記,所以在分片內在的行號是可以的,而在文件中是很難辦到的。然而為了使key是唯一的,我們可以利用已知的上一個分片的大小,計算出當前位置在整個文件中的偏移量(不是行號),這樣加上文件名,就能確定唯一key,如果行固定長,就可以算出行號

PS:因為FileInputFormat定義的是邏輯結構,不能匹配HDFS塊大小,所以TextFileInputFormat的以行為單位的邏輯記錄中,很有可能某一行是跨文件塊存儲的,如下所示

 

B.KeyValueTextInputFormat

對下面的文本,KeyValueTextInputFormat比較適合處理,其中可以通過mapreduce.input.keyvaluelinerecordreader.key.value.separator屬性設置指定分隔符,默認值為制表符,以下指定"→"為分隔符

line1→On the top of the Crumpetty Tree
line2→The Quangle Wangle sat,
line3→But his face you could not see,
line4→On account of his Beaver Hat.

 

C.NLineInputFormat

如果希望mapper收到固定行數的輸入,需要使用NLineInputFormat作為InputFormat。與TextInputFormat一樣,key是文件中行的字節偏移量,值是行本身。

N是每個mapper收到的輸入行數,默認時N=1,每個mapper會正好收到一行輸入,mapreduce.input.lineinputformat.linespermap屬性控制N的值。以剛才的文本為例:

On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

例如,如果N=2,則每個輸入分片包括兩行。第一個mapper會收到前兩行key/value對:

(0, On the top of the Crumpetty Tree)
(33, The Quangle Wangle sat,)

另一個mapper則收到:

(57, But his face you could not see,)
(89, On account of his Beaver Hat.)

 

3).二進制輸入

A.SequenceFileInputFormat

如果要用順序文件數據作為MapReduce的輸入,應用SequenceFileInputFormat。key和value順序文件,所以要保證map輸入的類型匹配

雖然從名稱看不出來,但是SequenceFileInputFormat可以讀MapFile和SequenceFile,如果在處理順序文件時遇到目錄,SequenceFileInputFormat類會認為值正在讀MapFile,使用的是其數據文件,因此沒有MapFileInputFormat類是自然的

 

B.SequenceFileAsTextInputFormat和SequenceFileAsBinaryInputFormat

兩者均是SequenceFileInputFormat的變體,前者將順序文件(其實就是SequenceFile)的key和value轉成Text對象,后者獲取順序文件的key和value作為二進制對象

 

4).多種輸入

對於不同格式,不同表示的文本文件輸出的處理,可以用MultipleInputs類里處理,它允許為每條輸入路徑指定InputFormat和Mapper,例如,下滿對Met Office和NCDC兩種不同格式的氣象數據放在一起進行處理:

MultipleInputs.addInputPath(job, ncdcInputPath,TextInputFormat.class, MaxTemperatureMapper.class);
MultipleInputs.addInputPath(job, metOfficeInputPath,TextInputFormat.class, MetOfficeMaxTemperatureMapper.class);

兩個數據源的行格式不同,所以使用了兩個不同的mapper

MultipleInputs類有一個重載版本的addInputPath()方法:

public static void addInputPath(Job job, Path path,Class<? extends InputFormat> inputFormatClass)

在有多種輸入格式只有一個mapper時候(調用Job的setMapperClass()方法),這個方法會很有用

 

另外還有一個用於食用JDBC從關系數據庫中讀取數據的輸入格式--DBInputFormat(參見權威指南)

 

 

3.輸出格式

OutputFormat類的層次結構:

 

1).文本輸出

默認輸出格式是TextOutputFormat,它本每條記錄寫成文本行,key/value任意,這里key和value可以用制表符分割,用mapreduce.output.textoutputformat.separator書信可以改變制表符,與TextOutputFormat對應的輸入格式是KeyValueTextInputFormat

可以使用NullWritable來省略輸出的key和value。

 

2).二進制輸出

A.SequenceFileOutputFormat

SequenceFileOutputFormat將它的輸出寫為一個順序文件,因為它的格式緊湊,很容易被壓縮,所以易於作為MapReduce的輸入

 

B.SequenceFileAsBinaryOutputFormat和MapFileOutputFormat

前者把key/value對作為二進制格式寫到一個SequenceFile容器中,后者把MapFile作為輸出,MapFile中的key必需順序添加,所以必須確保reducer輸出的key已經排好序。

 

3).多個輸出--MultipleOutputs類

有時可能需要對輸出的把文件名進行控制,或讓每個reducer輸出多個文件。MapReduce為此提供了庫:MultipleOutputs類

MultipleOutputs允許我們依據輸出的key和value或者二進制string命名輸出的文件名,如果為map輸出的文件,則文件名的格式為"name-m-nnnnn", 如果reduce輸出的文件,則文件名的格式為"name-r-nnnnn",其中"name"由MapReduce程序決定,”nnnnn“為part從0開始的整數編號,part編號確保從不同分區的輸出(mapper或reducer)生成的文件名字不會沖突

下面的程序使用MultipleOutputs類將整個數據集切分為以氣象站ID命名的文件

public class PartitionByStationUsingMultipleOutputs extends Configured
        implements Tool {
    static class StationMapper extends Mapper<LongWritable, Text, Text, Text> {
        private NcdcRecordParser parser = new NcdcRecordParser();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            parser.parse(value);
            context.write(new Text(parser.getStationId()), value);
        }
    }

    static class MultipleOutputsReducer extends
            Reducer<Text, Text, NullWritable, Text> {
        private MultipleOutputs<NullWritable, Text> multipleOutputs;

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                multipleOutputs
                        .write(NullWritable.get(), value, key.toString());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            multipleOutputs.close();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
        if (job == null) {
            return -1;
        }
        job.setMapperClass(StationMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setReducerClass(MultipleOutputsReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(
                new PartitionByStationUsingMultipleOutputs(), args);
        System.exit(exitCode);
    }
}

程序解釋略,運行后有以下結果:

output/010010-99999-r-00027
output/010050-99999-r-00013
output/010100-99999-r-00015
output/010280-99999-r-00014
output/010550-99999-r-00000
output/010980-99999-r-00011
output/011060-99999-r-00025
output/012030-99999-r-00029
output/012350-99999-r-00018
output/012620-99999-r-00004

我們還可以適當 改變MultipleOutputs類中的write方法中的路徑名參數,來得到我們想要輸出文件名(例如:029070-99999/1901/part-r-00000),有下面程序:

@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    for (Text value : values) {
        parser.parse(value);
        String basePath = String.format("%s/%s/part",
                parser.getStationId(), parser.getYear());
        multipleOutputs.write(NullWritable.get(), value, basePath);
    }
}

 

4).延時輸出

有些文件應用傾向於不創建空文件,此時就可以利用LazyOutputFormat,它是一個封裝輸出格式,可以保證指定分區第一條記錄輸出時才真正的創建文件,要使用它,用JobConf和相關輸出格式作為參數來調用setOutputFormatClass()方法.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM