開始,先放一張Dubbo官網的整體設計圖,
一、Server 啟動
依靠統一的URL 傳遞配置數據,擴展動態加載機制, 上下層的代碼關聯非常少。 Protocol 與Exchange 兩層之間,就非常明顯。
以默認的dubbo 協議為例 ( 每個協議,處理方式不一樣,例如httpProtocol 啟動server就簡單些 )。
1. Netty4 Server端啟動,僅是一句代碼: server = Exchangers.bind(url, requestHandler);
啟動Server ,僅僅調用一個靜態方法,傳入DubboProtocol 類的實例化的 ExchangeHandlerAdapter 實例requestHandler 。
requestHandler僅有一個reply(ExchangeChannel channel, Object message)方法,message 是已經被反序列化的Invocation對象。
requestHandler 執行2個重要邏輯:
1. 根據channel里面的url數據,獲取Invoker (Invoker 為 封裝好的FilterChain 過濾鏈) ,
2. 執行invoker 調用,獲取接口服務處理結果,因為Invoker為FilterChain,會優先執行Fitler 里面的邏輯,最后一個調用才執行 最終的 接口實現邏輯調用。
2. Exchanges.bind()
Exchanges.bind() 只有1行關鍵代碼, getExchanger(url).bind(url, handler);
getExchanger(url) 會調用: ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type) , type就是默認的 header ,最終獲取到 HeaderExchanger 擴展( 這里沒有經過adaptive擴展)。
在HeaderExchanger的bind 方法里面,會進行handler的再次包裝,nettyserver 監聽本機端口。
將代碼展開(方法里面內嵌調用,變成前置變量,方便閱讀,理解)后,代碼如下:
@Override public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { ChannelHandler h = new DecodeHandler(new HeaderExchangeHandler(handler)); //1. handler 會再次經過2層包裝,增加功能 Server s = Transporters.bind(url,h); //2. Transports 操作會啟動netty 監聽端口,配置序列化實現, // 3.返回 會對 上步netty建立的server ,對此再次進行包裝,主要增加 channel 空閑的檢測,檢測到超過一定時間的空閑的channel,會關閉 return new HeaderExchangeServer( s); }
DecodeHandler : 該handler邏輯比較簡單,對消息和消息內容進行識別,提取真實請求數據,給后續handler處理。
HeaderExchangeHandler : 處理channel狀態事件,會對channel 寫入讀寫時間屬性,對接收請求進行過濾。 目前只看了server端代碼,單根據邏輯,該handler 應該是在netty客戶端也會用到,會對response進行處理。
Transporters.bind() 方法,啟動nettyServer端。 后面的邏輯較多,下個章節單獨講。
HeaderExchangeServer: 會對nettyServer 進行包裝, 主要增加2個功能:
a. 對channel進行 空閑時間檢測,超過則關閉連接,節省資源。
b. 如果server關閉,則發送消息給client端,不再發送請求到該server。
二、 nettyServer 啟動
Transporters.bind(url,handler) ,因為采用默認缺省配置,最終會到org.apache.dubbo.remoting.transport.netty4.NettyTransporter , 直接 new NettyServer(url, listener) 。
url 是之前的url,listener 就是上面經過包裝的 handler, 跟着代碼進入NettyServer 。
public NettyServer(URL url, ChannelHandler handler) throws RemotingException { // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants. // the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); //super方法必須放在第一行,這里無法拆分臨時變量 }
1. handler包裝
NettyServer在調用super方法的時候,調用 ChannelHandlers.wrap 方法,對傳入的handler 再次進行了包裝。代碼如下:
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); //Dispatch.dispatch()方法實際是對handler的再次裝飾,根據配置動態的添加新功能 }
channelhandler.wrap 里面會調用Dispatcher 的擴展,進行dispatch操作,實際是對handler 的包裝動態化。
根據配置不同,調用Dispatch擴展包裝后的handler 是不一樣的。 默認采用AllDispatcher擴展,用 AllChannelHandler 包裝1次。
采用默認配置,經過包裝之后,傳入super方法的handler 包裝鏈如下:
MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecodeHandler -> HeaderExchangeHandler -> DubboProtocol.requestHandler
MultiMessageHandler : 檢查消息是否為MutiMessage ,如果 是,分開單條調用后續handler
HeartbeatHandler :1.在每個channel動作,對channel標記時間屬性, 2. 檢查是否心跳請求,是則直接返回心跳,不繼續后續請求。
AllChannelHandler : 1. 將后續handler 包裝成 ChannelEventRunnable,捕獲后續執行的異常,記錄日志 。 2. 包裝的runnable 放到獨立線程池運行, 達到全流程異步化效果。
2. codec2擴展注入
用idea 生成NettyServer 類關系圖,如下:
NettyServer 間接繼承AbstractEndPoint , 在AbstractEndPoint構造方法里面,會調用ExtensionLoader,獲取 Codec2 擴展。
Codec2 默認擴展名為dobbo ,獲取到的是:DubboCountCodec ,在DoubboCountCodec內部,實例化了 DubboCodec,最終的解編碼操作還是DubboCodec。
DubboCountCodec 與DubboCodec 涉及到 dubbo的通訊協議,這個后續再仔細學習。
繼承的AbstractServer的構造方法里,會調用抽象的doOpen()方法,實際調用NettyServer的doOpen() 打開端口監聽。
三、 doOpen 啟動NettyServer端口監聽
doOpen方法代碼較少,主要就是NettyServer 設置&啟動。
因為netty的封裝,一個簡單的nettyserver啟動,主要關注: 1. 消息處理 handler ,2. 消息解碼&編碼
如下:
@Override protected void doOpen() throws Throwable { bootstrap = new ServerBootstrap(); bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true)); // 1. nettyServerHandler 構造方法需要傳入 經過鏈式包裝的handler final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); channels = nettyServerHandler.getChannels(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { // FIXME: should we use getTimeout()? int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); // 2. encode & decode 編碼 adapter ,需要獲取父類注入的 codec2對象, NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) //3. 收取消息的解碼器 .addLast("encoder", adapter.getEncoder()) //4.發送消息的編碼器 .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) .addLast("handler", nettyServerHandler); //5. 收發消息的handler } }); // bind ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); }
注釋1、5 ,就是 netty 的消息handler 實例化與設置。
注釋2、3、4 ,為netty消息 解碼、編碼器的實現與設置。 在注釋2,NettyCodecAdapter 內部,會 借由傳入的Codec2 對象實現解編碼工作。
1. Channel 轉換&處理
NettyServerHandler 是netty的消息處理實現,在方法channelRead、write等里面,會從入參ChannelHandlerContext里面獲取實際的netty 數據channel,轉換成Dubbo定義的NettyChannel類, 執行相應后續業務動作。
相關類圖如下:
Channel、ChannelHandler 是dubbo定義的抽象接口。
NettyChannel是一個 包裝類,持有一個netty的實際數據channel通道,同時實現了Dubbo的Channel、ChannelHandler 的抽象接口, channel handler里面的 需要實際讀取、寫入數據時,會由nettychannel里面的 channel 屬性實現。
NettyChannel是一個中間類,類似的應該還有MinaChannel,功能主要是封裝底層傳輸實現, 包裝成框架的邏輯接口 。 是一個隔離中間層。
在上面注釋1.中,nettyserverhandler初始化的時候 new NettyServerHandler(getUrl(), this) ,傳入了 nettyserver對象。 因為nettyserver是 繼承 AbstractPeer ,算是對channelhandler的再次封裝。
所以當netty收到消息時,NettyServerHandler 的channelRead方法會被調用:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); try { handler.received(channel, msg); // 1. channel 為NettyChannel,里面有netty的實際數據傳輸channel , } finally { NettyChannel.removeChannelIfDisconnected(ctx.channel()); } }
channelRead方法里面,會從netty的ctx里面獲取數據channel,然后封裝成功nettychannel, 在調用nettychannel的sent方法時,會將對象寫入真實的channel,發送!
方法里面的handler 對象,調用包裝鏈如下:
NettyServer -> MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecodeHandler -> HeaderExchangeHandler -> DubboProtocol.requestHandler
HeaderExchangeHandler 是整個 handler 的最后一個,在其received方法里面,會執行 handleRequest 方法,調用持有的 DubboProtocol.requestHandler的reply方法,執行invoke操作,獲取方法結果res,執行channel.send
方法,將結果res寫入通道。
這里的res 是調用結果,netty會調用啟動時設置的 encode編碼器,將其對象序列號 & 編碼 發送。
channel 是 在 HeaderExchangeHandler 里面經過 包裝過一次的 ExchangeChannel。 channel 鏈為:ExchangeChannel -> NettyChannel
四、總結
Dubbo Server 啟動的過程,就是channel handler 層層包裝。 Dubbo的裝飾模式、鏈式調用, 遍布整個過程。 Invoke、Handler、Channel 等等,都是如此。 不記錄下來,理解非常困難。
在 handler鏈的最高層是NettyServerHandler, 負責轉換 Netty的具體channel與 Dubbo框架的Channel,應用不同的框架傳輸,應該都存在這樣一個handler。
在handler鏈的最底部,是HeaderExchangeHandler, 調用Invoke,執行Filter鏈,獲取服務結果,將結果寫入channel,發送給客戶端。
在handler鏈的中間,會有一個擴展,AllChannelHandler 位置。 讓用戶可以定制自己的hannler,執行特定操作。 默認的AllChannelHandler 是將后續執行放入線程池,異步化執行。
server啟動時傳入的encoder、decoder,會調用codec 對象,動態獲取 序列化工程,序列號對象& 編碼輸出。
經過記錄一遍,過程更清晰。