RocketMQ原理解析-Producer


producer 

producer 1.啟動流程

Producer如何感知要發送消息的broker即brokerAddrTable中的值是怎么獲得的,
1.      發送消息的時候指定會指定topic,如果producer集合中沒有會根據指定topic到namesrv獲取topic發布信息TopicPublishInfo,並放入本地集合
2.      定時從namesrv更新topic路由信息,

Producer與broker間的心跳

Producer定時發送心跳將producer信息(其實就是procduer的group)定時發送到, brokerAddrTable集合中列出的broker上去
Producer發送消息只發送到master的broker機器,在通過broker的主從復制機制拷貝到broker的slave上去

producer 2.如何發送消息

Producer輪詢某topic下的所有隊列的方式來實現發送方的負載均衡

 

 

1)  Topic下的所有隊列如何理解:

比如broker1, broker2, borker3三台broker機器都配置了Topic_A

Broker1 的隊列為queue0 , queue1

Broker2 的隊列為queue0, queue2, queue3,

Broker3 的隊列為queue0

當然一般情況下的broker的配置都是一樣的

以上當broker啟動的時候注冊到namesrv的Topic_A隊列為共6個分別為:

broker1_queue0, broker1_queue1,

broker2_queue0, broker2_queue1, broker2_queue2,

broker3_queue0,

2)  Producer如何實現輪詢隊列:

Producer從namesrv獲取的到Topic_A路由信息TopicPublishInfo

           --List<MessageQueue>messageQueueList  //Topic_A的所有的隊列

           --AtomicIntegersendWhichQueue        //自增整型

           方法selectOneMessageQueue方法用來選擇一個發送隊列

                    (++sendWitchQueue)% messageQueueList.size為隊列集合的下標

                    每次獲取queue都會通過sendWhichQueue加一來實現對所有queue的輪詢

                            如果入參lastBrokerName不為空,代表上次選擇的queue發送失敗,這次選擇應該避開同一個queue

3)  Producer發消息系統重試:

發送失敗后,重試幾次retryTimesWhenSendFailed = 2

發送消息超時sendMsgTimeout = 3000

Producer通過selectOneMessageQueue方法獲取一個MessagQueue對象

           --topic            //Topic_A

           --brokerName           //代表發送消息到達的broker

           --queueId              //代表發送消息的在指定broker上指定topic下的隊列編號

向指定broker的指定topic的指定queue發送消息

                   發送失敗(1)重試次數不到兩次(2)發送此條消息花費時間還沒有到3000(毫秒), 換個隊列繼續發送。 

 producer發送普通消息

producer 3.如何發送順序消息

Rocketmq能夠保證消息嚴格順序,但是Rocketmq需要producer保證順序消息按順序發送到同一個queue中,比如購買流程(1)下單(2)支付(3)支付成功,
這三個消息需要根據特定規則將這個三個消息按順序發送到一個queue 如何實現把順序消息發送到同一個queue: 一般消息是通過輪詢所有隊列發送的,順序消息可以根據業務比如說訂單號orderId相同的消息發送到同一個隊列, 或者同一用戶userId發送到同一隊列等等 messageQueueList [orderId%messageQueueList.size()] messageQueueList [userId%messageQueueList.size()]

 

 

producer 4.如何發布分布式事務消息

先引入官方文檔圖:

  

分布式事物是基於二階段提交的

1)      一階段,向broker發送一條prepared的消息,返回消息的offset即消息地址commitLog中消息偏移量。Prepared狀態消息不被消費
發送消息ok,執行本地事物分支, 本地事物方法需要實現rocketmq的回調接口

2) LocalTransactionExecuter,
處理本地事物邏輯返回處理的事物狀態LocalTransactionState 3) 二階段,處理完本地事物中業務得到事物狀態, 根據offset查找到commitLog中的prepared消息,設置消息狀態commitType或者rollbackType,
讓后將信息添加到commitLog中, 其實二階段生成了兩條消息

事物消息發送

 

producer 5.消息在落地broker落地之普通消息

Broker根據producer請求的RequestCode.SEND_MESSAGE選擇對應的處理器SendMessageProcessor

         根據請求消息內容構建消息內部結構MessageExtBrokerInner

         調DefaultMessageStore加消息寫入commitlog

 

producer 6.消息在落地broker落地之事務消息

1. 消息落地

commitLog針對事物消息的處理,消息的第20位開始的八位記錄是的消息在邏輯隊列中的queueoffset,
但是針對事物消息為preparedType和rollbackType的存儲的是事物狀態表的索引偏移量

2. 分發事物消息:   

  分發消息位置信息到ConsumeQueue: 事物狀態為preparedType和rollbackType的消息不會將請求分發到ConsumeQueue中去,即不處理,所以不會被消息
  更新transactionstable table:如果是prepared消息記,通過TransactionStateService服務將消息加到存儲事務狀態的表格tranStateTable的文件中;
如果是commitType和rollbackType消息, 修改事物狀態表格tranStateTable中的消息狀態。 記錄Transaction Redo Log日志: 記錄了 commitLogOffset, msgSize,preapredTransactionOffset, storeTimestamp。

3. 事物狀態表

         事物狀態表是有MapedFileQueue將多個文件組成一個連續的隊列,它的存儲單元是定長為24個字節的數據,

         tranStateTableOffset可以認為是事物狀態消息的個數,索引偏移量, 它的值是 tranStateTable.getMaxOffset()/ TSStoreUnitSize     

 

 

3. 事物回查

定時回查線程會定時掃描(默認每分鍾)每個存儲事務狀態的表格文件,遍歷存儲事務狀態的表格記錄

如果是已經提交或者回滾的消息調過過,

如果是prepared狀態的如果消息小於事務回查至少間隔時間(默認是一分鍾)跳出終止遍歷

調transactionCheckExecuter.gotocheck方法向producer回查事物狀態,

         根據group隨機選擇一台producer

         查詢消息,根據commitLogOffset和msgSize到commitlog查找消息

         向Producder發起請求,請求code類型為CHECK_TRANSACTION_STATE,producer的DefaultMQProducerImpl.checkTransactionState()方法
來處理broker定時回調的請求,這里構建一個Runnable任務異步執行producer注冊的回調接口,處理回調,在調endTransactionOneway向broker
發送請求更新事物消息的最終狀態 無Prepared消息,且遍歷完,則終止掃描這個文件的定時任務

 

4. 事物消息的load&recover

TransactionStateService.load ()事物狀態服務加載, 加載只是建立文件映射
 redoLog隊列恢復,加載本地redoLog文件
 tranStateTable事物狀態表, 加載本地tranStateTable文件

 recover:

正常恢復:

         利用tranRedoLog文件的recover

         利用tranStateTable文件重建事物狀態表

異常恢復:

         先按照正常流程恢復TranRedo Log

         commitLog異常恢復,commitLog根據checkpoint時間點重新生成 redolog,重新分發消息DispatchRequest,

分發消息到位置信息到ConsumeQueue

                  更新Transaction State Table

                   記錄TransactionRedo Log

         刪除事物狀態表tranStateTable

通過RedoLog全量恢復StateTable

         重頭掃描RedoLog, 過濾出所有prepared狀態的消息, 將commit或者rollback的消息對應的prepared消息刪除

         重建StateTable,  將上面過濾出的prepared消息,添加到事物狀態表文件中
這個事物狀態表transstable的作用是定期(1分鍾)將狀態為prepared事物回查producer端redolog這個隊列其實標記消費到哪了, 
事物狀態的恢復根本上是有commitlog來做的

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM