責任鏈模式
責任鏈模式在Dubbo中發揮的作用舉足輕重,就像是Dubbo框架的骨架。Dubbo的調用鏈組織是用責任鏈模式串連起來的。
責任鏈中的每個節點實現Filter接口,然后由ProtocolFilterWrapper,將所有Filter串連起來。
Dubbo的許多功能都是通過Filter擴展實現的,比如監控、日志、緩存、安全、telnet以及RPC本身都是。
如果把Dubbo比作一列火車,責任鏈就像是火車的各車廂,每個車廂的功能不同。
如果需要加入新的功能,增加車廂就可以了,非常容易擴展。
最經典的實現鏈式Filter代碼。采用匿名內部類來實現,一定要DEBUG進去看看。
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (filters.size() > 0) {
for (int i = filters.size() - 1; i >= 0; i --) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
public Class<T> getInterface() {
return invoker.getInterface();
}
public URL getUrl() {
return invoker.getUrl();
}
public boolean isAvailable() {
return invoker.isAvailable();
}
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
至少有2個典型案例:
-
dubbo filter鏈式調用
-
dubbo handler鏈式調用
Dubbo的Filter類似於 serlvet filter.可以搞一些非業務的工作,如限流,超時,訪問日志記錄,trace等。 dubbo服務,進行refer或export時,會build filter. 采用了匿名機制。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
public void destroy() {
protocol.destroy();
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (filters.size() > 0) {
for (int i = filters.size() - 1; i >= 0; i --) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
public Class<T> getInterface() {
return invoker.getInterface();
}
public URL getUrl() {
return invoker.getUrl();
}
public boolean isAvailable() {
return invoker.isAvailable();
}
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
dubbo handler采用的也是鏈式模式。 鏈式模型是通信系統中的經典模式,也叫做pipeline模式。 應用數據通過協議層,傳輸層,序列化后,和管道非常類似,在每一層,都會進行相應的業務處理,然后傳到下一層
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { return new MultiMessageHandler(new HeartbeatHandler(((Dispatcher)ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension()).dispatch(handler, url))); }
最里層那個hanlder也是一層一層套的handler.分別是DecodeHandler,HeaderExchangeHandler,DubboProtocol(handler)
這幾個handler關系比較復雜
觀察者模式
Dubbo中使用觀察者模式最典型的例子是RegistryService。
消費者在初始化的時候回調用subscribe方法,注冊一個觀察者,如果觀察者引用的服務地址列表發生改變,就會通過NotifyListener通知消費者。
此外,Dubbo的InvokerListener、ExporterListener 也實現了觀察者模式,只要實現該接口,並注冊,
就可以接收到consumer端調用refer和provider端調用export的通知。Dubbo的注冊/訂閱模型和觀察者模式就是天生一對。
節點export或refer的時候,都會訂閱感興趣的節點。
public <T> Exporter<T> export(Invoker<T> originInvoker) throws RpcException {
final RegistryProtocol.ExporterChangeableWrapper<T> exporter = this.doLocalExport(originInvoker);
final Registry registry = this.getRegistry(originInvoker);
final URL registedProviderUrl = this.getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);
final URL overrideSubscribeUrl = this.getSubscribedOverrideUrl(registedProviderUrl);
final RegistryProtocol.OverrideListener overrideSubscribeListener = new RegistryProtocol.OverrideListener(overrideSubscribeUrl);
this.overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory(type, url);
directory.setRegistry(registry);
directory.setProtocol(this.protocol);
URL subscribeUrl = new URL("consumer", NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
if (!"*".equals(url.getServiceInterface()) && url.getParameter("register", true)) {
registry.register(subscribeUrl.addParameters(new String[]{"category", "consumers", "check", String.valueOf(false)}));
}
directory.subscribe(subscribeUrl.addParameter("category", "providers,configurators,routers"));
return cluster.join(directory);
}
public void subscribe(URL url) {
this.setConsumerUrl(url);
this.registry.subscribe(url, this);
}
上面,訂閱節點信息的時候,把自己的this引用傳進去了。這樣,當節點有變化的時候,會通過this上下文,修改Invoker列表。
ChildListener zkListener = (ChildListener)listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, ZookeeperRegistry.this.toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = (ChildListener)listeners.get(listener);
}
一路進來,刷新invoker.注意,invoker會產生竟態條件,所以需要加鎖。
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList();
List<URL> routerUrls = new ArrayList();
List<URL> configuratorUrls = new ArrayList();
Iterator i$ = urls.iterator();
while(true) {
while(true) {
while(i$.hasNext()) {
URL url = (URL)i$.next();
String protocol = url.getProtocol();
String category = url.getParameter("category", "providers");
this.refreshInvoker(invokerUrls);
return;
}
}
}
修飾器模式
Dubbo中還大量用到了修飾器模式。比如ProtocolFilterWrapper類是對Protocol類的修飾。在export和refer方法中,配合責任鏈模式,
把Filter組裝成責任鏈,實現對Protocol功能的修飾。其他還有ProtocolListenerWrapper、 ListenerInvokerWrapper、InvokerWrapper等。
個人感覺,修飾器模式是一把雙刃劍,一方面用它可以方便地擴展類的功能,而且對用戶無感,
但另一方面,過多地使用修飾器模式不利於理解,因為一個類可能經過層層修飾,最終的行為已經和原始行為偏離較大。
工廠方法模式
CacheFactory的實現采用的是工廠方法模式。CacheFactory接口定義getCache方法,
然后定義一個AbstractCacheFactory抽象類實現CacheFactory,
並將實際創建cache的createCache方法分離出來,並設置為抽象方法。這樣具體cache的創建工作就留給具體的子類去完成。
插件機制
Dubbo本身的功能基本都夠用了,但是Dubbo沒有固步自封,而是平等的對待第三方,用戶可以定制自己的插件,對Dubbo功能進行擴展。 Dubbo通過SPI機制,實現插件機制。 機制如下:
-
DUBBO框架預留了接口,具體的實現,由插件實現
-
SPI注解,通過SPI注解,以及約定的配置文件,完成實現接口的映射關系
-
插件配置放在目錄”META-INF/dubbo/internal“下面
-
配置文件的格式是”KEY=VALUE“格式
-
KEY是SPI注解上面的值,VALUE是對應的插件實現類
loadExtensionClasses會從約定好的目錄下載加載類。
private Map<String, Class<?>> loadExtensionClasses() { SPI defaultAnnotation = (SPI)this.type.getAnnotation(SPI.class); String value = defaultAnnotation.value(); Map<String, Class<?>> extensionClasses = new HashMap(); this.loadFile(extensionClasses, "META-INF/dubbo/internal/"); return extensionClasses; }
一般情況下,使用默認的即可,如果需要使用自定義的插件,可以通過URL傳遞。 例如,負載均均衡測試,通過URL指定,如果未指定,則使用默認的隨機負載均衡策略。
loadbalance = (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(((Invoker)invokers.get(0)).getUrl().getMethodParameter(invocation.getMethodName(), "loadbalance", "random"));
上述加載類的過程非常復雜,這次簡單過一下
抽象工廠模式
ProxyFactory及其子類是Dubbo中使用抽象工廠模式的典型例子。
ProxyFactory提供兩個方法,分別用來生產Proxy和Invoker
(這兩個方法簽名看起來有些矛盾,因為getProxy方法需要傳入一個Invoker對象,而getInvoker方法需要傳入一個Proxy對象,看起來會形成循環依賴,但其實兩個方式使用的場景不一樣)。
AbstractProxyFactory實現了ProxyFactory接口,作為具體實現類的抽象父類。
然后定義了JdkProxyFactory和JavassistProxyFactory兩個具體類,分別用來生產基於jdk代理機制和基於javassist代理機制的Proxy和Invoker。
適配器模式
為了讓用戶根據自己的需求選擇日志組件,Dubbo自定義了自己的Logger接口,並為常見的日志組件(包括jcl, jdk, log4j, slf4j)提供相應的適配器。
並且利用簡單工廠模式提供一個LoggerFactory,客戶可以創建抽象的Dubbo自定義Logger,而無需關心實際使用的日志組件類型。
在LoggerFactory初始化時,客戶通過設置系統變量的方式選擇自己所用的日志組件,這樣提供了很大的靈活性。
至少有3個經典案例
-
Transport 完成Server 和Client接口功能
-
CoderAdapter完成encode和decode功能 NettyCodecAdapter
-
@Adaptive
@SPI("netty")
public interface Transporter {
@Adaptive({"server", "transporter"})
Server bind(URL var1, ChannelHandler var2) throws RemotingException;
@Adaptive({"client", "transporter"})
Client connect(URL var1, ChannelHandler var2) throws RemotingException;
}
下面就是給類動態的增加功能。
private static Wrapper makeWrapper(Class<?> c)
{
if( c.isPrimitive() )
throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c);
String name = c.getName();
ClassLoader cl = ClassHelper.getClassLoader(c);
StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");
StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");
StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ ");
c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
Map<String, Class<?>> pts = new HashMap<String, Class<?>>(); // <property name, property types>
Map<String, Method> ms = new LinkedHashMap<String, Method>(); // <method desc, Method instance>
List<String> mns = new ArrayList<String>(); // method names.
List<String> dmns = new ArrayList<String>(); // declaring method names.
// get all public field.
for( Field f : c.getFields() )
{
String fn = f.getName();
Class<?> ft = f.getType();
if( Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers()) )
continue;
c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }");
c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }");
pts.put(fn, ft);
}
Method[] methods = c.getMethods();
// get all public method.
boolean hasMethod = hasMethods(methods);
if( hasMethod ){
c3.append(" try{");
}
for( Method m : methods )
{
if( m.getDeclaringClass() == Object.class ) //ignore Object's method.
continue;
String mn = m.getName();
c3.append(" if( \"").append(mn).append("\".equals( $2 ) ");
int len = m.getParameterTypes().length;
c3.append(" && ").append(" $3.length == ").append(len);
boolean override = false;
for( Method m2 : methods ) {
if (m != m2 && m.getName().equals(m2.getName())) {
override = true;
break;
}
}
if (override) {
if (len > 0) {
for (int l = 0; l < len; l ++) {
c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"")
.append(m.getParameterTypes()[l].getName()).append("\")");
}
}
}
c3.append(" ) { ");
if( m.getReturnType() == Void.TYPE )
c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;");
else
c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");");
c3.append(" }");
mns.add(mn);
if( m.getDeclaringClass() == c )
dmns.add(mn);
ms.put(ReflectUtils.getDesc(m), m);
}
if( hasMethod ){
c3.append(" } catch(Throwable e) { " );
c3.append(" throw new java.lang.reflect.InvocationTargetException(e); " );
c3.append(" }");
}
c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }");
// deal with get/set method.
Matcher matcher;
for( Map.Entry<String,Method> entry : ms.entrySet() )
{
String md = entry.getKey();
Method method = (Method)entry.getValue();
if( ( matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md) ).matches() )
{
String pn = propertyName(matcher.group(1));
c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");
pts.put(pn, method.getReturnType());
}
else if( ( matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md) ).matches() )
{
String pn = propertyName(matcher.group(1));
c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");
pts.put(pn, method.getReturnType());
}
else if( ( matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md) ).matches() )
{
Class<?> pt = method.getParameterTypes()[0];
String pn = propertyName(matcher.group(1));
c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.").append(method.getName()).append("(").append(arg(pt,"$3")).append("); return; }");
pts.put(pn, pt);
}
}
c1.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in class " + c.getName() + ".\"); }");
c2.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in class " + c.getName() + ".\"); }");
// make class
long id = WRAPPER_CLASS_COUNTER.getAndIncrement();
ClassGenerator cc = ClassGenerator.newInstance(cl);
cc.setClassName( ( Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw" ) + id );
cc.setSuperClass(Wrapper.class);
cc.addDefaultConstructor();
cc.addField("public static String[] pns;"); // property name array.
cc.addField("public static " + Map.class.getName() + " pts;"); // property type map.
cc.addField("public static String[] mns;"); // all method name array.
cc.addField("public static String[] dmns;"); // declared method name array.
for(int i=0,len=ms.size();i<len;i++)
cc.addField("public static Class[] mts" + i + ";");
cc.addMethod("public String[] getPropertyNames(){ return pns; }");
cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }");
cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }");
cc.addMethod("public String[] getMethodNames(){ return mns; }");
cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }");
cc.addMethod(c1.toString());
cc.addMethod(c2.toString());
cc.addMethod(c3.toString());
try
{
Class<?> wc = cc.toClass();
// setup static field.
wc.getField("pts").set(null, pts);
wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));
wc.getField("mns").set(null, mns.toArray(new String[0]));
wc.getField("dmns").set(null, dmns.toArray(new String[0]));
int ix = 0;
for( Method m : ms.values() )
wc.getField("mts" + ix++).set(null, m.getParameterTypes());
return (Wrapper)wc.newInstance();
}
catch(RuntimeException e)
{
throw e;
}
catch(Throwable e)
{
throw new RuntimeException(e.getMessage(), e);
}
finally
{
cc.release();
ms.clear();
mns.clear();
dmns.clear();
}
}
代理模式
Dubbo consumer使用Proxy類創建遠程服務的本地代理,本地代理實現和遠程服務一樣的接口,並且屏蔽了網絡通信的細節,使得用戶在使用本地代理的時候,感覺和使用本地服務一樣。
public class JavassistProxyFactory extends AbstractProxyFactory { @SuppressWarnings("unchecked") public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); } public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper類不能正確處理帶$的類名 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } }
