Dubbo之服務消費原理


前言

上篇文章《Dubbo之服務暴露》分析 Dubbo 服務是如何暴露的,本文接着分析 Dubbo 服務的消費流程。主要從以下幾個方面進行分析:注冊中心的暴露通過注冊中心進行服務消費通知直連服務進行消費
服務消費端啟動時,將自身的信息注冊到注冊中心的目錄,同時還訂閱服務提供方的目錄,當服務提供方的 URL 發生更改時,實時獲取新的數據。

服務消費端流程

下面是一個服務消費的流程圖:

上圖中可以看到,服務消費的流程與服務暴露的流程有點類似逆向的。同樣,Dubbo 服務也是分為兩個大步驟:第一步就是將遠程服務通過Protocol轉換成Invoker(概念在上篇文章中有解釋)。第二步通過動態代理將Invoker轉換成消費服務需要的接口。

org.apache.dubbo.config.ReferenceConfig 類是ReferenceBean的父類,與生產端服務的ServiceBean一樣,存放着解析出來的 XML 和注解信息。類關系如下:

服務初始化中轉換的入口

當我們消費端調用本地接口就能實現遠程服務的調用,這是怎么實現的呢?根據上面的流程圖,來分析消費原理。
在消費端進行初始化時ReferenceConfig#init,會執行ReferenceConfig#createProxy來完成這一系列操作。以下為ReferenceConfig#createProxy主要的代碼部分:

private T createProxy(Map<String, String> map) {
    // 判斷是否為 Jvm 本地引用
    if (shouldJvmRefer(map)) {
        // 通過 injvm 協議,獲取本地服務
        URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
        invoker = REF_PROTOCOL.refer(interfaceClass, url);
    } else {
        urls.clear();
        // 判斷是否有自定義的直連地址,或注冊中心地址
        if (url != null && url.length() > 0) { 
            String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
            if (us != null && us.length > 0) {
                for (String u : us) {
                    URL url = URL.valueOf(u);
                    if (StringUtils.isEmpty(url.getPath())) {
                        url = url.setPath(interfaceName);
                    }
                    if (UrlUtils.isRegistry(url)) {
                        // 如果是注冊中心Protocol類型,則向地址中添加 refer 服務消費元數據
                        urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                    } else {
                        // 直連服務提供端
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }
        } else {
            // 組裝注冊中心的配置
            if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                // 檢查配置中心
                checkRegistry();
                List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
                if (CollectionUtils.isNotEmpty(us)) {
                    for (URL u : us) {
                        URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
                        if (monitorUrl != null) {
                            // 監控上報信息
                            map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        // 注冊中心地址添加 refer 服務消費元數據
                        urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
            }
        }

        // 只有一條注冊中心數據,即單注冊中心
        if (urls.size() == 1) {
            // 將遠程服務轉化成 Invoker
            invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
        } else {
            // 因為多注冊中心就會存在多個 Invoker,這里用保存在 List 中
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            for (URL url : urls) {
                // 將每個注冊中心轉換成 Invoker 數據
                invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                if (UrlUtils.isRegistry(url)) {
                    // 會覆蓋前遍歷的注冊中心,使用最后一條注冊中心數據
                    registryURL = url;
                }
            }
            if (registryURL != null) {
                // 默認使用 zone-aware 策略來處理多個訂閱
                URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
                // 將轉換后的多個 Invoker 合並成一個
                invoker = CLUSTER.join(new StaticDirectory(u, invokers));
            } else {
                invoker = CLUSTER.join(new StaticDirectory(invokers));
            }
        }
    }
    // 利用動態代理,將 Invoker 轉換成本地接口代理
    return (T) PROXY_FACTORY.getProxy(invoker);
}

上面轉換的過程中,主要可概括為:先分為本地引用和遠程引用兩類。本地就是以 inJvm 協議的獲取本地服務,這不做過多說明;遠程引用分為直連服務和通過注冊中心。注冊中心分為單注冊中心和多注冊中心的情況,單注冊中心好解決,直接使用即可,多注冊中心時,將轉換后的 Invoker 合並成一個 Invoker。最后通過動態代理將 Invoker 轉換成本地接口代理。

獲取 Invoker 實例

由於本地服務時直接從緩存中獲取,這里就注冊中心的消費進行分析,上面代碼片段中使用的是REF_PROTOCOL.refer進行轉換,該方法代碼:

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // 獲取服務的注冊中心url,里面會設置注冊中心的協議和移除 registry 的參數
    url = getRegistryUrl(url);
    // 獲取注冊中心實例
    Registry registry = registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // 獲取服務消費元數據
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
    // 從服務消費元數據中獲取分組信息
    String group = qs.get(GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            // 執行 Invoker 轉換工作
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    // 執行 Invoker 轉換工作
    return doRefer(cluster, registry, type, url);
}

上面主要是獲取服務消費的注冊中心實例和進行服務分組,最后調用doRefer方法進行轉換工作,以下為doRefer的代碼:

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // 創建 RegistryDirectory 對象
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    // 設置注冊中心
    directory.setRegistry(registry);
    // 設置協議
    directory.setProtocol(protocol);
    // directory.getUrl().getParameters() 是服務消費元數據
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
        directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
        // 消費消息注冊到注冊中心
        registry.register(directory.getRegisteredConsumerUrl());
    }

    directory.buildRouterChain(subscribeUrl);
    // 服務消費者訂閱:服務提供端,動態配置,路由的通知
    directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
            PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

    // 多個Invoker合並為一個
    Invoker invoker = cluster.join(directory);
    return invoker;
}

上面實現主要是完成創建 RegistryDirectory 對象,將消費服務元數據注冊到注冊中心,通過 RegistryDirectory 對象里的信息,實現服務提供端,動態配置及路由的訂閱相關功能。

RegistryDirectory 這個類實現了 NotifyListener 這個通知監聽接口,當訂閱的服務,配置或路由發生變化時,會接收到通知,進行相應改變:

public synchronized void notify(List<URL> urls) {
    // 將服務提供方配置,路由配置,服務提供方的服務分別以不同的 key 保存在 Map 中
    Map<String, List<URL>> categoryUrls = urls.stream()
            .filter(Objects::nonNull)
            .filter(this::isValidCategory)
            .filter(this::isNotCompatibleFor26x)
            .collect(Collectors.groupingBy(url -> {
                if (UrlUtils.isConfigurator(url)) {
                    return CONFIGURATORS_CATEGORY;
                } else if (UrlUtils.isRoute(url)) {
                    return ROUTERS_CATEGORY;
                } else if (UrlUtils.isProvider(url)) {
                    return PROVIDERS_CATEGORY;
                }
                return "";
            }));

    // 更新服務提供方配置
    List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
    this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

    // 更新路由配置
    List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
    toRouters(routerURLs).ifPresent(this::addRouters);

    // 加載服務提供方的服務信息
    List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
    /**
     * 3.x added for extend URL address
     */
    ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
    List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
    if (supportedListeners != null && !supportedListeners.isEmpty()) {
        for (AddressListener addressListener : supportedListeners) {
            providerURLs = addressListener.notify(providerURLs, getUrl(),this);
        }
    }
    // 重新加載 Invoker 實例
    refreshOverrideAndInvoker(providerURLs);
}

RegistryDirectory#notify里面最后會刷新 Invoker 進行重新加載,下面是核心代碼的實現:

private void refreshOverrideAndInvoker(List<URL> urls) {
    // mock zookeeper://xxx?mock=return null
    overrideDirectoryUrl();
    // 刷新 invoker 
    refreshInvoker(urls);
}

private void refreshInvoker(List<URL> invokerUrls) {
    Assert.notNull(invokerUrls, "invokerUrls should not be null");

    if (invokerUrls.size() == 1
            && invokerUrls.get(0) != null
            && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {

        ......

    } else {
        // 刷新之前的 Invoker
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        // 加載新的 Invoker Map
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
        // 獲取新的 Invokers
        List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
        // 緩存新的 Invokers
        routerChain.setInvokers(newInvokers);
        this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
        this.urlInvokerMap = newUrlInvokerMap;

        try {
            // 通過新舊 Invokers 對比,銷毀無用的 Invokers
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}

獲取刷新前后的 Invokers,將新的 Invokers 重新緩存起來,通過對比,銷毀無用的 Invoker。

上面將 URL 轉換 Invoker 是在RegistryDirectory#toInvokers中進行。

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
    Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
   
    Set<String> keys = new HashSet<>();
    String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
    for (URL providerUrl : urls) {

        // 過濾消費端不匹配的協議,及非法協議
        ......

        // 合並服務提供端配置數據
        URL url = mergeUrl(providerUrl);
        // 過濾重復的服務提供端配置數據
        String key = url.toFullString();
        if (keys.contains(key)) {
            continue;
        }
        keys.add(key);

        // 緩存鍵是不與使用者端參數合並的url,無論使用者如何合並參數,如果服務器url更改,則再次引用
        Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
        Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
        
        // 緩存無對應 invoker,再次調用 protocol#refer 是否有數據
        if (invoker == null) {
            try {
                boolean enabled = true;
                if (url.hasParameter(DISABLED_KEY)) {
                    enabled = !url.getParameter(DISABLED_KEY, false);
                } else {
                    enabled = url.getParameter(ENABLED_KEY, true);
                }
                if (enabled) {
                    invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
                }
            } catch (Throwable t) {
                logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
            }
            // 將新的 Invoker 緩存起來
            if (invoker != null) { // Put new invoker in cache
                newUrlInvokerMap.put(key, invoker);
            }
        } else {
            // 緩存里有數據,則進行重新覆蓋
            newUrlInvokerMap.put(key, invoker);
        }
    }
    keys.clear();
    return newUrlInvokerMap;
}

總結

通過《Dubbo之服務暴露》和本文兩篇文章對 Dubbo 服務暴露和服務消費原理的了解。我們可以看到,不管是暴露還是消費,Dubbo 都是以 Invoker 為數據交換主體進行,通過對 Invoker 發起調用,實現一個遠程或本地的實現。

個人博客: https://ytao.top
關注公眾號 【ytao】,更多原創好文
我的公眾號


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM