Spark 實現自定義對象sequenceFile方式存儲,讀寫示例(scala編寫)


 1 package com.fuge.bigdata.datahub.analysis
 2 
 3 import java.io.{DataInput, DataOutput}
 4 
 5 import com.fuge.bigdata.tools.common.utils.SparkUtils
 6 import org.apache.hadoop.io.{NullWritable, WritableComparable}
 7 import org.apache.spark.SparkContext
 8 
 9 /**
10   * Created by chen xiang on 18-6-13.
11   * 一個使用SequenceFile進行存儲讀取的使用示例
12   */
13 object SequenceFileUsage {
14   def main(args: Array[String]): Unit = {
15 
16     require(args.length == 1)
17     // 構建SparkContext對象,封裝過,單獨運行,自行修改后定義
18     val sc = new SparkContext(SparkUtils.getSparkConf("SequenceFileUsage"))
19 
20     // 獲取路徑參數
21     val path = args(0).trim
22 
23     // 定義測試數據
24     val studentList = List(Student("01", "abc"), Student("02", "baby"), Student("03", "xiang"))
25 
26     // 序列化測試數據到RDD,並寫入到bos
27     sc.parallelize(studentList)
28       .repartition(1)
29       // 以NullWritable 為key,構建kv結構.SequenceFile需要kv結構才能存儲,NullWritable不占存儲
30       .map(NullWritable.get() -> _)
31 // 壓縮參數可選用
32       .saveAsSequenceFile(s"$path", Option(classOf[GzipCodec]))
33 
34     // 讀取剛才寫入的數據
35     val studentRdd = sc.sequenceFile(s"$path/part-*", classOf[NullWritable], classOf[Student])
36       .map {
37         // 讀取數據,並且重新賦值對象
38         case (_, y) => Student(y.id, y.name)
39       }
40       .persist()
41 
42     studentRdd
43       .foreach(x => println("count: " + x.id + "\t" + x.name))
44   }
45 }
46 
47 case class Student(var id: String, var name: String) extends WritableComparable[Student] {
48   /**
49     * 重寫無參構造函數,用於反序列化時的反射操作
50     */
51   def this() {
52     this("", "")
53   }
54 
55   /**
56     * 繼承Comparable接口需要實現的方法,用於比較兩個對象的大小
57     */
58   override def compareTo(o: Student): Int = {
59     var cmp = id compareTo o.id
60     if (cmp == 0) {
61       cmp = name compareTo o.name
62     }
63     cmp
64   }
65 
66   /**
67     * 繼承Writable接口需要實現的方法-反序列化讀取結果,並且賦值到對象字段
68     * 注意要和write的順序一致
69     */
70   override def readFields(in: DataInput): Unit = {
71     name = in.readUTF()
72     id = in.readUTF()
73     println("count: " + "\t id = " + id + "\t name = " + name)
74   }
75 
76   /**
77     * 繼承Writable接口需要實現的方法-序列化寫操作,將對象字段值寫入序列化
78     * 注意要和readFields的順序一致
79     */
80   override def write(out: DataOutput): Unit = {
81     out.writeUTF(id)
82     out.writeUTF(name)
83   }
84 }

 

補充:
1. 自定義的類需要進行序列化,必須都要實現Writable,一般情況下采用實現WritableComparable的方式,並且實現comparaTo,readFields, write方法,並且提供一個無參構造函數
2. readFields和write方法,里面字段的順序要保持一致
3. 遇到集合類型,序列化時需要先將集合長度寫進去,然后再挨個寫集合數據
4. 遇到集合類型,反序列化時需要先讀取集合的長度,然后接收數據,如果集合數據類型是自定義類型,還需要先實例化一個無參構造,然后賦值。
5. SequenceFile需要使用KV結構才能調用存儲,可以使用一個NullWritable來占位,上訴例子中的K值就是使用的NullWritable進行的
6. sequenceFile序列化后占用的存儲空間比較大,有需要的話,可以在存儲的時候加上壓縮算法,具體使用方式可以見上訴的例子


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM