1 package com.fuge.bigdata.datahub.analysis 2 3 import java.io.{DataInput, DataOutput} 4 5 import com.fuge.bigdata.tools.common.utils.SparkUtils 6 import org.apache.hadoop.io.{NullWritable, WritableComparable} 7 import org.apache.spark.SparkContext 8 9 /** 10 * Created by chen xiang on 18-6-13. 11 * 一個使用SequenceFile進行存儲讀取的使用示例 12 */ 13 object SequenceFileUsage { 14 def main(args: Array[String]): Unit = { 15 16 require(args.length == 1) 17 // 構建SparkContext對象,封裝過,單獨運行,自行修改后定義 18 val sc = new SparkContext(SparkUtils.getSparkConf("SequenceFileUsage")) 19 20 // 獲取路徑參數 21 val path = args(0).trim 22 23 // 定義測試數據 24 val studentList = List(Student("01", "abc"), Student("02", "baby"), Student("03", "xiang")) 25 26 // 序列化測試數據到RDD,並寫入到bos 27 sc.parallelize(studentList) 28 .repartition(1) 29 // 以NullWritable 為key,構建kv結構.SequenceFile需要kv結構才能存儲,NullWritable不占存儲 30 .map(NullWritable.get() -> _) 31 // 壓縮參數可選用 32 .saveAsSequenceFile(s"$path", Option(classOf[GzipCodec])) 33 34 // 讀取剛才寫入的數據 35 val studentRdd = sc.sequenceFile(s"$path/part-*", classOf[NullWritable], classOf[Student]) 36 .map { 37 // 讀取數據,並且重新賦值對象 38 case (_, y) => Student(y.id, y.name) 39 } 40 .persist() 41 42 studentRdd 43 .foreach(x => println("count: " + x.id + "\t" + x.name)) 44 } 45 } 46 47 case class Student(var id: String, var name: String) extends WritableComparable[Student] { 48 /** 49 * 重寫無參構造函數,用於反序列化時的反射操作 50 */ 51 def this() { 52 this("", "") 53 } 54 55 /** 56 * 繼承Comparable接口需要實現的方法,用於比較兩個對象的大小 57 */ 58 override def compareTo(o: Student): Int = { 59 var cmp = id compareTo o.id 60 if (cmp == 0) { 61 cmp = name compareTo o.name 62 } 63 cmp 64 } 65 66 /** 67 * 繼承Writable接口需要實現的方法-反序列化讀取結果,並且賦值到對象字段 68 * 注意要和write的順序一致 69 */ 70 override def readFields(in: DataInput): Unit = { 71 name = in.readUTF() 72 id = in.readUTF() 73 println("count: " + "\t id = " + id + "\t name = " + name) 74 } 75 76 /** 77 * 繼承Writable接口需要實現的方法-序列化寫操作,將對象字段值寫入序列化 78 * 注意要和readFields的順序一致 79 */ 80 override def write(out: DataOutput): Unit = { 81 out.writeUTF(id) 82 out.writeUTF(name) 83 } 84 }
補充:
1. 自定義的類需要進行序列化,必須都要實現Writable,一般情況下采用實現WritableComparable的方式,並且實現comparaTo,readFields, write方法,並且提供一個無參構造函數
2. readFields和write方法,里面字段的順序要保持一致
3. 遇到集合類型,序列化時需要先將集合長度寫進去,然后再挨個寫集合數據
4. 遇到集合類型,反序列化時需要先讀取集合的長度,然后接收數據,如果集合數據類型是自定義類型,還需要先實例化一個無參構造,然后賦值。
5. SequenceFile需要使用KV結構才能調用存儲,可以使用一個NullWritable來占位,上訴例子中的K值就是使用的NullWritable進行的
6. sequenceFile序列化后占用的存儲空間比較大,有需要的話,可以在存儲的時候加上壓縮算法,具體使用方式可以見上訴的例子