Dubbo源碼的關鍵類:
DubboInvoker
NioEventLoop
NettyClientHandler
IdleStateHandler
HeaderExchangeClient
io.netty.channel.socket.nio.NioSocketChannel
一、SPI機制:
1、首先創建ExtensionLoader
2、然會根據ExtensionLoader獲取和搜索類的實例
(1)類上有@Adaptive注解的,直接創建該類的實例
(2)方法上有@Adaptive注解的,直接創建該類的代理類,有相應的代理模板
根據代理類的實現獲取真正的實現類
ExtensionLoader.getExtensionLoader(Container.class)
1、ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()獲取適應能強的實力(兩種方式)
2、ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name)獲取指定的實例
3、@SPI獲取默認的實例
AdaptiveExtensionFactory【SpiExtensionFactory,SpringExtensionFactory】依據容器進行依賴注入
Wrapper的dubbo的AOP機制
IOC+AOP+DI
單例模式、工程模式、動態代理模式、裝飾器模式
dubbo的動態生成代碼機制
ServiceBean發布服務接口的類(讀取注冊中心的信息)
1、暴露本地服務
ProxyFactory getInvoker(把實現類放入對象包裝成有個Invoker的對象)
getProxy(針對客戶端)
2、暴露遠程服務
injvm和registry
InjvmProtocol
RegistryProtocol
DubboProtocol
---------------
①把接口實現類轉換成Invoker
②把Invoker轉換成exporter(經過protocol)
InjvmProtocol對應的是InjvmExporter
DubboProtocol對應的是dubboExporter
1、暴露本地服務
2、暴露遠程服務
3、啟動netty
4、連接zookeeper
5、創建節點,注冊服務
6、客戶端訂閱服務
exporterMap
客戶端:接口-->Invoker-->客戶端動態代理對象
服務端:實現類-->Invoker-->exporter
provider的流程:serviceBean-->實現類-->Invoker-->protocol-->exporter-->netty-->zookeeper
consumer的流程:referenceBean-->zookeeper-->加入集群路由-->返回動態代理對象InvokerInvocationHandler(Invoker)
服務端節點:創建:providers,訂閱:configurations
客戶端節點:創建:consumers,訂閱:providers,routers,configurations
javassit動態代理
集群容錯:
1、RegistryDirectory(Invoker的map是來源於注冊中心,並且實現了listener)(刷新緩沖中的Invoker列表)
2、router規則(從Directory里面帥選Invoker的子集)(應用隔離,讀寫分離,灰度發布)默認輪詢方式toMethodInvokers
加路由規則就刷新Invoker的map列表
ConditionRouter(后台管理系統路由配置)
MockInvokersSelector
3、Cluster將多個Invoker偽裝成一個Invoker(FailoverCluster失敗轉移默認重試3次)【失敗以后會充實其他的服務器】
FailFastCluster快速失敗
4、LoadBanlance負載均衡(RandomLoadBanlance(默認)權重設置隨機概率,RoundRobinLoadbanlance(權重),LeastActiveLoadBanlance最少活躍度,Hash一致性算法)
5、SOA服務降級【服務器壓力大時,為了保證核心業務正常使用,從而對輔助業務進行降級】
mock機制:容器、屏蔽MockClusterInvoker
屏蔽壓根就不請求
屏蔽和容錯
服務器調用超時設置優先級:消費者優先,生產者繼后
MockClusterInvoker-->FailOverClusterInvoker-->(loadBalance)-->dubboInvoker
stub本地存根
Mockb本地偽裝
兩個都配置的話,以Mock為准
dubbo的怎么利用netty接收響應的:
這個原理就是在傳輸消息的過程中協議里要維護一個id,這樣子才能找到發起請求的一方的future
具體的方法:
接着就是:
dubbo的router是如何起作用的:
這個類下的這個方法:
dubbo服務端(provider)是如何接收請求的:
1、這個是個服務端的handler
具體的方法對於熟悉netty的來說就已經很熟悉了:
2、NettyServer類
具體的方法如下:
3、批量消息處理器
具體的處理方法:
4、心跳處理器:
具體的處理方法:
5、多線程處理器(默認200個線程)
具體表的方法如下:
6、執行具體的線程的類:
具體的方法如下:
7、解碼處理器。反序列化處理器
具體的方法:
8、具體的請求頭處理器:
具體的處理方法為:
具體的處理流程如下:
9、dubbo協議類
具體的代碼為:
@Override public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException { if (!(message instanceof Invocation)) { throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); // need to consider backward-compatibility if it's a callback if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || !methodsStr.contains(",")) { hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true; break; } } } if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored." + " please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); Result result = invoker.invoke(inv);//具體調用代碼的地方 return result.thenApply(Function.identity()); }
10、filter包裝類:
具體的方法為:
這里會執行一系列的filter方法和invoker方法
11、代理工廠類
這個類之后就會調用到我們寫的具體的實現類了。
netty的時間輪定時器是干什么的?(hashedwheeltimer)
1、redis的分布式鎖需要不同的刷新過期時間來延長時間,需要用什么定時器
2、dubbo是怎么使用時間輪定時器的
Dubbo客戶端是如何拿到結果的,以及它的線程模型是什么樣子的
1、AsyncToSyncInvoker(這里面的第61行就是取結果的)
2、AsyncRpcResult
3、ThreadlessExecutor
public void waitAndDrain() throws InterruptedException { /** * Usually, {@link #waitAndDrain()} will only get called once. It blocks for the response for the first time, * once the response (the task) reached and being executed waitAndDrain will return, the whole request process * then finishes. Subsequent calls on {@link #waitAndDrain()} (if there're any) should return immediately. * * There's no need to worry that {@link #finished} is not thread-safe. Checking and updating of * 'finished' only appear in waitAndDrain, since waitAndDrain is binding to one RPC call (one thread), the call * of it is totally sequential. */ if (finished) { return; } Runnable runnable = queue.take(); synchronized (lock) { waiting = false; runnable.run(); } runnable = queue.poll(); while (runnable != null) { try { runnable.run(); } catch (Throwable t) { logger.info(t); } runnable = queue.poll(); } // mark the status of ThreadlessExecutor as finished. finished = true; }
這里面的queue是阻塞隊列,在這里等着結果,這個結果會有兩種情況,一個是會由NettyClientWorker線程(ChannelEventRunnable)即響應線程把結果放進去,或者由超時線程HashedWheelTimer(TimeoutCheckTask)把超時結果放進去。下面就詳細講一下這兩個線程。
(4~15是接收正常數據的處理流程)
(16~19是超時的數據處理流程)
4、NettyClientHandler
5、AbstractPeer
6、MultiMessageHandler
7、HeartbeatHandler
@Override public void received(Channel channel, Object message) throws RemotingException { setReadTimestamp(channel); if (isHeartbeatRequest(message)) { Request req = (Request) message; if (req.isTwoWay()) { Response res = new Response(req.getId(), req.getVersion()); res.setEvent(HEARTBEAT_EVENT); channel.send(res); if (logger.isInfoEnabled()) { int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); if (logger.isDebugEnabled()) { logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress() + ", cause: The channel has no data-transmission exceeds a heartbeat period" + (heartbeat > 0 ? ": " + heartbeat + "ms" : "")); } } } return; } if (isHeartbeatResponse(message)) { if (logger.isDebugEnabled()) { logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName()); } return; } handler.received(channel, message); }
8、AllChannelHandler
9、ThreadlessExecutor
這里的lock和waiting都要注意,這里的lock是防止對結果進行重復處理也就是說結果只能處理一次。這里的waiting的意思是是指結果已經被處理過不需要再次加入隊列當中所以直接執行就可以了,但是這里的直接執行也沒有多大意義了,因為最終在FUTURES容器里得到的futrue=null。
sharedExecutor的線程指的是DubboClientHandler相關的線程。
10、ChannelEventRunnable
@Override public void run() { if (state == ChannelState.RECEIVED) { try { handler.received(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } } else { switch (state) { case CONNECTED: try { handler.connected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case DISCONNECTED: try { handler.disconnected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case SENT: try { handler.sent(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } break; case CAUGHT: try { handler.caught(channel, exception); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is: " + message + ", exception is " + exception, e); } break; default: logger.warn("unknown state: " + state + ", message is " + message); } } }
11、DecodeHandler
12、HeaderExchangeHandler(handleResponse(channel, (Response) message);)
@Override public void received(Channel channel, Object message) throws RemotingException { final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); if (message instanceof Request) { // handle request. Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); } else { if (request.isTwoWay()) { handleRequest(exchangeChannel, request); } else { handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { handleResponse(channel, (Response) message); } else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); } }
13、HeaderExchangeHandler
14、DefaultFuture
15、DefaultFuture
16、DefaultFuture.TimeoutCheckTask
17、ThreadlessExecutor
18、ThreadlessExecutor
19、DefaultFuture
參考文獻:
Dubbo協議:http://dubbo.apache.org/zh-cn/blog/dubbo-protocol.html