Hadoop 中的MapReduce庫支持幾種不同格式的輸入數據。例如,文本模式的輸入數據的每一行被視為一個key/value pair,其中key為文件的偏移量,value為那一行的內容。每一種輸入類型的實現都必須能夠把輸入數據分割成數據片段,並能夠由單獨的Map任務來對數據片段進行后續處理。
一. 輸入格式InputFormat
當運行一個M-R 作業的時候,我們需要為作業制定它的輸入格式。InputFormat為Hadoop作業的所有輸入格式的抽象基類,它描述了作業輸入需要滿足的規范細節。
1.InputFormat 抽象類
該抽象類只有兩個抽象方法:
| Modifier and Type | Method and Description |
| RecordReader<K,V> | getRecordReader(InputSplit split, TaskAttemptContext context)
|
| List<InputSplit> | getSplits
|
函數getSplits(JobContext context ) 主要作用就是將所有的輸入文件分割成邏輯上的多個分片InputSplit。這只是邏輯上的分片,並不是真正將文件分割成多個數據塊。每個InputSplit分片通過輸入文件路徑、開始位置和偏移量三個信息來唯一標識。
函數getRecordReader(InputSplit split, TaskAttemptContext context) 主要作用就是為指定的InputSplit創建記錄讀取器,通過創建的記錄讀取器從輸入分片中讀取鍵值對,然后將鍵值對交給Map來處理。
當Hadoop在運行MapReduce程序時,InputFormat主要承擔一下三個功能:
a. 檢查輸入路徑是否正確;
b. 將輸入文件分割成邏輯上的分片InputSplit,並將每個InputSplit 分別傳給單獨的一個Map,也就是說,有多少InputSplit,就得生成多少個Map任務;
c. InputSplit 是由一條條記錄構成的,所以InputSplit 需要提供一個RecordReader 的實現,然后通過RecordReader 的實現來讀取InputSplit 中的每條記錄,並將之傳給Map任務。
2.InputSplit 的子類
InputFormat 類 有多個子類繼承,其類與類之間的關系如下:
由上圖可知,InputFormat 直接子類有三個:DBInputFormat、DelegatingInputFormat和FileInputFormat,分別表示輸入文件的來源為從數據庫、用於多個輸入以及基於文件的輸入。對於FileInputFormat,即從文件輸入的輸入方式,又有五個繼承子類:CombineFileInputFormat,KeyValueTextInput,NLineInoutFormat,
SequenceFileInputFormat,TextInputFormat。
3.getSplits 函數
FileInputFormat 類,實現了getSplits函數:
//獲取輸入文件的輸入分片的方法,生成的輸入分片是FileSplit格式的
public List<InputSplit> getSplits(JobContext job) throws IOException {
//首先得獲取輸入分片的上界和下界
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();//初始化用於保存生成的輸入分片的列表對象
List<FileStatus>files = listStatus(job);//獲取所有的輸入文件列表
for (FileStatus file: files) {//對文件列表中的每個文件進行相應的分割處理,然后生成文件的輸入分片列表
Path path = file.getPath(); //獲取文件的path對象
FileSystem fs = path.getFileSystem(job.getConfiguration());//獲取文件所在的文件系統
long length = file.getLen();//獲取文件的長度
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);//獲取文件的所有塊信息
if ((length != 0) && isSplitable(job, path)) { //如果文件的大小不是0 && 文件是可分割的,則執行分割操作
long blockSize = file.getBlockSize();//獲取文件的快大小
/*將文件系統數據塊的大小,輸入分片的上下界作為參數傳給computeSplitSize方法來計算出真正的輸入分片的大小。輸入分片
* 大小的計算策略為:首先取出塊大小和設置的分片大小的上界中的較小值,然后取出上一步計算出的較小值和設置的分片大小的下界
* 的較大值,最終將第二步取出的較大值作為實際分片大小
* */
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;//剩余文件大小的初始值作為整個文件的大小
//如果文件未分割的部分的大小比分片的1.1倍大,那么就創建一個FileSplit分片
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
//當剩余文件的大小比分片的1.1倍小,則將剩余部分作為整個FileSplit分片處理
if (bytesRemaining != 0) {
splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkLocations.length-1].getHosts()));
}
} else if (length != 0) { //如果文件是不可分割的,則將整個文件作為一個FileSplit處理
splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
} else {
//Create empty hosts array for zero length files
splits.add(new FileSplit(path, 0, length, new String[0]));
}
}
4.TestInputFormat 類
TextInputFormat 類是FileInputFormat 的默認實現,該輸入格式主要針對的是文本類型的文件,文件被分割成多行,而且每一行都使用換行符(LF=10) 或者【Enter】鍵作為每一行的結束標識。該類主要重寫了FileInputFormat中的createRecordReader ,其返回了LineRecordReader行記錄讀取器,該讀取器用於從文件中讀取一行,將這行文本在文件中的偏移量作為key,以這行文本的內容作為value,組成鍵值對。
二. 輸入分片 InputSplit
輸入分片InputSplit 是一個單獨的Map需要處理的數據單元。輸入分片的類型一般都是字節類型的,經過相應的RecordReader 處理后,轉化成記錄視圖的形式,然后交給Map處理。InputSplit存儲的並非數據本身,而是一個分片長度和一個記錄數據位置的數組,生成InputSplit的方法可通過InputFormat來設置。InputFormat的getSplits方法可以生成InputSplit相關信息,包括兩部分:InputSplit元數據信息和原始InputSplit信息。InputSplit元數據信息將被JobTracker使用,用以生成Task本地性相關數據結構;原始InputSplit信息將被Map Task初始化時使用,用以獲取自己要處理的數據。
輸入分片InputSplit 類有三個子類繼承:FileSplit (文件輸入分片),CombineFileSplit(多文件輸入分片)以及DBInputSplit(數據塊輸入分片)。
三. 記錄讀取器 RecordReader
抽象基類RecordReader 的實現來讀取InputSplit 中的每條記錄,並將之傳給Map任務。
1.RecordReader 抽象類中主要有一下幾個虛函數:
public abstract void initialize(InputSplit split,TaskAttemptContext context) ;//初始化方法
public abstract boolean nextKeyValue();//獲取輸入分片的下一個鍵值對
public abstract KEYIN getCurrentKey();//獲取當前讀取到的鍵值對中的鍵對象
public abstract VALUEIN getCurrentValue();//獲取當前讀取到的鍵值對中的值對象
public abstract float getProgress();//獲取當前數據的讀取進度
public abstract void close();//關閉RecordReader ,清理其占用的資源
2.RecordReader 的幾個繼承子類:
LineRecordReader 行記錄讀取器:該類是針對文本文件的默認讀取器,一次處理輸入分片中的一行,並將偏移量最為鍵,行的內容作為值,得到鍵值對。
KeyValueLineRecordReader 鍵值對讀取器:該類將輸入文件的整行作為一個由指定分隔符分開的鍵值對,默認的分隔符為:”\t”,我們可以通過mapreduce.input.keyvaluelinerecordreader.key.value.separator配置項來修改默認的分隔符。
SequenceFileRecordReader 序列文件讀取器:該類用於讀取SequenceFile 格式的輸入分片。
DBRecordReader 數據庫記錄讀取器:該類用於處理DBInputSplit 輸入分片,該類會從關系型數據庫中讀取若干條記錄, 將讀取到的記錄的條數作為鍵,將內容作為值。
四. 輸出格式 OutputFormat
1.OutputFormat 抽象類:
OutputFormat 抽象類描述了M-R作業的輸出規范,它決定了將MapReduce 的作業的輸出結果保持到哪里,以及如何對輸出結果進行持久化操作。其主要完成以下幾個工作:
a.檢查作業的輸出是否有效;
b. 提供一個具體的RecordWriter 實現類。
OutputFormat 抽象類主要有三個抽象函數:
public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context )//返回RecordWriter,將K-V 對寫入存儲結構
public abstract void checkOutputSpecs(JobContext context )//檢查輸出目錄是否存在
public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context )//Hadoop作業使用該類來完成輸出結果的提交即將作業的輸出結果保存到正確的輸出目錄中
2.OutputFormat 類有四個繼承子類:
FileOutputFormat 類:將鍵值對寫入文件系統
DBOutputFormat 類:將鍵值對寫入數據庫中
FitterOutputFormat 類:將OutputFormat 的結果再次封裝,類似Java的流的Fitter
NullOutputFormat 類:將鍵值對寫入/dev/null,相當於舍棄這些值
五. 記錄寫入器 RecordWriter
InputFormat 提供了RecordReader 來從輸入文件中讀取鍵值對,OutputFormat 也提供了RecordWriter 用於MapReduce 作業的鍵值對寫入指定的輸出中。
RecordWriter 有兩個抽象函數:
public abstract void write(K key,V value)//用於將產生的鍵值對以指定的格式寫入到輸出目錄中
public abstract void close(TaskAttempContext context)//關閉輸出,並釋放資源

