1. TextInputFortmat
TextInputFormat是默認的InputFormat。每條記錄是一行輸入。Key是LongWritable類型,存儲該行在整個文件中的字節偏移量(不是行數),值是這行的內容,為一個Text對象。
例如輸入文件為:
grunt> cat test2
12,e21,ddwq,dqw,dwqw
sfd,cda,cdsz,cdwq,qwe
12,cds,fwa,feacd,cadfa
21ede,cdsf,ca,fa,dcac
caewf,ea,cdadc,acds,acsd
12e,afs,afesd,caefd,cawc
cax,cafe,caefe,fea,ceaef
在使用默認的 Map處理后輸出:
grunt> cat out
0 12,e21,ddwq,dqw,dwqw
21 sfd,cda,cdsz,cdwq,qwe
43 12,cds,fwa,feacd,cadfa
66 21ede,cdsf,ca,fa,dcac
88 caewf,ea,cdadc,acds,acsd
113 12e,afs,afesd,caefd,cawc
138 cax,cafe,caefe,fea,ceaef
可以看到Key的值並不是行數,而是字節在文件中的偏移量。一般情況下,很難獲取到文件的行號,因為文件是按字節切分為分片,而不是按行切分。
在按行讀文本的情況下,可能會存在超長行的情況。超長行會導致內存溢出,可以通過設置 mapreduce.input.linerecordreader.line.maxlength,指定一個最長行的字節數(在內存范圍內),可以確保 recordreader 跳過超長行。
2. KeyValueTextInputFormat
TextInputFormat 將文件中的行作為Key,每行對應的文本作為Value。但是對於某些文件內容已經是 Key-Value 形式的話,使用 TextInputFormat 會顯得多次一舉。在這種情況下,我們可以使用KeyValueTextInputFormat,它以某個分隔符進行分割(默認為制表符):
public KeyValueLineRecordReader(Configuration conf) throws IOException {
String sepStr = conf.get("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
this.separator = (byte)sepStr.charAt(0);
}
一個范例如下,使用逗號為分隔符:
grunt> cat test2
12,e21,ddwq,dqw,dwqw
sfd,cda,cdsz,cdwq,qwe
12,cds,fwa,feacd,cadfa
21ede,cdsf,ca,fa,dcac
輸出為:
grunt> cat out
12 cds,fwa,feacd,cadfa
12 e21,ddwq,dqw,dwqw
12e afs,afesd,caefd,cawc
21ede cdsf,ca,fa,dcac
在任務設置中需要做的配置如下:
Configuration conf = new Configuration();
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat.class);
3. NLineInputFormat
在使用TextInputFormat和KeyValueInputFormat 時,每個mapper 收到的行數取決於輸入的分片大小以及行的長度。如果希望 mapper 收到固定行的輸入,則需要使用 NLineInputFormat。與 TextInputFormat一樣,key是文件中的字節偏移量,值是行本身。
N是每個mapper收到的輸入行數。N設置為1時(默認),每個mapper正好收到一行輸入。同樣使用之前的一共7行輸入,使用NLineInputFormat:
job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.NLineInputFormat.class);
grunt> cat out
0 12,e21,ddwq,dqw,dwqw
21 sfd,cda,cdsz,cdwq,qwe
43 12,cds,fwa,feacd,cadfa
…
查看此任務的相關指標,可以看到:
Job Counters
Launched map tasks=7
Launched reduce tasks=1
Other local map tasks=7
Mapper數一共有7個,也就是每行均生成了一個Map。可以通過設置以下參數指定NLine為多少行:
mapreduce.input.lineinputformat.linespermap
References: Hadoop權威指南第四版