5.1 小文件
大數據這個概念似乎意味着處理GB級乃至更大的文件。實際上大數據可以是大量的小文件。比如說,日志文件通常增長到MB級時就會存檔。這一節中將介紹在HDFS中有效地處理小文件的技術。
技術24 使用Avro存儲多個小文件
假定有一個項目akin在google上搜索圖片,並將數以百萬計的圖片存儲分別在HDFS中。很不幸的是,這樣做恰好碰上了HDFS和MapReduce的弱項,如下:
- Hadoop的NameNode將所有的HDFS元數據保存在內存中以加快速度。Yahoo估計平均每個文件需要600字節內存。那么10億個文件就需要60GB內存。對於當下的中端服務器來說,60GB內存就顯得太多了。
- 如果MapReduce的數據源是大量的文本文件或可分割文件,那么map任務個數就是這些文件占據的快的數量。如果MapReduce的數據源是成千上百萬的文件,那么作業將會消耗大量的時間在內核中創建和銷毀map任務進程上。這些時間將會比實際處理數據的時間還要長。
- 如果在一個有調度器的受控環境中運行MapReduce作業,那么map任務的個數可能是受到限制的。由於默認每個文件都需要至少一個map任務,這樣就有可能因為任務過多而被調度器拒絕運行。
思考如下問題:文件的大小和HDFS塊大小相比,大概是什么比例?50%,70%,還是90%。如果大數據項目啟動后,又突然需要成倍地擴展需要處理的文件。如果擴展僅僅需要增加節點,而不需要重新設計Hadoop過程,遷移文件等,是不是很美妙的事情。思考這些問題並在設計階段及早准備是很有必要的。
問題
需要處理HDFS中的大量文件,同時又不能超出NameNode的內存限制。
方案
最簡單的方案就是將HDFS中的小文件打包到一個大的文件容器中。這個技術中將本地磁盤中所有的目標文件存儲到HDFS中的一個單獨的Avro文件。然后在MapReduce中處理Avro文件和其中的小文件。
討論
圖5.1中介紹了這個技術的第一部分,如何在HDFS中創建Avro文件。這樣做可以減少HDFS中需要創建的文件數量,隨之減少了NameNode的內存消耗。
Avro是由Hadoop之父Doug Cutting發明的數據序列化和PRC庫。主要用於提高Hadoop數據交換,通用性和版本控制的能力。Avro有着很強的架構模式演化能力,相比它的競爭對手如SequenceFiles等有更明顯的競爭優勢。第3章中詳細介紹了Avro和它的競爭對手們。
讓我們來看看以下的JAVA代碼如何創建Avro文件:
從目錄中讀取多個小文件並在HDFS中生成一個單一的Avro文件
1 public class SmallFilesWrite { 2 3 public static final String FIELD_FILENAME = "filename"; 4 public static final String FIELD_CONTENTS = "contents"; 5 6 private static final String SCHEMA_JSON = 7 "{\"type\": \"record\", \"name\": \"SmallFilesTest\", " 8 + "\"fields\": [" 9 + "{\"name\":\" + FIELD_FILENAME 10 + "\", \"type\":\"string\"}," 11 + "{\"name\":\" + FIELD_CONTENTS 12 + "\", \"type\":\"bytes\"}]}"; 13 14 public static final Schema SCHEMA = Schema.parse(SCHEMA_JSON); 15 16 public static void writeToAvro(File srcPath, OutputStream outputStream)throws IOException { 17 18 DataFileWriter<Object> writer = 19 new DataFileWriter<Object>(new GenericDatumWriter<Object>()).setSyncInterval(100); 20 21 writer.setCodec(CodecFactory.snappyCodec()); 22 writer.create(SCHEMA, outputStream); 23 24 for (Object obj : FileUtils.listFiles(srcPath, null, false)) { 25 File file = (File) obj; 26 String filename = file.getAbsolutePath(); 27 byte content[] = FileUtils.readFileToByteArray(file); 28 GenericRecord record = new GenericData.Record(SCHEMA); 29 record.put(FIELD_FILENAME, filename); 30 record.put(FIELD_CONTENTS, ByteBuffer.wrap(content)); 31 writer.append(record); 32 System.out.println(file.getAbsolutePath() + ": " + DigestUtils.md5Hex(content)); 33 } 34 35 IOUtils.cleanup(null, writer); 36 IOUtils.cleanup(null, outputStream); 37 } 38 39 public static void main(String... args) throws Exception { 40 Configuration config = new Configuration(); 41 FileSystem hdfs = FileSystem.get(config); 42 File sourceDir = new File(args[0]); 43 Path destFile = new Path(args[1]); 44 OutputStream os = hdfs.create(destFile); 45 writeToAvro(sourceDir, os); 46 } 47 }
壓縮依賴 為了運行這一章中的代碼,需要在相應主機上安裝Snappy和LZOP壓縮編碼器。請參考附錄A來安裝和配置。 |
然后觀察這段代碼以Hadoop的配置目錄作為數據源的運行結果。
$ bin/run.sh \ com.manning.hip.ch5.SmallFilesWrite /etc/hadoop/conf test.avro /etc/hadoop/conf/ssl-server.xml.example: cb6f1b218... /etc/hadoop/conf/log4j.properties: 6920ca49b9790cb... /etc/hadoop/conf/fair-scheduler.xml: b3e5f2bbb1d6c... ...
看起來很可靠。然后來確認HDFS中的輸出文件:
$ hadoop fs -ls test.avro 2011-08-20 12:38 /user/aholmes/test.avro
為了確保所有都和預期一樣,編寫代碼讀取HDFS中的Avro文件,並輸出每個文件內容的MD5哈希值。代碼如下:
1 public class SmallFilesRead { 2 3 private static final String FIELD_FILENAME = "filename"; 4 private static final String FIELD_CONTENTS = "contents"; 5 6 public static void readFromAvro(InputStream is) throws IOException { 7 8 DataFileStream<Object> reader = new DataFileStream<Object>(is, new GenericDatumReader<Object>()); 9 10 for (Object o : reader) { 11 GenericRecord r = (GenericRecord) o; 12 System.out.println( 13 r.get(FIELD_FILENAME) + ": " + 14 DigestUtils.md5Hex(((ByteBuffer) r.get(FIELD_CONTENTS)).array())); 15 } 16 17 IOUtils.cleanup(null, is); 18 IOUtils.cleanup(null, reader); 19 } 20 21 public static void main(String... args) throws Exception { 22 Configuration config = new Configuration(); 23 FileSystem hdfs = FileSystem.get(config); 24 Path destFile = new Path(args[0]); 25 InputStream is = hdfs.open(destFile); 26 readFromAvro(is); 27 } 28 }
這段代碼比前一段代碼要簡單。因為Avro將結構模式(schema)寫入了每一個Avro文件。在逆序列化的時候,不需要告訴Avro結構模式的信息。現在來測試代碼:
$ bin/run.sh com.manning.hip.ch5.SmallFilesRead test.avro /etc/hadoop/conf/ssl-server.xml.example: cb6f1b21... /etc/hadoop/conf/log4j.properties: 6920ca49b9790c... /etc/hadoop/conf/fair-scheduler.xml: b3e5f2bbb1d6...
現在Avro文件就被存儲在了HDFS中。下一步是用MapReduce處理文件。如圖5.2所示,用一個只有Map的MapReduce作業讀取Avro記錄作為輸入,然后輸出一個包含有文件名和文件內容的MD5哈希值的文本文件。
以下是MapReduce作業的實現代碼:
一個以包含了多個小文件的Avro文件作為輸入源的MapReduce作業
1 public class SmallFilesMapReduce { 2 3 public static void main(String... args) throws Exception { 4 JobConf job = new JobConf(); 5 job.setJarByClass(SmallFilesMapReduce.class); 6 Path input = new Path(args[0]); 7 Path output = new Path(args[1]); 8 output.getFileSystem(job).delete(output, true); 9 AvroJob.setInputSchema(job, SmallFilesWrite.SCHEMA); 10 job.setInputFormat(AvroInputFormat.class); 11 job.setOutputFormat(TextOutputFormat.class); 12 job.setMapperClass(Map.class); 13 FileInputFormat.setInputPaths(job, input); 14 FileOutputFormat.setOutputPath(job, output); 15 job.setNumReduceTasks(0); 16 JobClient.runJob(job); 17 } 18 19 public static class Mapper 20 implements Mapper<AvroWrapper<GenericRecord>, NullWritable, Text, Text> { 21 22 @Override 23 public void map(AvroWrapper<GenericRecord> key, 24 NullWritable value, 25 OutputCollector<Text, Text> output, 26 Reporter reporter) throws IOException { 27 28 outKey.set(key.datum().get(SmallFilesWrite.FIELD_FILENAME).toString()); 29 30 outValue.set(DigestUtils.md5Hex( 31 ((ByteBuffer) key.datum().get(SmallFilesWrite.FIELD_CONTENTS)) 32 .array())); 33 34 output.collect(outKey, outValue); 35 } 36 } 37 }
如果將前面代碼創建的Avro文件作為輸入源,那么這個作業的日志文件將包含最初的文件名和它們的哈希值。執行過程如下:
$ bin/run.sh com.manning.hip.ch5.SmallFilesMapReduce test.avro output $ hadoop fs -cat output/part* /etc/hadoop/conf/capacity-scheduler.xml: 0601a2.. /etc/hadoop/conf/taskcontroller.cfg: 5c2c191420... /etc/hadoop/conf/configuration.xsl: e4e5e17b4a8... ...
這個技術假設需要處理的文件時無法連接合並的,如圖像文件。如果文件可以連接,那么就可以考慮其它的方案。使用Avro應盡可能保證文件的大小和HDFS快的大小相當,以減少NameNode中需要存儲的數據。
小結
也可以用Hadoop的SequenceFile來處理小文件。SequenceFile是一個更成熟的技術,比Avro出現時間更長。但是SequenceFiles是JAVA專用的,相比Avro相比豐富的交互性和版本控制語義。
Google的Protocol Buffers和源自Facebook的Apache Thrift都可以用來處理小文件。但是缺乏相應的InputFormat來配合它們。
另外一個方法是將文件打包成zip文件。但其中的問題是,必須自定義InputFormat來處理zip文件。同時zip文件無法分塊。不過分塊問題可以通過打包成多個大小和HDFS塊相近的zip文件。
Hadoop還提供了CombineFileInputFormat。它能夠讓一個單獨的map任務處理來自多個文件的多個輸入塊,以極大地減少需要運行的map任務個數。
在類似的方法中,也可以在Hadoop中配置,使map任務的JVM可以處理多個任務,來減少JVM循環的開支。配置項mapred.job.reuse.jvm.num.tasks默認為1.這說明一個JVM只能處理一個任務。當它被配置為更大的數字的時候,一個JVM可以處理多個任務。-1則代表着處理的任務數量無上限。
此外,也可以創建一個tarball文件來裝載所有的文件,然后生成一個文本文件描述HDFS中的tarball文件的位置信息。文本文件將會被作為MapReduce作業的輸入源。Map任務將會直接打開tarball。但是這種方法將會損害MapReduce的本地性。也就是說,map任務需要在包含那個文本文件的節點上運行,然而包含tarball文件的HDFS很可能在另外一個節點上,這就增加了網絡IO的成本。
Hadoop打包文件(HAR)是Hadoop專用於解決小文件問題的文件。它是基於HDFS的虛擬文件系統。HAR的缺陷在於無法優化MapReduce的本地磁盤訪問性能,而且無法被壓縮。
Hadoop 2.x版本支持HDFS聯合機制。在HDFS聯合機制中,HDFS被分區成多個不同的名字空間,由不同的NameNode分別管理。然后,NameNode的快信息緩存的內存壓力可以由多個NameNode共同承擔。最終支持了更大數量的小文件。Hortonworks有一片關於HDFS聯合機制的博客:http://hortonworks.com/an-introduction-to-hdfs-federation/。
最后一個方法是MapR。MapR擁有自己的分布式文件系統,支持大量的小文件。但是,應用MapR作為分布式存儲系統將會帶來很大的系統變更。也就是說,幾乎不可能通過應用MapR來解決HDFS中的小文件問題。
在Hadoop中有可能多次碰到小文件的問題。直接使用小文件將會使NameNode的內存消耗迅速增大,並拖累MapReduce的運行時間。這個技術可以幫助緩解這個問題,通過將小文件打包到更大的容器文件中。選擇Avro的原因是,它支持可分塊文件,壓縮。Avro的結構模式語言有利於版本控制。
假定需要處理的不是小文件,而是超大文件。那么應當如何有效地存儲數據?如何在Hadoop中壓縮數據?MapReduce中應當如何處理?這將是下一節的內容。