作者:R星月 出處:http://www.cnblogs.com/rxingyue 歡迎轉載,也請保留這段聲明。謝謝!
做一個項目中由於數據量比較大,並且需要定時增量分析,做了hbase的分頁。項目中用到的版本是hbase1.1 。需要啟用協處理器 Aggregation
1.啟動全局aggregation,能過操縱所有的表上的數據。通過修改hbase-site.xml這個文件來實現,只需要添加如下代碼:
<property> <name>hbase.coprocessor.user.region.classes</name> <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value> </property>
2.啟用表aggregation,只對特定的表生效。通過HBase Shell 來實現。
(1)disable指定表。hbase> disable 'mytable'
(2)添加aggregation hbase> alter 'mytable', METHOD => 'table_att','coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'
(3)重啟指定表 hbase> enable 'mytable'
Hbase客戶端調用代碼示例
1、 得到hbase的表結構總數
public int getTotalRecord(Table keyIndexTable , String nowTime){
int count=0;
AggregationClient aggregationClient = new AggregationClient(config);
Scan scan=new Scan();
scan.setStopRow(nowTime.getBytes());//小於當前時間
try {
Long rowCount = aggregationClient.rowCount(keyIndexTable, new LongColumnInterpreter(), scan);
aggregationClient.close();
count=rowCount.intValue();
} catch (Throwable e) {
e.printStackTrace();
}
return count;
}
2 ,實現分頁
public Map<String,String> getIndexTableInfo(Table table,String tableName, String nowTime,String startRow, Integer currentPage, Integer pageSize){ Map<String,String> communtiyKeysMap=new TreeMap<String,String>(); ResultScanner scanner = null; // 為分頁創建的封裝類對象,下面有給出具體屬性 try { // 獲取最大返回結果數量 if (pageSize == null || pageSize == 0L) pageSize = 100; if (currentPage == null || currentPage == 0) currentPage = 1; // 計算起始頁和結束頁 Integer nowPageSize=pageSize+1; // MUST_PASS_ALL(條件 AND) MUST_PASS_ONE(條件OR) FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); Filter filter1=new PageFilter(nowPageSize); filterList.addFilter(filter1); // if(tableName.equals("COMMUNITY_KEYS_INDEX")){ // Filter filter2 = new QualifierFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("communitykey"))); // filterList.addFilter(filter2); // } Scan scan = new Scan(); scan.setFilter(filterList); scan.setMaxResultSize(nowPageSize); scan.setStartRow(Bytes.toBytes(startRow)); if(!nowTime.equals("")){ scan.setStopRow(nowTime.getBytes()); } scanner = table.getScanner(scan); int i = 1; // 遍歷掃描器對象, 並將需要查詢出來的數據row key取出 for (Result result : scanner) { String row=new String(result.getRow()); for (Cell cell : result.rawCells()) { // System.out.println("列族:"+new String(CellUtil.cloneQualifier(cell))+">>>"+new String(CellUtil.cloneValue(cell))); if(i==nowPageSize){ communtiyKeysMap.put("nextStart", row.substring(0,row.lastIndexOf(":"))); break; } communtiyKeysMap.put(row, new String(CellUtil.cloneValue(cell))); } i++; } } catch (IOException e) { e.printStackTrace(); } finally { if (scanner != null) scanner.close(); } return communtiyKeysMap; }
3,該分頁中處理和跳轉下一頁
for(int page=1;page<=pageNum;page++){ //分頁 List<String> pageList = new ArrayList<String>(); //子類調用具體分析 //1.查出要分析的數據 Map<String,String> communtiyKeysMap=getIndexTableInfo(hTable,hbaseIndexTabel,nowTime,startRow,page,pageSize); for(String communitykey:communtiyKeysMap.keySet()){ String rowKeyIndex=communitykey; String cellValue=communtiyKeysMap.get(rowKeyIndex); if(communitykey.equals("nextStart")){ startRow=cellValue; continue; //下一頁進行跳轉 } } //實現調用具體的分析 //實現該分頁處理 }
該過程總共為三步,1.設置表的協處理器 Aggregation,使表能夠實現統計功能。2.分頁,每次取出1001條數據,每頁數據為1000條,第1001條的rowkey為下一頁的startrowkey,做為標志“nextStart” 。3分頁之后進行查找關聯數據和進行邏輯分析處理。