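How it works:
1. When reading a page of data from HBase, fetch one extra row. For example, with 10 rows per page, the first HBase query fetches 10+1 rows, and the rowkeys of the first and last rows are saved in Redis. The Redis key is the user's token + URL, i.e. redis.set(token+url, List<String>). (The code below uses the token alone as the key.)
2. When the user clicks "next page", check whether the Redis list holds a rowkey for the current page (currentPage), i.e. list.get(currentPage). If it does, use that rowkey as startRowKey, fetch 10+1 rows, and append the rowkey of the last row to the list in Redis. If it does not, the query fails and the user is asked to search again. In theory this never happens unless Redis is down.
3. If both the startRowKey and the stopRowKey of the requested page are already stored under the token, only that range needs to be scanned.
4. When is this Redis data cleared? First, set an expiry on the Redis entries. Second, and this is important because the accuracy of the first three steps depends on it: any action other than clicking "previous page" or "next page" clears the current user's data in Redis.
5. Where there is paging there is also a total count. I use an HBase coprocessor for this. Counting on every query is slow, so the count is saved in the user's Redis data on the first query. It is cleared in the same way as described in step 4.
6. One more important precondition: the front-end pager offers only two buttons, "previous page" and "next page". This is what makes the whole scheme workable.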
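The steps above can be tried out without HBase or Redis. The following is only a minimal sketch of the idea, not the code used in this post: a TreeMap stands in for the HBase table (rows sorted by rowkey), an in-memory list stands in for the Redis list of page-start rowkeys, and the class and method names (RowKeyPagingSketch, fetchPage, reset) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class RowKeyPagingSketch {
    // Stand-in for the HBase table: rows kept sorted by rowkey.
    private final NavigableMap<String, String> table = new TreeMap<>();
    // Stand-in for the Redis list: pageStarts.get(i) is the first rowkey of page i+1.
    private final List<String> pageStarts = new ArrayList<>();
    private final int limit;

    RowKeyPagingSketch(int limit) {
        this.limit = limit;
    }

    void put(String rowKey, String value) {
        table.put(rowKey, value);
    }

    // Any action other than prev/next (for example a new search) clears the cache.
    void reset() {
        pageStarts.clear();
    }

    // Returns the rows of the given 1-based page, scanning at most limit+1 rows.
    List<String> fetchPage(int page) {
        boolean firstQuery = pageStarts.isEmpty();
        if (page < 1 || (firstQuery ? page != 1 : page > pageStarts.size())) {
            throw new IllegalStateException("no cached start rowkey for page " + page);
        }
        NavigableMap<String, String> scan =
                firstQuery ? table : table.tailMap(pageStarts.get(page - 1), true);
        List<String> rows = new ArrayList<>();
        int seen = 0;
        for (Map.Entry<String, String> e : scan.entrySet()) {
            if (seen == 0 && firstQuery) {
                pageStarts.add(e.getKey()); // remember where page 1 starts
            }
            if (seen == limit) {
                // The extra (limit+1)-th row: its rowkey is the start of the next page.
                if (pageStarts.size() == page) {
                    pageStarts.add(e.getKey());
                }
                break;
            }
            rows.add(e.getValue());
            seen++;
        }
        return rows;
    }
}
```

Because a page can only be requested once its start rowkey is cached, the sketch supports exactly the prev/next navigation described above. Jumping to an arbitrary page number throws, which mirrors the "query fails, search again" case.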
Now for the code.
The controller method. The base framework used here is the renren rapid-development kit.
@ResponseBody
@RequestMapping("/list")
public R list(@RequestParam Map<String, Object> params, HttpServletRequest httpRequest) {
    long time = System.currentTimeMillis();
    HBasePage page = null;
    try {
        String token = httpRequest.getHeader("token");
        // If you need extra filters, define them yourself and add them to the query with .addFilter(filter).
        // Mind the order: if there is a rowkey filter, call buildRowKeyFilter first and buildRowKeyPage second.
        // Otherwise the filter has no effect.
        HbaseQuery query = new HbaseQuery(params, token, true);
        Object cnOrderCodeObject = params.get("cnOrderCode");
        if (cnOrderCodeObject != null && !((String) cnOrderCodeObject).equals("")) {
            String cnOrderCode = (String) cnOrderCodeObject;
            query.buildRowKeyFilter(cnOrderCode, cnOrderCode);
        }
        query.buildRowKeyPage().finish();
        page = service.query(query).buildRedisRowKey(token);
    } catch (Exception e) {
        e.printStackTrace();
    }
    long time2 = System.currentTimeMillis();
    System.out.println("time2-time==list=" + (time2 - time));
    return R.ok().put("page", page);
}
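The class that handles the query parameters, HbaseQuery. For business reasons every HBase query has two mandatory conditions, a start date and an end date, so I wrapped these two parameters directly into HbaseQuery.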
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;
import lombok.Data;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
// DateUtils (Apache Commons Lang), RedisUtils, RedisKeys and SpringContextUtils
// come from the project and its dependencies; their imports are not shown.

@Data
public class HbaseQuery {
    private static final long serialVersionUID = 1L;
    // Current page number
    private int page;
    // Rows per page
    private long limit;
    private Map<String, Object> params;
    private List<String> pageStartRowKeys;
    private Date startDate;
    private Date endDate;
    private StringBuffer startRowKey = new StringBuffer();
    private StringBuffer stopRowKey = new StringBuffer();
    private RedisUtils redisUtils = (RedisUtils) SpringContextUtils.getBean("redisUtils");
    private Scan countScan;
    private Scan dataScan = new Scan();
    private int cache = 10;
    private DateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
    private FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    private FilterList countfl = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    private boolean isOpenCount;
    private String token;

    public HbaseQuery(Map<String, Object> params, String token, boolean isOpenCount) throws Exception {
        this.isOpenCount = isOpenCount;
        this.token = token;
        this.params = params;
        String temp2 = (String) params.get("startDate");
        startDate = sf.parse(temp2);
        String temp = (String) params.get("endDate");
        endDate = sf.parse(temp);
        endDate = DateUtils.addDays(endDate, 1);
        this.page = Integer.parseInt(params.get("page").toString());
        this.limit = Integer.parseInt(params.get("limit").toString());
        // Add 1 because every query fetches limit+1 rows
        cache = limit > 5000 ? 5000 : ((int) limit + 1);
        params.remove("startDate");
        params.remove("endDate");
        params.remove("page");
        params.remove("limit");
        initRowKeyFilter();
    }

    // Rowkeys start with Long.MAX_VALUE - timestamp, so the newest date gives the start row.
    public void initRowKeyFilter() {
        Long endLong = Long.MAX_VALUE - startDate.getTime();
        Long startLong = Long.MAX_VALUE - (DateUtils.addDays(endDate, 1).getTime() - 1);
        stopRowKey.append(endLong);
        startRowKey.append(startLong);
    }

    // Mind the order: if there is a rowkey filter, call buildRowKeyFilter first and buildRowKeyPage second.
    // Otherwise the filter has no effect.
    public HbaseQuery buildRowKeyFilter(String startRowKey, String stopRowKey) {
        // Fixed: the original compared a StringBuffer with "" (always false)
        // and appended the start value to stopRowKey instead of startRowKey.
        if (this.startRowKey.length() == 0) {
            this.startRowKey.append(startRowKey);
        } else {
            this.startRowKey.append("-").append(startRowKey);
        }
        if (this.stopRowKey.length() == 0) {
            this.stopRowKey.append(stopRowKey);
        } else {
            this.stopRowKey.append("-").append(stopRowKey);
        }
        return this;
    }

    // Mind the order: if there is a rowkey filter, call buildRowKeyFilter first and buildRowKeyPage second.
    // Otherwise the filter has no effect.
    public HbaseQuery buildRowKeyPage() throws Exception {
        List<String> pageStartRowKeys = redisUtils.get(token, List.class);
        // "Previous page" or "next page" was clicked
        Object temp = params.get("pageicon");
        if (temp != null && !((String) temp).equals("") && !((String) temp).equals("records")) {
            // ... and the start rowkeys in Redis are not empty
            String pageicon = (String) temp;
            if (pageStartRowKeys != null) {
                String startRowKey = pageStartRowKeys.get(this.page - 1);
                if (pageicon.equals("next") && pageStartRowKeys.size() == this.page) {
                    // A page not visited yet: scan limit+1 rows from its start rowkey
                    this.startRowKey = new StringBuffer(startRowKey);
                    Filter pageFilter = new PageFilter(cache);
                    fl.addFilter(pageFilter);
                } else if ((pageicon.equals("next") && pageStartRowKeys.size() > this.page)
                        || pageicon.equals("prev")) {
                    // A page visited before: both ends of the range are known
                    String stopRowKey = pageStartRowKeys.get(this.page);
                    this.startRowKey = new StringBuffer(startRowKey);
                    this.stopRowKey = new StringBuffer(stopRowKey);
                    Filter pageFilter = new PageFilter(this.getLimit());
                    fl.addFilter(pageFilter);
                }
            } else {
                throw new Exception("A paging button was clicked but Redis holds no data; the program must have a bug");
            }
        } else {
            // Not a paging button: delete the paging information in Redis
            redisUtils.delete(token);
            redisUtils.delete(RedisKeys.getPageCountKey(token));
            Filter pageFilter = new PageFilter(this.getLimit() + 1);
            fl.addFilter(pageFilter);
        }
        dataScan.setCaching(cache);
        return this;
    }

    public HbaseQuery addFilter(Filter filter) {
        fl.addFilter(filter);
        countfl.addFilter(filter);
        return this;
    }

    public HbaseQuery finish() {
        String count = redisUtils.get(RedisKeys.getPageCountKey(token));
        // Only build the count scan when counting is enabled and no count is cached yet
        if (isOpenCount && (count == null || count.equals(""))) {
            countScan = new Scan();
            countScan.setMaxVersions();
            countScan.setCaching(5);
            countScan.setStartRow((startRowKey.toString()).getBytes());
            countScan.setStopRow((stopRowKey.toString()).getBytes());
            countScan.setFilter(countfl);
        }
        dataScan.setMaxVersions();
        dataScan.setStartRow((startRowKey.toString()).getBytes());
        dataScan.setStopRow((stopRowKey.toString()).getBytes());
        dataScan.setFilter(fl);
        return this;
    }
}
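The method that queries HBase. Note: before using the HBase coprocessor, make sure it has been enabled on the table.
To enable the aggregation coprocessor on an HBase table (shell commands):
(1) Disable the table: hbase> disable 'mytable'
(2) Add the aggregation coprocessor: hbase> alter 'mytable', METHOD => 'table_att','coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'
(3) Re-enable the table: hbase> enable 'mytable'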
public HBasePage query(HbaseQuery query) {
    long time = System.currentTimeMillis();
    Map<String, String> communtiyKeysMap = new HashMap<>();
    HBasePage page = new HBasePage(query.getLimit(), query.getPage());
    final String tableName = this.getTableName();
    // The count scan is only set when no count is cached in Redis
    if (query.getCountScan() != null) {
        AggregationClient ac = new AggregationClient(hbaseTemplate.getConfiguration());
        try {
            long count = ac.rowCount(TableName.valueOf(tableName), new LongColumnInterpreter(), query.getCountScan());
            page.setTotalCount(count);
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
    long time2 = System.currentTimeMillis();
    List rows = hbaseTemplate.find(tableName, query.getDataScan(), new RowMapper<Object>() {
        @Override
        public Object mapRow(Result result, int i) throws Exception {
            // A mapping from table name to entity bean is kept here
            Class clazz = ReflectMap.get(tableName);
            if (i == 0) {
                communtiyKeysMap.put("curPageStart", new String(result.getRow()));
            }
            // Row number limit (0-based) is the extra row: the start of the next page
            if (i == query.getLimit()) {
                communtiyKeysMap.put("nextPageStart", new String(result.getRow()));
            }
            HBaseResultBuilder hrb = new HBaseResultBuilder<Object>("sf", result, clazz);
            return hrb.buildAll().fetch();
        }
    });
    // Drop the extra row so that only one page of data is returned
    if (rows.size() > 0 && page.getPageSize() < rows.size()) {
        rows.remove(rows.size() - 1);
    }
    page.setList(rows);
    page.setNextPageRow(communtiyKeysMap.get("nextPageStart"));
    page.setCurPageRow(communtiyKeysMap.get("curPageStart"));
    long time3 = System.currentTimeMillis();
    System.out.println("time2-time==getCount=" + (time2 - time));
    System.out.println("time3-time2==getData=" + (time3 - time2));
    return page;
}
The code of the pagination class, HBasePage:
public class HBasePage implements Serializable {
    private static final long serialVersionUID = 1L;
    // Total number of records
    protected long totalCount;
    // Records per page
    protected long pageSize;
    // Total number of pages
    protected int totalPage;
    // Current page number
    protected int currPage;
    // List data
    protected List<Object> list;
    private String nextPageRow; // Rowkey of the next page
    private String curPageRow;  // Start rowkey of the current page

    /**
     * Pagination
     * @param pageSize records per page
     * @param currPage current page number
     */
    public HBasePage(long pageSize, int currPage) {
        // The original also assigned list and totalCount to themselves, which did nothing
        this.pageSize = pageSize;
        this.currPage = currPage;
    }

    public void setTotalCount(long totalCount) {
        this.totalCount = totalCount;
        this.totalPage = (int) Math.ceil((double) totalCount / pageSize);
    }

    // Getters and setters for the other fields are used by the code but not shown in this listing.

    public HBasePage buildRedisRowKey(String token) {
        RedisUtils redisUtils = (RedisUtils) SpringContextUtils.getBean("redisUtils");
        List<String> pageRowKeys = redisUtils.get(token, List.class);
        if (this.getList().size() > 0) {
            if (pageRowKeys == null || pageRowKeys.size() <= 0) {
                // First query: store the start of the current page and of the next page
                pageRowKeys = new ArrayList<>();
                pageRowKeys.add(prefixOf(this.getCurPageRow()));
                if (this.getNextPageRow() != null) {
                    pageRowKeys.add(prefixOf(this.getNextPageRow()));
                }
                redisUtils.set(token, pageRowKeys);
            } else if (pageRowKeys.size() == this.getCurrPage() && this.getNextPageRow() != null) {
                // A new page was reached: append the start of the next page.
                // When the list is already longer than currPage there is nothing to do.
                pageRowKeys.add(prefixOf(this.getNextPageRow()));
                redisUtils.set(token, pageRowKeys);
            }
        }
        return this;
    }

    // Keeps the rowkey up to and including the first "-".
    // The null checks above were added because nextPageRow is not set on the last page.
    private static String prefixOf(String rowKey) {
        return rowKey.substring(0, rowKey.indexOf("-") + 1);
    }
}
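Note:
1. My rowkey rule is (Long.MAX_VALUE - new Date().getTime()) + "-" + id, so newer rows sort first. Keep this in mind when reading startRowKey and stopRowKey: the start row comes from the end date and the stop row from the start date.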
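To make the rowkey rule concrete, here is a small sketch of how such reverse-timestamp keys and the matching scan range can be built. It is an illustration under assumptions and not the code from this post: the class ReverseTimestampRowKey and its methods are hypothetical, dates are interpreted in UTC, and the stop row is moved one millisecond further so that a row written exactly at midnight of the first day is still included. Comparing the keys as strings matches HBase's byte ordering here only because all prefixes have the same number of digits.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;

class ReverseTimestampRowKey {
    // Rowkey rule from the note above: (Long.MAX_VALUE - timestamp) + "-" + id.
    static String rowKey(long epochMillis, String id) {
        return (Long.MAX_VALUE - epochMillis) + "-" + id;
    }

    // Inclusive start row: derived from the last millisecond of the newest day.
    static String startRow(LocalDate lastDay) {
        long newest = lastDay.plusDays(1).atStartOfDay(ZoneOffset.UTC)
                .toInstant().toEpochMilli() - 1;
        return String.valueOf(Long.MAX_VALUE - newest);
    }

    // Exclusive stop row: one step past the first millisecond of the oldest day.
    static String stopRow(LocalDate firstDay) {
        long oldest = firstDay.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
        return String.valueOf(Long.MAX_VALUE - oldest + 1);
    }

    // Mimics the scan range check: start inclusive, stop exclusive.
    static boolean inRange(String rowKey, String start, String stop) {
        return rowKey.compareTo(start) >= 0 && rowKey.compareTo(stop) < 0;
    }
}
```

With this layout a scan from startRow to stopRow returns the newest rows first, which is why the first page of a query shows the most recent records.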
If you know a better approach or spot a flaw in the code, feel free to leave a comment and discuss.