RocketMQ底層網絡使用的是Netty框架,類圖如下

RocketMQ通信模塊的頂層結構是RemotingServer和RemotingClient,分別對應通信的服務端和客戶端
首先看看RemotingServer
1 public interface RemotingServer extends RemotingService { 2 3 void registerProcessor(final int requestCode, final NettyRequestProcessor processor, 4 final ExecutorService executor); 5 6 void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor); 7 8 int localListenPort(); 9 10 Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode); 11 12 RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, 13 final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, 14 RemotingTimeoutException; 15 16 void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis, 17 final InvokeCallback invokeCallback) throws InterruptedException, 18 RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; 19 20 void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis) 21 throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, 22 RemotingSendRequestException; 23 24 }
RemotingServer類中比較重要的是:localListenPort、registerProcessor和registerDefaultProcessor,
registerDefaultProcessor用來設置接收到消息后的處理方法。
RemotingClient類和RemotingServer類相對應,比較重要的方法是updateNameServerAddressList、
invokeSync和invokeOneway,updateNameServerAddressList用來更新有效的NameServer地址列表,
invokeSync與invokeOneway用來向Server端發送請求,如下。
1 public interface RemotingClient extends RemotingService { 2 3 void updateNameServerAddressList(final List<String> addrs); 4 5 List<String> getNameServerAddressList(); 6 7 RemotingCommand invokeSync(final String addr, final RemotingCommand request, 8 final long timeoutMillis) throws InterruptedException, RemotingConnectException, 9 RemotingSendRequestException, RemotingTimeoutException; 10 11 void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis, 12 final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException, 13 RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; 14 15 void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis) 16 throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, 17 RemotingTimeoutException, RemotingSendRequestException; 18 19 void registerProcessor(final int requestCode, final NettyRequestProcessor processor, 20 final ExecutorService executor); 21 22 void setCallbackExecutor(final ExecutorService callbackExecutor); 23 24 ExecutorService getCallbackExecutor(); 25 26 boolean isChannelWritable(final String addr); 27 }
二、自定義協議
NettyRemotingServer和NettyRemotingClient分別實現了RemotingServer和RemotingClient這兩個接
口,但它們有很多共有的內容,比如invokeSync、invokeOneway等,所以這些共有函數被提取到它們共同
繼承的父類NettyRemotingAbstract中。首先來分析一下在NettyRemotingAbstract中是如何處理接收到的內容
的,如下。
1 public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { 2 final RemotingCommand cmd = msg; 3 if (cmd != null) { 4 switch (cmd.getType()) { 5 case REQUEST_COMMAND: 6 processRequestCommand(ctx, cmd); 7 break; 8 case RESPONSE_COMMAND: 9 processResponseCommand(ctx, cmd); 10 break; 11 default: 12 break; 13 } 14 } 15 }
1 public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { 2 final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode()); 3 final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched; 4 final int opaque = cmd.getOpaque(); 5 6 if (pair != null) { 7 Runnable run = new Runnable() { 8 @Override 9 public void run() { 10 try { 11 RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook(); 12 if (rpcHook != null) { 13 rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); 14 } 15 16 final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); 17 if (rpcHook != null) { 18 rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); 19 } 20 21 if (!cmd.isOnewayRPC()) { 22 if (response != null) { 23 response.setOpaque(opaque); 24 response.markResponseType(); 25 try { 26 ctx.writeAndFlush(response); 27 } catch (Throwable e) { 28 log.error("process request over, but response failed", e); 29 log.error(cmd.toString()); 30 log.error(response.toString()); 31 } 32 } else { 33 34 } 35 } 36 } catch (Throwable e) { 37 log.error("process request exception", e); 38 log.error(cmd.toString()); 39 40 if (!cmd.isOnewayRPC()) { 41 final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, 42 RemotingHelper.exceptionSimpleDesc(e)); 43 response.setOpaque(opaque); 44 ctx.writeAndFlush(response); 45 } 46 } 47 } 48 }; 49 50 if (pair.getObject1().rejectRequest()) { 51 final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, 52 "[REJECTREQUEST]system busy, start flow control for a while"); 53 response.setOpaque(opaque); 54 ctx.writeAndFlush(response); 55 return; 56 } 57 58 try { 59 final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd); 60 
pair.getObject2().submit(requestTask); 61 } catch (RejectedExecutionException e) { 62 if ((System.currentTimeMillis() % 10000) == 0) { 63 log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) 64 + ", too many requests and system thread pool busy, RejectedExecutionException " 65 + pair.getObject2().toString() 66 + " request code: " + cmd.getCode()); 67 } 68 69 if (!cmd.isOnewayRPC()) { 70 final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, 71 "[OVERLOAD]system busy, start flow control for a while"); 72 response.setOpaque(opaque); 73 ctx.writeAndFlush(response); 74 } 75 } 76 } else { 77 String error = " request type " + cmd.getCode() + " not supported"; 78 final RemotingCommand response = 79 RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); 80 response.setOpaque(opaque); 81 ctx.writeAndFlush(response); 82 log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error); 83 } 84 }
1 public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { 2 final int opaque = cmd.getOpaque(); 3 final ResponseFuture responseFuture = responseTable.get(opaque); 4 if (responseFuture != null) { 5 responseFuture.setResponseCommand(cmd); 6 7 responseTable.remove(opaque); 8 9 if (responseFuture.getInvokeCallback() != null) { 10 executeInvokeCallback(responseFuture); 11 } else { 12 responseFuture.putResponse(cmd); 13 responseFuture.release(); 14 } 15 } else { 16 log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); 17 log.warn(cmd.toString()); 18 } 19 }
無論是服務端還是客戶端都需要處理接收到的請求,處理方法由processMessageReceived定義,
注意這里接收到的消息已經被轉換成RemotingCommand了,而不是原始的字節流。
RemotingCommand是RocketMQ自定義的協議,具體格式如下

這個協議只有四部分,但是覆蓋了RocketMQ各個角色間幾乎所有的通信過程,RemotingCommand有
實際的數據類型和各部分對應,如下所示。
1 private int code; 2 private LanguageCode language = LanguageCode.JAVA; 3 private int version = 0; 4 private int opaque = requestId.getAndIncrement(); 5 private int flag = 0; 6 private String remark; 7 private HashMap<String, String> extFields; 8 private transient CommandCustomHeader customHeader; 9 10 private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer; 11 12 private transient byte[] body;
RocketMQ各個組件間的通信需要頻繁地在字節流和RemotingCommand間相互轉換,也就是編碼、
解碼過程,好在Netty提供了codec支持,這個頻繁的操作只需要一行設置即可:pipeline().addLast(new
NettyEncoder(), new NettyDecoder())
RocketMQ對通信過程的另一個抽象是Processor和Executor,當接收到一個消息后,直接根據消息的類
型調用對應的Processor和Executor,把通信過程和業務邏輯分離開來。通過一個Broker中的代碼段來看看
注冊Processor的過程
1 public void registerProcessor() { 2 /** 3 * SendMessageProcessor 4 */ 5 SendMessageProcessor sendProcessor = new SendMessageProcessor(this); 6 sendProcessor.registerSendMessageHook(sendMessageHookList); 7 sendProcessor.registerConsumeMessageHook(consumeMessageHookList); 8 9 this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); 10 this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); 11 this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor); 12 this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); 13 this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); 14 this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); 15 this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor); 16 this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); 17 /** 18 * PullMessageProcessor 19 */ 20 this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor); 21 this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList); 22 23 /** 24 * QueryMessageProcessor 25 */ 26 NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this); 27 this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor); 28 this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor); 29 30 this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor); 31 this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, 
this.queryMessageExecutor); 32 33 /** 34 * ClientManageProcessor 35 */ 36 ClientManageProcessor clientProcessor = new ClientManageProcessor(this); 37 this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor); 38 this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor); 39 this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor); 40 41 this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor); 42 this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor); 43 this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor); 44 45 /** 46 * ConsumerManageProcessor 47 */ 48 ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this); 49 this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); 50 this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); 51 this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); 52 53 this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); 54 this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); 55 this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); 56 57 /** 58 * EndTransactionProcessor 59 */ 60 this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor); 61 
this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor); 62 63 /** 64 * Default 65 */ 66 AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this); 67 this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor); 68 this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor); 69 }
