Writing a Custom Data Type to a SequenceFile and Reading It Back


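First, a quick overview of this post. It defines an IntArrayWritable class that holds one row of a matrix, writes it as the value in a SequenceFile with the matrix row number as the key, and finally reads the (key, value) pairs back from the SequenceFile and multiplies them by another matrix. Everything was debugged locally in IntelliJ IDEA; nothing was submitted to a cluster. Adding hadoop-core-1.2.1.jar and commons-cli-1.2.jar (from the lib directory) to the project classpath is usually said to be enough, but with only those two jars, debugging complained that some class definitions could not be found. So I simply added hadoop-core-1.2.1.jar and every jar under lib/ to the project classpath; with that, the programs can be debugged locally without submitting them to a cluster.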

1) First, define the IntArrayWritable class, which extends ArrayWritable.

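import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;

// An ArrayWritable whose elements are always IntWritable.
public class IntArrayWritable extends ArrayWritable {
    // No-argument constructor: Hadoop creates Writable objects through reflection,
    // so the class must be able to fix its element type on its own.
    public IntArrayWritable() {
        super(IntWritable.class);
    }
}

Because the (key, value) pairs are read from the SequenceFile and handed to map, Hadoop creates the value objects through reflection, which requires a no-argument constructor. ArrayWritable itself has no such constructor (it needs to be told the element class), so a no-argument constructor that calls super(IntWritable.class) must be defined explicitly, as above.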
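The constructor requirement can be illustrated without Hadoop. The sketch below is a simplified model, not Hadoop's code: FakeArrayWritable and FakeIntArrayWritable are hypothetical stand-ins that only copy the constructor shapes of ArrayWritable and IntArrayWritable, and newInstance roughly imitates what a reflective factory such as Hadoop's ReflectionUtils.newInstance does, namely look up the no-argument constructor and call it.

```java
import java.lang.reflect.Constructor;

class NoArgConstructorDemo {
    // Roughly what a reflective factory does: find the no-arg constructor and invoke it.
    static Object newInstance(Class<?> cls) throws ReflectiveOperationException {
        Constructor<?> ctor = cls.getDeclaredConstructor();
        ctor.setAccessible(true);
        return ctor.newInstance();
    }

    // true if the class can be created reflectively without arguments.
    static boolean canInstantiate(Class<?> cls) {
        try {
            newInstance(cls);
            return true;
        } catch (ReflectiveOperationException e) {
            return false; // e.g. NoSuchMethodException when there is no no-arg constructor
        }
    }

    public static void main(String[] args) {
        System.out.println(canInstantiate(FakeArrayWritable.class));    // false
        System.out.println(canInstantiate(FakeIntArrayWritable.class)); // true
    }
}

// Hypothetical stand-in for ArrayWritable: its only constructor needs the element class.
class FakeArrayWritable {
    final Class<?> valueClass;

    FakeArrayWritable(Class<?> valueClass) {
        this.valueClass = valueClass;
    }
}

// Hypothetical stand-in for IntArrayWritable: the no-arg constructor fixes the element class.
class FakeIntArrayWritable extends FakeArrayWritable {
    FakeIntArrayWritable() {
        super(Integer.class);
    }
}
```

This is the same reason the IntArrayWritable constructor above passes IntWritable.class to super(): without it, there is nothing for reflection to call.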

2) Next, write IntArrayWritable objects into a SequenceFile as values, using SequenceFile.Writer:

/**
 * Created with IntelliJ IDEA.
 * User: hadoop
 * Date: 16-3-4
 * Time: 10:36 AM
 * To change this template use File | Settings | File Templates.
 */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;

import java.io.IOException;
import java.net.URI;

// Uses the IntArrayWritable class from step 1). It is not redeclared here:
// a second class with the same name in the same package would not compile.
public class SequenceFileWriterDemo {
    public static void main(String[] args) throws IOException {
        String uri = "/home/hadoop/2016Test/SeqTest/10IntArray";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);

        IntWritable key = new IntWritable();
        IntArrayWritable value = new IntArrayWritable(); // value of type IntArrayWritable
        // Every row of the matrix is [1, 2, 3, 4].
        value.set(new IntWritable[]{new IntWritable(1), new IntWritable(2),
                new IntWritable(3), new IntWritable(4)});

        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
            for (int i = 0; i < 10; i++) {
                key.set(i);                // key = row number
                writer.append(key, value);
            }
        } finally {
            // Closing the writer is essential: otherwise the buffered data is never
            // written out and the SequenceFile ends up empty.
            if (writer != null) {
                writer.close();
            }
        }
        System.out.println("done!");
    }
}

This writes a 10-row, 4-column matrix into the SequenceFile, where the key is the row number and the value is an IntArrayWritable.
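To make the serialization concrete: ArrayWritable.write stores the number of elements as a 4-byte int and then lets each element write itself, and IntWritable.write stores a single 4-byte int. The element class is not stored, which is why IntArrayWritable has to supply it in its constructor. The JDK-only sketch below mimics that layout for one row [1, 2, 3, 4]; it models the value bytes only (the key and the SequenceFile's own record framing are left out), and serializeRow is an illustrative name, not a Hadoop method.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class ArrayLayoutDemo {
    // Mimics ArrayWritable.write with IntWritable elements:
    // a 4-byte element count, then each element as a 4-byte big-endian int.
    static byte[] serializeRow(int[] row) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(row.length);
            for (int v : row) {
                out.writeInt(v);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // cannot happen with an in-memory stream
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] b = serializeRow(new int[]{1, 2, 3, 4});
        System.out.println(b.length); // 20
        StringBuilder hex = new StringBuilder();
        for (byte x : b) {
            hex.append(String.format("%02x ", x));
        }
        System.out.println(hex.toString().trim());
    }
}
```

Each of the 10 values in the file therefore takes 20 bytes (4 for the count plus 4 x 4 for the elements) under this model.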

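3) Upload the generated SequenceFile to the cluster and view its contents with the command below (IntArrayWritable must be packaged into a jar and that jar's path added to HADOOP_CLASSPATH in hadoop-env.sh):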

hadoop fs -text /testData/10IntArray

The result is as follows:

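Something looks off? It should be the array [1,2,3,4]. Actually the file is fine. `hadoop fs -text` does deserialize each record (that is why IntArrayWritable has to be on HADOOP_CLASSPATH) and then prints it with toString(). ArrayWritable does not override toString(), so what appears is the default Object.toString() output, the class name followed by a hash code, hence the IntArrayWritable... form. To see the array itself, read the records back and print their elements, as in the next step.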
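The difference between the default and an overridden toString() can be seen in plain Java. The two holder classes below are hypothetical stand-ins: PlainHolder behaves like ArrayWritable (no toString() override), while PrintableHolder overrides it to print its elements. The hash part of the default output varies from run to run.

```java
import java.util.Arrays;

class ToStringDemo {
    // Hypothetical holder WITHOUT a toString() override, like ArrayWritable.
    static class PlainHolder {
        final int[] values;

        PlainHolder(int... values) {
            this.values = values;
        }
    }

    // The same holder WITH an override that prints the elements.
    static class PrintableHolder extends PlainHolder {
        PrintableHolder(int... values) {
            super(values);
        }

        @Override
        public String toString() {
            return Arrays.toString(values);
        }
    }

    public static void main(String[] args) {
        System.out.println(new PlainHolder(1, 2, 3, 4));     // class name + '@' + hash code
        System.out.println(new PrintableHolder(1, 2, 3, 4)); // [1, 2, 3, 4]
    }
}
```

An analogous (untested) change to IntArrayWritable would be an override such as `public String toString() { return java.util.Arrays.toString(toStrings()); }`, which builds on ArrayWritable's existing toStrings() method; with it, `hadoop fs -text` should print the elements instead of the class name and hash.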

4) Read the SequenceFile above back with SequenceFile.Reader; this time I want to see the arrays! The code:

/**
 * Created with IntelliJ IDEA.
 * User: hadoop
 * Date: 16-3-4
 * Time: 5:41 PM
 * To change this template use File | Settings | File Templates.
 */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;
import java.net.URI;

public class SequencefileReaderDemo {
    public static void main(String[] args) throws IOException {
        String uri="/home/hadoop/2016Test/SeqTest/10IntArray";
        Configuration conf=new Configuration();
        FileSystem fs =FileSystem.get(URI.create(uri),conf);
        Path path=new Path(uri);
        SequenceFile.Reader reader=null;
        try {
            reader=new SequenceFile.Reader(fs,path,conf);
            Writable key =(Writable)ReflectionUtils.newInstance(reader.getKeyClass(),conf);
            IntArrayWritable value=(IntArrayWritable)ReflectionUtils.newInstance(reader.getValueClass(),conf);
            long position=reader.getPosition();
            String[] sValue=null;
            while(reader.next(key,value)){
                String syncSeen=reader.syncSeen()?"*":"";
                sValue=value.toStrings();
                System.out.printf("[%s%s]\t%s\t%s\t",position,syncSeen,key,value);
                for (String s:sValue){
                    System.out.printf("%s\t", s);
                }
                System.out.println();
                position=reader.getPosition();
            }
        }
        finally {
                IOUtils.closeStream(reader);
        }
    }

}

The output is as follows:
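The reader can rebuild the arrays because the serialized value records how many elements follow. Roughly, SequenceFile.Reader.next(key, value) lets the key and the value deserialize themselves via readFields, and ArrayWritable reads the element count first and then that many elements of its fixed element class. The JDK-only sketch below models that idea with a simplified record layout that is an assumption for illustration ([int key][int count][count ints], with no SequenceFile header, length fields or sync markers); writeRecords and readRecords are illustrative names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ReadBackDemo {
    // Writes `rows` records of the simplified form [int key][int count][count ints].
    static byte[] writeRecords(int rows, int[] row) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int key = 0; key < rows; key++) {
                out.writeInt(key);        // like IntWritable.write
                out.writeInt(row.length); // like ArrayWritable.write: count first...
                for (int v : row) {
                    out.writeInt(v);      // ...then each element
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reverses writeRecords: key, then count, then that many ints, until the data runs out.
    static List<String> readRecords(byte[] data) {
        List<String> lines = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            while (in.available() > 0) {
                int key = in.readInt();
                int[] row = new int[in.readInt()];
                for (int i = 0; i < row.length; i++) {
                    row[i] = in.readInt();
                }
                lines.add(key + "\t" + Arrays.toString(row));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        for (String line : readRecords(writeRecords(10, new int[]{1, 2, 3, 4}))) {
            System.out.println(line);
        }
    }
}
```

Running main prints ten lines, from "0" to "9", each followed by a tab and [1, 2, 3, 4].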

5) Finally, use the generated SequenceFile as the left matrix and write an MR program that computes the matrix product:

/**
 * Created with IntelliJ IDEA.
 * User: hadoop
 * Date: 16-3-4
 * Time: 10:34 AM
 * To change this template use File | Settings | File Templates.
 */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;
import java.lang.reflect.Array;
import java.net.URI;

public class MRTest {
    // Each input record is one row of the left matrix: key = row number, value = that row.
    public static class MyMapper extends Mapper<IntWritable, IntArrayWritable, IntWritable, IntArrayWritable> {
        // Right matrix (4 x 5), hard-coded for this example.
        public static int[][] rightMatrix = new int[][]{{10,10,10,10,10},{10,10,10,10,10},{10,10,10,10,10},{10,10,10,10,10}};
        // Output value, reused across calls instead of overwriting the input value.
        private final IntArrayWritable outValue = new IntArrayWritable();

        @Override
        public void map(IntWritable key, IntArrayWritable value, Context context) throws IOException, InterruptedException {
            // value.toArray() returns an Object whose runtime type is an IntWritable[];
            // Array.get(valueObject, j) returns its element at index j (an IntWritable).
            Object valueObject = value.toArray();
            IntWritable[] rowResult = new IntWritable[rightMatrix[0].length];
            for (int i = 0; i < rightMatrix[0].length; ++i) {
                int sum = 0;
                for (int j = 0; j < rightMatrix.length; ++j) {
                    sum += ((IntWritable) Array.get(valueObject, j)).get() * rightMatrix[j][i];
                }
                rowResult[i] = new IntWritable(sum);
            }
            outValue.set(rowResult);
            context.write(key, outValue);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String uri = "/home/hadoop/2016Test/SeqTest/10IntArray";
        String outUri = "/home/hadoop/2016Test/SeqTest/output";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);

        fs.delete(new Path(outUri), true); // delete the output directory if it already exists

        Job job = new Job(conf, "SeqMatrix");
        job.setJarByClass(MRTest.class);
        job.setMapperClass(MyMapper.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntArrayWritable.class);
        FileInputFormat.setInputPaths(job, new Path(uri));
        FileOutputFormat.setOutputPath(job, new Path(outUri));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The technique of using Array.get(object, index) to fetch array elements from an Object that holds an array comes from: http://www.blogjava.net/pengpenglin/archive/2008/09/04/226968.html
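The Array.get technique works on any Object whose runtime type is an array, including primitive arrays, whose elements come back boxed. A small JDK-only sketch follows; Integer stands in for IntWritable here (in the mapper the element is cast to IntWritable and read with get()), and the helper name sumInts is illustrative.

```java
import java.lang.reflect.Array;

class ArrayGetDemo {
    // Sums the elements of an Object that is really an array of numbers,
    // without knowing its static element type.
    static int sumInts(Object arrayObject) {
        int sum = 0;
        for (int i = 0; i < Array.getLength(arrayObject); i++) {
            sum += ((Number) Array.get(arrayObject, i)).intValue();
        }
        return sum;
    }

    public static void main(String[] args) {
        Object boxed = new Integer[]{1, 2, 3, 4};
        Object primitive = new int[]{1, 2, 3, 4};
        System.out.println(Array.get(boxed, 3)); // 4
        System.out.println(sumInts(boxed));      // 10
        System.out.println(sumInts(primitive));  // 10 (int elements are returned as Integer)
    }
}
```

Array.getLength and Array.get throw IllegalArgumentException if the Object is not an array, so this approach relies on toArray() really returning an array, as ArrayWritable's does.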

The final result is as follows:
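Since every left-matrix row written in step 2) is [1, 2, 3, 4] and every entry of rightMatrix is 10, each of the 10 output rows should be [100, 100, 100, 100, 100], because 1*10 + 2*10 + 3*10 + 4*10 = 100. A plain-Java sketch of the per-row product the mapper computes, using the same loop order as MyMapper.map (multiplyRow is an illustrative name):

```java
import java.util.Arrays;

class RowTimesMatrixDemo {
    // One left-matrix row (length n) times an n x m right matrix gives a row of length m:
    // result[i] = sum over j of row[j] * right[j][i].
    static int[] multiplyRow(int[] row, int[][] right) {
        if (row.length != right.length) {
            throw new IllegalArgumentException("row length must equal the right matrix's row count");
        }
        int[] result = new int[right[0].length];
        for (int i = 0; i < result.length; i++) {
            int sum = 0;
            for (int j = 0; j < right.length; j++) {
                sum += row[j] * right[j][i];
            }
            result[i] = sum;
        }
        return result;
    }

    public static void main(String[] args) {
        int[][] right = new int[4][5];
        for (int[] r : right) {
            Arrays.fill(r, 10); // same values as rightMatrix in MRTest
        }
        System.out.println(Arrays.toString(multiplyRow(new int[]{1, 2, 3, 4}, right))); // [100, 100, 100, 100, 100]
    }
}
```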
