Hadoop文件分片與InputFormat


 

1. Mapper Reducer 數量

對於一個默認的MapReduce Job 來說,map任務的數量等於輸入文件被划分成的分塊數,這個取決於輸入文件的大小以及文件塊的大小(如果此文件在 HDFS中)。但是對於 reduce的任務,並不會自動決定reducer數目的大小,若未指定,則默認為1。例如:

 

 

但單個reducer任務執行效率不盡人意,在實際場景中會將它設置為一個較大的數值。此時,決定Key條目被送往哪個reducer由方法 setPartitionerClass() 指定:job.setPartitionerClass(HashPartitioner.class); 

 
默認為HashPartitioner,它會將每條KeyHash,然后與最大的整型值做一次按位與操作,以得到一個非負整數。然后對分區數做取模(mod)操作,將key分配到其中一個分區。這里的分區數即為reducer數目。HashPartitioner 源碼如下:
 
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    
public HashPartitioner() {
    }

   
public int getPartition(K key, V value, int numReduceTasks) {
       
return (key.hashCode() & 2147483647) % numReduceTasks;
    }
}

 

若是為reducer數目設置為默認值1,則所有的中間數據都會被放入到一個reducer中,作業處理效率會非常低效。若是設置了過大的值,則每個reducer都會輸出一個文件,會導致過多的小文件。

在為一個任務選擇多少個reducer個數時,應遵循的原則為:目標reducer保持在每個運行5分鍾左右,且產生至少一個HDFS塊的輸出比較合適。

記錄在發送給 reducer 之前,會被MapReduce系統進行排序。因此輸入文件中的行會被交叉放入一個合並后的輸出文件。

 

2. 輸入格式

我們已經了解到map的輸入是分片(split),一個分片對應一個mapper,且僅被一個mapper處理。分片里面是多條記錄(item)。“輸入分片”在Hadoop中以InputSplit 接口的方式提供:

public interface InputSplit extends Writable {
   
long getLength() throws IOException;

    String[] getLocations()
throws IOException;
}

 

它包含兩個方法,分別為getLength() getLocations()。其中getLength() 用於獲取數據的長度(以字節為單位);getLocations() 用於獲取一組存儲位置(也就是一組主機名)。其中getLocations()的返回值由mapreduce系統獲取后,實現data locality,也就是盡量將map任務放在離數據節點近的地方。而getLength() 的返回值用於排序分片,將最大的分片優先處理,以最小化整個作業運行的時間。

InputSplitmapreduce中的分片)由InputFormat創建,它負責創建輸入分片,並將它們分成一條條記錄(item)。首先簡單看一下InputFormat 抽象類:

public abstract class InputFormat<K, V> {
   
public InputFormat() {
    }

   
public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;

   
public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}

 

這里 getSplits() 方法計算分片,然后將計算得到的List 結果發給 application masterApplication master 根據其分片所在節點信息,調度map任務到離分片數據最近的節點。在map任務端,會把輸入分片傳給 InputFormat createRecordReader() 方法,此方法會返回一個 RecordReader 對象,用於迭代讀取這個分片上的記錄(item),並生成記錄的鍵值對,之后傳遞給 map函數。通過查看 Mapper 類中的 run() 方法,更好的了解此過程:

public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
   
this.setup(context);

   
try {
       
while(context.nextKeyValue()) {
           
this.map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    }
finally {
        
this.cleanup(context);
    }

}

 

這里,先運行setup() 操作,然后從 context 不斷迭代獲取分片的內容,並傳給map() 方法,並由map() 方法進一步對 key-value 對進行處理。

 

3. FileInputFormat

Hadoop 中,數據源一般為文件,而 FileInputFormat 類就是用於處理數據源為文件的一個(繼承於)InputFormat :

public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {

 

可以看到它是一個抽象類,它的實現類有CombineFileInputFormatTextInputFormatKeyValueTextInputFormatNLineInputFormat以及SequenceFileInputFormat

FileInputFormat類提供兩個功能:1. 指出作業的輸入文件位置;2. 為輸入文件生成分片的代碼實現。

FileInputFormat中,作業的輸入可以是一個文件、一個目錄,也可以是目錄與文件的集合。它提供了四種方法用於設置Job的輸入路徑:

public static void addInputPath(JobConf conf, Path path)
public static void addInputPaths(JobConf conf, String commaSeparatedPaths)
public static void setInputPaths(JobConf conf, Path... inputPaths)
public static void setInputPaths(JobConf conf, String commaSeparatedPaths) 

 

其中addInputPath() addInputPaths() 用於添加路徑,以構成路徑列表。而setInputPath() 用於一次性設置完整的路徑列表(會替換前面所有路徑設置)。

在設置路徑后,也可以指定需要排除的特定文件,此功能由 setInputPathFilter() 實現:

public static void setInputPathFilter(JobConf conf, Class<? extends PathFilter> filter) {
    conf.setClass(
"mapreduce.input.pathFilter.class", filter, PathFilter.class);
}

 

它可以設置一個過濾器PathFilter,默認的實現是過濾掉隱藏文件(以 . _ 開頭的文件)。如果通過setInputPathFilter() 設置過濾器,它會在默認過濾器的基礎上進行過濾,也就是說,僅會在非隱藏文件中再次進行過濾。

輸入路徑的設置可以通過屬性與選項進行配置,在屬性配置中相關配置為:

mapreduce.input.fileinputformat.inputdir (逗號分隔屬性,無默認值)

mapreduce.input.pathFilter.class  PathFilter 類名,無默認值)

 

4. FileInputFormat 類處理輸入分片

在設置了一組文件后,FileInputFormat會將文件轉換為輸入分片。這里需要注意的是:在HDFS中,一個文件可以占用(分布到)多個block,但是不會存在一個block中存多個文件。對於小文件(小於一個HDFS 塊大小的文件)來說,一個文件就是占用一個block,但是不會占據整個block的空間。例如,當一個1MB的文件存儲在一個128MB 的塊中時,文件只使用 1MB 的磁盤空間,而不是128MB)。

FileInputFormat 只分割大文件,也就是文件超過HDFS塊的大小。在FileInputFormat中,控制分片大小的屬性有:

mapreduce.input.fileinputformat.split.minsize  一個文件分片最小的有效字節數(int類型),默認值為1(字節)

mapreduce.input.fileinputformat.split.maxsize  一個文件分片中最大的有效字節數(long 類型),默認值為Long.MAX_VALUE,即9223372036854775807

dfs.blocksize        HDFS中的塊大小(按字節),默認為 128MB(即 134217728

 

最小分片通常為1個字節,用戶可以設置最小分片的大小超過HDFS 塊大小,這樣會強制分片比HDFS塊大。但是如果數據存儲在 HDFS 上,則這樣對data locality 來說,並不友好,以至於延長任務執行時間。

最大分片默認是 Java Long 類型的最大值,只有把它的值設置為小於 HDFS Block 大小才有效,此時會強制分片比塊小。

FileInputFormat中,分片的大小由以下公式計算:

protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
   
return Math.max(minSize, Math.min(maxSize, blockSize));
}

 

其中參數部分為:

long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);

 

protected long getFormatMinSplitSize() {
   
return 1L;
}

 

public static long getMinSplitSize(JobContext job) {
   
return job.getConfiguration().getLong("mapreduce.input.fileinputformat.split.minsize", 1L);
}

 

public static long getMaxSplitSize(JobContext context) {
   
return context.getConfiguration().getLong("mapreduce.input.fileinputformat.split.maxsize", 9223372036854775807L);
}

 

minSize 若未指定,則默認為 1MaxSize默認為Java Long類型最大值。再計算時,先取 maxSize blockSize 的最小值,然后再取結果與 minSize的最大值。

在默認情況下:minSize < blockSize < maxSize

所以分片的大小就是 blockSize大小。

 

5. 小文件與 CombineFileInputFormat

相對於大量的小文件,Hadoop更適合處理少量的大文件。其中一個原因是:對於每個小文件(遠小於 HDFS塊大小),FileInputFormat 都會生成一個分片(生成的分片要么是文件的整個內容,要么是文件的部分內容),這樣會產生大量的 map 任務,並且每個map任務僅處理一小部分數據,這樣會導致任務執行效率低下,時間過長。

CombineFileInputFormat 可以緩解此問題,它針對小文件而設計。FileInputFormat 為每個小文件產生一個分片,而CombineFileInpurtFormat 把多個文件打包到一個分片中,以便於每個 mapper 可以處理更多的數據。更重要的是:CombineFileInputFormat在分配多個block到同一個 split時,會考慮到node locality 以及 rack locality。所以它的速度在一個典型的 mr 任務中,處理輸入的速度並不會下降。

不過盡可能要避免小文件過多的情況,原因有:

  1. 處理小文件會增加運行作業而必須的尋址次數

  2. 浪費namenode的內存

可以嘗試使用順序文件(sequence file)將這些小文件合並成一個或多個大文件:例如將文件名作為key,文件內容作為 value。但是如果集群里已經有了大量小文件,可以嘗試一下CombineFileInputFormat 方法。

CombinedFileInputFormat不僅處理小文件有好處,處理大文件時也有益處。例如,如果mapper在處理一個block時僅花費很少的時間,則可以考慮使用CombineFileInputFormat,並將maximum split size 設置為 HDFS block 大小的幾倍(參數為mapred.max.split.size)。這樣每個mapper會處理多個block,使得整個處理時間下降。

 

6. 避免分片

有時候可能需要計算整個文件里的順序關系,這種任務無法分布式處理,所以只能讓文件由一個mapper處理,此時需要避免文件被分片。

有兩種方式可以避免文件被分片,而是當作一個單獨分片處理:

  1. 設置最小分片大小split.minsize Java Long類型最大值(long.MAX_VALUE

  2.  使用FileInputFormat 具體子類時,重寫isSplitable() 方法,把返回值設置為 false

使用第二種方法時,以 TextInputFormat類為例:

public class TextInputFormat extends FileInputFormat<LongWritable, Text> implements JobConfigurable {
   
private CompressionCodecFactory compressionCodecs = null;

   
public TextInputFormat() {
    }

   
public void configure(JobConf conf) {
       
this.compressionCodecs = new CompressionCodecFactory(conf);
    }

   
protected boolean isSplitable(FileSystem fs, Path file) {
        CompressionCodec codec =
this.compressionCodecs.getCodec(file);
       
return null == codec ? true : codec instanceof SplittableCompressionCodec;
    }

….

}

默認會根據 CompressionCodec 類型判斷是否切分,也可以直接指定return false,使得輸入文件不可切分。

 

References:Hadoop權威指南第4版 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM