使用HBase存儲中國好聲音數據的案例,業務描述如下:
為了能高效的查詢到我們需要的數據,我們在RowKey的設計上下了不少功夫,因為過濾RowKey或者根據RowKey查詢數據的效率是最高的,我們的RowKey的設計是:UserID + CreateTime + FileID,那么我們在HBase中的數據格式如下:
每一行數據中包含兩個Column:f:c和f:n
我們在查詢的時候還是用了SingleColumnValueFilter這個Filter來過濾單個的Column的Value的值,我們說如果在海量數據的時候使用這個SingleColumnValueFilter來過濾數據的話是非常耗時的事情,那么現在問題來了:
問題:
假設針對這張sound的表,我們需要查詢包含“中國好聲音”以及包含“綜藝”的數據,也就是說我們的業務查詢是:
2個條件同時輸入find(“中國好聲音”,“綜藝”)
這個時候我們該怎么查詢呢?
解決方案:
首先,我們現在的查詢條件中沒有對RowKey的過濾了,如果我們直接使用SingleColumnValueFilter這個Filter來過濾查詢數據的話是可以達到目的,但是非常的耗時,所以我們不能使用這種方式
那么,我們現在就使用HBase中的二級索引來解決這個問題,我們先不解釋二級索引是什么,我們先看下解決上面問題的過程,如下:
第一步:創建兩張HBase表
第一張HBase表的RowKey是數據中的Name字段的值,這張表可以有不定數量的Column,每一個Column的值就是sound表的RowKey(和Name對應的RowKey),這張表我們稱之為name_indexer表。create 'name_indexer','f'
第二張HBase表的RowKey是數據中的Category字段的值,這張表可以有不定數量的Column,每一個Column的值就是sound表的RowKey(和Category對應的RowKey),這張表我們稱之為category_indexer表。create 'category_indexer','f'
第二步:將sound中的數據導入到name_indexer和category_indexer兩張表中
使用Spark程序來實現索引表數據的導入,
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
/**
* 使用Spark來建立HBase中表sound的二級索引
*/
object MyIndexBuilder {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("MyIndexBuilder")
.master("local")
.getOrCreate()
// 1、創建HBaseContext
val configuration = HBaseConfiguration.create()
configuration.set("hbase.zookeeper.quorum", "master,slave1,slave2")
val hBaseContext = new HBaseContext(spark.sparkContext, configuration)
// 2、讀取HBase表sound中的f:n和f:c兩個列的值以及他們對應的rowKey的值
// 並且需要區分開是哪一個列的值
val soundRDD = hBaseContext.hbaseRDD(TableName.valueOf("sound"), new Scan())
val indexerRDD: RDD[((String, Array[Byte]), ImmutableBytesWritable)] = soundRDD.flatMap { case (byteRowKey, result) =>
val nameValue = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("n"))
val categoryValue = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("c"))
// 區分開是哪一個列的值,使用key來區分
// 返回key是(tableName,列值), value是這個列對應的rowKey的值
Seq((("name_indexer", nameValue), byteRowKey), (("category_indexer", categoryValue), byteRowKey))
}
// 3、按照key進行分組,拿到相同列值對應的所有的rowKeys(因為在原表sound中多個rowKey的值可能會對應着相同的列值)
val groupedIndexerRDD: RDD[((String, Array[Byte]), Iterable[ImmutableBytesWritable])] = indexerRDD.groupByKey()
// 4、將不同的列值以及對應的rowKeys寫入到相對應的indexer表中
groupedIndexerRDD.foreachPartition { partitionIterator =>
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "master,slave1,slave2")
val conn = ConnectionFactory.createConnection(conf)
val nameIndexerTable = conn.getTable(TableName.valueOf("name_indexer"))
val categoryIndexerTable = conn.getTable(TableName.valueOf("category_indexer"))
try {
val nameIndexerTablePuts = new util.ArrayList[Put]()
val categoryIndexerTablePuts = new util.ArrayList[Put]()
partitionIterator.map { case ((tableName, indexerValue), rowKeys) =>
val put = new Put(indexerValue) // 將列值作為索引表的rowKey
rowKeys.foreach(rowKey => {
put.addColumn(Bytes.toBytes("f"), null, rowKey.get())
})
if (tableName.equals("name_indexer")) {
nameIndexerTablePuts.add(put) // 需要寫入到表name_indexer中的數據
} else {
categoryIndexerTablePuts.add(put) // 需要寫入到表category_indexer中的數據
}
}
nameIndexerTable.put(nameIndexerTablePuts)
categoryIndexerTable.put(categoryIndexerTablePuts)
} finally {
nameIndexerTable.close()
categoryIndexerTable.close()
conn.close()
}
}
spark.stop()
}
}
第三步:查詢結果
我們先從name_indexer這張表中按照RowKey查詢屬於“中國好聲音”的記錄,這些記錄中的所有的列的值就是需要在sound中查詢的RowKey的值
然后從category_indexer這張表中按照RowKey查詢屬於“綜藝”的記錄,這些記錄中的所有的列的值就是需要在sound中查詢的RowKey的值
最后將上面兩步查詢出來的結果做一個合並,就是將查詢出來的結果做一次去重,得到了所有在sound中符合需求的RowKey,然后在根據這些RowKey去sound表中查詢相應的數據
我們每一步查詢都是根據HBase中的一級索引RowKey來查詢的,所以查詢速度會非常的快
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class SecondaryIndexSearcher {
public static void main(String[] args) throws IOException {
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", "master,slave1,slave2");
try(Connection connection = ConnectionFactory.createConnection(config)) {
Table nameIndexer = connection.getTable(TableName.valueOf("name_indexer"));
Table categoryIndexer = connection.getTable(TableName.valueOf("category_indexer"));
Table sound = connection.getTable(TableName.valueOf("sound"));
// 1、先從表name_indexer中找到rowKey包含“中國好聲音”對應的所有的column值
Scan nameIndexerScan = new Scan();
SubstringComparator nameComp = new SubstringComparator("中國好聲音");
RowFilter nameRowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, nameComp);
nameIndexerScan.setFilter(nameRowFilter);
Set<String> soundRowKeySetOne = new HashSet<>();
ResultScanner rsOne = nameIndexer.getScanner(nameIndexerScan);
try {
for (Result r = rsOne.next(); r != null; r = rsOne.next()) {
for (Cell cell : r.listCells()) {
soundRowKeySetOne.add(Bytes.toString(CellUtil.cloneValue(cell)));
}
}
} finally {
rsOne.close(); // always close the ResultScanner!
}
// 2、再從表category_indexer中找到rowKey包含“綜藝”對應的所有的column值
Scan categoryIndexerScan = new Scan();
SubstringComparator categoryComp = new SubstringComparator("綜藝");
RowFilter categoryRowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, categoryComp);
nameIndexerScan.setFilter(categoryRowFilter);
Set<String> soundRowKeySetTwo = new HashSet<>();
ResultScanner rsTwo = categoryIndexer.getScanner(categoryIndexerScan);
try {
for (Result r = rsTwo.next(); r != null; r = rsTwo.next()) {
for (Cell cell : r.listCells()) {
soundRowKeySetTwo.add(Bytes.toString(CellUtil.cloneValue(cell)));
}
}
} finally {
rsTwo.close(); // always close the ResultScanner!
}
// 3、合並並去重上面兩步查詢的結果
soundRowKeySetOne.addAll(soundRowKeySetTwo);
// 4、根據soundRowKeySetOne中所有的rowKeys去sound表中查詢數據
List<Get> gets = new ArrayList<>();
for (String rowKey : soundRowKeySetOne) {
Get get = new Get(Bytes.toBytes(rowKey));
gets.add(get);
}
Result[] results = sound.get(gets);
for (Result result : results) {
for (Cell cell : result.listCells()) {
System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + "===> " +
Bytes.toString(CellUtil.cloneFamily(cell)) + ":" +
Bytes.toString(CellUtil.cloneQualifier(cell)) + "{" +
Bytes.toString(CellUtil.cloneValue(cell)) + "}");
}
}
}
}
}
結論:
那么表name_indexer和category_indexer中的RowKey就是我們解決問題的二級索引,
所以二級索引的本質就是:建立各列值與行鍵之間的映射關系
最后,我們需要知道創建HBase二級索引的方式
1、Spark來實現二級索引的建立
我們前面使用的是Spark來實現二級索引的建立,但是這種方式適用於離線批處理,這些二級索引是每天或者每段時間執行一次的建立的
2、使用HBase的協處理器(coprocessor)
對於如果數據是實時更新的話,則這種離線批處理的方式是不行的,這個時候我們可以使用HBase的協處理器(coprocessor)
HBase的協處理器(Coprocessor)的介紹可以參考:
https://www.cnblogs.com/small-k/p/9648453.html
3、HBase + Solr其實也是一個二級索引實現,只不過是把二級索引存儲在Solr中
