MapReduce輸入格式


  文件是 MapReduce 任務數據的初始存儲地。正常情況下,輸入文件一般是存儲在 HDFS 里面。這些文件的格式可以是任意的:我們可以使用基於行的日志文件, 也可以使用二進制格式,多行輸入記錄或者其它一些格式。這些文件一般會很大,達到數十GB,甚至更大。那么 MapReduce 是如何讀取這些數據的呢?下面我們來學習 InputFormat 接口

1、InputFormat接口

  InputFormat接口決定了輸入文件如何被 Hadoop分塊(split up)與接受。InputFormat 能夠從一個 job 中得到一個 split 集合(InputSplit[]),然后再為這個 split 集合配上一個合適的 RecordReader(getRecordReader)來讀取每個split中的數據。 下面我們來看一下 InputFormat 接口由哪些抽象方法組成

2、InputFormat的抽象類方法

  InputFormat 包含兩個抽象方法,如下所示

1 public abstract class InputFormat< K, V> { 2 3 public abstract List<InputSplit> getSplits(JobContext context) throws IOException,InterruptedException; 4 5 public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException,InterruptedException; 7 }

  1)getSplits(JobContext context) 方 法負責將一個大數據邏輯分成許多片。比如數據庫表有 100 條數據,按照主鍵ID升序存儲。 假設每20條分成一片,這個List的大小就是5,然后每個InputSplit記錄兩個參數,第一個為這個分片的起始 ID,第二個為這個分片數據的大小,這里是20.很明顯 InputSplit 並沒有真正存儲數據。只是提供了一個如何將數據分片的方法。

  2)createRecordReader(InputSplit split,TaskAttemptContext context)方 法根據 InputSplit 定義的方法,返回一個能夠讀取分片記錄的 RecordReader。getSplit 用來獲取由輸入文件計算出來的 InputSplit, 后面會看到計算 InputSplit 時,會考慮輸入文件是否可分割、文件存儲時分塊的大小和文件大小等因素;而createRecordReader() 提供了前面說的 RecordReader 的實現, 將Key-Value 對從 InputSplit 中正確讀出來,比如LineRecordReader,它是以偏移值為Key,每行的數據為 Value,這使所有 createRecordReader() 返回 LineRecordReader 的 InputFormat 都是以偏移值為Key,每行數據為 Value 的形式讀取輸入分片的。

  其實很多時候並不需要我們實現 InputFormat 來讀取數據,Hadoop 自帶有很多數據輸入格式,已經實現了 InputFormat接口

3、InputFormat接口實現類

  InputFormat 接口實現類有很多,其層次結構如下圖所示

  

  1、FileInputFormat

    FileInputFormat是所有使用文件作為其數據源的 InputFormat 實現的基類,它的主要作用是指出作業的輸入文件位置。因為作業的輸入被設定為一組路徑, 這對指定作業輸入提供了很強的靈活性。FileInputFormat 提供了四種靜態方法來設定 Job 的輸入路徑:

復制代碼
1 public static void addInputPath(Job job,Path path); 2 3 public static void addInputPaths(Job job,String commaSeparatedPaths); 4 5 public static void setInputPaths(Job job,Path... inputPaths); 6 7 public static void setInputPaths(Job job,String commaSeparatedPaths);
復制代碼

  addInputPath()、addInputPaths()方法可以將一 個或多個路徑加入路徑列表,可以分別調用這兩種方法來建立路徑列表;setInputPaths()方法一次設定完整的路徑列表,替換前面調用中在 Job 上所設置的所有路徑。它們具體的使用方法,看如下示例

復制代碼
 1     // 設置一個源路徑  2 FileInputFormat.addInputPath(job, new Path("hdfs://ljc:9000/buaa/inputPath1"));  3  4 // 設置多個源路徑,多個源路徑之間用逗號分開  5 FileInputFormat.addInputPaths(job, " hdfs://ljc:9000/buaa/inputPath1, hdfs://ljc:9000/buaa/inputPath2,...");  6  7 // inputPaths是一個Path類型的數組,可以包含多個源路徑,比如hdfs://ljc:9000/buaa/inputPath1,hdfs://ljc:9000/buaa/inputPath2,等  8  FileInputFormat.setInputPaths(job, inputPaths);  9 10 // 設置多個源路徑,多個源路徑之間用逗號分開 11 FileInputFormat.setInputPaths(job, " hdfs://ljc:9000/buaa/inputPath1, hdfs://ljc:9000/buaa/inputPath2,...");
復制代碼

  add 方法、set 方法允許指定包含的文件。如果需要排除特定文件,可以使用 FileInputFormat 的 setInputPathFilter()方法設置一個過濾器

 1 public static void setInputPathFilter(Job job,Class<? extends PathFilter filter); 

  關於過濾器,這里不再深入討論。即使不設置過濾 器,FileInputFormat 也會使用一個默認的過濾器來排除隱藏文件。 如果通過調用 setInputPathFilter()設置了過濾器,它會在默認過濾器的基礎上進行過濾。換句話說,自定義的過濾器只能看到非隱藏文件

  對於輸入的數據源是文件類型的情況下,Hadoop 不僅擅長處理非結構化文本數據,而且可以處理二進制格式的數據, 但它們的基類都是FileInputFormat。下面我們介紹的幾種常用輸入格式,都實現了FileInputFormat基類

    1、TextInputFormat

    TextInputFormat 是默認的 InputFormat。每條記錄是一行輸入。鍵是LongWritable 類型,存儲該行在整個文件中的字節偏移量。 值是這行的內容,不包括任何行終止符(換行符、回車符),它被打包成一個 Text 對象。

    比如,一個分片包含了如下5條文本記錄,記錄之間使用tab(水平制表符)分割

    1    22

    2    17

    3    17

    4    11

    5    11

    每條記錄表示為以下鍵/值對:

    (0, 1    22)

    (5, 2    17)

    (10,3    17)

    (15,4    11)

    (20,5    11)

    很明顯,鍵並不是行號。一般情況下,很難取得行號,因為文件按字節而不是按行切分為分片。

    2、KeyValueTextInputFormat

    每一行均為一條記錄, 被分隔符(缺省是tab(\t))分割為key(Text),value(Text)。可以通過 mapreduce.input.keyvaluelinerecordreader.key.value,separator屬性(或者舊版本 API 中的 key.value.separator.in.input.line)來設定分隔符。 它的默認值是一個制表符。

    比如,一個分片包含了如下5條文本記錄,記錄之間使用tab(水平制表符)分割。

    1    22

    2    17

    3    17

    4    11

    5    11

    每條記錄表示為以下鍵/值對:

    (1,22)

    (2,17)

    (3,17)

    (4,11)

    (5,11)

    此時的鍵是每行排在制表符之前的 Text 序列。

    3、NLineInputFormat

    通過 TextInputFormat 和 KeyValueTextInputFormat,每個 Mapper 收到的輸入行數不同。行數取決於輸入分片的大小和行的長度。 如果希望 Mapper 收到固定行數的輸入,需要將 NLineInputFormat 作為 InputFormat。與 TextInputFormat 一樣, 鍵是文件中行的字節偏移量,值是行本身。N 是每個 Mapper 收到的輸入行數。N 設置為1(默認值)時,每個 Mapper 正好收到一行輸入。 mapreduce.input.lineinputformat.linespermap 屬性(在舊版本 API 中的 mapred.line.input.format.linespermap 屬性)實現 N 值的設定。

    以下是一個示例,仍然以上面的4行輸入為例。

    1    22

    2    17

    3    17

    4    11

    5    11

    例如,如果 N 是3,則每個輸入分片包含三行。一個 mapper 收到三行鍵值對:

    1    22

    2    17

    3    17

    另一個 mapper 則收到后兩行(因為總共才5行,所有另一個mapper只能收到兩行)

    4    11

    5    11

    這里的鍵和值與 TextInputFormat 生成的一樣。

    4、SequenceFileInputFormat

    用於讀取 sequence file。鍵和值由用戶定義。序列文件為 Hadoop專用的壓縮二進制文件格式。它專用於一個 MapReduce作業和其它 MapReduce作業之間的傳送數據(適用與多個 MapReduce 鏈接操作)。

  2、多個輸入

    雖然一個 MapReduce 作業的輸入可以包含多個輸入文件,但所有文件都由同一個 InputFormat 和 同一個 Mapper 來解釋。 然而,數據格式往往會隨時間演變,所以必須寫自己的 Mapper 來處理應用中的遺留數據格式問題。或者,有些數據源會提供相同的數據, 但是格式不同。

    這些問題可以使用 MultipleInputs 類來妥善處理,它允許為每條輸入路徑指定 InputFormat 和 Mapper。例如,我們想把英國 Met Office 的氣象站數據和 NCDC 的氣象站數據放在一起來統計平均氣溫,則可以按照下面的方式來設置輸入路徑。

1 MultipleInputs.addInputPath(job,ncdcInputPath,TextInputFormat.class,NCDCTemperatureMapper.class); 2 3 MultipleInputs.addInputPath(job,metofficeInputPath,TextInputFormat.class,MetofficeTemperatureMapper.class);

    這段代碼取代了對 FileInputFormat.addInputPath()和job.setMapperClass() 的常規調用。Met Office 和 NCDC 的數據都是文本文件,所以對兩者都使用 TextInputFormat 數據類型。 但這兩個數據源的行格式不同,所以我們使用了兩個不一樣的 Mapper,分別為NCDCTemperatureMapper和MetofficeTemperatureMapper。重要的是兩個 Mapper 的輸出類型一樣,因此,reducer 看到的是聚集后的 map 輸出,並不知道這些輸入是由不同的 Mapper 產生的。

    MultipleInputs 類還有一個重載版本的 addInputPath() 方法,它沒有 Mapper參數。如果有多種輸入格式而只有一個 Mapper(通過 Job 的 setMapperClass()方法設定),這種方法很有用。其具體方法如下所示。

 1 public static void addInputPath(Job job,Path path,class< ? extends InputFormat> inputFormatClass); 

  3、DBInputFormat

    這 種輸入格式用於使用 JDBC 從關系數據庫中讀取數據。因為它沒有任何共享能力,所以在訪問數據庫的時候必須非常小心,在數據庫中運行太多的 mapper 讀數據可能會使數據庫受不了。 正是由於這個原因,DBInputFormat 最好用於加載少量的數據集。與之相對應的輸出格式是DBOutputFormat,它適用於將作業輸出數據(中等規模的數據)轉存到數據庫

  4、自定義 InputFormat

    有時候 Hadoop 自帶的輸入格式,並不能完全滿足業務的需求,所以需要我們根據實際情況自定義 InputFormat 類。而數據源一般都是文件數據,那么自定義 InputFormat時繼承 FileInputFormat 類會更為方便,從而不必考慮如何分片等復雜操作。 自定義輸入格式我們分為以下幾步:

    1、繼承 FileInputFormat 基類。

    2、重寫 FileInputFormat 里面的 isSplitable() 方法。

    3、重寫 FileInputFormat 里面的 createRecordReader()方法。

    按照上述步驟如何自定義輸入格式呢?下面我們通過一個示例加強理解。

    我們取有一份學生五門課程的期末考試成績數據,現在我們希望統計每個學生的總成績和平均成績。 樣本數據如下所示,每行數據的數據格式為:學號、姓名、語文成績、數學成績、英語成績、物理成績、化學成績

    19020090040 秦心芯 123 131 100 95 100

    19020090006 李磊 99 92 100 90 100

    。。。。。

    下面我們就編寫程序,實現自定義輸入並求出每個學生的總成績和平均成績。分為以下幾個步驟,這里只給出步驟,代碼見下

    第一步:為了便於每個學生學習成績的計算,這里我們需要自定義一個 ScoreWritable類實現 WritableComparable 接口,將學生各門成績封裝起來

    第二步:自定義輸入格式 ScoreInputFormat類,首先繼承 FileInputFormat,然后分別重寫 isSplitable() 方法和 createRecordReader() 方法。 需要注意的是,重寫createRecordReader()方法,其實也就是重寫其返回的對象ScoreRecordReader。 ScoreRecordReader 類繼承 RecordReader,實現數據的讀取

    第三步:編寫 MapReduce 程序,統計學生總成績和平均成績。需要注意的是,上面我們自定義的輸入格式ScoreInputFormat,需要在 MapReduce 程序中做如下設置,job.setInputFormatClass(ScoreInputFormat.class);//設置自定義輸入格式

    一般情況下,並不需要我們自定義輸入格式,Hadoop 自帶有很多種輸入格式,基本滿足我們工作的需要


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM