Dubbo死磕之擴展點加載ExetnsionLoader


dubbo的SPI機制與JDK的SPI機制對比     

       dubbo一款阿里一款開源的RPC框架,他本身是一款非常復雜的系統,我們主要針對里邊的一些核心點來展開分析,其中duboo里的一種核心機制叫SPI( Service Provider Interface)服務發現機制,他是基於原生jdk的SPI機制演化而來。在分析duboo的ExtensionLoader之前,我們先大致了解一下標准JDK的SPI機制。一個最經典的JDK的SPI機制,就是java數據庫驅動JDBC,其實JDK自帶的jdbc 驅動並沒有實現對數據庫的封裝,而是開放了一系列的接口供各大廠商調用。
了解完了JDK的SPI基本功能之后,來分析一下dubbo的ExtensionLoader,以下是摘自dubbo的官網。 http://dubbo.apache.org/#/docs/dev/SPI.md?lang=zh-cn
Dubbo 改進了 JDK 標准的 SPI 的以下問題:
  • JDK 標准的 SPI 會一次性實例化擴展點所有實現,如果有擴展實現初始化很耗時,但如果沒用上也加載,會很浪費資源。
  • 如果擴展點加載失敗,連擴展點的名稱都拿不到了。比如:JDK 標准的 ScriptEngine,通過 getName()獲取腳本類型的名稱,但如果 RubyScriptEngine 因為所依賴的 jruby.jar 不存在,導致 RubyScriptEngine 類加載失敗,這個失敗原因被吃掉了,和 ruby 對應不起來,當用戶執行 ruby 腳本時,會報不支持 ruby,而不是真正失敗的原因。
  • 增加了對擴展點 IoC 和 AOP 的支持,一個擴展點可以直接 setter 注入其它擴展點。
在分析dubbo的擴展點加載機制ExtensionLoader加載機制之前,先給大家看一下duboo的層級構成(源自dubbo官網)
  •        config,配置層,對外配置接口,以ServiceConfig, ReferenceConfig為中心,可以直接new配置類,也可以通過spring解析配置生成配置類
  • proxy,服務代理層,服務接口透明代理,生成服務的客戶端Stub和服務器端Skeleton,以ServiceProxy為中心,擴展接口為ProxyFactory
  • registry,注冊中心層,封裝服務地址的注冊與發現,以服務URL為中心,擴展接口為RegistryFactory, Registry, RegistryService
  • cluster,路由層,封裝多個提供者的路由及負載均衡,並橋接注冊中心,以Invoker為中心,擴展接口為Cluster, Directory, Router, LoadBalance
  • monitor,監控層,RPC調用次數和調用時間監控,以Statistics為中心,擴展接口為MonitorFactory, Monitor, MonitorService
  • protocol,遠程調用層,封將RPC調用,以Invocation, Result為中心,擴展接口為Protocol, Invoker, Exporter
  • exchange,信息交換層,封裝請求響應模式,同步轉異步,以Request, Response為中心,擴展接口為Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
  • transport,網絡傳輸層,抽象mina和netty為統一接口,以Message為中心,擴展接口為Channel, Transporter, Client, Server, Codec
  •         serialize,數據序列化層,可復用的一些工具,擴展接口為Serialization, ObjectInput, ObjectOutput, ThreadPool
我們可以看到,dubbo的源碼每個層級,不只引用了一個擴展方式,而是提供了多種擴展方式,如果我們把dubbo的源碼層級整合成一個整體來看,把他當作一個平面,那么他上面每個層級的擴展實現方式就好比一個槽,這些槽可以供用戶自主選擇實現合適的擴展方式。
好了,現在我們話不多說了,開始分析duboo啟動的時候加載ExtensionLoader,以這種方式來分析ExtensionLoader加載機制。

getExtensionLoader

首先,ExtensionLoader是dubbo的一個類,dubbo器的時候首先會加載ExtensionLoader里面的靜態方法getExtensionLoader。
@SuppressWarnings("unchecked")
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
//首次進來,初始化的type是默認的擴展點interface com.alibaba.dubbo.container.Container
if (type == null)
throw new IllegalArgumentException("Extension type == null");
if(!type.isInterface()) {
throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
}
if(!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type(" + type +") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
}
//根據這個type去緩存中獲取loader
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
//這里會去new一個ExtensionLoader
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}
這里主要是做了一些初始化的合法性校驗操作
判斷擴展type是否為null,是就拋出異常
  1. 判斷擴展type是否為接口,不是接口拋出異常
  2. 判斷擴展type是對應的接口是否帶有SPI注解,沒有拋出異常
  3. 從緩存中去根據擴展type獲取ExtensionLoader,如果沒有獲取到,會先new一個ExtensionLoader,然后加他push到緩存,否則直接返回
進行完合法性校驗操作完之后,會從緩存中去獲取擴展機制,沒有會new一個擴展機制,然后會到ExtensionLoader的私有構造器中。
private ExtensionLoader(Class<?> type) {
//首次進來這個type等於interface com.alibaba.dubbo.container.Container,所以會執行后面的ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()
//其中ExtensionLoader.getExtensionLoader(ExtensionFactory.class)通過,
//再次進來的時候type變為了interface com.alibaba.dubbo.common.extension.ExtensionFactory,就會直接返回
this.type = type;
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}
代碼邏輯,
  1. 首次進來type為interface com.alibaba.dubbo.container.Container,與ExtensionLoader.class不相等,所以會執行后邊的邏輯,再次執行.getExtensionLoader方法,將其加入緩存當中
  2. 調用getAdaptiveExtension方法,獲得自適應擴展點,將獲取到的實例復制給objectFactory。

getAdaptiveExtension

然后進入getAdaptiveExtension函數,分析該方法是如何獲取自適應擴展點的。
@SuppressWarnings("unchecked")
public T getAdaptiveExtension() {
     //從緩存中獲取實例
     Object instance = cachedAdaptiveInstance.get();
     //雙重檢查鎖,和單點登陸里的雙重檢查鎖類似
     if (instance == null) {
         if(createAdaptiveInstanceError == null) {
             synchronized (cachedAdaptiveInstance) {
                 instance = cachedAdaptiveInstance.get();
                    if (instance == null) {
                       try {
                           //創建自適應擴展點,獲得的實例復制給instance
                           instance = createAdaptiveExtension();
                          //將回去的自適應擴展點實例加入緩存
                          cachedAdaptiveInstance.set(instance);
                           } catch (Throwable t) {
                                  createAdaptiveInstanceError = t;
                                  throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
                           }
                      }
                    }
                }
      else {
              throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
     }
}
return (T) instance;
}
處理邏輯
  1. 從緩存中獲取實例
  2. 因為該實例的單例模式,采用了雙重檢查鎖模式,對該實例做了兩次null的判斷
  3. 調用createAdaptiveExtension方法創建自適應擴展點,將獲得的實例復制給instance,並加入緩存

createAdaptiveExtension

第3點里的createAdaptiveExtension的代碼如下
@SuppressWarnings("unchecked")
private T createAdaptiveExtension() {
     try {
          return injectExtension((T) getAdaptiveExtensionClass().newInstance());
      } catch (Exception e) {
            throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e);
      }
}
處理邏輯
  1. 首先調用getAdaptiveExtensionClass方法,然后獲取實例
  2. 然后將獲取的實例最為參數傳入injectExtension方法,將結果返回

getAdaptiveExtensionClass

第1點的getAdaptiveExtensionClass方法代碼如下
private Class<?> getAdaptiveExtensionClass() {
       getExtensionClasses();
      if (cachedAdaptiveClass != null) {
          return cachedAdaptiveClass;
       }
       return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
根據代碼可以發現,我們先不管getExtensionClasses方法里做了什么事情,先看主線,cachedAdaptiveClass不為null的話就直接返回,為null的話就先創建再返回。

createAdaptiveExtensionClass

我們先分析createAdaptiveExtensionClass方法,然后再去分析getExtensionClasses,createAdaptiveExtensionClass方法的代碼如下。
//基於動態代理生成一個動態的字節碼文件
private Class<?> createAdaptiveExtensionClass() {
    //生成字節碼代碼
    String code = createAdaptiveExtensionClassCode();
    //獲得類加載器
    ClassLoader classLoader = findClassLoader();
    //再次創建一個自適應擴展點
  com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
    //動態編譯字節碼
   return compiler.compile(code, classLoader);
}

createAdaptiveExtensionClassCode

該方法主要是通過動態代理,基於javassist方式實現,生成二進制字節碼代碼文本,然后獲得類加載器,再次獲取一個compiler自適應擴展點,生成一個編譯器,然后動態編譯字節碼文本,生成動態的類。現在主要分析一下createAdaptiveExtensionClassCode是如何生成字節碼代碼的,通過dubug模式,獲取到code的文本,整理一下的代碼如下
 1 package com.alibaba.dubbo.rpc;
 2  
 3 import com.alibaba.dubbo.common.extension.ExtensionLoader; 
 4 public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol { 
 5 public void destroy() { 
 6 throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.
 7 Protocol.destroy() of interfacecom.alibaba.dubbo.rpc.Protocol is not adaptive method!");
 8 }
 9  
10 public int getDefaultPort() { 
11 throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.
12 rpc.Protocol.getDefaultPort()of interface com.alibaba.dubbo.rpc.Protocol is not adaptive
13 method!");
14 } 
15  
16 public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.
17 alibaba.dubbo.rpc.Invoker { 
18 if (arg0 == null) 
19 throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
20 if (arg0.getUrl() =null) 
21 throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
22 com.alibaba.dubbo.common.URL url = arg0.getUrl();
23 String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); 
24 if(extName == null)
25 throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol)
26 name from url(" + url.toString() + ") use keys([protocol])");
27 com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.
28 getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
29 return extension.export(arg0);
30 } 
31  
32 public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws 
33 java.lang.Class { 
34 if (arg1 == null) 
35 throw new IllegalArgumentException("url == null");
36 com.alibaba.dubbo.common.URL url = arg1;
37 String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); 
38 if(extName == null) 
39 throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol)
40 name from url(" + url.toString() + ") use keys([protocol])");
41 com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.
42 getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
43 return extension.refer(arg0, arg1);
44 }
45 }
通過代碼可以發現,這里默認的RPC方案就是dubbo的自身方案,包括服務的暴露以及引用
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); 

getExtensionClasses

到這里,一個擴展點的加載到此結束,我們再回過頭來分析getExtensionClasses方法具體做了什么事情。
//加載擴展點 的實現類
private Map<String, Class<?>> getExtensionClasses() {
        Map<String, Class<?>> classes = cachedClasses.get();
       if (classes == null) {
          synchronized (cachedClasses) {
             classes = cachedClasses.get();
            if (classes == null) {
                classes = loadExtensionClasses();
                cachedClasses.set(classes);
            }
       }
}
return classes;
}

loadExtensionClasses

這里又用到了雙重檢查鎖機制,兩次判斷classes是否為null,然后再調用loadExtensionClasses加載擴展點實現類
 1 // 此方法已經getExtensionClasses方法同步過。
 2 private Map<String, Class<?>> loadExtensionClasses() {
 3     final SPI defaultAnnotation = type.getAnnotation(SPI.class);
 4     if(defaultAnnotation != null) {
 5         String value = defaultAnnotation.value();
 6     if(value != null && (value = value.trim()).length() > 0) {
 7        String[] names = NAME_SEPARATOR.split(value);
 8     if(names.length > 1) {
 9        throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
10        + ": " + Arrays.toString(names));
11      }
12     if(names.length == 1) cachedDefaultName = names[0];
13     }
14 }
15 Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
16 loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
17 loadFile(extensionClasses, DUBBO_DIRECTORY);
18 loadFile(extensionClasses, SERVICES_DIRECTORY);
19 return extensionClasses;
20 }

loadFile

loadFile方法根據DUBBO_INTERNAL_DIRECTORY、DUBBO_DIRECTORY、SERVICES_DIRECTORY的路勁去尋找各自的配置文件, 類似於jdk的SPI中放在META-INF/services目錄下的文件。loadFiles方法的代碼如下,這個方法代碼很長
  1   private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {
  2         String fileName = dir + type.getName();
  3         try {
  4             Enumeration<java.net.URL> urls;
  5             //獲取類加載器
  6             ClassLoader classLoader = findClassLoader();
  7             if (classLoader != null) {
  8                 urls = classLoader.getResources(fileName);
  9             } else {
 10                 urls = ClassLoader.getSystemResources(fileName);
 11             }
 12             if (urls != null) {
 13                 while (urls.hasMoreElements()) {
 14                     java.net.URL url = urls.nextElement();
 15                     try {
 16                         BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));
 17                         try {
 18                             String line = null;
 19                             while ((line = reader.readLine()) != null) {
 20                                 final int ci = line.indexOf('#');
 21                                 if (ci >= 0) line = line.substring(0, ci);
 22                                 line = line.trim();
 23                                 if (line.length() > 0) {
 24                                     try {
 25                                         String name = null;
 26                                         int i = line.indexOf('=');
 27                                         if (i > 0) {
 28                                             name = line.substring(0, i).trim();
 29                                             line = line.substring(i + 1).trim();
 30                                         }
 31                                         if (line.length() > 0) {
 32                                             Class<?> clazz = Class.forName(line, true, classLoader);
 33                                             if (! type.isAssignableFrom(clazz)) {
 34                                                 throw new IllegalStateException("Error when load extension class(interface: " +
 35                                                         type + ", class line: " + clazz.getName() + "), class " 
 36                                                         + clazz.getName() + "is not subtype of interface.");
 37                                             }
 38                                             if (clazz.isAnnotationPresent(Adaptive.class)) {
 39                                                 if(cachedAdaptiveClass == null) {
 40                                                     cachedAdaptiveClass = clazz;
 41                                                 } else if (! cachedAdaptiveClass.equals(clazz)) {
 42                                                     throw new IllegalStateException("More than 1 adaptive class found: "
 43                                                             + cachedAdaptiveClass.getClass().getName()
 44                                                             + ", " + clazz.getClass().getName());
 45                                                 }
 46                                             } else {
 47                                                 try {
 48                                                     clazz.getConstructor(type);
 49                                                     Set<Class<?>> wrappers = cachedWrapperClasses;
 50                                                     if (wrappers == null) {
 51                                                         cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
 52                                                         wrappers = cachedWrapperClasses;
 53                                                     }
 54                                                     wrappers.add(clazz);
 55                                                 } catch (NoSuchMethodException e) {
 56                                                     clazz.getConstructor();
 57                                                     if (name == null || name.length() == 0) {
 58                                                         name = findAnnotationName(clazz);
 59                                                         if (name == null || name.length() == 0) {
 60                                                             if (clazz.getSimpleName().length() > type.getSimpleName().length()
 61                                                                     && clazz.getSimpleName().endsWith(type.getSimpleName())) {
 62                                                                 name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
 63                                                             } else {
 64                                                                 throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);
 65                                                             }
 66                                                         }
 67                                                     }
 68                                                     String[] names = NAME_SEPARATOR.split(name);
 69                                                     if (names != null && names.length > 0) {
 70                                                         Activate activate = clazz.getAnnotation(Activate.class);
 71                                                         if (activate != null) {
 72                                                             cachedActivates.put(names[0], activate);
 73                                                         }
 74                                                         for (String n : names) {
 75                                                             if (! cachedNames.containsKey(clazz)) {
 76                                                                 cachedNames.put(clazz, n);
 77                                                             }
 78                                                             Class<?> c = extensionClasses.get(n);
 79                                                             if (c == null) {
 80                                                                 extensionClasses.put(n, clazz);
 81                                                             } else if (c != clazz) {
 82                                                                 throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
 83                                                             }
 84                                                         }
 85                                                     }
 86                                                 }
 87                                             }
 88                                         }
 89                                     } catch (Throwable t) {
 90                                         IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t);
 91                                         exceptions.put(line, e);
 92                                     }
 93                                 }
 94                             } // end of while read lines
 95                         } finally {
 96                             reader.close();
 97                         }
 98                     } catch (Throwable t) {
 99                         logger.error("Exception when load extension class(interface: " +
100                                             type + ", class file: " + url + ") in " + url, t);
101                     }
102                 } // end of while urls
103             }
104         } catch (Throwable t) {
105             logger.error("Exception when load extension class(interface: " +
106                     type + ", description file: " + fileName + ").", t);
107         }
108     }
109    
View Code

injectExtension

該方法沒有什么很好講的,里面就是一些if-else,主要是將擴展點的配置文件加載出來或者放入緩存當中。
到此,我們將目光再往前退一下,我們之前是不是調用了getAdaptiveExtensionClass方法,到此為止,我們的僅僅是調用完成了getAdaptiveExtensionClass方法,然后將getAdaptiveExtensionClass的實例當作參數,傳入injectExtension方法,我們再看一下injectExtension的代碼
 1 private T injectExtension(T instance) {
 2     try {
 3          //getExtensionLoader的時候賦值的
 4         if (objectFactory != null) {
 5             for (Method method : instance.getClass().getMethods()) {
 6                 //判斷是否以set開頭,以set方式動態注入
 7                 if (method.getName().startsWith("set")
 8                     && method.getParameterTypes().length == 1
 9                        && Modifier.isPublic(method.getModifiers())) {
10                          //獲取set方法的參數類型
11                          Class<?> pt = method.getParameterTypes()[0];
12                         try {
13                               String property = method.getName().length() > 3 ? method.getName().substring(3, 4).
14                               toLowerCase() + method.getName().substring(4) : "";
15                              //根據類型名稱獲得對應的擴展點
16                              Object object = objectFactory.getExtension(pt, property);
17                             if (object != null) {
18                                 method.invoke(instance, object);
19                               }
20                          } catch (Exception e) {
21                             logger.error("fail to inject via method " + method.getName()
22                            + " of interface " + type.getName() + ": " + e.getMessage(), e);
23                          }
24                        }
25                }
26           }
27      } catch (Exception e) {
28      logger.error(e.getMessage(), e);
29 }
30 return instance;
31 }

 

這里可以看到,擴展點自動注入的一句就是根據setter方法對應的參數類型和property名稱從ExtensionFactory中查詢,如果有返回擴展點實例,那么就進行注入操作。到這里getAdaptiveExtension方法就分析完畢了。

總結

ExtensionLoader到此為止大概的走了一遍,一步一步的往里走然后再出來。整個過程還是算比較清晰的。
最后粘貼一張我簡單畫的時序圖

 

歡迎掃碼關注我的微信公眾號,我會定期的更新一些個人技術文章

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM