java中Jedis對redis內數據的批量獲取- hscan,Pipeline的使用- 常用工具類-RedisUtil


指出問題

在使用redis的時候,配合jedis使用,但是發現jedis的 keys * 或者一次對大量key執行mget都會造成redis阻塞,所以改為先用scan迭代取得key,再使用redis的解決方案Pipeline(管道)的方式分批獲取redis內的數據,並封裝了以下工具類,需要的可以自取。如果有更好的方法歡迎留言,我可以寫進來。

初始化配置

先在application.yml配置redis基本信息

# Basic Redis connection settings; RedisConfig below reads them through the
# injected RedisProperties object (presumably bound to the spring.redis prefix — confirm).
# NOTE(review): RedisConfig also reads pool settings via
# redisProperties.getLettuce().getPool() (max active / max idle / min idle / max wait),
# and no pool section is defined in this snippet — confirm it is configured elsewhere.
spring:
  redis:
    host: 127.0.0.1
    port: 6379
    # Left empty here; RedisConfig then builds the pool without a password.
    password:
    database: 0
    # RedisConfig converts this value with toMillis() and uses it as the Jedis timeout
    # (presumably 5000 means 5000 ms — confirm against the Spring Boot Duration binding rules).
    timeout: 5000

創建jedis連接池

創建RedisConfig.java

@Configuration
@Slf4j
public class RedisConfig {
    @Autowired
    private RedisProperties redisProperties; //從配置文件獲取redis連接配置

    private JedisPool jedisPool;

    @Autowired
    private JedisPoolConfig jedisPoolConfig;

    @Bean
    @Scope("prototype")
    /**
     * 多例模式返回redis客戶端,但是在依賴注入時,建議不要使用
     * 因為jedis客戶端是阻塞式IO的,而且線程不安全,
     * 所以在操作數據庫的時候不要使用單例,建議從連接池獲取
     */
    public redis.clients.jedis.Jedis jedis(){
        if (jedisPool!=null){
            return jedisPool.getResource();
        }else {
            return jedisPool().getResource();
        }
//        return jedisPool.getResource();
    }

    @Bean
    /**
     * 單例模式返回redis連接池
     * 依賴注入時,建議注入jedisPool,獲取連接池,然后調用getResource方法獲取jedis客戶端
     * 使用完后,調用jedis.close()方法歸還連接,節約資源
     */
    public JedisPool jedisPool(){
        synchronized (this){
            if(jedisPool==null){
                synchronized (this){
                    
                    if (StringUtils.isEmpty(redisProperties.getPassword())){
                        //無密碼
                        jedisPool = new JedisPool(jedisPoolConfig,redisProperties.getHost(),redisProperties.getPort(),(int)redisProperties.getTimeout().toMillis());
                    }else {//有密碼
                        jedisPool = new JedisPool(jedisPoolConfig,redisProperties.getHost(),redisProperties.getPort(),(int)redisProperties.getTimeout().toMillis(),redisProperties.getPassword());
                    }
                         
                }
            }
        }
        return jedisPool;
    }
    

    
    @Bean
    public JedisPoolConfig jedisPoolConfig(){
        // Jedis連接池配置
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(redisProperties.getLettuce().getPool().getMaxActive());
        // 最大空閑連接數, 默認8個
//        jedisPoolConfig.setMaxIdle(50);
        jedisPoolConfig.setMaxIdle(redisProperties.getLettuce().getPool().getMaxIdle());
        // 最大連接數, 默認8個
//        jedisPoolConfig.setMaxTotal(300);
        //最小空閑連接數, 默認0
        jedisPoolConfig.setMinIdle(redisProperties.getLettuce().getPool().getMinIdle());
//        jedisPoolConfig.setMinIdle(20);
        // 獲取連接時的最大等待毫秒數(如果設置為阻塞時BlockWhenExhausted),如果超時就拋異常, 小於零:阻塞不確定的時間,  默認-1
        jedisPoolConfig.setMaxWaitMillis(redisProperties.getLettuce().getPool().getMaxWait().toMillis()); // 設置10秒


//        jedisPoolConfig.setMaxWaitMillis(3000); // 設置2秒
        //對拿到的connection進行validateObject校驗
        
        jedisPoolConfig.setTestOnBorrow(true);
        return jedisPoolConfig;
    }
}

jedis工具類



@Component
@Slf4j
public class RedisUtil {
    @Autowired
    private JedisPool jedisPool;

    /**
     * findValueByKeyOfPipeline和 findValueByKeyOfPipelineMget 的區別在於 前者根據閾值獲取多次 后者是直接一次性獲取
     */


    /**
     * 根據(不完整)key值通過Pipeline的方式獲取值
     * 先通過scan獲取全部的key,再通過Pipeline獲取全部的值
     * 根據countVPT 這個 閾值來控制 獲取多次
     * (通過建立一個管道一次提交)
     * @param redisKey
     * @Auther erpangshou
     * @Date:2021/6/17 10:36
     **/
    public List<String> findValueByKeyOfPipeline(String redisKey) {
        Jedis jedis = null;
        List<String> values = new ArrayList<>();
        ArrayList<Response<List<String>>> responses = new ArrayList<>();
        int countVPT=ConstantCom.BATCH_SIZE;
        try {
            jedis = jedisPool.getResource();
            //從redis獲取值刷新航跡信息
            List<String> allKeys = findAllKeys(redisKey);
            if (Objects.isNull(allKeys) || allKeys.isEmpty()) {
                return values;
            }
            Pipeline pipelined = jedis.pipelined();
            ArrayList<String> strings = new ArrayList<>();
            for (String key : allKeys) {
                strings.add(key);
                if (strings.size()==countVPT){
                    ArrayList<String> subList = new ArrayList<>(strings);
                    String[] keys = subList.toArray(new String[subList.size()]);
                    Response<List<String>> mget = pipelined.mget(keys);
                    responses.add(mget);
                    strings=new ArrayList<>();
                }
            }
            //最后的數據
            String[] keys = strings.toArray(new String[strings.size()]);
            Response<List<String>> mget = pipelined.mget(keys);
            responses.add(mget);
            pipelined.sync();
            for (Response<List<String>> respons : responses) {
                values.addAll(respons.get());
            }

        } catch (Exception e) {
            e.printStackTrace();
            log.error("redis查詢異常:" + e.getMessage());
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
        return values;
    }

    /**
     * 根據(不完整)key值通過Pipeline的方式獲取值
     * 先通過scan獲取全部的key,再通過Pipeline獲取全部的值
     * (通過Pipeline.mget直接獲取)
     * @param redisKey
     * @Auther erpangshou
     * @Date:2021/6/17 10:36
     **/
    public List<String> findValueByKeyOfPipelineMget(String redisKey) {
        Jedis jedis = null;
        List<String> values = new ArrayList<>();
        ArrayList<Response<List<String>>> responses = new ArrayList<>();
        try {
            jedis = jedisPool.getResource();
            //從redis獲取值刷新航跡信息
            List<String> allKeys = findAllKeys(redisKey);
            if (Objects.isNull(allKeys) || allKeys.isEmpty()) {
                return values;
            }
//            allKeys=allKeys.subList(0,5000);
            String[] keys = allKeys.toArray(new String[allKeys.size()]);
            Pipeline pipelined = jedis.pipelined();
            Response<List<String>> mget = pipelined.mget(keys);
            pipelined.sync();
            values=mget.get();
        } catch (Exception e) {
            log.error("redis查詢異常:" + e.getMessage());
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
        return values;
    }


    /**
     * jedis的met 
     * @param redisKey
     * @Auther erpangshou
     * @return
     */
    public List<String> mget(String redisKey) {
        Jedis jedis = null;
        List<String> values = new ArrayList<>();
        try {
            jedis = jedisPool.getResource();
            //從redis獲取值刷新航跡信息
            List<String> allKeys = findAllKeys(redisKey);
            if (Objects.isNull(allKeys) || allKeys.isEmpty()) {
                return values;
            }
            String[] keys = allKeys.toArray(new String[allKeys.size()]);
            List<String> mget1 = jedis.mget(keys);
            values=mget1;

        } catch (Exception e) {
            log.error("redis查詢異常:" + e.getMessage());
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
        return values;
    }

    /**
     * 根據(不完整)key值通過Pipeline的方式獲取值   --使用傳入jedis的方式
     * 先通過scan獲取全部的key,再通過Pipeline獲取全部的值
     *
     * @param redisKey
     * @Auther erpangshou
     * @Date:2021/6/17 10:36
     **/
    public List<String> findValueByKeyOfPipeline(String redisKey, Jedis jedis) {
        List<String> values = new ArrayList<>();
        try {
            //從redis獲取值刷新航跡信息
            List<String> allKeys = findAllKeys(redisKey);
            if (allKeys == null || allKeys.isEmpty()) {
                return values;
            }
            //此處采用Pipeline,以提升性能
            Pipeline pipelined = jedis.pipelined();
            Response<List<String>> mget = pipelined.mget(allKeys.toArray(new String[allKeys.size()]));
            pipelined.sync();
            values = mget.get();
        } catch (Exception e) {
            log.error("redis查詢異常:" + e.getMessage());
        }
        return values;
    }

    /**
     * 根據多個完整(精確)key值通過Pipeline的方式獲取值
     * 使用完整的key通過Pipeline獲取全部的值
     *
     * @param allKeys
     * @Auther erpangshou
     * @Date 2021/6/17  10:33
     */
    public List<String> findAllkeysByKeysOfPipeline(String[] allKeys) {
        Jedis jedis = null;
        List<String> values = new ArrayList<>();
        int countVPT=ConstantCom.BATCH_SIZE;

        try {
            jedis = jedisPool.getResource();
            ArrayList<Response<List<String>>> responses = new ArrayList<>();
            //從redis獲取值刷新航跡信息
            if (Objects.isNull(allKeys) || allKeys.length == 0) {
                return values;
            }
            //此處采用Pipeline,以提升性能
            Pipeline pipelined = jedis.pipelined();
            ArrayList<String> strings = new ArrayList<>();
            for (String key : allKeys) {
                strings.add(key);
                if (strings.size()==countVPT){
                    ArrayList<String> subList = new ArrayList<>(strings);
                    String[] keys = subList.toArray(new String[subList.size()]);
                    Response<List<String>> mget = pipelined.mget(keys);
                    responses.add(mget);
                    strings=new ArrayList<>();
                }
            }
            //最后的數據
            String[] keys = strings.toArray(new String[strings.size()]);
            Response<List<String>> mget = pipelined.mget(keys);
            responses.add(mget);
            pipelined.sync();

            for (Response<List<String>> respons : responses) {
                values.addAll(respons.get());
            }

        } catch (Exception e) {
            log.error("redis查詢異常:" + e.getMessage());
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
        return values;
    }



    /**
     * 根據多個完整(精確)key值通過Pipeline的方式獲取值  --使用傳入jedis的方式
     * 使用完整的key通過Pipeline獲取全部的值
     *
     * @param redisKeys
     * @Auther erpangshou
     * @Date 2021/6/17  10:33
     */
    public List<String> findAllkeysByKeysOfPipeline(String[] redisKeys, Jedis jedis) {
        List<String> values = new ArrayList<>();
        try {
            //從redis獲取值刷新航跡信息
            if (redisKeys == null || redisKeys.length == 0) {
                return values;
            }
            //此處采用Pipeline,以提升性能
            Pipeline pipelined = jedis.pipelined();
            Response<List<String>> mget = pipelined.mget(redisKeys);
            pipelined.sync();
            values = mget.get();
        } catch (Exception e) {
            log.error("redis查詢異常:" + e.getMessage());
        }
        return values;
    }

    /**
     * 根據keys 獲取value
     *
     * @param pattern
     * @Auther erpangshou
     */
    public List<String> findValueByKey(String pattern) {
        List<String> values = null;
        List<String> keys = findAllKeys(pattern);
        if (keys != null && keys.size() > 0) {
            String[] strings = keys.toArray(new String[keys.size()]);
            values = findAllkeysByKeysOfPipeline(strings);
        }
        return values;
    }

    /**
     * 根據keys 獲取value  --使用傳入jedis的方式
     *
     * @param pattern
     * @Auther erpangshou
     */
    public List<String> findValueByKey(String pattern, Jedis jedis) {
        List<String> values = null;
        List<String> keys = findAllKeys(pattern);
        if (keys != null && keys.size() > 0) {
            String[] strings = keys.toArray(new String[keys.size()]);
            values = findAllkeysByKeysOfPipeline(strings, jedis);
        }
        return values;
    }

    /**
     * 根據keys 獲取value 已map形式返回
     *
     * @param pattern
     * @Auther erpangshou
     */
    public List<Map> findValuetoMapByKey(String pattern, Jedis jedis) {
        List<String> values = new ArrayList<>();
        ArrayList<Map> mapList = new ArrayList<>();
        try {
            List<String> keys = findAllKeys(pattern);
            if (keys != null && keys.size() > 0) {
                String[] strings = keys.toArray(new String[keys.size()]);
                values = findAllkeysByKeysOfPipeline(strings);
            }

            if (ObjectUtil.isNotEmpty(values)) {
                for (String value : values) {
                    Map<String, Object> map = ((JSONObject) JSONObject.parse(value)).toJavaObject(Map.class);
                    mapList.add(map);
                }
            }

        } catch (Exception e) {
            jedis.close();
            e.printStackTrace();
        }
        return mapList;
    }

    /**
     * 因為Redis是單線程jedis.keys()方法會導致數據庫阻塞,
     * 可能導致Redis其它業務無法操作,所有采用迭代模式(scan)獲取數據
     * 並且keys()方法的時間復雜度為O(n),scan()時間復雜度為O(1)
     */
    public List<String> findAllKeys(String pattern) {
        Jedis jedis = null;
        List<String> total = null;
        try {
            jedis = jedisPool.getResource();
            String cursor = String.valueOf(0);
            total = new ArrayList<>();
            ScanParams params = new ScanParams();
            params.match(pattern);
            do {
                params.count(ConstantCom.BATCH_SIZE);
                ScanResult<String> result = jedis.scan(cursor, params);
                cursor = result.getStringCursor();
                total.addAll(result.getResult());
            } while (Integer.valueOf(cursor) > 0);
        } catch (NumberFormatException e) {
            e.printStackTrace();
            log.error("批量獲取redis的key失敗");
        } finally {
            if (ObjectUtil.isNotEmpty(jedis)) {
                jedis.close();
            }
        }
        return total;
    }

    /**
     * 獲取全都的key --使用傳入jedis的方式
     * 因為Redis是單線程jedis.keys()方法會導致數據庫阻塞,
     * 可能導致Redis其它業務無法操作,所有采用迭代模式(scan)獲取數據
     * 並且keys()方法的時間復雜度為O(n),scan()時間復雜度為O(1)
     */
    public List<String> findAllKeys(String pattern, Jedis jedis) {
        List<String> total = null;
        try {
            String cursor = String.valueOf(0);
            total = new ArrayList<>();
            ScanParams params = new ScanParams();
            params.match(pattern);
            do {
                params.count(ConstantCom.BATCH_SIZE);
                ScanResult<String> result = jedis.scan(cursor, params);
                cursor = result.getStringCursor();
                total.addAll(result.getResult());
            } while (Integer.valueOf(cursor) > 0);
        } catch (NumberFormatException e) {
            e.printStackTrace();
            log.error("批量獲取redis的key失敗");
        }
        return total;
    }

    /**
     * 取代jedis.keys(*)
     *
     * @param pattern
     * @return
     */
    public Set<String> findAllKeysToSet(String pattern) {
        Jedis jedis = null;
        Set<String> total = null;
        try {
            jedis = jedisPool.getResource();
            String cursor = String.valueOf(0);
            total = new HashSet<String>() {
            };
            ScanParams params = new ScanParams();
            params.match(pattern);
            do {
                params.count(ConstantCom.BATCH_SIZE);
                ScanResult<String> result = jedis.scan(cursor, params);
                cursor = result.getStringCursor();
                total.addAll(result.getResult());
            } while (Integer.valueOf(cursor) > 0);
        } catch (NumberFormatException e) {
            e.printStackTrace();
            log.error("批量獲取redis的key失敗");

        } finally {
            if (ObjectUtil.isNotEmpty(jedis)) {
                jedis.close();
            }
        }
        return total;
    }

    /**
     * 取代jedis.keys(*)  --使用傳入jedis的方式
     * @param pattern
     * @return
     */
    public Set<String> findAllKeysToSet(String pattern, Jedis jedis) {
        Set<String> total = null;
        try {
            String cursor = String.valueOf(0);
            total = new HashSet<String>();
            ScanParams params = new ScanParams();
            params.match(pattern);
            do {
                params.count(ConstantCom.BATCH_SIZE);
                ScanResult<String> result = jedis.scan(cursor, params);
                cursor = result.getStringCursor();
                total.addAll(result.getResult());
            } while (Integer.valueOf(cursor) > 0);
        } catch (NumberFormatException e) {
            e.printStackTrace();
            log.error("批量獲取redis的key失敗");

        }
        return total;
    }

    /**
     * 實時數據取出,封裝成指定的對象集合
     *
     * @param list        redis取出的value集合
     * @param entityClass 需要封裝的對象類型
     * @param <T>
     * @return
     */
    public <T> List<T> valueToClass(List<String> list, Class<T> entityClass) {
        List<T> result = new ArrayList<>();
        for (String value : list) {
            Map<String, Object> map = ((JSONObject) JSONObject.parse(value)).toJavaObject(Map.class);
            Set<String> keySet = map.keySet();
            // 將所有key轉換為小寫
            Map<String, Object> map1 = new HashMap<>();
            for (String set : keySet) {
                map1.put(set.toLowerCase(), map.get(set));
            }
            T instance = null;
            try {
                instance = entityClass.newInstance();
                BeanUtils.populate(instance, map1);
            } catch (InstantiationException e) {
                e.printStackTrace();
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            } catch (InvocationTargetException e) {
                e.printStackTrace();
            }
            result.add(instance);
        }
        return result;

    }

    public List<Map<String, Object>> valueToMap(List<String> list) {
        List<Map<String, Object>> result = new ArrayList<>();
        for (String value : list) {
            if (org.apache.commons.lang3.StringUtils.isNotBlank(value)) {
                Map<String, Object> map = ((JSONObject) JSONObject.parse(value)).toJavaObject(Map.class);
                result.add(map);
            }
        }
        return result;
    }

    /**
     * @description redis常用方法封裝get
     * @author erpangshou
     * @date 2021/6/11
     **/
    public String get(String key) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.get(key);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (!Objects.isNull(jedis)) {
                jedis.close();
            }
        }
        return null;
    }

    /**
     * @description redis常用方法封裝set
     * @author erpangshou
     * @date 2021/6/11
     **/
    public String set(String key, String value) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.set(key, value);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (!Objects.isNull(jedis)) {
                jedis.close();
            }
        }
        return null;
    }
    

    public String hget(String key, String field) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.hget(key, field);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (!Objects.isNull(jedis)) {
                jedis.close();
            }
        }
        return null;
    }

    /**
     * 獲取redis的hash值
     * 用scan方式模仿hgetAll
     * @param pattern key
     * @Auther erpangshou
     * @return
     */

    public Map<String, String> hScan(String pattern) {
        long startTime = System.currentTimeMillis();
        Jedis jedis = null;

        Map<String, String> result = new HashMap<>();
        List<Map.Entry<String, String>> results = new ArrayList<Map.Entry<String, String>>();
        try {
            jedis = jedisPool.getResource();
            String cursor = String.valueOf(0);
            ScanParams params = new ScanParams();
            params.count(ConstantCom.BATCH_SIZE);

            do {
                ScanResult<Map.Entry<String, String>> scanResult = jedis.hscan(pattern, cursor, params);
                cursor = scanResult.getStringCursor();
                scanResult.getResult().stream().forEach(entry->result.put(entry.getKey(),entry.getValue()));
            } while (Integer.valueOf(cursor) > 0);
        } catch (NumberFormatException e) {
            e.printStackTrace();
            log.error("批量獲取redis的key失敗");
        } finally {
            if (ObjectUtil.isNotEmpty(jedis)) {
                jedis.close();
            }
        }
        System.out.println("hscansize:"+result.size());
        System.out.println("hscantime:"+(System.currentTimeMillis()-startTime));
        return result;
    }

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM