How to Make Hadoop Read Plain-Text Files That End in .gz


Background:

When our search engine does a full build, it produces several gigabytes of intermediate XML files, and I needed to check whether a particular string appears in them. There are many of these XML files, each a few hundred MB, stored on HDFS as plain-text files whose names end in .gz.

For the search, I wrote a MapReduce program that implements the Tool interface and extends Configured, so that custom parameters can be passed to it on the command line. The string to grep for in the files is passed in as one of those parameters.
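For context, what makes those -D parameters work is that ToolRunner (via GenericOptionsParser) peels off -Dkey=value pairs, loads them into the job's Configuration, and hands only the remaining arguments to the Tool's run() method. Below is a stdlib-only toy simulation of that splitting; the class and method names are illustrative, not Hadoop's actual code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy illustration of how "-Dkey=value" arguments are separated from the
// remaining program arguments (roughly what GenericOptionsParser does
// before the Tool's run() method is invoked).
public class ArgSplitDemo {
    static Map<String, String> props = new HashMap<>();
    static List<String> rest = new ArrayList<>();

    static void split(String[] args) {
        for (String arg : args) {
            if (arg.startsWith("-D")) {
                // "-Dio.compression.codecs=," -> key/value into the config
                String kv = arg.substring(2);
                int eq = kv.indexOf('=');
                props.put(kv.substring(0, eq), kv.substring(eq + 1));
            } else {
                rest.add(arg); // left over for the Tool's run() method
            }
        }
    }

    public static void main(String[] args) {
        split(new String[] {"-Dio.compression.codecs=,",
                            "adgroupId=319356697", "doc", "val"});
        System.out.println(props.get("io.compression.codecs")); // ","
        System.out.println(rest); // [adgroupId=319356697, doc, val]
    }
}
```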

When I finished the code and started debugging, the trouble began. The job failed with this exception:

14/10/17 12:06:33 INFO mapred.JobClient: Task Id : attempt_201405252001_273614_m_000013_0, Status : FAILED	
java.io.IOException: incorrect header check
    at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method)
    at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:221)
    at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:81)
    at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:75)
    at java.io.InputStream.read(InputStream.java:85)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
    at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:208)
    at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:193)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:390)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:324)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1115)
    at org.apache.hadoop.mapred.Child.main(Child.java:262)

Analysis:

The exception immediately suggested a guess: because my files end in .gz, Hadoop treated them as compressed files and tried to decompress them while reading, and the decompression failed. I asked Google and found no answer that solved my problem directly, but I did find the relevant source: LineRecordReader.java. So I tried reading the code to solve the problem myself. The class is simple, implementing the RecordReader interface, and I quickly spotted code whose names clearly relate to compression codecs:

private CompressionCodecFactory compressionCodecs = null;
...
compressionCodecs = new CompressionCodecFactory(job);
final CompressionCodec codec = compressionCodecs.getCodec(file);
...
if (codec != null) {
    in = new LineReader(codec.createInputStream(fileIn), job);
} else {
    ...
    in = new LineReader(fileIn, job);
}

Here file is the file path, so the codec object must be the one obtained from compressionCodecs.getCodec(file), and reading through it is what throws the exception. How, then, do we make the MapReduce program treat a .gz file as an ordinary text file? Let's step into CompressionCodecFactory.java. Its getCodec function looks like this:

/**
 * Find the relevant compression codec for the given file based on its
 * filename suffix.
 * @param file the filename to check
 * @return the codec object
 */
public CompressionCodec getCodec(Path file) {
  CompressionCodec result = null;
  if (codecs != null) {
    String filename = file.getName();
    String reversedFilename = new StringBuffer(filename).reverse().toString();
    SortedMap<String, CompressionCodec> subMap = codecs.headMap(reversedFilename);
    if (!subMap.isEmpty()) {
      String potentialSuffix = subMap.lastKey();
      if (reversedFilename.startsWith(potentialSuffix)) {
        result = codecs.get(potentialSuffix);
      }
    }
  }
  return result;
}
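The lookup trick above is worth spelling out: codecs is a TreeMap keyed by the reversed suffix (e.g. "zg." for ".gz"), so headMap plus lastKey finds the largest registered key not exceeding the reversed filename, which is then checked as a prefix. Here is a stdlib-only simulation of that logic, with plain strings standing in for codec objects (a sketch, not Hadoop's actual code):

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Simulates CompressionCodecFactory.getCodec's reversed-suffix lookup,
// using codec class names as plain strings instead of codec instances.
public class SuffixLookupDemo {
    static TreeMap<String, String> codecs = new TreeMap<>();

    static void addCodec(String suffix, String codecName) {
        // Store the suffix reversed, mirroring what addCodec does with
        // codec.getDefaultExtension().
        codecs.put(new StringBuffer(suffix).reverse().toString(), codecName);
    }

    static String getCodec(String filename) {
        String reversed = new StringBuffer(filename).reverse().toString();
        // headMap: all keys strictly less than the reversed filename;
        // lastKey: the largest of those, i.e. the best suffix candidate.
        SortedMap<String, String> subMap = codecs.headMap(reversed);
        if (!subMap.isEmpty()) {
            String potentialSuffix = subMap.lastKey();
            if (reversed.startsWith(potentialSuffix)) {
                return codecs.get(potentialSuffix);
            }
        }
        return null; // no suffix matched: treat as plain text
    }

    public static void main(String[] args) {
        addCodec(".gz", "GzipCodec");
        addCodec(".deflate", "DefaultCodec");
        System.out.println(getCodec("part-00000.gz"));  // GzipCodec
        System.out.println(getCodec("part-00000.txt")); // null
    }
}
```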

So the decompression class is chosen purely by file-name matching. Following the trail, let's see where codecs gets populated:

/**
 * Find the codecs specified in the config value io.compression.codecs
 * and register them. Defaults to gzip and zip.
 */
public CompressionCodecFactory(Configuration conf) {
  codecs = new TreeMap<String, CompressionCodec>();
  List<Class<? extends CompressionCodec>> codecClasses = getCodecClasses(conf);
  if (codecClasses == null) {
    addCodec(new GzipCodec());
    addCodec(new DefaultCodec());
  } else {
    Iterator<Class<? extends CompressionCodec>> itr = codecClasses.iterator();
    while (itr.hasNext()) {
      CompressionCodec codec = ReflectionUtils.newInstance(itr.next(), conf);
      addCodec(codec);
    }
  }
}

Apparently, when no codec setting can be read from the configuration, GzipCodec and DefaultCodec are added by default. Let's follow getCodecClasses(conf) one level deeper:

/**
 * Get the list of codecs listed in the configuration
 * @param conf the configuration to look in
 * @return a list of the Configuration classes or null if the attribute
 *         was not set
 */
public static List<Class<? extends CompressionCodec>> getCodecClasses(Configuration conf) {
  String codecsString = conf.get("io.compression.codecs");
  if (codecsString != null) {
    List<Class<? extends CompressionCodec>> result =
        new ArrayList<Class<? extends CompressionCodec>>();
    StringTokenizer codecSplit = new StringTokenizer(codecsString, ",");
    while (codecSplit.hasMoreElements()) {
      String codecSubstring = codecSplit.nextToken();
      if (codecSubstring.length() != 0) {
        try {
          Class<?> cls = conf.getClassByName(codecSubstring);
          if (!CompressionCodec.class.isAssignableFrom(cls)) {
            throw new IllegalArgumentException("Class " + codecSubstring +
                                               " is not a CompressionCodec");
          }
          result.add(cls.asSubclass(CompressionCodec.class));
        } catch (ClassNotFoundException ex) {
          throw new IllegalArgumentException("Compression codec " +
                                             codecSubstring + " not found.",
                                             ex);
        }
      }
    }
    return result;
  } else {
    return null;
  }
}

This function shows that the configuration key is io.compression.codecs. We need getCodecClasses to return a non-null result, so if we set io.compression.codecs to a value that parses into zero codec classes, the returned list is empty rather than null, the codecs map stays empty, and getCodec will return null for every file, so nothing gets treated as compressed.
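Why does a value of "," work? StringTokenizer with "," as the delimiter produces no tokens at all from that string (delimiters alone yield nothing), so the loop body never runs and an empty list comes back. A quick stdlib check of that behavior, using a small helper mirroring the parsing loop above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Demonstrates that tokenizing "," (or "") with delimiter "," yields no
// tokens, which is why -Dio.compression.codecs=, leaves the codec list
// empty but non-null.
public class TokenizerDemo {
    static List<String> tokens(String codecsString) {
        List<String> result = new ArrayList<>();
        StringTokenizer codecSplit = new StringTokenizer(codecsString, ",");
        while (codecSplit.hasMoreElements()) {
            result.add(codecSplit.nextToken());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(tokens(",").size());  // 0
        System.out.println(tokens("").size());   // 0
        System.out.println(tokens("org.apache.hadoop.io.compress.GzipCodec").size()); // 1
    }
}
```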

Solution:

I gave it a try: launching the MapReduce job with the extra parameter -Dio.compression.codecs=, solved the problem:

hadoop jar ./dumptools-0.1.jar ddump.tools.mr.Grep -Dio.compression.codecs=, "adgroupId=319356697" doc val


