rocketmq源碼分析2-broker的消息接收


broker消息接收,假設接收的是一個普通消息(即沒有事務),此處分析也只分析master上動作邏輯,不涉及ha。

1. 如何找到消息接收處理入口

可以通過broker的監聽端口10911順藤摸瓜式的找到 NettyClientConfig.setListenPort-->BrokerStartup-->BrokerController-->NettyRemotingServer
com.alibaba.rocketmq.remoting.netty.NettyDecoder com.alibaba.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler.channelRead0(ChannelHandlerContext, RemotingCommand)

也可以順着broker的啟動腳本找到
BrokerStartup-->BrokerController-->NettyRemotingServer

因為rocketmq連接層默認使用netty開發,如果熟悉netty的話,
可以直接查找ChannelInitializer或者pipeline().addLast等

2. 調試NettyDecoder

com.alibaba.rocketmq.remoting.netty.NettyDecoder.decode(ChannelHandlerContext, ByteBuf)
會調用RemotingCommand.decode(byteBuffer)組裝RemotingCommand
在com.alibaba.rocketmq.remoting.protocol.RemotingCommand.decode(ByteBuffer) 中斷點 排除如下code

1 cmd.getCode() != 0 && cmd.getCode() != 34 && cmd.getCode() != 15 
2 && cmd.getCode() != 38 && cmd.getCode() != 11

 

用producer的client發一條消息到broker
發現RemotingCommand.decode解析出來的RemotingCommand的code為310 310對應RequestCode.SEND_MESSAGE_V2 發送消息

此時處理線程是:Thread [NettyServerWorkerThread_2]
RemotingCommand主要描述了操作code、opaque號、body即消息體。

3. 調試NettyServerHandler

對com.alibaba.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler.channelRead0(ChannelHandlerContext, RemotingCommand)斷點
斷點條件屬性是msg.getCode() == 310
此時處理線程是:Thread [NettyServerWorkerThread_2]

4. NettyRemotingAbstract

com.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(ChannelHandlerContext, RemotingCommand)
方法負責分發給相應的NettyRequestProcessor實例
負責處理的是com.alibaba.rocketmq.broker.processor.SendMessageProcessor@17fb0278
處理線程是:Thread [SendMessageThread_1],跟上面的處理已經不在一個線程上了,異步

5. SendMessageProcessor

SendMessageProcessor.processRequest(ChannelHandlerContext, RemotingCommand)介入處理
構建SendMessageRequestHeader
進入SendMessageProcessor.sendMessage(ChannelHandlerContext, RemotingCommand, SendMessageContext, SendMessageRequestHeader)
int queueIdInt = requestHeader.getQueueId(); 作用是什么 什么時候確定的id
構建MessageExtBrokerInner實例 tags轉tagsCode 就是tags的String的hashcode

PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

交給MessageStore進行put(store,進行存放)

6. DefaultMessageStore

com.alibaba.rocketmq.store.DefaultMessageStore.putMessage(MessageExtBrokerInner) 消息topic的長度校驗 不能超過127
消息properties轉成字符串后,長度不能超過32767
下面交個CommitLog進行putMessage

7. CommitLog

com.alibaba.rocketmq.store.CommitLog.putMessage(MessageExtBrokerInner)
設置存儲時間
設置消息體CRC校驗值
交給MapedFile進行存儲mapedFile.appendMessage(msg, this.appendMessageCallback)

8. MapedFile

mapedFile.appendMessage(msg, this.appendMessageCallback)
獲取當前文件已經寫到什么位置了
文件已經做了map map到mappedByteBuffer 講當前的mappedByteBuffer 割出來並設置position為當前寫位置
交由callback做append
callback是CommitLog$DefaultAppendMessageCallback
傳給callback的參數有

  • 1. 這個文件的offset,意思是:一個broker要存儲很多消息,那么一個文件肯定不夠存,當存到第二個文件的時候,這個文件里的第一條消息相對於整個broker中的消息有個offset,此值是文件名。
  • 2. 從store文件映射的buffer中割出來的byteBuffer
  • 3. 文件的剩余空間
  • 4. 消息體

8.1 callback中append邏輯:

計算wroteOffset 即整個broker中的offset, 代碼注釋中稱之為物理offset,計算邏輯也很簡單即上面參數1+這條消息在這個文件中的position
計算消息id,邏輯使用主機物理IP加上一步的wroteOffset計算,用了ByteBuffer處理,比較高效,待完善驗證用例。
獲取這個topic的這個隊列的offset(一個topic寫多個隊列),這個offset有啥用?估計是用於快速查找
將消息bean對象用msgStoreItemMemory進行以約定消息體的形式進行byte的put
將msgStoreItemMemory put到上面傳進來的參數2的buffer 即寫入磁盤
構建append結果,沒啥邏輯,主要是一些信息 AppendMessageResult
將消息在這個隊列中的offset加1
更新MapedFile的wrotePostion,就是將原來的wrotePostion加上這次寫入的字節數
此時訂閱斷已經收到消息,(應該更早,待確認,待分析消息如何送到consume的)

8.2 回CommitLog--:

構建 PutMessageResult對象。

8.3 處理同異步刷盤邏輯

同步刷盤 用GroupCommitService, 其中使用了多條消息一起刷的設計,並且設計了刷盤如果超時的異常場景的反饋
異步刷盤 用CommitLog$FlushRealTimeService(是一個線程,此時會叫醒他)

8.4 處理主從雙機同異步同步邏輯

同步形式同步從節點 交由HAService 實時同步,並等待同步結果。
異步不用管。

9. 收集統計數據

耗時等等

10. 對非oneway的消息做response處理


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM