一、Dubbo中Invoker介紹
為什么說Invoker是Dubbo核心模型呢?
Invoker是Dubbo中的實體域,也就是真實存在的。其他模型都向它靠攏或轉換成它,它也就代表一個可執行體,可向它發起invoke調用。在服務提供方,Invoker用於調用服務提供類。在服務消費方,Invoker用於執行遠程調用。
二、服務提供方的Invoker
在服務提供方中的Invoker是由ProxyFactory創建而來的,Dubbo默認的ProxyFactory實現類為JavassistProxyFactory。
創建Invoker的入口方法getInvoker:
1 public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { 2 // 為目標類創建 Wrapper 3 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf(36) < 0 ? proxy.getClass() : type); 4 // 創建匿名 Invoker 類對象,並實現 doInvoke 方法。 5 return new AbstractProxyInvoker<T>(proxy, type, url) { 6 @Override 7 protected Object doInvoke(T proxy, String methodName, 8 Class<?>[] parameterTypes, 9 Object[] arguments) throws Throwable { 10 // 調用 Wrapper 的 invokeMethod 方法,invokeMethod 最終會調用目標方法 11 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); 12 } 13 }; 14 }
JavassistProxyFactory創建了一個繼承自AbstractProxyInvoker類的匿名對象,並覆寫了抽象方法doInvoke。覆寫后的doInvoke 邏輯比較簡單,僅是將調用請求轉發給了Wrapper類的invokeMethod 方法。以及生成 invokeMethod 方法代碼和其他一些方法代碼。代碼生成完畢后,通過 Javassist 生成 Class 對象,最后再通過反射創建 Wrapper 實例。
注:Wapper是一個包裝類。主要用於“包裹”目標類,僅可以通過getWapper(Class)方法創建子類。在創建子類過程中,子類代碼會對傳進來的Class對象進行解析,拿到類方法,類成員變量等信息。而這個包裝類持有實際的擴展點實現類。也可以把擴展點的公共邏輯全部移到包裝類中,功能上就是作為AOP實現。
創建包裝類的構造方法:
1 public static Wrapper getWrapper(Class<?> c) { 2 while (ClassGenerator.isDynamicClass(c)) 3 c = c.getSuperclass(); 4 5 if (c == Object.class) 6 return OBJECT_WRAPPER; 7 8 // 從緩存中獲取 Wrapper 實例 9 Wrapper ret = WRAPPER_MAP.get(c); 10 if (ret == null) { 11 // 緩存未命中,創建 Wrapper 12 ret = makeWrapper(c); 13 // 寫入緩存 14 WRAPPER_MAP.put(c, ret); 15 } 16 return ret; 17 }
在緩存中獲取不到Wapper就會進入下面的方法makeWapper:

1 private static Wrapper makeWrapper(Class<?> c) { 2 // 檢測 c 是否為基本類型,若是則拋出異常 3 if (c.isPrimitive()) 4 throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c); 5 6 String name = c.getName(); 7 ClassLoader cl = ClassHelper.getClassLoader(c); 8 9 // c1 用於存儲 setPropertyValue 方法代碼 10 StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ "); 11 // c2 用於存儲 getPropertyValue 方法代碼 12 StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ "); 13 // c3 用於存儲 invokeMethod 方法代碼 14 StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ "); 15 16 // 生成類型轉換代碼及異常捕捉代碼,比如: 17 // DemoService w; try { w = ((DemoServcie) $1); }}catch(Throwable e){ throw new IllegalArgumentException(e); } 18 c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }"); 19 c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }"); 20 c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }"); 21 22 // pts 用於存儲成員變量名和類型 23 Map<String, Class<?>> pts = new HashMap<String, Class<?>>(); 24 // ms 用於存儲方法描述信息(可理解為方法簽名)及 Method 實例 25 Map<String, Method> ms = new LinkedHashMap<String, Method>(); 26 // mns 為方法名列表 27 List<String> mns = new ArrayList<String>(); 28 // dmns 用於存儲“定義在當前類中的方法”的名稱 29 List<String> dmns = new ArrayList<String>(); 30 31 // --------------------------------✨ 分割線1 ✨------------------------------------- 32 33 // 獲取 public 訪問級別的字段,並為所有字段生成條件判斷語句 34 for (Field f : c.getFields()) { 35 String fn = f.getName(); 36 Class<?> ft = f.getType(); 37 if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers())) 38 // 忽略關鍵字 static 或 transient 修飾的變量 39 continue; 40 41 // 生成條件判斷及賦值語句,比如: 42 // if( $2.equals("name") ) { w.name = (java.lang.String) $3; return;} 43 // if( $2.equals("age") ) { w.age = ((Number) $3).intValue(); return;} 44 c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }"); 45 46 // 生成條件判斷及返回語句,比如: 47 // if( $2.equals("name") ) { return ($w)w.name; } 48 c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }"); 49 50 // 存儲 <字段名, 字段類型> 鍵值對到 pts 中 51 pts.put(fn, ft); 52 } 53 54 // --------------------------------✨ 分割線2 ✨------------------------------------- 55 56 Method[] methods = c.getMethods(); 57 // 檢測 c 中是否包含在當前類中聲明的方法 58 boolean hasMethod = hasMethods(methods); 59 if (hasMethod) { 60 c3.append(" try{"); 61 } 62 for (Method m : methods) { 63 if (m.getDeclaringClass() == Object.class) 64 // 忽略 Object 中定義的方法 65 continue; 66 67 String mn = m.getName(); 68 // 生成方法名判斷語句,比如: 69 // if ( "sayHello".equals( $2 ) 70 c3.append(" if( \"").append(mn).append("\".equals( $2 ) "); 71 int len = m.getParameterTypes().length; 72 // 生成“運行時傳入的參數數量與方法參數列表長度”判斷語句,比如: 73 // && $3.length == 2 74 c3.append(" && ").append(" $3.length == ").append(len); 75 76 boolean override = false; 77 for (Method m2 : methods) { 78 // 檢測方法是否存在重載情況,條件為:方法對象不同 && 方法名相同 79 if (m != m2 && m.getName().equals(m2.getName())) { 80 override = true; 81 break; 82 } 83 } 84 // 對重載方法進行處理,考慮下面的方法: 85 // 1. void sayHello(Integer, String) 86 // 2. void sayHello(Integer, Integer) 87 // 方法名相同,參數列表長度也相同,因此不能僅通過這兩項判斷兩個方法是否相等。 88 // 需要進一步判斷方法的參數類型 89 if (override) { 90 if (len > 0) { 91 for (int l = 0; l < len; l++) { 92 // 生成參數類型進行檢測代碼,比如: 93 // && $3[0].getName().equals("java.lang.Integer") 94 // && $3[1].getName().equals("java.lang.String") 95 c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"") 96 .append(m.getParameterTypes()[l].getName()).append("\")"); 97 } 98 } 99 } 100 101 // 添加 ) {,完成方法判斷語句,此時生成的代碼可能如下(已格式化): 102 // if ("sayHello".equals($2) 103 // && $3.length == 2 104 // && $3[0].getName().equals("java.lang.Integer") 105 // && $3[1].getName().equals("java.lang.String")) { 106 c3.append(" ) { "); 107 108 // 根據返回值類型生成目標方法調用語句 109 if (m.getReturnType() == Void.TYPE) 110 // w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); return null; 111 c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;"); 112 else 113 // return w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); 114 c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");"); 115 116 // 添加 }, 生成的代碼形如(已格式化): 117 // if ("sayHello".equals($2) 118 // && $3.length == 2 119 // && $3[0].getName().equals("java.lang.Integer") 120 // && $3[1].getName().equals("java.lang.String")) { 121 // 122 // w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); 123 // return null; 124 // } 125 c3.append(" }"); 126 127 // 添加方法名到 mns 集合中 128 mns.add(mn); 129 // 檢測當前方法是否在 c 中被聲明的 130 if (m.getDeclaringClass() == c) 131 // 若是,則將當前方法名添加到 dmns 中 132 dmns.add(mn); 133 ms.put(ReflectUtils.getDesc(m), m); 134 } 135 if (hasMethod) { 136 // 添加異常捕捉語句 137 c3.append(" } catch(Throwable e) { "); 138 c3.append(" throw new java.lang.reflect.InvocationTargetException(e); "); 139 c3.append(" }"); 140 } 141 142 // 添加 NoSuchMethodException 異常拋出代碼 143 c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }"); 144 145 // --------------------------------✨ 分割線3 ✨------------------------------------- 146 147 Matcher matcher; 148 // 處理 get/set 方法 149 for (Map.Entry<String, Method> entry : ms.entrySet()) { 150 String md = entry.getKey(); 151 Method method = (Method) entry.getValue(); 152 // 匹配以 get 開頭的方法 153 if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) { 154 // 獲取屬性名 155 String pn = propertyName(matcher.group(1)); 156 // 生成屬性判斷以及返回語句,示例如下: 157 // if( $2.equals("name") ) { return ($w).w.getName(); } 158 c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }"); 159 pts.put(pn, method.getReturnType()); 160 161 // 匹配以 is/has/can 開頭的方法 162 } else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) { 163 String pn = propertyName(matcher.group(1)); 164 // 生成屬性判斷以及返回語句,示例如下: 165 // if( $2.equals("dream") ) { return ($w).w.hasDream(); } 166 c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }"); 167 pts.put(pn, method.getReturnType()); 168 169 // 匹配以 set 開頭的方法 170 } else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) { 171 Class<?> pt = method.getParameterTypes()[0]; 172 String pn = propertyName(matcher.group(1)); 173 // 生成屬性判斷以及 setter 調用語句,示例如下: 174 // if( $2.equals("name") ) { w.setName((java.lang.String)$3); return; } 175 c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.").append(method.getName()).append("(").append(arg(pt, "$3")).append("); return; }"); 176 pts.put(pn, pt); 177 } 178 } 179 180 // 添加 NoSuchPropertyException 異常拋出代碼 181 c1.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in class " + c.getName() + ".\"); }"); 182 c2.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in class " + c.getName() + ".\"); }"); 183 184 // --------------------------------✨ 分割線4 ✨------------------------------------- 185 186 long id = WRAPPER_CLASS_COUNTER.getAndIncrement(); 187 // 創建類生成器 188 ClassGenerator cc = ClassGenerator.newInstance(cl); 189 // 設置類名及超類 190 cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id); 191 cc.setSuperClass(Wrapper.class); 192 193 // 添加默認構造方法 194 cc.addDefaultConstructor(); 195 196 // 添加字段 197 cc.addField("public static String[] pns;"); 198 cc.addField("public static " + Map.class.getName() + " pts;"); 199 cc.addField("public static String[] mns;"); 200 cc.addField("public static String[] dmns;"); 201 for (int i = 0, len = ms.size(); i < len; i++) 202 cc.addField("public static Class[] mts" + i + ";"); 203 204 // 添加方法代碼 205 cc.addMethod("public String[] getPropertyNames(){ return pns; }"); 206 cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }"); 207 cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }"); 208 cc.addMethod("public String[] getMethodNames(){ return mns; }"); 209 cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }"); 210 cc.addMethod(c1.toString()); 211 cc.addMethod(c2.toString()); 212 cc.addMethod(c3.toString()); 213 214 try { 215 // 生成類 216 Class<?> wc = cc.toClass(); 217 218 // 設置字段值 219 wc.getField("pts").set(null, pts); 220 wc.getField("pns").set(null, pts.keySet().toArray(new String[0])); 221 wc.getField("mns").set(null, mns.toArray(new String[0])); 222 wc.getField("dmns").set(null, dmns.toArray(new String[0])); 223 int ix = 0; 224 for (Method m : ms.values()) 225 wc.getField("mts" + ix++).set(null, m.getParameterTypes()); 226 227 // 創建 Wrapper 實例 228 return (Wrapper) wc.newInstance(); 229 } catch (RuntimeException e) { 230 throw e; 231 } catch (Throwable e) { 232 throw new RuntimeException(e.getMessage(), e); 233 } finally { 234 cc.release(); 235 ms.clear(); 236 mns.clear(); 237 dmns.clear(); 238 } 239 }
代碼較長,同樣注釋也很多。大致說一下里面邏輯:
-
- 創建c1,c2,c3三個字符串,用於存儲類型轉換代碼和異常捕捉代碼,而后pts用於存儲成員變量名和類型,ms用於存儲方法描述信息(可理解為方法簽名)及Method實例,mns為方法名列表,dmns用於存儲“定義在當前類中的方法”的名稱。在這里做完了一些初始工作
- 獲取所有public字段,用c1存儲條件判斷及賦值語句,可以理解為通過c1能夠為public字段賦值,而c2是條件判斷及返回語句,同樣的是得到public字段的值。再用pts存儲<字段名,字段類型>。也就是現在能對目標類字段進行操作了,而要操作一些私有字段,是要訪問set開頭和get開頭的方法,同樣這些方法也都對應使用c1存set,c2存get,pts存儲<屬性名,屬性類型>
- 現在到類中的方法,先檢查方法中的參數,然后再檢查是否有重載的方法。通過c3存儲調用目標方法的語句以及方法中可能會拋出的異常,而后用mns集合進行存儲方法名,對已經聲明的方法存到ms中,未聲明但是定義了的方法存在dmns中。
- 通過ClassGenerator為剛剛生成的代碼構建Class類,並通過反射創建對象。ClassGenerator是Dubbo自己封裝的,該類的核心是toClass()的重載方法 toClass(ClassLoader, ProtectionDomain),該方法通過javassist構建Class。
最后在創建完成Wapper類,回到上面的getInvoker方法然后通過下面這條語句
1 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); 2 3 //進入到invokeMethod中 4 5 public Object invokeMethod(Object instance, String mn, Class<?>[] types, Object[] args) throws NoSuchMethodException { 6 if ("getClass".equals(mn)) { 7 return instance.getClass(); 8 } else if ("hashCode".equals(mn)) { 9 return instance.hashCode(); 10 } else if ("toString".equals(mn)) { 11 return instance.toString(); 12 } else if ("equals".equals(mn)) { 13 if (args.length == 1) { 14 return instance.equals(args[0]); 15 } else { 16 throw new IllegalArgumentException("Invoke method [" + mn + "] argument number error."); 17 } 18 } else { 19 throw new NoSuchMethodException("Method [" + mn + "] not found."); 20 } 21 } 22 };
到這里Invoker就能實現調用服務提供類的方法了。也就是服務提供類的Invoker實體域創建完成。底層是通過javassist來構建對象的。
三、服務消費方的Invoker
在服務消費方,Invoker用於執行遠程調用。Invoker是由 Protocol實現類構建而來。Protocol實現類有很多但是最常用的兩個,分別是RegistryProtocol和DubboProtocol。
DubboProtocol的refer方法:
1 public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { 2 optimizeSerialization(url); 3 // 創建 DubboInvoker 4 DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); 5 invokers.add(invoker); 6 return invoker; 7 }
上述方法較為簡單,最重要的一個在於getClients。這個方法用於獲取客戶端實例,實例類型為ExchangeClient。ExchangeClient實際上並不具備通信能力,它需要基於更底層的客戶端實例進行通信。比如NettyClient、MinaClient等,默認情況下,Dubbo使用NettyClient進行通信。每次創建好的Invoker都會添加到invokers這個集合里。也就是可以認為服務消費方的Invoker是一個具有通信能力的Netty客戶端
getClients方法:

1 private ExchangeClient[] getClients(URL url) { 2 // 是否共享連接 3 boolean service_share_connect = false; 4 // 獲取連接數,默認為0,表示未配置 5 int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); 6 // 如果未配置 connections,則共享連接 7 if (connections == 0) { 8 service_share_connect = true; 9 connections = 1; 10 } 11 12 ExchangeClient[] clients = new ExchangeClient[connections]; 13 for (int i = 0; i < clients.length; i++) { 14 if (service_share_connect) { 15 // 獲取共享客戶端 16 clients[i] = getSharedClient(url); 17 } else { 18 // 初始化新的客戶端 19 clients[i] = initClient(url); 20 } 21 } 22 return clients; 23 } 24 25 //進入到獲取共享客戶端方法 26 27 private ExchangeClient getSharedClient(URL url) { 28 String key = url.getAddress(); 29 // 獲取帶有“引用計數”功能的 ExchangeClient 30 ReferenceCountExchangeClient client = referenceClientMap.get(key); 31 if (client != null) { 32 if (!client.isClosed()) { 33 // 增加引用計數 34 client.incrementAndGetCount(); 35 return client; 36 } else { 37 referenceClientMap.remove(key); 38 } 39 } 40 41 locks.putIfAbsent(key, new Object()); 42 synchronized (locks.get(key)) { 43 if (referenceClientMap.containsKey(key)) { 44 return referenceClientMap.get(key); 45 } 46 47 // 創建 ExchangeClient 客戶端 48 ExchangeClient exchangeClient = initClient(url); 49 // 將 ExchangeClient 實例傳給 ReferenceCountExchangeClient,這里使用了裝飾模式 50 client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap); 51 referenceClientMap.put(key, client); 52 ghostClientMap.remove(key); 53 locks.remove(key); 54 return client; 55 } 56 } 57 58 //進入到初始化客戶端方法 59 60 private ExchangeClient initClient(URL url) { 61 62 // 獲取客戶端類型,默認為 netty 63 String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); 64 65 // 添加編解碼和心跳包參數到 url 中 66 url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); 67 url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); 68 69 // 檢測客戶端類型是否存在,不存在則拋出異常 70 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { 71 throw new RpcException("Unsupported client type: ..."); 72 } 73 74 ExchangeClient client; 75 try { 76 // 獲取 lazy 配置,並根據配置值決定創建的客戶端類型 77 if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) { 78 // 創建懶加載 ExchangeClient 實例 79 client = new LazyConnectExchangeClient(url, requestHandler); 80 } else { 81 // 創建普通 ExchangeClient 實例 82 client = Exchangers.connect(url, requestHandler); 83 } 84 } catch (RemotingException e) { 85 throw new RpcException("Fail to create remoting client for service..."); 86 } 87 return client; 88 } 89 90 //進入到connect方法中,getExchanger 會通過 SPI 加載 HeaderExchangeClient 實例,這個方法比較簡單,大家自己看一下吧。接下來分析 HeaderExchangeClient 的實現。 91 92 public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { 93 if (url == null) { 94 throw new IllegalArgumentException("url == null"); 95 } 96 if (handler == null) { 97 throw new IllegalArgumentException("handler == null"); 98 } 99 url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); 100 // 獲取 Exchanger 實例,默認為 HeaderExchangeClient 101 return getExchanger(url).connect(url, handler); 102 } 103 104 //創建HeaderExchangeClient實例 105 106 public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { 107 // 這里包含了多個調用,分別如下: 108 // 1. 創建 HeaderExchangeHandler 對象 109 // 2. 創建 DecodeHandler 對象 110 // 3. 通過 Transporters 構建 Client 實例 111 // 4. 創建 HeaderExchangeClient 對象 112 return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); 113 } 114 115 //通過 Transporters 構建 Client 實例 116 117 public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException { 118 if (url == null) { 119 throw new IllegalArgumentException("url == null"); 120 } 121 ChannelHandler handler; 122 if (handlers == null || handlers.length == 0) { 123 handler = new ChannelHandlerAdapter(); 124 } else if (handlers.length == 1) { 125 handler = handlers[0]; 126 } else { 127 // 如果 handler 數量大於1,則創建一個 ChannelHandler 分發器 128 handler = new ChannelHandlerDispatcher(handlers); 129 } 130 131 // 獲取 Transporter 自適應拓展類,並調用 connect 方法生成 Client 實例 132 return getTransporter().connect(url, handler); 133 } 134 135 //創建Netty對象 136 137 public Client connect(URL url, ChannelHandler listener) throws RemotingException { 138 // 創建 NettyClient 對象 139 return new NettyClient(url, listener); 140 }
上面的源碼大概分一下幾個邏輯:
-
- 通過refer方法進入DubboInvoker實例的創建,在這個實例中其實serviceType,url,以及invokers都已經是不用去關心的,invokers可以說是存儲以及創建好的Invoker。而最關鍵的在於getClient方法。可以這么認為,現在的Invoker是一個Netty客戶端。而在服務提供方的Invoker是一個Wapper類。
- 在getClient方法里面首先根據connections數量決定是獲取共享客戶端還是創建新的客戶端實例,默認情況下是獲取共享客戶端,但是獲取共享客戶端中若緩存中拿不到對應客戶端也會新建一個客戶端。最終返回的是ExchangeClient,而當前的ExchangeClient也沒有通信能力,需要更加底層的Netty客戶端。
- initClient方法首先獲取用戶配置的客戶端類型,默認為Netty,然后檢測用戶配置的客戶端類型是否存在,不存在就要拋出異常,最后根據lazy配置覺得創建什么類型的客戶端。LazyConnectExchangeClient代碼並不是很復雜,該類會在request方法被調用時通過Exchangers的connect方法創建 ExchangeClient客戶端
- getExchanger會通過SPI加載HeaderExchangeClient實例。最后通過Transporter實現類以及調用Netty的API來創建Netty客戶端。最后層層返回,就最后成為了底層為Netty上層為DubboInvoker實例的這樣一個類。
RegistryProtocol中的refer:

1 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { 2 // 取 registry 參數值,並將其設置為協議頭 3 url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); 4 // 獲取注冊中心實例 5 Registry registry = registryFactory.getRegistry(url); 6 if (RegistryService.class.equals(type)) { 7 return proxyFactory.getInvoker((T) registry, type, url); 8 } 9 10 // 將 url 查詢字符串轉為 Map 11 Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); 12 // 獲取 group 配置 13 String group = qs.get(Constants.GROUP_KEY); 14 if (group != null && group.length() > 0) { 15 if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 16 || "*".equals(group)) { 17 // 通過 SPI 加載 MergeableCluster 實例,並調用 doRefer 繼續執行服務引用邏輯 18 return doRefer(getMergeableCluster(), registry, type, url); 19 } 20 } 21 22 // 調用 doRefer 繼續執行服務引用邏輯 23 return doRefer(cluster, registry, type, url); 24 } 25 private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { 26 // 創建 RegistryDirectory 實例 27 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); 28 // 設置注冊中心和協議 29 directory.setRegistry(registry); 30 directory.setProtocol(protocol); 31 Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); 32 // 生成服務消費者鏈接 33 URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters); 34 35 // 注冊服務消費者,在 consumers 目錄下新節點 36 if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) 37 && url.getParameter(Constants.REGISTER_KEY, true)) { 38 registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, 39 Constants.CHECK_KEY, String.valueOf(false))); 40 } 41 42 // 訂閱 providers、configurators、routers 等節點數據 43 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 44 Constants.PROVIDERS_CATEGORY 45 + "," + Constants.CONFIGURATORS_CATEGORY 46 + "," + Constants.ROUTERS_CATEGORY)); 47 48 // 一個注冊中心可能有多個服務提供者,因此這里需要將多個服務提供者合並為一個 49 Invoker invoker = cluster.join(directory); 50 ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); 51 return invoker; 52 } 53 54 //進入到集群創建Invoker模式 55 56 @SPI(FailoverCluster.NAME) 57 public interface Cluster { 58 59 /** 60 * 合並其中Directory的Invoker為一個Invoker 61 */ 62 @Adaptive 63 <T> Invoker<T> join(Directory<T> directory) throws RpcException; 64 } 65 66 //進入到MockerClusterWrapper實現類中 67 68 public class MockClusterWrapper implements Cluster { 69 70 private Cluster cluster; 71 72 public MockClusterWrapper(Cluster cluster) { 73 this.cluster = cluster; 74 } 75 76 public <T> Invoker<T> join(Directory<T> directory) throws RpcException { 77 return new MockClusterInvoker<T>(directory, this.cluster.join(directory)); 78 } 79 } 80 81 //具體的invoke方法 82 83 public Result invoke(Invocation invocation) throws RpcException { 84 Result result = null; 85 86 String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), 87 Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); 88 if (value.length() == 0 || value.equalsIgnoreCase("false")){ 89 //no mock 90 result = this.invoker.invoke(invocation); 91 } else if (value.startsWith("force")) { 92 if (logger.isWarnEnabled()) { 93 logger.info("force-mock: " + invocation.getMethodName() + 94 " force-mock enabled , url : " + directory.getUrl()); 95 } 96 //force:direct mock 97 result = doMockInvoke(invocation, null); 98 } else { 99 //fail-mock 100 try { 101 result = this.invoker.invoke(invocation); 102 }catch (RpcException e) { 103 if (e.isBiz()) { 104 throw e; 105 } else { 106 if (logger.isWarnEnabled()) { 107 logger.info("fail-mock: " + invocation.getMethodName() + 108 " fail-mock enabled , url : " + directory.getUrl(), e); 109 } 110 //fail:mock 111 result = doMockInvoke(invocation, e); 112 } 113 } 114 } 115 return result; 116 }
大致說一下上面的邏輯:
-
- 當前的Invoker底層依然是NettyClient,但是此時注冊中心是集群搭建模式。所以需要將多個Invoker合並為一個,這里是邏輯合並的。實際上Invoker底層還是會有多個,只是通過一個集群模式來管理。所以暴露出來的就是一個集群模式的Invoker。於是進入Cluster.join方法。
- Cluster是一個通用代理類,會根據URL中的cluster參數值定位到實際的Cluster實現類也就是FailoverCluster。這里用到了@SPI注解,也就是需要ExtensionLoader擴展點加載機制,而該機制在實例化對象是,會在實例化后自動套上Wapper
- 但是是集群模式所以需要Dubbo中另外一個核心機制——Mock。Mock可以在測試中模擬服務調用的各種異常情況,還可以實現服務降級。在MockerClusterInvoker中,Dubbo先檢查URL中是否存在mock參數。(這個參數可以通過服務治理后台Consumer端的屏蔽和容錯進行設置或者直接動態設置mock參數值)如果存在force開頭,這不發起遠程調用直接執行降級邏輯。如果存在fail開頭,則在遠程調用異常時才會執行降級邏輯。
- 可以說注冊中心為集群模式時,Invoker就會外面多包裹一層mock邏輯。是通過Wapper機制實現的。最終可以在調用或者重試時,每次都通過Dubbo內部的負載均衡機制選出多個Invoker中的一個進行調用
四、總結
到這里Invoker的實現就可以是說完了,總結一下,在服務提供方Invoker是javassist創建的服務類的實例,可以實現調用服務類內部的方法和修改字段。而在服務消費方的Invoker是基於Netty的客戶端。最終通過服務消費方Netty客戶端獲得服務提供方創建的服務類實例。而后消費方為保護服務類就需要為其創建代理類,這樣就可以在不實例化服務類情況下安全有效的遠程調用服務類內部方法並且得到具體數據了。