Dubbo 服務引入-Version2.7.5


1.服務引用原理

Dubbo 服務引用的時機有兩個,第一個是在 Spring 容器調用 ReferenceBean 的 afterPropertiesSet 方法時引用服務,第二個是在 ReferenceBean 對應的服務被注入到其他類中時引用。這兩個引用服務的時機區別在於,第一個是餓漢式的,第二個是懶漢式的。默認情況下,Dubbo 使用懶漢式引用服務。如果需要使用餓漢式,可通過配置 dubbo:reference 的 init 屬性開啟。下面我們按照 Dubbo 默認配置進行分析,整個分析過程從 ReferenceBean 的 getObject 方法開始。當我們的服務被注入到其他類中時,Spring 會第一時間調用 getObject 方法,並由該方法執行服務引用邏輯。按照慣例,在進行具體工作之前,需先進行配置檢查與收集工作。接着根據收集到的信息決定服務用的方式,有三種,第一種是引用本地 (JVM) 服務,第二是通過直連方式引用遠程服務,第三是通過注冊中心引用遠程服務。不管是哪種引用方式,最后都會得到一個 Invoker 實例。如果有多個注冊中心,多個服務提供者,這個時候會得到一組 Invoker 實例,此時需要通過集群管理類 Cluster 將多個 Invoker 合並成一個實例。合並后的 Invoker 實例已經具備調用本地或遠程服務的能力了,但並不能將此實例暴露給用戶使用,這會對用戶業務代碼造成侵入。此時框架還需要通過代理工廠類 (ProxyFactory) 為服務接口生成代理類,並讓代理類去調用 Invoker 邏輯。避免了 Dubbo 框架代碼對業務代碼的侵入,同時也讓框架更容易使用。

2.源碼分析

服務引用的入口方法為 ReferenceBean 的 getObject 方法,該方法定義在 Spring 的 FactoryBean 接口中,ReferenceBean 實現了這個方法。實現代碼如下:

public Object getObject() throws Exception {
    return get();
}

public synchronized T get() {
    if (destroyed) {
        throw new IllegalStateException("Already destroyed!");
    }
    // 檢測 ref 是否為空,為空則通過 init 方法創建
    if (ref == null) {
        // init 方法主要用於處理配置,以及調用 createProxy 生成代理類
        init();
    }
    return ref;
}

2.1處理配置

Dubbo 提供了豐富的配置,用於調整和優化框架行為,性能等。Dubbo 在引用或導出服務時,首先會對這些配置進行檢查和處理,以保證配置的正確性。配置解析邏輯封裝在 ReferenceConfig 的 init 方法中,下面進行分析。

public synchronized void init() {
    	// 避免重復初始化
        if (initialized) {
            return;
        }

        if (bootstrap == null) {
            bootstrap = DubboBootstrap.getInstance();
            bootstrap.init();
        }
    	//校驗並初始化所有的本類中所有的Config配置類,不允許為空的則拋異常,允許為空的則嘗試從全局配置中賦值
        checkAndUpdateSubConfigs();

    	//檢查本地存根合法性,Local將被棄用,替換成Stud,即:類全限定類名+Stud
        checkStubAndLocal(interfaceClass);
        ConfigValidationUtils.checkMock(interfaceClass, this);

    	//配置一些說明參數
        Map<String, String> map = new HashMap<String, String>();
        map.put(SIDE_KEY, CONSUMER_SIDE);

        ReferenceConfigBase.appendRuntimeParameters(map);
    	//檢查是否為泛化類型
        if (!ProtocolUtils.isGeneric(generic)) {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put(REVISION_KEY, revision);
            }
            //創建該類的的代理(此代理類放置在wapper類的WRAPPER_MAP緩存中)並獲取方法
            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("No method found in service interface " + interfaceClass.getName());
                map.put(METHODS_KEY, ANY_VALUE);
            } else {
                map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
            }
        }
        map.put(INTERFACE_KEY, interfaceName);
        AbstractConfig.appendParameters(map, getMetrics());
        AbstractConfig.appendParameters(map, getApplication());
        AbstractConfig.appendParameters(map, getModule());
        // remove 'default.' prefix for configs from ConsumerConfig
        // appendParameters(map, consumer, Constants.DEFAULT_KEY);
        AbstractConfig.appendParameters(map, consumer);
        AbstractConfig.appendParameters(map, this);
        MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
        if (metadataReportConfig != null && metadataReportConfig.isValid()) {
            map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
        }
    	//有關asyn方法
        Map<String, AsyncMethodInfo> attributes = null;
        if (CollectionUtils.isNotEmpty(getMethods())) {
            attributes = new HashMap<>();
            for (MethodConfig methodConfig : getMethods()) {
                AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
                String retryKey = methodConfig.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(methodConfig.getName() + ".retries", "0");
                    }
                }
                AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
                if (asyncMethodInfo != null) {
                    attributes.put(methodConfig.getName(), asyncMethodInfo);
                }
            }
        }
    	//獲取本機ip
        String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
        if (StringUtils.isEmpty(hostToRegistry)) {
            hostToRegistry = NetUtils.getLocalHost();
        } else if (isInvalidLocalHost(hostToRegistry)) {
            throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
        }
        map.put(REGISTER_IP_KEY, hostToRegistry);

        serviceMetadata.getAttachments().putAll(map);
    	//創建代理類
        ref = createProxy(map);

        serviceMetadata.setTarget(ref);
        serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
        ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
        consumerModel.setProxyObject(ref);
        consumerModel.init(attributes);

        initialized = true;

        // dispatch a ReferenceConfigInitializedEvent since 2.7.4
    	//發布invoker創建成功的事件
        dispatch(new ReferenceConfigInitializedEvent(this, invoker));
    }

上面的代碼很長,做的事情比較多。

1.檢測各個Config的配置是否正常,再檢測 ConsumerConfig 實例是否存在,如不存在則創建一個新的實例,然后通過系統變量或 dubbo.properties 配置文件填充 ConsumerConfig 的字段。接着是檢測泛化配置,並根據配置設置 interfaceClass 的值。

2.將各個配置類上標注Parameter注解的屬性按要求放入map中,並將map設置進serviceMetadata中

3.創建代理對象

4.發布事件

2.3. 引用服務

本節我們要從 createProxy 開始看起。從字面意思上來看,createProxy 似乎只是用於創建代理對象的。但實際上並非如此,該方法還會調用其他方法構建以及合並 Invoker 實例。具體細節如下。

private T createProxy(Map<String, String> map) {
    	//檢測是否從本地調用服務方
        if (shouldJvmRefer(map)) {
            URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
            invoker = REF_PROTOCOL.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
            urls.clear();
            //用戶指定了url,則采用直連的方式
            if (url != null && url.length() > 0) { 
                String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (StringUtils.isEmpty(url.getPath())) {
                            url = url.setPath(interfaceName);
                        }
                        if (UrlUtils.isRegistry(url)) {
                            urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else { 
                //從注冊中心發現服務
                if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                    //檢測注冊中心有效性
                    checkRegistry();
                    //獲取url
                    List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
                    if (CollectionUtils.isNotEmpty(us)) {
                        for (URL u : us) {
                            //嘗試獲取監控中心url
                            URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
                            if (monitorUrl != null) {
                                map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            }
                            urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        }
                    }
                    if (urls.isEmpty()) {
                        throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                    }
                }
            }
            // 單個注冊中心或服務提供者(服務直連,下同)
            if (urls.size() == 1) {
                //創建invoker對象
                invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
            } else {
                // 多個注冊中心或多個服務提供者,或者兩者混合
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                    if (UrlUtils.isRegistry(url)) {
                        registryURL = url; // use last registry url
                    }
                }
                if (registryURL != null) { 
                    URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
                    invoker = CLUSTER.join(new StaticDirectory(u, invokers));
                } else { // not a registry url, must be direct invoke.
                    invoker = CLUSTER.join(new StaticDirectory(invokers));
                }
            }
        }
		//校驗invoker有效性,即是否有提供者
        if (shouldCheck() && !invoker.isAvailable()) {
            throw new IllegalStateException("Failed to check the status of the service "
                    + interfaceName
                    + ". No provider available for the service "
                    + (group == null ? "" : group + "/")
                    + interfaceName +
                    (version == null ? "" : ":" + version)
                    + " from the url "
                    + invoker.getUrl()
                    + " to the consumer "
                    + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }
        /**
         * @since 2.7.0
         * ServiceData Store
         */
        String metadata = map.get(METADATA_KEY);
        WritableMetadataService metadataService = WritableMetadataService.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata);
        if (metadataService != null) {
            URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
            metadataService.publishServiceDefinition(consumerURL);
        }
        // 生成代理類
        return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
    }

上面代碼很多,不過邏輯比較清晰。首先根據配置檢查是否為本地調用,若是,則調用 InjvmProtocol 的 refer 方法生成 InjvmInvoker 實例。若不是,則讀取直連配置項,或注冊中心 url,並將讀取到的 url 存儲到 urls 中。然后根據 urls 元素數量進行后續操作。若 urls 元素數量為1,則直接通過 Protocol 自適應拓展類構建 Invoker 實例接口。若 urls 元素數量大於1,即存在多個注冊中心或服務直連 url,此時先根據 url 構建 Invoker。然后再通過 Cluster 合並多個 Invoker,最后調用 ProxyFactory 生成代理類。Invoker 的構建過程以及代理類的過程比較重要,因此接下來將分兩小節對這兩個過程進行分析。

2.3.1 創建 Invoker

Invoker 是 Dubbo 的核心模型,代表一個可執行體。在服務提供方,Invoker 用於調用服務提供類。在服務消費方,Invoker 用於執行遠程調用。Invoker 是由 Protocol 實現類構建而來。Protocol 實現類有很多,本節會分析最常用的兩個,分別是 RegistryProtocol 和 DubboProtocol,其他的大家自行分析。下面先來分析 RegisterProtocol的 refer 方法源碼。如下:

REF_PROTOCOL通過SPI構建Filter和Listener鏈后,再走進RegisterProtocol的refer方法中

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = getRegistryUrl(url);
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        String group = qs.get(GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        return doRefer(cluster, registry, type, url);
    }
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    	// 創建 RegistryDirectory 實例
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    	// 設置注冊中心和協議
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
    	// 生成服務消費者鏈接
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    	// 注冊服務消費者,在 consumers 目錄下創建新節點
        if (directory.isShouldRegister()) {
            directory.setRegisteredConsumerUrl(subscribeUrl);
            //注冊消費者 
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(subscribeUrl);
    	// 訂閱 providers、configurators、routers 等節點數據
        directory.subscribe(toSubscribeUrl(subscribeUrl));

    	// 一個注冊中心可能有多個服務提供者,因此這里需要將多個服務提供者合並為一個
        Invoker<T> invoker = cluster.join(directory);
    	//通過SPI機制獲取org.apache.dubbo.registry.integration.RegistryProtocolListener的實現類
        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) {
            return invoker;
        }

        RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
    	//將監聽器構建成鏈並返回
        for (RegistryProtocolListener listener : listeners) {
            listener.onRefer(this, registryInvokerWrapper);
        }
        return registryInvokerWrapper;
    }

然后是DubboProtocol

@Override
    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);

        // 創建 DubboInvoker
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);

        return invoker;
    }

上面方法看起來比較簡單,不過這里有一個調用需要我們注意一下,即 getClients。這個方法用於獲取客戶端實例,實例類型為 ExchangeClient。ExchangeClient 實際上並不具備通信能力,它需要基於更底層的客戶端實例進行通信。比如 NettyClient、MinaClient 等,默認情況下,Dubbo 使用 NettyClient 進行通信。接下來,我們簡單看一下 getClients 方法的邏輯。

private ExchangeClient[] getClients(URL url) {
    // 是否共享連接
    boolean service_share_connect = false;
  	// 獲取連接數,默認為0,表示未配置
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    // 如果未配置 connections,則共享連接
    if (connections == 0) {
        service_share_connect = true;
        connections = 1;
    }

    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (service_share_connect) {
            // 獲取共享客戶端
            clients[i] = getSharedClient(url);
        } else {
            // 初始化新的客戶端
            clients[i] = initClient(url);
        }
    }
    return clients;
}

這里根據 connections 數量決定是獲取共享客戶端還是創建新的客戶端實例,默認情況下,使用共享客戶端實例。getSharedClient 方法中也會調用 initClient 方法,因此下面我們一起看一下這兩個方法。

private ExchangeClient getSharedClient(URL url) {
    String key = url.getAddress();
    // 獲取帶有“引用計數”功能的 ExchangeClient
    ReferenceCountExchangeClient client = referenceClientMap.get(key);
    if (client != null) {
        if (!client.isClosed()) {
            // 增加引用計數
            client.incrementAndGetCount();
            return client;
        } else {
            referenceClientMap.remove(key);
        }
    }

    locks.putIfAbsent(key, new Object());
    synchronized (locks.get(key)) {
        if (referenceClientMap.containsKey(key)) {
            return referenceClientMap.get(key);
        }

        // 創建 ExchangeClient 客戶端
        ExchangeClient exchangeClient = initClient(url);
        // 將 ExchangeClient 實例傳給 ReferenceCountExchangeClient,這里使用了裝飾模式
        client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
        referenceClientMap.put(key, client);
        ghostClientMap.remove(key);
        locks.remove(key);
        return client;
    }
}

上面方法先訪問緩存,若緩存未命中,則通過 initClient 方法創建新的 ExchangeClient 實例,並將該實例傳給 ReferenceCountExchangeClient 構造方法創建一個帶有引用計數功能的 ExchangeClient 實例。ReferenceCountExchangeClient 內部實現比較簡單,就不分析了。下面我們再來看一下 initClient 方法的代碼。

private ExchangeClient initClient(URL url) {

    // 獲取客戶端類型,默認為 netty
    String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

    // 添加編解碼和心跳包參數到 url 中
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

    // 檢測客戶端類型是否存在,不存在則拋出異常
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported client type: ...");
    }

    ExchangeClient client;
    try {
        // 獲取 lazy 配置,並根據配置值決定創建的客戶端類型
        if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
            // 創建懶加載 ExchangeClient 實例
            client = new LazyConnectExchangeClient(url, requestHandler);
        } else {
            // 創建普通 ExchangeClient 實例
            client = Exchangers.connect(url, requestHandler);
        }
    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service...");
    }
    return client;
}

initClient 方法首先獲取用戶配置的客戶端類型,默認為 netty。然后檢測用戶配置的客戶端類型是否存在,不存在則拋出異常。最后根據 lazy 配置決定創建什么類型的客戶端。這里的 LazyConnectExchangeClient 代碼並不是很復雜,該類會在 request 方法被調用時通過 Exchangers 的 connect 方法創建 ExchangeClient 客戶端,該類的代碼本節就不分析了。下面我們分析一下 Exchangers 的 connect 方法。

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    // 獲取 Exchanger 實例,默認為 HeaderExchangeClient
    return getExchanger(url).connect(url, handler);
}

如上,getExchanger 會通過 SPI 加載 HeaderExchangeClient 實例,這個方法比較簡單,大家自己看一下吧。接下來分析 HeaderExchangeClient 的實現。

public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    // 這里包含了多個調用,分別如下:
    // 1. 創建 HeaderExchangeHandler 對象
    // 2. 創建 DecodeHandler 對象
    // 3. 通過 Transporters 構建 Client 實例
    // 4. 創建 HeaderExchangeClient 對象
    return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

這里的調用比較多,我們這里重點看一下 Transporters 的 connect 方法。如下:

public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    ChannelHandler handler;
    if (handlers == null || handlers.length == 0) {
        handler = new ChannelHandlerAdapter();
    } else if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        // 如果 handler 數量大於1,則創建一個 ChannelHandler 分發器
        handler = new ChannelHandlerDispatcher(handlers);
    }
    
    // 獲取 Transporter 自適應拓展類,並調用 connect 方法生成 Client 實例
    return getTransporter().connect(url, handler);
}

如上,getTransporter 方法返回的是自適應拓展類,該類會在運行時根據客戶端類型加載指定的 Transporter 實現類。若用戶未配置客戶端類型,則默認加載 NettyTransporter,並調用該類的 connect 方法。如下:

public Client connect(URL url, ChannelHandler listener) throws RemotingException {
    // 創建 NettyClient 對象
    return new NettyClient(url, listener);
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM