I. Why must a JavaBean implement the Writable and WritableComparable interfaces?
1. If a JavaBean is to be used as a key or value in MapReduce, it must be serializable, because data moving from the Map phase to the Reduce phase is transmitted as binary; a plain Java object cannot be sent directly over RPC.
As long as a JavaBean implements serialization and deserialization, it can serve as a key or a value.
The simplest way to do this is to implement the Writable interface (a minimal value-only sketch follows at the end of this point).
Note: a JavaBean used as a key has an extra requirement. Besides implementing Writable, it must also implement Comparable,
because during the shuffle, before the Reduce phase, records are sorted and merged by key; without this interface the job fails at runtime.
WritableComparable is a sub-interface that extends both Writable and java.lang.Comparable<T>, so a JavaBean intended to be used as a key can simply implement WritableComparable.
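For comparison, here is a minimal sketch of a bean that is only ever used as a value, so implementing Writable alone is enough. The class name ScoreBean and its single field are illustrative and not part of the original example:

package com.qjx.serialize_8_2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// A value-only bean: Writable is sufficient, because the framework never sorts values by themselves.
public class ScoreBean implements Writable {

    private int score;

    // A no-argument constructor is required so Hadoop can create instances via reflection.
    public ScoreBean() {
    }

    public ScoreBean(int score) {
        this.score = score;
    }

    // Serialization: write the fields to the binary output.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(score);
    }

    // Deserialization: read the fields back from the binary input, in the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        score = in.readInt();
    }
}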
2. Java serialization compared with Writable serialization
2.1 Java's built-in serialization is not flexible enough; Writable is used to gain finer control over the whole serialization process.
2.2 Java serialization is not a compact format: it does no compression, it writes the class name first and then the full class data, and member objects are stored only as references whose position in the stream is not fixed (they may appear before or after the object that refers to them). This hurts random access, and once one record is corrupted, everything after it in the stream becomes unreadable.
2.3 Java serialization creates a new object on every deserialization, which is costly in memory, whereas a Writable instance can be reused (see the sketch below).
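A short sketch of the reuse point in 2.3, using the UserBean defined in the next section. The class name ReuseDemo is illustrative, and the byte-array round trip is only a demonstration, not how the framework actually moves data:

package com.qjx.serialize_8_2;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class ReuseDemo {

    public static void main(String[] args) throws IOException {
        // Serialize two beans into one byte stream.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        new UserBean(1, "tom", "22").write(out);
        new UserBean(2, "tom2", "22").write(out);

        // Deserialize both records into a single reused instance:
        // readFields() simply overwrites the fields, so no new object is created per record.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        UserBean reused = new UserBean();
        reused.readFields(in);
        System.out.println(reused); // prints 1tom22
        reused.readFields(in);
        System.out.println(reused); // prints 2tom222
    }
}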
II. A UserBean implementing Writable and WritableComparable
The code is as follows:
package com.qjx.serialize_8_2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class UserBean implements WritableComparable<UserBean> {

    private int id;
    private String name;
    private String age;

    public UserBean() {
    }

    public UserBean(int id, String name, String age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    @Override
    public String toString() {
        return this.id + this.name + this.age;
    }

    // Deserialization: read the fields back from the binary input, in the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readInt();
        name = in.readUTF();
        age = in.readUTF();
    }

    // Serialization: write the fields to the binary output.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeUTF(name);
        out.writeUTF(age);
    }

    // Keys are sorted by id during the shuffle.
    @Override
    public int compareTo(UserBean o) {
        int thisValue = this.id;
        int thatValue = o.id;
        return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAge() {
        return age;
    }

    public void setAge(String age) {
        this.age = age;
    }
}
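One caveat worth noting (an addition, not part of the original code): the default HashPartitioner assigns a key to a reduce task using key.hashCode(). The example below runs with a single reducer, so this does not matter here, but if the job were run with several reducers, UserBean should also override hashCode() and equals() based on its fields. A sketch of the two methods that could be added inside UserBean:

    // Sketch: field-based hashing and equality for UserBean, so that equal beans
    // land in the same partition when more than one reducer is used.
    @Override
    public int hashCode() {
        int result = id;
        result = 31 * result + (name == null ? 0 : name.hashCode());
        result = 31 * result + (age == null ? 0 : age.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof UserBean)) {
            return false;
        }
        UserBean other = (UserBean) obj;
        return id == other.id
                && (name == null ? other.name == null : name.equals(other.name))
                && (age == null ? other.age == null : age.equals(other.age));
    }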
III. A simple example of passing UserBean through MapReduce
Now that we have a serializable UserBean class, let us build a simple example that passes UserBean through a MapReduce job.
1. Prepare a file user.txt with the following content:
1 'tom' '22',2 'tom2' '22',3 'tom3' '22',4 'tom4' '22',5 'tom5' '22',6 'tom6' '22',7 'tom7' '22',8 'tom8' '22',9 'tom9' '22',10 'tom10' '22',11 'tom11' '22',12 'tom12' '22',13 'tom13' '22',1 'tom' '22',1 'tom' '22',2 'tom2' '22',2 'tom2' '22',
The file contains a number of UserBean records separated by commas; the MapReduce job counts how many times each UserBean appears.
2. The implementation of WCMapper.java:
package com.qjx.serialize_8_2;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * Writable is the interface of Hadoop's serialization protocol.
 * Every structured object used in Hadoop should implement Writable, so that it can be
 * serialized to a byte stream and deserialized from a byte stream back into an object.
 * LongWritable is the Hadoop.io wrapper type for Java's long.
 */
public class WCMapper extends Mapper<LongWritable, Text, UserBean, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, UserBean, LongWritable>.Context context)
            throws IOException, InterruptedException {
        // Take the content of the current line and split it into user records.
        String[] users = value.toString().split(",");
        System.out.println(users.length);
        // Walk over the records.
        for (String u1 : users) {
            // Split each record on spaces into its three fields: id, name, age.
            String[] u = u1.split(" ");
            System.out.println(u.length);
            if (u != null && u.length == 3) {
                UserBean u2 = new UserBean(Integer.parseInt(u[0]), u[1], u[2]);
                context.write(u2, new LongWritable(1));
            } else {
                System.out.println("user split false !");
            }
        }
    }
}
3. The implementation of WCReducer.java:
package com.qjx.serialize_8_2;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<UserBean, LongWritable, UserBean, LongWritable> {

    @Override
    protected void reduce(UserBean key, Iterable<LongWritable> values,
            Reducer<UserBean, LongWritable, UserBean, LongWritable>.Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable i : values) {
            // i.get() unwraps the LongWritable into a long.
            sum += i.get();
        }
        // Emit the total count for this UserBean.
        context.write(key, new LongWritable(sum));
    }
}
4. The implementation of UserCount.java:
package com.qjx.serialize_8_2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class UserCount {

    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
        // Create the job object.
        Job job = Job.getInstance(new Configuration());
        // Set the entry point of the program.
        job.setJarByClass(UserCount.class);

        // Set the custom Mapper class and the Map output key/value types.
        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(UserBean.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Local input path.
        FileInputFormat.setInputPaths(job, new Path("E:/trainingPack/serialize/input"));

        // Set the custom Reducer class.
        job.setReducerClass(WCReducer.class);
        // Set the key and value types of the final output.
        job.setOutputKeyClass(UserBean.class);
        job.setOutputValueClass(LongWritable.class);
        // Write the result to the local output path.
        FileOutputFormat.setOutputPath(job, new Path("E:/trainingPack/serialize/output"));

        // Submit the job and wait until it finishes; true prints progress and details.
        job.waitForCompletion(true);
        System.out.println("Finished");
    }
}
5. After the job runs, the generated output is as follows:
1'tom''22'	3
2'tom2''22'	3
3'tom3''22'	1
4'tom4''22'	1
5'tom5''22'	1
6'tom6''22'	1
7'tom7''22'	1
8'tom8''22'	1
9'tom9''22'	1
10'tom10''22'	1
11'tom11''22'	1
12'tom12''22'	1
13'tom13''22'	1