摘要: 在前篇博客里已經講述了通過一個自定義 HBase Filter來獲取數據的辦法,在末尾指出此辦法的性能是不能滿足應用要求的,很顯然對於如此成熟的HBase來說,高性能獲取數據應該不是問題。下面首先簡單介紹了搜索引擎的性能,然后詳細說明了HBase與MySQL的性能對比,這里的數據都是經過實際的測試獲得的。最后,給出了采用多線程批量從HBase中取數據的方案,此方案經過測試要比通過自定義Filter的方式性能高出很多。
關鍵詞: HBase, 高性能, 獲取數據, 性能對比, 多線程
Solr和HBase專輯
1、“關於Solr的使用總結的心得體會”(http://www.cnblogs.com/wgp13x/p/3742653.html)
2、“中文分詞器性能比較”(http://www.cnblogs.com/wgp13x/p/3748764.html)
3、“Solr與HBase架構設計”(http://www.cnblogs.com/wgp13x/p/a8bb8ccd469c96917652201007ad3c50.html)
4、 “大數據架構: 使用HBase和Solr將存儲與索引放在不同的機器上”(http://www.cnblogs.com/wgp13x/p/3927979.html)
5、“一個自定義 HBase Filter -通過RowKeys來高性能獲取數據”(http://www.cnblogs.com/wgp13x/p/4196466.html)
6、“HBase 高性能獲取數據 - 多線程批量式解決辦法”(http://www.cnblogs.com/wgp13x/p/4245182.html)
1、 如何存儲十億、百億數據? 答:使用數據存儲集群,增加水平拓展能力,以容納上百億數據量
2、 如何保證在十億、百億數據上面的查詢效率? 答:使用分布式搜索引擎
數據量過億,無論是存儲在關系型數據庫還是非關系型數據庫,使用非索引字段進行條件查詢、模糊查詢等復雜查詢都是一件極其緩慢甚至是不可能完成的任務,數據庫索引建立的是二級索引,大數據查詢主要依靠搜索引擎。
根據Solr中國資料顯示,在2400億每條數據大概200字節的數據建立索引,搭建分布式搜索引擎,在50台機器進行搜索測試,其中有條件查詢、模糊查詢等,其中80%的搜索能夠在毫秒內返回結果,剩下一部分能夠在20秒內返回,還有5%左右的查詢需要在50秒左右的時間完成查詢請求,客戶端查詢請求的並發量為100個客戶端。
以下結論均是在同一台服務器上的測試結果。
MySQL單機隨機讀寫能力測試
MySQL(InnoDB) |
|
運行環境 |
Window Server 2008 x64 |
存儲引擎 |
InnoDB |
最大存儲容量 |
64T |
列數 |
39列 |
每條數據的大小 |
Avg=507Byte |
總數據量 |
302,418,176條 |
占用的磁盤空間 |
210G |
插入效率 |
總共耗時13個小時,每秒約6500條,隨着數據量的增大,插入的效率影響不大 |
30ms |
|
百條數據全表隨機讀取時間 |
1,783ms;1,672ms |
18,579ms;15,473ms |
|
其他 |
條件查詢、Order By、模糊查詢基本上是無法響應的 |
HBase基本說明與性能測試
從上面的測試結果表中可以看出,MySQL單表插入速度為每秒6500條,HBase單台機器能夠實現1w~3w之間的插入速度,這充分說明HBase插入數據的速度比MySQL高很多。在MySQL單機隨機讀寫能力測試中,單條數據全表隨機讀取時間是指依據主鍵去MySQL單表取數據花費的時間;在HBase基本說明與性能測試中,大數據量下查詢響應時間是指依照Rowkey到HBase取數據所花費的時間。30ms對5ms,這說明HBase取數據的速度之快也是MySQL望塵莫及的。
在進行上面的性能測試中,無論是從MySQL通過主鍵讀取,還是從HBase通過Rowkey讀取,讀取的數據量都不大,不超過1000條。當需要一次性讀取萬級數據時,需要通過設計優化的代碼來保證讀取速度。
在實現過程中,發現當批量Get的數據量達到一定程度時(如10W),向HBase請求數據會從innerGet發生EOFExeption異常。這里附加上一段從HBase依照多Rowkey獲取數據的代碼,它采用了性能高的批量Get。在這里,我將這種大批量請求化為每1000個Get的請求,並且采用多線程方式,經過驗證,這種方法的效率還是蠻高的。
private ExecutorService pool = Executors.newFixedThreadPool(10); // 這里創建了10個 Active RPC Calls public Datas getDatasFromHbase(final List<String> rowKeys, final List<String> filterColumn, boolean isContiansRowkeys, boolean isContainsList) { if (rowKeys == null || rowKeys.size() <= 0) { return Datas.getEmptyDatas(); } final int maxRowKeySize = 1000; int loopSize = rowKeys.size() % maxRowKeySize == 0 ? rowKeys.size() / maxRowKeySize : rowKeys.size() / maxRowKeySize + 1; ArrayList<Future<List<Data>>> results = new ArrayList<Future<List<Data>>>(); for (int loop = 0; loop < loopSize; loop++) { int end = (loop + 1) * maxRowKeySize > rowKeys.size() ? rowKeys .size() : (loop + 1) * maxRowKeySize; List<String> partRowKeys = rowKeys.subList(loop * maxRowKeySize, end); HbaseDataGetter hbaseDataGetter = new HbaseDataGetter(partRowKeys, filterColumn, isContiansRowkeys, isContainsList); synchronized (pool) { Future<List<Data>> result = pool.submit(hbaseDataGetter); results.add(result); } } Datas datas = new Datas(); List<Data> dataQueue = new ArrayList<Data>(); try { for (Future<List<Data>> result : results) { List<Data> rd = result.get(); dataQueue.addAll(rd); } datas.setDatas(dataQueue); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } return datas; } |
class HbaseDataGetter implements Callable<List<Data>>
{
private List<String> rowKeys;
private List<String> filterColumn;
private boolean isContiansRowkeys;
private boolean isContainsList;
public HbaseDataGetter(List<String> rowKeys, List<String> filterColumn,
boolean isContiansRowkeys, boolean isContainsList)
{
this.rowKeys = rowKeys;
this.filterColumn = filterColumn;
this.isContiansRowkeys = isContiansRowkeys;
this.isContainsList = isContainsList;
}
@Override
public List<Data> call() throws Exception
{
Object[] objects = getDatasFromHbase(rowKeys, filterColumn);
List<Data> listData = new ArrayList<Data>();
for (Object object : objects)
{
Result r = (Result) object;
Data data = assembleData(r, filterColumn, isContiansRowkeys,
isContainsList);
listData.add(data);
}
return listData;
}
}
|
private Object[] getDatasFromHbase(List<String> rowKeys,
List<String> filterColumn)
{
createTable(tableName);
Object[] objects = null;
HTableInterface hTableInterface = createTable(tableName);
List<Get> listGets = new ArrayList<Get>();
for (String rk : rowKeys)
{
Get get = new Get(Bytes.toBytes(rk));
if (filterColumn != null)
{
for (String column : filterColumn)
{
get.addColumn(columnFamilyName.getBytes(),
column.getBytes());
}
}
listGets.add(get);
}
try
{
objects = hTableInterface.get(listGets);
}
catch (IOException e1)
{
e1.printStackTrace();
}
finally
{
try
{
listGets.clear();
hTableInterface.close();
}
catch (IOException e)
{
e.printStackTrace();
}
}
return objects;
}
|
private HTableInterface createTable(String tableName) { HTable table = null; try { table = new HTable(initHbaseConfiguration(), tableName); } catch (IOException e) { e.printStackTrace(); } return table; } |
可以肯定的是,此種批量取數據的方法達成的速度,與取一次性數據的數量基本成線性關系,與總數據量相關不大,需要取出的數據越多耗時也就越多,經過測試一次性取1000條數據花費大約在2至3s以內,總數據量為400W。而通過自定義Filter方式取數據的方法的速度,與取一次性數據的數量相關不大,與總數據量成線性關系,總數據量越大取出越慢,即使只需取一條 ,因為此方式對HBase每條數據都過濾一遍。這樣,如果在總數不大,需要取數據量較大的情況下,通過自定義Filter取數據的方式可能還占有些優勢,但在正常情況下,此種批量取數據的方法還是優勢更大。
不得不提的是:在實現過程中,我曾將這種大批量請求化為每4000個Get的多線程請求方式,我們的HBase版本為0.94,這樣在一次性請求200000條數據時,HBase直接掛機,client拋出EOFException異常,【processBatchCallback(HConnectionManager.java:1708),processBatch(HConnectionManager.java:1560),(HTable.java:779)】,查看並發連接數與每1000個Get請求一樣保持為10個左右,沒有異常。查閱相關資料后,我們懷疑,這是由於HTable的非線程安全特性導致的,但經過多時糾纏,最終也沒得到可靠結論。后來確定這是由於HBase0.94版自身的問題,在使用0.96版后,此問題便不再出現了。而且我們發現0.94版HBase並不穩定,經常有掛掉情況出現。0.96版HBase要好得多。
這里補充非常重要的一點,在上面的代碼中,我通過 private ExecutorService pool = Executors.newFixedThreadPool(10); 創建了一個最多容納10個線程的線程池,從而創建了10個 Active RPC Calls,有效提高了獲取速度。然而,我將此線程池容量擴大至20個后,的確創建了20個 Active RPC Calls,如下圖所示,但是會直接引起事故:HBase掛掉。不得不吐cao,HBase實在不穩定,維護極其花費成本。在種種實踐驗證后,才得到了這個穩定高效的方式,每1000個Get一次批量請求,至多10個線程同時取。
平均效率如下圖所示:
更多的HBase及其它的數據存儲方案測試情況,HBase高性能插入數據解決方案,正在整理中,敬請批評指正。