parquet列式文件實戰


前言

列式文件,顧名思義就是按列存儲到文件,和行式存儲文件對應。保證了一列在一個文件中是連續的。下面從parquet常見術語,核心schema和文件結構來深入理解。最后通過java api完成write和read。

 

術語

block

parquet層面和row group是一個意思

 

row group

邏輯概念,用於對row進行分區。由數據集中每個column的column chunk組成。是讀寫過程中的緩存單元

 

column chunk

某個column的所有數據被稱為column chunk,存在於row group,並保證在文件中是連續的

 

page

多個column chunk之間用page分開,也就是說一個page只會包含一個column的數據,一個page是一個獨立的單元(可以被編碼或者壓縮)

 

dictionary page

每個page之前都可以選擇是否需要dictionary page。dictionary page記錄了該page所有不同的值。這可以增強處理速度提高壓縮率。

 

總結

一個文件由多個row group組成,一個row group包括了多個column chunk,一個column chunck就是某個column的所有數據集, 被分割成多個page,一個page是最小的處理單元,可以被編碼或者壓縮。

 

schema

每種文件都有自己特有的規則,像csv文件,是用分隔符分隔開的一個個列。parquet文件也有自己獨特的schema格式。

這是一個parquet文件的schema例子,對應的api是MessageType

message person{
  required binary name (UTF8);
  required int age;
  repeated group family{
    required binary father (UTF8);
    required binary mother (UTF8);
    optional binary sister (UTF8);
  }
}

 

message

固定聲明,就像結構體中的struct一樣。

 

person

message name,可以粗暴的理解為表名,因為里面都是field。

 

optional,required,repeated

這是三種field的關鍵字,分別表示可選,必選,可重復選

可選和必選類似數據庫中的nullable,可重復選是為了支持復雜的嵌套結構。

 

field類型

目前parquet支持int32,int64,int96(有些系統會把時間戳存成int96如老版本hive),float,double,boolean,binary,fixed_len_byte_array。

參考類org.apache.parquet.schema. PrimitiveType.PrimitiveTypeName

 

UTF8

field的原始類型(Original Type),可以輔助field的type進行細粒度的類型判斷。

參考類 org.apache.parquet.schema. OriginalType

 

group

嵌套結構聲明,類似json對象

 

schema&數據

schema有了,那如何把schema和數據關聯起來,也就是說可以通過schema構建或者解析出相應的數據。那就引出了嵌套關系,definition level和repetitional level。用於定位數據到底出現在嵌套中(如果有嵌套的話)的哪一層。值得注意的是,嵌套關系是針對列而言的,不同列有各自的嵌套關系。

 

definition level

optional字段定位,如果實際沒有數據就為0,有數據就為1。涉及到嵌套optional,那么可以這么理解,如果從某一層開始沒有該數據,那么該層之前肯定是有數據的,該層之后肯定沒有數據。舉個簡單的例子

message ExampleDefinitionLevel {
  optional group a {
    optional group b {
      optional string c;
    }
  }
}

這個schema對應的definition level所有的可能性如表所示

 

 

repetition level

repeated字段定位,如果在嵌套中某一層出現了值,那么就記錄該層。那一個例子來說:

message AddressBook {
  required string owner;
  repeated string ownerPhoneNumbers;
  repeated group contacts {
    required string name;
    optional string phoneNumber;
  }
}

針對不同的列,defnition level和repetition level的最大值如表

 

 

文件結構

結構圖

 

 

詳細

一個parquet文件由3部分組成,header,blocks,footer。類似一般文檔中的頁眉,正文,頁腳。

 

header

只包含4個字節的魔數,PAR1

 

blocks

block定義參考“術語”

 

footer

記錄了該parquet文件正文所有metadata,

 

文件物理格式

通過 cat -v 查看一個parquet,會看到很多的non-printable字符,比如:^U^@^U^P^U^P,^U^B^U^@^

這些字符其實是可以和ascii互相映射,比如^@就是ascii中的0,詳細可以看這篇文檔

https://docstore.mik.ua/orelly/unix/upt/ch25_07.htm

其實就是八進制的ascii,小於100的+100,大於100的減100。

 

所有的列,包括嵌套結構,例如test.c1和test.c2屬於兩個列,都是連續存儲在parquet文件中。

 

參考資料

// twitter對parquet的概述

https://blog.twitter.com/engineering/en_us/a/2013/announcing-parquet-10-columnar-storage-for-hadoop.html

// parquet的github

https://github.com/apache/parquet-format

// 很詳細的parquet文件解析

http://www.infoq.com/cn/articles/in-depth-analysis-of-parquet-column-storage-format

 

coding

 

public static MessageType getMessageTypeFromCode(){
    MessageType messageType =
            Types.buildMessage()
            .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("id")
            .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("name")
            .required(PrimitiveType.PrimitiveTypeName.INT32).named("age")
            .requiredGroup()
              .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("test1")
              .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("test2")
              .named("group1")
            .named("trigger");
    return messageType;
}

public static void writeParquet(String name){

    // 1. 聲明parquet的messageType
    MessageType messageType = getMessageTypeFromCode();
    System.out.println(messageType.toString());

    // 2. 聲明parquetWriter
    Path path = new Path("/tmp/etl/"+ name);
    Configuration configuration = new Configuration();
    GroupWriteSupport.setSchema(messageType, configuration);
    GroupWriteSupport writeSupport = new GroupWriteSupport();

    // 3. 寫數據
    ParquetWriter<Group> writer = null;
    try {
        writer = new ParquetWriter<Group>(path,
                ParquetFileWriter.Mode.CREATE,
                writeSupport,
                CompressionCodecName.UNCOMPRESSED,
                128*1024*1024,
                5*1024*1024,
                5*1024*1024,
                ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
                ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
                ParquetWriter.DEFAULT_WRITER_VERSION,
                configuration);
        Random random = new Random();

        for(int i=0; i<10; i++){
            // 4. 構建parquet數據,封裝成group
            Group group = new SimpleGroupFactory(messageType).newGroup();
            group.append("name", i+"@qq.com")
                    .append("id",i+"@id")
                    .append("age",i)
            .addGroup("group1")
                    .append("test1", "test1"+i)
                    .append("test2","test2"+i);
            writer.write(group);
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        if(writer != null){
            try {
                writer.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}


public static void readParquet(String name){
    // 1. 聲明readSupport
    GroupReadSupport groupReadSupport = new GroupReadSupport();
    Path path = new Path("/tmp/etl/"+name);

    // 2.通過parquetReader讀文件
    ParquetReader<Group>reader = null;
    try {
        reader = ParquetReader.builder(groupReadSupport, path).build();
        Group group = null;
        while ((group = reader.read()) != null){
            System.out.println(group);
        }

    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        if(reader != null){
            try {
                reader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM