MapReduce :基於 FileInputFormat 的 mapper 數量控制


本篇分兩部分,第一部分分析使用 java 提交 mapreduce 任務時對 mapper 數量的控制,第二部分分析使用 streaming 形式提交 mapreduce 任務時對 mapper 數量的控制。

 

環境:hadoop-3.0.2

前言:

熟悉 hadoop mapreduce 的人可能已經知道,即使在程序里對 conf 顯式地設置了 mapred.map.tasks 或 mapreduce.job.maps,程序也並沒有運行期望數量的 mapper。

這是因為,mapper 的數量由輸入的大小、HDFS 當前設置的 BlockSize、以及當前配置中的 split min size 和 split max size 等參數共同確定,並不會受到簡單的人工設置 mapper num 的影響。

因此,對於 mapper num 的控制,需要我們理解 hadoop 中對於 FileInputFormat 類中 getSplit() 方法的實現,針對性地配置 BlockSize、split min size、split max size 等參數,才能達到目的。

重點:

值得一提並且容易忽略的是,要區分 org.apache.hadoop.mapred.FileInputFormat類和 org.apache.hadoop.mapreduce.lib.input.FileInputFormat類,兩者雖然相似,但在getSplit()上的實現是有區別的。

重要區別是,hadoop streaming 中使用的 InputFormat 類,使用的是 org.apache.hadoop.mapred.FileInputFormat,僅僅需要指定 mapreduce.job.maps ,就能夠設置 mapper num了(具體源碼分析在第二部分)。而使用JAVA設計的 mapreduce 任務中使用的 InputFormat 類,使用的是 org.apache.hadoop.mapreduce.lib.input.FileInputFormat,則需要通過配置BlockSize、split min size、split max size 等參數來間接性地控制 mapper num。

 

一、Java 本地提交 mapreduce 任務, org.apache.hadoop.mapreduce.lib.input.FileInputFormat 的 mapper num 控制

1. 在java本地編輯 mapreduce 任務,(默認)使用 FileInputFormat 類的子類 TextInputFormat

job.setInputFormatClass(TextInputFormat.class);
 
        

 

2. mapper 的切分邏輯在 FileInputFormat 類中的 getSplits()實現:

public List<InputSplit> getSplits(JobContext job) throws IOException {
        StopWatch sw = (new StopWatch()).start();
        long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);
        List<InputSplit> splits = new ArrayList();
        List<FileStatus> files = this.listStatus(job);
        Iterator var9 = files.iterator();

        while(true) {
            while(true) {
                while(var9.hasNext()) {
                    FileStatus file = (FileStatus)var9.next();
                    Path path = file.getPath();
                    long length = file.getLen();
                    if (length != 0L) {
                        BlockLocation[] blkLocations;
                        if (file instanceof LocatedFileStatus) {
                            blkLocations = ((LocatedFileStatus)file).getBlockLocations();
                        } else {
                            FileSystem fs = path.getFileSystem(job.getConfiguration());
                            blkLocations = fs.getFileBlockLocations(file, 0L, length);
                        }

                        if (this.isSplitable(job, path)) {
                            long blockSize = file.getBlockSize();
                            long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);

                            long bytesRemaining;
                            int blkIndex;
                            for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
                                blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
                                splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
                            }

                            if (bytesRemaining != 0L) {
                                blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
                                splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
                            }
                        } else {
                            if (LOG.isDebugEnabled() && length > Math.min(file.getBlockSize(), minSize)) {
                                LOG.debug("File is not splittable so no parallelization is possible: " + file.getPath());
                            }

                            splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts()));
                        }
                    } else {
                        splits.add(this.makeSplit(path, 0L, length, new String[0]));
                    }
                }

                job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.size());
                sw.stop();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
                }

                return splits;
            }
        }
    }

 

3. 最后確定 mapper 數量在這里:

 1                         if (this.isSplitable(job, path)) {
 2                             long blockSize = file.getBlockSize();
 3                             long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
 4 
 5                             long bytesRemaining;
 6                             int blkIndex;
 7                             for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
 8                                 blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
 9                                 splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
10                             }
11 
12                             if (bytesRemaining != 0L) {
13                                 blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
14                                 splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
15                             }

含義:

a. 當 this.isSplitable 開啟時,只要當前未分配的大小 bytesRemaining 大於 splitSize 的 1.1 倍,就添加一個 inputSplit, 即一個mapper 被生成。 

b. 最后,不足 1.1 倍splitSize 的殘余,補充為一個 mapper。因此,經常發現實際分配的 mapper 數比自己定義的會多 1 個。

c. 為什么設置1.1倍?避免將不足 0.1 倍 splitSize 的量分配為一個 mapper, 避免浪費。

 

4.  重要的兩個量:BlockSize 和 splitSize

long blockSize = file.getBlockSize();
long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);

其中,blockSize 是 hdfs 設置的,一般是 64MB 或 128MB,我的 hdfs 中為 128 MB = 132217728L。這個量可認為靜態,我們不宜修改。

觀察 splitSize 的獲得:

1     protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
2         return Math.max(minSize, Math.min(maxSize, blockSize));
3     }

在 getSplits()中找到 minSize, maxSize, blockSize 的賦值:

long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);

找到這些量的賦值、默認值:

maxSize 的 setter/getter:    除非用戶重新設置,否則 maxSize 的默認值為 Long 的最大值 

1 public static void setMaxInputSplitSize(Job job, long size) {
2         job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", size);
3     }
4 
5 public static long getMaxSplitSize(JobContext context) {
6         return context.getConfiguration().getLong("mapreduce.input.fileinputformat.split.maxsize", 9223372036854775807L);
7     }

minSize 的 setter/getter:  除非用戶重新設置,否則 minSize 的默認值為 1L

protected long getFormatMinSplitSize() {
        return 1L;
    }

public static void setMinInputSplitSize(Job job, long size) {
        job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.minsize", size);
    }

public static long getMinSplitSize(JobContext job) {
        return job.getConfiguration().getLong("mapreduce.input.fileinputformat.split.minsize", 1L);
    }

 

因此容易算出,默認情況下,

long splitSize = this.computeSplitSize(blockSize, minSize, maxSize) = Math.max(Math.max(1L,1L), Math.min(9223372036854775807L, 128M=132217728L)) = 132217728L = 128M

5. 控制 mapper 數量

知道了上面的計算過程,我們要控制 mapper,在 BlockSize 不能動的情況下,就必須控制 minSize 和 maxSize 了。這里主要控制 maxSize。 

 

TextInputFormat.setMinInputSplitSize(job, 1L);//設置minSize
TextInputFormat.setMaxInputSplitSize(job, 10 * 1024 * 1024);//設置maxSize

 

測試輸入文件大小為 40MB, 很小, 在默認情況下, 被分配為 1 個或 2 個 mapper 執行成功。

現在希望分配 4 個mapper:那么設置 maxSize 為10M ,那么 splitSize 計算為 10M。對於 40MB 的輸入文件,理應分配 4 個mapper。

實際運行,運行了 5 個mapper,認為成功擺脫了默認啟動 2 個mapper 的限制,額外多出的 1 個 mapper 則猜測是上文提到的,對殘余量的補充 mapper。

 

6. 至此,對Java 本地提交 mapreduce 任務, org.apache.hadoop.mapreduce.lib.input.FileInputFormat 的 mapper num 控制方法如上。接下來討論 streaming 使用的 org.apache.hadoop.mapred.FileInputFormat 的 mapper 控制。

 

二、streaming 提交 mapreduce 任務, org.apache.hadoop.mapred.FileInputFormat 的 mapper num 控制

1. 可通過 mapreduce.job.maps 直接控制,即使不是絕對精確。原因在下面的源碼分析中可以看到。

1 hadoop dfs -rm -r -f /output && \
2 
3 hadoop jar /opt/hadoop-3.0.2/share/hadoop/tools/lib/hadoop-streaming-3.0.2.jar \
4 -D mapreduce.reduce.tasks=0 \
5 -D mapreduce.job.maps=7 \
6 -input /input \
7 -output /output \
8 -mapper "cat" \
9 -inputformat TextInputFormat

 

2. 將 org.apache.hadoop.mapreduce.lib.input.FileInputFormat 中的 maxSize,嘗試通過 streaming 的 -D 設置,是無效的。因為 streaming 使用的是 org.apache.hadoop.mapred.FileInputFormat,在下面的源碼分析中可以看到。

 

3. 查看 FileInputFormat 的 getSplits 源碼

 1     public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
 2         StopWatch sw = (new StopWatch()).start();
 3         FileStatus[] files = this.listStatus(job);
 4         job.setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.length);
 5         long totalSize = 0L;
 6         FileStatus[] var7 = files;
 7         int var8 = files.length;
 8 
 9         for(int var9 = 0; var9 < var8; ++var9) {
10             FileStatus file = var7[var9];
11             if (file.isDirectory()) {
12                 throw new IOException("Not a file: " + file.getPath());
13             }
14 
15             totalSize += file.getLen();
16         }
17 
18         long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);
19         long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize);
20         ArrayList<FileSplit> splits = new ArrayList(numSplits);
21         NetworkTopology clusterMap = new NetworkTopology();
22         FileStatus[] var13 = files;
23         int var14 = files.length;
24 
25         for(int var15 = 0; var15 < var14; ++var15) {
26             FileStatus file = var13[var15];
27             Path path = file.getPath();
28             long length = file.getLen();
29             if (length == 0L) {
30                 splits.add(this.makeSplit(path, 0L, length, new String[0]));
31             } else {
32                 FileSystem fs = path.getFileSystem(job);
33                 BlockLocation[] blkLocations;
34                 if (file instanceof LocatedFileStatus) {
35                     blkLocations = ((LocatedFileStatus)file).getBlockLocations();
36                 } else {
37                     blkLocations = fs.getFileBlockLocations(file, 0L, length);
38                 }
39 
40                 if (!this.isSplitable(fs, path)) {
41                     if (LOG.isDebugEnabled() && length > Math.min(file.getBlockSize(), minSize)) {
42                         LOG.debug("File is not splittable so no parallelization is possible: " + file.getPath());
43                     }
44 
45                     String[][] splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, 0L, length, clusterMap);
46                     splits.add(this.makeSplit(path, 0L, length, splitHosts[0], splitHosts[1]));
47                 } else {
48                     long blockSize = file.getBlockSize();
49                     long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);
50 
51                     long bytesRemaining;
52                     String[][] splitHosts;
53                     for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
54                         splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, splitSize, clusterMap);
55                         splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, splitHosts[0], splitHosts[1]));
56                     }
57 
58                     if (bytesRemaining != 0L) {
59                         splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap);
60                         splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1]));
61                     }
62                 }
63             }
64         }
65 
66         sw.stop();
67         if (LOG.isDebugEnabled()) {
68             LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
69         }
70 
71         return (InputSplit[])splits.toArray(new FileSplit[splits.size()]);
72     }

與 org.apache.hadoop.mapreduce.lib.input.FileInputFormat 相似,但不同之處還是很重要的。主要在

long blockSize = file.getBlockSize();
long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);

賦值:

long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize);

 

4. 追溯這些量

protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
}
private long minSplitSize = 1L;

 

5. 分析

minSize 默認為1L,blocksize是當前目標文件的塊大小,而 splitSize 就是 BlockSize 和 goalSize 的小值。

goalSize 的計算,就是輸入文件總大小與 numSplits 的比值。而 numSplits 就是我們在streaming 中設置的 -D mapreduce.job.maps

因此,在streaming中才可以簡單地直接設置 mapper 的數量了。

但是,只有當 goalsize 小於 blocksize 時,mapreduce.job.maps 才會生效!

當 goalsize < blocksize,splitsize = goalsize,此時你設置的 mapreduce.job.maps 數量一般大於輸入塊的數量,因此配置生效。

當 goalsize > blocksize,splitsize = blocksize,此時你設置的 mapreduce.job.maps 不足,一般少於輸入塊的數量,因此配置不生效。

換句話說,如果輸入只有一個文件,那么只要 -D mapreduce.job.maps > 1,配置大多數會生效。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM