一、背景介紹
在KUDU之前,大數據主要以兩種方式存儲;
(1)靜態數據:
以 HDFS 引擎作為存儲引擎,適用於高吞吐量的離線大數據分析場景。這類存儲的局限性是數據無法進行隨機的讀寫。
(2)動態數據:
以 HBase、Cassandra 作為存儲引擎,適用於大數據隨機讀寫場景。這類存儲的局限性是批量讀取吞吐量遠不如 HDFS,不適用於批量數據分析的場景。
從上面分析可知,這兩種數據在存儲方式上完全不同,進而導致使用場景完全不同,但在真實的場景中,邊界可能沒有那么清晰,面對既需要隨機讀寫,又需要批量分析的大數據場景,該如何選擇呢?這個場景中,單種存儲引擎無法滿足業務需求,我們需要通過多種大數據工具組合來滿足這一需求。
數據實時寫入 HBase,實時的數據更新也在 HBase 完成,為了應對 OLAP 需求,我們定時(通常是 T+1 或者 T+H)將 HBase 數據寫成靜態的文件(如:Parquet)導入到 OLAP 引擎(如:HDFS)。這一架構能滿足既需要隨機讀寫,又可以支持 OLAP 分析的場景。但是缺點也比較明顯:
(1)架構復雜。從架構上看,數據在HBase、消息隊列、HDFS 間流轉,涉及環節太多,運維成本很高。並且每個環節需要保證高可用,都需要維護多個副本,存儲空間也有一定的浪費。最后數據在多個系統上,對數據安全策略、監控等都提出了挑戰。
(2)時效性低。數據從HBase導出成靜態文件是周期性的,一般這個周期是一天(或一小時),在時效性上不是很高。
(3)難以應對后續的更新。真實場景中,總會有數據是延遲到達的。如果這些數據之前已經從HBase導出到HDFS,新到的變更數據就難以處理了,一個方案是把原有數據應用上新的變更后重寫一遍,但這代價又很高。
為了解決上述架構的這些問題,KUDU應運而生。KUDU的定位是Fast Analytics on Fast Data,是一個既支持隨機讀寫、又支持 OLAP 分析的大數據存儲引擎。
二、kudu基礎
2.1 使用場景
適用於那些既有隨機訪問,也有批量數據掃描的復合場景、高計算量的場景、使用了高性能的存儲設備,包括使用更多的內存、支持數據更新,避免數據反復遷移、支持跨地域的實時數據備份和查詢。
2.2 kudu架構
與HDFS和HBase相似,Kudu使用單個的Master節點,用來管理集群的元數據,並且使用任意數量的Tablet Server(可對比理解HBase中的RegionServer角色)節點用來存儲實際數據。可以部署多個Master節點來提高容錯性。一個table表的數據,被分割成1個或多個Tablet,Tablet被部署在Tablet Server來提供數據讀寫服務。
一些基本概念:
Master:集群中的老大,負責集群管理、元數據管理等功能
Tablet Server: 集群中的小弟,負責數據存儲,並提供數據讀寫服務。一個 tablet server 存儲了table表的tablet 和為 tablet 向 client 提供服務。對於給定的 tablet,一個tablet server 充當 leader,其他 tablet server 充當該 tablet 的 follower 副本。只有 leader服務寫請求,然而 leader 或 followers 為每個服務提供讀請求 。一個 tablet server 可以服務多個 tablets ,並且一個 tablet 可以被多個 tablet servers 服務着。
Table(表):一張table是數據存儲在Kudu的tablet server中。表具有 schema 和全局有序的primary key(主鍵)。table 被分成稱為 tablets 的 segments。
Tablet:一個 tablet 是一張 table連續的segment,tablet是kudu表的水平分區,類似於google Bigtable的tablet,或者HBase的region。每個tablet存儲着一定連續range的數據(key),且tablet兩兩間的range不會重疊。一張表的所有tablet包含了這張表的所有key空間。與其它數據存儲引擎或關系型數據庫中的 partition(分區)相似。給定的tablet 冗余到多個 tablet 服務器上,並且在任何給定的時間點,其中一個副本被認為是leader tablet。任何副本都可以對讀取進行服務,並且寫入時需要在為 tablet 服務的一組 tablet server之間達成一致性。
三、kudu分區
為了提供可擴展性,Kudu 表被划分為稱為 tablets 的單元,並分布在許多 tablet servers 上。行總是屬於單個tablet 。將行分配給 tablet 的方法由在表創建期間設置的表的分區決定。 kudu提供了3種分區方式。
3.1 Range Partitioning ( 范圍分區 )
范圍分區可以根據存入數據的數據量,均衡的存儲到各個機器上,防止機器出現負載不均衡現象.

1 /** 2 3 * 測試分區: 4 5 * RangePartition 6 7 */ 8 9 @Test 10 11 public void testRangePartition() throws KuduException { 12 13 //設置表的schema 14 15 LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>(); 16 17 columnSchemas.add(newColumn("CompanyId", Type.INT32,true)); 18 19 columnSchemas.add(newColumn("WorkId", Type.INT32,false)); 20 21 columnSchemas.add(newColumn("Name", Type.STRING,false)); 22 23 columnSchemas.add(newColumn("Gender", Type.STRING,false)); 24 25 columnSchemas.add(newColumn("Photo", Type.STRING,false)); 26 27 28 29 //創建schema 30 31 Schema schema = new Schema(columnSchemas); 32 33 34 35 //創建表時提供的所有選項 36 37 CreateTableOptions tableOptions = new CreateTableOptions(); 38 39 //設置副本數 40 41 tableOptions.setNumReplicas(1); 42 43 //設置范圍分區的規則 44 45 LinkedList<String> parcols = new LinkedList<String>(); 46 47 parcols.add("CompanyId"); 48 49 //設置按照那個字段進行range分區 50 51 tableOptions.setRangePartitionColumns(parcols); 52 53 54 55 /** 56 57 * range 58 59 * 0 < value < 10 60 61 * 10 <= value < 20 62 63 * 20 <= value < 30 64 65 * ........ 66 67 * 80 <= value < 90 68 69 * */ 70 71 int count=0; 72 73 for(int i =0;i<10;i++){ 74 75 //范圍開始 76 77 PartialRow lower = schema.newPartialRow(); 78 79 lower.addInt("CompanyId",count); 80 81 82 83 //范圍結束 84 85 PartialRow upper = schema.newPartialRow(); 86 87 count +=10; 88 89 upper.addInt("CompanyId",count); 90 91 92 93 //設置每一個分區的范圍 94 95 tableOptions.addRangePartition(lower,upper); 96 97 } 98 99 100 101 try { 102 103 kuduClient.createTable("student",schema,tableOptions); 104 105 } catch (KuduException e) { 106 107 e.printStackTrace(); 108 109 } 110 111 kuduClient.close(); 112 113 }
3.2 Hash Partitioning (哈希分區)
哈希分區通過哈希值將行分配到許多 buckets ( 存儲桶 )之一; 哈希分區是一種有效的策略,當不需要對表進行有序訪問時。哈希分區對於在 tablet 之間隨機散布這些功能是有效的,這有助於減輕熱點和 tablet 大小不均勻。

1 /** 2 3 * 測試分區: 4 5 * hash分區 6 7 */ 8 9 @Test 10 11 public void testHashPartition() throws KuduException { 12 13 //設置表的schema 14 15 LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>(); 16 17 columnSchemas.add(newColumn("CompanyId", Type.INT32,true)); 18 19 columnSchemas.add(newColumn("WorkId", Type.INT32,false)); 20 21 columnSchemas.add(newColumn("Name", Type.STRING,false)); 22 23 columnSchemas.add(newColumn("Gender", Type.STRING,false)); 24 25 columnSchemas.add(newColumn("Photo", Type.STRING,false)); 26 27 28 29 //創建schema 30 31 Schema schema = new Schema(columnSchemas); 32 33 34 35 //創建表時提供的所有選項 36 37 CreateTableOptions tableOptions = new CreateTableOptions(); 38 39 //設置副本數 40 41 tableOptions.setNumReplicas(1); 42 43 //設置范圍分區的規則 44 45 LinkedList<String> parcols = new LinkedList<String>(); 46 47 parcols.add("CompanyId"); 48 49 //設置按照那個字段進行range分區 50 51 tableOptions.addHashPartitions(parcols,6); 52 53 try { 54 55 kuduClient.createTable("dog",schema,tableOptions); 56 57 } catch (KuduException e) { 58 59 e.printStackTrace(); 60 61 } 62 63 64 kuduClient.close(); 65 66 }
3.3 Multilevel Partitioning ( 多級分區 )
Kudu 允許一個表在單個表上組合多級分區。 當正確使用時,多級分區可以保留各個分區類型的優點,同時減少每個分區的缺點需求.

1 /** 2 3 * 測試分區: 4 5 * 多級分區 6 7 * Multilevel Partition 8 9 * 混合使用hash分區和range分區 10 11 * 12 13 * 哈希分區有利於提高寫入數據的吞吐量,而范圍分區可以避免tablet無限增長問題, 14 15 * hash分區和range分區結合,可以極大的提升kudu的性能 16 17 */ 18 19 @Test 20 21 public void testMultilevelPartition() throws KuduException { 22 23 //設置表的schema 24 25 LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>(); 26 27 columnSchemas.add(newColumn("CompanyId", Type.INT32,true)); 28 29 columnSchemas.add(newColumn("WorkId", Type.INT32,false)); 30 31 columnSchemas.add(newColumn("Name", Type.STRING,false)); 32 33 columnSchemas.add(newColumn("Gender", Type.STRING,false)); 34 35 columnSchemas.add(newColumn("Photo", Type.STRING,false)); 36 37 38 39 //創建schema 40 41 Schema schema = new Schema(columnSchemas); 42 43 //創建表時提供的所有選項 44 45 CreateTableOptions tableOptions = new CreateTableOptions(); 46 47 //設置副本數 48 49 tableOptions.setNumReplicas(1); 50 51 //設置范圍分區的規則 52 53 LinkedList<String> parcols = new LinkedList<String>(); 54 55 parcols.add("CompanyId"); 56 57 58 59 //hash分區 60 tableOptions.addHashPartitions(parcols,5); 61 62 63 64 //range分區 65 66 int count=0; 67 for(int i=0;i<10;i++){ 68 PartialRow lower = schema.newPartialRow(); 69 lower.addInt("CompanyId",count); 70 count+=10; 71 72 PartialRow upper = schema.newPartialRow(); 73 upper.addInt("CompanyId",count); 74 tableOptions.addRangePartition(lower,upper); 75 76 } 77 78 try { 79 80 kuduClient.createTable("cat",schema,tableOptions); 81 82 } catch (KuduException e) { 83 84 e.printStackTrace(); 85 86 } 87 88 kuduClient.close(); 89 90 }
四、kudu原理
4.1 表與schema
Kudu設計是面向結構化存儲的,因此,Kudu的表需要用戶在建表時定義它的Schema信息,這些Schema信息包含:列定義(含類型),Primary Key定義(用戶指定的若干個列的有序組合)。數據的唯一性,依賴於用戶所提供的Primary Key中的Column組合的值的唯一性。 Kudu提供了Alter命令來增刪列,但位於Primary Key中的列是不允許刪除的。 Kudu當前並不支持二級索引。 從用戶角度來看,Kudu是一種存儲結構化數據表的存儲系統。在一個Kudu集群中可以定義任意數量的table,每個table都需要預先定義好schema。每個table的列數是確定的,每一列都需要有名字和類型,每個表中可以把其中一列或多列定義為主鍵。這么看來,Kudu更像關系型數據庫,而不是像HBase、Cassandra和MongoDB這些NoSQL數據庫。不過Kudu目前還不能像關系型數據一樣支持二級索引。
Kudu使用確定的列類型,而不是類似於NoSQL的"everything is byte"。這可以帶來兩點好處: 確定的列類型使Kudu可以進行類型特有的編碼。 可以提供 SQL-like 元數據給其他上層查詢工具,比如BI工具。
4.2 kudu的底層數據模型
kudu的底層數據文件的存儲,未采用HDFS這樣的較高抽象層次的分布式文件系統,而是自行開發了一套可基於
Table/Tablet/Replica視圖級別的底層存儲系統。
這套實現基於如下的幾個設計目標:
- 可提供快速的列式查詢
- 可支持快速的隨機更新
- 可提供更為穩定的查詢性能保障
一張表會分成若干個tablet,每個tablet包括MetaData元信息及若干個RowSet,RowSet包含一個MemRowSet及若干個DiskRowSet,DiskRowSet中包含一個BloomFile、Ad_hoc Index、BaseData、DeltaMem及若干個RedoFile和UndoFile(UndoFile一般情況下只有一個)。
- MemRowSet:用於新數據insert及已在MemRowSet中的數據的更新,一個MemRowSet寫滿后會將數據刷到磁盤形成若干個DiskRowSet。每次到達32M生成一個DiskRowSet。
- DiskRowSet:用於老數據的變更(mutation),后台定期對DiskRowSet做compaction,以刪除沒用的數據及合並歷史數據,減少查詢過程中的IO開銷。
- BloomFile:根據一個DiskRowSet中的key生成一個bloom filter,用於快速模糊定位某個key是否在DiskRowSet中存在。
- Ad_hocIndex:是主鍵的索引,用於定位key在DiskRowSet中的具體哪個偏移位置。
- BaseData是MemRowSet flush下來的數據,按列存儲,按主鍵有序。
- UndoFile是基於BaseData之前時間的歷史數據,通過在BaseData上apply UndoFile中的記錄,可以獲得歷史數據。
- RedoFile是基於BaseData之后時間的變更(mutation)記錄,通過在BaseData上apply RedoFile中的記錄,可獲得較新的數據。
- DeltaMem用於DiskRowSet中數據的變更mutation,先寫到內存中,寫滿后flush到磁盤形成RedoFile。
MemRowSets可以對比理解成HBase中的MemStore, 而DiskRowSets可理解成HBase中的HFile。MemRowSets中的數據按照行試圖進行存儲,數據結構為B-Tree。
MemRowSets中的數據被Flush到磁盤之后,形成DiskRowSets。
DisRowSets中的數據,按照32MB大小為單位,按序划分為一個個的DiskRowSet。 DiskRowSet中的數據按照Column進行組織,與Parquet類似。
這是Kudu可支持一些分析性查詢的基礎。每一個Column的數據被存儲在一個相鄰的數據區域,而這個數據區域進一步被細分成一個個的小的Page單元,與HBase File中的Block類似,對每一個Column Page可采用一些Encoding算法,以及一些通用的Compression算法。 既然可對Column Page可采用Encoding以及Compression算法,那么,對單條記錄的更改就會比較困難了。
前面提到了Kudu可支持單條記錄級別的更新/刪除,是如何做到的?
與HBase類似,也是通過增加一條新的記錄來描述這次更新/刪除操作的。DiskRowSet是不可修改了,那么 KUDU 要如何應對數據的更新呢?在KUDU中,把DiskRowSet分為了兩部分:base data、delta stores。base data 負責存儲基礎數據,delta stores負責存儲 base data 中的變更數據.
如上圖所示,數據從 MemRowSet 刷到磁盤后就形成了一份 DiskRowSet(只包含 base data),每份 DiskRowSet 在內存中都會有一個對應的DeltaMemStore,負責記錄此 DiskRowSet 后續的數據變更(更新、刪除)。DeltaMemStore 內部維護一個 B-樹索引,映射到每個 row_offset 對應的數據變更。DeltaMemStore 數據增長到一定程度后轉化成二進制文件存儲到磁盤,形成一個 DeltaFile,隨着 base data 對應數據的不斷變更,DeltaFile 逐漸增長。
4.3 kudu的寫流程
如上圖,當 Client 請求寫數據時,先根據主鍵從Master Server中獲取要訪問的目標 Tablets,然后到依次對應的Tablet獲取數據。
因為KUDU表存在主鍵約束,所以需要進行主鍵是否已經存在的判斷,這里就涉及到之前說的索引結構對讀寫的優化了。一個Tablet中存在很多個RowSets,為了提升性能,我們要盡可能地減少要掃描的RowSets數量。
首先,我們先通過每個 RowSet 中記錄的主鍵的(最大最小)范圍,過濾掉一批不存在目標主鍵的RowSets,然后在根據RowSet中的布隆過濾器,過濾掉確定不存在目標主鍵的 RowSets,最后再通過RowSets中的 B-樹索引,精確定位目標主鍵是否存在。
如果主鍵已經存在,則報錯(主鍵重復),否則就進行寫數據(寫 MemRowSet)。
4.4 kudu的讀流程
如上圖,數據讀取過程大致如下:先根據要掃描數據的主鍵范圍,定位到目標的Tablets,然后讀取Tablets 中的RowSets。
在讀取每個RowSet時,先根據主鍵過濾要scan范圍,然后加載范圍內的base data,再找到對應的delta stores,應用所有變更,最后union上MemRowSet中的內容,返回數據給Client。
4.5 kudu的更新流程
數據更新的核心是定位到待更新數據的位置,這塊與寫入的時候類似,就不展開了,等定位到具體位置后,然后將變更寫到對應的delta store 中。