hbase+springboot+redis實現分頁


實現原理:

  1、讀取hbase數據每頁的數據時多取一條數據。如:分頁是10條一頁,第一次查詢hbase時, 取10+1條數據,然后把第一條和最后一條rowkey數據保存在redis中,redis中的key為用戶的token+URL。即token.set(token+url:list<String>);

  2、前台點擊下頁時,查詢當前頁(currentPagae)在redis的list是否存在list.get(currentPage)的rowkey。如果存在,則以之前為startRowKey,取10+1條,並把最后一條保存在redis中。不存在則查詢出錯,提示重新查詢,理論上不會出現,除非redis掛了。

  3、如果查詢的數據的startRowKey和stopRowKey在token中都能找到。則只需要查詢這個范圍數據即可。

  3、什么時候清除這些redis數據呢?第一、設置redis有效期。第二,這條很重要,是保證前三條數據准確性的前提。在用戶點擊非下一頁上一頁按鈕的操作時,都清除redis中的當前用戶的數據。

  4、即然有分頁,那當然有數據量統計count,這個我使用hbase的協處理器coprocessor。當然每次查詢count也費時間。當第一次查詢時,把count保存在用戶的redis中。redis的清除還是第(3)步的過程。

  5、能這么做還有一個很重要的前提:前端界面的分頁,只提供了兩個按鈕:上一頁和下一頁。這是保證這個方案可行性的基礎。

下面上代碼:

controller中方法的代碼,這里的基本框架使用的是renren快速開發套件

@ResponseBody
	@RequestMapping("/list")
	public R list(@RequestParam Map<String, Object> params,HttpServletRequest httpRequest){

		long time = System.currentTimeMillis();
		HBasePage page= null;
		try{
			String token = httpRequest.getHeader("token");

			//如需要額外的過濾器,請自己定義並使用.buildFilter(filter)方法添加到query中
			//請注意順序:如果有RowKey的過濾,先添加buildRowKeyFilter,再加buildRowKeyPage。否則無效
			HbaseQuery query = new HbaseQuery(params,token,true);

			Object cnOrderCodeObject = params.get("cnOrderCode");
			if(cnOrderCodeObject!=null&&!((String)cnOrderCodeObject).equals("")){
				String cnOrderCode = (String)cnOrderCodeObject;
				query.buildRowKeyFilter(cnOrderCode,cnOrderCode);
			}
			query.buildRowKeyPage().finish();
			page = service.query(query).buildRedisRowKey(token);

		}catch (Exception e){
			e.printStackTrace();
		}
		long time2 = System.currentTimeMillis();
		System.out.println("time2-time==list="+(time2-time));
		return R.ok().put("page",page);

	}

  

  

處理查詢參數的類HBaseQuery,因為業務原因,所有的hbase查詢有兩個必須的條件:開始日期和結束日期,所以我在HBaseQuery中把這兩個參數直接封裝了。

@Data
public class HbaseQuery  {
	private static final long serialVersionUID = 1L;
	//當前頁碼
    private int page;
    //每頁條數
    private long limit;

    private Map<String, Object> params;

    private List<String> pageStartRowKeys;

    private Date startDate;

    private Date endDate;

    private StringBuffer startRowKey = new StringBuffer();
    private StringBuffer stopRowKey = new StringBuffer();

    private RedisUtils redisUtils = (RedisUtils)SpringContextUtils.getBean("redisUtils");

    private Scan countScan;

    private Scan dataScan= new Scan();

    private int cache = 10;

    private DateFormat sf =new SimpleDateFormat("yyyy-MM-dd");

    private FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    private FilterList countfl = new FilterList(FilterList.Operator.MUST_PASS_ALL);

    private boolean isOpenCount;
    private String token;

    public HbaseQuery(Map<String, Object> params,String token,boolean isOpenCount)throws  Exception{
        this.isOpenCount = isOpenCount;
        this.token = token;
        this.params = params;
        String temp2 = (String)params.get("startDate");
        startDate = sf.parse(temp2);
        String temp = (String)params.get("endDate");
        endDate = sf.parse(temp);
        endDate = DateUtils.addDays(endDate,1);
        this.page = Integer.parseInt(params.get("page").toString());
        this.limit = Integer.parseInt(params.get("limit").toString());
        cache = limit>5000?5000:((int)limit+1);//加1,因為每次都會取limit+1條數據
        params.remove("startDate");
        params.remove("endDate");
        params.remove("page");
        params.remove("limit");

        initRowKeyFilter();
    }

    public void initRowKeyFilter(){
        Long endLong = Long.MAX_VALUE-startDate.getTime();
        Long startLong = Long.MAX_VALUE-(DateUtils.addDays(endDate,1).getTime()-1);
        stopRowKey.append(endLong);
        startRowKey.append(startLong);
    }

    //請注意順序:如果有RowKey的過濾,先添加buildRowKeyFilter,再加buildRowKeyPage。否則無效
    public HbaseQuery buildRowKeyFilter(String startRowKey,String stopRowKey){
        if (this.startRowKey.equals("")) {
            this.stopRowKey.append(startRowKey);
        } else {
            this.stopRowKey.append("-").append(startRowKey);
        }
        if (this.stopRowKey.equals("")) {
            this.stopRowKey.append(stopRowKey);
        } else {
            this.stopRowKey.append("-").append(stopRowKey);
        }
        return this;
    }


    //請注意順序:如果有RowKey的過濾,先添加buildRowKeyFilter,再加buildRowKeyPage。否則無效
    public HbaseQuery buildRowKeyPage()throws Exception{


        List<String> pageStartRowKeys = redisUtils.get(token,List.class);


        //點擊上一頁或下一頁
        Object temp =params.get("pageicon");
        if(temp!=null&&!((String)temp).equals("")&&!((String)temp).equals("records")){
            //且redis中的startRowKeys不為空
            String pageicon =  (String)temp;
            if(pageStartRowKeys!=null){
                String startRowKey = pageStartRowKeys.get(this.page-1);
                if(pageicon.equals("next")&&pageStartRowKeys.size()==this.page){
                    this.startRowKey = new StringBuffer(startRowKey);
                    Filter pageFilter=new PageFilter(cache);
                    fl.addFilter(pageFilter);
                }else if((pageicon.equals("next")&&pageStartRowKeys.size()>this.page)
                        ||pageicon.equals("prev")){
                    String stopRowKey = pageStartRowKeys.get(this.page);
                    this.startRowKey = new StringBuffer(startRowKey);
                    this.stopRowKey = new StringBuffer(stopRowKey);
                    Filter pageFilter=new PageFilter(this.getLimit());
                    fl.addFilter(pageFilter);
                }

            }else{
                throw new Exception("點的是分頁,但是redis中沒有數據,這程序肯定有問題");
            }
        }else{//點擊的非分頁按鈕,則刪除redis中分頁信息
            redisUtils.delete(token);
            redisUtils.delete(RedisKeys.getPageCountKey(token));
            Filter pageFilter=new PageFilter(this.getLimit()+1);
            fl.addFilter(pageFilter);
        }
        dataScan.setCaching(cache);
        return this;
    }

    public HbaseQuery addFilter(Filter filter){
        fl.addFilter(filter);
        countfl.addFilter(filter);
        return this;
    }

    public HbaseQuery finish(){
        String count = redisUtils.get(RedisKeys.getPageCountKey(token));
        if(isOpenCount&&(count==null||count.equals(""))){
            countScan= new Scan();
            countScan.setMaxVersions();
            countScan.setCaching(5);
            countScan.setStartRow((startRowKey.toString()).getBytes());
            countScan.setStopRow((stopRowKey.toString()).getBytes());
            countScan.setFilter(countfl);
        }
        dataScan.setMaxVersions();
        dataScan.setStartRow((startRowKey.toString()).getBytes());
        dataScan.setStopRow((stopRowKey.toString()).getBytes());
        dataScan.setFilter(fl);

        return this;
    }

}

  

  

查詢hbase的方法。注意:在使用hbase的協處理器前,請先確保表開通了此功能。

  hbase表開通協處理功能方法(shell命令):

  (1)disable指定表。hbase> disable 'mytable'
  (2)添加aggregation hbase> alter 'mytable', METHOD => 'table_att','coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'
  (3)重啟指定表 hbase> enable 'mytable'

public HBasePage query(HbaseQuery query) {
        long time = System.currentTimeMillis();
        Map<String,String> communtiyKeysMap = new HashMap<>();
        HBasePage page = new HBasePage(query.getLimit(),query.getPage());
        final String tableName = this.getTableName();
        if(query.getCountScan()!=null){
            AggregationClient ac = new AggregationClient(hbaseTemplate.getConfiguration());
            try{
                long count = ac.rowCount(TableName.valueOf(tableName),new LongColumnInterpreter(),query.getCountScan());
                page.setTotalCount(count);
            }catch (Throwable  e){
                e.printStackTrace();
            }
        }
        long time2 = System.currentTimeMillis();
        List rows = hbaseTemplate.find(tableName, query.getDataScan(), new RowMapper<Object>() {

            @Override
            public Object mapRow(Result result, int i) throws Exception {
                Class clazz = ReflectMap.get(tableName);//這里做了表名和實體Bean的映射。
                if(i==0){
                    communtiyKeysMap.put("curPageStart", new String(result.getRow()));
                }
                if(i==query.getLimit()){
                    communtiyKeysMap.put("nextPageStart", new String(result.getRow()));
                }
                HBaseResultBuilder hrb = new HBaseResultBuilder<Object>("sf", result, clazz);
                return hrb.buildAll().fetch();
            }
        });
        //
        if(rows.size()>0&&page.getPageSize()<rows.size()){
            rows.remove(rows.size()-1);
        }
        page.setList(rows);
        page.setNextPageRow(communtiyKeysMap.get("nextPageStart"));
        page.setCurPageRow(communtiyKeysMap.get("curPageStart"));
        long time3 = System.currentTimeMillis();
        System.out.println("time2-time==getCount="+(time2-time));
        System.out.println("time3-time2==getData="+(time3-time2));
        return page;
    }

/分頁類的代碼HbasePage

public class HBasePage  implements Serializable {
	private static final long serialVersionUID = 1L;
	//總記錄數
	protected long totalCount;
	//每頁記錄數
	protected long pageSize;
	//總頁數
	protected int totalPage;
	//當前頁數
	protected int currPage;
	//列表數據
	protected List<Object> list;

	private String nextPageRow;//下一頁的ROWKEY

	private String curPageRow;//當前頁的開始ROWKEY

	/**
	 * 分頁
	 * @param pageSize    每頁記錄數
	 * @param currPage    當前頁數
	 */
	public HBasePage(long pageSize, int currPage) {
		this.list = list;
		this.totalCount = totalCount;
		this.pageSize = pageSize;
		this.currPage = currPage;
	}


	public void setTotalCount(long totalCount) {
		this.totalCount = totalCount;
		this.totalPage = (int)Math.ceil((double)totalCount/pageSize);
	}

	public HBasePage buildRedisRowKey(String token){
		RedisUtils redisUtils = (RedisUtils)SpringContextUtils.getBean("redisUtils");
		List<String> pageStartRowKeys = redisUtils.get(token,List.class);
		List<String> pageRowKeys = redisUtils.get(token,List.class);
		if(this.getList().size()>0){
			if(pageRowKeys==null||pageRowKeys.size()<=0){
				pageRowKeys = new ArrayList<>();
				pageRowKeys.add(this.getCurPageRow().substring(0,this.getCurPageRow().indexOf("-")+1));
				pageRowKeys.add(this.getNextPageRow().substring(0,this.getNextPageRow().indexOf("-")+1));
				redisUtils.set(token,pageRowKeys);
			}else{
				if(pageRowKeys.size()>this.getCurrPage()){
					//doNothing
				}else if(pageRowKeys.size()==this.getCurrPage()){
					pageRowKeys.add(this.getNextPageRow().substring(0,this.getNextPageRow().indexOf("-")+1));
					redisUtils.set(token,pageRowKeys);
				}
			}
		}
		return this;
	}

}  

注意:

1、我的rowKey設置規則是 (Long_Max-new Date().getTime()+"-"+id),所以在看startRowKey和stopRowKey時特別注意。

如有什么更好的辦法或代碼缺陷,歡迎留言探討。

 

@ResponseBody
@RequestMapping("/list")
@RequiresPermissions("business:stockinbill:list")
public R list(@RequestParam Map<String, Object> params,HttpServletRequest httpRequest){

long time = System.currentTimeMillis();
HBasePage page= null;
try{
String token = httpRequest.getHeader("token");

//如需要額外的過濾器,請自己定義並使用.buildFilter(filter)方法添加到query
//請注意順序:如果有RowKey的過濾,先添加buildRowKeyFilter,再加buildRowKeyPage。否則無效
HbaseQuery query = new HbaseQuery(params,token,true);

Object cnOrderCodeObject = params.get("cnOrderCode");
if(cnOrderCodeObject!=null&&!((String)cnOrderCodeObject).equals("")){
String cnOrderCode = (String)cnOrderCodeObject;
query.buildRowKeyFilter(cnOrderCode,cnOrderCode);
}
query.buildRowKeyPage().finish();
page = stockInBillService.query(query).buildRedisRowKey(token);

}catch (Exception e){
e.printStackTrace();
}
long time2 = System.currentTimeMillis();
System.out.println("time2-time==list="+(time2-time));
return R.ok().put("page",page);

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM