前言:
要了解rocketMq 需要知道 數據在 rocketMq 中 是如何進行傳輸,在底層的結構到底是一個什么亞子,這個需要我們對Netty 對字符編解碼有一些了解。
開始:
我們從生產者發送消息,broker 接收消息 為例,來開展底層消息結構。
消息發送流程:
1. 如下是一段生產者發送消息的代碼,這里我們進去第5步看發送消息的流程。
//1. 初始化 mq producer DefaultMQProducer mqProducer =new DefaultMQProducer("iscys-test"); //2.設置nameServer 地址 mqProducer.setNamesrvAddr("localhost:9876"); //3. 開啟mq producer,這一步是必須的,會做一些連接初始化檢測工作 mqProducer.start(); //4.創建 Message Message msg = new Message("test-topis", "iscys-test".getBytes()); //5.發送消息 mqProducer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { //在消息發送成功之后,我們收到broker的響應通知后,會進行回調 System.out.println("send success"); } @Override public void onException(Throwable e) { System.out.println("send fail"); } });
2.消息發送必須經過如下代碼,將消息組裝成 RemotingCommand 對象,無論是發送還是服務端返回消息,都會封裝成這個對象。
public SendResult sendMessage( final String addr, final String brokerName, final Message msg, final SendMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final MQClientInstance instance, final int retryTimesWhenSendFailed, final SendMessageContext context, final DefaultMQProducerImpl producer ) throws RemotingException, MQBrokerException, InterruptedException {
//1. 初始化RemotingCommand 對象 RemotingCommand request = null;
//2.設置消息頭 if (sendSmartMsg || msg instanceof MessageBatch) {
//多條message SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2); } else {
//單條message request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); } // 3.將 message 內容放入request 中 request.setBody(msg.getBody()); //4.發送消息 switch (communicationMode) { case ONEWAY: this.remotingClient.invokeOneway(addr, request, timeoutMillis); return null; case ASYNC: final AtomicInteger times = new AtomicInteger(); this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer); return null; case SYNC: return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request); default: assert false; break; } return null; }
3. 關於 RemotingCommand 遠程命令 對象,我們看一下它的組成與結構
// 業務code 對應的是 RequestCode 這個常量池的code ,基本每一種業務類型都會對應一個code碼,接收端通過code 進行做不同的處理 private int code; //Java 語言版本 private LanguageCode language = LanguageCode.JAVA; //version 版本信息 private int version = 0; // 消息唯一id ,這個id 會關聯 response private int opaque = requestId.getAndIncrement(); //用來標記這個消息是發送消息的消息還是返回的消息, private int flag = 0; //備注的信息,比如一些錯誤注意信息等 private String remark; //附帶額外的信息 private HashMap<String, String> extFields; //請求頭信息,基本每一種業務類型都會對應一個請求頭類 private transient CommandCustomHeader customHeader; //json private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer; //msg body 信息 private transient byte[] body;
從這個消息體中,基本上關於消息的所有信息,都知道了。
4.接下來就是發送消息了:
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { //1.獲取與broker的連接channel ,沒有的話則創建 final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { if (this.rpcHook != null) { //2.hook 前置鈎子函數調用 this.rpcHook.doBeforeRequest(addr, request); } //3.發送消息(channel 連接對象,RemotingCommand 對象,超時時間,回調函數) this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback); } catch (RemotingSendRequestException e) { log.warn("invokeAsync: send request exception, so close the channel[{}]", addr); this.closeChannel(addr, channel); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); } }
5.設置response 對象設置,方便進行發送成功后的回調,進行真實發送
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { //1.獲取消息id final int opaque = request.getOpaque(); boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync); //2.創建response對象 final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once); //3.一個消息id 一個response 對象,放入responseTable中 this.responseTable.put(opaque, responseFuture); try { //4.Netty API 將消息發送至服務端,並設置發送監聽 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } responseFuture.putResponse(null); responseTable.remove(opaque); try { executeInvokeCallback(responseFuture); } catch (Throwable e) { log.warn("excute callback in writeAndFlush addListener, and callback throw", e); } finally { responseFuture.release(); } log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel)); } }); } catch (Exception e) { responseFuture.release(); log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e); throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); } } else { if (timeoutMillis <= 0) { throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast"); } else { String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", timeoutMillis, this.semaphoreAsync.getQueueLength(), this.semaphoreAsync.availablePermits() ); log.warn(info); throw new RemotingTimeoutException(info); } } }
消息編碼:
從第一步消息發送流程中,我們已經得知RocketMq 在發送消息的時候,都會把消息封裝成RemotingCommand 對象,我們都知道TCP 網絡傳輸會出現拆包與粘包的現象,那么在RocketMq 是怎么解決這一個問題的呢?
關於拆包粘包的問題,常用的4 種解決方案如下:
1.消息定長:一條消息發送我設置固定的長度,長度不夠就行空行補全。
2.消息分隔符:通過設置標志符,進行消息的解析。
3.換行分割
4.自定義消息長度:設置消息頭,來解析消息的長度。
RocketMq 采用的是第4種解決方案,也是很容易操控的一種解決方案,具體實現我們看RocketMq Netty Client bootstrap 初始化的pipeline ;
代碼地址:org.apache.rocketmq.remoting.netty.NettyRemotingClient#start
@Override public void start() { this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyClientConfig.getClientWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet()); } }); Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, false) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (nettyClientConfig.isUseTLS()) { if (null != sslContext) { pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc())); log.info("Prepend SSL handler"); } else { log.warn("Connections are insecure as SSLContext is null!"); } } //1. 主要看這里,我們的RemotingCommand 對象會經過如下的處理器 pipeline.addLast( defaultEventExecutorGroup, //1.OutBound編碼處理器,用於將我們的 RemotingCommand對象轉換成底層ByteBuffer,對應編碼方法是remotingCommand 的 encode 方法 new NettyEncoder(), //2.InBound 解碼處理器,將網絡傳輸字節轉換微RemotingCommand ,采用了消息頭指定消息長度進行處理拆包粘包 new NettyDecoder(), //3/心跳檢測 new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), new NettyConnectManageHandler(), //4.Inbound 處理業務消息類型(業務消息邏輯在這里) new NettyClientHandler()); } }); this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingClient.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } }
我們先開始分析NettyEncoder 編碼處理流程:
@Override public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) throws Exception { try { //1.請求頭 信息放入ByteBuffer,body[] 不會被json序列化,encodeHeader 不會放入body 信息 ByteBuffer header = remotingCommand.encodeHeader(); //2.頭信息寫入Netty 的ByteBuf out.writeBytes(header); byte[] body = remotingCommand.getBody(); if (body != null) { //3.寫入body out.writeBytes(body); } } catch (Exception e) { log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); if (remotingCommand != null) { log.error(remotingCommand.toString()); } RemotingUtil.closeChannel(ctx.channel()); } }
我們看下如下 encodeHeader 邏輯 ,執行完之后,此時我們的消息結構是:
public ByteBuffer encodeHeader(final int bodyLength) { // 1> header length size int length = 4; // 2> header data length byte[] headerData; //請求頭 JSON 序列化請求頭Byte信息 headerData = this.headerEncode(); length += headerData.length; // 3> body data length //此時length 的長度 = 4+headerLength+bodyLength length += bodyLength; //分配一個 【4個字節長度 Total Length】+【4個字節長度 header Length】+【headerLength】 ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); // length //前4個字節放消息的總長度 result.putInt(length); // header length //然后放入4字節長度的消息頭長度 result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); // header data //放入消息頭信息 result.put(headerData); //flip 改變position 指針,后續進行寫操作 result.flip(); return result; }
在 out.writeBytes(body) 后,寫入body 消息后我們的消息結構是如下這個樣子的:
消息解碼:
需要看 NettyDecoder inbound 處理邏輯:
1.從類結構上看,RocketMq 解碼器繼承了netty 的 自定義長度解碼器,來實現消息的解碼
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
2.定義一些長度偏移量,解決拆包粘包 ,定義了
1.消息的最大的長度。
2.消息長度字段的偏移量(從哪里開始讀消息總長度)。
3.消息長度占幾個字節(我應該讀幾個字節就可以讀到消息長度)。
4.長度字段的補償值,一些額外的字段。
5.最后的解碼后結構我應該丟棄消息的前幾個字節(比如解碼后我不想要前4字字節的內容)。
public NettyDecoder() { super(FRAME_MAX_LENGTH, 0, 4, 0, 4); }
public LengthFieldBasedFrameDecoder( int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) { this( maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, true); }
3.從上面的定義中,我們可以得知,在解碼完之后,由於丟棄了前4位的消息總長度(也沒有作用的值看來),解碼完之后消息格式 結構如下
4.然后進行RemotingDecoder:
public static RemotingCommand decode(final ByteBuffer byteBuffer) { //1.得到ByteBuffer 的總長度 int length = byteBuffer.limit(); //2.消息頭長度,因為丟棄了前4位的消息總長度,所以目前前4位是頭長度 int oriHeaderLen = byteBuffer.getInt(); int headerLength = getHeaderLength(oriHeaderLen); //3.取出消息頭的消息信息 byte[] headerData = new byte[headerLength]; byteBuffer.get(headerData); //4.json 序列化成對象 RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); //5.得出body 的長度 【total - 4 -header】 int bodyLength = length - 4 - headerLength; byte[] bodyData = null; if (bodyLength > 0) { bodyData = new byte[bodyLength]; //6.buffer 取出body 信息 byteBuffer.get(bodyData); } //7.設置body cmd.body = bodyData; return cmd; }
over