一、前言
Dubbo內核
dubbo所有功能都是基於dubbo內核之上完成的,dubbo內核由四部分構成,分別為
SPI、Adaptive、Wrapper、Activate。而dubbo的內核設計原則,也是我們所熟悉的aop,ioc與動態編譯compiler,這些稱之為dubbo的內核原理。
Wrapper機制
即擴展點
自動包裝。Wrapper 類同樣實現了擴展點接口,但是 Wrapper 不是擴展點的真正實現。它的用途主要是用於從 ExtensionLoader 返回擴展點時,包裝在真正的擴展點實現外。即從 ExtensionLoader 中返回的實際上是 Wrapper 類的實例,Wrapper 持有了實際的擴展點實現類。
擴展點的 Wrapper 類可以有多個,也可以根據需要新增。
通過 Wrapper 類可以把所有擴展點公共邏輯移至 Wrapper 中。新加的 Wrapper 在所有的擴展點上添加了邏輯,有些類似 AOP,即 Wrapper 代理了擴展點。
Wrapper的規范
Wrapper 機制不是通過注解實現的,而是通過一套 Wrapper 規范實現的。
Wrapper 類在定義時需要遵循如下規范:
- 該類要實現 SPI 接口
- 該類中要有 SPI 接口的引用
- 該類中必須含有一個含參的構造方法且參數只能有一個類型為SPI借口
- 在接口實現方法中要調用 SPI 接口引用對象的相應方法
- 該類名稱以 Wrapper 結尾
二、使用示例
未使用Wrapper之前:
@SPI("ali") // 默認的值支付寶支付 public interface Pay { // 接口的方法需要添加這個注解,在測試代碼中,參數至少要有一個URL類型的參數 @Adaptive({"paytype"}) // 付款方式 void pay(URL url); } public class AliPay implements Pay { @Override public void pay(URL url) { System.out.println("使用支付寶支付"); } } public class WechatPay implements Pay { @Override public void pay(URL url) { System.out.println("使用微信支付"); } } 在/dubbo-common/src/main/resources/META-INF/services/com.test.Pay文件下添加內容如下: wechat = com.test.WechatPay ali = com.test.AliPay
public static void main(String[] args) { ExtensionLoader<Pay> loader = ExtensionLoader.getExtensionLoader(Pay.class); Pay pay = loader.getAdaptiveExtension(); pay.pay(URL.valueOf("http://localhost:9999/xxx")); // 使用支付寶支付 pay.pay(URL.valueOf("http://localhost:9999/xxx?paytype=wechat")); // 使用微信支付 }
上述示例是原Adaptive使用示例,在使用Wrapper之后:
- 首先要添加一個Wrapper類
- 並在/dubbo-common/src/main/resources/META-INF/services/com.test.Pay文件下追加“xxx = com.test.PayWrapper1”
(1)代理模式
public class PayWrapper1 implements Pay { Pay pay; public PayWrapper1(Pay pay) { this.pay = pay; } @Override public void pay(URL url) { System.out.println("pay before..."); pay.pay(url); System.out.println("pay after..."); } }
執行上面main方法,得出如下結果:
pay before...
使用支付寶支付
pay after...
pay before...
使用微信支付
pay after...
由此可見Wrapper是一個AOP功能
(2)責任鏈模式
我們還可以給它添加一個Wrapper2類,如下所示
public class PayWrapper2 implements Pay { Pay pay; public PayWrapper2(Pay pay) { this.pay = pay; } @Override public void pay(URL url) { System.out.println("-----pay before..."); pay.pay(url); System.out.println("-----pay after..."); } }
並追加xxx2 = com.test.PayWrapper2
輸出的結果如下:
-----pay before...
pay before...
使用支付寶支付
pay after...
-----pay after...
執行順序先執行2,再執行1
三、源碼分析
上面main方法等同於
public static void main(String[] args) { URL url = URL.valueOf("http://localhost:9999/xxx"); String extName = url.getParameter("paytype", "ali"); System.out.println(extName); // ali ExtensionLoader<Pay> loader = ExtensionLoader.getExtensionLoader(Pay.class); Pay extension = (Pay) loader.getExtension(extName); // extension返回的結果為PayWrapper1 extension.pay(url); }
Wrapper功能實現分為兩個部分
一個是加載Extension時會把Wrapper類放入緩存中;
另一部分取得服務提供者實例時,將裝配過的Wrapper類返回
我們先看第一部分,加載Wrapper類
// 維護一個線程安全的HashSet來存放Wrapper try { // 嘗試取得參數類型為SPI接口類型的構造函數,即判斷該類是否是Wrapper類,如果不是會拋出異常;如果是,繼續執行,並添加到cache中 // 上面定義的Wrapper類如果有構造,則表示是一個真正的Wrapper clazz.getConstructor(type); Set<Class<?>> wrappers = cachedWrapperClasses; // 通過上面取得ExtensionLoader的代碼你需要知道,每一個SPI接口都有一個ExtensionLoader, // 所以這里面的緩存也是每一個SPI接口都有他的Wrapper緩存,生命周期和loader的生命周期一致 if (wrappers == null) { cachedWrapperClasses = new ConcurrentHashSet<Class<?>>(); wrappers = cachedWrapperClasses; } // 這里cachedWrapperClasses和wrappers用的是同一個對象地址,所以相當於往cachedWrapperClasses添加元素 wrappers.add(clazz); }
第二部分則是組裝Wrapper類
通過getExtension方法中調用了createExtension方法,createExtension會循環遍歷,通過
wrapperClass.getConstructor(type).newInstance(instance) 將wrapper構造注入
private T createExtension(String name) { Class<?> clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name); } try { T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); } injectExtension(instance); Set<Class<?>> wrapperClasses = cachedWrapperClasses; // cache中是否存在wrapper類,如果存在,遍歷,最后返回wrapper這個實例 if (wrapperClasses != null && wrapperClasses.size() > 0) { for (Class<?> wrapperClass : wrapperClasses) { // 注冊擴展,返回 wrapperClass.getConstructor(type).newInstance(instance) 實例 instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type + ") could not be instantiated: " + t.getMessage(), t); } }
injectExtension 只有一個邏輯,就是判斷是否有set方法,然后屬性注入
private T injectExtension(T instance) { try { if (objectFactory != null) { for (Method method : instance.getClass().getMethods()) { if (method.getName().startsWith("set") && method.getParameterTypes().length == 1 && Modifier.isPublic(method.getModifiers())) { Class<?> pt = method.getParameterTypes()[0]; try { String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : ""; Object object = objectFactory.getExtension(pt, property); if (object != null) { method.invoke(instance, object); } } catch (Exception e) { logger.error("fail to inject via method " + method.getName() + " of interface " + type.getName() + ": " + e.getMessage(), e); } } } } } catch (Exception e) { logger.error(e.getMessage(), e); } return instance; }
四、 在Dubbo中的應用
以 ProtocolFilterWrapper為例,在 dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper mock=com.alibaba.dubbo.rpc.support.MockProtocol
當服務提供者啟動時,ServiceConfig開始執行onApplicationEvent方法,並開始執行服務導出
public class ServiceConfig<T> extends AbstractServiceConfig { private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); @SuppressWarnings({"unchecked", "rawtypes"}) private void exportLocal(URL url) { if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { // 組裝URL URL local = URL.valueOf(url.toFullString()) // 常量值為injvm,在執行wrapper鏈時用到 .setProtocol(Constants.LOCAL_PROTOCOL) .setHost(LOCALHOST) .setPort(0); ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref)); // protocol,Protocol$Adaptive Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); exporters.add(exporter); logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry"); } } }
Protocol$Adaptive 類如下(簡化版)
import com.alibaba.dubbo.common.extension.ExtensionLoader; public class Protocol$Adpative implements Protocol { public void destroy() { // throw Exception } public int getDefaultPort() { // throw Exception } public Invoker refer(Class arg0, URL arg1) throws RpcException { if (arg1 == null) throw new IllegalArgumentException("url == null"); URL url = arg1; String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); if(extName == null) // throw Exception Protocol extension = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName); return extension.refer(arg0, arg1); } public Exporter export(Invoker arg0) throws RpcException { if (arg0 == null) // throw Exception if (arg0.getUrl() == null) // throw Exception URL url = arg0; String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); if(extName == null) // throw Exception Protocol extension = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName); return extension.export(arg0); } }
通過如下兩行代碼
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); // dubbo Protocol extension = (Protocol) ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
通過擴展名,我們可以在/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol 文件分析出
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper # Wrapper listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper # Wrapper mock=com.alibaba.dubbo.rpc.support.MockProtocol injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol # 在ServiceConfig 組裝過這個協議名稱 rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol com.alibaba.dubbo.rpc.protocol.http.HttpProtocol com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol
在沒有Wrapper的情況下,得到的擴展類為com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
在有Wrarpper的情況下,得到的是最后的一個Wrapper,即com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
所以下面就調用ProtocolListenerWrapper中export方法
public class ProtocolListenerWrapper implements Protocol { public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return new ListenerExporterWrapper<T>(protocol.export(invoker), Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY))); } }
根據Wrapper責任鏈模式的特點,接下來執行 ProtocolFilterWrapper
public class ProtocolFilterWrapper implements Protocol { public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); } }
再接下來執行 injvm 對應的InjvmProtocol類中的export方法