How the number of Hadoop map tasks (input splits) is determined


While learning Hadoop I had long wanted to step through the Hadoop source code in a debugger, but I never found a workable way to do it. Today, while debugging a matrix multiplication job, I finally figured out how, so I am writing it down here.

1) It started because I wanted to set the number of map tasks for a Job (even though the final number of maps is determined by the input splits). Before hadoop 1.2.1 the way to do it was:

job.setNumMapTasks()

However, hadoop 1.2.1 no longer has this method; only the method for setting the number of reducers remains. Searching further, I found another suggested approach: set it through the Configuration object, like this:

conf.set("mapred.map.tasks",5);//設置5個map

After setting this, there was still no effect. The (old API) code that controls the number of splits is:

goalSize = totalSize / (numSplits == 0 ? 1 : numSplits)
// totalSize is the total size of the input files and numSplits is the map count the user
// asked for, so goalSize is the split size the user would like each split to have.
minSize = Math.max(job.getLong("mapred.min.split.size", 1), minSplitSize)
// In hadoop 1.2.1, mapred-default.xml sets mapred.min.split.size=0, so
// job.getLong("mapred.min.split.size", 1) returns 0, while minSplitSize is a field of the
// old-API FileInputFormat whose default value is 1. Hence minSize = 1; the point is to
// obtain the configured lower bound on the split size.
splitSize = Math.max(minSize, Math.min(goalSize, blockSize))
// The real split size is the smaller of goalSize (derived from the requested map count)
// and the block size blockSize (so a split never exceeds one block, which favors
// data-local computation), but never smaller than minSize.

In fact, this is the pre-hadoop-1.2.1 (old API) way of generating splits; the new API ignores the requested map count, which is why setting it had no practical effect for my job.
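To make the old formula concrete, here is a small standalone sketch of the arithmetic (this is not Hadoop code; the 160 MB input, 64 MB block size and 5 requested maps are made-up numbers):

// Standalone sketch of the pre-1.2.1 (old API) split-size arithmetic, with hypothetical numbers.
public class OldApiSplitSizeSketch {
    public static void main(String[] args) {
        long totalSize = 160L * 1024 * 1024;  // total size of the input files
        int numSplits  = 5;                   // user-requested map count (mapred.map.tasks)
        long blockSize = 64L * 1024 * 1024;   // HDFS block size
        long minSize   = 1L;                  // max(mapred.min.split.size = 0, minSplitSize = 1)

        long goalSize  = totalSize / (numSplits == 0 ? 1 : numSplits);     // 32 MB per split
        long splitSize = Math.max(minSize, Math.min(goalSize, blockSize)); // 32 MB
        System.out.println("splitSize = " + splitSize + " bytes, about "
                + (totalSize / splitSize) + " splits");
    }
}

In the old formula the requested map count feeds directly into goalSize, which is why setNumMapTasks()/mapred.map.tasks used to influence the number of splits; the new API shown next no longer uses it.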

2) In the new API (hadoop 1.2.1), the code that computes the splits looks like this:

 1  public List<InputSplit> getSplits(JobContext job) throws IOException {
 2         long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
 3         long maxSize = getMaxSplitSize(job);
 4         ArrayList splits = new ArrayList();
 5         List files = this.listStatus(job);
 6         Iterator i$ = files.iterator();
 7 
 8         while(true) {
 9             while(i$.hasNext()) {
10                 FileStatus file = (FileStatus)i$.next();
11                 Path path = file.getPath();
12                 FileSystem fs = path.getFileSystem(job.getConfiguration());
13                 long length = file.getLen();
14                 BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0L, length);
15                 if(length != 0L && this.isSplitable(job, path)) {
16                     long blockSize = file.getBlockSize();
17                     long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
18 
19                     long bytesRemaining;
20                     for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
21                         int blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
22                         splits.add(new FileSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()));
23                     }
24 
25                     if(bytesRemaining != 0L) {
26                         splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkLocations.length - 1].getHosts()));
27                     }
28                 } else if(length != 0L) {
29                     splits.add(new FileSplit(path, 0L, length, blkLocations[0].getHosts()));
30                 } else {
31                     splits.add(new FileSplit(path, 0L, length, new String[0]));
32                 }
33             }
34 
35             job.getConfiguration().setLong("mapreduce.input.num.files", (long)files.size());
36             LOG.debug("Total # of splits: " + splits.size());
37             return splits;
38         }
39     }
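One detail worth noting in the loop on line 20 above is the 1.1 factor (Hadoop's SPLIT_SLOP constant): a new split is only carved off while the remaining bytes exceed 1.1 × splitSize, so a tail of up to 10% of a split is folded into the last split rather than becoming a tiny extra map. The following standalone sketch isolates just that counting logic; the 67 MB and 80 MB file sizes and the 64 MB split size are made-up numbers:

// Sketch of the split-counting loop from getSplits() above, isolated from Hadoop.
public class SplitSlopSketch {
    private static final double SPLIT_SLOP = 1.1; // same constant as the 1.1D on line 20

    static int countSplits(long length, long splitSize) {
        int splits = 0;
        long bytesRemaining = length;
        while ((double) bytesRemaining / (double) splitSize > SPLIT_SLOP) {
            splits++;
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0L) {
            splits++; // the remainder becomes the final split
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        System.out.println(countSplits(67 * mb, 64 * mb)); // 1: the 3 MB tail is absorbed
        System.out.println(countSplits(80 * mb, 64 * mb)); // 2: the 16 MB tail exceeds the slop
    }
}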

Line 17 calls computeSplitSize(blockSize, minSize, maxSize) to compute the split size.

a. minSize is computed as follows:

 long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job))

where getFormatMinSplitSize() is:

protected long getFormatMinSplitSize() {
        return 1L;
    }

and getMinSplitSize(job) is:

 public static long getMinSplitSize(JobContext job) {
        return job.getConfiguration().getLong("mapred.min.split.size", 1L);
    }

In mapred-default.xml, "mapred.min.split.size" defaults to 0, so getMinSplitSize(job) returns 0 (and even if the property were missing entirely, the getLong fallback would only be 1). Either way, minSize = Math.max(1, 0) = 1.

b. Now look at how maxSize is computed:

   long maxSize = getMaxSplitSize(job);

where getMaxSplitSize() is:

public static long getMaxSplitSize(JobContext context) {
        return context.getConfiguration().getLong("mapred.max.split.size", 9223372036854775807L);
    }

Since "mapred.max.split.size" is given no value in mapred-default.xml, the getLong call falls back to the method's default of 9223372036854775807 (Long.MAX_VALUE), so maxSize = 9223372036854775807.

c. We now know that minSize = 1 and maxSize = 9223372036854775807. Next comes the split size itself:

 long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

Clearly, the split size is simply the smaller of maxSize and blockSize (because minSize = 1). We can therefore control the number of maps by setting "mapred.max.split.size" to any value smaller than the physical block size. With a Configuration object:

conf.set("mapred.max.split.size",2000000)//單位是字節,物理塊是16M

3) The matrix multiplication code, with the map count made configurable, is shown below:

/**
 * Created with IntelliJ IDEA.
 * User: hadoop
 * Date: 16-3-14
 * Time: 3:13 PM
 * To change this template use File | Settings | File Templates.
 */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.util.ReflectionUtils;

public class MutiDoubleInputMatrixProduct {

    public static void initDoubleArrayWritable(int length,DoubleWritable[] doubleArrayWritable){
        for (int i=0;i<length;++i){
            doubleArrayWritable[i]=new DoubleWritable(0.0);
        }
    }

    public static  class MyMapper extends Mapper<IntWritable,DoubleArrayWritable,IntWritable,DoubleArrayWritable>{
        public DoubleArrayWritable map_value=new DoubleArrayWritable();
        public  double[][] leftMatrix=null;/******************************************/
        //public Object obValue=null;
        public DoubleWritable[] arraySum=null;
        public DoubleWritable[] tempColumnArrayDoubleWritable=null;
        public DoubleWritable[] tempRowArrayDoubleWritable=null;
        public double sum=0;
        public double uValue;
        public int leftMatrixRowNum;
        public int leftMatrixColumnNum;
        public void setup(Context context) throws IOException {
            Configuration conf=context.getConfiguration();
            leftMatrixRowNum=conf.getInt("leftMatrixRowNum",10);
            leftMatrixColumnNum=conf.getInt("leftMatrixColumnNum",10);
            leftMatrix=new double[leftMatrixRowNum][leftMatrixColumnNum];
            uValue=(double)(context.getConfiguration().getFloat("u",1.0f));
            tempRowArrayDoubleWritable=new DoubleWritable[leftMatrixColumnNum];
            initDoubleArrayWritable(leftMatrixColumnNum,tempRowArrayDoubleWritable);
            tempColumnArrayDoubleWritable=new DoubleWritable[leftMatrixRowNum];
            initDoubleArrayWritable(leftMatrixRowNum,tempColumnArrayDoubleWritable);
            System.out.println("map setup() start!");
            //URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration());
            Path[] cacheFiles=DistributedCache.getLocalCacheFiles(conf);
            String localCacheFile="file://"+cacheFiles[0].toString();
            //URI[] cacheFiles=DistributedCache.getCacheFiles(conf);
            //DistributedCache.
            System.out.println("local path is:"+cacheFiles[0].toString());
            // URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration());
            FileSystem fs =FileSystem.get(URI.create(localCacheFile), conf);
            SequenceFile.Reader reader=null;
            reader=new SequenceFile.Reader(fs,new Path(localCacheFile),conf);
            IntWritable key= (IntWritable)ReflectionUtils.newInstance(reader.getKeyClass(),conf);
            DoubleArrayWritable value= (DoubleArrayWritable)ReflectionUtils.newInstance(reader.getValueClass(),conf);
            //int valueLength=0;
            int rowIndex=0;
            int index;
            while (reader.next(key,value)){
                index=-1;
                for (Writable val:value.get()){ // ArrayWritable.get() returns a Writable[] array
                    tempRowArrayDoubleWritable[++index].set(((DoubleWritable)val).get());
                }
                //obValue=value.toArray();
                rowIndex=key.get();
                leftMatrix[rowIndex]=new double[leftMatrixColumnNum];
                //this.leftMatrix=new double[valueLength][Integer.parseInt(context.getConfiguration().get("leftMatrixColumnNum"))];
                for (int i=0;i<leftMatrixColumnNum;++i){
                    //leftMatrix[rowIndex][i]=Double.parseDouble(Array.get(obValue, i).toString());
                    //leftMatrix[rowIndex][i]=Array.getDouble(obValue, i);
                    leftMatrix[rowIndex][i]= tempRowArrayDoubleWritable[i].get();
                }

            }
            arraySum=new DoubleWritable[leftMatrix.length];
            initDoubleArrayWritable(leftMatrix.length,arraySum);
        }
        public void map(IntWritable key,DoubleArrayWritable value,Context context) throws IOException, InterruptedException {
            //obValue=value.toArray();
            InputSplit inputSplit=context.getInputSplit();
            String fileName=((FileSplit)inputSplit).getPath().getName();
            if (fileName.startsWith("FB")) {
                context.write(key,value);
            }
            else{
                int ii=-1;
                for(Writable val:value.get()){
                    tempColumnArrayDoubleWritable[++ii].set(((DoubleWritable)val).get());
                }
                //arraySum=new DoubleWritable[this.leftMatrix.length];
                for (int i=0;i<this.leftMatrix.length;++i){
                    sum=0;
                    for (int j=0;j<this.leftMatrix[0].length;++j){
                        //sum+= this.leftMatrix[i][j]*Double.parseDouble(Array.get(obValue,j).toString())*(double)(context.getConfiguration().getFloat("u",1f));
                        //sum+= this.leftMatrix[i][j]*Array.getDouble(obValue,j)*uValue;
                        sum+= this.leftMatrix[i][j]*tempColumnArrayDoubleWritable[j].get()*uValue;
                    }
                    arraySum[i].set(sum);
                    //arraySum[i].set(sum);
                }
                map_value.set(arraySum);
                context.write(key,map_value);
            }
        }
    }
    public static class MyReducer extends Reducer<IntWritable,DoubleArrayWritable,IntWritable,DoubleArrayWritable>{
        public DoubleWritable[] sum=null;
        // public Object obValue=null;
        public DoubleArrayWritable valueArrayWritable=new DoubleArrayWritable();
        public DoubleWritable[] tempColumnArrayDoubleWritable=null;
        private int leftMatrixRowNum;

        public void setup(Context context){
            //leftMatrixColumnNum=context.getConfiguration().getInt("leftMatrixColumnNum",100);
            leftMatrixRowNum=context.getConfiguration().getInt("leftMatrixRowNum",100);
            sum=new DoubleWritable[leftMatrixRowNum];
            initDoubleArrayWritable(leftMatrixRowNum,sum);
            //tempRowArrayDoubleWritable=new DoubleWritable[leftMatrixColumnNum];
            tempColumnArrayDoubleWritable=new DoubleWritable[leftMatrixRowNum];
            initDoubleArrayWritable(leftMatrixRowNum,tempColumnArrayDoubleWritable);
        }
        // Since the matrix computation is already done in map(), the reduce phase is arguably
        // unnecessary; if no reducer class is provided, the MR framework still runs a default
        // (identity) reducer that does nothing extra.
        // However, without a reduce phase each map writes its own output file, producing as many
        // result files as there are maps. A reducer is used here so the result matrix ends up in
        // a single file.
        public void reduce(IntWritable key,Iterable<DoubleArrayWritable>value,Context context) throws IOException, InterruptedException {
            //int valueLength=0;
            for(DoubleArrayWritable doubleValue:value){
                int index=-1;
                for (Writable val:doubleValue.get()){
                    tempColumnArrayDoubleWritable[++index].set(((DoubleWritable)val).get());
                }
                //valueLength=Array.getLength(obValue);
                /*
                for (int i=0;i<leftMatrixRowNum;++i){
                    //sum[i]=new DoubleWritable(Double.parseDouble(Array.get(obValue,i).toString())+sum[i].get());
                    //sum[i]=new DoubleWritable(Array.getDouble(obValue,i)+sum[i].get());
                    sum[i].set(tempColumnArrayDoubleWritable[i].get()+sum[i].get());
                }
                */
            }
            //valueArrayWritable.set(sum);
            valueArrayWritable.set(tempColumnArrayDoubleWritable);
            context.write(key,valueArrayWritable);
            /*
            for (int i=0;i<sum.length;++i){
                sum[i].set(0.0);
            }
            */

        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String uri=args[3];
        String outUri=args[4];
        String cachePath=args[2];
        HDFSOperator.deleteDir(outUri);
        Configuration conf=new Configuration();
        DistributedCache.addCacheFile(URI.create(cachePath),conf); // add the left matrix file to the distributed cache
        /**************************************************/
        //FileSystem fs=FileSystem.get(URI.create(uri),conf);
        //fs.delete(new Path(outUri),true);
        /*********************************************************/
        conf.setInt("leftMatrixColumnNum",Integer.parseInt(args[0]));
        conf.setInt("leftMatrixRowNum",Integer.parseInt(args[1]));
        conf.setFloat("u",1.0f);
        //conf.set("mapred.map.tasks",args[5]);
        //int mxSplitSize=Integer.valueOf(args[5])
        conf.set("mapred.max.split.size",args[5]); // hadoop 1.2.1 has no setNumMapTasks(); the map count can only be influenced by shrinking the split size like this
        conf.set("mapred.jar","MutiDoubleInputMatrixProduct.jar");
        Job job=new Job(conf,"MatrixProdcut");
        job.setJarByClass(MutiDoubleInputMatrixProduct.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(DoubleArrayWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(DoubleArrayWritable.class);
        FileInputFormat.setInputPaths(job, new Path(uri));
        FileOutputFormat.setOutputPath(job,new Path(outUri));
        System.exit(job.waitForCompletion(true)?0:1);
    }


}
class DoubleArrayWritable extends ArrayWritable {
    public DoubleArrayWritable(){
        super(DoubleWritable.class);
    }
/*
    public String toString(){
        StringBuilder sb=new StringBuilder();
        for (Writable val:get()){
            DoubleWritable doubleWritable=(DoubleWritable)val;
            sb.append(doubleWritable.get());
            sb.append(",");
        }
        sb.deleteCharAt(sb.length()-1);
        return sb.toString();
    }
*/
}

class HDFSOperator{
    public static boolean deleteDir(String dir)throws IOException{
        Configuration conf=new Configuration();
        FileSystem fs =FileSystem.get(conf);
        boolean result=fs.delete(new Path(dir),true);
        System.out.println("sOutput delete");
        fs.close();
        return result;
    }
}

4) Finally, here is how to set breakpoints in the Hadoop source and debug it, using the split-computation code as the example.

a. First locate the FileInputFormat class. It lives in hadoop-core-1.2.1.jar, so we add that jar to the project's dependencies.

Although these are compiled class files (bytecode), they can still be stepped through with breakpoints just like Java source. Here we set two breakpoints, one in getSplits() and one in computeSplitSize(), and then run the MapReduce program locally in IDEA in Debug mode. The result:

The breakpoints are hit, and the values of the relevant variables can be inspected in the debugger.
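For reference, one way to make these breakpoints reachable from the IDE is to run the job in local (single-JVM) mode; split computation happens on the client during job submission, so getSplits() executes in the same JVM as the debugger. The sketch below is only an illustration of such a setup, not the exact configuration used above; the property names are the usual Hadoop 1.x ones and the values are placeholders:

import org.apache.hadoop.conf.Configuration;

// Hedged sketch: a Configuration for running the job entirely inside the IDE on Hadoop 1.x,
// so breakpoints in FileInputFormat.getSplits()/computeSplitSize() are hit under Debug.
public class LocalDebugConf {
    public static Configuration create() {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "file:///");      // read input from the local filesystem
        conf.set("mapred.job.tracker", "local");      // run tasks in this JVM (LocalJobRunner)
        conf.set("mapred.max.split.size", "2000000"); // the same split-size knob as above
        return conf;
    }
}

Build the Job from such a Configuration in main() and start the program with IDEA's Debug action; execution should stop at the two breakpoints while the job is being submitted.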

 

