broker消息接收,假設接收的是一個普通消息(即沒有事務),此處分析也只分析master上動作邏輯,不涉及ha。
1. 如何找到消息接收處理入口
可以通過broker的監聽端口10911順藤摸瓜式的找到 NettyClientConfig.setListenPort-->BrokerStartup-->BrokerController-->NettyRemotingServer
com.alibaba.rocketmq.remoting.netty.NettyDecoder com.alibaba.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler.channelRead0(ChannelHandlerContext, RemotingCommand)
也可以順着broker的啟動腳本找到
BrokerStartup-->BrokerController-->NettyRemotingServer
因為rocketmq連接層默認使用netty開發,如果熟悉netty的話,
可以直接查找ChannelInitializer或者pipeline().addLast等
2. 調試NettyDecoder
com.alibaba.rocketmq.remoting.netty.NettyDecoder.decode(ChannelHandlerContext, ByteBuf)
會調用RemotingCommand.decode(byteBuffer)組裝RemotingCommand
在com.alibaba.rocketmq.remoting.protocol.RemotingCommand.decode(ByteBuffer) 中斷點 排除如下code
1 cmd.getCode() != 0 && cmd.getCode() != 34 && cmd.getCode() != 15 2 && cmd.getCode() != 38 && cmd.getCode() != 11
用producer的client發一條消息到broker
發現RemotingCommand.decode解析出來的RemotingCommand的code為310 310對應RequestCode.SEND_MESSAGE_V2 發送消息
此時處理線程是:Thread [NettyServerWorkerThread_2]
RemotingCommand主要描述了操作code、opaque號、body即消息體。
3. 調試NettyServerHandler
對com.alibaba.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler.channelRead0(ChannelHandlerContext, RemotingCommand)斷點
斷點條件屬性是msg.getCode() == 310
此時處理線程是:Thread [NettyServerWorkerThread_2]
4. NettyRemotingAbstract
com.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(ChannelHandlerContext, RemotingCommand)
方法負責分發給相應的NettyRequestProcessor實例
負責處理的是com.alibaba.rocketmq.broker.processor.SendMessageProcessor@17fb0278
處理線程是:Thread [SendMessageThread_1],跟上面的處理已經不在一個線程上了,異步
5. SendMessageProcessor
SendMessageProcessor.processRequest(ChannelHandlerContext, RemotingCommand)介入處理
構建SendMessageRequestHeader
進入SendMessageProcessor.sendMessage(ChannelHandlerContext, RemotingCommand, SendMessageContext, SendMessageRequestHeader)
int queueIdInt = requestHeader.getQueueId(); 作用是什么 什么時候確定的id
構建MessageExtBrokerInner實例 tags轉tagsCode 就是tags的String的hashcode
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
交給MessageStore進行put(store,進行存放)
6. DefaultMessageStore
com.alibaba.rocketmq.store.DefaultMessageStore.putMessage(MessageExtBrokerInner) 消息topic的長度校驗 不能超過127
消息properties轉成字符串后,長度不能超過32767
下面交個CommitLog進行putMessage
7. CommitLog
com.alibaba.rocketmq.store.CommitLog.putMessage(MessageExtBrokerInner)
設置存儲時間
設置消息體CRC校驗值
交給MapedFile進行存儲mapedFile.appendMessage(msg, this.appendMessageCallback)
8. MapedFile
mapedFile.appendMessage(msg, this.appendMessageCallback)
獲取當前文件已經寫到什么位置了
文件已經做了map map到mappedByteBuffer 講當前的mappedByteBuffer 割出來並設置position為當前寫位置
交由callback做append
callback是CommitLog$DefaultAppendMessageCallback
傳給callback的參數有
- 1. 這個文件的offset,意思是:一個broker要存儲很多消息,那么一個文件肯定不夠存,當存到第二個文件的時候,這個文件里的第一條消息相對於整個broker中的消息有個offset,此值是文件名。
- 2. 從store文件映射的buffer中割出來的byteBuffer
- 3. 文件的剩余空間
- 4. 消息體
8.1 callback中append邏輯:
計算wroteOffset 即整個broker中的offset, 代碼注釋中稱之為物理offset,計算邏輯也很簡單即上面參數1+這條消息在這個文件中的position
計算消息id,邏輯使用主機物理IP加上一步的wroteOffset計算,用了ByteBuffer處理,比較高效,待完善驗證用例。
獲取這個topic的這個隊列的offset(一個topic寫多個隊列),這個offset有啥用?估計是用於快速查找
將消息bean對象用msgStoreItemMemory進行以約定消息體的形式進行byte的put
將msgStoreItemMemory put到上面傳進來的參數2的buffer 即寫入磁盤
構建append結果,沒啥邏輯,主要是一些信息 AppendMessageResult
將消息在這個隊列中的offset加1
更新MapedFile的wrotePostion,就是將原來的wrotePostion加上這次寫入的字節數
此時訂閱斷已經收到消息,(應該更早,待確認,待分析消息如何送到consume的)
8.2 回CommitLog--:
構建 PutMessageResult對象。
8.3 處理同異步刷盤邏輯
同步刷盤 用GroupCommitService, 其中使用了多條消息一起刷的設計,並且設計了刷盤如果超時的異常場景的反饋
異步刷盤 用CommitLog$FlushRealTimeService(是一個線程,此時會叫醒他)
8.4 處理主從雙機同異步同步邏輯
同步形式同步從節點 交由HAService 實時同步,並等待同步結果。
異步不用管。
9. 收集統計數據
耗時等等
