Dubbo(五):深入理解Dubbo核心模型Invoke


一、Dubbo中Invoker介紹

  為什么說Invoker是Dubbo核心模型呢?

  Invoker是Dubbo中的實體域,也就是真實存在的。其他模型都向它靠攏或轉換成它,它也就代表一個可執行體,可向它發起invoke調用。在服務提供方,Invoker用於調用服務提供類。在服務消費方,Invoker用於執行遠程調用。

二、服務提供方的Invoker

  在服務提供方中的Invoker是由ProxyFactory創建而來的,Dubbo默認的ProxyFactory實現類為JavassistProxyFactory。

  創建Invoker的入口方法getInvoker:

 1 public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
 2     // 為目標類創建 Wrapper
 3     final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf(36) < 0 ? proxy.getClass() : type);
 4     // 創建匿名 Invoker 類對象,並實現 doInvoke 方法。
 5     return new AbstractProxyInvoker<T>(proxy, type, url) {
 6         @Override
 7         protected Object doInvoke(T proxy, String methodName,
 8                                   Class<?>[] parameterTypes,
 9                                   Object[] arguments) throws Throwable {
10             // 調用 Wrapper 的 invokeMethod 方法,invokeMethod 最終會調用目標方法
11             return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
12         }
13     };
14 }

    JavassistProxyFactory創建了一個繼承自AbstractProxyInvoker類的匿名對象,並覆寫了抽象方法doInvoke。覆寫后的doInvoke 邏輯比較簡單,僅是將調用請求轉發給了Wrapper類的invokeMethod 方法。以及生成 invokeMethod 方法代碼和其他一些方法代碼。代碼生成完畢后,通過 Javassist 生成 Class 對象,最后再通過反射創建 Wrapper 實例。

    注:Wapper是一個包裝類。主要用於“包裹”目標類,僅可以通過getWapper(Class)方法創建子類。在創建子類過程中,子類代碼會對傳進來的Class對象進行解析,拿到類方法,類成員變量等信息。而這個包裝類持有實際的擴展點實現類。也可以把擴展點的公共邏輯全部移到包裝類中,功能上就是作為AOP實現。

  創建包裝類的構造方法:

 1  public static Wrapper getWrapper(Class<?> c) {    
 2     while (ClassGenerator.isDynamicClass(c))
 3         c = c.getSuperclass();
 4 
 5     if (c == Object.class)
 6         return OBJECT_WRAPPER;
 7 
 8     // 從緩存中獲取 Wrapper 實例
 9     Wrapper ret = WRAPPER_MAP.get(c);
10     if (ret == null) {
11         // 緩存未命中,創建 Wrapper
12         ret = makeWrapper(c);
13         // 寫入緩存
14         WRAPPER_MAP.put(c, ret);
15     }
16     return ret;
17 }

    在緩存中獲取不到Wapper就會進入下面的方法makeWapper:

  1 private static Wrapper makeWrapper(Class<?> c) {
  2     // 檢測 c 是否為基本類型,若是則拋出異常
  3     if (c.isPrimitive())
  4         throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c);
  5 
  6     String name = c.getName();
  7     ClassLoader cl = ClassHelper.getClassLoader(c);
  8 
  9     // c1 用於存儲 setPropertyValue 方法代碼
 10     StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");
 11     // c2 用於存儲 getPropertyValue 方法代碼
 12     StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");
 13     // c3 用於存儲 invokeMethod 方法代碼
 14     StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ ");
 15 
 16     // 生成類型轉換代碼及異常捕捉代碼,比如:
 17     //   DemoService w; try { w = ((DemoServcie) $1); }}catch(Throwable e){ throw new IllegalArgumentException(e); }
 18     c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
 19     c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
 20     c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
 21 
 22     // pts 用於存儲成員變量名和類型
 23     Map<String, Class<?>> pts = new HashMap<String, Class<?>>();
 24     // ms 用於存儲方法描述信息(可理解為方法簽名)及 Method 實例
 25     Map<String, Method> ms = new LinkedHashMap<String, Method>();
 26     // mns 為方法名列表
 27     List<String> mns = new ArrayList<String>();
 28     // dmns 用於存儲“定義在當前類中的方法”的名稱
 29     List<String> dmns = new ArrayList<String>();
 30 
 31     // --------------------------------✨ 分割線1 ✨-------------------------------------
 32 
 33     // 獲取 public 訪問級別的字段,並為所有字段生成條件判斷語句
 34     for (Field f : c.getFields()) {
 35         String fn = f.getName();
 36         Class<?> ft = f.getType();
 37         if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers()))
 38             // 忽略關鍵字 static 或 transient 修飾的變量
 39             continue;
 40 
 41         // 生成條件判斷及賦值語句,比如:
 42         // if( $2.equals("name") ) { w.name = (java.lang.String) $3; return;}
 43         // if( $2.equals("age") ) { w.age = ((Number) $3).intValue(); return;}
 44         c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }");
 45 
 46         // 生成條件判斷及返回語句,比如:
 47         // if( $2.equals("name") ) { return ($w)w.name; }
 48         c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }");
 49 
 50         // 存儲 <字段名, 字段類型> 鍵值對到 pts 中
 51         pts.put(fn, ft);
 52     }
 53 
 54     // --------------------------------✨ 分割線2 ✨-------------------------------------
 55 
 56     Method[] methods = c.getMethods();
 57     // 檢測 c 中是否包含在當前類中聲明的方法
 58     boolean hasMethod = hasMethods(methods);
 59     if (hasMethod) {
 60         c3.append(" try{");
 61     }
 62     for (Method m : methods) {
 63         if (m.getDeclaringClass() == Object.class)
 64             // 忽略 Object 中定義的方法
 65             continue;
 66 
 67         String mn = m.getName();
 68         // 生成方法名判斷語句,比如:
 69         // if ( "sayHello".equals( $2 )
 70         c3.append(" if( \"").append(mn).append("\".equals( $2 ) ");
 71         int len = m.getParameterTypes().length;
 72         // 生成“運行時傳入的參數數量與方法參數列表長度”判斷語句,比如:
 73         // && $3.length == 2
 74         c3.append(" && ").append(" $3.length == ").append(len);
 75 
 76         boolean override = false;
 77         for (Method m2 : methods) {
 78             // 檢測方法是否存在重載情況,條件為:方法對象不同 && 方法名相同
 79             if (m != m2 && m.getName().equals(m2.getName())) {
 80                 override = true;
 81                 break;
 82             }
 83         }
 84         // 對重載方法進行處理,考慮下面的方法:
 85         //    1. void sayHello(Integer, String)
 86         //    2. void sayHello(Integer, Integer)
 87         // 方法名相同,參數列表長度也相同,因此不能僅通過這兩項判斷兩個方法是否相等。
 88         // 需要進一步判斷方法的參數類型
 89         if (override) {
 90             if (len > 0) {
 91                 for (int l = 0; l < len; l++) {
 92                     // 生成參數類型進行檢測代碼,比如:
 93                     // && $3[0].getName().equals("java.lang.Integer") 
 94                     //    && $3[1].getName().equals("java.lang.String")
 95                     c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"")
 96                             .append(m.getParameterTypes()[l].getName()).append("\")");
 97                 }
 98             }
 99         }
100 
101         // 添加 ) {,完成方法判斷語句,此時生成的代碼可能如下(已格式化):
102         // if ("sayHello".equals($2) 
103         //     && $3.length == 2
104         //     && $3[0].getName().equals("java.lang.Integer") 
105         //     && $3[1].getName().equals("java.lang.String")) {
106         c3.append(" ) { ");
107 
108         // 根據返回值類型生成目標方法調用語句
109         if (m.getReturnType() == Void.TYPE)
110             // w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); return null;
111             c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;");
112         else
113             // return w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]);
114             c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");");
115 
116         // 添加 }, 生成的代碼形如(已格式化):
117         // if ("sayHello".equals($2) 
118         //     && $3.length == 2
119         //     && $3[0].getName().equals("java.lang.Integer") 
120         //     && $3[1].getName().equals("java.lang.String")) {
121         //
122         //     w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); 
123         //     return null;
124         // }
125         c3.append(" }");
126 
127         // 添加方法名到 mns 集合中
128         mns.add(mn);
129         // 檢測當前方法是否在 c 中被聲明的
130         if (m.getDeclaringClass() == c)
131             // 若是,則將當前方法名添加到 dmns 中
132             dmns.add(mn);
133         ms.put(ReflectUtils.getDesc(m), m);
134     }
135     if (hasMethod) {
136         // 添加異常捕捉語句
137         c3.append(" } catch(Throwable e) { ");
138         c3.append("     throw new java.lang.reflect.InvocationTargetException(e); ");
139         c3.append(" }");
140     }
141 
142     // 添加 NoSuchMethodException 異常拋出代碼
143     c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }");
144 
145     // --------------------------------✨ 分割線3 ✨-------------------------------------
146 
147     Matcher matcher;
148     // 處理 get/set 方法
149     for (Map.Entry<String, Method> entry : ms.entrySet()) {
150         String md = entry.getKey();
151         Method method = (Method) entry.getValue();
152         // 匹配以 get 開頭的方法
153         if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
154             // 獲取屬性名
155             String pn = propertyName(matcher.group(1));
156             // 生成屬性判斷以及返回語句,示例如下:
157             // if( $2.equals("name") ) { return ($w).w.getName(); }
158             c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");
159             pts.put(pn, method.getReturnType());
160 
161         // 匹配以 is/has/can 開頭的方法
162         } else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) {
163             String pn = propertyName(matcher.group(1));
164             // 生成屬性判斷以及返回語句,示例如下:
165             // if( $2.equals("dream") ) { return ($w).w.hasDream(); }
166             c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");
167             pts.put(pn, method.getReturnType());
168 
169         // 匹配以 set 開頭的方法
170         } else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
171             Class<?> pt = method.getParameterTypes()[0];
172             String pn = propertyName(matcher.group(1));
173             // 生成屬性判斷以及 setter 調用語句,示例如下:
174             // if( $2.equals("name") ) { w.setName((java.lang.String)$3); return; }
175             c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.").append(method.getName()).append("(").append(arg(pt, "$3")).append("); return; }");
176             pts.put(pn, pt);
177         }
178     }
179 
180     // 添加 NoSuchPropertyException 異常拋出代碼
181     c1.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in class " + c.getName() + ".\"); }");
182     c2.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in class " + c.getName() + ".\"); }");
183 
184     // --------------------------------✨ 分割線4 ✨-------------------------------------
185 
186     long id = WRAPPER_CLASS_COUNTER.getAndIncrement();
187     // 創建類生成器
188     ClassGenerator cc = ClassGenerator.newInstance(cl);
189     // 設置類名及超類
190     cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id);
191     cc.setSuperClass(Wrapper.class);
192 
193     // 添加默認構造方法
194     cc.addDefaultConstructor();
195 
196     // 添加字段
197     cc.addField("public static String[] pns;");
198     cc.addField("public static " + Map.class.getName() + " pts;");
199     cc.addField("public static String[] mns;");
200     cc.addField("public static String[] dmns;");
201     for (int i = 0, len = ms.size(); i < len; i++)
202         cc.addField("public static Class[] mts" + i + ";");
203 
204     // 添加方法代碼
205     cc.addMethod("public String[] getPropertyNames(){ return pns; }");
206     cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }");
207     cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }");
208     cc.addMethod("public String[] getMethodNames(){ return mns; }");
209     cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }");
210     cc.addMethod(c1.toString());
211     cc.addMethod(c2.toString());
212     cc.addMethod(c3.toString());
213 
214     try {
215         // 生成類
216         Class<?> wc = cc.toClass();
217         
218         // 設置字段值
219         wc.getField("pts").set(null, pts);
220         wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));
221         wc.getField("mns").set(null, mns.toArray(new String[0]));
222         wc.getField("dmns").set(null, dmns.toArray(new String[0]));
223         int ix = 0;
224         for (Method m : ms.values())
225             wc.getField("mts" + ix++).set(null, m.getParameterTypes());
226 
227         // 創建 Wrapper 實例
228         return (Wrapper) wc.newInstance();
229     } catch (RuntimeException e) {
230         throw e;
231     } catch (Throwable e) {
232         throw new RuntimeException(e.getMessage(), e);
233     } finally {
234         cc.release();
235         ms.clear();
236         mns.clear();
237         dmns.clear();
238     }
239 }
View Code

    代碼較長,同樣注釋也很多。大致說一下里面邏輯:

    • 創建c1,c2,c3三個字符串,用於存儲類型轉換代碼和異常捕捉代碼,而后pts用於存儲成員變量名和類型,ms用於存儲方法描述信息(可理解為方法簽名)及Method實例,mns為方法名列表,dmns用於存儲“定義在當前類中的方法”的名稱。在這里做完了一些初始工作
    • 獲取所有public字段,用c1存儲條件判斷及賦值語句,可以理解為通過c1能夠為public字段賦值,而c2是條件判斷及返回語句,同樣的是得到public字段的值。再用pts存儲<字段名,字段類型>。也就是現在能對目標類字段進行操作了,而要操作一些私有字段,是要訪問set開頭和get開頭的方法,同樣這些方法也都對應使用c1存set,c2存get,pts存儲<屬性名,屬性類型>
    • 現在到類中的方法,先檢查方法中的參數,然后再檢查是否有重載的方法。通過c3存儲調用目標方法的語句以及方法中可能會拋出的異常,而后用mns集合進行存儲方法名,對已經聲明的方法存到ms中,未聲明但是定義了的方法存在dmns中。
    • 通過ClassGenerator為剛剛生成的代碼構建Class類,並通過反射創建對象。ClassGenerator是Dubbo自己封裝的,該類的核心是toClass()的重載方法 toClass(ClassLoader, ProtectionDomain),該方法通過javassist構建Class。   

    最后在創建完成Wapper類,回到上面的getInvoker方法然后通過下面這條語句

 1 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
 2 
 3 //進入到invokeMethod中
 4 
 5 public Object invokeMethod(Object instance, String mn, Class<?>[] types, Object[] args) throws NoSuchMethodException {
 6         if ("getClass".equals(mn)) {
 7             return instance.getClass();
 8         } else if ("hashCode".equals(mn)) {
 9             return instance.hashCode();
10         } else if ("toString".equals(mn)) {
11             return instance.toString();
12         } else if ("equals".equals(mn)) {
13             if (args.length == 1) {
14                 return instance.equals(args[0]);
15             } else {
16                 throw new IllegalArgumentException("Invoke method [" + mn + "] argument number error.");
17             }
18         } else {
19             throw new NoSuchMethodException("Method [" + mn + "] not found.");
20         }
21     }
22 };

    到這里Invoker就能實現調用服務提供類的方法了。也就是服務提供類的Invoker實體域創建完成。底層是通過javassist來構建對象的。

三、服務消費方的Invoker

  在服務消費方,Invoker用於執行遠程調用。Invoker是由 Protocol實現類構建而來。Protocol實現類有很多但是最常用的兩個,分別是RegistryProtocol和DubboProtocol。

  DubboProtocol的refer方法:

1 public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
2     optimizeSerialization(url);
3     // 創建 DubboInvoker
4     DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
5     invokers.add(invoker);
6     return invoker;
7 }

  上述方法較為簡單,最重要的一個在於getClients。這個方法用於獲取客戶端實例,實例類型為ExchangeClient。ExchangeClient實際上並不具備通信能力,它需要基於更底層的客戶端實例進行通信。比如NettyClient、MinaClient等,默認情況下,Dubbo使用NettyClient進行通信。每次創建好的Invoker都會添加到invokers這個集合里。也就是可以認為服務消費方的Invoker是一個具有通信能力的Netty客戶端

  getClients方法:

  1 private ExchangeClient[] getClients(URL url) {
  2     // 是否共享連接
  3     boolean service_share_connect = false;
  4       // 獲取連接數,默認為0,表示未配置
  5     int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
  6     // 如果未配置 connections,則共享連接
  7     if (connections == 0) {
  8         service_share_connect = true;
  9         connections = 1;
 10     }
 11 
 12     ExchangeClient[] clients = new ExchangeClient[connections];
 13     for (int i = 0; i < clients.length; i++) {
 14         if (service_share_connect) {
 15             // 獲取共享客戶端
 16             clients[i] = getSharedClient(url);
 17         } else {
 18             // 初始化新的客戶端
 19             clients[i] = initClient(url);
 20         }
 21     }
 22     return clients;
 23 }
 24 
 25 //進入到獲取共享客戶端方法
 26 
 27 private ExchangeClient getSharedClient(URL url) {
 28     String key = url.getAddress();
 29     // 獲取帶有“引用計數”功能的 ExchangeClient
 30     ReferenceCountExchangeClient client = referenceClientMap.get(key);
 31     if (client != null) {
 32         if (!client.isClosed()) {
 33             // 增加引用計數
 34             client.incrementAndGetCount();
 35             return client;
 36         } else {
 37             referenceClientMap.remove(key);
 38         }
 39     }
 40 
 41     locks.putIfAbsent(key, new Object());
 42     synchronized (locks.get(key)) {
 43         if (referenceClientMap.containsKey(key)) {
 44             return referenceClientMap.get(key);
 45         }
 46 
 47         // 創建 ExchangeClient 客戶端
 48         ExchangeClient exchangeClient = initClient(url);
 49         // 將 ExchangeClient 實例傳給 ReferenceCountExchangeClient,這里使用了裝飾模式
 50         client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
 51         referenceClientMap.put(key, client);
 52         ghostClientMap.remove(key);
 53         locks.remove(key);
 54         return client;
 55     }
 56 }
 57 
 58 //進入到初始化客戶端方法
 59 
 60 private ExchangeClient initClient(URL url) {
 61 
 62     // 獲取客戶端類型,默認為 netty
 63     String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
 64 
 65     // 添加編解碼和心跳包參數到 url 中
 66     url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
 67     url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
 68 
 69     // 檢測客戶端類型是否存在,不存在則拋出異常
 70     if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
 71         throw new RpcException("Unsupported client type: ...");
 72     }
 73 
 74     ExchangeClient client;
 75     try {
 76         // 獲取 lazy 配置,並根據配置值決定創建的客戶端類型
 77         if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
 78             // 創建懶加載 ExchangeClient 實例
 79             client = new LazyConnectExchangeClient(url, requestHandler);
 80         } else {
 81             // 創建普通 ExchangeClient 實例
 82             client = Exchangers.connect(url, requestHandler);
 83         }
 84     } catch (RemotingException e) {
 85         throw new RpcException("Fail to create remoting client for service...");
 86     }
 87     return client;
 88 }
 89 
 90 //進入到connect方法中,getExchanger 會通過 SPI 加載 HeaderExchangeClient 實例,這個方法比較簡單,大家自己看一下吧。接下來分析 HeaderExchangeClient 的實現。
 91 
 92 public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
 93     if (url == null) {
 94         throw new IllegalArgumentException("url == null");
 95     }
 96     if (handler == null) {
 97         throw new IllegalArgumentException("handler == null");
 98     }
 99     url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
100     // 獲取 Exchanger 實例,默認為 HeaderExchangeClient
101     return getExchanger(url).connect(url, handler);
102 }
103 
104 //創建HeaderExchangeClient實例 
105 
106 public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
107     // 這里包含了多個調用,分別如下:
108     // 1. 創建 HeaderExchangeHandler 對象
109     // 2. 創建 DecodeHandler 對象
110     // 3. 通過 Transporters 構建 Client 實例
111     // 4. 創建 HeaderExchangeClient 對象
112     return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
113 }
114 
115 //通過 Transporters 構建 Client 實例
116 
117 public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
118     if (url == null) {
119         throw new IllegalArgumentException("url == null");
120     }
121     ChannelHandler handler;
122     if (handlers == null || handlers.length == 0) {
123         handler = new ChannelHandlerAdapter();
124     } else if (handlers.length == 1) {
125         handler = handlers[0];
126     } else {
127         // 如果 handler 數量大於1,則創建一個 ChannelHandler 分發器
128         handler = new ChannelHandlerDispatcher(handlers);
129     }
130     
131     // 獲取 Transporter 自適應拓展類,並調用 connect 方法生成 Client 實例
132     return getTransporter().connect(url, handler);
133 }
134 
135 //創建Netty對象
136 
137 public Client connect(URL url, ChannelHandler listener) throws RemotingException {
138     // 創建 NettyClient 對象
139     return new NettyClient(url, listener);
140 }
View Code

  上面的源碼大概分一下幾個邏輯:

    • 通過refer方法進入DubboInvoker實例的創建,在這個實例中其實serviceType,url,以及invokers都已經是不用去關心的,invokers可以說是存儲以及創建好的Invoker。而最關鍵的在於getClient方法。可以這么認為,現在的Invoker是一個Netty客戶端。而在服務提供方的Invoker是一個Wapper類。
    • 在getClient方法里面首先根據connections數量決定是獲取共享客戶端還是創建新的客戶端實例,默認情況下是獲取共享客戶端,但是獲取共享客戶端中若緩存中拿不到對應客戶端也會新建一個客戶端。最終返回的是ExchangeClient,而當前的ExchangeClient也沒有通信能力,需要更加底層的Netty客戶端。
    • initClient方法首先獲取用戶配置的客戶端類型,默認為Netty,然后檢測用戶配置的客戶端類型是否存在,不存在就要拋出異常,最后根據lazy配置覺得創建什么類型的客戶端。LazyConnectExchangeClient代碼並不是很復雜,該類會在request方法被調用時通過Exchangers的connect方法創建 ExchangeClient客戶端
    • getExchanger會通過SPI加載HeaderExchangeClient實例。最后通過Transporter實現類以及調用Netty的API來創建Netty客戶端。最后層層返回,就最后成為了底層為Netty上層為DubboInvoker實例的這樣一個類。

  RegistryProtocol中的refer:

  1 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
  2     // 取 registry 參數值,並將其設置為協議頭
  3     url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
  4     // 獲取注冊中心實例
  5     Registry registry = registryFactory.getRegistry(url);
  6     if (RegistryService.class.equals(type)) {
  7         return proxyFactory.getInvoker((T) registry, type, url);
  8     }
  9 
 10     // 將 url 查詢字符串轉為 Map
 11     Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
 12     // 獲取 group 配置
 13     String group = qs.get(Constants.GROUP_KEY);
 14     if (group != null && group.length() > 0) {
 15         if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
 16                 || "*".equals(group)) {
 17             // 通過 SPI 加載 MergeableCluster 實例,並調用 doRefer 繼續執行服務引用邏輯
 18             return doRefer(getMergeableCluster(), registry, type, url);
 19         }
 20     }
 21     
 22     // 調用 doRefer 繼續執行服務引用邏輯
 23     return doRefer(cluster, registry, type, url);
 24 }
 25 private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
 26     // 創建 RegistryDirectory 實例
 27     RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
 28     // 設置注冊中心和協議
 29     directory.setRegistry(registry);
 30     directory.setProtocol(protocol);
 31     Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
 32     // 生成服務消費者鏈接
 33     URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
 34 
 35     // 注冊服務消費者,在 consumers 目錄下新節點
 36     if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
 37             && url.getParameter(Constants.REGISTER_KEY, true)) {
 38         registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
 39                 Constants.CHECK_KEY, String.valueOf(false)));
 40     }
 41 
 42     // 訂閱 providers、configurators、routers 等節點數據
 43     directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
 44             Constants.PROVIDERS_CATEGORY
 45                     + "," + Constants.CONFIGURATORS_CATEGORY
 46                     + "," + Constants.ROUTERS_CATEGORY));
 47 
 48     // 一個注冊中心可能有多個服務提供者,因此這里需要將多個服務提供者合並為一個
 49     Invoker invoker = cluster.join(directory);
 50     ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
 51     return invoker;
 52 }
 53 
 54 //進入到集群創建Invoker模式
 55 
 56 @SPI(FailoverCluster.NAME)
 57 public interface Cluster {
 58 
 59     /**
 60      * 合並其中Directory的Invoker為一個Invoker
 61      */
 62     @Adaptive
 63     <T> Invoker<T> join(Directory<T> directory) throws RpcException;
 64 }
 65 
 66 //進入到MockerClusterWrapper實現類中
 67 
 68 public class MockClusterWrapper implements Cluster {
 69 
 70     private Cluster cluster;
 71 
 72     public MockClusterWrapper(Cluster cluster) {
 73         this.cluster = cluster;
 74     }
 75 
 76     public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
 77         return new MockClusterInvoker<T>(directory, this.cluster.join(directory));
 78     }
 79 }
 80 
 81 //具體的invoke方法
 82 
 83 public Result invoke(Invocation invocation) throws RpcException {
 84     Result result = null;
 85     
 86     String value = directory.getUrl().getMethodParameter(invocation.getMethodName(),
 87                              Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); 
 88     if (value.length() == 0 || value.equalsIgnoreCase("false")){
 89         //no mock
 90         result = this.invoker.invoke(invocation);
 91     } else if (value.startsWith("force")) {
 92         if (logger.isWarnEnabled()) {
 93             logger.info("force-mock: " + invocation.getMethodName() + 
 94                         " force-mock enabled , url : " +  directory.getUrl());
 95         }
 96         //force:direct mock
 97         result = doMockInvoke(invocation, null);
 98     } else {
 99         //fail-mock
100         try {
101             result = this.invoker.invoke(invocation);
102         }catch (RpcException e) {
103             if (e.isBiz()) {
104                 throw e;
105             } else {
106                 if (logger.isWarnEnabled()) {
107                     logger.info("fail-mock: " + invocation.getMethodName() + 
108                             " fail-mock enabled , url : " +  directory.getUrl(), e);
109                 }
110                 //fail:mock
111                 result = doMockInvoke(invocation, e);
112             }
113         }
114     }
115     return result;
116 }
View Code

  大致說一下上面的邏輯:

    • 當前的Invoker底層依然是NettyClient,但是此時注冊中心是集群搭建模式。所以需要將多個Invoker合並為一個,這里是邏輯合並的。實際上Invoker底層還是會有多個,只是通過一個集群模式來管理。所以暴露出來的就是一個集群模式的Invoker。於是進入Cluster.join方法。
    • Cluster是一個通用代理類,會根據URL中的cluster參數值定位到實際的Cluster實現類也就是FailoverCluster。這里用到了@SPI注解,也就是需要ExtensionLoader擴展點加載機制,而該機制在實例化對象是,會在實例化后自動套上Wapper
    • 但是是集群模式所以需要Dubbo中另外一個核心機制——Mock。Mock可以在測試中模擬服務調用的各種異常情況,還可以實現服務降級。在MockerClusterInvoker中,Dubbo先檢查URL中是否存在mock參數。(這個參數可以通過服務治理后台Consumer端的屏蔽和容錯進行設置或者直接動態設置mock參數值)如果存在force開頭,這不發起遠程調用直接執行降級邏輯。如果存在fail開頭,則在遠程調用異常時才會執行降級邏輯。
    • 可以說注冊中心為集群模式時,Invoker就會外面多包裹一層mock邏輯。是通過Wapper機制實現的。最終可以在調用或者重試時,每次都通過Dubbo內部的負載均衡機制選出多個Invoker中的一個進行調用

四、總結

  到這里Invoker的實現就可以是說完了,總結一下,在服務提供方Invoker是javassist創建的服務類的實例,可以實現調用服務類內部的方法和修改字段。而在服務消費方的Invoker是基於Netty的客戶端。最終通過服務消費方Netty客戶端獲得服務提供方創建的服務類實例。而后消費方為保護服務類就需要為其創建代理類,這樣就可以在不實例化服務類情況下安全有效的遠程調用服務類內部方法並且得到具體數據了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM