Flink實時維表查詢優化-旁路緩存


一、背景說明:

在目前實時數倉中,由於維表具有主鍵唯一性的特點,Hbase/Redis通常作為維表存放選擇

  • Hbase:數據存於磁盤具有持久性但是查詢效率慢。
  • Redis:數據存於內存查詢效率高,但維表多數據量大時候占用資源多。

基於旁路緩存思想,對維表存儲的優化的思路為:維表數據存儲在Hbase,使用Redis作為緩存,但查詢維表時有限查詢Redis,如果沒有該維表則去Hbase查詢后並將維表數據放入Redis,並按一定時間保存,超過時間Redis自動清理(可使不常用維表無需常駐內存,缺點是首次查詢較慢):
在這里插入圖片描述

Phoenix支持SQL語法,使用Hbase可以通過Phoenix操作更簡便。

二、代碼實現

示例維表如下,使用直接查詢及旁路緩存兩種方式,對比用時。

0: jdbc:phoenix:hadoop102> select * from GMALL0820_REALTIME.DIM_BASE_PROVINCE;
+-----+-------+------------+------------+-----------+-------------+
| ID  | NAME  | REGION_ID  | AREA_CODE  | ISO_CODE  | ISO_3166_2  |
+-----+-------+------------+------------+-----------+-------------+
| 1   | 北京    | 1          | 110000     | CN-11     | CN-BJ       |

1.不使用旁路緩存,直接通過Phoenix查詢hbase維表數據示例:

package com.test.cacheside;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.List;

public class DimUtilWithNoCache {
    //從phoenix查詢數據  入參使用動態參數,可以實現多條件查詢
    public static JSONObject getDimInfoNoCache(String tableName, Tuple2<String,String>... cloNameAndValue){

        //拼接SQL語句
        String wheresql = " where ";
        for (int i = 0; i < cloNameAndValue.length; i++) {
            Tuple2<String,String> tuple = cloNameAndValue[i];
            if (i>0){
                wheresql+="and";    //多條件查詢拼接and條件
            }
            wheresql += tuple.f0 + "='" + tuple.f1 + "'";
        }
        String sql = "select * from " + tableName + wheresql;
        System.out.println("查詢SQL為:" + sql);
        JSONObject dimJsonObj = null;
        List<JSONObject> dimList = PhoenixUtil.queryList(sql, JSONObject.class);

        if (dimList!=null || dimList.size()>0){
            dimJsonObj = dimList.get(0);
        }else{
            System.out.println("維度數據沒有找到:" + sql);
        }
        return dimJsonObj;
    }
    public static void main(String[] args) {
        long startTime=System.currentTimeMillis();   //獲取開始時間
        System.out.println(getDimInfoNoCache("DIM_BASE_PROVINCE", Tuple2.of("id", "1")));
        long endTime=System.currentTimeMillis();   //獲取結束時間
        System.out.println("程序運行時間: "+(endTime-startTime)+"ms");
    }
}

2.使用旁路緩存實現:

package com.test.cacheside;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;
import redis.clients.jedis.Jedis;
import java.util.List;

public class DimUtilWithCache {
    //方法重載的思路,減少調用時候的輸入
    public static JSONObject getDimInfo(String tableName, String id){
        return getDimInfo(tableName, Tuple2.of("id",id));
    }
    /*
    優化,從phoenix查詢數據,加入旁路緩存(Redis),先從緩存查詢,如果緩存沒有查到數據,再到Phoenix查詢,並將查詢結果放到緩存
    redis
        類型      string
        Key      dim:表名:值      例如:dim:DIM_BASE_PROVINCE:10_xxx
        value    通過PhoenixUtil到維度表中查詢數據,取出第一條並將其轉換為json字符串
        失效時間: 24*3600
    */
    //入參使用Tuple及動態參數,可以實現多條件查詢
    public static JSONObject getDimInfo(String tableName, Tuple2<String,String>... cloNameAndValue){

        String wheresql = " where ";
        String redisKey = "dim:" + tableName.toLowerCase() + ":";
        for (int i = 0; i < cloNameAndValue.length; i++) {
            Tuple2<String,String> tuple = cloNameAndValue[i];
            if (i > 0){
                wheresql += "and";
                redisKey += "_";
            }
            wheresql += tuple.f0 + "='" + tuple.f1 + "'";
            redisKey += tuple.f1;
        }
        //從Redis中獲取數據
        Jedis jedis = null;
        String dimJsonStr = null;
        JSONObject dimJsonObj = null;

        try {
            //獲取jedis客戶端
            jedis = RedisUtil.getJedis();
            //根據key到redis中查詢
            dimJsonStr = jedis.get(redisKey);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("從Redis中查詢維度失敗!");
        }
        //判斷是否從Redis中查詢到數據
        if (dimJsonStr != null && dimJsonStr.length()>0){
            dimJsonObj = JSON.parseObject(dimJsonStr);
        }else{
            //Redis沒有查到數據,到phoenix中查詢
            String sql = "select * from " + tableName + wheresql;
            System.out.println("查詢SQL為:" + sql);

            List<JSONObject> dimList = PhoenixUtil.queryList(sql, JSONObject.class);

            if (dimList!=null || dimList.size()>0){
                dimJsonObj = dimList.get(0);
                //將查詢出來的數據放到Redis中緩存起來
                if (jedis!=null){
                    jedis.setex(redisKey,3600*24,dimJsonObj.toJSONString());
                }
            }else{
                System.out.println("維度數據沒有找到:" + sql);
            }
        }
        //關閉jedis
        if (jedis!=null){
            jedis.close();
        }
        return dimJsonObj;
    }
    public static void main(String[] args) {
        long startTime=System.currentTimeMillis();   //獲取開始
        System.out.println(DimUtilWithCache.getDimInfo("DIM_BASE_PROVINCE","1"));
        long endTime=System.currentTimeMillis();   //獲取結束時間
        System.out.println("程序運行時間: "+(endTime-startTime)+"ms");
    }
}

查詢Redis,可以看到結果如下:

127.0.0.1:6379> keys *
1) "dim:dim_base_province:1"
127.0.0.1:6379> get dim:dim_base_province:1
"{\"REGION_ID\":\"1\",\"ISO_CODE\":\"CN-11\",\"ISO_3166_2\":\"CN-BJ\",\"ID\":\"1\",\"AREA_CODE\":\"110000\",\"NAME\":\"\xe5\x8c\x97\xe4\xba\xac\"}"

ps:首次運行旁路緩存的查詢時間較慢,后續查詢這是基於Redis查詢。

三:其他工具類代碼及依賴補充說明:

1.PhoenixUtil(從Phoenix查詢數據工具類)

package com.test.cacheside;

import org.apache.commons.beanutils.BeanUtils;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;

public class PhoenixUtil {

    private static Connection conn = null;
    public static void init(){
        try {
            //注冊驅動
            Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
            //獲取Phoenix的連接
            conn = DriverManager.getConnection("jdbc:phoenix:hadoop102:2181");
            //指定操作的表空間
            conn.setSchema("GMALL0820_REALTIME");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //從Phoenix查詢數據
    public static <T> List<T> queryList(String sql, Class<T> clazz){

        List<T> resultList = new ArrayList<T>();
        if (conn==null){
            init();
        }
        //獲取數據庫操作對象
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            ps = conn.prepareStatement(sql);
            //執行SQL語句
            rs = ps.executeQuery();
            //處理結果集
            //查詢元數據信息
            ResultSetMetaData metaData = rs.getMetaData();

            //判斷結果集中是否存在數據,如果有,那么進行一次循環
            while (rs.next()) {
                //創建一個對象,用於封裝查詢出來的一條結果集中的數據
                T obj = clazz.newInstance(); //不知道字段類型,使用反射獲取
                //對查詢的所有列進行遍歷,獲取每一列的名稱
                for (int i = 1; i <= metaData.getColumnCount(); i++) {
                    String columnName = metaData.getColumnName(i);
                    BeanUtils.setProperty(obj,columnName,rs.getObject(i));
                }
                //將當前結果中的一行數據封裝的obj對象放到list集合中
                resultList.add(obj);
            }
        }catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("從維度表取數據錯誤!");
        }
        finally {
            if (rs!=null){
                try {
                    rs.close();
                } catch (SQLException throwables) {
                    throwables.printStackTrace();
                }
            }
            if (ps!=null){
                try {
                    ps.close();
                } catch (SQLException throwables) {
                    throwables.printStackTrace();
                }
            }
        }
        return resultList;
    }
}

2.RedisUtil(通過jdispool連接Redis)

package com.test.cacheside;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisUtil {
    private static JedisPool jedisPool;

    public static Jedis getJedis(){
        if (jedisPool == null){
            JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();

            jedisPoolConfig.setMaxTotal(100); //最大可用連接數
            jedisPoolConfig.setBlockWhenExhausted(true); //連接耗盡是否等待
            jedisPoolConfig.setMaxWaitMillis(2000); //等待時間
            jedisPoolConfig.setMaxIdle(5); //最大閑置連接數
            jedisPoolConfig.setMinIdle(5); //最小閑置連接數
            jedisPoolConfig.setTestOnBorrow(true); //取連接的時候進行一下測試 ping pong

            jedisPool = new JedisPool(jedisPoolConfig,"hadoop102",6379,10000);
            System.out.println("開辟連接池");
            return jedisPool.getResource();
        }else {
            System.out.println(" 連接池:" + jedisPool.getNumActive());
            return jedisPool.getResource();
        }
    }
}

3.依賴

       <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.12.0</version>
        </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.68</version>
    </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.3.0</version>
        </dependency>
        <dependency>
            <groupId>commons-beanutils</groupId>
            <artifactId>commons-beanutils</artifactId>
            <version>1.9.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>4.14.3-HBase-1.4</version>
            <exclusions>
                <exclusion>
                    <groupId>org.glassfish</groupId>
                    <artifactId>javax.el</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

學習交流,有任何問題還請隨時評論指出交流。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM