Lettuce同步源碼分析
在上一篇分享中分享了單機模式異步連接創建過程Lettuce創建連接過程源碼分析; 在本次分享內容主要介紹同步命令的處理過程.
Lettuce是基於Netty的Redis高級客戶端,對於異步命令來說是天然的,那么lettuce中是如何處理同步命令的呢?實際上同步連接還是對異步命令的一次封裝;下面我們就通過源碼進行分析看看Lettuce中的具體實現.
通過上一篇文章中可以知道在StatefulRedisConnectionImpl中創建 異步模式,同步模式以及響應式模式命令處理模式,那么我們就從 該處看起
public StatefulRedisConnectionImpl(RedisChannelWriter writer, RedisCodec<K, V> codec, Duration timeout) {
super(writer, timeout);
this.codec = codec;
//創建異步redis命令處理模式
this.async = newRedisAsyncCommandsImpl();
//創建redis命令同步處理模式
this.sync = newRedisSyncCommandsImpl();
//創建redis命令響應式處理模式
this.reactive = newRedisReactiveCommandsImpl();
}
通過這里似乎看不出同步處理模式同異步處理模式有什么關聯,那么我們在深入進去看一下
protected RedisCommands<K, V> newRedisSyncCommandsImpl() {
return syncHandler(async(), RedisCommands.class, RedisClusterCommands.class);
}
在這段代碼中可以看到async(),這個就是redis命令異步處理模式,那么它是如何封裝的呢?
protected <T> T syncHandler(Object asyncApi, Class<?>... interfaces) {
//對異步API創建調用處理器
FutureSyncInvocationHandler h = new FutureSyncInvocationHandler((StatefulConnection<?, ?>) this, asyncApi, interfaces);
//創建動態代理
return (T) Proxy.newProxyInstance(AbstractRedisClient.class.getClassLoader(), interfaces, h);
}
通過上面對源碼可以發現原來是對異步api創建了一個JDK動態代理;那么關鍵的邏輯還是在FutureSyncInvocationHandler中,對於動態代理的知識就不在展開了.
在invoke處理是在AbstractInvocationHandler中完成的,它將一些基本公用的抽象在了基類中,將特殊的實現延遲到子類中實現.
public final Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//如果參數為null則 將args設置為"{}"
if (args == null) {
args = NO_ARGS;
}
//如果參數長度為0同時方法名稱為hashCode則直接返回hashCode
if (args.length == 0 && method.getName().equals("hashCode")) {
return hashCode();
}
//如果是equals
if (args.length == 1 && method.getName().equals("equals") && method.getParameterTypes()[0] == Object.class) {
Object arg = args[0];
if (arg == null) {
return false;
}
if (proxy == arg) {
return true;
}
return isProxyOfSameInterfaces(arg, proxy.getClass()) && equals(Proxy.getInvocationHandler(arg));
}
//如果是toString
if (args.length == 0 && method.getName().equals("toString")) {
return toString();
}
return handleInvocation(proxy, method, args);
}
在FutureSyncInvocationHandler中實現了同步命令處理過程,其源碼如下:
protected Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {
try {
//獲取當前method在asyncApi 中對應的方法
Method targetMethod = this.translator.get(method);
//調用異步接口
Object result = targetMethod.invoke(asyncApi, args);
//如果返回結果是RedisFuture類型
if (result instanceof RedisFuture<?>) {
//類型強轉
RedisFuture<?> command = (RedisFuture<?>) result;
//如果不是事務控制方法 同時還在事務中則返回null
if (isNonTxControlMethod(method.getName()) && isTransactionActive(connection)) {
return null;
}
//是事務控制方法,或不在事務中則進行如下處理
//等待超時或取消
LettuceFutures.awaitOrCancel(command, connection.getTimeout().toNanos(), TimeUnit.NANOSECONDS);
//返回結果,這里處理不是很好 上一步中就可以直接返回了
return command.get();
}
//如果不是RedisFuture類型則直接返回
return result;
} catch (InvocationTargetException e) {
throw e.getTargetException();
}
}
在上文中有一段是獲取獲取指定方法在delegate中對應方法的處理,下面就看看這個處理是如何實現的
/**
* 方法翻譯器
*/
protected static class MethodTranslator {
private final static WeakHashMap<Class<?>, MethodTranslator> TRANSLATOR_MAP = new WeakHashMap<>(32);
//真實方法和代理類中方法映射表
private final Map<Method, Method> map;
private MethodTranslator(Class<?> delegate, Class<?>... methodSources) {
map = createMethodMap(delegate, methodSources);
}
/**
* 通過指定代理類,和目標類創建方法翻譯器
*/
public static MethodTranslator of(Class<?> delegate, Class<?>... methodSources) {
//同步代碼塊
synchronized (TRANSLATOR_MAP) {
//如果翻譯器映射表中不存在delegate的翻譯器則創建一個新的
return TRANSLATOR_MAP.computeIfAbsent(delegate, key -> new MethodTranslator(key, methodSources));
}
}
private Map<Method, Method> createMethodMap(Class<?> delegate, Class<?>[] methodSources) {
Map<Method, Method> map;
List<Method> methods = new ArrayList<>();
//遍歷源類,找到所有public方法
for (Class<?> sourceClass : methodSources) {
methods.addAll(getMethods(sourceClass));
}
map = new HashMap<>(methods.size(), 1.0f);
//創建方法和代理類的方法的映射表
for (Method method : methods) {
try {
map.put(method, delegate.getMethod(method.getName(), method.getParameterTypes()));
} catch (NoSuchMethodException ignore) {
}
}
return map;
}
//獲取目標方法中的所有方法
private Collection<? extends Method> getMethods(Class<?> sourceClass) {
//目標方法集合
Set<Method> result = new HashSet<>();
Class<?> searchType = sourceClass;
while (searchType != null && searchType != Object.class) {
//將目標類中所有public方法添加到集合中
result.addAll(filterPublicMethods(Arrays.asList(sourceClass.getDeclaredMethods())));
//如果souceClass是接口類型
if (sourceClass.isInterface()) {
//獲取souceClass的所有接口
Class<?>[] interfaces = sourceClass.getInterfaces();
//遍歷接口,將接口的public方法也添加到方法集合中
for (Class<?> interfaceClass : interfaces) {
result.addAll(getMethods(interfaceClass));
}
searchType = null;
} else {//如果不是接口則查找父類
searchType = searchType.getSuperclass();
}
}
return result;
}
//獲取給定方法集合中所有public方法
private Collection<? extends Method> filterPublicMethods(List<Method> methods) {
List<Method> result = new ArrayList<>(methods.size());
for (Method method : methods) {
if (Modifier.isPublic(method.getModifiers())) {
result.add(method);
}
}
return result;
}
public Method get(Method key) {
//從方法映射表中獲取目標方法
Method result = map.get(key);
//如果目標方法不為null則返回,否則拋出異常
if (result != null) {
return result;
}
throw new IllegalStateException("Cannot find source method " + key);
}
}
}
