使用 HBase 已經有很長一段時間了,今天利用閒暇時間把 HBase 的各種查詢方式總結一下,以後有時間再把協處理器和自定義 Filter 總結一下。
查詢條件分為:
1、統計表數據
2、HBase 簡單分頁
3、like 查詢
4、AND 查詢
5、OR 查詢
6、rowkey 的 in 查詢
7、正則查詢
先上代碼。
package com.query;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import basecommon.PropertyUtil;
/**
 * Helper for building and running common HBase queries: row counting
 * (with and without the aggregation coprocessor), simple paging, "like",
 * AND / OR column conditions, row-key "in" and regular-expression matching.
 *
 * <p>A single cluster {@link Connection} is created when the class is loaded,
 * using the ZooKeeper settings read through {@code PropertyUtil}. If that
 * fails the error is logged and {@link #getConnection()} returns {@code null}.
 *
 * <p>Most query methods take a {@code String[3]} condition laid out as
 * {@code [0]} column family, {@code [1]} column qualifier, {@code [2]} value.
 *
 * @author c_lishaoying 983068303@qq.com
 */
public class HbaseConnectionUtils {
    private static final Logger log = Logger.getLogger(HbaseConnectionUtils.class);

    /** Page size used by the four-argument {@code queryByFilter}. */
    private static final long DEFAULT_PAGE_SIZE = 1000L;

    // volatile: required so the double-checked locking in getInstance() cannot
    // publish a partially constructed instance to another thread.
    private static volatile HbaseConnectionUtils instance = null;

    /** Cluster configuration; populated by the static initializer. */
    public static Configuration config = null;

    /** Shared cluster connection; {@code null} if it could not be created. */
    private static Connection connection = null;

    private HbaseConnectionUtils() {
    }

    // Load the cluster configuration and open the shared connection.
    static {
        config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum",
                PropertyUtil.get("zookeeper.quorum"));
        config.set("hbase.zookeeper.property.clientPort",
                PropertyUtil.get("zookeeper.property.clientPort"));
        config.setLong("hbase.client.scanner.timeout.period", Integer.MAX_VALUE);
        try {
            connection = ConnectionFactory.createConnection(config);
        } catch (Exception e) {
            // Best effort: keep the class loadable, but make the failure visible
            // (it used to be logged at debug level without the cause).
            log.error("connection構建失敗", e);
        }
    }

    /**
     * Returns the lazily created singleton.
     *
     * @return the single {@code HbaseConnectionUtils} instance
     */
    public static HbaseConnectionUtils getInstance() {
        if (instance == null) {
            // Lock on the class so concurrent first calls create only one instance.
            synchronized (HbaseConnectionUtils.class) {
                if (instance == null) {
                    instance = new HbaseConnectionUtils();
                }
            }
        }
        return instance;
    }

    /**
     * @return the configuration built by the static initializer
     */
    public Configuration getConfiguration() {
        return config;
    }

    /**
     * @return the shared connection, or {@code null} if it could not be created
     */
    public Connection getConnection() {
        return connection;
    }

    /**
     * Opens a handle on the given table using the shared connection.
     * The caller is responsible for closing the returned {@link Table}.
     *
     * @param tableName name of the table
     * @return the table handle, or {@code null} if the connection is missing
     *         or the table could not be opened
     */
    public static Table getTable(String tableName) {
        if (connection == null) {
            log.error("No HBase connection available; cannot open table " + tableName);
            return null;
        }
        try {
            return connection.getTable(TableName.valueOf(tableName));
        } catch (IOException e) {
            log.error("Failed to open table " + tableName, e);
        }
        return null;
    }

    /**
     * Counts all rows of a table through the aggregation coprocessor.
     *
     * @param tableName name of the table to count
     * @return the row count (capped at {@link Integer#MAX_VALUE}), or 0 on failure
     */
    public int getRecordCount(String tableName) {
        int count = 0;
        // try-with-resources: the client is closed even when rowCount throws.
        try (AggregationClient aggregationClient = new AggregationClient(config)) {
            long rowCount = aggregationClient.rowCount(
                    TableName.valueOf(tableName), new LongColumnInterpreter(),
                    new Scan());
            count = toIntCount(rowCount);
        } catch (Throwable e) {
            // rowCount is declared to throw Throwable, so nothing narrower compiles.
            log.error("Row count failed for table " + tableName, e);
        }
        return count;
    }

    /**
     * Counts the rows matched by {@code scan} through the aggregation
     * coprocessor. The table is owned by the caller and is not closed here.
     *
     * @param keyIndexTable table to count rows in
     * @param config        configuration used to create the aggregation client
     * @param scan          scan (with any filters) restricting the counted rows
     * @return the row count (capped at {@link Integer#MAX_VALUE}), or 0 on failure
     */
    public static int getTotalRecord(Table keyIndexTable, Configuration config,
            final Scan scan) {
        int count = 0;
        try (AggregationClient aggregationClient = new AggregationClient(config)) {
            long rowCount = aggregationClient.rowCount(keyIndexTable,
                    new LongColumnInterpreter(), scan);
            count = toIntCount(rowCount);
        } catch (Throwable e) {
            log.error("Filtered row count failed", e);
        }
        return count;
    }

    /**
     * Counts, through the aggregation coprocessor, the rows whose column
     * equals the given value.
     *
     * @param tablename   table to query; when {@code null} nothing is queried
     * @param queryString {@code [0]} family, {@code [1]} qualifier, {@code [2]} value;
     *                    any other shape counts every row (see {@link #getScan})
     * @return the number of matching rows, or 0 if the table name is
     *         {@code null}, the table cannot be opened, or the count fails
     */
    public static int queryByCloumn(String tablename, String[] queryString) {
        if (tablename == null) {
            log.info("配置的表名稱為空!");
            return 0;
        }
        log.info("表名稱為:" + tablename);
        int count = 0;
        // The table handle is opened here, so it is closed here as well.
        try (Table queryTable = getTable(tablename)) {
            if (queryTable != null) {
                count = getTotalRecord(queryTable, config, getScan(queryString));
            }
        } catch (IOException e) {
            log.warn("Failed to close table " + tablename, e);
        }
        return count;
    }

    /**
     * Counts, by scanning on the client side (no coprocessor), the rows whose
     * column equals the given value.
     *
     * @param tablename table to scan
     * @param condition {@code [0]} family, {@code [1]} qualifier, {@code [2]} value
     * @return the number of rows returned by the scan; the partial count if the
     *         scan fails midway, or 0 if the table cannot be opened
     */
    public static int scan(String tablename, String[] condition) {
        byte[] family = Bytes.toBytes(condition[0]);
        byte[] qualifier = Bytes.toBytes(condition[1]);
        Scan scan = new Scan();
        scan.addColumn(family, qualifier);
        scan.setFilter(new SingleColumnValueFilter(family, qualifier,
                CompareOp.EQUAL,
                new BinaryComparator(Bytes.toBytes(condition[2]))));

        int matched = 0;
        // Both the table and the scanner hold resources and must be closed.
        try (Table table = getTable(tablename)) {
            if (table == null) {
                return 0;
            }
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result ignored : scanner) {
                    matched++;
                }
            }
        } catch (IOException e) {
            log.error("Scan failed for table " + tablename, e);
        }
        return matched;
    }

    /**
     * Builds a scan for an equality condition, like {@code where city = 'x'}.
     *
     * @param condition {@code [0]} family, {@code [1]} qualifier, {@code [2]} value
     * @return a scan carrying the equality filter, or an unfiltered scan when
     *         {@code condition} is {@code null} or does not have exactly 3 elements
     */
    public static Scan getScan(String[] condition) {
        Scan scan = new Scan();
        if (condition == null || condition.length != 3) {
            return scan;
        }
        FilterList filterList = new FilterList();
        filterList.addFilter(new SingleColumnValueFilter(
                Bytes.toBytes(condition[0]), Bytes.toBytes(condition[1]),
                CompareOp.EQUAL, Bytes.toBytes(condition[2])));
        scan.setFilter(filterList);
        return scan;
    }

    /**
     * Builds a scan matching a column against the regular expression
     * {@code ".*" + value + ".*"}, like {@code where city like '%x%'}.
     * The value is inserted into the pattern as-is, so regex metacharacters
     * in it are interpreted as regex syntax. Only the named column is
     * requested from the server.
     *
     * @param tablename not used when building the scan
     * @param condition {@code [0]} family, {@code [1]} qualifier, {@code [2]} value
     * @return the configured scan
     */
    public static Scan regexscan(String tablename, String[] condition) {
        byte[] family = Bytes.toBytes(condition[0]);
        byte[] qualifier = Bytes.toBytes(condition[1]);
        Scan scan = new Scan();
        scan.addColumn(family, qualifier);
        scan.setFilter(new SingleColumnValueFilter(family, qualifier,
                CompareOp.EQUAL,
                new RegexStringComparator(".*" + condition[2] + ".*")));
        return scan;
    }

    /**
     * Builds a scan that keeps values matching {@code condition[2]} by substring.
     * No column is selected: {@code condition[0]} and {@code condition[1]}
     * are not read.
     *
     * @param tablename not used when building the scan
     * @param condition array whose element {@code [2]} is the substring to look for
     * @return the configured scan
     */
    public static Scan valuescan(String tablename, String[] condition) {
        Scan scan = new Scan();
        scan.setFilter(new ValueFilter(CompareOp.EQUAL,
                new SubstringComparator(condition[2])));
        return scan;
    }

    /**
     * Builds a scan selecting rows by key, like {@code where id in (...)}.
     * A row passes when its key is exactly equal to one of the given keys.
     *
     * @param tablename not used when building the scan
     * @param condition the row keys to select
     * @return the configured scan
     */
    public static Scan rowscan(String tablename, String[] condition) {
        FilterList anyKey = new FilterList(Operator.MUST_PASS_ONE);
        for (String rowKey : condition) {
            // Exact comparison: a substring comparator would also select every
            // row whose key merely contains rowKey, which is not "in" semantics.
            anyKey.addFilter(new RowFilter(CompareOp.EQUAL,
                    new BinaryComparator(Bytes.toBytes(rowKey))));
        }
        Scan scan = new Scan();
        scan.setFilter(anyKey);
        return scan;
    }

    /**
     * Builds a scan where every condition must hold, like
     * {@code where city = 'x' AND name = 'y'}.
     *
     * @param tablename not used when building the scan
     * @param condition list of {@code [family, qualifier, value]} triples
     * @return the configured scan
     */
    public static Scan listAndColumnscan(String tablename,
            List<String[]> condition) {
        Scan scan = new Scan();
        scan.setFilter(new FilterList(Operator.MUST_PASS_ALL,
                equalityFilters(condition)));
        return scan;
    }

    /**
     * Builds a scan where at least one condition must hold, like
     * {@code where city = 'x' OR name = 'y'}.
     *
     * @param tablename not used when building the scan
     * @param condition list of {@code [family, qualifier, value]} triples
     * @return the configured scan
     */
    public static Scan listOrColumnscan(String tablename,
            List<String[]> condition) {
        Scan scan = new Scan();
        // MUST_PASS_ONE combines the filters with OR; the FilterList default,
        // MUST_PASS_ALL, combines them with AND.
        scan.setFilter(new FilterList(Operator.MUST_PASS_ONE,
                equalityFilters(condition)));
        return scan;
    }

    /**
     * Builds a paged scan over a row-key range with "contains" conditions that
     * must all hold, using a page size of 1000 (like MySQL {@code limit 0,1000}).
     *
     * @param tablename  not used when building the scan
     * @param arr        list of {@code [family, qualifier, substring]} triples
     * @param starString first row key of the range
     * @param stopString row key at which the scan stops
     * @return the configured scan
     * @throws IOException declared for caller compatibility
     */
    public Scan queryByFilter(String tablename, List<String[]> arr,
            String starString, String stopString) throws IOException {
        return queryByFilter(tablename, arr, starString, stopString,
                DEFAULT_PAGE_SIZE);
    }

    /**
     * Builds a paged scan over a row-key range with "contains" conditions that
     * must all hold.
     *
     * @param tablename  not used when building the scan
     * @param arr        list of {@code [family, qualifier, substring]} triples
     * @param starString first row key of the range
     * @param stopString row key at which the scan stops
     * @param pageSize   value handed to the {@link PageFilter}
     * @return the configured scan
     * @throws IOException declared for consistency with the four-argument overload
     */
    public Scan queryByFilter(String tablename, List<String[]> arr,
            String starString, String stopString, long pageSize)
            throws IOException {
        FilterList filterList = new FilterList();
        for (String[] triple : arr) {
            filterList.addFilter(new SingleColumnValueFilter(
                    Bytes.toBytes(triple[0]), Bytes.toBytes(triple[1]),
                    CompareOp.EQUAL, new SubstringComparator(triple[2])));
        }
        filterList.addFilter(new PageFilter(pageSize));

        Scan scan = new Scan();
        scan.setFilter(filterList);
        scan.setStartRow(Bytes.toBytes(starString));
        // Was a second setStartRow call, which discarded starString and left
        // the range without an upper bound.
        scan.setStopRow(Bytes.toBytes(stopString));
        return scan;
    }

    /**
     * Turns {@code [family, qualifier, value]} triples into equality filters.
     * NOTE(review): per the HBase documentation a SingleColumnValueFilter lets
     * rows that lack the column through unless setFilterIfMissing(true) is
     * called; confirm that is the intended behavior for these queries.
     */
    private static List<Filter> equalityFilters(List<String[]> conditions) {
        List<Filter> filters = new ArrayList<Filter>();
        for (String[] triple : conditions) {
            filters.add(new SingleColumnValueFilter(
                    Bytes.toBytes(triple[0]), // column family
                    Bytes.toBytes(triple[1]), // column qualifier
                    CompareOp.EQUAL, Bytes.toBytes(triple[2]))); // value
        }
        return filters;
    }

    /** Narrows a row count to int, capping instead of wrapping on overflow. */
    private static int toIntCount(long rowCount) {
        return (int) Math.min(rowCount, (long) Integer.MAX_VALUE);
    }
}
