Rocketmq消息持久化


本文編寫,參考:https://my.oschina.net/bieber/blog/725646

producer Send()的Message最終將由broker處理,處理類為:SendMessageProcessor ,處理方法:processRequet.

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

private List<ConsumeMessageHook> consumeMessageHookList;

public SendMessageProcessor(final BrokerController brokerController) {
super(brokerController);
}

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {}
上述方法,並不是直接處理消息,而是交由MessageStore處理,相關代碼如下:
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
//......
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

然而MessageStore也不直接持久化消息,轉交給 CommitLog
long beginTime = this.getSystemClock().now();
PutMessageResult result = this.commitLog.putMessages(messageExtBatch);

從MappedFileQueue中取出最新的一條:
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
//寫消息
result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
//持久化到磁盤,最終通過FileChannel持久化到文件
handleDiskFlush(result, putMessageResult, messageExtBatch);

handleHA(result, putMessageResult, messageExtBatch);


2.cousumer 從broker讀消息。
消費者從broker讀取消息經由PullMessageProcessor類處理的,processRequest()方法處理請求:
RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)

經過一系列的判斷處理,之后交由 MessageStore:
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
讀取消息。
之后交由commitLog,讀出消息,
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
可以看到是先從ConsumerQueue中獲取消息索引,然后再從commitlog中讀取消息內容。這些內容也是在存儲消息的時候寫入的。
相關也可參考:http://jm.taobao.org/2017/01/12/rocketmq-quick-start-in-10-minutes/




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM