深入解析 Dubbo 3.0 服務端暴露全流程


簡介: 隨着雲原生時代的到來,Dubbo 3.0 的一個很重要的目標就是全面擁抱雲原生。正因如此,Dubbo 3.0 為了能夠更好的適配雲原生,將原來的接口級服務發現機制演進為應用級服務發現機制。

作者介紹

熊聘,Github賬號pinxiong,Apache Dubbo貢獻者,關注RPC、Service Mesh和雲原生等領域。現任職於攜程國際事業部研發團隊,負責市場營銷、雲原生等相關工作。

背景

隨着雲原生時代的到來,Dubbo 3.0 的一個很重要的目標就是全面擁抱雲原生。正因如此,Dubbo 3.0 為了能夠更好的適配雲原生,將原來的接口級服務發現機制演進為應用級服務發現機制。

基於應用級服務發現機制,Dubbo 3.0 能大幅降低框架帶來的額外資源消耗,大幅提升資源利用率,主要體現在:

  • 單機常駐內存下降 75%
  • 能支持的集群實例規模以百萬計的集群
  • 注冊中心總體數據量下降超 90%

目前關於 Dubbo 服務端暴露流程的技術文章很多,但是都是基於 Dubbo 接口級服務發現機制來解讀的。在 Dubbo 3.0 的應用級服務發現機制下,服務端暴露流程與之前有很大的變化,本文希望可以通過 對Dubbo 3.0 源碼理解來解析服務端暴露全流程。

什么是應用級服務發現

簡單來說,以前 Dubbo 是將接口的信息全部注冊到注冊中心,而一個應用實例一般會存在多個接口,這樣一來注冊的數據量就要大很多,而且有冗余。應用級服務發現的機制是同一個應用實例僅在注冊中心注冊一條數據,這種機制主要解決以下幾個問題:

  • 對齊主流微服務模型,如:Spring Cloud
  • 支持 Kubernetes native service,Kubernetes 中維護調度的服務都是基於應用實例級,不支持接口級
  • 減少注冊中心數據存儲能力,降低了地址變更推送的壓力

假設應用 dubbo-application 部署了 3 個實例(instance1, instance2, instance3),並且對外提供了 3 個接口(sayHello, echo, getVersion)分別設置了不同的超時時間。在接口級和應用級服務發現機制下,注冊到注冊中心的數據是截然不同的。如下圖所示:

  • 接口級服務發現機制下注冊中心中的數據
"sayHello": [
  {"application":"dubbo-application","name":"instance1", "ip":"127.0.0.1", "metadata":{"timeout":1000}},
  {"application":"dubbo-application","name":"instance2", "ip":"127.0.0.2", "metadata":{"timeout":2000}},
  {"application":"dubbo-application","name":"instance3", "ip":"127.0.0.3", "metadata":{"timeout":3000}},
],
"echo": [
  {"application":"dubbo-application","name":"instance1", "ip":"127.0.0.1", "metadata":{"timeout":1000}},
  {"application":"dubbo-application","name":"instance2", "ip":"127.0.0.2", "metadata":{"timeout":2000}},
  {"application":"dubbo-application","name":"instance3", "ip":"127.0.0.3", "metadata":{"timeout":3000}},
],
"getVersion": [
  {"application":"dubbo-application","name":"instance1", "ip":"127.0.0.1", "metadata":{"timeout":1000}},
  {"application":"dubbo-application","name":"instance2", "ip":"127.0.0.2", "metadata":{"timeout":2000}},
  {"application":"dubbo-application","name":"instance3", "ip":"127.0.0.3", "metadata":{"timeout":3000}}
]
  • 應用級服務發現機制下注冊中心中的數據
"dubbo-application": [
  {"name":"instance1", "ip":"127.0.0.1", "metadata":{"timeout":1000}},
  {"name":"instance2", "ip":"127.0.0.2", "metadata":{"timeout":2000}},
  {"name":"instance3", "ip":"127.0.0.3", "metadata":{"timeout":3000}}
]

通過對比我們可以發現,采用應用級服務發現機制確實使注冊中心中的數據量減少了很多,那些原有的接口級的數據存儲在元數據中心中。

服務端暴露全流程

引入應用級服務發現機制以后,Dubbo 3.0 服務端暴露全流程和之前有很大的區別。暴露服務端全流程的核心代碼在 DubboBootstrap#doStart 中,具體如下:

private void doStart() {
    // 1. 暴露Dubbo服務
    exportServices();
    // If register consumer instance or has exported services
    if (isRegisterConsumerInstance() || hasExportedServices()) {
        // 2. 暴露元數據服務
        exportMetadataService();
        // 3. 定時更新和上報元數據
        registerServiceInstance();
        ....
    }
    ......
}

假設以 Zookeeper 作為注冊中,對外暴露 Triple 協議的服務為例,服務端暴露全流程時序圖如下:

image.gif

image.png

我們可以看到,整個的暴露流程還是挺復雜的,一共可以分為四個部分:

  • 暴露 injvm 協議的服務
  • 注冊 service-discovery-registry 協議
  • 暴露 Triple 協議的服務並注冊 registry 協議
  • 暴露 MetadataService 服務

下面會分別從這四個部分對服務暴露全流程進行詳細講解。

1、暴露 injvm 協議的服務

injvm 協議的服務是暴露在本地的,主要原因是在一個應用上往往既有 Service(暴露服務)又有 Reference(服務引用)的情況存在,並且 Reference 引用的服務就是在該應用上暴露的 Service。為了支持這種使用場景,Dubbo 提供了 injvm 協議,將 Service 暴露在本地,Reference 就可以不需要走網絡直接在本地調用 Service。

image.png

整體時序圖

由於這部分內容在之前的接口級服務發現機制中是類似的,所以相關的核心代碼就不在這里展開討論了。

2、注冊 service-discovery-registry 協議

注冊 service-discovery-registry 協議的核心目的是為了注冊與服務相關的元數據,默認情況下元數據通過 InMemoryWritableMetadataService 將數據存儲在本地內存和本地文件。

image.png

整體時序圖

核心代碼在 ServiceConfig#exportRemote 中,具體如下:

  • 注冊 service-discovery-registry 協議的入口
private URL exportRemote(URL url, List<URL> registryURLs) {
    if (CollectionUtils.isNotEmpty(registryURLs)) {
        // 如果是多個注冊中心,通過循環對每個注冊中心進行注冊
        for (URL registryURL : registryURLs) {
            // 判斷是否是service-discovery-registry協議
            // 將service-name-mapping參數的值設置為true
            if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) {
                url = url.addParameterIfAbsent(SERVICE_NAME_MAPPING_KEY, "true");
            }
            ......
            // 注冊service-discovery-registry協議復用服務暴露流程
            doExportUrl(registryURL.putAttribute(EXPORT_KEY, url), true);
        }
    ......
    return url;
}
  • invoker 中包裝 Metadata

核心代碼在 ServiceConfig#doExportUrl 中,具體如下:

private void doExportUrl(URL url, boolean withMetaData) {
    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
    // 此時的withMetaData的值為true
    // 將invoker包裝成DelegateProviderMetaDataInvoker
    if (withMetaData) {
        invoker = new DelegateProviderMetaDataInvoker(invoker, this);
    }
    Exporter<?> exporter = PROTOCOL.export(invoker);
    exporters.add(exporter);
}
  • 通過 RegistryProtocol 將 Invoker 轉化成 Exporter

核心代碼在 ProtocolListenerWrapper#export 中,具體如下:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // 此時的protocol為RegistryProtocol類型
    if (UrlUtils.isRegistry(invoker.getUrl())) {
        return protocol.export(invoker);
    }
    ......
}
  • RegistryProtocol 將 Invoker 轉化成 Exporter 的核心流程

核心代碼在 RegistryProtocol#export 中,具體如下:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    URL registryUrl = getRegistryUrl(originInvoker);
    URL providerUrl = getProviderUrl(originInvoker);
    ......
    // 再次暴露Triple協議的服務
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

    // registryUrl中包含service-discovery-registry協議
    // 通過該協議創建ServiceDiscoveryRegistry對象
    // 然后組合RegistryServiceListener監聽器,
    // 最后包裝成ListenerRegistryWrapper對象
    final Registry registry = getRegistry(registryUrl);
    final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

    boolean register = providerUrl.getParameter(REGISTER_KEY, true);
    if (register) {
        // 注冊service-discovery-registry協議
        // 觸發RegistryServiceListener的onRegister事件
        register(registry, registeredProviderUrl);
    }
    ......
    // 觸發RegistryServiceListener的onRegister事件
    notifyExport(exporter);
    return new DestroyableExporter<>(exporter);
}
  • 暴露 Triple 協議的服務

核心代碼在 RegistryProtocol#doLocalExport 中,具體如下:

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    String key = getCacheKey(originInvoker);
    // 此時的protocol為Triple協議的代理類
    // 和暴露injvm協議的PROTOCOL相同
    return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
        Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
        return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
    });
}
  • 注冊service-discovery-registry協議

核心代碼在 ServiceDiscoveryRegistry#register和ServiceDiscoveryRegistry#doRegister 中,具體如下:

1、ServiceDiscoveryRegistry#register

public final void register(URL url) {
    // 只有服務端(Provider)才需要注冊
    if (!shouldRegister(url)) {
        return;
    }
    // 注冊service-discovery-registry協議
    doRegister(url);
}

2、ServiceDiscoveryRegistry#doRegister

public void doRegister(URL url) {
    url = addRegistryClusterKey(url);
    // 注冊元數據
    if (writableMetadataService.exportURL(url)) {
        if (logger.isInfoEnabled()) {
            logger.info(format("The URL[%s] registered successfully.", url.toString()));
        }
    } else {
        if (logger.isWarnEnabled()) {
            logger.warn(format("The URL[%s] has been registered.", url.toString()));
        }
    }
}
  • 注冊元數據

核心代碼在 InMemoryWritableMetadataService#exportURL 中,具體如下:

public boolean exportURL(URL url) {
    // 如果是MetadataService,則不注冊元數據
    if (MetadataService.class.getName().equals(url.getServiceInterface())) {
        this.metadataServiceURL = url;
        return true;
    }

    updateLock.readLock().lock();
    try {
        String[] clusters = getRegistryCluster(url).split(",");
        for (String cluster : clusters) {
            MetadataInfo metadataInfo = metadataInfos.computeIfAbsent(cluster, k -> new MetadataInfo(ApplicationModel.getName()));
            // 將Triple協議的服務中接口相關的數據生成ServiceInfo
            // 將ServiceInfo注冊到MetadataInfo中
            metadataInfo.addService(new ServiceInfo(url));
        }
        metadataSemaphore.release();
        return addURL(exportedServiceURLs, url);
    } finally {
        updateLock.readLock().unlock();
    }
}
  • 發布 onRegister 事件

核心代碼在 ListenerRegistryWrapper#register 中,具體如下:

public void register(URL url) {
    try {
        // registry為ServiceDiscoveryRegistry對象
        // 此時已經調用完ServiceDiscoveryRegistry#registry方法
        registry.register(url);
    } finally {
        if (CollectionUtils.isNotEmpty(listeners) && !UrlUtils.isConsumer(url)) {
            RuntimeException exception = null;
            for (RegistryServiceListener listener : listeners) {
                if (listener != null) {
                    try {
                        // 注冊完service-discovery-registry協議后發布onRegister事件
                        listener.onRegister(url, registry);
                    } catch (RuntimeException t) {
                        logger.error(t.getMessage(), t);
                        exception = t;
                    }
                }
            }
            if (exception != null) {
                throw exception;
            }
        }
    }
}
  • 發布服務注冊事件

核心代碼在 RegistryProtocol#notifyExport 中,具體如下:

private <T> void notifyExport(ExporterChangeableWrapper<T> exporter) {
    List<RegistryProtocolListener> listeners = ExtensionLoader.getExtensionLoader(RegistryProtocolListener.class)
        .getActivateExtension(exporter.getOriginInvoker().getUrl(), "registry.protocol.listener");
    if (CollectionUtils.isNotEmpty(listeners)) {
        for (RegistryProtocolListener listener : listeners) {
            // 發布RegistryProtocolListener的onExport事件
            listener.onExport(this, exporter);
        }
    }
}

我們可以看出注冊 service-discovery-registry 協議的核心目的是為了將服務的接口相關的信息存儲在內存中。從兼容性和平滑遷移兩方面來考慮,社區在實現的時候采取復用 ServiceConfig 的暴露流程的方式。

3、暴露Triple協議服務並注冊registry協議

暴露 Triple 協議的服務並注冊 registry 協議是 Dubbo 3.0 服務暴露的核心流程,一共分為兩部分:

  • 暴露 Triple 協議的服務
  • 注冊 registry 協議

由於暴露 Triple 協議服務的流程和暴露 Injvm 協議服務的流程是一致的,所以不再贅述。注冊 registry 協議的過程僅僅注冊了應用實例相關的信息,也就是之前提到的應用級服務發現機制。

image.gif

image.png

整體時序圖

  • 通過 InterfaceCompatibleRegistryProtocol 將 Invoker 轉化成 Exporter

核心代碼在 ProtocolListenerWrapper#export 中,具體如下:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // 此時的protocol為InterfaceCompatibleRegistryProtocol類型(繼承了RegistryProtocol)
    // 注意:在注冊service-discovery-registry協議的時候protocol為RegistryProtocol類型
    if (UrlUtils.isRegistry(invoker.getUrl())) {
        return protocol.export(invoker);
    }
    ......
}
  • RegistryProtocol 將 Invoker 轉化成 Exporter 的核心流程

核心代碼在 RegistryProtocol#export 中,具體如下:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    URL registryUrl = getRegistryUrl(originInvoker);
    URL providerUrl = getProviderUrl(originInvoker);
    ......
    // 再次暴露Triple協議的服務
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

    // registryUrl中包含registry協議
    // 通過該協議創建ZookeeperRegistry對象
    // 然后組合RegistryServiceListener監聽器,
    // 最后包裝成ListenerRegistryWrapper對象
    // 注意:
    // 1. service-discovery-registry協議對應的是ServiceDiscoveryRegistry
    // 2. registry協議對應的是ZookeeperRegistry
    final Registry registry = getRegistry(registryUrl);
    final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

    boolean register = providerUrl.getParameter(REGISTER_KEY, true);
    if (register) {
        // 注冊registry協議
        // 觸發RegistryServiceListener的onRegister事件
        register(registry, registeredProviderUrl);
    }
    ......
    // 發布RegistryProtocolListener的onExport事件
    notifyExport(exporter);
    return new DestroyableExporter<>(exporter);
}
  • 注冊 registry 協議

核心代碼在 FailbackRegistry#register 和 ServiceDiscoveryRegistry#doRegister 中(ZookeeperRegistry 繼承 FailbackRegistry)中,具體如下:

1、FailbackRegistry#register

public void register(URL url) {
    if (!acceptable(url)) {
        ......
        try {
            // 注冊registry協議
            doRegister(url);
        } catch (Exception e) {
            ......
        }
    }
}

2、ServiceDiscoveryRegistry#doRegister

public void doRegister(URL url) {
    try {
        // 在zookeeper上注冊Provider
        // 目錄:/dubbo/xxxService/providers/***
        // 數據:dubbo://192.168.31.167:20800/xxxService?anyhost=true&
        //      application=application-name&async=false&deprecated=false&dubbo=2.0.2&
        //      dynamic=true&file.cache=false&generic=false&interface=xxxService&
        //      metadata-type=remote&methods=hello&pid=82470&release=&
        //      service-name-mapping=true&side=provider&timestamp=1629588251493
        zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
  • 訂閱地址變更

核心代碼在 FailbackRegistry#subscribe 和 ZookeeperRegistry#doSubscribe 中,具體如下:

1、FailbackRegistry#subscribe

public void subscribe(URL url, NotifyListener listener) {
    ......
    try {
        // 調用ZookeeperRegistry#doSubscribe
        doSubscribe(url, listener);
    } catch (Exception e) {
    ......
}

2、ZookeeperRegistry#doSubscribe

public void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        if (ANY_VALUE.equals(url.getServiceInterface())) {
            ......
        } else {
            ......
            for (String path : toCategoriesPath(url)) {
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, path, k, latch));
                if (zkListener instanceof RegistryChildListenerImpl) {
                    ((RegistryChildListenerImpl) zkListener).setLatch(latch);
                }
                // 創建臨時節點用來存儲configurators數據
                // 目錄:/dubbo/xxxService/configurators
                // 數據:應用的配置信息,可以在dubbo-admin中進行修改,默認為空
                zkClient.create(path, false);
                // 添加監聽器,用來監聽configurators中的變化
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            ......
        }
    } catch (Throwable e) {
        ......
    }
}
  • 建立暴露的 Triple 協議服務與 Metadata 之間的聯系

核心代碼在 ServiceConfig#exportUrl、MetadataUtils#publishServiceDefinition、InMemoryWritableMetadataService#publishServiceDefinition、RemoteMetadataServiceImpl#publishServiceDefinition 和 MetadataReport#storeProviderMetadata 中,具體如下:

1、ServiceConfig#exportUrl

private void exportUrl(URL url, List<URL> registryURLs) {
    ......
    if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
        ......
        if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
            url = exportRemote(url, registryURLs);
            // 發布事件,更新服務接口相關的數據
            MetadataUtils.publishServiceDefinition(url);
        }
    }
    ......
}

2、MetadataUtils#publishServiceDefinition

public static void publishServiceDefinition(URL url) {
    // 將服務接口相關的數據存在到InMemoryWritableMetadataService中
    WritableMetadataService.getDefaultExtension().publishServiceDefinition(url);
    // 將服務接口相關的數據存在到遠端的元數據中心
    if (REMOTE_METADATA_STORAGE_TYPE.equalsIgnoreCase(url.getParameter(METADATA_KEY))) {
        getRemoteMetadataService().publishServiceDefinition(url);
    }
}

3、InMemoryWritableMetadataService#publishServiceDefinition

public void publishServiceDefinition(URL url) {
    try {
        String interfaceName = url.getServiceInterface();
        if (StringUtils.isNotEmpty(interfaceName)
            && !ProtocolUtils.isGeneric(url.getParameter(GENERIC_KEY))) {
            Class interfaceClass = Class.forName(interfaceName);
            ServiceDefinition serviceDefinition = ServiceDefinitionBuilder.build(interfaceClass);
            Gson gson = new Gson();
            String data = gson.toJson(serviceDefinition);
            // 存儲服務接口相關數據
            // 數據格式:
            // {
            //   "canonicalName": "xxxService",
            //   "codeSource": "file:/Users/xxxx",
            //   "methods": [{
            //       "name": "hello",
            //       "parameterTypes": ["java.lang.String"],
            //       "returnType": "java.lang.String",
            //       "annotations": []
            //   }],
            //   "types": [{
            //       "type": "java.lang.String"
            //    }],
            //  "annotations": []
            // } 
            serviceDefinitions.put(url.getServiceKey(), data);
            return;
        } else if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
            ......
        }
        ......
    } catch (Throwable e) {
        ......
    }
}

4、RemoteMetadataServiceImpl#publishServiceDefinition

public void publishServiceDefinition(URL url) {
    checkRemoteConfigured();
    String side = url.getSide();
    if (PROVIDER_SIDE.equalsIgnoreCase(side)) {
        // 發布服務端(Provider)的服務接口信息到元數據中心
        publishProvider(url);
    } else {
        ......
    }
}

RemoteMetadataServiceImpl#publishProvider

private void publishProvider(URL providerUrl) throws RpcException {
    ......
    try {
        String interfaceName = providerUrl.getServiceInterface();
        if (StringUtils.isNotEmpty(interfaceName)) {
            ......
            for (Map.Entry<String, MetadataReport> entry : getMetadataReports().entrySet()) {
                // 獲取MetadataReport服務,該服務用來訪問元數據中心
                MetadataReport metadataReport = entry.getValue();
                // 將服務接口信息存儲到元數據中心
                metadataReport.storeProviderMetadata(new MetadataIdentifier(providerUrl.getServiceInterface(),
                    providerUrl.getVersion(), providerUrl.getGroup(),
                    PROVIDER_SIDE, providerUrl.getApplication()), fullServiceDefinition);
            }
            return;
        }
        ......
    } catch (ClassNotFoundException e) {
        ......
    }
}

5、AbstractMetadataReport#storeProviderMetadata

public void storeProviderMetadata(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition){
    if (syncReport) {
        storeProviderMetadataTask(providerMetadataIdentifier, serviceDefinition);
    } else {
        // 異步存儲到元數據中心
        reportCacheExecutor.execute(() -> storeProviderMetadataTask(providerMetadataIdentifier, serviceDefinition));
    }
}

private void storeProviderMetadataTask(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) {
    try {
        ......
        allMetadataReports.put(providerMetadataIdentifier, serviceDefinition);
        failedReports.remove(providerMetadataIdentifier);
        Gson gson = new Gson();
        // data的數據格式:
        // {
        //   "parameters": {
        //       "side": "provider", 
        //       "interface": "xxxService",
        //       "metadata-type": "remote",
        //       "service-name-mapping": "true",
        //   },
        //   "canonicalName": "xxxService",
        //   "codeSource": "file:/Users/xxxx",
        //   "methods": [{
        //       "name": "hello",
        //       "parameterTypes": ["java.lang.String"],
        //       "returnType": "java.lang.String",
        //       "annotations": []
        //   }],
        //   "types": [{
        //       "type": "java.lang.String"
        //    }],
        //  "annotations": []
        // } 
        String data = gson.toJson(serviceDefinition);
        // 存儲到元數據中心,實例中的元數據中心是ZookeeperMetadataReport
        // 目錄:元數據中心Metadata-report的/dubbo/metadata/xxxService/provider/${application-name}節點下
        doStoreProviderMetadata(providerMetadataIdentifier, data);
        // 存儲到本地文件
        // 路徑:xxxService:::provider:${application-name} 
        saveProperties(providerMetadataIdentifier, data, true, !syncReport);
    } catch (Exception e) {
        ......
    }
}
  • 建立 Triple 協議服務與 MetadataReport 服務之間的關系

核心代碼在 ServiceConfig#exported、MetadataServiceNameMapping#map 和 ZookeeperMetadataReport#registerServiceAppMapping 中,具體如下:

1、ServiceConfig#exported

protected void exported() {
    exported = true;
    List<URL> exportedURLs = this.getExportedUrls();
    exportedURLs.forEach(url -> {
        // 判斷URL中是否標記有service-name-mapping的字段
        // 標記有該字段的服務是需要將暴露的服務與元數據中心關聯起來
        // Consumer可以通過元數據中心的消息變更感知到Provider端元數據的變更
        if (url.getParameters().containsKey(SERVICE_NAME_MAPPING_KEY)) {
            ServiceNameMapping serviceNameMapping = ServiceNameMapping.getDefaultExtension();
            // 建立關系
            serviceNameMapping.map(url);
        }
    });
    onExported();
}

2、MetadataServiceNameMapping#map

public void map(URL url) {
    execute(() -> {
        String registryCluster = getRegistryCluster(url);
        // 獲取MetadataReport,也就是元數據中心的訪問路徑
        MetadataReport metadataReport = MetadataReportInstance.getMetadataReport(registryCluster);
        ......
        int currentRetryTimes = 1;
        boolean success;
        String newConfigContent = getName();
        do {
            // 獲取元數據中心中存儲的應用的版本信息
            ConfigItem configItem = metadataReport.getConfigItem(serviceInterface, DEFAULT_MAPPING_GROUP);
            String oldConfigContent = configItem.getContent();
            if (StringUtils.isNotEmpty(oldConfigContent)) {
                boolean contains = StringUtils.isContains(oldConfigContent, getName());
                if (contains) {
                    break;
                }
                newConfigContent = oldConfigContent + COMMA_SEPARATOR + getName();
            }
            // 在元數據中心創建mapping節點,並將暴露的服務數據存到元數據中心,這里的元數據中心用zookeeper實現的
            // 目錄:/dubbo/mapping/xxxService
            // 數據:configItem.content為${application-name},configItem.ticket為版本好
            success = metadataReport.registerServiceAppMapping(serviceInterface, DEFAULT_MAPPING_GROUP, newConfigContent, configItem.getTicket());
        } while (!success && currentRetryTimes++ <= CAS_RETRY_TIMES);
    });
}

3、ZookeeperMetadataReport#registerServiceAppMapping

public boolean registerServiceAppMapping(String key, String group, String content, Object ticket) {
    try {
        if (ticket != null && !(ticket instanceof Stat)) {
            throw new IllegalArgumentException("zookeeper publishConfigCas requires stat type ticket");
        }
        String pathKey = buildPathKey(group, key);
        // 1. 創建/dubbo/mapping/xxxService目錄,存儲的數據為configItem
        // 2. 生成版本號
        zkClient.createOrUpdate(pathKey, content, false, ticket == null ? 0 : ((Stat) ticket).getVersion());
        return true;
    } catch (Exception e) {
        logger.warn("zookeeper publishConfigCas failed.", e);
        return false;
    }
}

到這里,暴露Triple協議的服務並注冊 registry 協議的流程就結束了。主要是將以前接口級服務發現機制中注冊到注冊中心中的數據(應用實例數據+服務接口數據)拆分出來了。注冊 registry 協議部分將應用實例數據注冊到注冊中心,在 Exporter 暴露完以后通過調用 MetadataUtils#publishServiceDefinition 將服務接口數據注冊到元數據中心。

4、暴露MetadataService服務

MetadataService 主要是對 Consumer 側提供一個可以獲取元數據的 API,暴露流程是復用了 Triple 協議的服務暴露流程

image.png

整體時序圖

  • 暴露 MetadataService 的入口

核心代碼在 DubboBootstrap#exportMetadataService 中,具體如下:

private void exportMetadataService() {
    // 暴露MetadataServer
    metadataServiceExporter.export();
}
  • 暴露 MetadataService

核心代碼在 ConfigurableMetadataServiceExporter#export 中,具體如下:

public ConfigurableMetadataServiceExporter export() {

    if (!isExported()) {
        // 定義MetadataService的ServiceConfig
        ServiceConfig<MetadataService> serviceConfig = new ServiceConfig<>();
        serviceConfig.setApplication(getApplicationConfig());
        // 不會注冊到注冊中心
        serviceConfig.setRegistry(new RegistryConfig("N/A"));
        serviceConfig.setProtocol(generateMetadataProtocol());
        serviceConfig.setInterface(MetadataService.class);
        serviceConfig.setDelay(0);
        serviceConfig.setRef(metadataService);
        serviceConfig.setGroup(getApplicationConfig().getName());
        serviceConfig.setVersion(metadataService.version());
        serviceConfig.setMethods(generateMethodConfig());
        // 用暴露Triple協議服務的流程來暴露MetadataService
        // 采用的是Dubbo協議
        serviceConfig.export();
        this.serviceConfig = serviceConfig;
    }
    return this;
}

由於暴露 MetadataService 的流程是復用前面提到的暴露 Triple 協議服務的流程,整個過程有少許地方會不同,這些不同之處在上面的代碼中都已經標明,所以就不再贅述了。

  • 注冊 ServiceInstance 實例

注冊 ServiceInstance 的目的是為了定時更新 Metadata,當有更新的時候就會通過 MetadataReport 來更新版本號讓 Consumer 端感知到。

核心代碼在 DubboBootstrap#registerServiceInstance 和 DubboBootstrap#doRegisterServiceInstance 中,具體如下:

private void registerServiceInstance() {
    ....
    // 創建ServiceInstance
    // ServiceInstance中包含以下字段
    // 1. serviceName:${application-name}
    // 2. host: 192.168.31.167
    // 3. port: 2080
    // 4. metadata: 服務接口級相關的數據,比如:methods等數據
    // 同時,還會對ServiceInstance數據中的字段進行補充,分別調用下面4個ServiceInstanceCustomizer實例
    // 1)ServiceInstanceMetadataCustomizer
    // 2)MetadataServiceURLParamsMetadataCustomizer
    // 3)ProtocolPortsMetadataCustomizer
    // 4)ServiceInstanceHostPortCustomizer
    ServiceInstance serviceInstance = createServiceInstance(serviceName);
    boolean registered = true;
    try {
        // 注冊ServiceInstance
        doRegisterServiceInstance(serviceInstance);
    } catch (Exception e) {
        registered = false;
        logger.error("Register instance error", e);
    }
    // 如果注冊成功,定時更新Metadata,沒10s更新一次
    if(registered){
        executorRepository.nextScheduledExecutor().scheduleAtFixedRate(() -> {
            ......
            try {
                // 刷新Metadata和ServiceInstance
                ServiceInstanceMetadataUtils.refreshMetadataAndInstance(serviceInstance);
            } catch (Exception e) {
                ......
            } finally {
                ......
            }
        }, 0, ConfigurationUtils.get(METADATA_PUBLISH_DELAY_KEY, DEFAULT_METADATA_PUBLISH_DELAY), TimeUnit.MILLISECONDS);
    }
}

DubboBootstrap#doRegisterServiceInstance

private void doRegisterServiceInstance(ServiceInstance serviceInstance) {
    if (serviceInstance.getPort() > 0) {
        // 發布Metadata數據到遠端存儲元數據中心
        // 調用RemoteMetadataServiceImpl#publishMetadata,
        // 內部會調用metadataReport#publishAppMetadata
        publishMetadataToRemote(serviceInstance);
        logger.info("Start registering instance address to registry.");
        getServiceDiscoveries().forEach(serviceDiscovery ->{
            ServiceInstance serviceInstanceForRegistry = new DefaultServiceInstance((DefaultServiceInstance) serviceInstance);
            calInstanceRevision(serviceDiscovery, serviceInstanceForRegistry);
            ......
            // 調用ZookeeperServiceDiscovery#doRegister注冊serviceInstance實例
            // 將應用服務信息注冊到注冊中心中
            // 目錄:/services/${application-name}/192.168.31.167:20800
            // 數據:serviceInstance序列化后的byte數組
            serviceDiscovery.register(serviceInstanceForRegistry);
        });
    }
}

通過上面的分析,我們可以很容易知道

  • ServiceInstance 是中包含 Metadata
  • Metadata 是存儲在 InMemoryWritableMetadataService 中的元數據,占用的是本地內存空間
  • InMemoryWritableMetadataService 用來更新 Metadata
  • ServiceInstance 是存儲在遠端元數據注冊中心中的數據結構
  • RemoteMetadataServiceImpl 會調用 metadataReport 將 ServiceInstance 數據更新到遠端元數據注冊中心

總結

通過對 Dubbo 3.0 服務端暴露全流程的解析可以看出,盡管應用級服務發現機制的實現要復雜很多,但是 Dubbo 3.0 為了能夠讓使用者平滑遷移,兼容了 2.7.x 的版本,所以在設計的時候很多地方都盡可能復用之前的流程。

從最近 Dubbo 3.0 發布的 Benchmark 數據來看,Dubbo 3.0 的性能和資源利用上確實提升了不少。Dubbo 3.0 在擁抱雲原生的道路上還有很長的一段路要走,社區正在對 Dubbo 3.0 中核心流程進行梳理和優化,后續計划支持多實例應用部署,希望有興趣見證 Dubbo 雲原生之路的同學可以積極參與社區貢獻!

原文鏈接
本文為阿里雲原創內容,未經允許不得轉載。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM