Parquet列式存儲
Apache Parquet是Hadoop生態系統中的列式存儲格式,面向分析型業務,與數據處理框架、數據模型、編程語言無關。
● 優勢
降低存儲空間:按列存,能夠更好地壓縮數據,因為一列的數據一般都是同質的(homogenous)
提高IO效率:掃描(遍歷/scan)的時候,可以只讀其中部分列. 而且由於數據壓縮的更好的緣故,IO所需帶寬也會減小
降低上層應用延遲
查詢引擎: Hive, Impala, Pig, Presto, Drill, Tajo, HAWQ, IBM Big SQL
計算框架: MapReduce, Spark, Cascading, Crunch, Scalding, Kite
數據模型: Avro, Thrift, Protocol Buffers, POJOs
當時Twitter的日增數據量達到壓縮之后的100TB+,存儲在HDFS上,工程師會使用多種計算框架(例如MapReduce, Hive, Pig等)對這些數據做分析和挖掘;
日志結構是復雜的嵌套數據類型,例如一個典型的日志的schema有87列,嵌套了7層。所以需要一種列式存儲格式,既能支持關系型數據(簡單數據類型),又能支持復雜的嵌套類型的數據,同時能夠適配多種數據處理框架。
還原嵌套結構
● 難點
處理嵌套的數據結構才是真正的挑戰。
多個 field 可以形成一個 group,一個 field 可以重復出現(叫做 repeated field)。用 group 和 repeated field 的各種組合來描述。
● Definition Level
知道到底是從哪一級開始沒定義的,這是還原整條記錄所必須知道的。
不要讓 definition level 太大,這很重要,目標是所用的比特越少越好。
● Repetition level
標示着新 List 出現的層級。
repetition level 告訴我們,在從列式表達,還原嵌套結構的時候,是在哪一級插入新值。
示意圖:http://lastorder.me/tag/parquet.html
每一個基本類型的列,都要創建三個子列(R, D, Value)。
三個子列的總開銷其實並不大. 因為兩種 Levels的最大值,是由 schema 的深度決定的,並且通常只用幾個 bit 就夠用了(1個bit 就可表達1層嵌套,2個bit就可以表達3層嵌套了,3個bit就能夠表達7層嵌套。
為了通過列是存儲,還原重建這條嵌套結構的記錄,寫一個循環讀列中的值。
R=0, D=2, Value = “555 987 6543”:
R = 0 這是一個新的 record. 從根開始按照schema 重建結構,直到 repetition level 達到 2
D = 2 是最大值,值是有定義的,所以此時將值插入.
R=1, D=1:
R = 1 level1 的 contact list 中一條新記錄
D = 1 contacts 有定義,但 phoneNumber 沒定義,所建一個空的 contacts 即可.
R=0, D=0:
R = 0 一條新 record. 可以重建嵌套結構,直到達到 definition level 的值.
D = 0 => contacts 是 null,所以最后拼裝出來的是一個空的 Address Book
Parquet列式存儲 替代 HDFS文件
提高查詢性能、存儲壓縮
● 《Spark SQL下的Parquet使用最佳實踐和代碼實戰》http://blog.csdn.net/sundujing/article/details/51438306
● 《操作技巧:將 Spark 中的文本轉換為 Parquet 以提升性能》http://www.ibm.com/developerworks/cn/analytics/blog/ba-parquet-for-spark-sql/index.html
● 《[Big Data]從Hadoop到Spark的架構實踐》http://www.cnblogs.com/losbyday/p/5854618.html
● 《Spark SQL 下DateFrame的初步認識(3)》http://blog.csdn.net/erfucun/article/details/52086858
業界的主流公司在做大數據分析的時候,基本上都以Parquet方式存儲數據。
Parquet文件是基於列式存儲的,列之間是分開的,可以進行各種高效的優化。Spark底層一般都會接 Parquet文件。
Parquet是列式存儲格式的一種文件類型,列式存儲有以下的核心優勢:
a.可以跳過不符合條件的數據,只讀取需要的數據,降低IO數據量。
b.壓縮編碼可以降低磁盤存儲空間。由於同一列的數據類型是一樣的,可以使用更高效的壓縮編碼(例如Run Length Encoding和Delta Encoding)進一步節約存儲空間。
c.只讀取需要的列,支持向量運算,能夠獲取更好的掃描性能。
數據處理流程
● 離線數據挖掘:HDFS(JSON) -> Spark/Hive(ETL) -> HDFS Parquet -> Spark SQL / ML / GraphX
● 實時即時分析:Kafka -> Spark Streaming -> HDFS(JSON) -> HDFS Parquet -> Spark SQL / ML / GraphX
● 實時流式計算:Kafka -> Spark Streaming -> Redis
Spark Streaming完成以下工作
1. 原始日志的保存。將Kafka中的原始日志以JSON格式無損的保存在HDFS中
2. 數據清洗和轉換,清洗和標准化之后,轉變為Parquet格式,存儲在HDFS中,方便后續的各種數據計算任務
3. 定義好的流式計算任務,比如基於頻次規則的標簽加工等等,計算結果直接存儲在MongoDB中
將文件轉換為Parquet格式
cd /usr/local/hadoop
./sbin/start-dfs.sh
./sbin/start-yarn.sh
# 上傳行為日志文件至HDFS
hdfs dfs -mkdir /user/hadoop/test
hdfs dfs -put /home/hadoop/test/user_behavior.txt /user/hadoop/test/
# 啟動Spark
cd /usr/local/spark
./bin/spark-shell
// 讀取HDFS上的JSON文件
val df = sqlContext.read.json("hdfs://localhost:9000/user/hadoop/test/user_behavior.txt")
// 保存為parquet文件
df.saveAsParquetFile("hdfs://localhost:9000/user/hadoop/test/userBehavior.parquet")
// 讀取parquet文件
val parquetData = sqlContext.parquetFile("hdfs://localhost:9000/user/hadoop/test/userBehavior.parquet")
parquetData.show()
parquetData.printSchema()
// 執行計算
df.groupBy("Behavior").count().show()
parquetData.groupBy("Behavior").count().show()