RCFile
RCFile全稱Record Columnar File,列式記錄文件,是一種類似於SequenceFile的鍵值對(Key/Value Pairs)數據文件。
關鍵詞:Record、Columnar、Key、Value。
RCFile的優勢在哪里?適用於什么場景?為了讓大家有一個感性的認識,我們來看一個例子。
假設我們有這樣一張9行3列的Hive數據表table,以普通的TextFile進行存儲,

現在我們需要統計這張數據表的第二列(col2)值為“row5_col2”的出現次數,我們通常會這樣寫SQL:
select count(*) from table where col2 = 'row5_col2'
這條Hive SQL轉換為相應的MapReduce程序執行時,雖然我們僅僅只需要查詢該表的第2列數據即可得出結果,但因為我們使用的是TextFile存儲格式,不得不讀取整張數據表的數據參與計算。雖然我們可以使用一些壓縮機制優化存儲,減少讀取的數據量,但效果通常不顯著,而且畢竟讀取了很多無用的數據(col1、col3)。
再來看一下RCFile會如何存儲這張數據表的數據?宏觀上大致可以分為以下三步:
(1)水平划分;

經過水平划分之后的各個數據塊稱之為Row Split或Record。
(2)垂直划分;

每一個Row Split或Record再按照“列”進行垂直划分。
(3)列式存儲;

RCFile以Record為單位進行存儲。
Record存儲數據時,首先存儲該Record內第一列的全部數據、然后存儲該Record內第二列的全部數據、…、依次將各列數據存儲完畢,然后繼續下一個Record的存儲。
Record實際由Key、Value兩部分組成,其中Key保存着Record的元數據,如列數、每列數據的長度、每列數據中各個列值的長度等;Value保存着Record各列的數據。實際上Record Key相當於Record的索引,利用它可以輕松的實現Record內部讀取/過濾某些列的操作。
而且RCFile將“行式”存儲變為“列式”存儲,相似的數據以更高的可能性被聚集在一起,壓縮效果更好。
要想詳細掌握一個數據文件的存儲格式,就必須知道數據是通過怎樣的方式被寫入的,讀取僅僅是寫入的反面而已。RCFile分別針對寫入和讀取提供了相應的Writer類和Reader類,本文僅僅討論Writer類的實現。
源碼分析
通常而言,RCFile文件的整個寫入過程大致可以分為三步:

(1)構建RCFile.Writer實例——Writer(...)
(2)通過RCFile.Writer實例寫入數據——append
(3)關閉RCFile.Writer實例——close
我們也按照這三步來分析相應的源碼。
1. Writer

Writer在構建函數中大體做了以下三件事情:
(1)初始化一些變量值;
a. RECORD_INTERVAL:表示多少“行”數據形成一個Row Split(Record)和columnsBufferSize配合使用;
b. columnNumber:表示當前RCFile文件存儲着多少“列”的數據;
c. Metadata

Metadata實例僅僅保存一個屬性“hive.io.rcfile.column.number”,值為columnNumber,該實例會被序列化到RCFile文件頭部;
d. columnsBufferSize:緩存數目(行數)上限閥值,超過這個數值就會將緩存的數據(行)形成一個Row Split(Record);
(2)構建一些數據結構;
a. columnValuePlainLength:保存着一個Row Split(Record)內部各列原始數據的大小;
b. columnBuffers:保存着一個Row Split(Record)內部各列原始數據;
c. key:保存着一個Row Split(Record)的元數據;
d. plainTotalColumnLength:保存着一個RCFile文件內各列原始數據的大小;
e. comprTotalColumnLength:保存着一個RCFile文件內各列原始數據被壓縮后的大小;
(3)初始化文件輸出流,並寫入文件頭部信息;
a. 初始化RCFile文件輸出流(FSDataOutputStream);


useNewMagic默認值為true,本文也以此默認值進行討論。
b. initializeFileHeader;

i. 寫出MAGIC;
ii. 寫出當前RCFile版本號(不同版本的RCFile具有不同的格式);
c. writeFileHeader;

i. 寫出是否使用壓縮,本文按使用壓縮討論;
ii. 寫出壓縮編/解碼器(CompressionCodec)類名;
iii. 序列化Metadata實例;
c. finalizeFileHeader;

寫出一個“同步標志位”,表示RCFile文件頭部信息到此結束。
我們可以得出RCFile Header的結構如下:
version | 3 bytes of magic header “RCF”, followed by 1 byte of actual version number |
compression | A boolean which specifies if compression is turned on for keys/values in this file |
compression codec | CompressionCodec class which is used for compression of keys and/or values |
metadata | Metadata for this file |
sync | A sync marker to denote end of the header |
2. append

RCFile.Writer寫入數據時要求以BytesRefArrayWritable實例的形式進行“追加”,亦即一個BytesRefArrayWritable實例表示一“行”數據。
“追加”“行”數據的過程如下:
(1)從一“行”數據(即BytesRefArrayWritable實例val)中解析出各“列”數據緩存到對應的ColumnBuffer(即columnBuffers[i])中;如果這“行”數據包含的“列”小於columnNumber,則缺失的列會被填充為“空值”(即BytesRefWritable.ZeroBytesRefWritable);
我們可以看出,RCFile在“追加”數據的時候還是以“行”的方式進行,“行轉列”是在內部進行轉換的。轉換之后的列數據(列數為columnNumber)被緩存到各自的“Buffer”中,也就是說每一列都有自己獨立的緩存區(ColumnBuffer),這是為后來的“列式存儲”作准備的。
這里重點介紹一下這個ColumnBuffer,它的作用就是用來緩存“列數據”的,

內部包含兩個實例變量,如它們的變量名稱所言,它們實際也是用來緩存數據的,columnValBuffer用來緩存“列值”的數據,valLenBuffer用來緩存“列值”各自的長度,這兩個內部的緩存區都是NonSyncDataOutputBuffer實例。



從這三部分代碼可以看出,NonSyncDataOutputBuffer內部的緩存區實際是使用內存中的一個字節數組(buf)構建的,而且繼承自DataOutputStream,方便我們使用“流”的形式操作數據。
而且valLenBuffer在緩存“列值”的長度的時候,為了有效的節約存儲空間,使用了一個技巧,

也就是說,如果需要保存的“列值”長度為“1,1,1,2”,需要存儲四個整數,而且前面三個整數的值是一樣的,那么我們將其變為“1,~2,2”,“~2”即表示我們需要將它前面的整數“1”重復兩次。如果數據的重復度較高,這種方式會節省大量的存儲空間。
(2)一“行”數據轉換為多“列”數據,並被緩存到各自對應的緩存區之后,需要進行兩個判斷:
緩存的“列”數據(這里指columnBuffers中的全部列數據)大小是否超過上限閥值columnsBufferSize?
緩存的“行”記錄數目是否超過上限閥值RECORD_INTERVAL?
如果上述兩者條件滿足其一,我們認為已經緩存足夠多的數據,可以將緩存區的這些數據形成一個Row Split或Record,進行“溢寫”。
這兩個上限閥值(columnsBufferSize、RECORD_INTERVAL)也提示我們在實際應用中需要根據實際情況對這兩個值進行調整。
“溢寫”是通過flushRecords進行的,可以說是整個RCFile寫入過程中最為“復雜”的操作。
前面提到過,RCFile Record(Row Split)實際是由Key、Value組成的,現在這些“列”數據已經被緩存到columnBuffers中,那么Key的數據在哪里呢?
這個Key實際上就是這個Row Split(Record)的元數據,也可以理解為Row Split(Record)的索引,它是由KeyBuffer表示的,

columnNumber:列數;
numberRows:RCFile Record(Row Split)內部存儲着多少“行”數據,同一個RCFile文件,不同的Record內保存的行數可能不同;
RCFile Record Value實際就是前面提到的columnBuffers中的那些列值(可能經過壓縮處理),這些columnBuffers的元數據由以下三個變量表示:
eachColumnValueLen:eachColumnValueLen[i]表示columnBuffers[i]中緩存的列數據(原始數據)的總大小;
eachColumnUncompressedValueLen:eachColumnUncompressedValueLen[i]表示columnBuffers[i]中的緩存的列數據被壓縮之后的總大小;如果沒有經過壓縮處理,該值與columnBuffers[i]相同;
allCellValLenBuffer:allCellValLenBuffer[i]表示columnBuffers[i]中那些列數據各自的長度(注意前方提到的這些長度的保存技巧);
KeyBuffer被序列化之后,它的結構如下:
numberRows | Number_of_rows_in_this_record(vint) |
columnValueLen | Column_1_ondisk_compressed_length(vint) |
columnUncompressedValueLen | Column_1_ondisk_uncompressed_length(vint) |
Column_1_row_1_value_plain_length | |
Column_1_row_2_value_plain_length | |
... | |
columnValueLen | Column_2_ondisk_compressed_length(vint) |
columnUncompressedValueLen | Column_2_ondisk_uncompressed_length(vint) |
Column_2_row_1_value_plain_length | |
Column_2_row_2_value_plain_length | |
... |
為什么說這樣的元數據可以當作索引來使用呢?
注意到上面的多個columnValueLen(columnUncompressedValueLen),它保存着Record Value內多個列(簇)各自的總長度,而每個columnValueLen(columnUncompressedValueLen)后面保存着該列(簇)內多個列值各自的長度。如果我們僅僅需要讀取第n列的數據,我們可以根據columnValueLen(columnUncompressedValueLen)直接跳過Record Value前面(n - 1)列的數據。
KeyBuffer的數據是在“溢寫”的過程中被構建的。下面我們來詳細分析flushRecords的具體邏輯。

key是KeyBuffer的實例,相當於在元數據中記錄這個Row Split(Record)的“行數”;

這段代碼在使用壓縮的場景下才有意義,它構建了一個緩存區valueBuffer,並且使用“裝飾器”模式構建了一個壓縮輸出流,用於后期將columnBuffers中的數據寫入緩存區valueBuffer,valueBuffer中的數據是壓縮過的(后續會看到這個過程)。
接下來就是逐個處理columnBuffers中的數據,簡要來說,對於某個columnBuffers[i]而言需要做兩件事情:
(1)如果使用壓縮,需要將columnBuffers[i]的數據通過壓縮輸出流deflateOut寫入valueBuffer中;
(2)維護相關的幾個變量值;

這段代碼看似較長,對於某個columnBuffers[i]而言,實際做的事情可以概括為四步:
(1)如果使用壓縮,將columnBuffers[i]中的全部數據寫入deflateOut(實際是valueBuffer);
(2)記錄columnBuffers[i]經過壓縮之后的長度colLen;如果沒有使用使用壓縮,則該值與原始數據長度相同;
(3)記錄columnBuffers[i]相關元數據:columnBuffers[i]壓縮/未壓縮數據的長度、columnBuffers[i]中各個列值的長度;

(4)維護plainTotalColumnLength、comprTotalColumnLength;
代碼至此,一個Record(Row Split)的所有元數據已構建完畢;如果啟用壓縮,columnBuffers中的數據已全部被壓縮寫入valueBuffer;接下來就是Record Key、Value的“持久化”。
(1)Write the key out


i. checkAndWriteSync


這里需要先說一下為什么需要這個“sync”?
比如我們有一個“大”的文本文件,需要使用Hadoop MapReduce進行分析。Hadoop MapReduce在提交Job之前會將這個大的文本文件根據“切片”大小(假設為128M)進行“切片”,每一個MapTask處理這個文件的一個“切片”(這里不考慮處理多個切片的情況),也就是這個文件的一部分數據。文本文件是按行進行存儲的,那么MapTask從某個“切片”的起始處讀取文件數據時,如何定位一行記錄的起始位置呢?畢竟“切片”是按照字節大小直接切分的,很有可能正好將某行記錄“切斷”。這時就需要有這樣的一個“sync”,相當於一個標志位的作用,讓我們可以識別一行記錄的起始位置,對於文本文件而言,這個“sync”就是換行符。所以,MapTask從某個“切片”的起始處讀取數據時,首先會“過濾”數據,直到遇到一個換行符,然后才開始讀取數據;如果讀取某行數據結束之后,發現“文件游標”超過該“切片”的范圍,則讀取結束。
RCFile同樣也需要這樣的一個“sync”,對於文本文件而言,是每行文本一個“sync”;RCFile是以Record為單位進行存儲的,但是並沒有每個Record使用一個“sync”,而是兩個“sync”之間有一個間隔限制SYNC_INTERVAL,

SYNC_INTERVAL = 100 * (4 + 16)
每次開始輸出下一個Record的數據之前,都會計算當前文件的輸出位置相對於上個“sync”的偏移量,如果超過SYNC_INTERVAL就輸出一個“sync”。
那么這個“sync”是什么呢?

也就是說,RCFile的“sync”就是一個長度為16字節的隨機字節串,這里不討論UID的生成過程。
ii. write total record length、key portion length

iii. write keyLength、keyBuffer

注意這里的keyLength與ii中的keyLength不同:ii中的keyLength相當於記錄的是keyBuffer原始數據的長度;而iii中的keyLength相當於記錄的是keyBuffer原始數據被壓縮之后的長度,如果沒有壓縮,該值與ii中的keyLength相同。
在這塊代碼之前,還涉及到一個對keyBuffer的壓縮過程(如果啟用壓縮),它與ColumnBuffer的壓縮過程是類似的,不再贅述。
從上面的代碼可以看出,在Record Key(KeyBuffer)之前,還存在這樣的一個結構,相當於Record Header:
recordLen | Record length in bytes |
keyLength | Key length in bytes |
compressedKeyLen | Compressed Key length in bytes |
(2)Write the value out

如果啟用壓縮,直接寫出valueBuffer中的壓縮數據即可;如果未啟用壓縮,需要將columnBuffers中的數據逐個寫出。
RCFile Record Value的結構實際上就是各個“列簇”的列值,如下:
column_1_row_1_value |
column_1_row_2_value |
... |
column_2_row_1_value |
column_2_row_1_value |
... |
代碼至此,我們就完成了一個Row Split(Record)的輸出。
最后就是清空相關記錄,為下一個Row Split(Record)的緩存輸出作准備,

3. close

RCFile文件的“關閉”操作大致可分為兩步:
(1)如果緩存區中仍有數據,調用flushRecords將數據“溢寫”出去;
(2)關閉文件輸出流。
代碼示例
1. Write
(1)構建Writer實例;

注意,一定要在Hadoop Configuration中通過屬性hive.io.rcfile.column.number.conf設置RCFile的“列數”。
(2)構建多行數據;

每行數據使用一個BytesRefArrayWritable實例表示。
(3)Writer append;

(4)Writer close;

2. Read

讀取時需要注意,RCFileRecordReader的構造函數要求指定一個“切片”,如果我們需要讀取整個文件的數據,就需要將整個文件打造成為一個“切片”(如上);RCFileRecordReader實例構建好之后,就可以通過next()不斷迭代key、value,其中key為行數,value為行記錄。
代碼輸出

如果我們僅僅需要讀取第1列和第3列的數據,應該怎么做呢?

通過這樣的設置,我們可以得到如下的輸出結果:

可以注意到,雖然讀取的還是3列數據,但第2列的數據已經被返回“空值”。