RocketMq延遲消息實現原理


 

這邊博文介紹Rocketmq的延遲消息的實現管理。文章直接將不會介紹RocketMq的組件,后續將會補上。

首先上圖:

定義用戶topic為study_rocketmq_topic。流程如下:

1.消息消費者將message投遞到broker的commitLog服務

2.commitLog服務判斷message為延遲消息,將實際的topic和queueId保存到message的屬性中(為了后面的流程用於消息的重新投遞)。並將topic設置成延遲topic(SCHEDULE_TOPIC_XXXX),queueId對應的延遲級別。消息投遞時間保存在tagCode中。

3.消息延遲服務(ScheduleMessageService)從SCHEDULE_TOPIC_XXXX主題循環拉取消息。

4.將達到發送要求的消息重新推向commitLog服務

5.commitLog服務,將消息推到study_rocketmq_topic中

6.消息消費者重study_rocketmq_topic拉取消息

 

重要的類:

1.org.apache.rocketmq.store.DefaultMessageStore.putMessage(MessageExtBrokerInner):消息進入mq文件系統的入口

2.org.apache.rocketmq.store.CommitLog.putMessage(MessageExtBrokerInner):消息保存到Commit文件的入口(保存消息之后,包含刷滿策略,ha處理)

3.org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService:消息分發服務

4.org.apache.rocketmq.store.schedule.ScheduleMessageService:延遲消息服務

5.org.apache.rocketmq.store.DefaultMessageStore.doDispatch(DispatchRequest):消息服務入口(分發給consumerqueue、index)

圖片地址見:https://github.com/lrlxz1127/rocketmq/blob/%E6%BA%90%E7%A0%81%E9%98%85%E8%AF%BB%E7%89%88/learn/RocketMq%E5%BB%B6%E8%BF%9F%E6%B6%88%E6%81%AF%E5%AE%9E%E7%8E%B0.svg


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM