前言
上一篇給大家介紹了Hadoop是怎么樣保證數據的完整性的,並且使用Java程序來驗證了會產生.crc的校驗文件。這一篇給大家分享的是Hadoop的序列化!
一、序列化和反序列化概述
1.1、序列化和反序列化的定義
1)序列化:將結構化對象轉換為字節流的過程,以便在網絡上傳輸或寫入到磁盤進行永久存儲的過程。
2)反序列化:將字節流轉回一系列的相反過程結構化對象。
注意:其實流就是字節數組,我們把數據轉變成一系列的字節數組(0101這樣的數據)
1.2、序列化和反序列化的應用
1)進程間的通信
2)持久化存儲
1.3、RPC序列化格式要求
在Hadoop中,系統中多個節點上進程間的通信是通過“遠程過程調用(RPC)”實現的。RPC協議將消息序列化成 二進制流后發送到遠程節點,遠程節點
將二進制流反序列化為原始信息。通常情況下,RPC序列化格式如下:
1)緊湊(compact)
緊湊格式能充分利用網絡帶寬。
2)快速(Fast)
進程間通信形成了分布式系統的骨架,所以需要盡量減少序列化和反序列化的性能開銷,這是基本..最基本的。
3)可擴展(Extensible)
為了滿足新的需求,協議不斷變化。所以控制客戶端和服務器的過程中,需要直接引進相應的協議。
4)支持互操作(Interoperable)
對於某些系統來說,希望能支持以不同語言寫的客戶端與服務器交互,所以需要設計需要一種特定的格式來滿足這一需求。
二、Hadoop中和虛序列化相關的接口和類
在Java中將一個類寫為可以序列化的類是實現Serializable接口
在Hadoop中將一個類寫為可以序列化的類是實現Writable接口,它是一個最頂級的接口。
1.1、Hadoop對基本數據類型的包裝
Hadoop參照JDK里面的數據類型實現了自己的數據類型,Hadoop自己實現的原理會使數據更緊湊一些,效率會高一些。序列化之后的字節數組大小會比
JDK序列化出來的更小一些。
所有Java基本類型的可寫包裝器,除了char(可以是存儲在IntWritable中)。所有的都有一個get()和set()方法來檢索和存儲包裝值。

Java中的String對應着Hadoop中的Text,Text可以存儲2G的字符串大小。
1.2、Writable接口
1)Writable接口概述

2)接口中的方法
Writable接口定義了兩個方法:
一個將其狀態寫到DataOutput二進制流,另一個從DataInput二進制流讀取狀態。

3)API中Writable接口的例子:
public class MyWritable implements Writable { // Some data private int counter; private long timestamp; public void write(DataOutput out) throws IOException { out.writeInt(counter); out.writeLong(timestamp); } public void readFields(DataInput in) throws IOException { counter = in.readInt(); timestamp = in.readLong(); } public static MyWritable read(DataInput in) throws IOException { MyWritable w = new MyWritable(); w.readFields(in); return w; } }
思考:在Java中已經有序列化和反序列化相關的類和方法,為什么Hadoop還要去自己設計一套呢?
因為Hadoop認為Java設計的序列化和反序列化相關的類和方法性能不夠好,效率太低了。所以就自己設計一套。
4)Writable的繼承關系

1.3、實例解釋Java和Hadoop數據類型序列化的差別
1)核心代碼
import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.IOException; import java.io.ObjectOutputStream; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; //測試使用Hadoop序列化和JDK序列化之間的區別 public class SerializationCompare_0010{ //Writable是Hadoop中所有數據類型的父類(父接口)。 public static byte[] serialize(Writable writable) throws IOException{ //這是一種編程思想,因為我們返回的是一個字節數組,所以進行了一下流的轉換。 ByteArrayOutputStream baos= new ByteArrayOutputStream(); ObjectOutputStream oos= new ObjectOutputStream(baos); writable.write(oos); oos.close(); return baos.toByteArray(); } //能序列化的一定是類類型,所以這里使用int類型的包裝類 public static byte[] serialize(Integer integer) throws IOException{ ByteArrayOutputStream baos= new ByteArrayOutputStream(); ObjectOutputStream oos= new ObjectOutputStream(baos); oos.writeInt(integer); oos.close(); return baos.toByteArray(); } public static Writable deserialize(byte[] bytes) throws IOException{ ByteArrayInputStream bais= new ByteArrayInputStream(bytes); DataInputStream dis= new DataInputStream(bais); IntWritable iw=new IntWritable(); iw.readFields(dis); return iw; } public static void main(String[] args) throws IOException{ IntWritable iw=new IntWritable(200); //hadoop也可以使用set方法傳值 // iw.set(300); byte[] bytes=serialize(iw); System.out.println("Hadoop:"+bytes.length); //Writable deIw=deserialize(bytes); //System.out.println("Hadoop Deserialize:"+deIw); Integer integer=new Integer(200); bytes=serialize(integer); System.out.println("Java:"+bytes.length); } }
2)測試結果
其實這里雖然是字節數組長度相同,但是在大數據中,其實是Hadoop占優勢的。
1.4、在Hadoop中寫一個序列化的類
1)核心代碼
import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; public class StudentDemo_0010{ public static void main(String[] args) throws IOException{ Student student=new Student(); student.setId(new IntWritable(10)); student.setName(new Text("Lance")); student.setGender(true); ByteArrayOutputStream baos= new ByteArrayOutputStream(); DataOutputStream dos= new DataOutputStream(baos); student.write(dos); byte[] data=baos.toByteArray(); System.out.println(Arrays.toString(data)); System.out.println(data.length); // 將data進行反序列化? } } class Student implements Writable{ private IntWritable id; private Text name; private boolean gender; private List<Text> list=new ArrayList<>(); Student(){ id=new IntWritable(); name=new Text(); } /** * * @param student */ Student(Student student){ // 在Hadoop中這屬於引用復制,完全杜絕這種現象 //this.id=student.id; //this.name=student.name; // 在Hadoop中要使用屬性值的復制 id=new IntWritable(student.id.get()); name=new Text(student.name.toString()); } @Override public void write(DataOutput out) throws IOException{ id.write(out); name.write(out); BooleanWritable gender= new BooleanWritable(this.gender); gender.write(out); // 在Hadoop中序列化Java中所對應的集合的時候, // 應該現將集合的長度進行序列化,然后將集合中的 // 每一個元素進行序列化 int size=list.size(); new IntWritable(size).write(out); for(int i=0;i<size;i++){ Text text=list.get(i); text.write(out); } } @Override public void readFields(DataInput in) throws IOException{ id.readFields(in); name.readFields(in); BooleanWritable bw=new BooleanWritable(); bw.readFields(in); gender=bw.get(); // 在反序列化集合的時候應該先反序列化集合的長度 IntWritable size=new IntWritable(); size.readFields(in); // 再反序列化流中所對應的結合中的每一個元素 list.clear(); for(int i=0;i<size.get();i++){ Text text=new Text(); text.readFields(in); list.add(text);// 此步驟有沒有問題??? } } public IntWritable getId(){ return id; } public void setId(IntWritable id){ this.id=id; } public Text getName(){ return name; } public void setName(Text name){ this.name=name; } public boolean isGender(){ return gender; } public void setGender(boolean gender){ this.gender=gender; } public List<Text> getList(){ return list; } public void setList(List<Text> list){ this.list=list; } }
2)測試執行:

注意:" 第一部分":代表的是id,占四個字節。
“第二部分”:代表的是name,首先5是代表字符的長度,后面是字符的ASCII碼。
注意如果將name的值改為中文,比如“二蛋子”如果是GBK編碼就會占6個字節,如果是UTF-8編碼就會占9個字節。
“第三部分”:代表的是gender,1表示ture,0表示false。
“第四部分”:在我們list中的size,雖然這里沒有數據,但是int類型的仍然會占4個字節數。

四、Hadoop中和比較相關的接口和類
4.1、WritableComparable<T>接口
1)概述
繼承了兩個接口

2)相關方法
繼承過來的三個方法

4.2、RawComparator<T>接口
1)概述

2)相關方法
除了Comparator中繼承的兩個方法,它自己也定義了一個方法有6個參數,這是在字節流的層面上去做比較。(第一個參數:指定字節數組,第二個參數:從哪里開始比較,第三個參數:比較多長)

在考慮到使用RawComparator比較不方便,有出現了一個實現類。
4.3、WritableComparator類
1)概述

2)構造方法

3)相關方法
截取了部分

介紹了上面的類和這些方法,我們Hadoop中有實現了一些既可以序列化也可以比較的類:

那我們如果自定義一個類型去實現比較的功能呢?在我們前面寫了一個Student的類,它具有序列化的功能,那怎么樣才能有比較的功能呢?
在Java中如果讓一個類的對象具有可比較性
1)實現Comparable接口
2)編寫獨立的比較器,Comparator
而在Hadoop如果你要實現比較的功能有:

從上面的圖中可以看出:
要是一個類具有比較功能又有序列化的功能可以去實現WritableComparable接口,如果你要一個類只要有比較功能
可以去寫一個比較器用RawComparator或WritableComparator。
總的來說最好還是去實現WritableComparable接口,因為又有序列化的功能又有比較的功能。
五、Hadoop實現序列化和比較功能
功能分析:
5.1、核心代碼
import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.IntWritable.Comparator; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.VIntWritable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableUtils; public class P00120_AccountWritable_0010{ public static void main(String[] args){ AccountWritable aw1=new AccountWritable(); aw1.set(new IntWritable(30),new Text("zyh"),new BooleanWritable(true)); AccountWritable aw2=new AccountWritable(); aw2.set(new IntWritable(30),new Text("zyh"),new BooleanWritable(true)); AccountWritable.DiyComparator comparator=new AccountWritable.DiyComparator(); System.out.println(comparator.compare(aw1,aw2)); } } class AccountWritable implements WritableComparable<AccountWritable>{ private IntWritable code; private Text name; private BooleanWritable gender; AccountWritable(){ code=new IntWritable(); name=new Text(); gender=new BooleanWritable(); } // 把參數類型和類類型相同的構造器,叫復制構造器 AccountWritable(AccountWritable aw){ code=new IntWritable(aw.getCode().get()); name=new Text(aw.getName().toString()); gender=new BooleanWritable(aw.getGender().get()); } public void set(IntWritable code,Text name,BooleanWritable gender){ this.code=new IntWritable(code.get()); this.name=new Text(name.toString()); this.gender=new BooleanWritable(gender.get()); } @Override public int compareTo(AccountWritable o){ /*return this.code.compareTo(o.code)!=0?code.compareTo(o.code): (name.compareTo(o.name)!=0?name.compareTo(o.name):(this.gender.compareTo(o.gender)!=0?gender.compareTo(o.gender):0));*/ int comp=this.code.compareTo(o.code); if(comp!=0){ return comp; }else{ comp=this.name.compareTo(o.name); if(comp!=0){ return comp; }else{ comp=this.gender.compareTo(o.gender); if(comp!=0){ return comp; }else{ return 0; } } } } @Override public void write(DataOutput out) throws IOException{ code.write(out); name.write(out); gender.write(out); } @Override public void readFields(DataInput in) throws IOException{ code.readFields(in); name.readFields(in); gender.readFields(in); } //實現一個比較器 static class DiyComparator implements RawComparator<AccountWritable>{ private IntWritable.Comparator ic= new Comparator(); private Text.Comparator tc= new Text.Comparator(); private BooleanWritable.Comparator bc= new BooleanWritable.Comparator(); @Override public int compare(byte[] b1,int s1,int l1,byte[] b2,int s2,int l2){ // code被序列化后在b1和b2數組中的起始位置以及字節長度 int firstLength=4; int secondLength=4; int firstStart=s1; int secondStart=s2; int firstOffset=0; int secondOffset=0; // 比較字節流中的code部分 int comp=ic.compare( b1,firstStart,firstLength, b2,secondStart,secondLength); if(comp!=0){ return comp; }else{ try{ // 獲取記錄字符串的起始位置 firstStart=firstStart+firstLength; secondStart=secondStart+secondLength; // 獲取記錄字符串長度的VIntWritable的值的長度,被稱為offset firstOffset=WritableUtils.decodeVIntSize(b1[firstStart]); secondOffset=WritableUtils.decodeVIntSize(b2[secondStart]); // 獲取字符串的長度 firstLength=readLengthValue(b1,firstStart); secondLength=readLengthValue(b2,secondStart); }catch(IOException e){ e.printStackTrace(); } // 比較字節流中的name部分 comp=tc.compare(b1,firstStart+firstOffset,firstLength,b2,secondStart+secondOffset,secondLength); if(comp!=0){ return comp; }else{ firstStart+=(firstOffset+firstLength); secondStart+=(secondOffset+secondLength); firstLength=1; secondLength=1; // 比較字節流中的gender部分 return bc.compare(b1,firstStart,firstLength,b2,secondStart,secondLength); } } } private int readLengthValue( byte[] bytes,int start) throws IOException{ DataInputStream dis= new DataInputStream( new ByteArrayInputStream( bytes,start,WritableUtils.decodeVIntSize(bytes[start]))); VIntWritable viw=new VIntWritable(); viw.readFields(dis); return viw.get(); } @Override public int compare(AccountWritable o1,AccountWritable o2){ ByteArrayOutputStream baos1=new ByteArrayOutputStream(); DataOutputStream dos1=new DataOutputStream(baos1); ByteArrayOutputStream baos2=new ByteArrayOutputStream(); DataOutputStream dos2=new DataOutputStream(baos2); try{ o1.write(dos1); o2.write(dos2); dos1.close(); dos2.close(); byte[] b1=baos1.toByteArray(); byte[] b2=baos2.toByteArray(); return compare(b1,0,b1.length,b2,0,b2.length); }catch(IOException e){ e.printStackTrace(); } return 0; } } public IntWritable getCode(){ return code; } public void setCode(IntWritable code){ this.code=code; } public Text getName(){ return name; } public void setName(Text name){ this.name=name; } public BooleanWritable getGender(){ return gender; } public void setGender(BooleanWritable gender){ this.gender=gender; } }
注意如果一個類即實現了WritableComparatable接口又寫了比較器,優先使用比較器。
喜歡就點個“推薦”!
