1、 在hadoop中所有的key/value都必須實現Writable接口,有兩個方法,分別用於讀(反序列化)和寫(序列化)操作。
參考代碼:

1 package org.dragon.hadoop.mapreduce.app; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 7 import org.apache.hadoop.io.Writable; 8 9 /** 10 * 11 * @author ZhuXY 12 * @time 2016-3-10 下午3:49:55 13 * 14 */ 15 public class DataWritable implements Writable { 16 17 // telsphone 18 19 // upload 20 private int upPackNum; 21 private int upPayLoad; 22 23 // download 24 private int downPackNum; 25 private int downPayLoad; 26 27 public DataWritable() { 28 29 } 30 31 public void set(int upPackNum, int upPayLoad, int downPackNum, 32 int downPayload) { 33 this.upPackNum = upPackNum; 34 this.upPayLoad = upPayLoad; 35 this.downPackNum = downPackNum; 36 this.downPayLoad = downPayload; 37 38 } 39 40 public int getUpPackNum() { 41 return upPackNum; 42 } 43 44 public int getUpPayLoas() { 45 return upPayLoad; 46 } 47 48 public int getDownPackNum() { 49 return downPackNum; 50 } 51 52 public int getDownPayload() { 53 return downPayLoad; 54 } 55 56 @Override 57 public void write(DataOutput out) throws IOException { 58 out.writeInt(upPackNum); 59 out.writeInt(upPayLoad); 60 out.writeInt(downPackNum); 61 out.writeInt(downPayLoad); 62 } 63 64 /** 65 * 讀出的順序要和寫入的順序相同 66 */ 67 @Override 68 public void readFields(DataInput in) throws IOException { 69 // TODO Auto-generated method stub 70 this.upPackNum = in.readInt(); 71 this.upPayLoad = in.readInt(); 72 this.downPackNum = in.readInt(); 73 this.downPayLoad = in.readInt(); 74 } 75 76 @Override 77 public String toString() { 78 return upPackNum + "\t" + upPayLoad + "\t" + downPackNum + "\t" 79 + downPayLoad; 80 } 81 82 @Override 83 public int hashCode() { 84 final int prime = 31; 85 int result = 1; 86 result = prime * result + downPackNum; 87 result = prime * result + downPayLoad; 88 result = prime * result + upPackNum; 89 result = prime * result + upPayLoad; 90 return result; 91 } 92 93 @Override 94 public boolean equals(Object obj) { 95 if (this == obj) 96 return true; 97 if (obj == null) 98 return false; 99 if (getClass() != obj.getClass()) 100 return false; 101 DataWritable other = (DataWritable) obj; 102 if (downPackNum != other.downPackNum) 103 return false; 104 if (downPayLoad != other.downPayLoad) 105 return false; 106 if (upPackNum != other.upPackNum) 107 return false; 108 if (upPayLoad != other.upPayLoad) 109 return false; 110 return true; 111 } 112 113 }
2、所有的key必須實現Comparable接口,在MapReduce過程中需要對Key/Value對進行反復的排序。默認情況下依據Key進行排序的,要實現comparaTo()方法。所以通過Key既要實現Writable接口又要實現Comparable接口,Hadoop中提供了一個公共的接口,叫做WritableComparable接口:
3、由於需要序列化反序列化和進行比較,對java對象需要重寫一下幾個方法:
① equals();
② hashCode();
③ toString()方法
如IntWritable類型的實現:

1 package org.apache.hadoop.io; 2 3 import java.io.*; 4 5 /** A WritableComparable for ints. */ 6 public class IntWritable implements WritableComparable { 7 private int value; 8 9 public IntWritable() {} 10 11 public IntWritable(int value) { set(value); } 12 13 /** Set the value of this IntWritable. */ 14 public void set(int value) { this.value = value; } 15 16 /** Return the value of this IntWritable. */ 17 public int get() { return value; } 18 19 public void readFields(DataInput in) throws IOException { 20 value = in.readInt(); 21 } 22 23 public void write(DataOutput out) throws IOException { 24 out.writeInt(value); 25 } 26 27 /** Returns true iff <code>o</code> is a IntWritable with the same value. */ 28 public boolean equals(Object o) { 29 if (!(o instanceof IntWritable)) 30 return false; 31 IntWritable other = (IntWritable)o; 32 return this.value == other.value; 33 } 34 35 public int hashCode() { 36 return value; 37 } 38 39 /** Compares two IntWritables. */ 40 public int compareTo(Object o) { 41 int thisValue = this.value; 42 int thatValue = ((IntWritable)o).value; 43 return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1)); 44 } 45 46 public String toString() { 47 return Integer.toString(value); 48 }
4、數據類型,必須有一個無參的構造方法,為了方便反射創建對象。
在自定義數據類型中,建議使用java原生數據類型,最好不要使用hadoop對原生類型封裝好的數據類型,即如下樣例代碼:
推薦使用:
不建議使用:
5、問題:
當數據寫入磁盤時,如果要進行排序的話,需要首先從磁盤中讀取數據進行反序列化成對象,然后在內存中對反序列化的對象進行比較。
對字節(未經過反序列化字節)進行直接比較,不需要進行反序列化以后再比較呢?如果要實現上述功能,Hadoop數據類型需要實現一個接口RawComparator。
在Hadoop中有一個針對Writable數據類型,進行實現的一個通用實現類WritableComparator類。所有的數據類型,只需要繼承通用類,再去需要具體功能復寫相應的compara()方法。一下以IntWritable為例,查看一下:
對於自定義的Comparator類需要以下幾步:
1) 推薦Comparator類定義在數據類型內部,靜態內部類,實現WritableComparator類。
2) 重寫默認無參構造方法,方法內必須調用父類有參構造方法,如下截圖:
3) 重載父類的compare()方法,依據具體功能覆寫。
4) 向WritableComparator類中注冊自定義的Comparator類,代碼如下:
5、自定義數據類型
樣例代碼:

1 package org.dragon.hadoop.mr.io; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 import org.apache.hadoop.io.WritableComparable; 7 import org.apache.hadoop.io.WritableComparator; 8 import org.apache.hadoop.io.WritableUtils; 9 10 /** 11 * 自定義數據類型對Writable的實現。 12 * 快捷鍵get、set選擇器alt+shift+s 13 * @author ZhuXY 14 * @time 2016-3-9 下午10:40:02 15 * 16 */ 17 /** 18 * 1、Hadoop之——數據類型 19 1) 在hadoop中所有的key/value都必須實現Writable接口,有兩個方法,分別用於讀(反序列化)和寫(序列化)操作。 20 2) 所有的key必須實現Comparable接口,在MapReduce過程中需要對Key/Value對進行反復的排序。默認情況下依據Key進行排序的,要實現comparaTo()方法。所以通過Key既要實現Writable接口又要實現Comparable接口,Hadoop中提供了一個公共的接口,叫做WritableComparable接口 21 3) 由於需要序列化反序列化和進行比較,對java對象需要重寫一下幾個方法: 22 ① equals(); 23 ② hashCode(); 24 ③ toString()方法 25 4) 數據類型,必須有一個無參的構造方法,為了方便反射創建對象。 26 5) 在自定義數據類型中,建議使用java原生數據類型,最好不要使用hadoop對原生類型封裝好的數據類型,即 27 28 */ 29 30 /** 31 * 問題: 32 當數據寫入磁盤時,如果要進行排序的話,需要首先從磁盤中讀取數據進行反序列化成對象,然后在內存中對反序列化的對象進行比較。 33 * 對字節(未經過反序列化字節)進行直接比較,不需要進行反序列化以后再比較呢?如果要實現上述功能,Hadoop數據類型需要實現一個接口RawComparator。 34 在Hadoop中有一個針對Writable數據類型,進行實現的一個通用實現類WritableComparator類。所有的數據類型,只需要繼承通用類,再去需要具體功能復寫相應的compara()方法。 35 對於自定義的Comparator類需要以下幾步: 36 1) 推薦Comparator類定義在數據類型內部,靜態內部類,實現WritableComparator類。 37 2) 重寫默認無參構造方法,方法內必須調用父類有參構造方法,如下截圖: 38 39 3) 重載父類的compare()方法,依據具體功能覆寫。 40 4) 向WritableComparator類中注冊自定義的Comparator類,代碼如下: 41 42 */ 43 44 /** 45 * WritableCOmparator是RawComparator對WritableComparable類的一個通用實現。它提供兩個主要的功能。 46 * 首先他提供了一個默認的對原始compare()函數的調用,對從數據流對要比較的對象進行反序列化,然后調用對象 47 * 的compare方法。 48 * 其次,他充當的是RawComparator實例的工廠方法(Writable方法已經注冊)。 49 * @author ZhuXY 50 * 51 */ 52 public class PairWritable implements WritableComparable<PairWritable> { 53 54 private String name;// Text 55 private Integer age;// IntWritale 56 57 public PairWritable() { 58 } 59 60 public PairWritable(String name, Integer age) { 61 this.set(name, age); 62 } 63 64 public void set(String name, Integer age) { 65 this.name = name; 66 this.age = age; 67 } 68 69 public String getName() { 70 return name; 71 } 72 73 public Integer getAge() { 74 return age; 75 } 76 77 /** 78 * write方法是在寫入數據時調用,進行序列化 79 */ 80 @Override 81 public void write(DataOutput out) throws IOException { 82 out.writeUTF(name); 83 out.writeInt(age); 84 } 85 86 /** 87 * readField()方法是在取出數據時調用的方法,反序列化方法 88 * 以便生成對象 89 */ 90 @Override 91 public void readFields(DataInput in) throws IOException { 92 this.name = in.readUTF(); 93 this.age = in.readInt(); 94 } 95 96 /** 97 * 98 hashCode 的常規協定是: 99 1)在 Java 應用程序執行期間,在同一對象上多次調用 hashCode 方法時,必須一致地返回相同的整數,前提是對象上 equals 比較中所用的信息沒有被修改。從某一應用程序的一次執行到同一應用程序的另一次執行,該整數無需保持一致。 100 2)如果根據 equals(Object) 方法,兩個對象是相等的,那么在兩個對象中的每個對象上調用 hashCode 方法都必須生成相同的整數結果。 101 3)以下情況不 是必需的:如果根據 equals(java.lang.Object) 方法,兩個對象不相等,那么在兩個對象中的任一對象上調用 hashCode 方法必定會生成不同的整數結果。但是,程序員應該知道,為不相等的對象生成不同整數結果可以提高哈希表的性能。 102 4)實際上,由 Object 類定義的 hashCode 方法確實會針對不同的對象返回不同的整數。(這一般是通過將該對象的內部地址轉換成一個整數來實現的,但是 JavaTM 編程語言不需要這種實現技巧。) 103 104 5)當equals方法被重寫時,通常有必要重寫 hashCode 方法,以維護 hashCode 方法的常規協定,該協定聲明相等對象必須具有相等的哈希碼。 105 */ 106 @Override 107 public int hashCode() { 108 return name.hashCode() * 31 + age.hashCode(); 109 } 110 111 @Override 112 public boolean equals(Object obj) { 113 if (obj instanceof PairWritable) { 114 PairWritable pairWritable = (PairWritable) obj; 115 116 return this.name.equals(pairWritable.getName()) 117 && this.age.equals(pairWritable.getAge()); 118 } 119 return false; 120 } 121 122 @Override 123 public String toString() { 124 // TODO Auto-generated method stub 125 return this.name+"\t"+this.age; 126 } 127 128 @Override 129 public int compareTo(PairWritable o) { 130 int cmp=this.name.compareTo(o.getName()); 131 if (cmp!=0) { 132 return cmp; 133 } 134 return this.age.compareTo(o.getAge()); 135 } 136 137 public static class Comparator extends WritableComparator{ 138 139 public Comparator(){ 140 super(PairWritable.class); 141 } 142 143 /** 144 * 第一個字節數組 145 * byte[] b1, int s1, int l1, 146 * 字節數組起始位置長度 147 * 148 * 第二個字節數組 149 * byte[] b2, int s2, int l2 150 * 字節數組的起始位置長度 151 */ 152 /** 153 * 154 * 核心: 155 * 這個接口允許執行者比較從流中讀取的未被反序列化為對象的記錄,從而省去了創建對象的所有開銷。 156 * 例如,IntWritables的comparator使用原始的compare()方法從每個字節數組的指定 157 * 開始位置(S1和S2)和長度(L1和L2)讀取整數(b1和b2),然后直接進行比較。 158 */ 159 @Override 160 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 161 int n1=WritableUtils.decodeVIntSize(b1[s1]); 162 int n2=WritableUtils.decodeVIntSize(b2[s2]); 163 164 int cmp=WritableComparator.compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2+n2); 165 166 if (0!=cmp) { 167 return cmp; 168 } 169 170 int thisValue=readInt(b1, l1-s1-n1); 171 int thatValue=readInt(b2, l2-s2-n2); 172 173 return (thisValue<thatValue ?-1:(thisValue==thatValue?0:1)); 174 } 175 static { 176 WritableComparator.define(PairWritable.class, new Comparator()); 177 } 178 } 179 }
通常情況下,實現一個靜態方法read(DataInput),用於構造數據類型的實例對象,方法內部調用readFields(DataInput)方法。
Hadoop MapReduce Data Type中所有的Key,必須實現WritableComparable接口,官方文檔說明如下:
比較器RawComparator,官方文檔說明如下:
6、注意NullWritable類型

1 package org.apache.hadoop.io; 2 3 import java.io.*; 4 5 /** Singleton Writable with no data. */ 6 public class NullWritable implements WritableComparable { 7 8 private static final NullWritable THIS = new NullWritable(); 9 10 private NullWritable() {} // no public ctor 11 12 /** Returns the single instance of this class. */ 13 public static NullWritable get() { return THIS; } 14 15 public String toString() { 16 return "(null)"; 17 } 18 19 public int hashCode() { return 0; } 20 public int compareTo(Object other) { 21 if (!(other instanceof NullWritable)) { 22 throw new ClassCastException("can't compare " + other.getClass().getName() 23 + " to NullWritable"); 24 } 25 return 0; 26 } 27 public boolean equals(Object other) { return other instanceof NullWritable; } 28 public void readFields(DataInput in) throws IOException {} 29 public void write(DataOutput out) throws IOException {} 30 31 /** A Comparator "optimized" for NullWritable. */ 32 public static class Comparator extends WritableComparator { 33 public Comparator() { 34 super(NullWritable.class); 35 } 36 37 /** 38 * Compare the buffers in serialized form. 39 */ 40 public int compare(byte[] b1, int s1, int l1, 41 byte[] b2, int s2, int l2) { 42 assert 0 == l1; 43 assert 0 == l2; 44 return 0; 45 } 46 } 47 48 static { // register this comparator 49 WritableComparator.define(NullWritable.class, new Comparator()); 50 } 51 }