dubbo與zk注冊中心如何對接,如何做到服務自動發現


先看下consumer端發起調用時的鏈路流程:

 +---------------------------+            +---------------------------+            +---------------------------+              
 |      helloService         |            |      proxy                |            |  InvokerInvocationHandler |              
 |      sayHello             +----------> |      sayHello             +----------> |  invoke                   |              
 |                           |            |                           |            |  proxy method args        |              
 +---------------------------+            +---------------------------+            +-------------+-------------+              
                                                                                                 |                            
                                                                                                 |                            
                                                                                  +---------------------------------+         
                                                                                  |              |                  |         
                                                                                  | +------------v--------------+   |         
                                                                                  | |  MockClusterInvoker       |   |         
                                                                                  | |  invoke                   |   |         
                                                                                  | |                           |   |         
                                                                                  | +------------+--------------+   |         
                                                                                  |              |                  |         
                                                                                  |              |                  |         
                                                                                  |              |                  |         
 +---------------------------+            +---------------------------+           | +------------v--------------+   |         
 | Router                    |            | RegistryDirectory         |           | |  FailoverClusterInvoker   |   |         
 | route                     | <----------+ list                      | <-----------+  invoke                   |   |         
 | MockInVokersSelector      |            | INVOCATION-->List INVOKER |           | |                           |   |         
 +------------+--------------+            +---------------------------+           | +---------------------------+   |         
              |                                                                   |                                 |         
              |                                                                   +---------------------------------+         
              |                                                                 cluster invoke,分布式調用容錯機制也是在這做                      
              |                                                                                                               
              |                                                                                                               
              |                                                                                                               
              |                                                                                                               
              |                                                                                                               
+-------------v-------------+             +---------------------------+             +---------------------------+             
|  RandomLoadBalance        |             |InvokerDelegate            |             | ListenerInvokerWrap       |             
|  select                   +-----------> |invoke                     +-----------> | invoke                    |             
|  List INVOKER-->INVOKER   |             |                           |             |                           |             
+---------------------------+             +---------------------------+             +---------------------------+             

1. 引入zookeeper作為注冊中心后,服務查找過程

從建立spring到netty client建立連接的調用棧:
NettyClient.doOpen() line: 66
NettyClient(AbstractClient). (URL, ChannelHandler) line: 94
NettyClient. (URL, ChannelHandler) line: 61
NettyTransporter.connect(URL, ChannelHandler) line: 37
Transporter$Adpative.connect(URL, ChannelHandler) line: not available
Transporters.connect(URL, ChannelHandler...) line: 67
HeaderExchanger.connect(URL, ExchangeHandler) line: 37
Exchangers.connect(URL, ExchangeHandler) line: 102
DubboProtocol.initClient(URL) line: 378
DubboProtocol.getSharedClient(URL) line: 344
DubboProtocol.getClients(URL) line: 321
DubboProtocol.refer(Class , URL) line: 303
ProtocolListenerWrapper.refer(Class , URL) line: 65
ProtocolFilterWrapper.refer(Class , URL) line: 62
Protocol$Adpative.refer(Class, URL) line: not available
RegistryDirectory .toInvokers(List ) line: 405
RegistryDirectory .refreshInvoker(List ) line: 228
RegistryDirectory .notify(List ) line: 196
ZookeeperRegistry(AbstractRegistry).notify(URL, NotifyListener, List ) line: 449
ZookeeperRegistry(FailbackRegistry).doNotify(URL, NotifyListener, List ) line: 273
ZookeeperRegistry(FailbackRegistry).notify(URL, NotifyListener, List ) line: 259
ZookeeperRegistry.doSubscribe(URL, NotifyListener) line: 170
ZookeeperRegistry(FailbackRegistry).subscribe(URL, NotifyListener) line: 189
RegistryDirectory .subscribe(URL) line: 134
RegistryProtocol.doRefer(Cluster, Registry, Class , URL) line: 271
RegistryProtocol.refer(Class , URL) line: 254
ProtocolListenerWrapper.refer(Class , URL) line: 63
ProtocolFilterWrapper.refer(Class , URL) line: 60
Protocol$Adpative.refer(Class, URL) line: not available
ReferenceBean (ReferenceConfig ).createProxy() line: 394
ReferenceBean (ReferenceConfig ).init() line: 303
ReferenceBean (ReferenceConfig ).get() line: 138
ReferenceBean .getObject() line: 65
DefaultListableBeanFactory(FactoryBeanRegistrySupport).doGetObjectFromFactoryBean(FactoryBean, String, boolean) line: 142

整體來說: 先由注冊中心的協議處理器處理注冊中心的地址,找到所有provider的地址,創建所有invoker,然后再由invoker在真正調用時發起調用。
注冊中心的這個也抽象一種協議,由注冊中心結合提供者的協議推導出提供者的協議地址,也就是目標端的地址與端口得知了。
每一個接口在zookeeper上都有節點,節點下面是provider,再下面是所有provider的具體地址。

2. netty client的基本步驟

創建channelPipelineFactory,並在這個factory中返回加工好的ChannelPipeline,加工過程包括加入編解碼器,連接事件處理組成的netty handler實現
(包括連接建立,斷開,請求寫出去,)

writeRequested(netty的)-->調用編碼器(編碼的這個對象中包括了 需要調用的目標接口名 方法名等信息)
(繼承SimpleChannelHandler重寫邏輯,可以定制channel的讀取與寫出邏輯)

3. 在使用zookeeper作為注冊中心時,如果有provider服務停掉, consumer端如何感知?再次啟動剛停掉的provider呢?

provider停掉會觸發zk客戶端的監聽,監聽對客戶端的invoker列表進行刷新。
再次啟動會觸發 zk的監聽,代碼在ZkclientZookeeperClient

	public IZkChildListener createTargetChildListener(String path, final ChildListener listener) {
		return new IZkChildListener() {
			public void handleChildChange(String parentPath, List<String> currentChilds)
					throws Exception {
				listener.childChanged(parentPath, currentChilds);
			}
		};
	}

然后再觸發 com.alibaba.dubbo.registry.support.FailbackRegistry.doNotify(URL, NotifyListener, List )。
com.alibaba.dubbo.registry.integration.RegistryDirectory.refreshInvoker(List ), 這是在zk的event線程完成的。
如果有provider停掉了 走一樣的監聽邏輯

同時,dubbo支持 定時檢查provider的狀態並進行重連,具體參見
com.alibaba.dubbo.remoting.transport.AbstractClient.initConnectStatusCheckCommand()
reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);

4. 如果正在發服務的時候,provider停掉了,dubbo是如何處理的?

如果在發服務時,provider停掉了,那么此時會拋出異常,並在FailoverClusterInvoker doInvoke中捕獲,
FailoverClusterInvoker支持調用失敗時重試(可配置),此時達到再次重試的目的。

5. client在多次調用時,與provider端的連接是建立幾次,在prodvider端服務狀態有變化時呢?

NettyClient 的doOpen doConnect均在初始化的時候調用,有幾個provider就調用幾次,真正rpc調用服務的時候是不會再調用open與connect的。
上面這個說法不嚴格,因為看他發送消息的代碼就知道了,每次發消息時還會檢查下:

public void send(Object message, boolean sent) throws RemotingException {
    if (send_reconnect && !isConnected()){
        connect();
    }
    Channel channel = getChannel();
    //TODO getChannel返回的狀態是否包含null需要改進
    if (channel == null || ! channel.isConnected()) {
      throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
    }
    channel.send(message, sent);
}

6. 對於多個provider,dubbo默認在哪里選擇了一個invoker進行調用的

com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.select(LoadBalance, Invocation, List<Invoker >, List<Invoker >)。

com.alibaba.dubbo.rpc.filter.EchoFilter@1fd14d74
com.alibaba.dubbo.rpc.filter.ClassLoaderFilter@563e4951
com.alibaba.dubbo.rpc.filter.GenericFilter@4066c471
com.alibaba.dubbo.rpc.filter.ContextFilter@2b175c00
com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter@3eb81efb
com.alibaba.dubbo.rpc.filter.TimeoutFilter@1ae8bcbc
com.alibaba.dubbo.monitor.support.MonitorFilter@6cdba6dc
com.alibaba.dubbo.rpc.filter.ExceptionFilter@2609b277


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM