實現Hbase的分頁


作者:R星月 出處:http://www.cnblogs.com/rxingyue 歡迎轉載,也請保留這段聲明。謝謝!

做一個項目中由於數據量比較大,並且需要定時增量分析,做了hbase的分頁。項目中用到的版本是hbase1.1 。需要啟用協處理器 Aggregation

1.啟動全局aggregation,能過操縱所有的表上的數據。通過修改hbase-site.xml這個文件來實現,只需要添加如下代碼:

<property>
   <name>hbase.coprocessor.user.region.classes</name>
   <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
 </property>

2.啟用表aggregation,只對特定的表生效。通過HBase Shell 來實現。

(1)disable指定表。hbase> disable 'mytable'

(2)添加aggregation hbase> alter 'mytable', METHOD => 'table_att','coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'

(3)重啟指定表 hbase> enable 'mytable'

 

Hbase客戶端調用代碼示例

    1、 得到hbase的表結構總數

 public int getTotalRecord(Table keyIndexTable , String nowTime){
     int count=0;
       AggregationClient aggregationClient = new AggregationClient(config);
    Scan scan=new Scan();
    scan.setStopRow(nowTime.getBytes());//小於當前時間
    try {
     Long rowCount = aggregationClient.rowCount(keyIndexTable, new LongColumnInterpreter(), scan);
     aggregationClient.close();
     count=rowCount.intValue();
    } catch (Throwable e) {
     e.printStackTrace();
    }
    return count;
    }

2 ,實現分頁

 

  public  Map<String,String> getIndexTableInfo(Table table,String tableName, String nowTime,String startRow, Integer currentPage, Integer pageSize){
            Map<String,String> communtiyKeysMap=new TreeMap<String,String>();
            ResultScanner scanner = null;
            // 為分頁創建的封裝類對象,下面有給出具體屬性
            try {
                // 獲取最大返回結果數量
                if (pageSize == null || pageSize == 0L)
                    pageSize = 100;
                if (currentPage == null || currentPage == 0)
                    currentPage = 1;
                // 計算起始頁和結束頁
                Integer nowPageSize=pageSize+1;
                // MUST_PASS_ALL(條件 AND) MUST_PASS_ONE(條件OR)
                FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
                Filter filter1=new PageFilter(nowPageSize);
                filterList.addFilter(filter1);
//                if(tableName.equals("COMMUNITY_KEYS_INDEX")){
//                Filter filter2 = new QualifierFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("communitykey")));
//                filterList.addFilter(filter2);
//                }
                Scan scan = new Scan();
                scan.setFilter(filterList);
                scan.setMaxResultSize(nowPageSize);
                scan.setStartRow(Bytes.toBytes(startRow));
                if(!nowTime.equals("")){
                scan.setStopRow(nowTime.getBytes());
                }
                scanner = table.getScanner(scan);
                int i = 1;
                // 遍歷掃描器對象, 並將需要查詢出來的數據row key取出
                for (Result result : scanner) {
                     String row=new String(result.getRow());
                        for (Cell cell : result.rawCells()) {
//                            System.out.println("列族:"+new String(CellUtil.cloneQualifier(cell))+">>>"+new String(CellUtil.cloneValue(cell)));
                            if(i==nowPageSize){
                            communtiyKeysMap.put("nextStart", row.substring(0,row.lastIndexOf(":")));
                            break;
                            }
                            communtiyKeysMap.put(row, new String(CellUtil.cloneValue(cell)));
                        }
                i++;
                }
            } catch (IOException e) {
                e.printStackTrace();

            } finally {

                if (scanner != null)
                    scanner.close();
            }
            return communtiyKeysMap;    
        }


3,該分頁中處理和跳轉下一頁

for(int page=1;page<=pageNum;page++){                    //分頁    
             List<String> pageList = new ArrayList<String>();   //子類調用具體分析
            //1.查出要分析的數據
            Map<String,String> communtiyKeysMap=getIndexTableInfo(hTable,hbaseIndexTabel,nowTime,startRow,page,pageSize);
             for(String communitykey:communtiyKeysMap.keySet()){
               String rowKeyIndex=communitykey;               
               String cellValue=communtiyKeysMap.get(rowKeyIndex);
               if(communitykey.equals("nextStart")){
                    startRow=cellValue;
                    continue;                        //下一頁進行跳轉
                }
             }
             
             //實現調用具體的分析
             //實現該分頁處理
}  

該過程總共為三步,1.設置表的協處理器 Aggregation,使表能夠實現統計功能。2.分頁,每次取出1001條數據,每頁數據為1000條,第1001條的rowkey為下一頁的startrowkey,做為標志“nextStart” 。3分頁之后進行查找關聯數據和進行邏輯分析處理。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM