Hbase索引表的結構
Hbase Rowkey 設計
Hbase Filter
Hbase二級索引
Hbase索引表的結構
在HBase中,表格的Rowkey按照字典排序,Region按照RowKey設置split point進行shard,通過這種方式實現的全局、分布式索引,成為了其成功的最大的砝碼
每一個索引建立一個表,然后依靠表的row key來實現范圍檢索。row key在HBase中是以B+ tree結構化有序存儲的,所以scan起來會比較效率。
單表以row key存儲索引,column value存儲id值或其他數據 ,這就是Hbase索引表的結構。
Hbase QualifierFilter用於過濾qualifier,也就是一個列族里面data:xxx,冒號后面的字符串
Hbase Rowkey 設計
大數據最好從rowkey入手,ColumnValueFilter的數度是很慢的,hbase查詢速度還是要依靠rowkey,所以根據業務邏輯把rowkey設計好,之后所有的查詢都通過rowkey,是會非常快。 批量查詢最好是用 scan的startkey endkey來做查詢條件
rowkey是hbase中很重要的一個設計,如果你把它當成普通字段那你的設計就有點失敗了。它的設計可以說是一門藝術。你的查詢如果不能把rowkey加入進來,那你的設計基本是失敗的。加上rowkey,hbase可以快速地定位到具體的region去取你要的數據,否則就會滿上遍野的找數據。
設計原則:
1. 長度越短越好
Rowkey是一個二進制碼流,Rowkey的長度被很多開發者建議說設計在10~100個字節,不過建議是越短越好,不要超過16個字節。
原因如下:
(1)數據的持久化文件HFile中是按照KeyValue存儲的,如果Rowkey過長比如100個字節,1000萬列數據光Rowkey就要占用100*1000萬=10億個字節,將近1G數據,這會極大影響HFile的存儲效率;
(2)MemStore將緩存部分數據到內存,如果Rowkey字段過長內存的有效利用率會降低,系統將無法緩存更多的數據,這會降低檢索效率。因此Rowkey的字節長度越短越好。
(3)目前操作系統是都是64位系統,內存8字節對齊。控制在16個字節,8字節的整數倍利用操作系統的最佳特性。
2. 散列原則:如果Rowkey是按時間戳的方式遞增,不要將時間放在二進制碼的前面,建議將Rowkey的高位作為散列字段,由程序循環生成,低位放時間字段,這樣將提高數據均衡分布在每個Regionserver實現負載均衡的幾率。如果沒有散列字段,首字段直接是時間信息將產生所有新數據都在一個 RegionServer上堆積的熱點現象,這樣在做數據檢索的時候負載將會集中在個別RegionServer,降低查詢效率。
3. 唯一性
HBase按指定的條件獲取一批記錄時,使用的就是scan方法。 scan方法有以下特點:
(1)scan可以通過setCaching與setBatch方法提高速度(以空間換時間);
(2)scan可以通過setStartRow與setEndRow來限定范圍。范圍越小,性能越高。
通過巧妙的RowKey設計使我們批量獲取記錄集合中的元素挨在一起(應該在同一個Region下),可以在遍歷結果時獲得很好的性能。
(3)scan可以通過setFilter方法添加過濾器,這也是分頁、多條件查詢的基礎。
設計RowKey時可以這樣做:采用 UserID + CreateTime + FileID組成RowKey。
需要注意以下幾點:
(1)每條記錄的RowKey,每個字段都需要填充到相同長度。假如預期我們最多有10萬量級的用戶,則userID應該統一填充至6位,如000001,000002…
(2)結尾添加全局唯一的FileID的用意也是使每個文件對應的記錄全局唯一。避免當UserID與CreateTime相同時的兩個不同文件記錄相互覆蓋。
RowKey存儲上述文件記錄,在HBase表中是下面的結構:
rowKey(userID 6 + time 8 + fileID 6) name category ….
00000120120902000001
Hbase Filter
應用實例
//時間范圍的查找, 比如是2012-12-12到2013-01-23日之間的數據 FilterList filter = new FilterList(); if (timeFrom != null) { String sDate = String.valueOf(timeFrom.getTime()); SingleColumnValueFilter scvf = new SingleColumnValueFilter(Bytes.toBytes("CF"), Bytes.toBytes("Date"), CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(String.valueOf(sDate))); filter.addFilter(scvf); } if (timeTo != null) { String sDate = String.valueOf(timeTo.getTime()); SingleColumnValueFilter scvf = new SingleColumnValueFilter(Bytes.toBytes("CF"), Bytes.toBytes("Date"), CompareOp.LESS_OR_EQUAL, Bytes.toBytes(String.valueOf(sDate))); filter.addFilter(scvf); }
HBase(0.96以上版本)過濾器Filter詳解及實例代碼
Hbase二級索引
HBase在0.92之后引入了coprocessors,提供了一系列的鈎子,讓我們能夠輕易實現訪問控制和二級索引的特性。下面簡單介紹下兩種coprocessors,第一種是Observers,它實際類似於觸發器,第二種是Endpoint,它類似與存儲過程。由於這里只用到了Observers,所以只介紹Observers,想要更詳細的介紹請查閱(https://blogs.apache.org/hbase/entry/coprocessor_introduction)。observers分為三種:
RegionObserver:提供數據操作事件鈎子;
WALObserver:提供WAL(write ahead log)相關操作事件鈎子;
MasterObserver:提供DDL操作事件鈎子。
在二級索引的實現技術上一般有幾個方案:
1. 表索引
使用單獨的hbase表存儲索引數據,業務表的索引列值做為索引表的rowkey,業務表的rowkey做為索引表的qualifier或value。
問題:對數據更新性能影響較大;無法保證一致性;Client查詢需要2次RPC(先索引表再數據表)。
2. 列索引
與業務表使用相同表,使用單獨列族存儲索引,用戶數據列值做為索引列族的Qualifier,用戶數據Qualifier做為索引列族的列值。適用於單行有上百萬Qualifier的數據模型,如網盤應用中網盤ID做為rowkey,網盤的目錄元數據都存儲在一個hbase row內。(facebook消息模型也是此方案)
可保證事務性
為了實現像SQL一樣檢索數據,select * from table where col=val。針對HBase Secondary Indexing的方案,成為HBase新版本(0.96)呼聲最高的一項Feature。
粗略分析了當前的技術,大概的方案可以總結為這樣兩類:
1、使用HBase的coprocessor。CoProcessor相當於HBase的Observer+hook,目前支持MasterObserver、RegionObserver和WALObserver,基本上對於HBase Table的管理、數據的Put、Delete、Get等操作都可以找到對應的pre***和post***。這樣如果需要對於某一項Column建立Secondary Indexing,就可以在Put、Delete的時候,將其信息更新到另外一張索引表中。如圖二所示,對於Indexing里面的value值是否存儲的問題,可以根據需要進行控制,如果value的空間開銷不大,逆向的檢索又比較頻繁,可以直接存儲在Indexing Table中,反之則避免這種情況。
圖2 使用HBase Coprocessor實現Secondary Indexing
2、由客戶端發起對於主表和索引表的Put、Delete操作的雙重操作。源自:http://hadoop-hbase.blogspot.com/2012/10/musings-on-secondary-indexes.html 【牆外】
它具體的做法總結起來有:
-
設置主表的TTL(Time To Live)比索引表小一點,讓其略早一點消亡。
-
不要在IndexingTable存儲Value值,即刪除如圖2所示的val列。
-
Put操作時,對於操作的主表的所有列,使用同一的Local TimeStamp的值,更新到Indexing Table,然后使用該TimeStamp插入主表數據。
-
Delete操作時,首先操作主表的數據,然后再去更新Indexing Table的數據。
雖然在這種方案里無法保證原子性和一致性,但是通過TimeStamp的設置,No Locks和 No Server-side codes,使其在二級索引上有着較大的優勢。至於中間出錯的環節,我們看看是否可以容忍:
1)Put索引表成功,Put主表失敗。由於Indexing Table不存儲val值,仍需要跳轉到Main Table,所以這樣的錯誤相當於拿一個Stale index去訪問對應Rowkey吧了,對結果正確性沒有影響。
2)Delete主表成功,Delete索引表失敗。都是索引表的內容>=主表的內容而已,而實際返回值需要通過主表進行。
應用場景:
1、主表服務在線業務,它的性能需要保證。使用coprocessor和客戶端的封裝也好,都會影響其性能,所以在正常情況下,直接操作都不太合適。如果想使用方案二,我倒是感覺,可以調整Indexing Table的操作方式,去除保證其安全性的內容,比如可以關閉寫HLOG,這樣會進一步減低其操作的延遲。
2、離線更新索引表。在真正需要二級索引的場景內,其時效性要求往往不高。可以將索引實時更新到Redis等KV系統中,定時從KV更新索引到Hbase的Indexing Table中。PS:Redis里面有DB設置的概念,可以按照時間段進行隔離,這樣某段時間內的數據會更新到Redis上,保證Redis導入MapReduce之后仍然可以進行update操作。
coprocessor代碼實現 ??
We have been working on implementing secondary index in HBase and open sourced on hbase 0.94.8 version. The project is available on github. https://github.com/Huawei-Hadoop/hindex This Jira is to support secondary index on trunk(0.98). Following features will be supported. multiple indexes on table, multi column index, index based on part of a column value, equals and range condition scans using index, and bulk loading data to indexed table (Indexing done with bulk load) Most of the kernel changes needed for secondary index is available in trunk. Very minimal changes needed for it.
首先在HBase-0.19.3中必須設置參數,使得Hbase可以使用索引,修改$HBASE_INSTALL_DIR/conf/hbase-site.xml:
<property> <name>hbase.regionserver.class</name> <value>org.apache.hadoop.hbase.ipc.IndexedRegionInterface</value> </property> <property> <name>hbase.regionserver.impl</name> <value> org.apache.hadoop.hbase.regionserver.tableindexed.IndexedRegionServer </value> </property>
(1)創建表時,增加二級索引:
HBaseConfiguration conf = new HBaseConfiguration(); conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml")); HTableDescriptor desc = new HTableDescriptor("test_table"); desc.addFamily(new HColumnDescriptor("columnfamily1:")); desc.addFamily(new HColumnDescriptor("columnfamily2:")); desc.addIndex(new IndexSpecification("column1", Bytes.toBytes("columnfamily1:column1"))); desc.addIndex(new IndexSpecification("column2", Bytes.toBytes("columnfamily1:column2"))); IndexedTableAdmin admin = null; admin = new IndexedTableAdmin(conf); admin.createTable(desc);
(2)在已經存在的表中,增加索引
HBaseConfiguration conf = new HBaseConfiguration(); conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml")); IndexedTableAdmin admin = null; admin = new IndexedTableAdmin(conf); admin.addIndex(Bytes.toBytes("test_table"), new IndexSpecification("column2", Bytes.toBytes("columnfamily1:column2")));
(3)刪除存在的索引
HBaseConfiguration conf = new HBaseConfiguration(); conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml")); IndexedTableAdmin admin = null; admin = new IndexedTableAdmin(conf); admin.removeIndex(Bytes.toBytes("test_table"), "column2");
(4)通過索引scan所有數據
HBaseConfiguration conf = new HBaseConfiguration(); conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml")); IndexedTable table = new IndexedTable(conf, Bytes.toBytes("test_table")); // You need to specify which columns to get Scanner scanner = table.getIndexedScanner("column1", HConstants.EMPTY_START_ROW, null, null, new byte[][] { Bytes.toBytes("columnfamily1:column1"), Bytes.toBytes("columnfamily1:column2") }); for (RowResult rowResult : scanner) { String value1 = new String( rowResult.get(Bytes.toBytes("columnfamily1:column1")).getValue()); String value2 = new String( rowResult.get(Bytes.toBytes("columnfamily1:column2")).getValue()); System.out.println(value1 + ", " + value2); } table.close();
(5)通過索引scan一部分子集,通過ColumnValueFilter過濾。
使用SingleColumnValueFilter會影響查詢性能,在真正處理海量數據時會消耗很大的資源,且需要較長的時間
ColumnValueFilter filter = new ColumnValueFilter(Bytes.toBytes("columnfamily1:column1"), CompareOp.LESS, Bytes.toBytes("value1-10")); scanner = table.getIndexedScanner("column1", HConstants.EMPTY_START_ROW, null, filter, new byte[][] { Bytes.toBytes("columnfamily1:column1"), Bytes.toBytes("columnfamily1:column2")); for (RowResult rowResult : scanner) { String value1 = new String( rowResult.get(Bytes.toBytes("columnfamily1:column1")).getValue()); String value2 = new String( rowResult.get(Bytes.toBytes("columnfamily1:column2")).getValue()); System.out.println(value1 + ", " + value2); }
一般不建議用Filter,scan.setFilters(),通過filter設置的條件查不到數據時,響應速度非常慢,大概在十幾秒,有時會超時,
但可以查到數據時,響應速度只有幾百ms,差距非常大
Scan scan = new Scan(); FilterList filters = new FilterList(); for (String[] param : params) { //param[0]為列名,param[1]為相應的值 filters.addFilter(new SingleColumnValueFilter("INFO".getBytes(), param[0].getBytes(), CompareOp.EQUAL, param[1].getBytes())); } scan.setFilter(filters);
(6)一個完全的例子
import java.io.IOException; import java.util.Date; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Scanner; import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification; import org.apache.hadoop.hbase.client.tableindexed.IndexedTable; import org.apache.hadoop.hbase.client.tableindexed.IndexedTableAdmin; import org.apache.hadoop.hbase.filter.ColumnValueFilter; import org.apache.hadoop.hbase.filter.ColumnValueFilter.CompareOp; import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.io.RowResult; import org.apache.hadoop.hbase.util.Bytes; public class SecondaryIndexTest { public void writeToTable() throws IOException { HBaseConfiguration conf = new HBaseConfiguration(); conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml")); IndexedTable table = new IndexedTable(conf, Bytes.toBytes("test_table")); String row = "test_row"; BatchUpdate update = null; for (int i = 0; i < 100; i++) { update = new BatchUpdate(row + i); update.put("columnfamily1:column1", Bytes.toBytes("value1-" + i)); update.put("columnfamily1:column2", Bytes.toBytes("value2-" + i)); table.commit(update); } table.close(); } public void readAllRowsFromSecondaryIndex() throws IOException { HBaseConfiguration conf = new HBaseConfiguration(); conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml")); IndexedTable table = new IndexedTable(conf, Bytes.toBytes("test_table")); Scanner scanner = table.getIndexedScanner("column1", HConstants.EMPTY_START_ROW, null, null, new byte[][] { Bytes.toBytes("columnfamily1:column1"), Bytes.toBytes("columnfamily1:column2") }); for (RowResult rowResult : scanner) { System.out.println(Bytes.toString( rowResult.get(Bytes.toBytes("columnfamily1:column1")).getValue()) + ", " + Bytes.toString(rowResult.get( Bytes.toBytes("columnfamily1:column2")).getValue() )); } table.close(); } public void readFilteredRowsFromSecondaryIndex() throws IOException { HBaseConfiguration conf = new HBaseConfiguration(); conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml")); IndexedTable table = new IndexedTable(conf, Bytes.toBytes("test_table")); ColumnValueFilter filter = new ColumnValueFilter(Bytes.toBytes("columnfamily1:column1"), CompareOp.LESS, Bytes.toBytes("value1-40")); Scanner scanner = table.getIndexedScanner("column1", HConstants.EMPTY_START_ROW, null, filter, new byte[][] { Bytes.toBytes("columnfamily1:column1"), Bytes.toBytes("columnfamily1:column2") }); for (RowResult rowResult : scanner) { System.out.println(Bytes.toString( rowResult.get(Bytes.toBytes("columnfamily1:column1")).getValue()) + ", " + Bytes.toString(rowResult.get( Bytes.toBytes("columnfamily1:column2")).getValue() )); } table.close(); } public void createTableWithSecondaryIndexes() throws IOException { HBaseConfiguration conf = new HBaseConfiguration(); conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml")); HTableDescriptor desc = new HTableDescriptor("test_table"); desc.addFamily(new HColumnDescriptor("columnfamily1:column1")); desc.addFamily(new HColumnDescriptor("columnfamily1:column2")); desc.addIndex(new IndexSpecification("column1", Bytes.toBytes("columnfamily1:column1"))); desc.addIndex(new IndexSpecification("column2", Bytes.toBytes("columnfamily1:column2"))); IndexedTableAdmin admin = null; admin = new IndexedTableAdmin(conf); if (admin.tableExists(Bytes.toBytes("test_table"))) { if (admin.isTableEnabled("test_table")) { admin.disableTable(Bytes.toBytes("test_table")); } admin.deleteTable(Bytes.toBytes("test_table")); } if (admin.tableExists(Bytes.toBytes("test_table-column1"))) { if (admin.isTableEnabled("test_table-column1")) { admin.disableTable(Bytes.toBytes("test_table-column1")); } admin.deleteTable(Bytes.toBytes("test_table-column1")); } admin.createTable(desc); } public void addSecondaryIndexToExistingTable() throws IOException { HBaseConfiguration conf = new HBaseConfiguration(); conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml")); IndexedTableAdmin admin = null; admin = new IndexedTableAdmin(conf); admin.addIndex(Bytes.toBytes("test_table"), new IndexSpecification("column2", Bytes.toBytes("columnfamily1:column2"))); } public void removeSecondaryIndexToExistingTable() throws IOException { HBaseConfiguration conf = new HBaseConfiguration(); conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml")); IndexedTableAdmin admin = null; admin = new IndexedTableAdmin(conf); admin.removeIndex(Bytes.toBytes("test_table"), "column2"); } public static void main(String[] args) throws IOException { SecondaryIndexTest test = new SecondaryIndexTest(); test.createTableWithSecondaryIndexes(); test.writeToTable(); test.addSecondaryIndexToExistingTable(); test.removeSecondaryIndexToExistingTable(); test.readAllRowsFromSecondaryIndex(); test.readFilteredRowsFromSecondaryIndex(); System.out.println("Done!"); } }