本篇分兩部分,第一部分分析使用 java 提交 mapreduce 任務時對 mapper 數量的控制,第二部分分析使用 streaming 形式提交 mapreduce 任務時對 mapper 數量的控制。
環境:hadoop-3.0.2
前言:
熟悉 hadoop mapreduce 的人可能已經知道,即使在程序里對 conf 顯式地設置了 mapred.map.tasks 或 mapreduce.job.maps,程序也並沒有運行期望數量的 mapper。
這是因為,mapper 的數量由輸入的大小、HDFS 當前設置的 BlockSize、以及當前配置中的 split min size 和 split max size 等參數共同確定,並不會受到簡單的人工設置 mapper num 的影響。
因此,對於 mapper num 的控制,需要我們理解 hadoop 中對於 FileInputFormat 類中 getSplit() 方法的實現,針對性地配置 BlockSize、split min size、split max size 等參數,才能達到目的。
重點:
值得一提並且容易忽略的是,要區分 org.apache.hadoop.mapred.FileInputFormat類和 org.apache.hadoop.mapreduce.lib.input.FileInputFormat類,兩者雖然相似,但在getSplit()上的實現是有區別的。
重要區別是,hadoop streaming 中使用的 InputFormat 類,使用的是 org.apache.hadoop.mapred.FileInputFormat,僅僅需要指定 mapreduce.job.maps ,就能夠設置 mapper num了(具體源碼分析在第二部分)。而使用JAVA設計的 mapreduce 任務中使用的 InputFormat 類,使用的是 org.apache.hadoop.mapreduce.lib.input.FileInputFormat,則需要通過配置BlockSize、split min size、split max size 等參數來間接性地控制 mapper num。
一、Java 本地提交 mapreduce 任務, org.apache.hadoop.mapreduce.lib.input.FileInputFormat 的 mapper num 控制
1. 在java本地編輯 mapreduce 任務,(默認)使用 FileInputFormat 類的子類 TextInputFormat
job.setInputFormatClass(TextInputFormat.class);
2. mapper 的切分邏輯在 FileInputFormat 類中的 getSplits()實現:
public List<InputSplit> getSplits(JobContext job) throws IOException { StopWatch sw = (new StopWatch()).start(); long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); List<InputSplit> splits = new ArrayList(); List<FileStatus> files = this.listStatus(job); Iterator var9 = files.iterator(); while(true) { while(true) { while(var9.hasNext()) { FileStatus file = (FileStatus)var9.next(); Path path = file.getPath(); long length = file.getLen(); if (length != 0L) { BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus)file).getBlockLocations(); } else { FileSystem fs = path.getFileSystem(job.getConfiguration()); blkLocations = fs.getFileBlockLocations(file, 0L, length); } if (this.isSplitable(job, path)) { long blockSize = file.getBlockSize(); long splitSize = this.computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining; int blkIndex; for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) { blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } if (bytesRemaining != 0L) { blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } } else { if (LOG.isDebugEnabled() && length > Math.min(file.getBlockSize(), minSize)) { LOG.debug("File is not splittable so no parallelization is possible: " + file.getPath()); } splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts())); } } else { splits.add(this.makeSplit(path, 0L, length, new String[0])); } } job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.size()); sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS)); } return splits; } } }
3. 最后確定 mapper 數量在這里:
1 if (this.isSplitable(job, path)) { 2 long blockSize = file.getBlockSize(); 3 long splitSize = this.computeSplitSize(blockSize, minSize, maxSize); 4 5 long bytesRemaining; 6 int blkIndex; 7 for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) { 8 blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); 9 splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); 10 } 11 12 if (bytesRemaining != 0L) { 13 blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); 14 splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); 15 }
含義:
a. 當 this.isSplitable 開啟時,只要當前未分配的大小 bytesRemaining 大於 splitSize 的 1.1 倍,就添加一個 inputSplit, 即一個mapper 被生成。
b. 最后,不足 1.1 倍splitSize 的殘余,補充為一個 mapper。因此,經常發現實際分配的 mapper 數比自己定義的會多 1 個。
c. 為什么設置1.1倍?避免將不足 0.1 倍 splitSize 的量分配為一個 mapper, 避免浪費。
4. 重要的兩個量:BlockSize 和 splitSize
long blockSize = file.getBlockSize(); long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
其中,blockSize 是 hdfs 設置的,一般是 64MB 或 128MB,我的 hdfs 中為 128 MB = 132217728L。這個量可認為靜態,我們不宜修改。
觀察 splitSize 的獲得:
1 protected long computeSplitSize(long blockSize, long minSize, long maxSize) { 2 return Math.max(minSize, Math.min(maxSize, blockSize)); 3 }
在 getSplits()中找到 minSize, maxSize, blockSize 的賦值:
long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job);
找到這些量的賦值、默認值:
maxSize 的 setter/getter: 除非用戶重新設置,否則 maxSize 的默認值為 Long 的最大值
1 public static void setMaxInputSplitSize(Job job, long size) { 2 job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", size); 3 } 4 5 public static long getMaxSplitSize(JobContext context) { 6 return context.getConfiguration().getLong("mapreduce.input.fileinputformat.split.maxsize", 9223372036854775807L); 7 }
minSize 的 setter/getter: 除非用戶重新設置,否則 minSize 的默認值為 1L
protected long getFormatMinSplitSize() { return 1L; } public static void setMinInputSplitSize(Job job, long size) { job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.minsize", size); } public static long getMinSplitSize(JobContext job) { return job.getConfiguration().getLong("mapreduce.input.fileinputformat.split.minsize", 1L); }
因此容易算出,默認情況下,
long splitSize = this.computeSplitSize(blockSize, minSize, maxSize) = Math.max(Math.max(1L,1L), Math.min(9223372036854775807L, 128M=132217728L)) = 132217728L = 128M
5. 控制 mapper 數量
知道了上面的計算過程,我們要控制 mapper,在 BlockSize 不能動的情況下,就必須控制 minSize 和 maxSize 了。這里主要控制 maxSize。
TextInputFormat.setMinInputSplitSize(job, 1L);//設置minSize TextInputFormat.setMaxInputSplitSize(job, 10 * 1024 * 1024);//設置maxSize
測試輸入文件大小為 40MB, 很小, 在默認情況下, 被分配為 1 個或 2 個 mapper 執行成功。
現在希望分配 4 個mapper:那么設置 maxSize 為10M ,那么 splitSize 計算為 10M。對於 40MB 的輸入文件,理應分配 4 個mapper。
實際運行,運行了 5 個mapper,認為成功擺脫了默認啟動 2 個mapper 的限制,額外多出的 1 個 mapper 則猜測是上文提到的,對殘余量的補充 mapper。
6. 至此,對Java 本地提交 mapreduce 任務, org.apache.hadoop.mapreduce.lib.input.FileInputFormat 的 mapper num 控制方法如上。接下來討論 streaming 使用的 org.apache.hadoop.mapred.FileInputFormat 的 mapper 控制。
二、streaming 提交 mapreduce 任務, org.apache.hadoop.mapred.FileInputFormat 的 mapper num 控制
1. 可通過 mapreduce.job.maps 直接控制,即使不是絕對精確。原因在下面的源碼分析中可以看到。
1 hadoop dfs -rm -r -f /output && \ 2 3 hadoop jar /opt/hadoop-3.0.2/share/hadoop/tools/lib/hadoop-streaming-3.0.2.jar \ 4 -D mapreduce.reduce.tasks=0 \ 5 -D mapreduce.job.maps=7 \ 6 -input /input \ 7 -output /output \ 8 -mapper "cat" \ 9 -inputformat TextInputFormat
2. 將 org.apache.hadoop.mapreduce.lib.input.FileInputFormat 中的 maxSize,嘗試通過 streaming 的 -D 設置,是無效的。因為 streaming 使用的是 org.apache.hadoop.mapred.FileInputFormat,在下面的源碼分析中可以看到。
3. 查看 FileInputFormat 的 getSplits 源碼
1 public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { 2 StopWatch sw = (new StopWatch()).start(); 3 FileStatus[] files = this.listStatus(job); 4 job.setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.length); 5 long totalSize = 0L; 6 FileStatus[] var7 = files; 7 int var8 = files.length; 8 9 for(int var9 = 0; var9 < var8; ++var9) { 10 FileStatus file = var7[var9]; 11 if (file.isDirectory()) { 12 throw new IOException("Not a file: " + file.getPath()); 13 } 14 15 totalSize += file.getLen(); 16 } 17 18 long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits); 19 long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize); 20 ArrayList<FileSplit> splits = new ArrayList(numSplits); 21 NetworkTopology clusterMap = new NetworkTopology(); 22 FileStatus[] var13 = files; 23 int var14 = files.length; 24 25 for(int var15 = 0; var15 < var14; ++var15) { 26 FileStatus file = var13[var15]; 27 Path path = file.getPath(); 28 long length = file.getLen(); 29 if (length == 0L) { 30 splits.add(this.makeSplit(path, 0L, length, new String[0])); 31 } else { 32 FileSystem fs = path.getFileSystem(job); 33 BlockLocation[] blkLocations; 34 if (file instanceof LocatedFileStatus) { 35 blkLocations = ((LocatedFileStatus)file).getBlockLocations(); 36 } else { 37 blkLocations = fs.getFileBlockLocations(file, 0L, length); 38 } 39 40 if (!this.isSplitable(fs, path)) { 41 if (LOG.isDebugEnabled() && length > Math.min(file.getBlockSize(), minSize)) { 42 LOG.debug("File is not splittable so no parallelization is possible: " + file.getPath()); 43 } 44 45 String[][] splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, 0L, length, clusterMap); 46 splits.add(this.makeSplit(path, 0L, length, splitHosts[0], splitHosts[1])); 47 } else { 48 long blockSize = file.getBlockSize(); 49 long splitSize = this.computeSplitSize(goalSize, minSize, blockSize); 50 51 long bytesRemaining; 52 String[][] splitHosts; 53 for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) { 54 splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, splitSize, clusterMap); 55 splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, splitHosts[0], splitHosts[1])); 56 } 57 58 if (bytesRemaining != 0L) { 59 splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap); 60 splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1])); 61 } 62 } 63 } 64 } 65 66 sw.stop(); 67 if (LOG.isDebugEnabled()) { 68 LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS)); 69 } 70 71 return (InputSplit[])splits.toArray(new FileSplit[splits.size()]); 72 }
與 org.apache.hadoop.mapreduce.lib.input.FileInputFormat 相似,但不同之處還是很重要的。主要在
long blockSize = file.getBlockSize(); long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);
賦值:
long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits); long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize);
4. 追溯這些量
protected long computeSplitSize(long goalSize, long minSize, long blockSize) { return Math.max(minSize, Math.min(goalSize, blockSize)); }
private long minSplitSize = 1L;
5. 分析
minSize 默認為1L,blocksize是當前目標文件的塊大小,而 splitSize 就是 BlockSize 和 goalSize 的小值。
goalSize 的計算,就是輸入文件總大小與 numSplits 的比值。而 numSplits 就是我們在streaming 中設置的 -D mapreduce.job.maps
因此,在streaming中才可以簡單地直接設置 mapper 的數量了。
但是,只有當 goalsize 小於 blocksize 時,mapreduce.job.maps 才會生效!
當 goalsize < blocksize,splitsize = goalsize,此時你設置的 mapreduce.job.maps 數量一般大於輸入塊的數量,因此配置生效。
當 goalsize > blocksize,splitsize = blocksize,此時你設置的 mapreduce.job.maps 不足,一般少於輸入塊的數量,因此配置不生效。
換句話說,如果輸入只有一個文件,那么只要 -D mapreduce.job.maps > 1,配置大多數會生效。