寫在前面
RocketMQ Remoting模塊也是整個代碼中比較簡單的一個模塊,在掌握基本的Netty知識之后就可以嘗試對源碼進行簡單的閱讀分析,我也是結合源碼分析來進行Netty應用的學習。
該模塊主要有兩個類 NettyRemotingServer 和 NettyRemotingClient 。分別對應服務端和客戶端,服務端主要在Broker和NameService中使用。
本文是對NettyRemotingServer 的初始化和啟動流程以及請求發送和處理的過程結合源碼進行簡單分析。第一次認真寫博客,如有不足希望在評論里指出。
Server啟動流程
初始化實例
初始化時首先實例化了 ServerBootstrap 以及publicExecutor,publicExecutor為負責處理請求業務的線程池。RocketMQ中NettyRemotingServer的使用的是 Reactor 多線程模型
this.serverBootstrap = new ServerBootstrap(); this.nettyServerConfig = nettyServerConfig; this.channelEventListener = channelEventListener; int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads(); if (publicThreadNums <= 0) { publicThreadNums = 4; } this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet()); } });
根據 useEpoll() 方法判斷當前主機是否支持epoll,然后決定創建 EpollEventLoopGroup 或者 NioEventLoopGroup;
源碼如下所示,並且通過實現ThreadFactory來自定義線程名。
if (useEpoll()) { this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet())); } }); this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); private int threadTotal = nettyServerConfig.getServerSelectorThreads(); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); } }); } else { this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet())); } }); this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); private int threadTotal = nettyServerConfig.getServerSelectorThreads(); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); } }); }
最后會調用 loadSslContext() 方法,加載ssl信息..(這塊暫時不管)
啟動服務
首先初始化Work線程池,該線程負責 Netty ChannelHandler 執行
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet()); } });
然后就是通過ServerBootStrap來構建Netty服務,並綁定端口開始監聽連接
啟動完成后會開啟一個定時任務,來掃描並移除已超時的請求。延遲3秒啟動,每一秒執行一次;
this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingServer.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000);
超時請求掃描。在每次請求時,會將ResponseFuture放入 responseTable Map中緩存,通過定時掃描該map中的value,根據 ResponseFuture記錄的開始時間和超時時間來判斷是否超時。
超時的請求會從響應表中移除,並執行其回調方法,防止調用的業務線程一直阻塞。
public void scanResponseTable () { final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>(); Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator(); while (it.hasNext()) { Entry<Integer, ResponseFuture> next = it.next(); ResponseFuture rep = next.getValue(); //當前時間大於 請求開始時間 + 超時等待時間 + 1秒 則認為該請求已超時,然后釋放資源並移除該元素 if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) { rep.release(); it.remove(); rfList.add(rep); log.warn("remove timeout request, " + rep); } } //執行被移除 ResponseFuture 的回調方法 for (ResponseFuture rf : rfList) { try { executeInvokeCallback(rf); } catch (Throwable e) { log.warn("scanResponseTable, operationComplete Exception", e); } } }
業務處理
RemotingCommand
在RocketMQ中,所有的網絡請求都會被封裝為一個RemotingCommand對象。字段和含義如下表所示
Header字段 |
類型 |
Request說明 |
Response說明 |
code |
int |
請求操作碼,應答方根據不同的請求碼進行不同的業務處理 |
應答響應碼。0表示成功,非0則表示各種錯誤 |
language |
LanguageCode |
請求方實現的語言 |
應答方實現的語言 |
version |
int |
請求方程序的版本 |
應答方程序的版本 |
opaque |
int |
相當於requestId,在同一個連接上的不同請求標識碼,與響應消息中的相對應 |
應答不做修改直接返回 |
flag |
int |
區分是普通RPC還是onewayRPC得標志 |
區分是普通RPC還是onewayRPC得標志 |
remark |
String |
傳輸自定義文本信息 |
傳輸自定義文本信息 |
extFields |
HashMap<String, String> |
請求自定義擴展信息 |
響應自定義擴展信息 |
body |
byte[] |
消息主體數據 |
消息主體數據 |
傳輸內容主要可以分為以下4部分:
- 消息長度:總長度,四個字節存儲,占用一個int類型;
- 序列化類型&消息頭長度:同樣占用一個int類型,第一個字節表示序列化類型,后面三個字節表示消息頭長度;
- 消息頭數據:經過序列化后的消息頭數據;
- 消息主體數據:消息主體的二進制字節數據內容;
該類除了包含數據結構外,還包含了消息的編解碼功能;詳細可以參考源碼,不多說
ResponseFuture
在每執行一次請求時,都會創建一個ResponseFuture對象,該對象根據名字就可以看出是和響應信息相關的。在該對象中記錄了請求的一些基本信息以及響應信息,和回調方法等。
public class ResponseFuture { //一個對應於請求的序列號 private final int opaque; //請求執行的Channel private final Channel processChannel; //請求超時時間 private final long timeoutMillis; //回調接口 private final InvokeCallback invokeCallback; //請求開始時間 private final long beginTimestamp = System.currentTimeMillis(); //對響應進行阻塞 private final CountDownLatch countDownLatch = new CountDownLatch(1); //信號量資源 private final SemaphoreReleaseOnlyOnce once; //判斷是否回調方法已執行,避免重復調用 private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false); //封裝的響應信息 private volatile RemotingCommand responseCommand; //記錄是否請求發送成功 private volatile boolean sendRequestOK = true; //異常信息 private volatile Throwable cause;
發送信息
請求有三種模式:
- 同步請求:invokeSync
- 異步請求:invokeAsync
- 單向請求:invokeOneway
同步請求
請求執行的代碼就比較直觀,
首先是創建一個ResponseFuture對象,封裝本次請求的相關信息。
然后將該對象放入 responseTable中,等接收到響應時,通過opaque來獲取到對應的ResponseFuture,然后放入response信息並 喚醒等待 (通過調用 countDownLatch.down());
當前方法在發送請求后,會調用 responseFuture.waitResponse() 方法進行阻塞等待,直至響應返回或者等待超時。最后返回Response信息;
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { //對應每個請求的唯一ID,采用自增序列方式產生 final int opaque = request.getOpaque(); try { final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } responseTable.remove(opaque); responseFuture.setCause(f.cause()); responseFuture.putResponse(null); log.warn("send a request command to channel <" + addr + "> failed."); } }); RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause()); } else { throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } } return responseCommand; } finally { this.responseTable.remove(opaque); } }
異步請求
異步請求時,會使用信號量來限制異步請求數量,避免大量的異步請求等待導致內存占用過高; 對應 protected final Semaphore semaphoreAsync; 字段
異步請求時,ResponseFuture 由之前啟動服務時創建的定時任務來定時判斷是否等待超時,超時的會被移除並釋放信號量資源
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { long beginStartTime = System.currentTimeMillis(); final int opaque = request.getOpaque(); //獲取一個信號量 boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync); long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { once.release(); throw new RemotingTimeoutException("invokeAsyncImpl call timeout"); } final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once); this.responseTable.put(opaque, responseFuture); try { channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } requestFail(opaque); log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel)); } }); } catch (Exception e) { //請求發送失敗則釋放資源 responseFuture.release(); log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e); throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); } } else { if (timeoutMillis <= 0) { throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast"); } else { String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", timeoutMillis, this.semaphoreAsync.getQueueLength(), this.semaphoreAsync.availablePermits() ); log.warn(info); throw new RemotingTimeoutException(info); } } }
單向請求
丟出去就完事
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { doBeforeRpcHooks(addr, request); this.invokeOnewayImpl(channel, request, timeoutMillis); } catch (RemotingSendRequestException e) { log.warn("invokeOneway: send request exception, so close the channel[{}]", addr); this.closeChannel(addr, channel); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); } }
處理請求
在start()方法里,啟動netty服務時,注冊了一個NettyServerHandler,該Handler就是負責處理請求的,在這之前,請求的數據經過decoder已經解碼成為一個RemotingCommand對象。
解碼操作時通過NettyDecoder類對請求數據進行解碼,最終是調用的 RemotingCommand#decode方法。
NettyServerHandler是NettyRemotingServer的一個內部類。其實現的是SimpleChannelInboundHandler類。在channelRead0方法中調用NettyRemotingServer#processMessageReceived方法將請求分為了Request請求和Response請求;分別調用對應的方法進行處理;
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { // 傳入命令的處理 switch (cmd.getType()) { case REQUEST_COMMAND: processRequestCommand(ctx, cmd); break; case RESPONSE_COMMAND: processResponseCommand(ctx, cmd); break; default: break; } } }
處理過程大致流程和線程模型如下圖所示。(參考官方文檔的線程模型圖,進行了一些刪改)
Request請求處理(NettyRemotingAbstract#processRequestCommand)
請求處理器 (Pair<NettyRequestProcessor, ExecutorService>)
對應每個請求 RemotingCommand 都有一個固定的code來標明對應不同的業務類型,可以在下面這個字段中注冊不同業務類型的處理器,Pair<NettyRequestProcessor, ExecutorService>;
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable = new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
NettyRequestProcessor 即為業務處理對象,ExecutorService為該業務類型處理的線程池。
可以通過NettyRemotingServer#registerProcessor 方法來注冊處理器實例。
public void registerProcessor ( int requestCode, NettyRequestProcessor processor, ExecutorService executor){ ExecutorService executorThis = executor; if (null == executor) { executorThis = this.publicExecutor; } Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis); this.processorTable.put(requestCode, pair); }
默認處理器
protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
該處理器為默認請求處理器,當請求的code在 processrTable中找不到對應的處理器時,則使用該默認處理器來執行請求。該實例通過 NettyRemotingServer#registerDefaultProcessor 方法進行注冊。
處理過程
大致處理過程:首先根據code獲取Pair對象,獲取處理器對象。
//根據請求代碼獲取對應的處理器來處理該請求 final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode()); // 如果沒有匹配的處理程序,則使用默認的處理程序 final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
然后處理器對請求進行處理並返回Response信息,調用回調方法將Response返回給請求端。如果是異步處理器,則是異步處理之后調用回調方法,同步方法則是同步等待執行結束並調用回調方法。該過程是使用獲取的Pair對象中的線程池進行處理的,而不是在netty'的Channel處理線程中。RocketMQ使用的是多線程 Reactor模型。
業務執行源碼如下:
Runnable run = new Runnable() { @Override public void run() { try { doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); //初始化回調接口,如果是異步請求處理器處理,則執行完后調用該回調接口 final RemotingResponseCallback callback = new RemotingResponseCallback() { @Override public void callback(RemotingCommand response) { doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); if (!cmd.isOnewayRPC()) { if (response != null) { response.setOpaque(opaque); response.markResponseType(); try { ctx.writeAndFlush(response); } catch (Throwable e) { ... } } else { } } } }; // 如果是Netty異步請求處理器,處理請求命令 if (pair.getObject1() instanceof AsyncNettyRequestProcessor) { AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1(); processor.asyncProcessRequest(ctx, cmd, callback); } else { NettyRequestProcessor processor = pair.getObject1(); RemotingCommand response = processor.processRequest(ctx, cmd); doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); callback.callback(response); } } catch (Throwable e) { log.error("process request exception", e); log.error(cmd.toString()); //如果是非單向請求,當業務處理發生異常時,則返回一個包含異常信息的響應命令 if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, RemotingHelper.exceptionSimpleDesc(e)); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } };
該Runnable 對象會被封裝為一個RequestTask任務,然后提交到線程池執行
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd); pair.getObject2().submit(requestTask);
如果沒有對應code的處理器,且沒有配置默認請求處理器,則會返回一個無法處理該請求的響應信息
if (pair != null) { ... } else { String error = " request type " + cmd.getCode() + " not supported"; final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); response.setOpaque(opaque); ctx.writeAndFlush(response); log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error); }
Response請求處理(processResponseCommand)
response響應處理的邏輯比較簡單,根據響應信息里面的opaque找到 responseTable中緩存的ResponseFuture,然后進行相應的處理。源碼邏輯很清晰明了
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { final int opaque = cmd.getOpaque(); final ResponseFuture responseFuture = responseTable.get(opaque); if (responseFuture != null) { responseFuture.setResponseCommand(cmd); responseTable.remove(opaque); //如果該ResponseFuture對象回調方法不為空,則執行其回調方法 if (responseFuture.getInvokeCallback() != null) { executeInvokeCallback(responseFuture); } else { //設置響應信息,並喚醒等待線程 responseFuture.putResponse(cmd); responseFuture.release(); } } else { log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); log.warn(cmd.toString()); } }
執行回調方法
private void executeInvokeCallback(final ResponseFuture responseFuture) { boolean runInThisThread = false; ExecutorService executor = this.getCallbackExecutor(); if (executor != null) { try { executor.submit(new Runnable() { @Override public void run() { try { responseFuture.executeInvokeCallback(); } catch (Throwable e) { log.warn("execute callback in executor exception, and callback throw", e); } finally { responseFuture.release(); } } }); } catch (Exception e) { runInThisThread = true; log.warn("execute callback in executor exception, maybe executor busy", e); } } else { runInThisThread = true; } if (runInThisThread) { try { responseFuture.executeInvokeCallback(); } catch (Throwable e) { log.warn("executeInvokeCallback Exception", e); } finally { responseFuture.release(); } } }
寫在后面
NettyRemotingServer和NettyRemotingClient請求的發送和處理,最終都是通過其繼承的NettyRemotingAbstract類里面的方法實現的,NettyRemotingClient在調用時,還需要處理一些其它信息,不過大體的過程和Server類似,因此不額外去將Client。
當然,在處理過程中還涉及到一些其它的擴展功能,可以具體去看一下源碼。了解完Remoting模塊后,再去學習其它模塊時也會相對容易一些。