本文來自網易雲社區
作者:汪勝
相關概念介紹
為了了解Kylin存儲和查詢的分片問題,需要先介紹兩個重要概念:segment和cuboid。相信大數據行業的相關同學都不陌生。Kylin每次提交一個新的build任務都會生成一個新的segment,而用戶一般都是每天構建一次。那么,這種情況下,每天都會生成一個新的segment,用來保存昨天的數據。 Kylin的核心思想是預聚合,就是將用戶預先定義的維度組合計算出來,然后保存到HBase中。這樣查詢的時候就可以直接查詢預先計算好的結果,速度非常快。這里的維度組合就是cuboid。Kylin在構建過程中,會產生很多的cuboid數據(每一種cuboid都對應着一種維度組合),這些數據最終都會以HFile的形式存儲在HBase中。Kylin對於每一個cuboid都會有一個唯一的id(一個cube的所有segment都有着相同的cuboid和cuboid id)。而這個id就是根據用戶在定義cube時,維度列的排序來確定的。下面來舉一個簡單的例子。假設表一共有三列ABC,那么所有的cuboid組合就是:
cuboid | cuboid_id |
---|---|
ABC | 7(111) |
AB | 6(110) |
BC | 5(101) |
AC | 4(100) |
A | 3(011) |
B | 2(010) |
C | 1(001) |
其中,cube的維度列順序為A,B,C,括號里面的是id對應的二進制,用戶可以在構建cube的時候進行排序。最終數據在HBase中存儲的時候,rowkey也就是按這個順序將這些維度值組合起來(rowkey還包含其他一些成員,這里不展開)。一般推薦將用戶經常使用或者基數很大的維度放在前面,這樣在查詢的時候有利用提高掃描效率。
存儲分片問題
Kylin在build過程中,每一個cuboid的數據都會被分到若干個分片中(這里的分片就對應HBase中的region)。對於每個segment都會保存cuboidShardNums和totalShards成員。如下所示:
//key表示cuboid的id,value表示該cuboid占用的region數private Map<Long, Short> cuboidShardNums = Maps.newHashMap();//該segment占用的region總數private int totalShards = 0;
請注意,一個region可能會存儲多個cuboid數據,因此cuboid和region之間是多對多的關系。 Kylin可以通過下面三個配置項來控制生成build過程中生成的region相關信息:
//單個region的大小kylin.storage.hbase.region-cut-gb//region最小數量kylin.storage.hbase.min-region-count//region最大數量kylin.storage.hbase.max-region-count
通過上面這三個配置項,我們就可以控制每個build過程中生成的region數量和大小,從而進行相應的優化。segment的分片信息也會收到這幾個參數的影響。具體如下:
float cut = cubeDesc.getConfig().getKylinHBaseRegionCut();int nRegion = Math.round((float) (totalSizeInM / (cut * 1024L))); nRegion = Math.max(kylinConfig.getHBaseRegionCountMin(), nRegion); nRegion = Math.min(kylinConfig.getHBaseRegionCountMax(), nRegion);//省略余下部分代碼
其中,cut就是通過kylin.storage.hbase.region-cut-gb來設置的region分割閾值,totalSizeInM是本次build過程中生成的數據大小(所有cuboid數據之和),這樣就可以求出每個segment對應的totalShards大小,即nRegion。再通過如下代碼便可以求出每個cuboid所占用的分片數:
int mbPerRegion = (int) (totalSizeInM / nRegion);for (long cuboidId : allCuboids) { double estimatedSize = cubeSizeMap.get(cuboidId); double magic = 23; int shardNum = (int) (estimatedSize * magic / mbPerRegion + 1); if (shardNum < 1) { shardNum = 1; } //省略余下部分代碼}
首先求出每個region的實際大小mbPerRegion,然后根據每個cuboid的數據大小estimatedSize就可以求出每個cuboid所占的region數,即shardNum。這里使用了一個magic,這是為了將cuboid數據盡量分散到多個region中,這樣在查詢的時候就可以多個region並行掃描,提高查詢效率。 搞定cuboidShardNums和totalShards之后,還需要確定每個cuboid存儲數據的起始region(再通過region數shardNum便可以確定指定cuboid的所有數據分布的位置)。這里主要就是根據cuboid id和region總數來獲取每個cuboid存儲起始region id,具體不再展開,有興趣的同學可以自行查看源(ShardingHash.java)。
short startShard = ShardingHash.getShard(cuboidId, nRegion);
Segment使用cuboidBaseShards成員來保存cuboid id和起始region id的映射,如下所示:
private Map<Long, Short> cuboidBaseShards = Maps.newConcurrentMap();
這樣一來,就基本搞定了Kylin build過程中,segment的存儲分片問題。
查詢分片問題
當新的segment生成之后,我們就可以查詢其中的數據了。從上面的分析中我們得知,每一個segment的構建結果其實就是多個cuboid的數據集合。那么,當我們進行查詢的時候,Kylin會根據sql中的列來獲取到最佳匹配的cuboid(join情況下可能會存在多個匹配的cuboid)。然后根據篩選出來的cuboid id去對應的segment中進行掃描。Kylin對於每一個待掃描的segment都會生成一個CubeSegmentScanner。在對每個segment進行掃描的時候,首先需要根據篩選到的cuboid id去獲取相應的region信息(主要是起始region id和region數)。主要處理邏輯如下所示:
//傳入的三個參數都可以通過cuboid id去相應的segment中獲取private List<Pair<byte[], byte[]>> getEPKeyRanges(short baseShard, short shardNum, int totalShards) { if (shardNum == 0) { return Lists.newArrayList(); } if (shardNum == totalShards) { //該cuboid的數據分布在所有的region中 return Lists.newArrayList(Pair.newPair(getByteArrayForShort((short) 0), getByteArrayForShort((short) (shardNum - 1)))); } else if (baseShard + shardNum <= totalShards) { //該cuboid的數據分布在id連續的region中 return Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard), getByteArrayForShort((short) (baseShard + shardNum - 1)))); } else { //0,1,2,3,4 存儲在 4,0這種情況 return Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard), getByteArrayForShort((short) (totalShards - 1))), Pair.newPair(getByteArrayForShort((short) 0), getByteArrayForShort((short) (baseShard + shardNum - totalShards - 1)))); } }private byte[] getByteArrayForShort(short v) { byte[] split = new byte[Bytes.SIZEOF_SHORT]; BytesUtil.writeUnsigned(v, split, 0, Bytes.SIZEOF_SHORT); return split; }
這樣就可以獲取每個segment需要掃描的region,由於Kylin目前的數據都存儲在HBase當中,因此掃描的過程都在HBase中進行。對於每個region,kylin都會啟動一個線程來向HBase發送掃描請求,然后將所有掃描的結果返回,聚合之后再返回上一層。為了加快掃描效率,Kylin還使用了HBase的coprocessor來對每個region的掃描結果進行預聚合。關於coprocessor的相關知識這里就不再介紹,可參考源碼(CubeHBaseEndpointRPC.java和CubeVisitService.java)。
到這里,關於Kylin存儲和查詢的分片問題就整理的差不多了,本文省略了一些Kylin在使用HBase進行存儲時的一些相關細節,后續會陸續補充上來,有感興趣的同學可以一起交流學習。
網易雲免費體驗館,0成本體驗20+款雲產品!
更多網易研發、產品、運營經驗分享請訪問網易雲社區。
相關文章:
【推薦】 3分鍾掌握一個有數小技能:收入貢獻分析
【推薦】 【專家坐堂】四種並發編程模型簡介