背景:
搜索引擎在build全量時,會產生數G的xml的中間文件,我需要去查詢這些中間文件中,是否有某個特殊的字符。xml文件有很多,每個都有幾百M,存儲在hdfs上,而且是以gz結尾的文本格式的文件。
查找時,我是寫了一個實現Tool接口,繼承自Configured類的MapReduce,這樣就可以傳入自定義的參數給我的MapReduce程序了。需要在文件里Grep的內容,就是以參數的形式傳入的。
寫完代碼調試時,問題來了,會報這個異常:
14/10/17 12:06:33 INFO mapred.JobClient: Task Id : attempt_201405252001_273614_m_000013_0, Status : FAILED
java.io.IOException: incorrect header check
at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method)
at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:221)
at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:81)
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:75)
at java.io.InputStream.read(InputStream.java:85)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:208)
at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:193)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:390)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:324)
at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1115)
at org.apache.hadoop.mapred.Child.main(Child.java:262)
分析過程:
通過上面的異常,立馬猜想到是由於我的文件是gz結尾,所以hadoop把它當作了壓縮文件,然后嘗試解壓縮后讀取,所以解壓失敗了。於是去問google,沒有搜到能夠直接解決我問題的答案,但是搜到了此處相關的源代碼:LineRecordReader.java;於是嘗試着去閱讀代碼來解決問題,這個類很簡單,繼承自RecordReader,沒有看到next函數和readLine函數,那就應該是基類實現的。很快發現了看名字是跟壓縮解碼相關的代碼:
private CompressionCodecFactory compressionCodecs = null;
...
compressionCodecs = new CompressionCodecFactory(job);
final CompressionCodec codec = compressionCodecs.getCodec(file);
...
if (codec != null) {
in = new LineReader(codec.createInputStream(fileIn), job);
}
else{
...
in = new LineReader(fileIn, job);
}
此處file就是拿到的文件路徑,可以看到,應該就是通過CompressionCode.getCode(file)函數,拿到的codec類,然后讀取的時候出異常了。那怎么讓MapReduce程序把這個.gz文件當作普通的文本文件呢?再點進去看CompressionCodeFactory.java的代碼。getCodec函數的代碼如下:
/**
* Find the relevant compression codec for the given file based on its
* filename suffix.
* @param file the filename to check
* @return the codec object
*/
public CompressionCodec getCodec(Path file) {
CompressionCodec result = null;
if (codecs != null) {
String filename = file.getName();
String reversedFilename = new StringBuffer(filename).reverse().toString();
SortedMap<String, CompressionCodec> subMap = codecs.headMap(reversedFilename);
if (!subMap.isEmpty()) {
String potentialSuffix = subMap.lastKey();
if (reversedFilename.startsWith(potentialSuffix)) {
result = codecs.get(potentialSuffix);
}
}
}
return result;
}
就是根據文件名稱匹配來得到對應的解壓縮類。咋們按圖索驥,去看看codecs是在哪里賦值的:
/**
* Find the codecs specified in the config value io.compression.codecs
* and register them. Defaults to gzip and zip.
*/
public CompressionCodecFactory(Configuration conf) {
codecs = new TreeMap<String, CompressionCodec>();
List<Class<? extends CompressionCodec>> codecClasses = getCodecClasses(conf);
if (codecClasses == null) {
addCodec(new GzipCodec());
addCodec(new DefaultCodec());
} else {
Iterator<Class<? extends CompressionCodec>> itr = codecClasses.iterator();
while (itr.hasNext()) {
CompressionCodec codec = ReflectionUtils.newInstance(itr.next(), conf);
addCodec(codec);
}
}
}
看樣子從配置文件里,拿不到編碼相關的配置,就會默認把GzipCodec,DefaultCodec加進去。再跟到getCodecClasses(conf)函數里去:
/**
* Get the list of codecs listed in the configuration
* @param conf the configuration to look in
* @return a list of the Configuration classes or null if the attribute
* was not set
*/
public static List<Class<? extends CompressionCodec>> getCodecClasses(Configuration conf) {
String codecsString = conf.get("io.compression.codecs");
if (codecsString != null) {
List<Class<? extends CompressionCodec>> result = new ArrayList<Class<? extends CompressionCodec>>();
StringTokenizer codecSplit = new StringTokenizer(codecsString, ",");
while (codecSplit.hasMoreElements()) {
String codecSubstring = codecSplit.nextToken();
if (codecSubstring.length() != 0) {
try {
Class<?> cls = conf.getClassByName(codecSubstring);
if (!CompressionCodec.class.isAssignableFrom(cls)) {
throw new IllegalArgumentException("Class " + codecSubstring +
" is not a CompressionCodec");
}
result.add(cls.asSubclass(CompressionCodec.class));
} catch (ClassNotFoundException ex) {
throw new IllegalArgumentException("Compression codec " +
codecSubstring + " not found.",
ex);
}
}
}
return result;
} else {
return null;
}
}
從這個函數里能夠看到編碼的配置是 io.compression.codecs 。可以看到,我們必須返回非null的result,那么直接讓io.compression.codecs配置成空,應該就可以了,此時返回的result里面沒有任何元素。
問題解決方案:
試了一把,執行這個MapReduce程序時,加上 -Dio.compression.codecs=, 的參數,就可以了:
hadoop jar ./dumptools-0.1.jar ddump.tools.mr.Grep -Dio.compression.codecs=, "adgroupId=319356697" doc val
如果您看了本篇博客,覺得對您有所收獲,請點擊右下角的“推薦”,讓更多人看到!

