Hive 文件存儲格式


1、5種存儲格式

Apache Hive支持Apache Hadoop中使用的幾種熟悉的文件格式,如TextFile,RCFile,SequenceFile,AVRO,ORC和Parquet格式。

Cloudera Impala也支持這些文件格式。

在建表時使用STORED AS (TextFile|RCFile|SequenceFile|AVRO|ORC|Parquet)來指定存儲格式

TextFile每一行都是一條記錄,每行都以換行符(\ n)結尾。數據不做壓縮,磁盤開銷大,數據解析開銷大。可結合Gzip、Bzip2使用(系統自動檢查,執行查詢時自動解壓),但使用這種方式,hive不會對數據進行切分,從而無法對數據進行並行操作。 

SequenceFile是Hadoop API提供的一種二進制文件支持,其具有使用方便、可分割、可壓縮的特點。支持三種壓縮選擇:NONE, RECORD, BLOCK。 Record壓縮率低,一般建議使用BLOCK壓縮。 

RCFile是一種行列存儲相結合的存儲方式。首先,其將數據按行分塊,保證同一個record在一個塊上,避免讀一個記錄需要讀取多個block。其次,塊數據列式存儲,有利於數據壓縮和快速的列存取。 

AVRO是開源項目,為Hadoop提供數據序列化和數據交換服務。您可以在Hadoop生態系統和以任何編程語言編寫的程序之間交換數據。Avro是基於大數據Hadoop的應用程序中流行的文件格式之一。 

ORC文件代表了優化排柱狀的文件格式。ORC文件格式提供了一種將數據存儲在Hive表中的高效方法。這個文件系統實際上是為了克服其他Hive文件格式的限制而設計的。Hive從大型表讀取,寫入和處理數據時,使用ORC文件可以提高性能。

Parquet是一個面向列的二進制文件格式。Parquet對於大型查詢的類型是高效的。對於掃描特定表格中的特定列的查詢,Parquet特別有用。Parquet桌子使用壓縮Snappy,gzip;目前Snappy默認。

2、列式存儲和行式存儲

1.行存儲的特點

查詢滿足條件的一整行數據的時候,列存儲則需要去每個聚集的字段找到對應的每個列的值,行存儲只需要找到其中一個值,其余的值都在相鄰地方,所以此時行存儲查詢的速度更快。

Hadoop block中的基於行存儲的示例圖

優點是:具備快速數據加載動態負載的高適應能力,因為行存儲保證了相同記錄的所有域都在同一個集群節點
缺點是:但是它不太滿足快速的查詢響應時間的要求,特別是在當查詢僅僅針對所有列中的少數幾列時,它就不能直接定位到所需列而跳過不需要的列,由於混合着不同數據值的列,行存儲不易獲得一個極高的壓縮比。

Hadoop block中的基於行存儲的示例圖

優點是:這種結構使得在查詢時能夠直接讀取需要的列而避免不必要列的讀取,並且對於相似數據也可以有一個更好的壓縮比。
缺點是:它並不能提供基於Hadoop系統的快速查詢處理,也不能保證同一記錄的所有列都存儲在同一集群節點之上,也不適應高度動態的數據負載模式。 

2.列存儲的特點

因為每個字段的數據聚集存儲,在查詢只需要少數幾個字段的時候,能大大減少讀取的數據量;每個字段的數據類型一定是相同的,列式存儲可以針對性的設計更好的設計壓縮算法。

TEXTFILE和SEQUENCEFILE的存儲格式都是基於行存儲的。

ORC和PARQUET是基於列式存儲的。

RCFile結合列存儲和行存儲的優缺點,混合存儲

3、TextFile

默認格式,數據不做壓縮,磁盤開銷大,數據解析開銷大。
可結合Gzip、Bzip2使用(系統自動檢查,執行查詢時自動解壓),但使用這種方式,hive不會對數據進行切分,從而無法對數據進行並行操作。
示例:

create table if not exists textfile_table(
    site string,
    url  string,
    pv   bigint,
    label string)
row format delimited fields terminated by '\t'
stored as textfile;

插入數據操作:

set hive.exec.compress.output=true;  
set mapred.output.compress=true;  
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;  
set io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec;  
insert overwrite table textfile_table select * from textfile_table;  

4、SequenceFile

SequenceFile是Hadoop API提供的一種二進制文件支持,其具有使用方便、可分割、可壓縮的特點。
SequenceFile支持三種壓縮選擇:none,record,block。Record壓縮率低,一般建議使用BLOCK壓縮。
示例:

create table if not exists seqfile_table(
    site string,
    url  string,
    pv   bigint,
    label string)
row format delimited fields terminated by '\t'
stored as sequencefile;

插入數據操作:

set hive.exec.compress.output=true;  
set mapred.output.compress=true;  
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;  
set io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec;  
set mapred.output.compression.type=BLOCK;
insert overwrite table seqfile_table select * from textfile_table;  

5、 ORC File

ORCFile 它的全名是Optimized Row Columnar (ORC) file

其實就是對RCFile做了一些優化

據官方文檔介紹,這種文件格式可以提供一種高效的方法來存儲Hive數據

它的設計目標是來克服Hive其他格式的缺陷

運用ORC File可以提高Hive的讀、寫以及處理數據的性能。

5.1 ORCFile格式的組成

每個Orc文件由1個或多個stripe組成,每個stripe一般為HDFS的塊大小,每一個stripe包含多條記錄,這些記錄按照列進行獨立存儲,對應到Parquet中的row group的概念。每個Stripe里有三部分組成,分別是Index Data,Row Data,Stripe Footer:

1)Index Data:一個輕量級的index,默認是每隔1W行做一個索引。這里做的索引應該只是記錄某行的各字段在Row Data中的offset。

2)Row Data:存的是具體的數據,先取部分行,然后對這些行按列進行存儲。對每個列進行了編碼,分成多個Stream來存儲。

3)Stripe Footer:存的是各個Stream的類型,長度等信息。

每個文件有一個File Footer,這里面存的是每個Stripe的行數,每個Column的數據類型信息等;每個文件的尾部是一個PostScript,這里面記錄了整個文件的壓縮類型以及FileFooter的長度信息等。在讀取文件時,會seek到文件尾部讀PostScript,從里面解析到File Footer長度,再讀FileFooter,從里面解析到各個Stripe信息,再讀各個Stripe,即從后往前讀。

  1. ORC File包含一組組的行數據,稱為stripes
  2. 除此之外,ORC File的file footer還包含了該ORC File文件中stripes的信息,每個stripe中有多少行,以及每列的數據類型。當然,它里面還包含了列級別的一些聚合的結果,比如:count, min, max, and sum
  3. 在ORC File文件的最后,有一個被稱為postscript的區,它主要是用來存儲壓縮參數及壓縮頁腳的大小。
  4. 在默認情況下,一個stripe的大小為250MB。大尺寸的stripes使得從HDFS讀數據更高效。

5.2 ORCFile格式的優點

  1. 每個task只輸出單個文件,這樣可以減少NameNode的負載;
  2. 支持各種復雜的數據類型,比如: datetime, decimal, 以及一些復雜類型(struct, list, map, and union);
  3. 在文件中存儲了一些輕量級的索引數據
  4. 基於數據類型的塊模式壓縮:a、integer類型的列用行程長度編碼(run-length encoding);b、String類型的列用字典編碼(dictionary encoding);
  5. 用多個互相獨立的RecordReaders並行讀相同的文件;
  6. 無需掃描markers就可以分割文件;
  7. 綁定讀寫所需要的內存;
  8. metadata的存儲是用 Protocol Buffers的,所以它支持添加和刪除一些列

5.2 ORCFile設計思想

5.3 ORC存儲方式的壓縮

官網:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC

ORC存儲方式的壓縮:

6、Parquet

6.1 概述

Apache Parquet是Hadoop生態圈中一種新型列式存儲格式,它可以兼容Hadoop生態圈中大多數計算框架(Mapreduce、Spark等),被多種查詢引擎支持(Hive、Impala、Drill等),並且它是語言和平台無關的。Parquet最初是由Twitter和Cloudera合作開發完成並開源,2015年5月從Apache的孵化器里畢業成為Apache頂級項目。

Parquet最初的靈感來自Google於2010年發表的Dremel論文,文中介紹了一種支持嵌套結構的存儲格式,並且使用了列式存儲的方式提升查詢性能,在Dremel論文中還介紹了Google如何使用這種存儲格式實現並行查詢的。

Parquet文件是以二進制方式存儲的,所以是不可以直接讀取的,文件中包括該文件的數據和元數據

因此Parquet格式文件是自解析的。

6.2 parquet的數據模型

Parquet支持嵌套的數據模型,類似於Protocol Buffers,每一個數據模型的schema包含多個字段,每一個字段有三個屬性:重復次數、數據類型和字段名,重復次數可以是以下三種:

  1. required(只出現1次)
  2. repeated(出現0次或多次)
  3. optional(出現0次或1次)

每一個字段的數據類型可以分成兩種:

  1. group(復雜類型)
  2. primitive(基本類型)

schema示例:

可以把這個Schema轉換成樹狀結構,根節點可以理解為repeated類型

可以看出在Schema中所有的基本類型字段都是葉子節點,在這個Schema中一共存在6個葉子節點,如果把這樣的Schema轉換成扁平式的關系模型,就可以理解為該表包含六個列。Parquet中沒有Map、Array這樣的復雜數據結構,但是可以通過repeated和group組合來實現的。由於一條記錄中某一列可能出現零次或者多次,需要標示出哪些列的值構成一條完整的記錄。這是由Striping/Assembly算法實現的。

由於Parquet支持的數據模型比較松散,可能一條記錄中存在比較深的嵌套關系,如果為每一條記錄都維護一個類似的樹狀結可能會占用較大的存儲空間,因此Dremel論文中提出了一種高效的對於嵌套數據格式的壓縮算法:Striping/Assembly算法。它的原理是每一個記錄中的每一個成員值有三部分組成:Value、Repetition level和Definition levelvalue記錄了該成員的原始值,可以根據特定類型的壓縮算法進行壓縮,兩個level值用於記錄該值在整個記錄中的位置。對於repeated類型的列,Repetition level值記錄了當前值屬於哪一條記錄以及它處於該記錄的什么位置;對於repeated和optional類型的列,可能一條記錄中某一列是沒有值的,假設我們不記錄這樣的值就會導致本該屬於下一條記錄的值被當做當前記錄的一部分,從而造成數據的錯誤,因此對於這種情況需要一個占位符標示這種情況。

通過Striping/Assembly算法,parquet可以使用較少的存儲空間表示復雜的嵌套格式,並且通常Repetition level和Definition level都是較小的整數值,可以通過RLE算法對其進行壓縮,進一步降低存儲空間。

6.3 parquet文件結構

Parquet文件在磁盤所有數據分成多個RowGroup 和 Footer。

  1. RowGroup: 真正存儲數據區域,每一個RowGroup存儲多個ColumnChunk的數據。
  2. ColumnChunk就代表當前RowGroup某一列的數據,因為可能這一列還在其他RowGroup有數據。ColumnChunk可能包含一個Page。
  3. Page是壓縮和編碼的單元,主要包括PageHeader,RepetitionLevel,DefinitionLevel和Values.
  4. PageHeader: 包含一些元數據,諸如編碼和壓縮類型,有多少數據,當前page第一個數據的偏移量,當前Page第一個索引的偏移量,壓縮和解壓的大小
  5. DefinitionLevel: 當前字段在路徑中的深度
  6. RepetitionLevel: 當前字段是否可以重復
  7. Footer:主要當前文件的元數據和一些統計信息

通常情況下,在存儲Parquet數據的時候會按照Block大小設置行組的大小,由於一般情況下每一個Mapper任務處理數據的最小單位是一個Block,這樣可以把每一個行組由一個Mapper任務處理,增大任務執行並行度。Parquet文件的格式:

Definition Level

  1. 指明該列的路徑上有多少個可選的字段被定義了。A.B.C 表示C列這個路徑上有三個可選的字段被定義了。也可以理解為definition Level是該路徑上有定義的repeated field 和optional field的個數,不包括required field,因為requiredfield是必須有定義的嵌套數據的特點是有的字段可以為空,比如optional或者repeated。
  2. 如果一個字段被定義,那么它的所有父節點都是被定義的。我們從root節點開始遍歷,當某一個字段路徑上的節點為空或者我們說已經沒有子節點的節點的時候,我們就記錄下當前的深度作為這個字段的DefinitionLevel. 當一個字段的DefinitionLevel = Max Definition Level,表示這個字段是有數據的。另外,required類型是字段定義的,所以它不需要DefinitionLevel
messageDemo {--- D = 0
  optional group field1 { ----D = 1
    required group fiel2 {----D = 1(required是不使用DefinitionLevel的)
      optional string field3;----D = 2
    }
  }
}

Repetition Level

RepetitionLevel是針對repeated字段的,對於optional和required,是沒有啥關系的。意思就是指在哪一個深度上進行重復。
簡單的說,就是通過數字讓程序明白在路徑中什么repeated字段重復了,以此來確定這個字段的位置
舉個例子:
我們定一個Author的數據模型:

最后生成的數據:

分析:AuthorID:因為該字段是required,必須定義的,所以,它是沒有DefinitionValue,所以都是0

Addresses

因為該字段是repeated,允許0個或多個值,所以DefinitionLevel  = 1;第一個Author的第一個Addresses由於之前沒有重復,是一個新的record,所以RepetitionLevel = 0; 第二個 Addresses由於在之前已經出現過一次,所以它是重復的,重復深度是1,所以RepetitionLevel = 1;

到第二Author的時候,Address是一個新的record,所以沒有重復,RepetitionLevel = 0,DefinitionLevel  = 1

Books.BookID

因為該字段是required,必須定義的,所以,他沒有DefinitionValue,那么他的DefinitionValue和父父節點的DefinitionValue相同,DefinitionValue = 1. 因為Books是Repeated的,但是Books.BookId只出現一次,所以RepetitionLevel = 0。

到第二個Books.BookId的時候,由於之前已經有過Books,所以現在是重復的,只是Books重復,所以重復深度為1,那么此時RepetitionLevel = 1,DefinitionValue = 1. 到第三個Books.BookkId的時候,同樣他也是重復的,重復深度也還是1,所以RepetitionLevel = 1,DefinitionValue = 1.

Books.Price

由於price是optional,所以在樹種有兩個DefinitionLevel=2,由於第一次出現Books.Price,所以RepetitionLevel = 0;

第二個Books.Price的時候,DefinitionLevel=2,但是Books已經是重復的,所以現在RepetitionLevel = 1;第三個沒有Books.Price,所以DefinitionLevel = 1(和Books的DefinitionLevel一樣),RepetitionLevel = 1;

Books.Descs.Type

由於是Required,所以DefinitionLevel沒有,和父節點的DefinitionLevel是一樣的,故DefinitionLevel  = 2;第一次出現Books.Descs.Type,所以RepetitionLevel = 0;第二次出現Books.Descs.Type,由於之前已經存在了Books.Descs,所以現在他重復了,Descs重復深度是2,所以DefinitionLevel  = 2, Repetition Level = 2; 下一個Books.Descs.Type由於沒有Descs,所以DefinitionLevel  = 1,Repetition Level只是Books重復,所以深度為1,值為NULL;到下一個Books.Descs.Type,由於只是Books重復,所以重復深度為1,DefinitionLevel  = 2

Metadata

7、RCFile

create table if not exists rcfile_table(
    site string,
    url  string,
    pv   bigint,
    label string)
row format delimited fields terminated by '\t'
stored as rcfile;

7.1 RCFile的設計思想

RCFile結合列存儲和行存儲的優缺點,Facebook於是提出了基於行列混合存儲的RCFile,該存儲結構遵循的是“先水平划分,再垂直划分”的設計理念。先將數據按行水平划分為組,這樣一行的數據就可以保證存儲在同一個集群節點;然后在對行進行垂直划分。 
RCFile是在Hadoop HDFS之上的存儲結構,該結構強調: 

  1. RCFile存儲的表是水平划分的,分為多個行組,每個行組再被垂直划分,以便每列單獨存儲; 
  2. RCFile在每個行組中利用一個列維度的數據壓縮,並提供一種Lazy解壓(decompression)技術來在查詢執行時避免不必要的列解壓; 
  3. RCFile支持彈性的行組大小,行組大小需要權衡數據壓縮性能查詢性能兩方面。
  4. RCFile的每個行組中,元數據頭部表格數據段(每個列被獨立壓縮)分別進行壓縮,RCFile使用重量級的Gzip壓縮算法,是為了獲得較好的壓縮比。另外在由於Lazy壓縮策略,當處理一個行組時,RCFile只需要解壓使用到的列,因此相對較高的Gzip解壓開銷可以減少。 
  5. RCFile具備相當於行存儲的數據加載速度和負載適應能力,在讀數據時可以在掃描表格時避免不必要的列讀取,它比其他結構擁有更好的性能,使用列維度的壓縮能夠有效提升存儲空間利用率。

7.2 源碼分析

通常而言,RCFile文件的整個寫入過程大致可以分為三步:

  1. 構建RCFile.Writer實例——Writer(...)
  2. 通過RCFile.Writer實例寫入數據——append
  3. 關閉RCFile.Writer實例——close

我們也按照這三步來分析相應的源碼。

1. Writer

Writer在構建函數中大體做了以下三件事情:
1)初始化一些變量值;
a. RECORD_INTERVAL:表示多少“行”數據形成一個Row Split(Record)和columnsBufferSize配合使用;
b. columnNumber:表示當前RCFile文件存儲着多少“列”的數據;
c. Metadata:Metadata實例僅僅保存一個屬性“hive.io.rcfile.column.number”,值為columnNumber,該實例會被序列化到RCFile文件頭部;
d. columnsBufferSize:緩存數目(行數)上限閥值,超過這個數值就會將緩存的數據(行)形成一個Row Split(Record);
2)構建一些數據結構;
a. columnValuePlainLength:保存着一個Row Split(Record)內部各列原始數據的大小;
b. columnBuffers:保存着一個Row Split(Record)內部各列原始數據;
c. key:保存着一個Row Split(Record)的元數據;
d. plainTotalColumnLength:保存着一個RCFile文件內各列原始數據的大小;
e. comprTotalColumnLength:保存着一個RCFile文件內各列原始數據被壓縮后的大小;

3)初始化文件輸出流,並寫入文件頭部信息;
a. 初始化RCFile文件輸出流(FSDataOutputStream);useNewMagic默認值為true,本文也以此默認值進行討論。
b. initializeFileHeader;1. 寫出MAGIC;2.  寫出當前RCFile版本號(不同版本的RCFile具有不同的格式);
c. writeFileHeader;1. 寫出是否使用壓縮,本文按使用壓縮討論;2. 寫出壓縮編/解碼器(CompressionCodec)類名;3. 序列化Metadata實例;
d. finalizeFileHeader;

寫出一個“同步標志位”,表示RCFile文件頭部信息到此結束。

我們可以得出RCFile Header的結構如下:

 version 3 bytes of magic header “RCF”, followed by 1 byte of actual version number
compression  A boolean which specifies if compression is turned on for keys/values in this file
compression codec CompressionCodec class which is used for compression of keys and/or values
metadata Metadata for this file
sync A sync marker to denote end of the header

2. append

RCFile.Writer寫入數據時要求以BytesRefArrayWritable實例的形式進行“追加”,亦即一個BytesRefArrayWritable實例表示一“行”數據。
“追加”“行”數據的過程如下:
1)從一“行”數據(即BytesRefArrayWritable實例val)中解析出各“列”數據緩存到對應的ColumnBuffer(即columnBuffers[i])中;如果這“行”數據包含的“列”小於columnNumber,則缺失的列會被填充為“空值”(即BytesRefWritable.ZeroBytesRefWritable);


我們可以看出,RCFile在“追加”數據的時候還是以“行”的方式進行,“行轉列”是在內部進行轉換的。轉換之后的列數據(列數為columnNumber)被緩存到各自的“Buffer”中,也就是說每一列都有自己獨立的緩存區(ColumnBuffer),這是為后來的“列式存儲”作准備的。
ColumnBuffer
這里重點介紹一下這個ColumnBuffer,它的作用就是用來緩存“列數據”的,
內部包含兩個實例變量,如它們的變量名稱所言,它們實際也是用來緩存數據的,columnValBuffer用來緩存“列值”的數據valLenBuffer用來緩存“列值”各自的長度,這兩個內部的緩存區都是NonSyncDataOutputBuffer實例。

從這三部分代碼可以看出,NonSyncDataOutputBuffer內部的緩存區實際是使用內存中的一個字節數組(buf)構建的,而且繼承自DataOutputStream,方便我們使用“流”的形式操作數據。而且valLenBuffer在緩存“列值”的長度的時候,為了有效的節約存儲空間,使用了一個技巧,也就是說,如果需要保存的“列值”長度為“1,1,1,2”,需要存儲四個整數,而且前面三個整數的值是一樣的,那么我們將其變為“1,~2,2”,“~2”即表示我們需要將它前面的整數“1”重復兩次。如果數據的重復度較高,這種方式會節省大量的存儲空間。
RowSplit
2)一“行”數據轉換為多“列”數據,並被緩存到各自對應的緩存區之后,需要進行兩個判斷:

  1. 緩存的“列”數據(這里指columnBuffers中的全部列數據)大小是否超過上限閥值columnsBufferSize?
  2. 緩存的“行”記錄數目是否超過上限閥值RECORD_INTERVAL?

如果上述兩者條件滿足其一,我們認為已經緩存足夠多的數據,可以將緩存區的這些數據形成一個Row Split或Record,進行“溢寫”。
這兩個上限閥值(columnsBufferSize、RECORD_INTERVAL)也提示我們在實際應用中需要根據實際情況對這兩個值進行調整。
“溢寫”是通過flushRecords進行的,可以說是整個RCFile寫入過程中最為“復雜”的操作。

前面提到過,RCFile Record(Row Split)實際是由Key、Value組成的,現在這些“列”數據已經被緩存到columnBuffers中,那么Key的數據在哪里呢?
這個Key實際上就是這個Row Split(Record)的元數據,也可以理解為Row Split(Record)的索引,它是由KeyBuffer表示的,
columnNumber:列數;
numberRows:RCFile Record(Row Split)內部存儲着多少“行”數據,同一個RCFile文件,不同的Record內保存的行數可能不同;
RCFile Record Value實際就是前面提到的columnBuffers中的那些列值(可能經過壓縮處理),這些columnBuffers的元數據由以下三個變量表示:

  1. eachColumnValueLen:eachColumnValueLen[i]表示columnBuffers[i]中緩存的列數據(原始數據)的總大小;
  2. eachColumnUncompressedValueLen:eachColumnUncompressedValueLen[i]表示columnBuffers[i]中的緩存的列數據被壓縮之后的總大小;如果沒有經過壓縮處理,該值與columnBuffers[i]相同;
  3. allCellValLenBuffer:allCellValLenBuffer[i]表示columnBuffers[i]中那些列數據各自的長度(注意前方提到的這些長度的保存技巧);

KeyBuffer被序列化之后,它的結構如下:

numberRows Number_of_rows_in_this_record(vint)
columnValueLen Column_1_ondisk_compressed_length(vint)
columnUncompressedValueLen Column_1_ondisk_uncompressed_length(vint)
Column_1_row_1_value_plain_length  
Column_1_row_2_value_plain_length  
...  
columnValueLen Column_2_ondisk_compressed_length(vint)
columnUncompressedValueLen Column_2_ondisk_uncompressed_length(vint)
Column_2_row_1_value_plain_length  
Column_2_row_2_value_plain_length  
...  

RCFile的索引機制


注意到上面的多個columnValueLen(columnUncompressedValueLen),它保存着Record Value內多個列(簇)各自的總長度,而每個columnValueLen(columnUncompressedValueLen)后面保存着該列(簇)內多個列值各自的長度。如果我們僅僅需要讀取第n列的數據,我們可以根據columnValueLen(columnUncompressedValueLen)直接跳過Record Value前面(n - 1)列的數據。


KeyBuffer的數據是在“溢寫”的過程中被構建的。

flushRecords的具體邏輯

key是KeyBuffer的實例,相當於在元數據中記錄這個Row Split(Record)的“行數”;
這段代碼在使用壓縮的場景下才有意義,它構建了一個緩存區valueBuffer,並且使用“裝飾器”模式構建了一個壓縮輸出流,用於后期將columnBuffers中的數據寫入緩存區valueBuffer,valueBuffer中的數據是壓縮過的
接下來就是逐個處理columnBuffers中的數據,簡要來說,對於某個columnBuffers[i]而言需要做兩件事情:
1)如果使用壓縮,需要將columnBuffers[i]的數據通過壓縮輸出流deflateOut寫入valueBuffer中;
2)維護相關的幾個變量值;


 這段代碼看似較長,對於某個columnBuffers[i]而言,實際做的事情可以概括為四步:
1)如果使用壓縮,將columnBuffers[i]中的全部數據寫入deflateOut(實際是valueBuffer);
2)記錄columnBuffers[i]經過壓縮之后的長度colLen;如果沒有使用使用壓縮,則該值與原始數據長度相同;
3)記錄columnBuffers[i]相關元數據:columnBuffers[i]壓縮/未壓縮數據的長度、columnBuffers[i]中各個列值的長度;
4)維護plainTotalColumnLength、comprTotalColumnLength;
代碼至此,一個Record(Row Split)的所有元數據已構建完畢;如果啟用壓縮,columnBuffers中的數據已全部被壓縮寫入valueBuffer,接下來就是Record Key、Value的“持久化”。

RCFile的Sync機制

比如我們有一個“大”的文本文件,需要使用MapReduce進行分析。Hadoop MapReduce在提交Job之前會將這個大的文本文件根據“切片”大小(假設為128M)進行“切片”,每一個MapTask處理這個文件的一個“切片”(這里不考慮處理多個切片的情況),也就是這個文件的一部分數據。文本文件是按行進行存儲的,那么MapTask從某個“切片”的起始處讀取文件數據時,如何定位一行記錄的起始位置呢?


畢竟“切片”是按照字節大小直接切分的,很有可能正好將某行記錄“切斷”。這時就需要有這樣的一個“sync”,相當於一個標志位的作用,讓我們可以識別一行記錄的起始位置,對於文本文件而言,這個“sync”就是換行符。所以,MapTask從某個“切片”的起始處讀取數據時,首先會“過濾”數據,直到遇到一個換行符,然后才開始讀取數據;如果讀取某行數據結束之后,發現“文件游標”超過該“切片”的范圍,則讀取結束。


RCFile同樣也需要這樣的一個“sync”,對於文本文件而言,是每行文本一個“sync”;RCFile是以Record為單位進行存儲的,但是並沒有每個Record使用一個“sync”,而是兩個“sync”之間有一個間隔限制SYNC_INTERVAL,
SYNC_INTERVAL = 100 * (4 + 16)
 每次開始輸出下一個Record的數據之前,都會計算當前文件的輸出位置相對於上個“sync”的偏移量,如果超過SYNC_INTERVAL就輸出一個“sync”。
ii. write total record length、key portion length
iii. write keyLength、keyBuffer
注意這里的keyLength與ii中的keyLength不同:ii中的keyLength相當於記錄的是keyBuffer原始數據的長度;而iii中的keyLength相當於記錄的是keyBuffer原始數據被壓縮之后的長度,如果沒有壓縮,該值與ii中的keyLength相同。

代碼至此,我們就完成了一個Row Split(Record)的輸出。
最后就是清空相關記錄,為下一個Row Split(Record)的緩存輸出作准備,

RCFileclose過程

RCFile文件的“關閉”操作大致可分為兩步:
1)如果緩存區中仍有數據,調用flushRecords將數據“溢寫”出去;
2)關閉文件輸出流。

數據讀取和Lazy解壓

在MapReduce框架中,mapper將順序處理HDFS塊中的每個行組。當處理一個行組時,RCFile無需全部讀取行組的全部內容到內存。相反,它僅僅讀元數據頭部和給定查詢需要的列。因此,它可以跳過不必要的列以獲得列存儲的I/O優勢。(例如,表tbl(c1, c2, c3, c4)有4個列,做一次查詢“SELECT c1 FROM tbl WHERE c4 = 1”,對每個行組,RCFile僅僅讀取c1和c4列的內容。).在元數據頭部和需要的列數據加載到內存中后,它們需要解壓。元數據頭部總會解壓並在內存中維護直到RCFile處理下一個行組。然而,RCFile不會解壓所有加載的列,相反,它使用一種Lazy解壓技術。

Lazy解壓意味着列將不會在內存解壓,直到RCFile決定列中數據真正對查詢執行有用。由於查詢使用各種WHERE條件,Lazy解壓非常有用。如果一個WHERE條件不能被行組中的所有記錄滿足,那么RCFile將不會解壓WHERE條件中不滿足的列。例如,在上述查詢中,所有行組中的列c4都解壓了。然而,對於一個行組,如果列c4中沒有值為1的域,那么就無需解壓列c1。

行組大小

I/O性能是RCFile關注的重點,因此RCFile需要行組夠大並且大小可變。行組大小和下面幾個因素相關。

  1. 行組大的話,數據壓縮效率會比行組小時更有效。根據對Facebook日常應用的觀察,當行組大小達到一個閾值后,增加行組大小並不能進一步增加Gzip算法下的壓縮比。
  2. 行組變大能夠提升數據壓縮效率並減少存儲量。因此,如果對縮減存儲空間方面有強烈需求,則不建議選擇使用小行組。需要注意的是,當行組的大小超過4MB,數據的壓縮比將趨於一致。
  3. 盡管行組變大有助於減少表格的存儲規模,但是可能會損害數據的讀性能,因為這樣減少了Lazy解壓帶來的性能提升。而且行組變大會占用更多的內存,這會影響並發執行的其他MapReduce作業。考慮到存儲空間和查詢效率兩個方面,Facebook選擇4MB作為默認的行組大小,當然也允許用戶自行選擇參數進行配置。

8、存儲方式和壓縮

在實際的項目開發當中,

hive表的數據存儲格式一般選擇:orc或parquet。

壓縮方式一般選擇snappy,lzo。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM