1 概述
(1)板塊含義
- 紅色部分:Redis相關(Redis的抽象使用相關)
- 橙色部分:Jedis相關(Redis的具體實現相關)
- 藍色部分:Jedis的連接池的底層依賴(是用來apache的基礎工具類)
(2)核心講解
-
RedisTemplate:Spring中用於操作Redis工具類。
- 根據配置切換Redis客戶端;
- 根據配置切換單點、sentinel、cluster模式;
- 通過配置不同的RedisConnectionFactory,返回不同的RedisConnection。
-
RedisConnectionFactory:RedisConnection工廠類。
-
RedisConnection:單點模式或者說通用連接類;
-
RedisSentialConnection:Sentinel模式連接類;
-
RedisClusterConnection:Cluster模式連接類。
-
-
Redis客戶端:實際連接Redis干活的底層工具人,類比MySQL數據庫的hikari、druid、c3p0、dbcp。
- Jedis:當多線程使用同一個連接時,是線程不安全的。所以要使用連接池,為每個jedis實例分配一個連接。
- lettuce:當多線程使用同一連接實例時,是線程安全的。
注:單點模式即單台Redis;Sentinel和Cluster模式都為集群:不同的是Sentinel模式集群每個節點存全量數據;Cluster模式集群每個節點存部分數據,合計為全量且至少需要6個節點,3台機器(3主3備)。
2 RedisTemplate執行命令過程(Jedis客戶端為例)
- RedisTemplate的使用
@Service
public class DictCacheService{
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private DictService dictService;
// 字典緩存:先取Redis緩存,后找DB並緩存至Redis。
public Object get(String key){
Object result = redisTemplate.opsForValue().get(key);
if(result == null){
result = dictService.get(key);
redisTemplate.opsForValue().set(key,value);
}
return result;
}
}
- RedisTemplate繼承關系
public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperations<K, V>{
// cache singleton objects (where possible)
private ValueOperations<K, V> valueOps;
// 1. 獲取ValueOperations
public ValueOperations<K, V> opsForValue() {
if (valueOps == null) {
valueOps = new DefaultValueOperations<K, V>(this);
}
return valueOps;
}
// ...
}
// 2. RedisTemplate實現類RedisOperations接口
public interface RedisOperations<K, V> {
ValueOperations<K, V> opsForValue();
// ...
}
// 3. RedisTemplate.opsForValue()返回接口
public interface ValueOperations<K, V> {
void set(K key, V value);
V get(Object key);
// ...
}
// 4. RedisTemplate.opsForValue()具體返回類型
class DefaultValueOperations<K, V> extends AbstractOperations<K, V> implements ValueOperations<K, V> {
// 5. 具體執行方法
public V get(final Object key) {
// 6. 調用父類(AbstractOperations)的execute方法
return execute(new ValueDeserializingRedisCallback(key) {
protected byte[] inRedis(byte[] rawKey, RedisConnection connection) {
return connection.get(rawKey);
}
}, true);
}
public void set(K key, V value) {
final byte[] rawValue = rawValue(value);
execute(new ValueDeserializingRedisCallback(key) {
protected byte[] inRedis(byte[] rawKey, RedisConnection connection) {
connection.set(rawKey, rawValue);
return null;
}
}, true);
}
}
// 7. DefaultValueOperations的父類:AbstractOperations類
abstract class AbstractOperations<K, V> {
RedisTemplate<K, V> template;
// DefaultValueOperations.get()會使用到
abstract class ValueDeserializingRedisCallback implements RedisCallback<V> {
private Object key;
public ValueDeserializingRedisCallback(Object key) {
this.key = key;
}
public final V doInRedis(RedisConnection connection) {
byte[] result = inRedis(rawKey(key), connection);
return deserializeValue(result);
}
protected abstract byte[] inRedis(byte[] rawKey, RedisConnection connection);
}
// 8. AbstractOperations.execute方法
<T> T execute(RedisCallback<T> callback, boolean b) {
// 9. 實際調用RedisTemplate的execute方法,回到RedisTemplate類看。
return template.execute(callback, b);
}
}
// 回到RedisTemplate類
public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperations<K, V>{
// 10. RedisTemplate.execute方法
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {
// 11. 獲取ConnectionFacotry
RedisConnectionFactory factory = getConnectionFactory();
// 通過factory獲取RedisConnection(新建或復用已存在的)
RedisConnection conn = null;
try {
if (enableTransactionSupport) {
conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
} else {
conn = RedisConnectionUtils.getConnection(factory);
}
boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
RedisConnection connToUse = preProcessConnection(conn, existingConnection);
// ...
RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
// 13. 調用具體Action的doInRedis方法,如set
T result = action.doInRedis(connToExpose);
// ...
return postProcessResult(result, connToUse, existingConnection);
} finally {
RedisConnectionUtils.releaseConnection(conn, factory);
}
}
}
// 12. RedisTemplate的父類:RedisAccessor類
public class RedisAccessor implements InitializingBean {
private RedisConnectionFactory connectionFactory;
public RedisConnectionFactory getConnectionFactory() {
return connectionFactory;
}
// ...
}
- 以JedisClusterConnection為例
public class JedisClusterConnection implements RedisClusterConnection {
// 由RedisConnectionFacotry
private final JedisCluster cluster;
// ...
// RedisConnection.get()實際執行了JedisCluster.get()
@Override
public byte[] get(byte[] key) {
return cluster.get(key);
}
// ...
}
public interface RedisClusterConnection extends RedisConnection, RedisClusterCommands {
// ...
}
// JedisCluster何時創建?
public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {
private JedisCluster cluster;
private RedisClusterConfiguration clusterConfig;
public void afterPropertiesSet() {
if (shardInfo == null) {
shardInfo = new JedisShardInfo(hostName, port);
if (StringUtils.hasLength(password)) {
shardInfo.setPassword(password);
}
if (timeout > 0) {
setTimeoutOn(shardInfo, timeout);
}
}
if (usePool && clusterConfig == null) {
this.pool = createPool();
}
if (clusterConfig != null) {
// 1. cluster初始化
this.cluster = createCluster();
}
}
private JedisCluster createCluster() {
// 2. 創建cluster
JedisCluster cluster = createCluster(this.clusterConfig, getPoolConfig());
JedisClusterConnection.JedisClusterTopologyProvider topologyProvider = new JedisClusterConnection.JedisClusterTopologyProvider(cluster);
this.clusterCommandExecutor = new ClusterCommandExecutor(topologyProvider,
new JedisClusterConnection.JedisClusterNodeResourceProvider(cluster, topologyProvider), EXCEPTION_TRANSLATION);
return cluster;
}
protected JedisCluster createCluster(RedisClusterConfiguration clusterConfig, GenericObjectPoolConfig poolConfig) {
Set<HostAndPort> hostAndPort = new HashSet<HostAndPort>();
for (RedisNode node : clusterConfig.getClusterNodes()) {
hostAndPort.add(new HostAndPort(node.getHost(), node.getPort()));
}
int redirects = clusterConfig.getMaxRedirects() != null ? clusterConfig.getMaxRedirects().intValue() : 5;
// 3. 調用構造方法創建
return StringUtils.hasText(getPassword())
? new JedisCluster(hostAndPort, timeout, timeout, redirects, password, poolConfig)
: new JedisCluster(hostAndPort, timeout, redirects, poolConfig);
}
// 4. 在獲取Connection的時候帶入JedisCluster引用
@Override
public RedisClusterConnection getClusterConnection() {
return new JedisClusterConnection(cluster, clusterCommandExecutor);
}
}
// 5.JedisCluster.get()實際調用Jedis.get()
public class JedisCluster extends BinaryJedisCluster implements JedisCommands, MultiKeyJedisClusterCommands, JedisClusterScriptingCommands {
public String get(final String key) {
// connectionHandler為JedisCluster的父類的屬性
return (String)(new JedisClusterCommand<String>(this.connectionHandler, this.maxAttempts) {
public String execute(Jedis connection) {
return connection.get(key);
}
}).run(key);
}
}
// 6. JedisCluster的父類
public class BinaryJedisCluster implements BasicCommands, BinaryJedisClusterCommands, MultiKeyBinaryJedisClusterCommands, JedisClusterBinaryScriptingCommands, Closeable {
protected JedisClusterConnectionHandler connectionHandler;
public BinaryJedisCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts, GenericObjectPoolConfig poolConfig) {
this.connectionHandler = new JedisSlotBasedConnectionHandler(jedisClusterNode, poolConfig, timeout);
this.maxAttempts = maxAttempts;
}
}
// 7. JedisClusterCommand類
public abstract class JedisClusterCommand<T> {
private JedisClusterConnectionHandler connectionHandler;
private int maxAttempts;
private ThreadLocal<Jedis> askConnection = new ThreadLocal<Jedis>();
public abstract T execute(Jedis connection);
public T run(String key) {
return runWithRetries(SafeEncoder.encode(key), this.maxAttempts, false, false);
}
private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) {
Jedis connection = null;
try {
if (asking) {
connection = askConnection.get();
connection.asking();
asking = false;
} else {
if (tryRandomNode) {
// scential方式采用random,因為每個節點保存全量數據。
connection = connectionHandler.getConnection();
} else {
// cluster方式不能采用random方式,需計算key所在的節點索引值。
connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
}
}
return execute(connection);
} catch (JedisNoReachableClusterNodeException jnrcne) {
throw jnrcne;
} catch (JedisConnectionException jce) {
releaseConnection(connection);
connection = null;
if (attempts <= 1) {
this.connectionHandler.renewSlotCache();
throw jce;
}
return runWithRetries(key, attempts - 1, tryRandomNode, asking);
} catch (JedisRedirectionException jre) {
if (jre instanceof JedisMovedDataException) {
this.connectionHandler.renewSlotCache(connection);
}
releaseConnection(connection);
connection = null;
if (jre instanceof JedisAskDataException) {
asking = true;
askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
}
return runWithRetries(key, attempts - 1, false, asking);
} finally {
releaseConnection(connection);
}
}
}
public class JedisSlotBasedConnectionHandler extends JedisClusterConnectionHandler {
// 通過slot返回緩存的Jedis對象
public Jedis getConnectionFromSlot(int slot) {
JedisPool connectionPool = this.cache.getSlotPool(slot);
if (connectionPool != null) {
return connectionPool.getResource();
} else {
this.renewSlotCache();
connectionPool = this.cache.getSlotPool(slot);
return connectionPool != null ? connectionPool.getResource() : this.getConnection();
}
}
}
// 何時緩存的Jedis對象?
public abstract class JedisClusterConnectionHandler implements Closeable {
protected final JedisClusterInfoCache cache;
// 構造方法中新建了緩存類並初始化Jedis進行緩存
public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) {
this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password);
initializeSlotsCache(nodes, poolConfig, password);
}
private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) {
for (HostAndPort hostAndPort : startNodes) {
// 初始化Jedis進行緩存
Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
cache.discoverClusterNodesAndSlots(jedis);
// 由於是集群可以一次性全部找出來,直接break返回即可。
break;
}
}
}
// Redis Cluster模式下信息緩存類
public class JedisClusterInfoCache {
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
public JedisPool getSlotPool(int slot) {
r.lock();
try {
return slots.get(slot);
} finally {
r.unlock();
}
}
// slots的初始化
public void discoverClusterNodesAndSlots(Jedis jedis) {
w.lock();
try {
reset();
List<Object> slots = jedis.clusterSlots();
for (Object slotInfoObj : slots) {
List<Object> slotInfo = (List<Object>) slotInfoObj;
if (slotInfo.size() <= MASTER_NODE_INDEX) {
continue;
}
List<Integer> slotNums = getAssignedSlotArray(slotInfo);
// hostInfos
int size = slotInfo.size();
for (int i = MASTER_NODE_INDEX; i < size; i++) {
List<Object> hostInfos = (List<Object>) slotInfo.get(i);
if (hostInfos.size() <= 0) {
continue;
}
HostAndPort targetNode = generateHostAndPort(hostInfos);
setupNodeIfNotExist(targetNode);
// 主節點索引
if (i == MASTER_NODE_INDEX) {
assignSlotsToNode(slotNums, targetNode);
}
}
}
} finally {
w.unlock();
}
}
public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {
w.lock();
try {
JedisPool targetPool = setupNodeIfNotExist(targetNode);
for (Integer slot : targetSlots) {
slots.put(slot, targetPool);
}
} finally {
w.unlock();
}
}
}
public class JedisPool extends Pool<Jedis> {
@Override
public Jedis getResource() {
Jedis jedis = super.getResource();
jedis.setDataSource(this);
return jedis;
}
}
// 點到為止,后續有空補充,這部分已經不屬於Redis相關的包,而是基礎實現的工具包類。
package org.apache.commons.pool2.impl;
public abstract class Pool<T> implements Closeable {
protected GenericObjectPool<T> internalPool;
public T getResource() {
return internalPool.borrowObject();
}
}
3 RedisTemplate執行操作代理問題
(1)重寫RedisTemplate
- 存放路徑
ValueOperations、ZSetOperations等
都是非public訪問級別,所以無法在項目包中訪問。
放在在org.springframework.data.redis.core
,繞過XxxOperations類的訪問問題。
src/main/java
└- org.springframework.data.redis.core
└- TracingRedisTemplate
- TracingRedisTemplate
package org.springframework.data.redis.core;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import javax.annotation.PostConstruct;
/**
* 鏈路追蹤用RedisTemplate
*
* @author 80273435
* @date 2020-05-19 14-01
*/
public class TracingRedisTemplate<K, V> extends RedisTemplate<K, V> {
@Autowired
private ApplicationContext context;
private RedisTemplate proxy;
@PostConstruct
public void init() {
proxy = context.getBean(RedisTemplate.class);
}
// cache singleton objects (where possible)
private ValueOperations<K, V> valueOps;
private ListOperations<K, V> listOps;
private SetOperations<K, V> setOps;
private ZSetOperations<K, V> zSetOps;
private GeoOperations<K, V> geoOps;
private HyperLogLogOperations<K, V> hllOps;
public BoundValueOperations<K, V> boundValueOps(K key) {
return new DefaultBoundValueOperations<K, V>(key, proxy);
}
public ValueOperations<K, V> opsForValue() {
if (valueOps == null) {
valueOps = new DefaultValueOperations<K, V>(proxy);
}
return valueOps;
}
public ListOperations<K, V> opsForList() {
if (listOps == null) {
listOps = new DefaultListOperations<K, V>(proxy);
}
return listOps;
}
public BoundListOperations<K, V> boundListOps(K key) {
return new DefaultBoundListOperations<K, V>(key, proxy);
}
public BoundSetOperations<K, V> boundSetOps(K key) {
return new DefaultBoundSetOperations<K, V>(key, proxy);
}
public SetOperations<K, V> opsForSet() {
if (setOps == null) {
setOps = new DefaultSetOperations<K, V>(proxy);
}
return setOps;
}
public BoundZSetOperations<K, V> boundZSetOps(K key) {
return new DefaultBoundZSetOperations<K, V>(key, proxy);
}
public ZSetOperations<K, V> opsForZSet() {
if (zSetOps == null) {
zSetOps = new DefaultZSetOperations<K, V>(proxy);
}
return zSetOps;
}
@Override
public GeoOperations<K, V> opsForGeo() {
if (geoOps == null) {
geoOps = new DefaultGeoOperations<K, V>(proxy);
}
return geoOps;
}
@Override
public BoundGeoOperations<K, V> boundGeoOps(K key) {
return new DefaultBoundGeoOperations<K, V>(key, proxy);
}
@Override
public HyperLogLogOperations<K, V> opsForHyperLogLog() {
if (hllOps == null) {
hllOps = new DefaultHyperLogLogOperations<K, V>(proxy);
}
return hllOps;
}
public <HK, HV> BoundHashOperations<K, HK, HV> boundHashOps(K key) {
return new DefaultBoundHashOperations<K, HK, HV>(key, proxy);
}
public <HK, HV> HashOperations<K, HK, HV> opsForHash() {
return new DefaultHashOperations<K, HK, HV>(proxy);
}
public ClusterOperations<K, V> opsForCluster() {
return new DefaultClusterOperations<K, V>(proxy);
}
}
(2) 切面
package com.littleevil.test;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* RedisTemplate.execute的切面
*
* @author 80273435
* @date 2020-05-19 10-53
*/
@Aspect
@Component
@Slf4j
public class RedisTemplateAspect {
@Around("execution(* org.springframework.data.redis.core.RedisTemplate.execute(..))")
private Object executeAround(ProceedingJoinPoint joinPoint) throws Throwable {
log.info("before execute");
Object proceed = joinPoint.proceed();
log.info("after execute");
return proceed;
}
}w