Hadoop I/O


在Hadoop集群中,數據在網絡上傳輸,保證數據完整性的通常做法使用checksum,比如常用的 CRC-32 (cyclic redundancy check)。

Hadoop上支持的文件壓縮格式有:gzip  ZIP  bzip2  LZO。例如在UNIX上可以使用命令:gzip -1 file,會生成file.gz,但是原來的file就沒有了。壓縮算法都要在執行速度和壓縮比上做一個權衡,-1表示只注意速度,-9表示只注重壓縮比。

CompressionOutputStream和CompressionInputStream很類似於java.util.zip.DeflaterOutputStream 和 java.util.zip.DeflaterInputStream。

Interface CompressionCodec{
  CompressionInputStream createInputStream(InputStream in);      //解壓
  CompressionOutputStream createOutputStream(OutputStream out);    //壓縮

  ……

} 

實現了CompressionCodec接口的類有:BZip2Codec, DefaultCodec, GzipCodec, SnappyCodec

package io;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.GzipCodec;

public class FileCmpress {

    public static void main(String[] args) throws IOException{
        //解壓示例codec.createInputStream
        String uri=args[0];
        Configuration conf=new Configuration();
        FileSystem fs=FileSystem.get(URI.create(uri),conf);
        Path inputPath=new Path(uri);
        CompressionCodecFactory factory=new CompressionCodecFactory(conf);
        CompressionCodec codec=factory.getCodec(inputPath);        //根據文件名的后綴來選擇生成哪種類型的CompressionCodec
        if(codec==null){
            System.err.println("No codec found for "+uri);
            System.exit(1);
        }
        String outputUri=CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());
        InputStream in=null;
        OutputStream out=null;
        try{
            in=codec.createInputStream(fs.open(inputPath));        //對輸入流進行解壓
            out=fs.create(new Path(outputUri));
            IOUtils.copyBytes(in, out, conf);
        }finally{
            IOUtils.closeStream(in);
            IOUtils.closeStream(out);
        }
        //壓縮示例codec.createOutputStream
        CompressionOutputStream outStream=null;
        Path op2=new Path("2.gz");
        try{
            in=fs.open(new Path(outputUri));        //打開原始文件
            GzipCodec gzipCodec=new GzipCodec();    //創建gzip壓縮實例
            gzipCodec.setConf(conf);                //給CompressionCodec設置Configuration
            outStream=gzipCodec.createOutputStream(fs.create(op2));        //打開輸出文件(最終的壓縮文件)
            IOUtils.copyBytes(in, outStream, 4096,false);        //從輸入流向輸出流拷貝,GzipCodec負責對輸出流進行壓縮
        }finally{
            IOUtils.closeStream(in);
            IOUtils.closeStream(out);
        }
        
    }
}

注意文件經過壓縮之后再送給MapReduce時顯然就不能再split了,還好gzip和LZO格式的文件(hadoop通過查看文件名后綴)默認就是不支持split的,而bzip2可以被split,ZIP文件則不能作為MapReduce的輸入文件。

可以讓Reduce的輸出文件被壓縮。

conf.setBoolean("mapred.output.compress", true);    //支持輸出被壓縮
conf.setClass("mapred.output.compression.codec", GzipCodec.class,CompressionCodec.class);    //指定采用的壓縮算法

讓Map的輸出被壓縮:

conf.setCompressMapOutput(true);
conf.setMapOutputCompressorClass(GzipCodec.class);

Java中的基本數據類型在Hadoop中都有對應的Writable類型,Hadoop中還有幾中Writable Collections。ArrayWriatble和TwoDArrayWriatble分別是適用於一維數組和二維數組的Wriatble類型,當然數組中元素必須是相同的類型。

ArrayWritable writable = new ArrayWritable(Text.class);

但是ArrayWritable還是不要用了,Hadoop中沒有實現ArrayWritable的空構造函數,而作為實現了Writable的類,是必須提供空參數構造函數的----在調用readFields(DataInput in)之前要先調用空構造函數來建立實例,所以每當我使用ArrayWritable作為Mapper的輸出類型時就會報錯。

MapWriatble實現了java.util.Map<Wriatble,Writable>接口;而SortedMapWritable則實現了java.util.Map<WriatbleComparable,Writable>接口。

MapWritable src = new MapWritable();
src.put(new IntWritable(1), new Text("cat"));
src.put(new VIntWritable(2), new LongWritable(163));

用戶自定義Writable類型

package basic;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class TextPair implements WritableComparable<TextPair> {
    private Text first;
    private Text second;

    public TextPair() {
        set(new Text(), new Text());
    }

    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }

    public TextPair(Text first, Text second) {
        set(first, second);
    }

    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    public Text getFirst() {
        return first;
    }

    public Text getSecond() {
        return second;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof TextPair) {
            TextPair tp = (TextPair) o;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }

    @Override
    public int compareTo(TextPair tp) {
        int cmp = first.compareTo(tp.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(tp.second);
    }
}

SequenceFile

HDFS和MapReduce都是為處理大文件設計的,所以把諸多小文件打包在一個SequenceFile中着實是一個提高效率的好方法。

寫入SequenceFile的key或value數據類型沒有必要是Writable類型,只要是可以被Serialization序列化和反序列化的類型就可以了。

用SequenceFile.createWriter()創建SequenceFile.Writer,把一個文本文件寫入一個二進制文件。代碼如下

package basic;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileWriteDemo {
    private static final String[] DATA = { "One, two, buckle my shoe",
            "Three, four, shut the door", "Five, six, pick up sticks",
            "Seven, eight, lay them straight", "Nine, ten, a big fat hen" };

    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(),
                    value.getClass());

            for (int i = 0; i < 100; i++) {
                key.set(100 - i);
                value.set(DATA[i % DATA.length]);
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key,
                        value);
                writer.append(key, value);    //采用append方式
            }
        } finally {
            IOUtils.closeStream(writer);
        }
    }
}

可以想像上面代碼中的DATA是從小文件中讀取的,我們從多個小文件中讀取文本,轉換字節流append到一個大的二進制文件中,既實現了多個小文件的合並,也實現了壓縮(二進制文件比文本文件小)。

下面演示讀取一個SequenceFile

package basic;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileReadDemo {
    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);

        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(fs, path, conf);
            Writable key = (Writable) ReflectionUtils.newInstance(
                    reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(
                    reader.getValueClass(), conf);
            long position = reader.getPosition();
            while (reader.next(key, value)) {    //循環讀取文件
                String syncSeen = reader.syncSeen() ? "*" : "";    //SequenceFile中都有sync標記
                System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key,
                        value);
                position = reader.getPosition(); //下一條record開始的位置
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}

SequenceFile.Writer會在流中寫入sync pointer,由於某些原因,SequenceFile.Reader可能會找不到record的邊界,sync pointer就是用來標記record邊界的,但它並不是在每個record的后面都做一個標記,而隔幾個做一個。sync pointer不會超過文件長度的1%。

SequenceFile同樣支持隨機讀寫:reader.seek(360);

而reader.sync(360)則是定位到360之后的第1個sync pointer處。

在命令行顯示一個SequenceFile的內容不能用cat,而要用-text(表示要用文本的形式顯示二進制文件)

$ hadoop fs -text myfile.seq

SequenceFile文件的最開始是Header部分,其中包含了record的key-value的數據類型、sync標記所采用的字符、有關壓縮的細節、用戶自定義的metadata。SequenceFile內置的壓縮方式有兩種:RecordCompression和Block Compression。

SequenceFile的文件結構:

二進制文件具有清晰嚴謹的文件結構,讀寫速度自然要比文本文件快,SequenceFile更容易和Hadoop的基本數據類型(IntWritable,FloatWriatble等)進行交互。

下面的代碼展示如何把一個文本文件轉換為SequenceFile,job.setOutputFormatClass(SequenceFileOutputFormat.class);就可以了。

 1 package basic;
 2 
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.conf.Configured;
 5 import org.apache.hadoop.fs.FileSystem;
 6 import org.apache.hadoop.fs.Path;
 7 import org.apache.hadoop.io.LongWritable;
 8 import org.apache.hadoop.io.SequenceFile.CompressionType;
 9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.io.compress.GzipCodec;
11 import org.apache.hadoop.mapreduce.Job;
12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
14 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
15 import org.apache.hadoop.util.Tool;
16 import org.apache.hadoop.util.ToolRunner;
17 
18 public class ToSeqFile extends Configured implements Tool {
19 
20     @Override
21     public int run(String[] arg0) throws Exception {
22         Job job = new Job();
23         job.setJarByClass(getClass());
24         Configuration conf=getConf();
25         FileSystem fs = FileSystem.get(conf);
26     
27         FileInputFormat.setInputPaths(job, "/user/orisun/input/rdata");
28         Path outDir=new Path("/user/orisun/output");
29         fs.delete(outDir,true);
30         FileOutputFormat.setOutputPath(job, outDir);
31         
32         job.setNumReduceTasks(0);
33         job.setOutputKeyClass(LongWritable.class);
34         job.setOutputValueClass(Text.class);
35         //設置OutputFormat為SequenceFileOutputFormat
36         job.setOutputFormatClass(SequenceFileOutputFormat.class);
37         //允許壓縮
38         SequenceFileOutputFormat.setCompressOutput(job, true);
39         //壓縮算法為gzip
40         SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
41         //壓縮模式為BLOCK
42         SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
43 
44 
45         return job.waitForCompletion(true)?0:1;
46     }
47 
48     public static void main(String[] args) throws Exception {
49         int res = ToolRunner.run(new Configuration(), new ToSeqFile(), args);
50         System.exit(res);
51     }
52 }

代碼中沒有設置FileInputFormat,所以采用默認的TextInputFormat。代碼中沒有設置MapperClass,默認情況下map把從InputFormat中得到的key-value原樣輸出(新版本的Hadoop中已經沒有了IndentityMapper這個類,所以我沒有說“默認的Mapper就是IndentityMapper”)。

 MapFile

MapFile是排序后的SequenceFile,並且它會額外生成一個index文件提供按key的查找。

讀寫MapFile與讀寫SequenceFile非常類似,只需要換成MapFie.Reader和MapFile.Writer就可以了。在命令行顯示MapFile的文件內容同樣要用-text。

與SequenceFile不同的是:由於MapFile需要按key排序,所以它的key必須是WritableComparable類型的。

MapFile會生成2個文件,一個名為data,一個名為index。data中的內容就是按key排序后的SequenceFile中的內容

$ hadoop fs -text numbers.map/data | head

1  One, two, buckle my shoe
2  Three, four, shut the door
3  Five, six, pick up sticks
4  Seven, eight, lay them straight
5  Nine, ten, a big fat hen
6  One, two, buckle my shoe
7  Three, four, shut the door

$ hadoop fs -text numbers.map/index
1  128
129  6079
257  12054
385  18030
第1列就是data文件中的key值,第2列是key在data文件中的offset。我們看到並不是所有的key都記錄在了index文件中,而是隔128個才記錄一個(這個間隔可以在io.map.index.interval屬性中設置,或直接在代碼中通過MapFile.Writer實例的setIndexInterval()函數來設置)。

下面我們來看一下index文件是怎么發揮作用的。

data文件中存放的每條record都是一個key-value對,我們可以根據下面的函數由key值獲取value。

public Writable get(WritableComparable key, Writable val) throws IOException

如果指定的key不存在,則不會給value賦值。

假如我們要檢索的key值是300。首先MapFile.Reader把index文件讀入內存,在index文件中進行二分查找,找到等於或第一個小於300的key-value對:257--12054,然后在data文件中從12054的位置開始查找,直到找到300或第一個大於300的key。可見整個查找的過程需要在內存中做一次二分查找,然后作一次文件掃描,且掃描文件的行數不會超過128行(如果io.map.index.interval屬性值為128話)。

當index文件很大時,全部讀到內存中也是不現實的。當然你會說我們可以調大io.map.index.interval的值,但那樣的話需要重新生成MapFile。在index文件已經生成的情況下我們可以設置io.map.index.skip的值,設為1就表示index中的記錄每隔一行才被載入內存。

使用MapFile.fix把一個SequenceFile轉換成MapFile:

 1 package basic;
 2 
 3 import java.net.URI;
 4 
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.fs.FileSystem;
 7 import org.apache.hadoop.fs.Path;
 8 import org.apache.hadoop.io.MapFile;
 9 import org.apache.hadoop.io.SequenceFile;
10 
11 public class MapFileFixer {
12     @SuppressWarnings({ "rawtypes", "unchecked" })
13     public static void main(String[] args) throws Exception {
14     String mapUri = args[0];
15     Configuration conf = new Configuration();
16     FileSystem fs = FileSystem.get(URI.create(mapUri), conf);        //MapFile(由data和index組成的目錄)也可以自成一個FileSystem
17     Path map = new Path(mapUri);
18     Path mapData = new Path(map, MapFile.DATA_FILE_NAME);
19     //通過SequenceFile.Reader來獲取SequenceFile的key和value類型
20     SequenceFile.Reader reader = new SequenceFile.Reader(fs, mapData, conf);
21     Class keyClass = reader.getKeyClass();
22     Class valueClass = reader.getValueClass();
23     reader.close();
24 
25     //使用MapFile.fix把一個SequenceFile轉換成MapFile
26     long entries = MapFile.fix(fs, map, keyClass, valueClass, false, conf);
27     System.out.printf("Created MapFile %s with %d entries\n", map, entries);
28 
29     }
30 }

程序開始前先新建一個目錄假如取名為map

$ hadoop fs -mkdir map

把一個sequencefile放到map目錄下,並重命名為data(注意必須重命名為data)

$ hadoop fs -put myseqfile map/data

最后運行程序

$ hadoop jar dm.jar basic.MapFileFixer map

結果:data文件中的record按key值重新排序,並在map目錄下生成了index文件。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM