First, an overview of this post: I define a `DoubleArrayWritable`-style class to hold the row vectors of a matrix, write objects of that class into a SequenceFile as values (the key is the corresponding row number of the matrix), and finally read the (key, value) pairs back out of the SequenceFile and multiply them with another matrix. Everything is debugged locally in IDEA; nothing is submitted to the cluster. Normally it is enough to add hadoop-core-1.2.1.jar and commons-cli-1.2.jar (from the lib directory) to the project classpath, but with only those two jars the debugger complains that some class definitions cannot be found, so I simply added hadoop-core-1.2.1.jar and every jar under lib to the classpath. With that, the program can be debugged entirely on the local machine.
1) First, define the array Writable class, which extends ArrayWritable. (The post's title mentions DoubleArrayWritable; the code below uses an IntWritable variant, IntArrayWritable, which is defined the same way.)
```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.ArrayWritable;

public class IntArrayWritable extends ArrayWritable {
    public IntArrayWritable() {
        super(IntWritable.class);
    }
}
```
Because the (key, value) pairs read from the SequenceFile have to be handed to map, the no-argument constructor must be defined explicitly as shown above: Hadoop instantiates the value class by reflection, and that requires a no-arg constructor.
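Why the explicit constructor matters can be shown without Hadoop at all. Below is a minimal plain-Java sketch (the class names are made up for illustration): instantiating a class from its `Class` object, which is what `ReflectionUtils.newInstance` does under the hood, only works if the class has a no-argument constructor.

```java
// Plain-Java illustration (not Hadoop code): reflective instantiation,
// as done by ReflectionUtils.newInstance, needs a no-arg constructor.
public class NewInstanceDemo {
    public static class WithNoArgCtor {
        public final int[] data;
        public WithNoArgCtor() { this.data = new int[0]; } // explicit no-arg constructor
    }

    public static class WithoutNoArgCtor {
        public final int[] data;
        public WithoutNoArgCtor(int[] data) { this.data = data; } // only ctor takes an argument
    }

    public static void main(String[] args) throws Exception {
        // Works: the class declares a no-arg constructor.
        Object ok = WithNoArgCtor.class.getDeclaredConstructor().newInstance();
        System.out.println(ok.getClass().getSimpleName()); // prints WithNoArgCtor

        // Fails: there is no no-arg constructor to call.
        try {
            WithoutNoArgCtor.class.getDeclaredConstructor().newInstance();
        } catch (NoSuchMethodException e) {
            System.out.println("no no-arg constructor");
        }
    }
}
```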
2) Next, write objects of the IntArrayWritable type into a SequenceFile as values, using SequenceFile.Writer.
```java
/* Created with IntelliJ IDEA. User: hadoop. Date: 16-3-4, 10:36 AM. */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;

import java.io.IOException;
import java.net.URI;

public class SequenceFileWriterDemo {
    public static void main(String[] args) throws IOException {
        String uri = "/home/hadoop/2016Test/SeqTest/10IntArray";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        IntWritable key = new IntWritable();
        IntArrayWritable value = new IntArrayWritable(); // the IntArrayWritable value
        value.set(new IntWritable[]{new IntWritable(1), new IntWritable(2),
                new IntWritable(3), new IntWritable(4)});
        SequenceFile.Writer writer = null;
        writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
        int i = 0;
        while (i < 10) {
            key.set(i++);
            writer.append(key, value);
        }
        writer.close(); // without close() the write fails and the result is an empty file
        System.out.println("done!");
    }
}

class IntArrayWritable extends ArrayWritable {
    public IntArrayWritable() {
        super(IntWritable.class);
    }
}
```
This writes a 10-row, 4-column matrix into a SequenceFile, where the key is the matrix row number and the value is an IntArrayWritable.
3) Upload the generated SequenceFile to the cluster and inspect its contents with the following command (the IntArrayWritable class must be packaged into a jar whose path is added to HADOOP_CLASSPATH in hadoop-env.sh):
hadoop fs -text /testData/10IntArray
The result is as follows (screenshot in the original post):
Something looks wrong, doesn't it? Shouldn't that be the array [1,2,3,4]? It is actually correct: writing into a SequenceFile persists "live objects", i.e. serializes them, so when we open the file as text (with -text) we see the `IntArrayWritable...` form. If you want to see the array, just deserialize it.
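The same effect is easy to reproduce in plain Java: calling `toString()` on a bare array object prints its type descriptor and hash code rather than its elements, which is exactly the kind of output `-text` shows for a value class with no custom `toString()`. A small sketch (pure JDK, not Hadoop code):

```java
import java.util.Arrays;

public class ArrayToStringDemo {
    public static void main(String[] args) {
        int[] row = {1, 2, 3, 4};
        // Default toString(): type descriptor plus hash code, e.g. "[I@1b6d3586" --
        // analogous to seeing "IntArrayWritable@..." from hadoop fs -text.
        System.out.println(row.toString().startsWith("[I@")); // true
        // An explicit conversion is needed to see the contents.
        System.out.println(Arrays.toString(row)); // [1, 2, 3, 4]
    }
}
```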
4) Use SequenceFile.Reader to read the contents of the SequenceFile above. I want to see the arrays! The code is as follows:
```java
/* Created with IntelliJ IDEA. User: hadoop. Date: 16-3-4, 5:41 PM. */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;
import java.net.URI;

public class SequencefileReaderDemo {
    public static void main(String[] args) throws IOException {
        String uri = "/home/hadoop/2016Test/SeqTest/10IntArray";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(fs, path, conf);
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            IntArrayWritable value = (IntArrayWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            long position = reader.getPosition();
            String[] sValue = null;
            while (reader.next(key, value)) {
                String syncSeen = reader.syncSeen() ? "*" : "";
                sValue = value.toStrings(); // the deserialized array elements, as strings
                System.out.printf("[%s%s]\t%s\t%s\t", position, syncSeen, key, value);
                for (String s : sValue) {
                    System.out.printf("%s\t", s);
                }
                System.out.println();
                position = reader.getPosition(); // start of the next record
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}
```
The output of a run is as follows:
5) Finally, using the SequenceFile generated above as the left matrix, write an MR program that computes the matrix product. The code is as follows:
```java
/* Created with IntelliJ IDEA. User: hadoop. Date: 16-3-4, 10:34 AM. */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;
import java.lang.reflect.Array;
import java.net.URI;

public class MRTest {
    public static class MyMapper extends Mapper<IntWritable, IntArrayWritable, IntWritable, IntArrayWritable> {
        // 4x5 right matrix, hard-coded for this demo
        public static int[][] rightMatrix = new int[][]{
                {10, 10, 10, 10, 10}, {10, 10, 10, 10, 10},
                {10, 10, 10, 10, 10}, {10, 10, 10, 10, 10}};
        public Object valueObject = null;
        public IntWritable[] arraySum = new IntWritable[rightMatrix[0].length];
        public int sum = 0;

        public void map(IntWritable key, IntArrayWritable value, Context context)
                throws IOException, InterruptedException {
            // value.toArray() returns an Object whose underlying value is an array.
            // Array.get(valueObject, j) fetches element j, which is converted to a
            // String and then parsed back to an int with Integer.parseInt.
            valueObject = value.toArray();
            for (int i = 0; i < rightMatrix[0].length; ++i) {
                sum = 0;
                for (int j = 0; j < rightMatrix.length; ++j) {
                    sum += Integer.parseInt(Array.get(valueObject, j).toString()) * rightMatrix[j][i];
                }
                arraySum[i] = new IntWritable(sum);
            }
            value.set(arraySum);
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String uri = "/home/hadoop/2016Test/SeqTest/10IntArray";
        String outUri = "/home/hadoop/2016Test/SeqTest/output";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);

        fs.delete(new Path(outUri), true); // delete the output directory if it already exists

        Job job = new Job(conf, "SeqMatrix");
        job.setJarByClass(MRTest.class);
        job.setMapperClass(MyMapper.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntArrayWritable.class);
        FileInputFormat.setInputPaths(job, new Path(uri));
        FileOutputFormat.setOutputPath(job, new Path(outUri));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
The trick of using Array.get(object, index) to pull array elements out of an Object that wraps an array comes from: http://www.blogjava.net/pengpenglin/archive/2008/09/04/226968.html
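The mapper's inner loops can be checked outside Hadoop with the same reflective `Array.get` trick. Below is a standalone sketch (plain Java, hypothetical class name): for the row [1, 2, 3, 4] and the all-tens right matrix, every entry of the output row should be (1+2+3+4)*10 = 100.

```java
import java.lang.reflect.Array;

public class MapperMathCheck {
    public static void main(String[] args) {
        int[][] rightMatrix = {
                {10, 10, 10, 10, 10}, {10, 10, 10, 10, 10},
                {10, 10, 10, 10, 10}, {10, 10, 10, 10, 10}};
        // Stand-in for value.toArray(): the matrix row held as a plain Object.
        Object valueObject = new int[]{1, 2, 3, 4};

        int[] arraySum = new int[rightMatrix[0].length];
        for (int i = 0; i < rightMatrix[0].length; ++i) {
            int sum = 0;
            for (int j = 0; j < rightMatrix.length; ++j) {
                // Same element access the mapper uses: reflective Array.get, then parse.
                sum += Integer.parseInt(Array.get(valueObject, j).toString()) * rightMatrix[j][i];
            }
            arraySum[i] = sum;
        }
        System.out.println(java.util.Arrays.toString(arraySum)); // [100, 100, 100, 100, 100]
    }
}
```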
The final computed result is as follows: