While learning Hadoop I had long wanted to step through the Hadoop source code in a debugger, but I never found a workable way to do it. Today, while debugging a matrix multiplication job, I finally figured out how, so I am writing it down here.
1) It all started with trying to set the number of map tasks for a job (even though the final number of maps is ultimately decided by the input splits). Before Hadoop 1.2.1 this could be done with:
job.setNumMapTasks()
However, Hadoop 1.2.1 no longer has this method; only the setter for the number of reduce tasks remains. Searching further, I found that others suggest a different approach: set the property through the Configuration object, like this:
conf.set("mapred.map.tasks",5);//設置5個map
After setting it this way there was still no effect. The code that controls the number of splits (in the old-API FileInputFormat) looks like this, with my comments:
goalSize = totalSize / (numSplits == 0 ? 1 : numSplits)
// totalSize is the total size of the input files and numSplits is the map count the user asked for,
// so goalSize is the split size implied by the user's wish.
minSize = Math.max(job.getLong("mapred.min.split.size", 1), minSplitSize)
// In Hadoop 1.2.1, mapred-default.xml sets mapred.min.split.size = 0, so job.getLong("mapred.min.split.size", 1) = 0;
// minSplitSize is a field of the old-API FileInputFormat whose value is 1, so minSize = 1.
// The purpose is simply to obtain the configured lower bound on the split size.
splitSize = Math.max(minSize, Math.min(goalSize, blockSize))
// The real split size is the smaller of goalSize (derived from the user's map count) and the block size blockSize
// (so a split never exceeds one block, which favors data-local computation), but never below minSize.
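To make the old-API arithmetic concrete, here is a small standalone sketch (mine, not from the Hadoop source) with made-up numbers: a 16 MB input file, a 16 MB block, and a requested map count of 5. Under this old formula the requested map count really did shrink the split size.

public class OldApiSplitSizeDemo {
    public static void main(String[] args) {
        long totalSize = 16L * 1024 * 1024; // hypothetical total input size: 16 MB
        long blockSize = 16L * 1024 * 1024; // hypothetical block size: 16 MB
        int numSplits = 5;                  // what conf.set("mapred.map.tasks", "5") would request
        long minSize = 1L;                  // max(mapred.min.split.size = 0, minSplitSize = 1)

        long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); // 16,777,216 / 5 = 3,355,443 bytes
        long splitSize = Math.max(minSize, Math.min(goalSize, blockSize));

        // Here splitSize == goalSize, i.e. the requested map count is honored -- exactly the behavior
        // that the new-API getSplits() shown below no longer follows.
        System.out.println("goalSize=" + goalSize + ", splitSize=" + splitSize);
    }
}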
In fact, this is how splits were generated before Hadoop 1.2.1; the new API does not use this code path, so setting the map count this way has no practical effect.
2) In the new API (Hadoop 1.2.1), the splits are computed by the following (decompiled) code:
 1 public List<InputSplit> getSplits(JobContext job) throws IOException {
 2     long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
 3     long maxSize = getMaxSplitSize(job);
 4     ArrayList splits = new ArrayList();
 5     List files = this.listStatus(job);
 6     Iterator i$ = files.iterator();
 7
 8     while(true) {
 9         while(i$.hasNext()) {
10             FileStatus file = (FileStatus)i$.next();
11             Path path = file.getPath();
12             FileSystem fs = path.getFileSystem(job.getConfiguration());
13             long length = file.getLen();
14             BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0L, length);
15             if(length != 0L && this.isSplitable(job, path)) {
16                 long blockSize = file.getBlockSize();
17                 long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
18
19                 long bytesRemaining;
20                 for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
21                     int blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
22                     splits.add(new FileSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()));
23                 }
24
25                 if(bytesRemaining != 0L) {
26                     splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkLocations.length - 1].getHosts()));
27                 }
28             } else if(length != 0L) {
29                 splits.add(new FileSplit(path, 0L, length, blkLocations[0].getHosts()));
30             } else {
31                 splits.add(new FileSplit(path, 0L, length, new String[0]));
32             }
33         }
34
35         job.getConfiguration().setLong("mapreduce.input.num.files", (long)files.size());
36         LOG.debug("Total # of splits: " + splits.size());
37         return splits;
38     }
39 }
Line 17 calls computeSplitSize(blockSize, minSize, maxSize) to compute the split size.
a. minSize is computed as follows:
long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job))
where getFormatMinSplitSize() is:
protected long getFormatMinSplitSize() { return 1L; }
and getMinSplitSize(job) is:
public static long getMinSplitSize(JobContext job) { return job.getConfiguration().getLong("mapred.min.split.size", 1L); }
When you do not set it yourself, "mapred.min.split.size" defaults to 0.
So whether getMinSplitSize() returns that default of 0 or its own fallback of 1 (used only when the property is missing entirely), the outer Math.max with getFormatMinSplitSize() = 1 means minSize ends up as 1.
b. Now look at how maxSize is computed:
long maxSize = getMaxSplitSize(job);
where getMaxSplitSize() is:
public static long getMaxSplitSize(JobContext context) { return context.getConfiguration().getLong("mapred.max.split.size", 9223372036854775807L); }
沒有設置"mapred.max.split.size"的話,就使用方法的默認值 9223372036854775807,而"mapred.max.split.size"並沒有默認值,所以maxSize= 9223372036854775807;
c. With minSize = 1 and maxSize = 9223372036854775807 in hand, the split size itself is computed as:
long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
protected long computeSplitSize(long blockSize, long minSize, long maxSize) { return Math.max(minSize, Math.min(maxSize, blockSize)); }
Clearly, with minSize = 1, the split size is simply the smaller of maxSize and blockSize. So we can control the number of maps by setting "mapred.max.split.size" to a value smaller than the physical block size. With a Configuration object this looks like:
conf.set("mapred.max.split.size",2000000)//單位是字節,物理塊是16M
3) The matrix multiplication code, with the map count made settable, is shown below:
/**
 * Created with IntelliJ IDEA.
 * User: hadoop
 * Date: 16-3-14
 * Time: 3:13 PM
 * To change this template use File | Settings | File Templates.
 */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.util.ReflectionUtils;

public class MutiDoubleInputMatrixProduct {

    public static void initDoubleArrayWritable(int length, DoubleWritable[] doubleArrayWritable) {
        for (int i = 0; i < length; ++i) {
            doubleArrayWritable[i] = new DoubleWritable(0.0);
        }
    }

    public static class MyMapper extends Mapper<IntWritable, DoubleArrayWritable, IntWritable, DoubleArrayWritable> {
        public DoubleArrayWritable map_value = new DoubleArrayWritable();
        public double[][] leftMatrix = null;
        //public Object obValue=null;
        public DoubleWritable[] arraySum = null;
        public DoubleWritable[] tempColumnArrayDoubleWritable = null;
        public DoubleWritable[] tempRowArrayDoubleWritable = null;
        public double sum = 0;
        public double uValue;
        public int leftMatrixRowNum;
        public int leftMatrixColumnNum;

        public void setup(Context context) throws IOException {
            Configuration conf = context.getConfiguration();
            leftMatrixRowNum = conf.getInt("leftMatrixRowNum", 10);
            leftMatrixColumnNum = conf.getInt("leftMatrixColumnNum", 10);
            leftMatrix = new double[leftMatrixRowNum][leftMatrixColumnNum];
            uValue = (double) (context.getConfiguration().getFloat("u", 1.0f));
            tempRowArrayDoubleWritable = new DoubleWritable[leftMatrixColumnNum];
            initDoubleArrayWritable(leftMatrixColumnNum, tempRowArrayDoubleWritable);
            tempColumnArrayDoubleWritable = new DoubleWritable[leftMatrixRowNum];
            initDoubleArrayWritable(leftMatrixRowNum, tempColumnArrayDoubleWritable);
            System.out.println("map setup() start!");
            //URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration());
            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
            String localCacheFile = "file://" + cacheFiles[0].toString();
            //URI[] cacheFiles=DistributedCache.getCacheFiles(conf);
            System.out.println("local path is:" + cacheFiles[0].toString());
            FileSystem fs = FileSystem.get(URI.create(localCacheFile), conf);
            SequenceFile.Reader reader = null;
            reader = new SequenceFile.Reader(fs, new Path(localCacheFile), conf);
            IntWritable key = (IntWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            DoubleArrayWritable value = (DoubleArrayWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            //int valueLength=0;
            int rowIndex = 0;
            int index;
            while (reader.next(key, value)) {
                index = -1;
                for (Writable val : value.get()) { // ArrayWritable.get() returns a Writable[] array
                    tempRowArrayDoubleWritable[++index].set(((DoubleWritable) val).get());
                }
                //obValue=value.toArray();
                rowIndex = key.get();
                leftMatrix[rowIndex] = new double[leftMatrixColumnNum];
                //this.leftMatrix=new double[valueLength][Integer.parseInt(context.getConfiguration().get("leftMatrixColumnNum"))];
                for (int i = 0; i < leftMatrixColumnNum; ++i) {
                    //leftMatrix[rowIndex][i]=Double.parseDouble(Array.get(obValue, i).toString());
                    //leftMatrix[rowIndex][i]=Array.getDouble(obValue, i);
                    leftMatrix[rowIndex][i] = tempRowArrayDoubleWritable[i].get();
                }
            }
            arraySum = new DoubleWritable[leftMatrix.length];
            initDoubleArrayWritable(leftMatrix.length, arraySum);
        }

        public void map(IntWritable key, DoubleArrayWritable value, Context context) throws IOException, InterruptedException {
            //obValue=value.toArray();
            InputSplit inputSplit = context.getInputSplit();
            String fileName = ((FileSplit) inputSplit).getPath().getName();
            if (fileName.startsWith("FB")) {
                context.write(key, value);
            } else {
                int ii = -1;
                for (Writable val : value.get()) {
                    tempColumnArrayDoubleWritable[++ii].set(((DoubleWritable) val).get());
                }
                //arraySum=new DoubleWritable[this.leftMatrix.length];
                for (int i = 0; i < this.leftMatrix.length; ++i) {
                    sum = 0;
                    for (int j = 0; j < this.leftMatrix[0].length; ++j) {
                        //sum+= this.leftMatrix[i][j]*Double.parseDouble(Array.get(obValue,j).toString())*(double)(context.getConfiguration().getFloat("u",1f));
                        //sum+= this.leftMatrix[i][j]*Array.getDouble(obValue,j)*uValue;
                        sum += this.leftMatrix[i][j] * tempColumnArrayDoubleWritable[j].get() * uValue;
                    }
                    arraySum[i].set(sum);
                }
                map_value.set(arraySum);
                context.write(key, map_value);
            }
        }
    }

    public static class MyReducer extends Reducer<IntWritable, DoubleArrayWritable, IntWritable, DoubleArrayWritable> {
        public DoubleWritable[] sum = null;
        //public Object obValue=null;
        public DoubleArrayWritable valueArrayWritable = new DoubleArrayWritable();
        public DoubleWritable[] tempColumnArrayDoubleWritable = null;
        private int leftMatrixRowNum;

        public void setup(Context context) {
            //leftMatrixColumnNum=context.getConfiguration().getInt("leftMatrixColumnNum",100);
            leftMatrixRowNum = context.getConfiguration().getInt("leftMatrixRowNum", 100);
            sum = new DoubleWritable[leftMatrixRowNum];
            initDoubleArrayWritable(leftMatrixRowNum, sum);
            //tempRowArrayDoubleWritable=new DoubleWritable[leftMatrixColumnNum];
            tempColumnArrayDoubleWritable = new DoubleWritable[leftMatrixRowNum];
            initDoubleArrayWritable(leftMatrixRowNum, tempColumnArrayDoubleWritable);
        }

        // Since the matrix computation is already finished in map(), the reduce step is not strictly needed;
        // if no reducer class is set, the framework still runs a default (identity) reducer that does nothing special.
        // Without a reducer, however, each map writes its own output file, so there would be as many result files as maps.
        // The reducer here exists only to collect the result matrix into a single file.
        public void reduce(IntWritable key, Iterable<DoubleArrayWritable> value, Context context) throws IOException, InterruptedException {
            //int valueLength=0;
            for (DoubleArrayWritable doubleValue : value) {
                int index = -1;
                for (Writable val : doubleValue.get()) {
                    tempColumnArrayDoubleWritable[++index].set(((DoubleWritable) val).get());
                }
                //valueLength=Array.getLength(obValue);
                /*
                for (int i=0;i<leftMatrixRowNum;++i){
                    //sum[i]=new DoubleWritable(Double.parseDouble(Array.get(obValue,i).toString())+sum[i].get());
                    //sum[i]=new DoubleWritable(Array.getDouble(obValue,i)+sum[i].get());
                    sum[i].set(tempColumnArrayDoubleWritable[i].get()+sum[i].get());
                }
                */
            }
            //valueArrayWritable.set(sum);
            valueArrayWritable.set(tempColumnArrayDoubleWritable);
            context.write(key, valueArrayWritable);
            /*
            for (int i=0;i<sum.length;++i){
                sum[i].set(0.0);
            }
            */
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String uri = args[3];
        String outUri = args[4];
        String cachePath = args[2];
        HDFSOperator.deleteDir(outUri);
        Configuration conf = new Configuration();
        DistributedCache.addCacheFile(URI.create(cachePath), conf); // add the left matrix file to the distributed cache
        //FileSystem fs=FileSystem.get(URI.create(uri),conf);
        //fs.delete(new Path(outUri),true);
        conf.setInt("leftMatrixColumnNum", Integer.parseInt(args[0]));
        conf.setInt("leftMatrixRowNum", Integer.parseInt(args[1]));
        conf.setFloat("u", 1.0f);
        //conf.set("mapred.map.tasks",args[5]);
        //int mxSplitSize=Integer.valueOf(args[5])
        // Hadoop 1.2.1 has no setNumMapTasks(), so the map count is controlled indirectly through the split size.
        conf.set("mapred.max.split.size", args[5]);
        conf.set("mapred.jar", "MutiDoubleInputMatrixProduct.jar");
        Job job = new Job(conf, "MatrixProdcut");
        job.setJarByClass(MutiDoubleInputMatrixProduct.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(DoubleArrayWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(DoubleArrayWritable.class);
        FileInputFormat.setInputPaths(job, new Path(uri));
        FileOutputFormat.setOutputPath(job, new Path(outUri));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

class DoubleArrayWritable extends ArrayWritable {
    public DoubleArrayWritable() {
        super(DoubleWritable.class);
    }
    /*
    public String toString(){
        StringBuilder sb=new StringBuilder();
        for (Writable val:get()){
            DoubleWritable doubleWritable=(DoubleWritable)val;
            sb.append(doubleWritable.get());
            sb.append(",");
        }
        sb.deleteCharAt(sb.length()-1);
        return sb.toString();
    }
    */
}

class HDFSOperator {
    public static boolean deleteDir(String dir) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        boolean result = fs.delete(new Path(dir), true);
        System.out.println("sOutput delete");
        fs.close();
        return result;
    }
}
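For reference, given the argument order in main() above (args[0] = left-matrix column count, args[1] = left-matrix row count, args[2] = distributed-cache path of the left matrix, args[3] = input path, args[4] = output path, args[5] = mapred.max.split.size in bytes), an invocation would look roughly like `hadoop jar MutiDoubleInputMatrixProduct.jar MutiDoubleInputMatrixProduct 100 100 /cache/leftMatrix.seq /input /output 2000000` -- the paths and sizes here are only placeholders for your own data.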
4) Now for how to set breakpoints in the Hadoop source itself, using the split-computation code above as the example.
a. First locate the FileInputFormat class, which lives in hadoop-core-1.2.1.jar, and add that jar to the project as a dependency.
Even though these are compiled class files (bytecode), they can be stepped through just like Java source. Here we set two breakpoints, one in getSplits() and one in computeSplitSize(), and then run our MapReduce program locally from IDEA in Debug mode.
The breakpoints are hit, and the relevant variable values can be inspected.
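As a side note, running the job "locally in Debug mode" like this relies on Hadoop's local job runner. Below is a minimal sketch (mine, not from the original code) of the configuration I would expect such a run to need; the property names are the usual Hadoop 1.x ones and the split-size value is just the placeholder used earlier.

import org.apache.hadoop.conf.Configuration;

public class LocalDebugConf {
    // Builds a Configuration for Hadoop 1.x local mode, so that the whole job, including
    // FileInputFormat.getSplits() and computeSplitSize(), runs inside the IDE process and
    // breakpoints set there are hit.
    public static Configuration create() {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "file:///");      // read input from the local file system
        conf.set("mapred.job.tracker", "local");      // run map/reduce tasks in-process (LocalJobRunner)
        conf.set("mapred.max.split.size", "2000000"); // same split-size knob as above
        return conf;
    }
}

Because these properties are set explicitly on the Configuration, they take precedence for the debug run even if a core-site.xml or mapred-site.xml on the classpath points at a real cluster.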