org.apache.dubbo 2.7.7 服務發布注冊源碼


org.apache.dubbo 服務注冊原理源碼分析:

  本文主要針對 dubbo-spring-boot-starter   2.7.7版本, 對應的 org.apache.dubbo 2.7.7 版本的源碼。

  本文主要從以下幾個點來分析:

  1. 前置知識點--Dubbo的SPI機制。
  2. 服務發布注冊的入口。
  3. 服務發布源碼分析
  4. 服務注冊源碼分析。

Dubbo的SPI機制:

  沒接觸過 Dubbo SPI 的小伙伴可以參考我之前寫的  關於 Dubbo SPI的相關博文。雖然版本又差異,但是 SPI機制是一樣的。這里簡單做一下描述。

  關於 Dubbo 中提供的拓展點,可以參考官方文檔的說明:http://dubbo.apache.org/zh-cn/docs/dev/impls/load-balance.html

  擴展點的特征:在類級別標准`@SPI(RandomLoadBalance.NAME)`.其中,括號內的數據,表示當前擴展點的默認擴展點。另一個是@Adaptive

  • @SPI 表示當前這個接口是一個擴展點,可以實現自己的擴展實現,默認的擴展點是DubboProtocol。
  • @Adaptive  表示一個自適應擴展點,在方法級別上,會動態生成一個適配器類

  例如:

@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {

    @Adaptive("loadbalance")
    <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}

  在 Dubbo 中,拓展點分為以下三類:

  1. 指定名稱的擴展點:ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("name")。
  2. 自適應擴展點:ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension()。
  3. 激活擴展點:ExtensionLoader.getExtensionLoader(Protocol.class).getActiveExtension。

  自定義負載均衡拓展點 :

  在 Dubbo 中,想要拓展拓展點,只需要以下幾個步驟

1.創建拓展點實現類 (以LoadBalance為例):

public class WuzzLoadBalance extends AbstractLoadBalance {
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        return null;
    }
}

2.在指定文件夾下創建以拓展點全路徑名(org.apache.dubbo.rpc.cluster.LoadBalance)的文件,Dubbo 中有多個目錄都可以配置拓展點,這里用 resource/META-INF/dubbo/

wuzzLoadBalance=com.wuzz.demo.loadbalance.WuzzLoadBalance

3.搞個測試類進行測試:

  可以發現我們已經可以拿到我們自己的實現類了。那么他具體是怎么實現的呢?讓我們繼續往下看

Dubbo的拓展點源碼:

  接下去我們來看看三種拓展點的具體實現:

指定名稱的擴展點:以 ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("wuzzLoadBalance") 為例

  先來看前半段 : ExtensionLoader#getExtensionLoader

public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
  //....省略判斷邏輯
      
  // 從緩存中獲取該 loader
  ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
  if (loader == null) {
    // 如果從緩存中獲取不到,則new 一個,並且保存起來
    EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
    // 然后 get
    loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
  }
  return loader;
}

  該方法需要一個Class類型的參數,該參數表示希望加載的擴展點類型,該參數必須是接口,且該接口必須被@SPI注解注釋,否則拒絕處理。檢查通過之后首先會檢查ExtensionLoader緩存中是否已經存在該擴展對應的ExtensionLoader,如果有則直接返回,否則創建一個新的ExtensionLoader負責加載該擴展實現,同時將其緩存起來。可以看到對於每一個擴展,dubbo中只會有一個對應的ExtensionLoader實例。進入到構造方法:

private ExtensionLoader(Class<?> type) {
  this.type = type;
  // 判斷類型,很顯然 這里會走后面
  objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}

  咱們姑且先當這個ExtensionLoader實例 已經存在緩存中,那么我們直接進入到 getExtension("wuzzLoadBalance")  這段代碼流程中

public T getExtension(String name) {
        if (StringUtils.isEmpty(name)) {
            throw new IllegalArgumentException("Extension name == null");
        }
        if ("true".equals(name)) {//如果name=true,表示返回一個默認的擴展點
            return getDefaultExtension();
        }
        final Holder<Object> holder = getOrCreateHolder(name);//緩存一下,如果實例已經加載過了,直接從緩存讀取
        Object instance = holder.get();
        if (instance == null) {
            synchronized (holder) {
                instance = holder.get();
                if (instance == null) {
                    instance = createExtension(name);//根據名稱創建實例
                    holder.set(instance);
                }
            }
        }
        return (T) instance;
}

  createExtension:仍然是根據名稱創建擴展,getExtensionClasses() 加載指定路徑下的所有文件

private T createExtension(String name) {
        Class<?> clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            throw findException(name);
        }
        try {
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
            injectExtension(instance);
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (CollectionUtils.isNotEmpty(wrapperClasses)) {
                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            initExtension(instance);
            return instance;
        } catch (Throwable t) {
            throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
                    type + ") couldn't be instantiated: " + t.getMessage(), t);
        }
}

  這個方法內主要做了以下三件事

  1. 加載指定路徑下的文件內容,保存到集合中
  2. 會對存在依賴注入的擴展點進行依賴注入
  3. 會對存在Wrapper類的擴展點,實現擴展點的包裝

  先來看文件內容的加載流程:

private Map<String, Class<?>> getExtensionClasses() {
        Map<String, Class<?>> classes = cachedClasses.get();
        if (classes == null) {
            synchronized (cachedClasses) {
                classes = cachedClasses.get();
                if (classes == null) {
                    // 真正加載類的方法
                    classes = loadExtensionClasses();
                    cachedClasses.set(classes);
                }
            }
        }
        return classes;
}

  ExtensionLoader#loadExtensionClasses:

private Map<String, Class<?>> loadExtensionClasses() {
        cacheDefaultExtensionName();

        Map<String, Class<?>> extensionClasses = new HashMap<>();
     // 這里循環加載,
        for (LoadingStrategy strategy : strategies) {
       // 這里調用兩次,可以從下面的參數中得知可能是為了做兼容
            loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
            loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
        }

        return extensionClasses;
}

  我們可以斷點看看這個strategies :

  這里對應的三個實現實質上是分別對應的三個拓展點配置目錄:

  1. META-INF/dubbo/internal/
  2. META-INF/dubbo/
  3. META-INF/services/

  接下去具體的加載細節就不去深挖了,我們只要知道,這里通過這 三個路徑去把我們的拓展點加載出來並且緩存起來:

  這才使得我們 getExtension("wuzzLoadBalance") 能拿到我們自己的實現.

  我們還需要關注的就是拓展點的包裝 instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)),我們直接斷點看一下:

  2.7.8 源碼在此處有些許差別,但是最終也是如此包裝。

  這里以 Protocol 為例,發現 cachedWrapperClasses 里面有3個 wrapper類,且返回的 instance 並不是一個 DubboProtocol 這么簡單,經過了層層包裝。那么為什么呢?我們來看一下 Protocol 拓展點文件:

  這里我們可以得出結論,在加載拓展點指定文件的時候,具有Wrapper 實現的時候,會將Wrapper 緩存到  cachedWrapperClasses 集合中,且會將這些拓展點進行包裝。

自適應擴展點:

  什么叫自適應擴展點呢?我們先演示一個例子,在下面這個例子中,我們傳入一個Protocol接口,它會返回一個AdaptiveProtocol。這個就叫自適應。

Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

  我們可以看到 Protocol這個類的 export方法上面有一個注解@Adaptive。 這個就是一個自適應擴展點的標識。它可以修飾在類上,也可以修飾在方法上面。這兩者有什么區別呢? 簡單來說,放在類上,說明當前類是一個確定的自適應擴展點的類。如果是放在方法級別,那么需要生成一個動態字節碼,來進行轉發。 拿Protocol這個接口來說,它里面定義了export和refer兩個抽象方法,這兩個方法分別帶有@Adaptive的標識,標識是一個自適應方法。 我們知道Protocol是一個通信協議的接口,具體有多種實現,那么這個時候選擇哪一種呢? 取決於我們在使用dubbo的時候所配置的協議名稱。而這里的方法層面的Adaptive就決定了當前這個方法會采用何種協議來發布服務。

  我們直接進入  ExtensionLoader#getAdaptiveExtension 獲取自適應拓展點的源碼流程:

public T getAdaptiveExtension() {
     // 又是緩存中獲取
        Object instance = cachedAdaptiveInstance.get();
        if (instance == null) {
            if (createAdaptiveInstanceError != null) {
                throw new IllegalStateException("Failed to create adaptive instance: " +
                        createAdaptiveInstanceError.toString(),
                        createAdaptiveInstanceError);
            }
       // 雙重檢查鎖
            synchronized (cachedAdaptiveInstance) {
                instance = cachedAdaptiveInstance.get();
                if (instance == null) {
                    try {
                        instance = createAdaptiveExtension();
                        cachedAdaptiveInstance.set(instance);
                    } catch (Throwable t) {
                        createAdaptiveInstanceError = t;
                        throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
                    }
                }
            }
        }
        return (T) instance;
}

  這部分邏輯沒有特殊的地方,無非就是緩存+雙重檢查。然后進入創建自適應拓展點的代碼 : ExtensionLoader#createAdaptiveExtension,

  創建自適應拓展點:ExtensionLoader#createAdaptiveExtension,這個方法中做兩個事情

  1. 獲得一個自適應擴展點實例
  2. 實現依賴注入
private T createAdaptiveExtension() {
        try {
            return injectExtension((T) getAdaptiveExtensionClass().newInstance());
        } catch (Exception e) {
            throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
        }
}

  然后進入 ExtensionLoader#getAdaptiveExtensionClass :

private Class<?> getAdaptiveExtensionClass() {
        getExtensionClasses();
        if (cachedAdaptiveClass != null) {
            return cachedAdaptiveClass;
        }
        return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

  getExtensionClasses()這個方法在前面講過了,會加載當前傳入的類型的所有擴展點,保存在一個hashmap中 這里有一個判斷邏輯,如果 cachedApdaptiveClas!=null ,直接返回這個cachedAdaptiveClass,這個cachedAdaptiveClass是一個什么?

  cachedAdaptiveClass是在 加載解析/META-INF/dubbo下的擴展點的時候加載進來的。在加載完之后如果這個類有@Adaptive標識,則會賦值賦值而給cachedAdaptiveClass

  createAdaptiveExtensionClass:動態生成字節碼,然后進行動態加載。那么這個時候鎖返回的class,如果加載的是Protocol.class,應該是Protocol$Adaptive 這個cachedDefaultName實際上就是擴展點接口的@SPI注解對應的名字,如果此時加載的是Protocol.class,那么cachedDefaultName=dubbo

private Class<?> createAdaptiveExtensionClass() {
        String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
        ClassLoader classLoader = findClassLoader();
        org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
        return compiler.compile(code, classLoader);
}

  例如根據  ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension() 生成的自適應拓展點就是:

package org.apache.dubbo.rpc;
import org.apache.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
    public void destroy()  {
        throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
    }
    public int getDefaultPort()  {
        throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
    }
    public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }
    public java.util.List getServers()  {
        throw new UnsupportedOperationException("The method public default java.util.List org.apache.dubbo.rpc.Protocol.getServers() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
    }
    public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg1;
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
}

關於objectFactory:

  在injectExtension這個方法中,我們發現入口出的代碼首先判斷了objectFactory這個對象是否為空。這個是在哪里初始化的呢?實際上我們在獲得ExtensionLoader的時候,就對objectFactory進行了初始化。

private ExtensionLoader(Class<?> type) {
  this.type = type;
  objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}

  然后通過ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()去獲得一個自適應的擴展點,進入ExtensionFactory這個接口中,可以看到它是一個擴展點,並且有一個自己實現的自適應擴展點AdaptiveExtensionFactory; 注意:@Adaptive加載到類上表示這是一個自定義的適配器類,表示我們再調用getAdaptiveExtension方法的時候,不需要走上面這么復雜的過程。會直接加載到AdaptiveExtensionFactory。然后在getAdaptiveExtensionClass()方法處有判斷,就是上文提到的 cachedAdaptiveClass。

@Adaptive
public class AdaptiveExtensionFactory implements ExtensionFactory {

    private final List<ExtensionFactory> factories;

    public AdaptiveExtensionFactory() {
        ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
        List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
        for (String name : loader.getSupportedExtensions()) {
            list.add(loader.getExtension(name));
        }
        factories = Collections.unmodifiableList(list);
    }

    @Override
    public <T> T getExtension(Class<T> type, String name) {
        for (ExtensionFactory factory : factories) {
            T extension = factory.getExtension(type, name);
            if (extension != null) {
                return extension;
            }
        }
        return null;
    }
}

  我們可以看到除了自定義的自適應適配器類以外,還有兩個實現類,一個是SPI,一個是Spring,AdaptiveExtensionFactory輪詢這2個,從一個中獲取到就返回。

激活擴展點:

  自動激活擴展點,有點類似 springboot 的時候用到的 conditional,根據條件進行自動激活。但是這里設計的初衷是,對於一個類會加載多個擴展點的實現,這個時候可以通過自動激活擴展點進行動態加載, 從而簡化配置我們的配置工作

  @Activate提供了一些配置來允許我們配置加載條件,比如group過濾,比如key過濾。舉個例子,我們可以看看org.apache.dubbo.Filter這個類,它有非常多的實現,比如說CacheFilter,這個緩存過濾器,配置信息如下group表示客戶端和和服務端都會加載,value表示url中有cache_key的時候

@Activate(group = {CONSUMER, PROVIDER}, value = CACHE_KEY)
public class CacheFilter implements Filter {
}

  通過下面這段代碼,演示關於Filter的自動激活擴展點的效果。沒有添加“紅色部分的代碼”時,list的結果是10,添加之后list的結果是11. 會自動把cacheFilter加載進來

ExtensionLoader<Filter> loader = ExtensionLoader.getExtensionLoader(Filter.class);
URL url = new URL("", "", 0);
url = url.addParameter("cache", "cache");
List<Filter> filters = loader.getActivateExtension(url, "cache");
System.out.println(filters.size());

服務發布注冊的入口:

@DubboComponentScan:

  在我們使用 Dubbo 構建服務的時候,我們通常需要配置一個 Dubbo Service 的掃描路徑。那么這個注解應該是比較關鍵的。我們進入到這個注解的源碼來開始揭開Dubbo的神秘面紗。

@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(DubboComponentScanRegistrar.class) public @interface DubboComponentScan { //.......
}

  我們看到了熟悉的東西:@Import(DubboComponentScanRegistrar.class) ,跟進去我們發現該類 實現了  ImportBeanDefinitionRegistrar 接口,該接口提供了類的注冊的回調。也就是說DubboComponentScanRegistrar 最后會調用 registerBeanDefinitions 方法:

@Override public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {   // 獲取到元數據中配置的掃描路徑,可以是多個,所以這里是集合
  Set<String> packagesToScan = getPackagesToScan(importingClassMetadata);   // 注冊指定的bean
  registerServiceAnnotationBeanPostProcessor(packagesToScan, registry);   // 注冊通用的bean   // @since 2.7.6 Register the common beans
  registerCommonBeans(registry); }

  DubboComponentScanRegistrar#getPackagesToScan 這個方法中就是獲取 DubboComponentScan 配置的參數,進行組裝返回。

  主要關注 DubboComponentScanRegistrar#registerServiceAnnotationBeanPostProcessor 方法:

private void registerServiceAnnotationBeanPostProcessor(Set<String> packagesToScan, BeanDefinitionRegistry registry) {      // 構建一個rootBeanDefinition
        BeanDefinitionBuilder builder = rootBeanDefinition(ServiceAnnotationBeanPostProcessor.class);      // 將前面組裝的掃描路徑作為一個屬性放到 ServiceAnnotationBeanPostProcessor 中
 builder.addConstructorArgValue(packagesToScan); builder.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();    //注冊該Bean,毋庸置疑,這個Bean 就是 ServiceAnnotationBeanPostProcessor
 BeanDefinitionReaderUtils.registerWithGeneratedName(beanDefinition, registry); }

  可以看到,ServiceAnnotationBeanPostProcessor 被標記了過時,后續可能會有點變化。我們先來看一下 ServiceAnnotationBeanPostProcessor 的類圖  :

  從類圖可以看出,在該Bean初始化前后,會調用好幾個回調方法,其中 BeanDefinitionRegistryPostProcessor 就是Bean 注冊后會調用一個 postProcessBeanDefinitionRegistry 方法,該方法在其父類中:

@Override public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {      // 注冊一個監聽器,這個是很關鍵的,等等需要去看這個類 // @since 2.7.5
        registerBeans(registry, DubboBootstrapApplicationListener.class);      // 獲取到那個掃描路徑
        Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan); if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) {        // 進行掃描 DubboService 進行注入
 registerServiceBeans(resolvedPackagesToScan, registry); } else { if (logger.isWarnEnabled()) { logger.warn("packagesToScan is empty , ServiceBean registry will be ignored!"); } } }

    然后我們重點看 ServiceClassPostProcessor#registerServiceBeans

private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) {      // 注冊一個掃描器
        DubboClassPathBeanDefinitionScanner scanner =
                new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);      // Bean 名字解析相關
        BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry); scanner.setBeanNameGenerator(beanNameGenerator);      // 通過注解過濾 // refactor @since 2.7.7
        serviceAnnotationTypes.forEach(annotationType -> { scanner.addIncludeFilter(new AnnotationTypeFilter(annotationType)); });      // 循環遍歷我們配置的掃描路徑
        for (String packageToScan : packagesToScan) {        // 掃描 // Registers @Service Bean first
 scanner.scan(packageToScan);        // 拼裝 // Finds all BeanDefinitionHolders of @Service whether @ComponentScan scans or not.
            Set<BeanDefinitionHolder> beanDefinitionHolders = findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator); if (!CollectionUtils.isEmpty(beanDefinitionHolders)) {           // 遍歷拼裝好的 BeanDefinitionHolder
                for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {             //注冊Bean
 registerServiceBean(beanDefinitionHolder, registry, scanner); } //.......
            } else {             // .......
 } } }

  來看一下注解過濾中的serviceAnnotationTypes ,其實一目了然,DubboService 是新版的修改,避免與 Spring的 Service注解重名,org.apache.dubbo.config.annotation.Service 是兼容老版本,com.alibaba.dubbo.config.annotation.Service 也是為了兼容。

private final static List<Class<? extends Annotation>> serviceAnnotationTypes = asList( // @since 2.7.7 Add the @DubboService , the issue : https://github.com/apache/dubbo/issues/6007
            DubboService.class, // @since 2.7.0 the substitute @com.alibaba.dubbo.config.annotation.Service
            Service.class, // @since 2.7.3 Add the compatibility for legacy Dubbo's @Service , the issue : https://github.com/apache/dubbo/issues/4330
            com.alibaba.dubbo.config.annotation.Service.class );

  然后我們進入主線邏輯 ServiceClassPostProcessor#registerServiceBean

private void registerServiceBean(BeanDefinitionHolder beanDefinitionHolder, BeanDefinitionRegistry registry, DubboClassPathBeanDefinitionScanner scanner) {      // 獲取到需要注冊的Dubbo Service 的 bean class
        Class<?> beanClass = resolveClass(beanDefinitionHolder);      // 獲取都 Dubbo Service 的 注解元數據
        Annotation service = findServiceAnnotation(beanClass); /** * The {@link AnnotationAttributes} of @Service annotation       * 獲取到我們注解上面配置的參數信息 */ AnnotationAttributes serviceAnnotationAttributes = getAnnotationAttributes(service, false, false);      // 獲取該實現的接口
        Class<?> interfaceClass = resolveServiceInterfaceClass(serviceAnnotationAttributes, beanClass);      // 獲取實現類 類名
        String annotatedServiceBeanName = beanDefinitionHolder.getBeanName();      // 該方法主要是構建了一個ServiceBean
        AbstractBeanDefinition serviceBeanDefinition = buildServiceBeanDefinition(service, serviceAnnotationAttributes, interfaceClass, annotatedServiceBeanName); // ServiceBean Bean name     // 獲取類名,比如這里是 ServiceBean:com.wuzz.demo.api.HelloService 
        String beanName = generateServiceBeanName(serviceAnnotationAttributes, interfaceClass); if (scanner.checkCandidate(beanName, serviceBeanDefinition)) { // check duplicated candidate bean        // 然后調用注冊方法 registry.registerBeanDefinition(beanName, serviceBeanDefinition);           // ......
        } else {           //.......
 } }

  源碼跟到這里,我們應該知道,這里注冊了一個 ServiceBean ,所以跟進這個類的構造,但是發現什么都沒做,但是這個時候我們需要想起來,之前 ServiceClassPostProcessor#postProcessBeanDefinitionRegistry 方法內初始化了一個監聽器 DubboBootstrapApplicationListener,我們看一下該監聽器監聽了什么:

@Override public void onApplicationContextEvent(ApplicationContextEvent event) { if (event instanceof ContextRefreshedEvent) { onContextRefreshedEvent((ContextRefreshedEvent) event); } else if (event instanceof ContextClosedEvent) { onContextClosedEvent((ContextClosedEvent) event); } }

  從這個代碼可以看出,這個監聽器必然執行,在 Spring 上下文刷新完畢的時候走 DubboBootstrapApplicationListener#onContextRefreshedEvent

private void onContextRefreshedEvent(ContextRefreshedEvent event) { dubboBootstrap.start(); }

  終於看到了曙光,原來 Dubbo 的初始化入口在這里。附上這個流程的流程圖:

服務發布源碼分析:

  通過上面的分析,我們知道了服務得發布入口在 DubboBootstrap#start:

public DubboBootstrap start() {      // 原子操作,避免並發問題
        if (started.compareAndSet(false, true)) { ready.set(false); initialize();//初始化
            if (logger.isInfoEnabled()) { logger.info(NAME + " is starting..."); } // 1. export Dubbo Services
            exportServices(); // 發布服務 // Not only provider register
            if (!isOnlyRegisterProvider() || hasExportedServices()) { // 2. export MetadataService
                exportMetadataService(); // 發布元數據服務 //3. Register the local ServiceInstance if required
                registerServiceInstance(); // 注冊服務實例
 }        // 客戶端相關的操作
 referServices(); if (asyncExportingFutures.size() > 0) { new Thread(() -> { try { this.awaitFinish(); } catch (Exception e) { logger.warn(NAME + " exportAsync occurred an exception."); } ready.set(true); if (logger.isInfoEnabled()) { logger.info(NAME + " is ready."); } }).start(); } else { ready.set(true); if (logger.isInfoEnabled()) { logger.info(NAME + " is ready."); } } if (logger.isInfoEnabled()) { logger.info(NAME + " has started."); } } return this; }

  其中 initialize 方法,就是初始化服務發布的相關配置信息:

private void initialize() { if (!initialized.compareAndSet(false, true)) { return; } // 初始化拓展外部化配置
 ApplicationModel.initFrameworkExts(); // 如果配置了中心配置,如 dubbo-admin,則進行初始化
 startConfigCenter(); // 如果有必要,注冊到中心配置
 useRegistryAsConfigCenterIfNecessary(); // 加載遠程配置
 loadRemoteConfigs(); // 檢查全局配置
 checkGlobalConfigs(); // 初始化元數據服務
 initMetadataService(); // 初始化事件監聽器
 initEventListener(); if (logger.isInfoEnabled()) { logger.info(NAME + " has been initialized!"); } }

  目前該初始化流程不影響我們繼續看服務的發布流程,所以我們這里直接進入 DubboBootstrap#exportServices

private void exportServices() {      // 遍歷我們需要發布的服務實現類,進行發布
        configManager.getServices().forEach(sc -> { // TODO, compatible with ServiceConfig.export()        // 這里就是之前將我們需要發布的 DubboService 包裝成 ServiceBean        // 而ServiceBean 是 ServiceConfig 的子類
            ServiceConfig serviceConfig = (ServiceConfig) sc; serviceConfig.setBootstrap(this);        // 異步發布?
            if (exportAsync) {//調用線程池+Futrue 發布
                ExecutorService executor = executorRepository.getServiceExporterExecutor(); Future<?> future = executor.submit(() -> { sc.export(); exportedServices.add(sc); }); asyncExportingFutures.add(future); } else {// 同步發布
 sc.export(); exportedServices.add(sc);// 發布完添加到發布服務的集合中
 } }); }

  無論同步/異步 發布,均會走到 ServiceConfig#export 方法中:

public synchronized void export() {      // 是否需要發布
        if (!shouldExport()) { return; }      // 檢查 bootstrap是否初始化
        if (bootstrap == null) { bootstrap = DubboBootstrap.getInstance(); bootstrap.init(); }      // 檢查相關配置
 checkAndUpdateSubConfigs();      // 初始化元數據 //init serviceMetadata
 serviceMetadata.setVersion(version); serviceMetadata.setGroup(group); serviceMetadata.setDefaultGroup(group); serviceMetadata.setServiceType(getInterfaceClass()); serviceMetadata.setServiceInterfaceName(getInterface()); serviceMetadata.setTarget(getRef());      // 是否延遲發布
        if (shouldDelay()) {// 構建一個定時任務
            DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS); } else {        // 直接發布
 doExport(); } exported(); }

  然后進入 ServiceConfig#doExport 這里面沒有什么特殊邏輯,轉到 ServiceConfig#doExportUrls

private void doExportUrls() {      // 獲取服務倉庫,其實就是一個緩存
        ServiceRepository repository = ApplicationModel.getServiceRepository();      // 添加
        ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());      // 緩存 provider
 repository.registerProvider( getUniqueServiceName(), ref, serviceDescriptor, this, serviceMetadata );      // 獲取配置的注冊中心列表
        List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);      // 遍歷協議
        for (ProtocolConfig protocolConfig : protocols) { String pathKey = URL.buildKey(getContextPath(protocolConfig) .map(p -> p + "/" + path) .orElse(path), group, version); // In case user specified path, register service one more time to map it to path.
 repository.registerService(pathKey, interfaceClass); // TODO, uncomment this line once service key is unified
 serviceMetadata.setServiceKey(pathKey);        // 通過注冊中心發布服務
 doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }

  進入 ServiceConfig#doExportUrlsFor1Protocol ,這里代碼很長,不過我們要是知道他主要做了什么看起來就輕松了,本質上做了以下幾件事

  • 生成url
  • 根據url中配置的協議類型,調用指定協議進行服務的發布
  • 啟動服務
  • 注冊服務
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { String name = protocolConfig.getName(); // 獲取協議名稱
        if (StringUtils.isEmpty(name)) { name = DUBBO; //默認為dubbo
 }      //准備MAP。用域拼接URL
        Map<String, String> map = new HashMap<String, String>(); map.put(SIDE_KEY, PROVIDER_SIDE); ServiceConfig.appendRuntimeParameters(map); AbstractConfig.appendParameters(map, getMetrics()); AbstractConfig.appendParameters(map, getApplication()); AbstractConfig.appendParameters(map, getModule()); // remove 'default.' prefix for configs from ProviderConfig // appendParameters(map, provider, Constants.DEFAULT_KEY);
 AbstractConfig.appendParameters(map, provider); AbstractConfig.appendParameters(map, protocolConfig); AbstractConfig.appendParameters(map, this); MetadataReportConfig metadataReportConfig = getMetadataReportConfig(); if (metadataReportConfig != null && metadataReportConfig.isValid()) { map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE); } if (CollectionUtils.isNotEmpty(getMethods())) { for (MethodConfig method : getMethods()) { AbstractConfig.appendParameters(map, method, method.getName()); String retryKey = method.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); if ("false".equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } } List<ArgumentConfig> arguments = method.getArguments(); if (CollectionUtils.isNotEmpty(arguments)) { for (ArgumentConfig argument : arguments) { // convert argument type
                        if (argument.getType() != null && argument.getType().length() > 0) { Method[] methods = interfaceClass.getMethods(); // visit all methods
                            if (methods.length > 0) { for (int i = 0; i < methods.length; i++) { String methodName = methods[i].getName(); // target the method, and get its signature
                                    if (methodName.equals(method.getName())) { Class<?>[] argtypes = methods[i].getParameterTypes(); // one callback in the method
                                        if (argument.getIndex() != -1) { if (argtypes[argument.getIndex()].getName().equals(argument.getType())) { AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } else { // multiple callbacks in the method
                                            for (int j = 0; j < argtypes.length; j++) { Class<?> argclazz = argtypes[j]; if (argclazz.getName().equals(argument.getType())) { AbstractConfig.appendParameters(map, argument, method.getName() + "." + j); if (argument.getIndex() != -1 && argument.getIndex() != j) { throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } } } } } } } else if (argument.getIndex() != -1) { AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>"); } } } } // end of methods for
 }      // 以上代碼都是為了組裝 URL    // 是否泛化接口

        if (ProtocolUtils.isGeneric(generic)) { map.put(GENERIC_KEY, generic); map.put(METHODS_KEY, ANY_VALUE); } else { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put(REVISION_KEY, revision); } String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { logger.warn("No method found in service interface " + interfaceClass.getName()); map.put(METHODS_KEY, ANY_VALUE); } else { map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } /** * Here the token value configured by the provider is used to assign the value to ServiceConfig#token */
     // token 校驗

        if(ConfigUtils.isEmpty(token) && provider != null) { token = provider.getToken(); } if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(TOKEN_KEY, token); } } //init serviceMetadata attachments
 serviceMetadata.getAttachments().putAll(map);      // 主機綁定 // export service
        String host = findConfigedHosts(protocolConfig, registryURLs, map); Integer port = findConfigedPorts(protocolConfig, name, map); // 獲取端口。默認20880      // 組裝URL
        URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);      // 獲取拓展點 // You can customize Configurator to append extra parameters
        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); } String scope = url.getParameter(SCOPE_KEY); // don't export when none is configured
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {         // 如果scope!=remote, 則先本地暴露服務 // export to local if the config is not remote (export to remote only when config is remote)
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) { exportLocal(url); }        // 如果scope!=remote, 則先本地暴露服務 // export to remote if the config is not local (export to local only when config is local)
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) { if (CollectionUtils.isNotEmpty(registryURLs)) { for (URL registryURL : registryURLs) { //if protocol is only injvm ,not register                // //如果設置的protocol是injvm,跳過

                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { continue; } url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY)); URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { if (url.getParameter(REGISTER_KEY, true)) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } else { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } }                // // 是否采用自定義的動態代理機制,默認是javassist // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(PROXY_KEY, proxy); }                //獲得一個自適應擴展點,這個時候返回的Invoker是一個動態代理類。
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); Exporter<?> exporter = PROTOCOL.export(wrapperInvoker); exporters.add(exporter); } } else { if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); Exporter<?> exporter = PROTOCOL.export(wrapperInvoker); exporters.add(exporter); } /** * @since 2.7.0 * ServiceData Store */ WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE)); if (metadataService != null) { metadataService.publishServiceDefinition(url); } } } this.urls.add(url); }

  對於上述代碼中的 getMethods 里面的一陣循環是什么意思呢?請看下面代碼:

@DubboService(loadbalance = "random", // 負載均衡
        timeout = 50000, //超時
        cluster = "failsafe", // 服務容錯
        protocol = {"dubbo", "rest"}, //多協議支持
        registry = {"hangzhou", "wenzhou"}, //多注冊中心
        methods = { @Method(name = "sayHello", timeout = -1), @Method(name = "sayHello", timeout = -1, arguments = { @Argument(), @Argument() }) } )

  其實本質上就是解析 @DubboService 的注解配置元數據,然后來到了 主機綁定,也就是 IP的查找方法上 ServiceConfig#findConfigedHosts:

private String findConfigedHosts(ProtocolConfig protocolConfig, List<URL> registryURLs, Map<String, String> map) { boolean anyhost = false;      // 查找環境變量中是否存在啟動參數 [DUBBO_IP_TO_BIND] =服務注冊的ip
        String hostToBind = getValueFromConfig(protocolConfig, DUBBO_IP_TO_BIND); if (hostToBind != null && hostToBind.length() > 0 && isInvalidLocalHost(hostToBind)) { throw new IllegalArgumentException("Specified invalid bind ip from property:" + DUBBO_IP_TO_BIND + ", value:" + hostToBind); } // if bind ip is not found in environment, keep looking up
        if (StringUtils.isEmpty(hostToBind)) {        //讀取配置文件, dubbo.protocols.dubbo.host= 服務注冊的ip
            hostToBind = protocolConfig.getHost(); if (provider != null && StringUtils.isEmpty(hostToBind)) { hostToBind = provider.getHost(); } if (isInvalidLocalHost(hostToBind)) { anyhost = true; try { logger.info("No valid ip found from environment, try to find valid host from DNS.");          // 獲得本機ip地址
                    hostToBind = InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException e) { logger.warn(e.getMessage(), e); } if (isInvalidLocalHost(hostToBind)) { if (CollectionUtils.isNotEmpty(registryURLs)) { for (URL registryURL : registryURLs) { if (MULTICAST.equalsIgnoreCase(registryURL.getParameter("registry"))) { // skip multicast registry since we cannot connect to it via Socket
                                continue; } try (Socket socket = new Socket()) { SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort()); socket.connect(addr, 1000);                    //通過Socket去連接注冊中心,從而獲取本機IP
                                hostToBind = socket.getLocalAddress().getHostAddress(); break; } catch (Exception e) { logger.warn(e.getMessage(), e); } } } if (isInvalidLocalHost(hostToBind)) {                //會輪詢本機的網卡,直到找到合適的IP地址
                        hostToBind = getLocalHost(); } } } } map.put(BIND_IP_KEY, hostToBind);      //上面獲取到的ip地址是bindip,如果需要作為服務注冊中心的ip, DUBBO_IP_TO_REGISTRY -dDUBBO_IP_TO_REGISTRY=ip // registry ip is not used for bind ip by default
        String hostToRegistry = getValueFromConfig(protocolConfig, DUBBO_IP_TO_REGISTRY); if (hostToRegistry != null && hostToRegistry.length() > 0 && isInvalidLocalHost(hostToRegistry)) { throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry); } else if (StringUtils.isEmpty(hostToRegistry)) { // bind ip is used as registry ip by default
            hostToRegistry = hostToBind; } map.put(ANYHOST_KEY, String.valueOf(anyhost)); return hostToRegistry; }

  總之就是直到找到一個合法的主機地址為止。然后獲取到端口。將map 配置信息集合、IP、Port 傳入,構造一個 URL

dubbo://192.168.1.1:20880/com.wuzz.demo.api.HelloService?accepts=0&anyhost=true&application=springboot-dubbo&bind.ip=192.168.1.1

&bind.port=20880&cluster=failsafe&connections=0&deprecated=false&dubbo=2.0.2&dynamic=true&executes=0&generic=false

&interface=com.wuzz.demo.api.HelloService&iothreads=5&methods=sayHello&pid=13496&qos.enable=false&queues=0&release=2.7.7&serialization=kryo&side=provider&threadpool=fixed&threads=201&timeout=50000&timestamp=1601354940987

  ServiceConfig#doExportUrlsFor1Protocol 還有很多細節的處理,這里有必要解釋以下的就是這個 invoker 對象了:

Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

  其中 PROXY_FACTORY 定義如下:

private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

  對應的接口拓展點默認實現為  javassist ,但是會有一個 StubProxyFactoryWrapper 進行包裝,但是這里不影響,所以進入 JavassistProxyFactory#getInvoker

@Override public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }

  通過 javassist 生成一個代理類,這里持有了對應我們需要發布的服務類的所有信息。然后將該類進行傳遞,一直到本地服務的發布及服務的注冊。而后消費端通過這里的 wrapper.invokeMethod 進行調用。

  我們可以看一下在我這個環境測試的服務下生成的代理方法的代碼,需要進入 Wrapper.getWrapper 方法斷點獲取:

  我們將 c3 拷貝出來:

public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException { com.wuzz.demo.api.HelloService w; try { w = ((com.wuzz.demo.api.HelloService) $1); } catch (Throwable e) { throw new IllegalArgumentException(e); } try { if ("sayHello".equals($2) && $3.length == 0) { return ($w) w.sayHello(); } } catch (Throwable e) { throw new java.lang.reflect.InvocationTargetException(e); } throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class com.wuzz.demo.api.HelloService."); }

  構建好了代理類之后,返回一個AbstractproxyInvoker,並且它實現了doInvoke方法,這個地方似乎看到了dubbo消費者調用過來的時候觸發的影子,因為wrapper.invokeMethod本質上就是觸發上面動態代理類的方法invokeMethod。

  接下來我們來看看服務的遠程發布 

private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);

  這個 PROTOCOL 的實例化,跟我們上面分析SPI之自適應拓展點一摸一樣,所以這里得到的對象是  ProtocolFilterWrapper(QosProtocolWrapper(ProtocolListenerWrapper(DubboProtocol)))。但是需要明白的是,Dubbo 基於URL 驅動,那么這個時候我們需要知道的是URL中攜帶的協議是什么,這樣我們才能夠找到對應的拓展點

  我們發現這里已經被替換成了 registry 協議,那么此刻應該走到 Protocol$Adaptive 的動態適配器類中,而其中最為關鍵的代碼如下:

String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );

  然后通過這個 extName ,通過獲取指定名稱的拓展點,找到對應的實現,那么這里的 registry 對應的就是 org.apache.dubbo.registry.integration.RegistryProtocol,但是Protocol 有包裝類,那么最后的對象應該是  ProtocolFilterWrapper(QosProtocolWrapper(ProtocolListenerWrapper(RegistryProtocol)))

  這里的三個包裝類都會判斷URL是不是 registry 協議,如果是直接進入下個調用鏈,當前場景正是 registry 。最終調用 RegistryProtocol#export

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {      // 通過URL里面的 registry 屬性對應的值獲取的注冊地址,配置了zookeeper 則這里就是 zookeeper://192.168.1.101:2181/.....
        URL registryUrl = getRegistryUrl(originInvoker); // url to export locally      // 發布的服務地址,當前情況下是dubbo協議 則這里就是      // dubbo://192.168.1.1:20880/.......
        URL providerUrl = getProviderUrl(originInvoker); // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call // the same service. Because the subscribed is cached key with the name of the service, it causes the // subscription information to cover.      // 修改URL ,這里設置成 provider://192.168.1.1:20880/.......
 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);      // 結合配置相關重寫 URL
        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); //export invoker      // 啟動 Netty 並且發布本地服務。
 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); // url to registry      // 獲取注冊實例,這里如果配置了zookeeper ,則返回 ZookeeperRegistry
 final Registry registry = getRegistry(originInvoker);    // dubbo://.....
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl); // decide if we need to delay publish
        boolean register = providerUrl.getParameter(REGISTER_KEY, true); if (register) {        // 注冊服務,
 register(registryUrl, registeredProviderUrl); } // register stated url on provider model
 registerStatedUrl(registryUrl, registeredProviderUrl, register); // Deprecated! Subscribe to override rules in 2.6.x or before.
     // //設置注冊中心的訂閱
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); notifyExport(exporter); //Ensure that a new exporter instance is returned every time export
     // //保證每次export都返回一個新的exporter實例
return new DestroyableExporter<>(exporter); }

  然后走服務的發布 RegistryProtocol#doLocalExport

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) { String key = getCacheKey(originInvoker); return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> { Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl); return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker); }); }

  其中 providerUrl 是dubbo:// 協議開頭的地址URL,正如之前所說,Dubbo基於URL驅動,那么此刻  protocol  是 Protocol$Adaptive,所以此刻 protocol.export(invokerDelegate) 會走 DubboProtocol#export ,需要注意的是,這里會進行包裝 ProtocolFilterWrapper(QosProtocolWrapper(ProtocolListenerWrapper(DubboProtocol)))

  我們直接進入 DubboProtocol#export 

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
     //是否是本地存根事件
        Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
     //是否配置了參數回調機制
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }

            }
        }

        openServer(url);
        optimizeSerialization(url);

        return exporter;
}

  openServer: 往下看這個過程,進入到openServer(),從名字來看它是用來開啟一個服務。去開啟一個服務,並且放入到緩存中(在同一台機器上(單網卡),同一個端口上僅允許啟動一個服務器實例)

private void openServer(URL url) {   // 獲取 host:port,並將其作為服務器實例的 key,用於標識當前的服務器實例
  String key = url.getAddress();   ////client 也可以暴露一個只有server可以調用的服務
  boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);   if (isServer) {     //是否在serverMap中緩存了
    ExchangeServer server = serverMap.get(key);     if (server == null) {       synchronized (this) {         server = serverMap.get(key);         if (server == null) {           // 創建服務器實例
          serverMap.put(key, createServer(url));        }      }    } else {       // 服務器已創建,則根據 url 中的配置重置服務器
      server.reset(url);    }  } }

  創建服務:createServer

private ProtocolServer createServer(URL url) { //組裝url,在url中添加心跳時間、編解碼參數
    url = URLBuilder.from(url)         // 當服務關閉以后,發送一個只讀的事件,默認是開啟狀態
     .addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY,      Boolean.TRUE.toString())         // 啟動心跳配置
       .addParameterIfAbsent(Constants.HEARTBEAT_KEY,      String.valueOf(Constants.DEFAULT_HEARTBEAT))        .addParameter(Constants.CODEC_KEY, DubboCodec.NAME)        .build();   String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);   //通過 SPI 檢測是否存在 server 參數所代表的 Transporter 拓展,不存在則拋出異常
  if (str != null && str.length() > 0 &&
    !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {         throw new RpcException("Unsupported server type: " + str + ", url: " +         url);  } //創建ExchangeServer.
  ExchangeServer server;   try {     server = Exchangers.bind(url, requestHandler);  } catch (RemotingException e) {     throw new RpcException("Fail to start server(url: " + url + ") " +     e.getMessage(), e);  }   str = url.getParameter(CLIENT_KEY);   if (str != null && str.length() > 0) {     Set<String> supportedTypes =     ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();     if (!supportedTypes.contains(str)) {       throw new RpcException("Unsupported client type: " + str);    }  }   return new DubboProtocolServer(server); }

  Exchangers.bind :

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } //獲取 Exchanger,默認為 HeaderExchanger。 //調用 HeaderExchanger 的 bind 方法創建 ExchangeServer 實例
  url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); return getExchanger(url).bind(url, handler); } public static Exchanger getExchanger(URL url) { String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); return getExchanger(type); } // 拓展點,默認為 header
public static Exchanger getExchanger(String type) { return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type); }

  然后根據拓展點進入 HeaderExchanger#bind

  • new DecodeHandler(new HeaderExchangeHandler(handler))
  • Transporters.bind :發布服務
  • new HeaderExchangeServer:服務端消費的調用鏈

  目前我們只需要關心transporters.bind方法即可

@Override public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }

  進入 Transporters#bind 發布遠程服務

public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } return getTransporter().bind(url, handler); } // @SPI("netty") 默認為最新的 netty4 實現
public static Transporter getTransporter() {   return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); }

  走到是最新的netty4版本的 netty進行服務發布:

  進入到 org.apache.dubbo.remoting.transport.netty4.NettyTransporter#bind

@Override public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException { return new NettyServer(url, handler); }

  然后創建了一個 NettyServer 實例, 里面有個 doOpen 方法用域開啟服務。接下去就是啟動Netty服務了。想進一步了解Netty 機制的小伙伴可以參考:https://www.cnblogs.com/wuzhenzhao/category/1528244.html

  值得注意的是,這里構造了一個請求處理鏈,Netty接受到客戶端請求的時候會走這個處理鏈:MultiMessageHandler ->HeartbeatHandle ->AllChannelHandler ->DecodeHandler ->HeaderExchangeHandler->ExchangeHandlerAdapter

服務注冊源碼分析: 

  服務在本地發布完成,那么接下去要進入服務的注冊階段,相關代碼在 org.apache.dubbo.registry.integration.RegistryProtocol#export 類中:

// url to registry
final Registry registry = getRegistry(originInvoker); final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl); // decide if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true); if (register) { register(registryUrl, registeredProviderUrl); }

  其中 getRegistry 主要是獲取到一個注冊器的實現,代碼如下:

protected Registry getRegistry(final Invoker<?> originInvoker) { // 這個時候 Url為 zookeeper://開頭
    URL registryUrl = getRegistryUrl(originInvoker); // 所以這里 RegistryFactory$Adapter 獲取到的應該為 zookeeper的實現
    return registryFactory.getRegistry(registryUrl); }

  然后這里應該進入 ZookeeperRegistryFactory#getRegistry ,但是 RegistryFactory 拓展點存在包裝類 RegistryFactoryWrapper ,所以這里先走 RegistryFactoryWrapper#getRegistry ,然后走  ZookeeperRegistryFactory#getRegistry 。由於本類未實現,則進入父類 AbstractRegistryFactory#getRegistry ,然后調用 ZookeeperRegistryFactory#createRegistry,返回一個 ListenerRegistryWrapper(ZookeeperRegistry)

  然后進入服務注冊  RegistryProtocol#register

private void register(URL registryUrl, URL registeredProviderUrl) { //zookeeper://192.168.1.101:2181/........
    Registry registry = registryFactory.getRegistry(registryUrl); registry.register(registeredProviderUrl); }

  這里跟上面一樣的邏輯,然后一定要走 ZookeeperRegistry#register ,但是本類中也未實現 ,走父類 FailbackRegistry#register

public void register(URL url) { if (!acceptable(url)) { logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type."); return; }      // 調用父類注冊,緩存添加
 super.register(url); removeFailedRegistered(url); removeFailedUnregistered(url); try { // Sending a registration request to the server side      // 注冊
 doRegister(url); } catch (Exception e) { Throwable t = e; // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException;      // 是否啟動檢查
            if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); }        // 失敗重試 // Record a failed registration request to a failed list, retry regularly
 addFailedRegistered(url); } }

  然后進入 ZookeeperRegistry#doRegister

@Override public void doRegister(URL url) { try { zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }

  有經驗的開發人員看到這個就不用解釋了。服務到此注冊完畢,ZK 服務端即出現服務注冊相關的信息。最后附上服務發布、注冊的主要流程圖:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM