hbase 聚合操作


hbase本身提供了 聚合方法可以服務端聚合操作

hbase中的CoprocessorProtocol機制. 

CoprocessorProtocol的原理比較簡單,近似於一個mapreduce框架。由client將scan分解為面向多個region的請求,並行發送請求到多個region,然后client做一個reduce的操作,得到最后的結果。 


先看一個例子,使用hbase的AggregationClient可以做到簡單的面向單個column的統計。 

Java代碼   收藏代碼
  1. @Test  
  2. public void testAggregationClient() throws Throwable {  
  3.   
  4.     LongColumnInterpreter columnInterpreter = new LongColumnInterpreter();  
  5.   
  6.     AggregationClient aggregationClient = new AggregationClient(  
  7.             CommonConfig.getConfiguration());  
  8.     Scan scan = new Scan();  
  9.   
  10.     scan.addColumn(ColumnFamilyName, QName1);  
  11.   
  12.     Long max = aggregationClient.max(TableNameBytes, columnInterpreter,  
  13.             scan);  
  14.     Assert.assertTrue(max.longValue() == 100);  
  15.   
  16.     Long min = aggregationClient.min(TableNameBytes, columnInterpreter,  
  17.             scan);  
  18.     Assert.assertTrue(min.longValue() == 20);  
  19.   
  20.     Long sum = aggregationClient.sum(TableNameBytes, columnInterpreter,  
  21.             scan);  
  22.     Assert.assertTrue(sum.longValue() == 120);  
  23.   
  24.     Long count = aggregationClient.rowCount(TableNameBytes,  
  25.             columnInterpreter, scan);  
  26.     Assert.assertTrue(count.longValue() == 4);  
  27.   
  28. }  



看下hbase的源碼。AggregateImplementation 

Java代碼   收藏代碼
  1. @Override  
  2.   public <T, S> T getMax(ColumnInterpreter<T, S> ci, Scan scan)  
  3.       throws IOException {  
  4.     T temp;  
  5.     T max = null;  
  6.     InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())  
  7.         .getRegion().getScanner(scan);  
  8.     List<KeyValue> results = new ArrayList<KeyValue>();  
  9.     byte[] colFamily = scan.getFamilies()[0];  
  10.     byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();  
  11.     // qualifier can be null.  
  12.     try {  
  13.       boolean hasMoreRows = false;  
  14.       do {  
  15.         hasMoreRows = scanner.next(results);  
  16.         for (KeyValue kv : results) {  
  17.           temp = ci.getValue(colFamily, qualifier, kv);  
  18.           max = (max == null || (temp != null && ci.compare(temp, max) > 0)) ? temp : max;  
  19.         }  
  20.         results.clear();  
  21.       } while (hasMoreRows);  
  22.     } finally {  
  23.       scanner.close();  
  24.     }  
  25.     log.info("Maximum from this region is "  
  26.         + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()  
  27.             .getRegionNameAsString() + ": " + max);  
  28.     return max;  
  29.   }  


這里由於 

Java代碼   收藏代碼
  1. byte[] colFamily = scan.getFamilies()[0];  
  2. byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();  


所以,hbase自帶的Aggregate函數,只能面向單列進行統計。 

當我們想對多列進行Aggregate,並同時進行countRow時,有以下選擇。 
1 scan出所有的row,程序自己進行Aggregate和count。 
2 使用AggregationClient,調用多次,得到所有的結果。由於多次調用,有一致性問題。 
3 自己擴展CoprocessorProtocol。 

 

 

這個是github的hbase集成插件

這個功能集成到simplehbase里面了。
https://github.com/zhang-xzhi/simplehbase


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM