Spark中文件格式、壓縮和序列化
1.1 文件格式
1.1.1 行存儲:
特點:適合OLTP,寫密集的場景(或是要求所有列的查詢);
- text:spark直接讀入並按行切分;需要保持一行的size在合理的范圍;支持有限的schema;
- csv:常用於日志收集,寫性能比讀性能好,缺點是文件規范不夠標准(例如分隔符、轉義符、引號),對嵌套類型支持不足等;
- json:通常被當做一個結構體,需要注意key的數目(容易OOM),對schema支持不夠好;優點是輕量、便於部署和debug;屬於半結構化的文本結構;
1.1.2 列存儲:
特點:適合OLAP,讀密集的場景,可以列裁剪,壓縮率高;
- parquet:
-
文件組織格式:首尾都有magic number校驗這個是parquet文件;Footer放在文件末尾,存放了元數據信息,包括schema信息,以及以及每個row group的meta data和統計信息;row group(默認128M)是一批行數據的組成(例如0~1萬行數據),row group中的每個column是一個列;一個列又分為多個page(默認1M)頁,頁是最小的編碼單位;二進制存儲;
-
謂詞下推:parquet原生支持謂詞下推;parquet每次掃描一個row group的數據,每個row group都是按列存儲的,便可以只讀取需要的column chunk列塊,每個column chunk會生成統計信息(最大值、最小值、空值個數),通過這些統計信息和該列的過濾條件,可以判斷該row group是否需要掃描;
-
應用:存儲parquet文件時,通常會按照HDFS的Block大小設置row group的大小,因為MR和Spark在讀取文件時一個task讀取的最小單元就是一個Block,這樣可以增大任務的並行度;相對於ORC格式,parquet對嵌套類型的支持更好,內部默認使用snappy壓縮;但是壓縮率、查詢性能比ORC差一點點(有的測試比ORC要好),並且不支持update和acid,但是olap場景也不需要update和acid;常用於impala和spark;spark中row group大小為 parquet.block.size(默認128M,壓縮后的),row group是一個切片的最小單位;page大小為parquet.page.size(默認1M,壓縮后的),page是壓縮和一次讀取的最小單位;
-
spark和hive讀取parquet:spark會使用自定義的serde來讀取parquet文件(性能更高),如果讀取異常,可以改用hive的serde來讀取,將參數spark.sql.hive.convertMetastoreParquet(默認true)設為false即可;spark在處理時會緩存parquet的元數據信息,如果其他地方修改了,需要手動刷新;
-
結合壓縮: parquet + snappy(lzo)的方式用的較多;這里要特別注意snappy壓縮,如果用snappy對一個text文件壓縮,那么這個文件是不可分割的,而使用snappy對parquet內部的page壓縮,則內部壓縮后的文件是可分割的,並且讀取時是以row group為切片的;orc同理;parquet+snappy綜合性能最高;spark中parquet的壓縮格式是spark.sql.parquet.compression.codec(默認是snappy);
- ORC:
- 文件組織格式:與parquet類似,Postscript保存該表的行數、壓縮參數等信息;File Footer保存各個stripe的位置信息,列的類型,以及表的統計信息(最大值、最小值、行計數等);stripe條帶(對應parquet的row group)大小為HDFS的Block大小,其中分為三部分:stripe footer保存stripe位置,stripe的統計信息;row data就是具體的數據,由row group組成,一個row group默認1萬行數據;index data保存每個row group的metadata stream具體位置;metadata stream保存了每一個row group在data stream中的位置和統計信息;
- 謂詞下推:同樣的ORC也支持列裁剪,並且ORC保存了三個層級的統計信息:file footer保存了文件級別的統計信息;stripe footer保存了stripe級別的統計信息;metadata stream中保存了row group級別的統計信息;例如某個stripe的max(a)=10, min(a)=3,當where a > 10或者where a<3時,那么這個stripe就可以跳過,不用讀取;
- 應用:spark中ORC內部默認使用snappy壓縮;支持數據update和acid,但是對嵌套類型支持不好;
1.2 文件壓縮
1.2.1 spark默認支持的壓縮格式
- snappy:壓縮率22%,parquet格式可達到14%;不可分割;壓縮解壓速度特別快(250M/s~500M/s);parquet用;
- gzip:壓縮率13.4%,parquet格式可達到6%;不可分割;壓縮(17.5M/s),解壓(58M/s)速度慢;大文件用;
- lzo:壓縮率20.5%,可分割;壓縮(49M/s),解壓(74M/s)速度較快;支持分割,但是要建索引文件;
- lz4:spark2.2默認的壓縮格式;不可分割;壓縮率不好(30%?),但是壓縮和解壓速度非常快(400M/s~4000M/s),比gzip快一個量級,比snappy還快一點;spark2.2之前默認使用snappy壓縮;
1.2.2 spark可壓縮的地方
壓縮格式:spark.io.compression.codec(默認lz4,spark2.2之前是snappy)
- rdd緩存(spark.rdd.compress默認false):由於rdd緩存到磁盤就是想用磁盤IO換取cpu計算;緩存到內存要考慮緩存的大小,要考慮內存gc問題;而使用壓縮會增加cpu計算,所以默認是關閉的;如果磁盤IO或者gc成為問題且沒有更好的解決方法,就可以考慮開啟rdd緩存壓縮,壓縮后的數據量少了,對磁盤IO和gc都有提升;要啟用rdd緩存壓縮,存儲級別就必須帶序列化;
- 廣播變量(spark.broadcast.compress默認true):廣播是每個executor的第一個task啟動時,獲取一份廣播數據,之后的task都從本地的BlockManager中獲取;廣播時有網絡IO,並且要存儲在本地的BlockManager中,所以默認是開啟的;
- shuffle輸出(spark.shuffle.compress默認true):shuffle最后輸出的data文件壓縮;要考慮CPU負載與IO負載(磁盤IO和網絡IO),如果CPU負載影響遠大於IO負載,則可以關閉該參數;
- shuffle溢寫(spark.shuffle.spill.compress默認為true):shuffle溢寫的中間文件壓縮;由於中間溢寫的文件不需要經過網絡IO,並且需要在一個task中同時執行壓縮和解壓縮,對cpu負載較大;所以對於cpu負載較大,磁盤IO性能好的任務,可以考慮關閉該參數;
1.3 序列化格式
-
序列化的地方:廣播變量、shuffle數據、緩存rdd(緩存級別帶序列化SER)等;
-
Kryo序列化:Kryo序列化性能(大小和時間)是spark默認的java序列化的10倍(實際中可能3~6倍);但是Kryo需要注冊自定義的類才能達到高性能,這也是spark默認沒有選擇Kryo的唯一原因;因為如果不注冊自定義的類,Kryo需要為每一個對象保存它的全類名,這是非常浪費的;所以序列化大量沒有注冊的自定義對象時,序列化后的大小甚至會大於java序列化后的大小還大,但是序列化的時間還是優於java序列化的;
-
使用步驟:設置spark的序列化器,注冊自定義類型;
val conf = new SparkConf()
// 設置序列化器為KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注冊要序列化的自定義類型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))