hadoop 分片與分塊,map task和reduce task的理解


分塊:Block

  HDFS存儲系統中,引入了文件系統的分塊概念(block),塊是存儲的最小單位,HDFS定義其大小為64MB。與單磁盤文件系統相似,存儲在 HDFS上的文件均存儲為多個塊,不同的是,如果某文件大小沒有到達64MB,該文件也不會占據整個塊空間。在分布式的HDFS集群上,Hadoop系統保證一個塊存儲在一個datanode上。

  把File划分成Block,這個是物理上真真實實的進行了划分,數據文件上傳到HDFS里的時候,需要划分成一塊一塊,每塊的大小由hadoop-default.xml里配置選項進行划分。一個大文件可以把划分后的所有塊存儲到同一個磁盤上,也可以在每個磁盤上都存在這個文件的分塊。

這個就是默認的每個塊64M:

<property>  
  <name>dfs.block.size</name>  
  <value>67108864</value>  
  <description>The default block size for new files.</description>  
</property>  

數據划分的時候有冗余,即進行備份,個數是由以下配置指定的。具體的物理划分步驟由Namenode決定。

1 <property>  
2   <name>dfs.replication</name>  
3   <value>3</value>  
4   <description>Default block replication.   
5   The actual number of replications can be specified when the file is created.  
6   The default is used if replication is not specified in create time.  
7   </description>  
8 </property>  

分片:splits

  由InputFormat這個接口來定義的,其中有個getSplits方法。這里有一個新的概念:fileSplit。每個map處理一個fileSplit,所以有多少個fileSplit就有多少個map(map數並不是單純的由用戶設置決定的)。

我們來看一下hadoop分配splits的源碼:

 

 1 long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
 2 long minSize = Math.max(job.getLong("mapred.min.split.size", 1), minSplitSize);
 3 
 4 for (FileStatus file: files) {
 5   Path path = file.getPath();
 6   FileSystem fs = path.getFileSystem(job);
 7   if ((length != 0) && isSplitable(fs, path)) { 
 8     long blockSize = file.getBlockSize();
 9     long splitSize = computeSplitSize(goalSize, minSize, blockSize);
10     
11     long bytesRemaining = length;
12     while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
13       String[] splitHosts = getSplitHosts(blkLocations,length-bytesRemaining, splitSize, clusterMap);
14       splits.add(new FileSplit(path, length-bytesRemaining, splitSize, splitHosts));
15       bytesRemaining -= splitSize;
16     }
17 
18     if (bytesRemaining != 0) {
19       splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts()));
20     }
21   } else if (length != 0) {
22     String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
23     splits.add(new FileSplit(path, 0, length, splitHosts));
24   } else { 
25     //Create empty hosts array for zero length files
26     splits.add(new FileSplit(path, 0, length, new String[0]));
27   }
28 }
29 
30 return splits.toArray(new FileSplit[splits.size()]);
31 
32 protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
33     return Math.max(minSize, Math.min(goalSize, blockSize));
34 }

totalSize:是整個Map-Reduce job所有輸入的總大小。

numSplits:來自job.getNumMapTasks(),即在job啟動時用org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)設置的值,給M-R框架的Map數量的提示。

goalSize:是輸入總大小與提示Map task數量的比值,即期望每個Mapper處理多少的數據,僅僅是期望,具體處理的數據數由下面的computeSplitSize決定。

minSplitSize:默認為1,可由子類復寫函數protected void setMinSplitSize(long minSplitSize) 重新設置。一般情況下,都為1,特殊情況除外

minSize:取的1和mapred.min.split.size中較大的一個。

blockSize:HDFS的塊大小,默認為64M,一般大的HDFS都設置成128M。

splitSize:就是最終每個Split的大小,那么Map的數量基本上就是totalSize/splitSize。

接下來看看computeSplitSize的邏輯:首先在goalSize(期望每個Mapper處理的數據量)和HDFS的block size中取較小的,然后與mapred.min.split.size相比取較大的

 

  一個片為一個splits,即一個map,只要搞清楚片的大小,就能計算出運行時的map數。而一個split的大小是由goalSize, minSize, blockSize這三個值決定的。computeSplitSize的邏輯是,先從goalSize和blockSize兩個值中選出最小的那個(比如一般不設置map數,這時blockSize為當前文件的塊size,而goalSize是文件大小除以用戶設置的map數得到的,如果沒設置的話,默認是1),在默認的大多數情況下,blockSize比較小。然后再取blockSize和minSize中最大的那個。而minSize如果不通過”mapred.min.split.size”設置的話(”mapred.min.split.size”默認為0),minSize為1,這樣得出的一個splits的size就是blockSize,即一個塊一個map,有多少塊就有多少map。

 

 

input_file_num : 輸入文件的個數
(1)默認map個數
如果不進行任何設置,默認的map個數是和blcok_size相關的。
default_num = total_size / block_size;
(2)期望大小
可以通過參數
mapred.map.tasks來設置程序員期望的map個數,但是這個個數只有在大於default_num的時候,才會生效。
goal_num =mapred.map.tasks;
(3)設置處理的文件大小
可以通過mapred.min.split.size 設置每個task處理的文件大小,但是這個大小只有在大於
block_size的時候才會生效。
split_size = max(
mapred.min.split.size,
block_size);split_num = total_size / split_size;
(4)計算的map個數
compute_map_num = min(split_num, max(default_num, goal_num))
除了這些配置以外,mapreduce還要遵循一些原則。 mapreduce的每一個map處理的數據是不能跨越文件的,也就是說max_map_num <= input_file_num。 所以,最終的map個數應該為:
final_map_num = min(compute_map_num, input_file_num)
經過以上的分析,在設置map個數的時候,可以簡單的總結為以下幾點:
(1)如果想增加map個數,則設置mapred.map.tasks 為一個較大的值。
(2)如果想減小map個數,則設置mapred.min.split.size 為一個較大的值。

map task

如何調整map數量:

有了2的分析,下面調整Map的數量就很容易了。

減小Map-Reduce job 啟動時創建的Mapper數量

當處理大批量的大數據時,一種常見的情況是job啟動的mapper數量太多而超出了系統限制,導致Hadoop拋出異常終止執行。解決這種異常的思路是減少mapper的數量。具體如下:

  輸入文件size巨大,但不是小文件

  這種情況可以通過增大每個mapper的input size,即增大minSize或者增大blockSize來減少所需的mapper的數量。增大blockSize通常不可行,因為當HDFS被hadoop namenode -format之后,blockSize就已經確定了(由格式化時dfs.block.size決定),如果要更改blockSize,需要重新格式化HDFS,這樣當然會丟失已有的數據。所以通常情況下只能通過增大minSize,即增大mapred.min.split.size的值。

  輸入文件數量巨大,且都是小文件

  所謂小文件,就是單個文件的size小於blockSize。這種情況通過增大mapred.min.split.size不可行,需要使用FileInputFormat衍生的CombineFileInputFormat將多個input path合並成一個InputSplit送給mapper處理,從而減少mapper的數量。具體細節稍后會更新並展開。

增加Map-Reduce job 啟動時創建的Mapper數量

增加mapper的數量,可以通過減小每個mapper的輸入做到,即減小blockSize或者減小mapred.min.split.size的值。

 

參考資料:

http://blog.csdn.net/strongerbit/article/details/7440111

http://blog.csdn.net/clerk0324/article/details/50887866

http://blog.csdn.net/yeruby/article/details/20068731

 http://zhidao.baidu.com/link?url=fLPxBdClbJ0R0-VGGiSbXR4bx9tlhadShKNYQ76CNrShD7Q7zsxr5b_df9gl7l5LA3VsXTkbeTvtOlj1fQY_yNx7bzopbfrW_tSbzN2J6Se


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM