Parquet Columnar Storage Format
References:
https://blog.csdn.net/kangkangwanwan/article/details/78656940
http://parquet.apache.org/documentation/latest/
Advantages of columnar storage
- IO is spent only on the data the query actually needs: only the columns involved in the computation are loaded
- Columnar data compresses better, saving space
Parquet is just a storage format, independent of the upper-layer language/engine, designed for:
- general compatibility
- optimized storage space
- optimized computation time
In Hive the metastore and the data are separate: ALTER TABLE only changes the metastore and does not immediately convert the data, so conversion errors may surface at query time; the Parquet schema is not changed either
Glossary
block: an HDFS block
file: an HDFS file; it must contain the file metadata but does not necessarily contain actual data
row group: made up of multiple column chunks, each chunk holding one column of the rows in that group
column chunk: a large set of values belonging to a single column
page: a chunk is split into pages; a page is the unit of compression and encoding, and one chunk may contain multiple pages
File Format
4-byte magic number "PAR1"
<Column 1 Chunk 1 + Column Metadata>
<Column 2 Chunk 1 + Column Metadata>
...
<Column N Chunk 1 + Column Metadata>
<Column 1 Chunk 2 + Column Metadata>
<Column 2 Chunk 2 + Column Metadata>
...
<Column N Chunk 2 + Column Metadata>
...
<Column 1 Chunk M + Column Metadata>
<Column 2 Chunk M + Column Metadata>
...
<Column N Chunk M + Column Metadata>
File Metadata
4-byte length in bytes of file metadata
4-byte magic number "PAR1"
Metadata is written after the data to allow for single pass writing.
Readers are expected to first read the file metadata to find all the column chunks they are interested in. The column chunks should then be read sequentially.
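To make the reader-side flow concrete, here is a minimal sketch (assuming parquet-hadoop is on the classpath; ParquetFileReader.readFooter is deprecated in recent releases in favor of ParquetFileReader.open, but the flow is the same): read the footer first, then enumerate each row group's column chunks.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
public class FooterDump {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path(args[0]);
    //1. read the file metadata stored just before the trailing "PAR1" magic
    ParquetMetadata footer = ParquetFileReader.readFooter(conf, file);
    //2. each BlockMetaData is one row group holding N column chunks
    for (BlockMetaData rowGroup : footer.getBlocks()) {
      System.out.println("row group, rows=" + rowGroup.getRowCount());
      for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
        System.out.println("  column chunk " + chunk.getPath() + " totalSize=" + chunk.getTotalSize() + " bytes");
      }
    }
  }
}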
Data model
message AddressBook {
  required string owner;                //required (occurs exactly once)
  repeated string ownerPhoneNumbers;    //repeated (occurs 0 or more times)
  repeated group contacts {             //Parquet has no complex Map/List/Set types; it expresses them with repeated fields and groups
    required string name;
    optional string phoneNumber;        //optional (occurs 0 or 1 times)
  }
}
A type is either a group or a primitive type.
A schema has exactly as many columns as it has leaf nodes.
[ AddressBook ]
/ | \
/ | \
required repeated repeated
owner ownerPhoneNumbers contacts
/ \
required optional
name phoneNumber
There are 4 columns here:
owner:string
ownerPhoneNumbers:string
contacts.name:string
contacts.phoneNumber:string
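As a rough sketch of how this maps to the API (string fields are declared as binary (UTF8), the physical form MessageTypeParser expects), each leaf of the schema yields one ColumnDescriptor, together with the max repetition/definition levels used in the next section:
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
public class SchemaColumns {
  public static void main(String[] args) {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message AddressBook {"
      + "  required binary owner (UTF8);"
      + "  repeated binary ownerPhoneNumbers (UTF8);"
      + "  repeated group contacts {"
      + "    required binary name (UTF8);"
      + "    optional binary phoneNumber (UTF8);"
      + "  }"
      + "}");
    //one ColumnDescriptor per leaf node = one column in the file
    for (ColumnDescriptor column : schema.getColumns()) {
      System.out.println(String.join(".", column.getPath())
          + "  maxRepetitionLevel=" + column.getMaxRepetitionLevel()
          + "  maxDefinitionLevel=" + column.getMaxDefinitionLevel());
    }
  }
}
For this schema it prints 0/0 for owner, 1/1 for ownerPhoneNumbers and contacts.name, and 1/2 (maxRepetitionLevel/maxDefinitionLevel) for contacts.phoneNumber.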
Striping / assembly algorithm
Definition Level
Walking down from the root, when a node on a field's path is null, we record the depth at that point as the field's definition level.
Max definition level: derived from the schema; it is the definition level when no node on the field's path is null.
If current definition level == max definition level, the field actually carries a value.
Repetition level
Records at which level of the path the field's value repeats. Only repeated fields need a repetition level; optional and required fields do not. Repetition level = 0 marks the start of a new record.
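A small worked example for the contacts.phoneNumber column (max repetition level 1, max definition level 2), following the classic AddressBook example from the Dremel paper:
Record 1: owner "Julien", contacts[0] = {name "Dmitriy", phoneNumber "555 987 6543"}, contacts[1] = {name "Chris"} (no phoneNumber)
Record 2: owner "A. Nonymous", no contacts at all
The contacts.phoneNumber column then stores:
"555 987 6543"  R=0 (start of a new record)         D=2 (== max, value present)
NULL            R=1 (repeats at the contacts level)  D=1 (contacts defined, phoneNumber missing)
NULL            R=0 (start of a new record)         D=0 (contacts itself missing)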
How it is stored
Parquet file
All data is horizontally partitioned into row groups; a row group contains a column chunk for every column over that row range
column chunk: stores the data of one column and consists of one or more pages
Page: the unit of compression and encoding, transparent to the data model
Tail of the Parquet file: the Footer, containing the file's metadata and statistics
Recommendations
row group size: the general recommendation is a 1 GB row group, a 1 GB HDFS block size, and one block per HDFS file, to maximize the benefit of sequential IO
data page size: 8 KB is the recommended page size. Smaller pages give finer-grained reads (e.g. single-row lookups); larger pages have lower space overhead and potentially lower parsing cost because there are fewer page headers. Note: even for sequential scans, do not expect a page to be read in a single IO; a page is not an IO-level block
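A minimal sketch of how these knobs are typically set (the property names parquet.block.size, parquet.page.size, parquet.dictionary.page.size and parquet.enable.dictionary are the ones read back later in the walkthrough via getLongBlockSize/getPageSize/getDictionaryPageSize/getEnableDictionary; in the Hive path the block size, dictionary and compression settings can also arrive as table properties through initializeSerProperties, seen below):
import org.apache.hadoop.conf.Configuration;
public class ParquetTuning {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    //row group ("block") size: how much data is buffered before a row group is flushed
    conf.setLong("parquet.block.size", 1024L * 1024 * 1024);  //1 GB, as recommended above
    //data page size: the compression/encoding unit inside a column chunk
    conf.setInt("parquet.page.size", 8 * 1024);               //8 KB
    //dictionary encoding knobs
    conf.setBoolean("parquet.enable.dictionary", true);
    conf.setInt("parquet.dictionary.page.size", 1024 * 1024); //illustrative choice, 1 MB
  }
}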
Open questions
How are the values of multiple columns matched back to the same row?
HBase has a column-skew problem: one column may hold a lot of data while another holds very little, producing many wasted blocks. Presumably Parquet has a similar issue; if so, there would have to be many empty column entries marking the missing values.
org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
MapredParquetOutputFormat writes objects into Parquet files; we trace the questions above through it. Its getHiveRecordWriter method returns the record writer that eventually writes an object's fields into the file.
/**
*
* Create the parquet schema from the hive schema, and return the RecordWriterWrapper which
* contains the real output format
*/
@Override
public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
final JobConf jobConf,
final Path finalOutPath,
final Class<? extends Writable> valueClass,
final boolean isCompressed,
final Properties tableProperties,
final Progressable progress) throws IOException {
LOG.info("creating new record writer..." + this);
final String columnNameProperty = tableProperties.getProperty(IOConstants.COLUMNS);
final String columnTypeProperty = tableProperties.getProperty(IOConstants.COLUMNS_TYPES);
List<String> columnNames;
List<TypeInfo> columnTypes;
if (columnNameProperty.length() == 0) {
columnNames = new ArrayList<String>();
} else {
columnNames = Arrays.asList(columnNameProperty.split(","));
}
if (columnTypeProperty.length() == 0) {
columnTypes = new ArrayList<TypeInfo>();
} else {
columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
}
//generate the Parquet schema from the column names and types
DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), jobConf);
//realOutputFormat was created in the class constructor: realOutputFormat = new ParquetOutputFormat<ParquetHiveRecord>(new DataWritableWriteSupport());
return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
progress,tableProperties);
}
protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper(
ParquetOutputFormat<ParquetHiveRecord> realOutputFormat,
JobConf jobConf,
String finalOutPath,
Progressable progress,
Properties tableProperties
) throws IOException {
return new ParquetRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
progress,tableProperties);
}
ParquetRecordWriterWrapper is a wrapper around the real RecordWriter.
public ParquetRecordWriterWrapper(
final OutputFormat<Void, ParquetHiveRecord> realOutputFormat,
final JobConf jobConf,
final String name, //finalOutPath
final Progressable progress, Properties tableProperties) throws
IOException {
try {
// create a TaskInputOutputContext
TaskAttemptID taskAttemptID = TaskAttemptID.forName(jobConf.get("mapred.task.id"));
if (taskAttemptID == null) {
taskAttemptID = new TaskAttemptID();
}
taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);
//initialize blockSize, ENABLE_DICTIONARY and COMPRESSION from tableProperties; if a property is missing, do nothing
LOG.info("initialize serde with table properties.");
initializeSerProperties(taskContext, tableProperties);
LOG.info("creating real writer to write at " + name);
//the real writer
realWriter =
((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext, new Path(name));
LOG.info("real writer: " + realWriter);
} catch (final InterruptedException e) {
throw new IOException(e);
}
}
Question
What is blockSize?
Step into ParquetOutputFormat.getRecordWriter, which the wrapper calls above, to see how realWriter is created:
public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, CompressionCodecName codec) throws IOException, InterruptedException {
WriteSupport<T> writeSupport = this.getWriteSupport(conf); //writeSupport is the key piece: it reads parquet.write.support.class from the configuration and creates an instance via reflection
CodecFactory codecFactory = new CodecFactory(conf);
long blockSize = getLongBlockSize(conf);
int pageSize = getPageSize(conf);
int dictionaryPageSize = getDictionaryPageSize(conf);
boolean enableDictionary = getEnableDictionary(conf);
boolean validating = getValidation(conf);
WriterVersion writerVersion = getWriterVersion(conf);
//build the write context from the conf and schema
WriteContext init = writeSupport.init(conf);
//file is the finalOutPath
ParquetFileWriter w = new ParquetFileWriter(conf, init.getSchema(), file);
w.start();
float maxLoad = conf.getFloat("parquet.memory.pool.ratio", 0.95F);
long minAllocation = conf.getLong("parquet.memory.min.chunk.size", 1048576L);
if (memoryManager == null) {
memoryManager = new MemoryManager(maxLoad, minAllocation);
} else if (memoryManager.getMemoryPoolRatio() != maxLoad) {
LOG.warn("The configuration parquet.memory.pool.ratio has been set. It should not be reset by the new value: " + maxLoad);
}
//realWriter
return new ParquetRecordWriter(w, writeSupport, init.getSchema(), init.getExtraMetaData(), blockSize, pageSize, codecFactory.getCompressor(codec, pageSize), dictionaryPageSize, enableDictionary, validating, writerVersion, memoryManager);
}
ParquetFileWriter, which writes to the target file on the filesystem:
public ParquetFileWriter(Configuration configuration, MessageType schema, Path file, ParquetFileWriter.Mode mode) throws IOException {
this.blocks = new ArrayList();
this.state = ParquetFileWriter.STATE.NOT_STARTED; //initial writer state
this.schema = schema;
FileSystem fs = file.getFileSystem(configuration);
boolean overwriteFlag = mode == ParquetFileWriter.Mode.OVERWRITE;
this.out = fs.create(file, overwriteFlag); //create an FSDataOutputStream for the target file
}
The real, final writer: ParquetRecordWriter. (The blockSize passed in here is the row group size, read from parquet.block.size, which answers the question above.)
public ParquetRecordWriter(ParquetFileWriter w, WriteSupport<T> writeSupport, MessageType schema, Map<String, String> extraMetaData, long blockSize, int pageSize, BytesCompressor compressor, int dictionaryPageSize, boolean enableDictionary, boolean validating, WriterVersion writerVersion, MemoryManager memoryManager) {
this.internalWriter = new InternalParquetRecordWriter(w, writeSupport, schema, extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary, validating, writerVersion);
this.memoryManager = (MemoryManager)Preconditions.checkNotNull(memoryManager, "memoryManager");
memoryManager.addWriter(this.internalWriter, blockSize);
}
ParquetRecordWriterWrapper writes one record at a time, i.e. at row granularity:
@Override
public void write(final Void key, final ParquetHiveRecord value) throws IOException {
try {
realWriter.write(key, value);
} catch (final InterruptedException e) {
throw new IOException(e);
}
}
This reaches ParquetRecordWriter.write, which ultimately delegates to InternalParquetRecordWriter.write:
public void write(Void key, T value) throws IOException, InterruptedException {
this.internalWriter.write(value);
}
public void write(T value) throws IOException, InterruptedException {
this.writeSupport.write(value);
++this.recordCount;
this.checkBlockSizeReached();
}
Since writeSupport is created via reflection, we follow the rest of the code with GroupWriteSupport (Spark ships its own WriteSupport: ParquetWriteSupport).
public void prepareForWrite(RecordConsumer recordConsumer) {
this.groupWriter = new GroupWriter(recordConsumer, this.schema);
}
public void write(Group record) {
this.groupWriter.write(record);
}
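As an aside, a rough sketch of how GroupWriteSupport is driven outside of Hive (the Pair schema and /tmp/pairs.parquet path are made up for illustration, and a deprecated ParquetWriter constructor is used for brevity). It mirrors the Hive path above: the schema is stored in the Configuration, and GroupWriteSupport.init reads it back when the writer is constructed:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
public class GroupWriteSupportDemo {
  public static void main(String[] args) throws Exception {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message Pair { required binary name (UTF8); required int32 value; }");
    Configuration conf = new Configuration();
    //like DataWritableWriteSupport.setSchema in the Hive path: put the schema into the conf
    GroupWriteSupport.setSchema(schema, conf);
    ParquetWriter<Group> writer =
        new ParquetWriter<Group>(new Path("/tmp/pairs.parquet"), conf, new GroupWriteSupport());
    try {
      Group row = new SimpleGroupFactory(schema).newGroup()
          .append("name", "a")
          .append("value", 1);
      writer.write(row); //ends up in GroupWriter.write -> MessageColumnIORecordConsumer, traced below
    } finally {
      writer.close();
    }
  }
}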
public class GroupWriter {
private final RecordConsumer recordConsumer;
private final GroupType schema;
public GroupWriter(RecordConsumer recordConsumer, GroupType schema) {
this.recordConsumer = recordConsumer; //a MessageColumnIORecordConsumer instance
this.schema = schema;
}
public void write(Group group) {
//mark the start of the record and set up some bookkeeping state
this.recordConsumer.startMessage();
//write the row
this.writeGroup(group, this.schema);
//mark the end of the record and finish the bookkeeping once the data is written
this.recordConsumer.endMessage();
}
private void writeGroup(Group group, GroupType type) {
int fieldCount = type.getFieldCount();
//iterate over all fields; each field corresponds to a column
for(int field = 0; field < fieldCount; ++field) {
int valueCount = group.getFieldRepetitionCount(field);
//number of actual values; 0 means null, so nothing needs to be written here
if (valueCount > 0) {
Type fieldType = type.getType(field);
String fieldName = fieldType.getName();
//mark the start of the field
this.recordConsumer.startField(fieldName, field);
for(int index = 0; index < valueCount; ++index) {
if (fieldType.isPrimitive()) {
//write a primitive value
group.writeValue(field, index, this.recordConsumer);
} else {
//nested structure: recurse
this.recordConsumer.startGroup();
this.writeGroup(group.getGroup(field, index), fieldType.asGroupType());
this.recordConsumer.endGroup();
}
}
//mark the end of the field
this.recordConsumer.endField(fieldName, field);
}
}
}
}
After all columns that actually have data have been written, this.recordConsumer.endMessage() is called:
public void endMessage() {
//null handling: only non-null values were written above, so the missing fields are written here as null placeholders
this.writeNullForMissingFieldsAtCurrentLevel();
this.columns.endRecord();
if (MessageColumnIO.DEBUG) {
this.log("< MESSAGE END >");
}
if (MessageColumnIO.DEBUG) {
this.printState();
}
}
private void writeNullForMissingFieldsAtCurrentLevel() {
int currentFieldsCount = ((GroupColumnIO)this.currentColumnIO).getChildrenCount();
for(int i = 0; i < currentFieldsCount; ++i) {
if (!this.fieldsWritten[this.currentLevel].isWritten(i)) {
try {
ColumnIO undefinedField = ((GroupColumnIO)this.currentColumnIO).getChild(i);
int d = this.currentColumnIO.getDefinitionLevel();
if (MessageColumnIO.DEBUG) {
this.log(Arrays.toString(undefinedField.getFieldPath()) + ".writeNull(" + this.r[this.currentLevel] + "," + d + ")");
}
this.writeNull(undefinedField, this.r[this.currentLevel], d);
} catch (RuntimeException var5) {
throw new ParquetEncodingException("error while writing nulls for fields of indexes " + i + " . current index: " + this.fieldsWritten[this.currentLevel], var5);
}
}
}
}
ColumnWriterV1 defines the methods for writing null and non-null values. The only difference between them is this.dataColumn.writeInteger(value): null values are never written to dataColumn (not even as a null marker), so the stored data cannot be treated as a regular matrix; when decoding, the repetition and definition levels are needed to map each value back to its position.
public void writeNull(int repetitionLevel, int definitionLevel) {
//write the value's repetition level
this.repetitionLevelColumn.writeInteger(repetitionLevel);
this.definitionLevelColumn.writeInteger(definitionLevel);
//record the column's null count in the statistics
this.updateStatisticsNumNulls();
//
this.accountForValueWritten();
}
public void write(int value, int repetitionLevel, int definitionLevel) {
this.repetitionLevelColumn.writeInteger(repetitionLevel);
this.definitionLevelColumn.writeInteger(definitionLevel);
//writing a non-null value additionally writes to dataColumn
this.dataColumn.writeInteger(value);
this.updateStatistics(value);
this.accountForValueWritten();
}
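Tying this back to the AddressBook example: for the record that has no contacts at all, the leaf columns contacts.name and contacts.phoneNumber each receive a writeNull(0, 0) call, so their repetition/definition level streams grow while their dataColumn receives nothing.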
//repetitionLevelColumn and definitionLevelColumn are created as follows; both share this.allocator (a ByteBufferAllocator) as the write buffer
private ValuesWriter newColumnDescriptorValuesWriter(int maxLevel) {
//if maxLevel is 0, return a DevNullValuesWriter, whose write operations do nothing
return (ValuesWriter)(maxLevel == 0 ? new DevNullValuesWriter() : new RunLengthBitPackingHybridValuesWriter(BytesUtils.getWidthFromMaxInt(maxLevel), 64, this.pageSizeThreshold, this.allocator));
}
When the max definition level is 0, the column and every field on its path are required.
When the max repetition level is 0, there is no repeated field on the column's path.
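For the AddressBook schema this means: owner has max definition level 0 and max repetition level 0, so both of its level writers are DevNullValuesWriters and only the data stream is stored; contacts.phoneNumber has max repetition level 1 and max definition level 2, so both level streams are kept (RLE/bit-packed) alongside the data.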
Next, look at the accountForValueWritten method:
private void accountForValueWritten() {
++this.valueCount; //one more value; once the count passes the next check point, a size check is triggered
if (this.valueCount > this.valueCountForNextSizeCheck) {
long memSize = this.repetitionLevelColumn.getBufferedSize() + this.definitionLevelColumn.getBufferedSize() + this.dataColumn.getBufferedSize();
//buffered size of the column's repetition levels, definition levels and data; if it exceeds the page size threshold
if (memSize > (long)this.props.getPageSizeThreshold()) {
if (this.props.estimateNextSizeCheck()) {
this.valueCountForNextSizeCheck = this.valueCount / 2;
} else {
this.valueCountForNextSizeCheck = this.props.getMinRowCountForPageSizeCheck();
}
//flush the buffered levels and data as a page
this.writePage();
} else if (this.props.estimateNextSizeCheck()) {
this.valueCountForNextSizeCheck = (int)((float)this.valueCount + (float)this.valueCount * (float)this.props.getPageSizeThreshold() / (float)memSize) / 2 + 1;
} else {
this.valueCountForNextSizeCheck += this.props.getMinRowCountForPageSizeCheck();
}
}
}
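For illustration, assume estimateNextSizeCheck() is enabled and the page size threshold is 1 MB (1048576 bytes): if the check fires at valueCount = 100 with memSize = 204800 bytes (200 KB), the page is not full yet, and the next check point becomes (100 + 100 * 1048576 / 204800) / 2 + 1 = (100 + 512) / 2 + 1 = 307 values, i.e. the writer extrapolates how many values would fill a page and re-checks roughly halfway there.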
private void writePage() {
try {
//concat assembles the repetition levels, definition levels and data in that order
this.pageWriter.writePage(BytesInput.concat(new BytesInput[]{this.repetitionLevelColumn.getBytes(), this.definitionLevelColumn.getBytes(), this.dataColumn.getBytes()}), this.valueCount, this.statistics, this.repetitionLevelColumn.getEncoding(), this.definitionLevelColumn.getEncoding(), this.dataColumn.getEncoding());
} catch (IOException var2) {
throw new ParquetEncodingException("could not write page for " + this.path, var2);
}
//reset the in-memory buffers
this.repetitionLevelColumn.reset();
this.definitionLevelColumn.reset();
this.dataColumn.reset();
this.valueCount = 0;
this.resetStatistics();
}
concat assembles the repetition levels, the definition levels and the data in order, which means a page stores them as 3 consecutive sections rather than one interleaved combination per value: all repetition levels are grouped together, and so are the definition levels and the data.
public void writePage(BytesInput bytes, int valueCount, Statistics statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException {
long uncompressedSize = bytes.size();
if (uncompressedSize > 2147483647L) {
throw new ParquetEncodingException("Cannot write page larger than Integer.MAX_VALUE bytes: " + uncompressedSize);
} else {
//compress
BytesInput compressedBytes = this.compressor.compress(bytes);
long compressedSize = compressedBytes.size();
if (compressedSize > 2147483647L) {
throw new ParquetEncodingException("Cannot write compressed page larger than Integer.MAX_VALUE bytes: " + compressedSize);
} else {
this.tempOutputStream.reset();
//write the page header (page metadata)
ColumnChunkPageWriteStore.parquetMetadataConverter.writeDataPageHeader((int)uncompressedSize, (int)compressedSize, valueCount, statistics, rlEncoding, dlEncoding, valuesEncoding, this.tempOutputStream);
this.uncompressedLength += uncompressedSize;
this.compressedLength += compressedSize;
this.totalValueCount += (long)valueCount;
++this.pageCount;
if (this.totalStatistics == null) {
this.totalStatistics = statistics.copy();
} else {
this.totalStatistics.mergeStatistics(statistics);
}
this.buf.collect(BytesInput.concat(new BytesInput[]{BytesInput.from(this.tempOutputStream), compressedBytes}));
this.rlEncodings.add(rlEncoding);
this.dlEncodings.add(dlEncoding);
this.dataEncodings.add(valuesEncoding);
}
}
}