前言
列式文件,顧名思義就是按列存儲到文件,和行式存儲文件對應。保證了一列在一個文件中是連續的。下面從parquet常見術語,核心schema和文件結構來深入理解。最后通過java api完成write和read。
術語
block
parquet層面和row group是一個意思
row group
邏輯概念,用於對row進行分區。由數據集中每個column的column chunk組成。是讀寫過程中的緩存單元
column chunk
某個column的所有數據被稱為column chunk,存在於row group,並保證在文件中是連續的
page
多個column chunk之間用page分開,也就是說一個page只會包含一個column的數據,一個page是一個獨立的單元(可以被編碼或者壓縮)
dictionary page
每個page之前都可以選擇是否需要dictionary page。dictionary page記錄了該page所有不同的值。這可以增強處理速度提高壓縮率。
總結
一個文件由多個row group組成,一個row group包括了多個column chunk,一個column chunck就是某個column的所有數據集, 被分割成多個page,一個page是最小的處理單元,可以被編碼或者壓縮。
schema
每種文件都有自己特有的規則,像csv文件,是用分隔符分隔開的一個個列。parquet文件也有自己獨特的schema格式。
這是一個parquet文件的schema例子,對應的api是MessageType
message person{ required binary name (UTF8); required int age; repeated group family{ required binary father (UTF8); required binary mother (UTF8); optional binary sister (UTF8); } }
message
固定聲明,就像結構體中的struct一樣。
person
message name,可以粗暴的理解為表名,因為里面都是field。
optional,required,repeated
這是三種field的關鍵字,分別表示可選,必選,可重復選
可選和必選類似數據庫中的nullable,可重復選是為了支持復雜的嵌套結構。
field類型
目前parquet支持int32,int64,int96(有些系統會把時間戳存成int96如老版本hive),float,double,boolean,binary,fixed_len_byte_array。
參考類org.apache.parquet.schema. PrimitiveType.PrimitiveTypeName
UTF8
field的原始類型(Original Type),可以輔助field的type進行細粒度的類型判斷。
參考類 org.apache.parquet.schema. OriginalType
group
嵌套結構聲明,類似json對象
schema&數據
schema有了,那如何把schema和數據關聯起來,也就是說可以通過schema構建或者解析出相應的數據。那就引出了嵌套關系,definition level和repetitional level。用於定位數據到底出現在嵌套中(如果有嵌套的話)的哪一層。值得注意的是,嵌套關系是針對列而言的,不同列有各自的嵌套關系。
definition level
optional字段定位,如果實際沒有數據就為0,有數據就為1。涉及到嵌套optional,那么可以這么理解,如果從某一層開始沒有該數據,那么該層之前肯定是有數據的,該層之后肯定沒有數據。舉個簡單的例子
message ExampleDefinitionLevel {
optional group a {
optional group b {
optional string c;
}
}
}
這個schema對應的definition level所有的可能性如表所示
repetition level
repeated字段定位,如果在嵌套中某一層出現了值,那么就記錄該層。那一個例子來說:
message AddressBook {
required string owner;
repeated string ownerPhoneNumbers;
repeated group contacts {
required string name;
optional string phoneNumber;
}
}
針對不同的列,defnition level和repetition level的最大值如表
文件結構
結構圖
詳細
一個parquet文件由3部分組成,header,blocks,footer。類似一般文檔中的頁眉,正文,頁腳。
header
只包含4個字節的魔數,PAR1
blocks
block定義參考“術語”
footer
記錄了該parquet文件正文所有metadata,
文件物理格式
通過 cat -v 查看一個parquet,會看到很多的non-printable字符,比如:^U^@^U^P^U^P,^U^B^U^@^
這些字符其實是可以和ascii互相映射,比如^@就是ascii中的0,詳細可以看這篇文檔
https://docstore.mik.ua/orelly/unix/upt/ch25_07.htm
其實就是八進制的ascii,小於100的+100,大於100的減100。
所有的列,包括嵌套結構,例如test.c1和test.c2屬於兩個列,都是連續存儲在parquet文件中。
參考資料
// twitter對parquet的概述
// parquet的github
https://github.com/apache/parquet-format
// 很詳細的parquet文件解析
http://www.infoq.com/cn/articles/in-depth-analysis-of-parquet-column-storage-format
coding
public static MessageType getMessageTypeFromCode(){ MessageType messageType = Types.buildMessage() .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("id") .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("name") .required(PrimitiveType.PrimitiveTypeName.INT32).named("age") .requiredGroup() .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("test1") .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("test2") .named("group1") .named("trigger"); return messageType; } public static void writeParquet(String name){ // 1. 聲明parquet的messageType MessageType messageType = getMessageTypeFromCode(); System.out.println(messageType.toString()); // 2. 聲明parquetWriter Path path = new Path("/tmp/etl/"+ name); Configuration configuration = new Configuration(); GroupWriteSupport.setSchema(messageType, configuration); GroupWriteSupport writeSupport = new GroupWriteSupport(); // 3. 寫數據 ParquetWriter<Group> writer = null; try { writer = new ParquetWriter<Group>(path, ParquetFileWriter.Mode.CREATE, writeSupport, CompressionCodecName.UNCOMPRESSED, 128*1024*1024, 5*1024*1024, 5*1024*1024, ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED, ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED, ParquetWriter.DEFAULT_WRITER_VERSION, configuration); Random random = new Random(); for(int i=0; i<10; i++){ // 4. 構建parquet數據,封裝成group Group group = new SimpleGroupFactory(messageType).newGroup(); group.append("name", i+"@qq.com") .append("id",i+"@id") .append("age",i) .addGroup("group1") .append("test1", "test1"+i) .append("test2","test2"+i); writer.write(group); } } catch (IOException e) { e.printStackTrace(); } finally { if(writer != null){ try { writer.close(); } catch (IOException e) { e.printStackTrace(); } } } } public static void readParquet(String name){ // 1. 聲明readSupport GroupReadSupport groupReadSupport = new GroupReadSupport(); Path path = new Path("/tmp/etl/"+name); // 2.通過parquetReader讀文件 ParquetReader<Group>reader = null; try { reader = ParquetReader.builder(groupReadSupport, path).build(); Group group = null; while ((group = reader.read()) != null){ System.out.println(group); } } catch (IOException e) { e.printStackTrace(); } finally { if(reader != null){ try { reader.close(); } catch (IOException e) { e.printStackTrace(); } } } }