Hadoop(十一)Hadoop IO之序列化與比較功能實現詳解


前言

  上一篇給大家介紹了Hadoop是怎么樣保證數據的完整性的,並且使用Java程序來驗證了會產生.crc的校驗文件。這一篇給大家分享的是Hadoop的序列化!

一、序列化和反序列化概述

1.1、序列化和反序列化的定義

  1)序列化:將結構化對象轉換為字節流的過程,以便在網絡上傳輸或寫入到磁盤進行永久存儲的過程。
  2)反序列化:將字節流轉回一系列的相反過程結構化對象。

  注意:其實流就是字節數組,我們把數據轉變成一系列的字節數組(0101這樣的數據)

1.2、序列化和反序列化的應用

  1)進程間的通信

  2)持久化存儲

1.3、RPC序列化格式要求

  在Hadoop中,系統中多個節點上進程間的通信是通過“遠程過程調用(RPC)”實現的。RPC協議將消息序列化成 二進制流后發送到遠程節點,遠程節點

  將二進制流反序列化為原始信息。通常情況下,RPC序列化格式如下:

    1)緊湊(compact)

      緊湊格式能充分利用網絡帶寬。

    2)快速(Fast)

      進程間通信形成了分布式系統的骨架,所以需要盡量減少序列化和反序列化的性能開銷,這是基本..最基本的。

    3)可擴展(Extensible)

      為了滿足新的需求,協議不斷變化。所以控制客戶端和服務器的過程中,需要直接引進相應的協議。

    4)支持互操作(Interoperable)

      對於某些系統來說,希望能支持以不同語言寫的客戶端與服務器交互,所以需要設計需要一種特定的格式來滿足這一需求。

二、Hadoop中和虛序列化相關的接口和類

  在Java中將一個類寫為可以序列化的類是實現Serializable接口

  在Hadoop中將一個類寫為可以序列化的類是實現Writable接口,它是一個最頂級的接口。

1.1、Hadoop對基本數據類型的包裝

  Hadoop參照JDK里面的數據類型實現了自己的數據類型,Hadoop自己實現的原理會使數據更緊湊一些,效率會高一些。序列化之后的字節數組大小會比

  JDK序列化出來的更小一些。

  所有Java基本類型的可寫包裝器,除了char(可以是存儲在IntWritable中)。所有的都有一個get()和set()方法來檢索和存儲包裝值。  

  

  Java中的String對應着Hadoop中的Text,Text可以存儲2G的字符串大小。

1.2、Writable接口

  1)Writable接口概述

    

  2)接口中的方法

    Writable接口定義了兩個方法:

      一個將其狀態寫到DataOutput二進制流,另一個從DataInput二進制流讀取狀態。

    

  3)API中Writable接口的例子:   

 public class MyWritable implements Writable {
       // Some data     
       private int counter;
       private long timestamp;
       
       public void write(DataOutput out) throws IOException {
         out.writeInt(counter);
         out.writeLong(timestamp);
       }
       
       public void readFields(DataInput in) throws IOException {
         counter = in.readInt();
         timestamp = in.readLong();
       }
       
       public static MyWritable read(DataInput in) throws IOException {
         MyWritable w = new MyWritable();
         w.readFields(in);
         return w;
       }
     }

  思考:在Java中已經有序列化和反序列化相關的類和方法,為什么Hadoop還要去自己設計一套呢?

    因為Hadoop認為Java設計的序列化和反序列化相關的類和方法性能不夠好,效率太低了。所以就自己設計一套。

  4)Writable的繼承關系

  

1.3、實例解釋Java和Hadoop數據類型序列化的差別

  1)核心代碼

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;

    //測試使用Hadoop序列化和JDK序列化之間的區別
public class SerializationCompare_0010{
    //Writable是Hadoop中所有數據類型的父類(父接口)。
    public static byte[] serialize(Writable writable) throws IOException{
        //這是一種編程思想,因為我們返回的是一個字節數組,所以進行了一下流的轉換。
        ByteArrayOutputStream baos=
            new ByteArrayOutputStream();
        ObjectOutputStream oos=
            new ObjectOutputStream(baos);
        writable.write(oos);
        oos.close();
        return baos.toByteArray();
    }

    //能序列化的一定是類類型,所以這里使用int類型的包裝類
    public static byte[] serialize(Integer integer) throws IOException{
        ByteArrayOutputStream baos=
            new ByteArrayOutputStream();
        ObjectOutputStream oos=
            new ObjectOutputStream(baos);
        oos.writeInt(integer);
        oos.close();
        return baos.toByteArray();
    }

    public static Writable deserialize(byte[] bytes) throws IOException{
        ByteArrayInputStream bais=
            new ByteArrayInputStream(bytes);
        DataInputStream dis=
            new DataInputStream(bais);
        IntWritable iw=new IntWritable();
        iw.readFields(dis);
        return iw;
    }

    public static void main(String[] args) throws IOException{
        IntWritable iw=new IntWritable(200);
     //hadoop也可以使用set方法傳值
        // iw.set(300);
        byte[] bytes=serialize(iw);
        System.out.println("Hadoop:"+bytes.length);
        //Writable deIw=deserialize(bytes);
        //System.out.println("Hadoop Deserialize:"+deIw);

        Integer integer=new Integer(200);
        bytes=serialize(integer);
        System.out.println("Java:"+bytes.length);
    }
}
SerializationCompare_0010

  2)測試結果

    其實這里雖然是字節數組長度相同,但是在大數據中,其實是Hadoop占優勢的。

1.4、在Hadoop中寫一個序列化的類

  1)核心代碼

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class StudentDemo_0010{
    public static void main(String[] args) throws IOException{
        Student student=new Student();
        student.setId(new IntWritable(10));
        student.setName(new Text("Lance"));
        student.setGender(true);

        ByteArrayOutputStream baos=
            new ByteArrayOutputStream();
        DataOutputStream dos=
            new DataOutputStream(baos);
        student.write(dos);
        byte[] data=baos.toByteArray();
        System.out.println(Arrays.toString(data));
        System.out.println(data.length);

        // 將data進行反序列化?
    }
}

class Student implements Writable{
    private IntWritable id;
    private Text name;
    private boolean gender;
    private List<Text> list=new ArrayList<>();

    Student(){
        id=new IntWritable();
        name=new Text();
    }

    /**
     *
     * @param student
     */
    Student(Student student){
        // 在Hadoop中這屬於引用復制,完全杜絕這種現象
        //this.id=student.id;
        //this.name=student.name;
        // 在Hadoop中要使用屬性值的復制
        id=new IntWritable(student.id.get());
        name=new Text(student.name.toString());
    }

    @Override
    public void write(DataOutput out) throws IOException{
        id.write(out);
        name.write(out);
        BooleanWritable gender=
            new BooleanWritable(this.gender);
        gender.write(out);
        // 在Hadoop中序列化Java中所對應的集合的時候,
        // 應該現將集合的長度進行序列化,然后將集合中的
        // 每一個元素進行序列化
        int size=list.size();
        new IntWritable(size).write(out);
        for(int i=0;i<size;i++){
            Text text=list.get(i);
            text.write(out);
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException{
        id.readFields(in);
        name.readFields(in);
        BooleanWritable bw=new BooleanWritable();
        bw.readFields(in);
        gender=bw.get();
        // 在反序列化集合的時候應該先反序列化集合的長度
        IntWritable size=new IntWritable();
        size.readFields(in);
        // 再反序列化流中所對應的結合中的每一個元素
        list.clear();
        for(int i=0;i<size.get();i++){
            Text text=new Text();
            text.readFields(in);
            list.add(text);// 此步驟有沒有問題???
        }
    }

    public IntWritable getId(){
        return id;
    }

    public void setId(IntWritable id){
        this.id=id;
    }

    public Text getName(){
        return name;
    }

    public void setName(Text name){
        this.name=name;
    }

    public boolean isGender(){
        return gender;
    }

    public void setGender(boolean gender){
        this.gender=gender;
    }

    public List<Text> getList(){
        return list;
    }

    public void setList(List<Text> list){
        this.list=list;
    }
}
StudentDemo_0010

  2)測試執行:

      

      注意:" 第一部分":代表的是id,占四個字節。

         “第二部分”:代表的是name,首先5是代表字符的長度,后面是字符的ASCII碼。

            注意如果將name的值改為中文,比如“二蛋子”如果是GBK編碼就會占6個字節,如果是UTF-8編碼就會占9個字節。  

         “第三部分”:代表的是gender,1表示ture,0表示false。

         “第四部分”:在我們list中的size,雖然這里沒有數據,但是int類型的仍然會占4個字節數。

            

四、Hadoop中和比較相關的接口和類

4.1、WritableComparable<T>接口

  1)概述

    繼承了兩個接口

    

  2)相關方法

    繼承過來的三個方法

    

4.2、RawComparator<T>接口

  1)概述

    

  2)相關方法

    除了Comparator中繼承的兩個方法,它自己也定義了一個方法有6個參數,這是在字節流的層面上去做比較。(第一個參數:指定字節數組,第二個參數:從哪里開始比較,第三個參數:比較多長)

    

  在考慮到使用RawComparator比較不方便,有出現了一個實現類。

4.3、WritableComparator類

  1)概述

    

  2)構造方法

    

  3)相關方法

    截取了部分

    

 

  介紹了上面的類和這些方法,我們Hadoop中有實現了一些既可以序列化也可以比較的類:

  

  那我們如果自定義一個類型去實現比較的功能呢?在我們前面寫了一個Student的類,它具有序列化的功能,那怎么樣才能有比較的功能呢?

  在Java中如果讓一個類的對象具有可比較性
    1)實現Comparable接口
    2)編寫獨立的比較器,Comparator

  而在Hadoop如果你要實現比較的功能有:

    

  從上面的圖中可以看出:

    要是一個類具有比較功能又有序列化的功能可以去實現WritableComparable接口,如果你要一個類只要有比較功能

    可以去寫一個比較器用RawComparator或WritableComparator。

    總的來說最好還是去實現WritableComparable接口,因為又有序列化的功能又有比較的功能

五、Hadoop實現序列化和比較功能

功能分析:

    

5.1、核心代碼

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.IntWritable.Comparator;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

public class P00120_AccountWritable_0010{
    public static void main(String[] args){
        AccountWritable aw1=new AccountWritable();
        aw1.set(new IntWritable(30),new Text("zyh"),new BooleanWritable(true));

        AccountWritable aw2=new AccountWritable();
        aw2.set(new IntWritable(30),new Text("zyh"),new BooleanWritable(true));

        AccountWritable.DiyComparator comparator=new AccountWritable.DiyComparator();
        System.out.println(comparator.compare(aw1,aw2));
    }
}

class AccountWritable
    implements WritableComparable<AccountWritable>{

    private IntWritable code;
    private Text name;
    private BooleanWritable gender;

    AccountWritable(){
        code=new IntWritable();
        name=new Text();
        gender=new BooleanWritable();
    }

    // 把參數類型和類類型相同的構造器,叫復制構造器
    AccountWritable(AccountWritable aw){
        code=new IntWritable(aw.getCode().get());
        name=new Text(aw.getName().toString());
        gender=new BooleanWritable(aw.getGender().get());
    }

    public void set(IntWritable code,Text name,BooleanWritable gender){
        this.code=new IntWritable(code.get());
        this.name=new Text(name.toString());
        this.gender=new BooleanWritable(gender.get());
    }

    @Override
    public int compareTo(AccountWritable o){
        /*return this.code.compareTo(o.code)!=0?code.compareTo(o.code):
            (name.compareTo(o.name)!=0?name.compareTo(o.name):(this.gender.compareTo(o.gender)!=0?gender.compareTo(o.gender):0));*/
        int comp=this.code.compareTo(o.code);
        if(comp!=0){
            return comp;
        }else{
            comp=this.name.compareTo(o.name);
            if(comp!=0){
                return comp;
            }else{
                comp=this.gender.compareTo(o.gender);
                if(comp!=0){
                    return comp;
                }else{
                    return 0;
                }
            }
        }
    }

    @Override
    public void write(DataOutput out) throws IOException{
        code.write(out);
        name.write(out);
        gender.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException{
        code.readFields(in);
        name.readFields(in);
        gender.readFields(in);
    }
  
  
   //實現一個比較器
    static class DiyComparator
        implements RawComparator<AccountWritable>{

        private IntWritable.Comparator ic=
            new Comparator();
        private Text.Comparator tc=
            new Text.Comparator();
        private BooleanWritable.Comparator bc=
            new BooleanWritable.Comparator();

        @Override
        public int compare(byte[] b1,int s1,int l1,byte[] b2,int s2,int l2){
            // code被序列化后在b1和b2數組中的起始位置以及字節長度
            int firstLength=4;
            int secondLength=4;

            int firstStart=s1;
            int secondStart=s2;

            int firstOffset=0;
            int secondOffset=0;

            // 比較字節流中的code部分
            int comp=ic.compare(
                b1,firstStart,firstLength,
                b2,secondStart,secondLength);
            if(comp!=0){
                return comp;
            }else{
                try{
                    // 獲取記錄字符串的起始位置
                    firstStart=firstStart+firstLength;
                    secondStart=secondStart+secondLength;
                    // 獲取記錄字符串長度的VIntWritable的值的長度,被稱為offset
                    firstOffset=WritableUtils.decodeVIntSize(b1[firstStart]);
                    secondOffset=WritableUtils.decodeVIntSize(b2[secondStart]);
                    // 獲取字符串的長度
                    firstLength=readLengthValue(b1,firstStart);
                    secondLength=readLengthValue(b2,secondStart);
                }catch(IOException e){
                    e.printStackTrace();
                }
                // 比較字節流中的name部分
                comp=tc.compare(b1,firstStart+firstOffset,firstLength,b2,secondStart+secondOffset,secondLength);
                if(comp!=0){
                    return comp;
                }else{
                    firstStart+=(firstOffset+firstLength);
                    secondStart+=(secondOffset+secondLength);
                    firstLength=1;
                    secondLength=1;
                    // 比較字節流中的gender部分
                    return bc.compare(b1,firstStart,firstLength,b2,secondStart,secondLength);
                }
            }
        }

        private int readLengthValue(
            byte[] bytes,int start) throws IOException{
            DataInputStream dis=
                new DataInputStream(
                    new ByteArrayInputStream(
                        bytes,start,WritableUtils.decodeVIntSize(bytes[start])));
            VIntWritable viw=new VIntWritable();
            viw.readFields(dis);
            return viw.get();
        }

        @Override
        public int compare(AccountWritable o1,AccountWritable o2){
            ByteArrayOutputStream baos1=new ByteArrayOutputStream();
            DataOutputStream dos1=new DataOutputStream(baos1);

            ByteArrayOutputStream baos2=new ByteArrayOutputStream();
            DataOutputStream dos2=new DataOutputStream(baos2);

            try{
                o1.write(dos1);
                o2.write(dos2);

                dos1.close();
                dos2.close();

                byte[] b1=baos1.toByteArray();
                byte[] b2=baos2.toByteArray();

                return compare(b1,0,b1.length,b2,0,b2.length);
            }catch(IOException e){
                e.printStackTrace();
            }
            return 0;
        }
    }

    public IntWritable getCode(){
        return code;
    }

    public void setCode(IntWritable code){
        this.code=code;
    }

    public Text getName(){
        return name;
    }

    public void setName(Text name){
        this.name=name;
    }

    public BooleanWritable getGender(){
        return gender;
    }

    public void setGender(BooleanWritable gender){
        this.gender=gender;
    }
}
AccountWritable  

   注意如果一個類即實現了WritableComparatable接口又寫了比較器,優先使用比較器。

 

喜歡就點個“推薦”!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM