在Spark中使用Kryo序列化


spark序列化
 對於優化<網絡性能>極為重要,將RDD以序列化格式來保存減少內存占用. spark.serializer=org.apache.spark.serializer.JavaSerialization
Spark默認 使用Java自帶的ObjectOutputStream 框架來序列化對象,這樣任何實現了 java.io.Serializable 接口的對象,都能被序列化。同時,還可以通過擴展 java.io.Externalizable 來控制序列化性能。Java序列化很靈活但性能差速度很慢,同時序列化后占用的字節數也較多。
spark.serializer=org.apache.spark.serializer.KryoSerialization
KryoSerialization速度快,可以配置為任何org.apache.spark.serializer的子類。但Kryo也不支持所有實現了 java.io.Serializable 接口的類型,它需要你在程序中 register 需要序列化的類型,以得到最佳性能。
LZO的支持要求先安裝 Hadoop-lzo包(每個節點), 並放到 Spark本地庫中。如果是Debian包安裝,在調用spark-submit時加上 --driver-library-path /usr/lib/hadoop/lib/native/ --driver-class-path /usr/lib/hadoop/lib/ 就可以。 下載lzo http://cn.jarfire.org/hadoop.lzo.html
在 SparkConf 初始化的時候調用 conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”) 使用 Kryo。這個設置不僅控制各個worker節點之間的混洗數據序列化格式,同時還控制RDD存到磁盤上的序列化格式。需要在使用時注冊需要序列化的類型,建議在對網絡敏感的應用場景下使用Kryo。 如果你的自定義類型需要使用Kryo序列化,可以用 registerKryoClasses 方法先注冊:
val conf = new SparkConf.setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
最后,如果你不注冊需要序列化的自定義類型,Kryo也能工作,不過每一個對象實例的序列化結果都會包含一份完整的類名,這有點浪費空間。
在Scala中使用New API (Twitter Elephant Bird 包) lzo JsonInputFormat讀取 LZO 算法壓縮的 JSON 文件:
val input = sc.newAPIHadoopFile(inputFile, classOf[lzoJsonInputFormat], classOf[LongWritable], classOf[MapWritable], conf)
inputFile: 輸入路徑
接收第一個類:“格式”類,輸入格式
接收第二個類:“鍵”
接收第二個類:“值”
conf:設置一些額外的壓縮選項
在Scala中使用老API直接讀取 KeyValueTextInputFormat()最簡單的Hadoop輸入格式 :
val input = sc.HadoopFile[Text, Text, KeyValueTextInputFormat](inputFile).map{ case (x, y) => (x.toString, y.toString) }

注:如果讀取單個壓縮過的輸入,做好不要考慮使用Spark的封裝(textFile/SequenceFile..),而是使用 newAPIHadoopFile 或者 HadoopFile,並指定正確的壓縮解碼器。 有些輸入格式(如SequenceFile)允許我們只壓縮鍵值對數據中的值,這在查詢時很有用。其它一些輸入格式也有自己的壓縮控制,如:Twitter Elephant Bird 包中的許多格式都可以使用LZO算法壓縮數據。





免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM