在Hadoop集群中,數據在網絡上傳輸,保證數據完整性的通常做法使用checksum,比如常用的 CRC-32 (cyclic redundancy check)。
Hadoop上支持的文件壓縮格式有:gzip ZIP bzip2 LZO。例如在UNIX上可以使用命令:gzip -1 file,會生成file.gz,但是原來的file就沒有了。壓縮算法都要在執行速度和壓縮比上做一個權衡,-1表示只注意速度,-9表示只注重壓縮比。
CompressionOutputStream和CompressionInputStream很類似於java.util.zip.DeflaterOutputStream 和 java.util.zip.DeflaterInputStream。
Interface CompressionCodec{ CompressionInputStream createInputStream(InputStream in); //解壓 CompressionOutputStream createOutputStream(OutputStream out); //壓縮 …… }
實現了CompressionCodec接口的類有:BZip2Codec, DefaultCodec, GzipCodec, SnappyCodec
package io; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.GzipCodec; public class FileCmpress { public static void main(String[] args) throws IOException{ //解壓示例codec.createInputStream String uri=args[0]; Configuration conf=new Configuration(); FileSystem fs=FileSystem.get(URI.create(uri),conf); Path inputPath=new Path(uri); CompressionCodecFactory factory=new CompressionCodecFactory(conf); CompressionCodec codec=factory.getCodec(inputPath); //根據文件名的后綴來選擇生成哪種類型的CompressionCodec if(codec==null){ System.err.println("No codec found for "+uri); System.exit(1); } String outputUri=CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension()); InputStream in=null; OutputStream out=null; try{ in=codec.createInputStream(fs.open(inputPath)); //對輸入流進行解壓 out=fs.create(new Path(outputUri)); IOUtils.copyBytes(in, out, conf); }finally{ IOUtils.closeStream(in); IOUtils.closeStream(out); } //壓縮示例codec.createOutputStream CompressionOutputStream outStream=null; Path op2=new Path("2.gz"); try{ in=fs.open(new Path(outputUri)); //打開原始文件 GzipCodec gzipCodec=new GzipCodec(); //創建gzip壓縮實例 gzipCodec.setConf(conf); //給CompressionCodec設置Configuration outStream=gzipCodec.createOutputStream(fs.create(op2)); //打開輸出文件(最終的壓縮文件) IOUtils.copyBytes(in, outStream, 4096,false); //從輸入流向輸出流拷貝,GzipCodec負責對輸出流進行壓縮 }finally{ IOUtils.closeStream(in); IOUtils.closeStream(out); } } }
注意文件經過壓縮之后再送給MapReduce時顯然就不能再split了,還好gzip和LZO格式的文件(hadoop通過查看文件名后綴)默認就是不支持split的,而bzip2可以被split,ZIP文件則不能作為MapReduce的輸入文件。
可以讓Reduce的輸出文件被壓縮。
conf.setBoolean("mapred.output.compress", true); //支持輸出被壓縮 conf.setClass("mapred.output.compression.codec", GzipCodec.class,CompressionCodec.class); //指定采用的壓縮算法
讓Map的輸出被壓縮:
conf.setCompressMapOutput(true); conf.setMapOutputCompressorClass(GzipCodec.class);
Java中的基本數據類型在Hadoop中都有對應的Writable類型,Hadoop中還有幾中Writable Collections。ArrayWriatble和TwoDArrayWriatble分別是適用於一維數組和二維數組的Wriatble類型,當然數組中元素必須是相同的類型。
ArrayWritable writable = new ArrayWritable(Text.class);
但是ArrayWritable還是不要用了,Hadoop中沒有實現ArrayWritable的空構造函數,而作為實現了Writable的類,是必須提供空參數構造函數的----在調用readFields(DataInput in)之前要先調用空構造函數來建立實例,所以每當我使用ArrayWritable作為Mapper的輸出類型時就會報錯。
MapWriatble實現了java.util.Map<Wriatble,Writable>接口;而SortedMapWritable則實現了java.util.Map<WriatbleComparable,Writable>接口。
MapWritable src = new MapWritable(); src.put(new IntWritable(1), new Text("cat")); src.put(new VIntWritable(2), new LongWritable(163));
用戶自定義Writable類型
package basic; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; public class TextPair implements WritableComparable<TextPair> { private Text first; private Text second; public TextPair() { set(new Text(), new Text()); } public TextPair(String first, String second) { set(new Text(first), new Text(second)); } public TextPair(Text first, Text second) { set(first, second); } public void set(Text first, Text second) { this.first = first; this.second = second; } public Text getFirst() { return first; } public Text getSecond() { return second; } @Override public void write(DataOutput out) throws IOException { first.write(out); second.write(out); } @Override public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in); } @Override public int hashCode() { return first.hashCode() * 163 + second.hashCode(); } @Override public boolean equals(Object o) { if (o instanceof TextPair) { TextPair tp = (TextPair) o; return first.equals(tp.first) && second.equals(tp.second); } return false; } @Override public String toString() { return first + "\t" + second; } @Override public int compareTo(TextPair tp) { int cmp = first.compareTo(tp.first); if (cmp != 0) { return cmp; } return second.compareTo(tp.second); } }
SequenceFile
HDFS和MapReduce都是為處理大文件設計的,所以把諸多小文件打包在一個SequenceFile中着實是一個提高效率的好方法。
寫入SequenceFile的key或value數據類型沒有必要是Writable類型,只要是可以被Serialization序列化和反序列化的類型就可以了。
用SequenceFile.createWriter()創建SequenceFile.Writer,把一個文本文件寫入一個二進制文件。代碼如下
package basic; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; public class SequenceFileWriteDemo { private static final String[] DATA = { "One, two, buckle my shoe", "Three, four, shut the door", "Five, six, pick up sticks", "Seven, eight, lay them straight", "Nine, ten, a big fat hen" }; public static void main(String[] args) throws IOException { String uri = args[0]; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); Path path = new Path(uri); IntWritable key = new IntWritable(); Text value = new Text(); SequenceFile.Writer writer = null; try { writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass()); for (int i = 0; i < 100; i++) { key.set(100 - i); value.set(DATA[i % DATA.length]); System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value); writer.append(key, value); //采用append方式 } } finally { IOUtils.closeStream(writer); } } }
可以想像上面代碼中的DATA是從小文件中讀取的,我們從多個小文件中讀取文本,轉換字節流append到一個大的二進制文件中,既實現了多個小文件的合並,也實現了壓縮(二進制文件比文本文件小)。
下面演示讀取一個SequenceFile
package basic; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ReflectionUtils; public class SequenceFileReadDemo { public static void main(String[] args) throws IOException { String uri = args[0]; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); Path path = new Path(uri); SequenceFile.Reader reader = null; try { reader = new SequenceFile.Reader(fs, path, conf); Writable key = (Writable) ReflectionUtils.newInstance( reader.getKeyClass(), conf); Writable value = (Writable) ReflectionUtils.newInstance( reader.getValueClass(), conf); long position = reader.getPosition(); while (reader.next(key, value)) { //循環讀取文件 String syncSeen = reader.syncSeen() ? "*" : ""; //SequenceFile中都有sync標記 System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value); position = reader.getPosition(); //下一條record開始的位置 } } finally { IOUtils.closeStream(reader); } } }
SequenceFile.Writer會在流中寫入sync pointer,由於某些原因,SequenceFile.Reader可能會找不到record的邊界,sync pointer就是用來標記record邊界的,但它並不是在每個record的后面都做一個標記,而隔幾個做一個。sync pointer不會超過文件長度的1%。
SequenceFile同樣支持隨機讀寫:reader.seek(360);
而reader.sync(360)則是定位到360之后的第1個sync pointer處。
在命令行顯示一個SequenceFile的內容不能用cat,而要用-text(表示要用文本的形式顯示二進制文件)
$ hadoop fs -text myfile.seq
SequenceFile文件的最開始是Header部分,其中包含了record的key-value的數據類型、sync標記所采用的字符、有關壓縮的細節、用戶自定義的metadata。SequenceFile內置的壓縮方式有兩種:RecordCompression和Block Compression。
SequenceFile的文件結構:
二進制文件具有清晰嚴謹的文件結構,讀寫速度自然要比文本文件快,SequenceFile更容易和Hadoop的基本數據類型(IntWritable,FloatWriatble等)進行交互。
下面的代碼展示如何把一個文本文件轉換為SequenceFile,job.setOutputFormatClass(SequenceFileOutputFormat.class);就可以了。
1 package basic; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.conf.Configured; 5 import org.apache.hadoop.fs.FileSystem; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.LongWritable; 8 import org.apache.hadoop.io.SequenceFile.CompressionType; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.io.compress.GzipCodec; 11 import org.apache.hadoop.mapreduce.Job; 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 14 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; 15 import org.apache.hadoop.util.Tool; 16 import org.apache.hadoop.util.ToolRunner; 17 18 public class ToSeqFile extends Configured implements Tool { 19 20 @Override 21 public int run(String[] arg0) throws Exception { 22 Job job = new Job(); 23 job.setJarByClass(getClass()); 24 Configuration conf=getConf(); 25 FileSystem fs = FileSystem.get(conf); 26 27 FileInputFormat.setInputPaths(job, "/user/orisun/input/rdata"); 28 Path outDir=new Path("/user/orisun/output"); 29 fs.delete(outDir,true); 30 FileOutputFormat.setOutputPath(job, outDir); 31 32 job.setNumReduceTasks(0); 33 job.setOutputKeyClass(LongWritable.class); 34 job.setOutputValueClass(Text.class); 35 //設置OutputFormat為SequenceFileOutputFormat 36 job.setOutputFormatClass(SequenceFileOutputFormat.class); 37 //允許壓縮 38 SequenceFileOutputFormat.setCompressOutput(job, true); 39 //壓縮算法為gzip 40 SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); 41 //壓縮模式為BLOCK 42 SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK); 43 44 45 return job.waitForCompletion(true)?0:1; 46 } 47 48 public static void main(String[] args) throws Exception { 49 int res = ToolRunner.run(new Configuration(), new ToSeqFile(), args); 50 System.exit(res); 51 } 52 }
代碼中沒有設置FileInputFormat,所以采用默認的TextInputFormat。代碼中沒有設置MapperClass,默認情況下map把從InputFormat中得到的key-value原樣輸出(新版本的Hadoop中已經沒有了IndentityMapper這個類,所以我沒有說“默認的Mapper就是IndentityMapper”)。
MapFile
MapFile是排序后的SequenceFile,並且它會額外生成一個index文件提供按key的查找。
讀寫MapFile與讀寫SequenceFile非常類似,只需要換成MapFie.Reader和MapFile.Writer就可以了。在命令行顯示MapFile的文件內容同樣要用-text。
與SequenceFile不同的是:由於MapFile需要按key排序,所以它的key必須是WritableComparable類型的。
MapFile會生成2個文件,一個名為data,一個名為index。data中的內容就是按key排序后的SequenceFile中的內容
$ hadoop fs -text numbers.map/data | head
1 One, two, buckle my shoe
2 Three, four, shut the door
3 Five, six, pick up sticks
4 Seven, eight, lay them straight
5 Nine, ten, a big fat hen
6 One, two, buckle my shoe
7 Three, four, shut the door
$ hadoop fs -text numbers.map/index
1 128
129 6079
257 12054
385 18030
第1列就是data文件中的key值,第2列是key在data文件中的offset。我們看到並不是所有的key都記錄在了index文件中,而是隔128個才記錄一個(這個間隔可以在io.map.index.interval屬性中設置,或直接在代碼中通過MapFile.Writer實例的setIndexInterval()函數來設置)。
下面我們來看一下index文件是怎么發揮作用的。
data文件中存放的每條record都是一個key-value對,我們可以根據下面的函數由key值獲取value。
public Writable get(WritableComparable key, Writable val) throws IOException
如果指定的key不存在,則不會給value賦值。
假如我們要檢索的key值是300。首先MapFile.Reader把index文件讀入內存,在index文件中進行二分查找,找到等於或第一個小於300的key-value對:257--12054,然后在data文件中從12054的位置開始查找,直到找到300或第一個大於300的key。可見整個查找的過程需要在內存中做一次二分查找,然后作一次文件掃描,且掃描文件的行數不會超過128行(如果io.map.index.interval屬性值為128話)。
當index文件很大時,全部讀到內存中也是不現實的。當然你會說我們可以調大io.map.index.interval的值,但那樣的話需要重新生成MapFile。在index文件已經生成的情況下我們可以設置io.map.index.skip的值,設為1就表示index中的記錄每隔一行才被載入內存。
使用MapFile.fix把一個SequenceFile轉換成MapFile:
1 package basic; 2 3 import java.net.URI; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.FileSystem; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.MapFile; 9 import org.apache.hadoop.io.SequenceFile; 10 11 public class MapFileFixer { 12 @SuppressWarnings({ "rawtypes", "unchecked" }) 13 public static void main(String[] args) throws Exception { 14 String mapUri = args[0]; 15 Configuration conf = new Configuration(); 16 FileSystem fs = FileSystem.get(URI.create(mapUri), conf); //MapFile(由data和index組成的目錄)也可以自成一個FileSystem 17 Path map = new Path(mapUri); 18 Path mapData = new Path(map, MapFile.DATA_FILE_NAME); 19 //通過SequenceFile.Reader來獲取SequenceFile的key和value類型 20 SequenceFile.Reader reader = new SequenceFile.Reader(fs, mapData, conf); 21 Class keyClass = reader.getKeyClass(); 22 Class valueClass = reader.getValueClass(); 23 reader.close(); 24 25 //使用MapFile.fix把一個SequenceFile轉換成MapFile 26 long entries = MapFile.fix(fs, map, keyClass, valueClass, false, conf); 27 System.out.printf("Created MapFile %s with %d entries\n", map, entries); 28 29 } 30 }
程序開始前先新建一個目錄假如取名為map
$ hadoop fs -mkdir map
把一個sequencefile放到map目錄下,並重命名為data(注意必須重命名為data)
$ hadoop fs -put myseqfile map/data
最后運行程序
$ hadoop jar dm.jar basic.MapFileFixer map
結果:data文件中的record按key值重新排序,並在map目錄下生成了index文件。