一、背景說明:
在目前實時數倉中,由於維表具有主鍵唯一性的特點,Hbase/Redis通常作為維表存放選擇
- Hbase:數據存於磁盤具有持久性但是查詢效率慢。
- Redis:數據存於內存查詢效率高,但維表多數據量大時候占用資源多。
基於旁路緩存思想,對維表存儲的優化的思路為:維表數據存儲在Hbase,使用Redis作為緩存,但查詢維表時有限查詢Redis,如果沒有該維表則去Hbase查詢后並將維表數據放入Redis,並按一定時間保存,超過時間Redis自動清理(可使不常用維表無需常駐內存,缺點是首次查詢較慢):

Phoenix支持SQL語法,使用Hbase可以通過Phoenix操作更簡便。
二、代碼實現
示例維表如下,使用直接查詢及旁路緩存兩種方式,對比用時。
0: jdbc:phoenix:hadoop102> select * from GMALL0820_REALTIME.DIM_BASE_PROVINCE;
+-----+-------+------------+------------+-----------+-------------+
| ID | NAME | REGION_ID | AREA_CODE | ISO_CODE | ISO_3166_2 |
+-----+-------+------------+------------+-----------+-------------+
| 1 | 北京 | 1 | 110000 | CN-11 | CN-BJ |
1.不使用旁路緩存,直接通過Phoenix查詢hbase維表數據示例:
package com.test.cacheside;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.List;
public class DimUtilWithNoCache {
//從phoenix查詢數據 入參使用動態參數,可以實現多條件查詢
public static JSONObject getDimInfoNoCache(String tableName, Tuple2<String,String>... cloNameAndValue){
//拼接SQL語句
String wheresql = " where ";
for (int i = 0; i < cloNameAndValue.length; i++) {
Tuple2<String,String> tuple = cloNameAndValue[i];
if (i>0){
wheresql+="and"; //多條件查詢拼接and條件
}
wheresql += tuple.f0 + "='" + tuple.f1 + "'";
}
String sql = "select * from " + tableName + wheresql;
System.out.println("查詢SQL為:" + sql);
JSONObject dimJsonObj = null;
List<JSONObject> dimList = PhoenixUtil.queryList(sql, JSONObject.class);
if (dimList!=null || dimList.size()>0){
dimJsonObj = dimList.get(0);
}else{
System.out.println("維度數據沒有找到:" + sql);
}
return dimJsonObj;
}
public static void main(String[] args) {
long startTime=System.currentTimeMillis(); //獲取開始時間
System.out.println(getDimInfoNoCache("DIM_BASE_PROVINCE", Tuple2.of("id", "1")));
long endTime=System.currentTimeMillis(); //獲取結束時間
System.out.println("程序運行時間: "+(endTime-startTime)+"ms");
}
}
2.使用旁路緩存實現:
package com.test.cacheside;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;
import redis.clients.jedis.Jedis;
import java.util.List;
public class DimUtilWithCache {
//方法重載的思路,減少調用時候的輸入
public static JSONObject getDimInfo(String tableName, String id){
return getDimInfo(tableName, Tuple2.of("id",id));
}
/*
優化,從phoenix查詢數據,加入旁路緩存(Redis),先從緩存查詢,如果緩存沒有查到數據,再到Phoenix查詢,並將查詢結果放到緩存
redis
類型 string
Key dim:表名:值 例如:dim:DIM_BASE_PROVINCE:10_xxx
value 通過PhoenixUtil到維度表中查詢數據,取出第一條並將其轉換為json字符串
失效時間: 24*3600
*/
//入參使用Tuple及動態參數,可以實現多條件查詢
public static JSONObject getDimInfo(String tableName, Tuple2<String,String>... cloNameAndValue){
String wheresql = " where ";
String redisKey = "dim:" + tableName.toLowerCase() + ":";
for (int i = 0; i < cloNameAndValue.length; i++) {
Tuple2<String,String> tuple = cloNameAndValue[i];
if (i > 0){
wheresql += "and";
redisKey += "_";
}
wheresql += tuple.f0 + "='" + tuple.f1 + "'";
redisKey += tuple.f1;
}
//從Redis中獲取數據
Jedis jedis = null;
String dimJsonStr = null;
JSONObject dimJsonObj = null;
try {
//獲取jedis客戶端
jedis = RedisUtil.getJedis();
//根據key到redis中查詢
dimJsonStr = jedis.get(redisKey);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("從Redis中查詢維度失敗!");
}
//判斷是否從Redis中查詢到數據
if (dimJsonStr != null && dimJsonStr.length()>0){
dimJsonObj = JSON.parseObject(dimJsonStr);
}else{
//Redis沒有查到數據,到phoenix中查詢
String sql = "select * from " + tableName + wheresql;
System.out.println("查詢SQL為:" + sql);
List<JSONObject> dimList = PhoenixUtil.queryList(sql, JSONObject.class);
if (dimList!=null || dimList.size()>0){
dimJsonObj = dimList.get(0);
//將查詢出來的數據放到Redis中緩存起來
if (jedis!=null){
jedis.setex(redisKey,3600*24,dimJsonObj.toJSONString());
}
}else{
System.out.println("維度數據沒有找到:" + sql);
}
}
//關閉jedis
if (jedis!=null){
jedis.close();
}
return dimJsonObj;
}
public static void main(String[] args) {
long startTime=System.currentTimeMillis(); //獲取開始
System.out.println(DimUtilWithCache.getDimInfo("DIM_BASE_PROVINCE","1"));
long endTime=System.currentTimeMillis(); //獲取結束時間
System.out.println("程序運行時間: "+(endTime-startTime)+"ms");
}
}
查詢Redis,可以看到結果如下:
127.0.0.1:6379> keys *
1) "dim:dim_base_province:1"
127.0.0.1:6379> get dim:dim_base_province:1
"{\"REGION_ID\":\"1\",\"ISO_CODE\":\"CN-11\",\"ISO_3166_2\":\"CN-BJ\",\"ID\":\"1\",\"AREA_CODE\":\"110000\",\"NAME\":\"\xe5\x8c\x97\xe4\xba\xac\"}"
ps:首次運行旁路緩存的查詢時間較慢,后續查詢這是基於Redis查詢。
三:其他工具類代碼及依賴補充說明:
1.PhoenixUtil(從Phoenix查詢數據工具類)
package com.test.cacheside;
import org.apache.commons.beanutils.BeanUtils;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
public class PhoenixUtil {
private static Connection conn = null;
public static void init(){
try {
//注冊驅動
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
//獲取Phoenix的連接
conn = DriverManager.getConnection("jdbc:phoenix:hadoop102:2181");
//指定操作的表空間
conn.setSchema("GMALL0820_REALTIME");
} catch (Exception e) {
e.printStackTrace();
}
}
//從Phoenix查詢數據
public static <T> List<T> queryList(String sql, Class<T> clazz){
List<T> resultList = new ArrayList<T>();
if (conn==null){
init();
}
//獲取數據庫操作對象
PreparedStatement ps = null;
ResultSet rs = null;
try {
ps = conn.prepareStatement(sql);
//執行SQL語句
rs = ps.executeQuery();
//處理結果集
//查詢元數據信息
ResultSetMetaData metaData = rs.getMetaData();
//判斷結果集中是否存在數據,如果有,那么進行一次循環
while (rs.next()) {
//創建一個對象,用於封裝查詢出來的一條結果集中的數據
T obj = clazz.newInstance(); //不知道字段類型,使用反射獲取
//對查詢的所有列進行遍歷,獲取每一列的名稱
for (int i = 1; i <= metaData.getColumnCount(); i++) {
String columnName = metaData.getColumnName(i);
BeanUtils.setProperty(obj,columnName,rs.getObject(i));
}
//將當前結果中的一行數據封裝的obj對象放到list集合中
resultList.add(obj);
}
}catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("從維度表取數據錯誤!");
}
finally {
if (rs!=null){
try {
rs.close();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
if (ps!=null){
try {
ps.close();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
}
return resultList;
}
}
2.RedisUtil(通過jdispool連接Redis)
package com.test.cacheside;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class RedisUtil {
private static JedisPool jedisPool;
public static Jedis getJedis(){
if (jedisPool == null){
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(100); //最大可用連接數
jedisPoolConfig.setBlockWhenExhausted(true); //連接耗盡是否等待
jedisPoolConfig.setMaxWaitMillis(2000); //等待時間
jedisPoolConfig.setMaxIdle(5); //最大閑置連接數
jedisPoolConfig.setMinIdle(5); //最小閑置連接數
jedisPoolConfig.setTestOnBorrow(true); //取連接的時候進行一下測試 ping pong
jedisPool = new JedisPool(jedisPoolConfig,"hadoop102",6379,10000);
System.out.println("開辟連接池");
return jedisPool.getResource();
}else {
System.out.println(" 連接池:" + jedisPool.getNumActive());
return jedisPool.getResource();
}
}
}
3.依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>4.14.3-HBase-1.4</version>
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
學習交流,有任何問題還請隨時評論指出交流。
