Hadoop RCFile存儲格式詳解(源碼分析、代碼示例)


RCFile
 
RCFile全稱Record Columnar File,列式記錄文件,是一種類似於SequenceFile的鍵值對(Key/Value Pairs)數據文件。
 
關鍵詞:Record、Columnar、Key、Value。
 
RCFile的優勢在哪里?適用於什么場景?為了讓大家有一個感性的認識,我們來看一個例子。
 
假設我們有這樣一張9行3列的Hive數據表table,以普通的TextFile進行存儲,
 
 
現在我們需要統計這張數據表的第二列(col2)值為“row5_col2”的出現次數,我們通常會這樣寫SQL:
 
select count(*) from table where col2 = 'row5_col2'
 
這條Hive SQL轉換為相應的MapReduce程序執行時,雖然我們僅僅只需要查詢該表的第2列數據即可得出結果,但因為我們使用的是TextFile存儲格式,不得不讀取整張數據表的數據參與計算。雖然我們可以使用一些壓縮機制優化存儲,減少讀取的數據量,但效果通常不顯著,而且畢竟讀取了很多無用的數據(col1、col3)。
 
再來看一下RCFile會如何存儲這張數據表的數據?宏觀上大致可以分為以下三步:
 
(1)水平划分;
 
 
經過水平划分之后的各個數據塊稱之為Row Split或Record。
 
(2)垂直划分;
 
 
每一個Row Split或Record再按照“列”進行垂直划分。
 
(3)列式存儲;
 
 
RCFile以Record為單位進行存儲。
 
Record存儲數據時,首先存儲該Record內第一列的全部數據、然后存儲該Record內第二列的全部數據、…、依次將各列數據存儲完畢,然后繼續下一個Record的存儲。
 
Record實際由Key、Value兩部分組成,其中Key保存着Record的元數據,如列數、每列數據的長度、每列數據中各個列值的長度等;Value保存着Record各列的數據。實際上Record Key相當於Record的索引,利用它可以輕松的實現Record內部讀取/過濾某些列的操作。
 
而且RCFile將“行式”存儲變為“列式”存儲,相似的數據以更高的可能性被聚集在一起,壓縮效果更好。
 
要想詳細掌握一個數據文件的存儲格式,就必須知道數據是通過怎樣的方式被寫入的,讀取僅僅是寫入的反面而已。RCFile分別針對寫入和讀取提供了相應的Writer類和Reader類,本文僅僅討論Writer類的實現。
 
源碼分析
 
通常而言,RCFile文件的整個寫入過程大致可以分為三步:
 
 
(1)構建RCFile.Writer實例——Writer(...)
 
(2)通過RCFile.Writer實例寫入數據——append
 
(3)關閉RCFile.Writer實例——close
 
我們也按照這三步來分析相應的源碼。
 
1. Writer
 
 
Writer在構建函數中大體做了以下三件事情:
 
(1)初始化一些變量值;
 
a. RECORD_INTERVAL:表示多少“行”數據形成一個Row Split(Record)和columnsBufferSize配合使用;
 
b. columnNumber:表示當前RCFile文件存儲着多少“列”的數據;
 
c. Metadata
 
 
Metadata實例僅僅保存一個屬性“hive.io.rcfile.column.number”,值為columnNumber,該實例會被序列化到RCFile文件頭部;
 
d. columnsBufferSize:緩存數目(行數)上限閥值,超過這個數值就會將緩存的數據(行)形成一個Row Split(Record);
     
(2)構建一些數據結構;
 
a. columnValuePlainLength:保存着一個Row Split(Record)內部各列原始數據的大小;
 
b. columnBuffers:保存着一個Row Split(Record)內部各列原始數據;
 
c. key:保存着一個Row Split(Record)的元數據;
 
d. plainTotalColumnLength:保存着一個RCFile文件內各列原始數據的大小;
 
e. comprTotalColumnLength:保存着一個RCFile文件內各列原始數據被壓縮后的大小;
 
(3)初始化文件輸出流,並寫入文件頭部信息;
 
a. 初始化RCFile文件輸出流(FSDataOutputStream);
 
 
 
useNewMagic默認值為true,本文也以此默認值進行討論。
 
b. initializeFileHeader;
 
 
i. 寫出MAGIC;
ii. 寫出當前RCFile版本號(不同版本的RCFile具有不同的格式);
 
c. writeFileHeader;
 
 
i. 寫出是否使用壓縮,本文按使用壓縮討論;
ii. 寫出壓縮編/解碼器(CompressionCodec)類名;
iii. 序列化Metadata實例;
 
c. finalizeFileHeader;
 
 
寫出一個“同步標志位”,表示RCFile文件頭部信息到此結束。
 
我們可以得出RCFile Header的結構如下:
 
version 3 bytes of magic header “RCF”, followed by 1 byte of actual version number
compression  A boolean which specifies if compression is turned on for keys/values in this file
compression codec CompressionCodec class which is used for compression of keys and/or values
metadata Metadata for this file
sync A sync marker to denote end of the header
 
2. append
 
 
RCFile.Writer寫入數據時要求以BytesRefArrayWritable實例的形式進行“追加”,亦即一個BytesRefArrayWritable實例表示一“行”數據。
 
“追加”“行”數據的過程如下:
 
(1)從一“行”數據(即BytesRefArrayWritable實例val)中解析出各“列”數據緩存到對應的ColumnBuffer(即columnBuffers[i])中;如果這“行”數據包含的“列”小於columnNumber,則缺失的列會被填充為“空值”(即BytesRefWritable.ZeroBytesRefWritable);
 
我們可以看出,RCFile在“追加”數據的時候還是以“行”的方式進行,“行轉列”是在內部進行轉換的。轉換之后的列數據(列數為columnNumber)被緩存到各自的“Buffer”中,也就是說每一列都有自己獨立的緩存區(ColumnBuffer),這是為后來的“列式存儲”作准備的。
 
這里重點介紹一下這個ColumnBuffer,它的作用就是用來緩存“列數據”的,
 
 
內部包含兩個實例變量,如它們的變量名稱所言,它們實際也是用來緩存數據的,columnValBuffer用來緩存“列值”的數據,valLenBuffer用來緩存“列值”各自的長度,這兩個內部的緩存區都是NonSyncDataOutputBuffer實例。
 
 
從這三部分代碼可以看出,NonSyncDataOutputBuffer內部的緩存區實際是使用內存中的一個字節數組(buf)構建的,而且繼承自DataOutputStream,方便我們使用“流”的形式操作數據。
 
而且valLenBuffer在緩存“列值”的長度的時候,為了有效的節約存儲空間,使用了一個技巧,
 
 
也就是說,如果需要保存的“列值”長度為“1,1,1,2”,需要存儲四個整數,而且前面三個整數的值是一樣的,那么我們將其變為“1,~2,2”,“~2”即表示我們需要將它前面的整數“1”重復兩次。如果數據的重復度較高,這種方式會節省大量的存儲空間。
 
(2)一“行”數據轉換為多“列”數據,並被緩存到各自對應的緩存區之后,需要進行兩個判斷:
 
緩存的“列”數據(這里指columnBuffers中的全部列數據)大小是否超過上限閥值columnsBufferSize?
 
緩存的“行”記錄數目是否超過上限閥值RECORD_INTERVAL?
 
如果上述兩者條件滿足其一,我們認為已經緩存足夠多的數據,可以將緩存區的這些數據形成一個Row Split或Record,進行“溢寫”。
 
這兩個上限閥值(columnsBufferSize、RECORD_INTERVAL)也提示我們在實際應用中需要根據實際情況對這兩個值進行調整。
 
“溢寫”是通過flushRecords進行的,可以說是整個RCFile寫入過程中最為“復雜”的操作。
 
前面提到過,RCFile Record(Row Split)實際是由Key、Value組成的,現在這些“列”數據已經被緩存到columnBuffers中,那么Key的數據在哪里呢?
 
這個Key實際上就是這個Row Split(Record)的元數據,也可以理解為Row Split(Record)的索引,它是由KeyBuffer表示的,
 
 
columnNumber:列數;
numberRows:RCFile Record(Row Split)內部存儲着多少“行”數據,同一個RCFile文件,不同的Record內保存的行數可能不同;
 
RCFile Record Value實際就是前面提到的columnBuffers中的那些列值(可能經過壓縮處理),這些columnBuffers的元數據由以下三個變量表示:
 
eachColumnValueLen:eachColumnValueLen[i]表示columnBuffers[i]中緩存的列數據(原始數據)的總大小;
eachColumnUncompressedValueLen:eachColumnUncompressedValueLen[i]表示columnBuffers[i]中的緩存的列數據被壓縮之后的總大小;如果沒有經過壓縮處理,該值與columnBuffers[i]相同;
allCellValLenBuffer:allCellValLenBuffer[i]表示columnBuffers[i]中那些列數據各自的長度(注意前方提到的這些長度的保存技巧);
 
KeyBuffer被序列化之后,它的結構如下:
 
numberRows Number_of_rows_in_this_record(vint)
columnValueLen Column_1_ondisk_compressed_length(vint)
columnUncompressedValueLen Column_1_ondisk_uncompressed_length(vint)
Column_1_row_1_value_plain_length  
Column_1_row_2_value_plain_length  
...  
columnValueLen Column_2_ondisk_compressed_length(vint)
columnUncompressedValueLen Column_2_ondisk_uncompressed_length(vint)
Column_2_row_1_value_plain_length  
Column_2_row_2_value_plain_length  
...  
 
為什么說這樣的元數據可以當作索引來使用呢?
 
注意到上面的多個columnValueLen(columnUncompressedValueLen),它保存着Record Value內多個列(簇)各自的總長度,而每個columnValueLen(columnUncompressedValueLen)后面保存着該列(簇)內多個列值各自的長度。如果我們僅僅需要讀取第n列的數據,我們可以根據columnValueLen(columnUncompressedValueLen)直接跳過Record Value前面(n - 1)列的數據。
 
KeyBuffer的數據是在“溢寫”的過程中被構建的。下面我們來詳細分析flushRecords的具體邏輯。
 
 
key是KeyBuffer的實例,相當於在元數據中記錄這個Row Split(Record)的“行數”;
 
 
這段代碼在使用壓縮的場景下才有意義,它構建了一個緩存區valueBuffer,並且使用“裝飾器”模式構建了一個壓縮輸出流,用於后期將columnBuffers中的數據寫入緩存區valueBuffer,valueBuffer中的數據是壓縮過的(后續會看到這個過程)。
 
接下來就是逐個處理columnBuffers中的數據,簡要來說,對於某個columnBuffers[i]而言需要做兩件事情:
 
(1)如果使用壓縮,需要將columnBuffers[i]的數據通過壓縮輸出流deflateOut寫入valueBuffer中;
(2)維護相關的幾個變量值;
 
 
這段代碼看似較長,對於某個columnBuffers[i]而言,實際做的事情可以概括為四步:
 
(1)如果使用壓縮,將columnBuffers[i]中的全部數據寫入deflateOut(實際是valueBuffer);
(2)記錄columnBuffers[i]經過壓縮之后的長度colLen;如果沒有使用使用壓縮,則該值與原始數據長度相同;
(3)記錄columnBuffers[i]相關元數據:columnBuffers[i]壓縮/未壓縮數據的長度、columnBuffers[i]中各個列值的長度;
 
 
(4)維護plainTotalColumnLength、comprTotalColumnLength;
 
代碼至此,一個Record(Row Split)的所有元數據已構建完畢;如果啟用壓縮,columnBuffers中的數據已全部被壓縮寫入valueBuffer;接下來就是Record Key、Value的“持久化”。
 
(1)Write the key out
 
 
 
i. checkAndWriteSync
 
 
 
這里需要先說一下為什么需要這個“sync”?
 
比如我們有一個“大”的文本文件,需要使用Hadoop MapReduce進行分析。Hadoop MapReduce在提交Job之前會將這個大的文本文件根據“切片”大小(假設為128M)進行“切片”,每一個MapTask處理這個文件的一個“切片”(這里不考慮處理多個切片的情況),也就是這個文件的一部分數據。文本文件是按行進行存儲的,那么MapTask從某個“切片”的起始處讀取文件數據時,如何定位一行記錄的起始位置呢?畢竟“切片”是按照字節大小直接切分的,很有可能正好將某行記錄“切斷”。這時就需要有這樣的一個“sync”,相當於一個標志位的作用,讓我們可以識別一行記錄的起始位置,對於文本文件而言,這個“sync”就是換行符。所以,MapTask從某個“切片”的起始處讀取數據時,首先會“過濾”數據,直到遇到一個換行符,然后才開始讀取數據;如果讀取某行數據結束之后,發現“文件游標”超過該“切片”的范圍,則讀取結束。
 
RCFile同樣也需要這樣的一個“sync”,對於文本文件而言,是每行文本一個“sync”;RCFile是以Record為單位進行存儲的,但是並沒有每個Record使用一個“sync”,而是兩個“sync”之間有一個間隔限制SYNC_INTERVAL,
 
 
SYNC_INTERVAL = 100 * (4 + 16)
 
每次開始輸出下一個Record的數據之前,都會計算當前文件的輸出位置相對於上個“sync”的偏移量,如果超過SYNC_INTERVAL就輸出一個“sync”。
 
那么這個“sync”是什么呢?
 
 
也就是說,RCFile的“sync”就是一個長度為16字節的隨機字節串,這里不討論UID的生成過程。
 
ii. write total record length、key portion length
 
 
iii. write keyLength、keyBuffer
 
 
注意這里的keyLength與ii中的keyLength不同:ii中的keyLength相當於記錄的是keyBuffer原始數據的長度;而iii中的keyLength相當於記錄的是keyBuffer原始數據被壓縮之后的長度,如果沒有壓縮,該值與ii中的keyLength相同。
 
在這塊代碼之前,還涉及到一個對keyBuffer的壓縮過程(如果啟用壓縮),它與ColumnBuffer的壓縮過程是類似的,不再贅述。
 
從上面的代碼可以看出,在Record Key(KeyBuffer)之前,還存在這樣的一個結構,相當於Record Header:
 
recordLen Record length in bytes
keyLength Key length in bytes
compressedKeyLen Compressed Key length in bytes
 
(2)Write the value out
 
 
如果啟用壓縮,直接寫出valueBuffer中的壓縮數據即可;如果未啟用壓縮,需要將columnBuffers中的數據逐個寫出。
 
RCFile Record Value的結構實際上就是各個“列簇”的列值,如下:
 
column_1_row_1_value
column_1_row_2_value
...
column_2_row_1_value
column_2_row_1_value
...
 
代碼至此,我們就完成了一個Row Split(Record)的輸出。
 
最后就是清空相關記錄,為下一個Row Split(Record)的緩存輸出作准備,
 
 
3. close
 
 
RCFile文件的“關閉”操作大致可分為兩步:
 
(1)如果緩存區中仍有數據,調用flushRecords將數據“溢寫”出去;
(2)關閉文件輸出流。
 
代碼示例
 
1. Write
 
(1)構建Writer實例;
 
 
注意,一定要在Hadoop Configuration中通過屬性hive.io.rcfile.column.number.conf設置RCFile的“列數”。
 
(2)構建多行數據;
 
 
每行數據使用一個BytesRefArrayWritable實例表示。
 
(3)Writer append;
 
 
(4)Writer close;
 
 
2. Read
 
 
讀取時需要注意,RCFileRecordReader的構造函數要求指定一個“切片”,如果我們需要讀取整個文件的數據,就需要將整個文件打造成為一個“切片”(如上);RCFileRecordReader實例構建好之后,就可以通過next()不斷迭代key、value,其中key為行數,value為行記錄。
 
代碼輸出
 
 
如果我們僅僅需要讀取第1列和第3列的數據,應該怎么做呢?
 
 
通過這樣的設置,我們可以得到如下的輸出結果:
 
 
可以注意到,雖然讀取的還是3列數據,但第2列的數據已經被返回“空值”。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM