官網解釋

通過讀取所有響應來同步管道。此操作將關閉管道。為了從流水線命令獲取返回值,請捕獲所執行命令的不同Response <?>。

通過讀取所有響應來同步管道。此操作將關閉管道。盡可能避免使用此版本,並使用ShardedJedisPipeline.sync(),因為它不會遍歷所有響應並生成正確的響應類型(通常是浪費時間)。
如果對返回順序沒有要求,建議使用sync()
附上通過管道從緩存取數據的實現:
/** * 通過key批量取數據 * * @param keys key列表 這里入參是緩存key,返回的map里面也用這個作為Key值 * @param clazz 轉成的對象 * @param <T> key對應的map數據 * @return */ @RunTimeLog public <T> Map<String, T> batchGetObject(List<String> keys, Class<T> clazz) { //創建接收對象格式 Map<String, T> responses = Maps.newHashMap(); Map<String, Response<String>> redisValueMap = Maps.newHashMap(); try { ShardedJedisPipeline pip = redisClient.pipeline(); keys.stream().filter(key -> CommonUtils.checkNotNullString(key)).forEach(key -> redisValueMap.put(key, pip.get(key))); //通過讀取所有響應來同步管道。此操作將關閉管道。為了從流水線命令獲取返回值,請捕獲所執行命令的不同Response <?>。 pip.sync(); //通過讀取所有響應來同步管道。此操作將關閉管道。盡可能避免使用此版本,並使用ShardedJedisPipeline.sync(),因為它不會遍歷所有響應並生成正確的響應類型(通常是浪費時間)。 //pip.syncAndReturnAll(); redisValueMap.keySet().forEach(key -> responses.put(key, JSON.parseObject(redisValueMap.get(key).get(), clazz))); } catch (Exception ex) { LOGGER.error("error:{}", ex); } return responses; }
/**
* 通過key批量取數據,如果取不到就用key帶到函數參數中去查,主要用於如果緩存沒有,再去數據庫里查
*
* @param keys key列表 這里入參是緩存key,返回的map里面也用這個作為Key值
* @param clazz 轉成的對象
* @param <T> key對應的map數據
* @return
*/
public synchronized <T> Map<String, T> batchGetObject(List<String> keys, Class<T> clazz, Function<String, T> function) {
Map<String, T> responses = this.batchGetObject(keys, clazz);
List<String> delKeys= Lists.newArrayList();
try {
responses.keySet().forEach(key -> {
if (responses.get(key) == null) {
T t = function.apply(key);
if (t != null) {
responses.put(key, t);
}else{
delKeys.add(key);
}
}
});
delKeys.forEach(key->{
responses.remove(key);
});
} catch (Exception ex) {
LOGGER.error("error:{}", ex);
}
return responses;
}
調用
/** * 批量根據緩存key去redis查詢數據,如果沒有查詢到數據,再去庫里查找 * @param storeCode * @param categoryCodes * @return */ public Map<String, StoreCategoryEntity> queryBatchStoreCategoryInfoByCategoryCode(String storeCode, List<String> categoryCodes) { Map<String, StoreCategoryEntity> map= Maps.newHashMap(); try { //循環批量生成key List<String> keys = Lists.newArrayList(); categoryCodes=categoryCodes.stream().distinct().collect(Collectors.toList()); categoryCodes.forEach(categoryCode -> { keys.add(CacheKeyBusiness.getStoreCategoryByCategoryCodePrefixKey(storeCode, categoryCode)); }); map=redisClientBusiness.batchGetObject(keys,StoreCategoryEntity.class,(key)->{ //SFSPC:STORE_CODE_CATEGORY_CODE:{storeCode}:{categoryCode} //實現當無法從緩存獲得數據時,解析key然后通過其他途徑獲得數據並返回 String[] info = key.split(":"); if (info.length != 4) { return null; }
//通過數據庫去加載,並返回對象 StoreCategoryEntity storeCategoryEntity = storeCategoryDao.queryStoreCategoryByCategoryCode(storeCode, info[3]); if (storeCategoryEntity == null) { return null; } redisClientBusiness.redisSet(key, JSON.toJSONString(storeCategoryEntity)); return storeCategoryEntity; }); } catch (Exception ex) { LOGGER.error("error:{}", ex); return map; } return map; }
