producer
producer 1.啟動流程
Producer如何感知要發送消息的broker即brokerAddrTable中的值是怎么獲得的,
1. 發送消息的時候指定會指定topic,如果producer集合中沒有會根據指定topic到namesrv獲取topic發布信息TopicPublishInfo,並放入本地集合
2. 定時從namesrv更新topic路由信息,
Producer與broker間的心跳
Producer定時發送心跳將producer信息(其實就是procduer的group)定時發送到, brokerAddrTable集合中列出的broker上去
Producer發送消息只發送到master的broker機器,在通過broker的主從復制機制拷貝到broker的slave上去
producer 2.如何發送消息
Producer輪詢某topic下的所有隊列的方式來實現發送方的負載均衡
1) Topic下的所有隊列如何理解:
比如broker1, broker2, borker3三台broker機器都配置了Topic_A
Broker1 的隊列為queue0 , queue1
Broker2 的隊列為queue0, queue2, queue3,
Broker3 的隊列為queue0
當然一般情況下的broker的配置都是一樣的
以上當broker啟動的時候注冊到namesrv的Topic_A隊列為共6個分別為:
broker1_queue0, broker1_queue1,
broker2_queue0, broker2_queue1, broker2_queue2,
broker3_queue0,
2) Producer如何實現輪詢隊列:
Producer從namesrv獲取的到Topic_A路由信息TopicPublishInfo --List<MessageQueue>messageQueueList //Topic_A的所有的隊列 --AtomicIntegersendWhichQueue //自增整型 方法selectOneMessageQueue方法用來選擇一個發送隊列 (++sendWitchQueue)% messageQueueList.size為隊列集合的下標 每次獲取queue都會通過sendWhichQueue加一來實現對所有queue的輪詢 如果入參lastBrokerName不為空,代表上次選擇的queue發送失敗,這次選擇應該避開同一個queue
3) Producer發消息系統重試:
發送失敗后,重試幾次retryTimesWhenSendFailed = 2
發送消息超時sendMsgTimeout = 3000
Producer通過selectOneMessageQueue方法獲取一個MessagQueue對象
--topic //Topic_A
--brokerName //代表發送消息到達的broker
--queueId //代表發送消息的在指定broker上指定topic下的隊列編號
向指定broker的指定topic的指定queue發送消息
發送失敗(1)重試次數不到兩次(2)發送此條消息花費時間還沒有到3000(毫秒), 換個隊列繼續發送。
producer發送普通消息
producer 3.如何發送順序消息
Rocketmq能夠保證消息嚴格順序,但是Rocketmq需要producer保證順序消息按順序發送到同一個queue中,比如購買流程(1)下單(2)支付(3)支付成功,
這三個消息需要根據特定規則將這個三個消息按順序發送到一個queue
如何實現把順序消息發送到同一個queue:
一般消息是通過輪詢所有隊列發送的,順序消息可以根據業務比如說訂單號orderId相同的消息發送到同一個隊列, 或者同一用戶userId發送到同一隊列等等
messageQueueList [orderId%messageQueueList.size()]
messageQueueList [userId%messageQueueList.size()]
producer 4.如何發布分布式事務消息
先引入官方文檔圖:
分布式事物是基於二階段提交的
1) 一階段,向broker發送一條prepared的消息,返回消息的offset即消息地址commitLog中消息偏移量。Prepared狀態消息不被消費
發送消息ok,執行本地事物分支, 本地事物方法需要實現rocketmq的回調接口
2) LocalTransactionExecuter,
處理本地事物邏輯返回處理的事物狀態LocalTransactionState
3) 二階段,處理完本地事物中業務得到事物狀態, 根據offset查找到commitLog中的prepared消息,設置消息狀態commitType或者rollbackType,
讓后將信息添加到commitLog中, 其實二階段生成了兩條消息
事物消息發送
producer 5.消息在落地broker落地之普通消息
Broker根據producer請求的RequestCode.SEND_MESSAGE選擇對應的處理器SendMessageProcessor
根據請求消息內容構建消息內部結構MessageExtBrokerInner
調DefaultMessageStore加消息寫入commitlog
producer 6.消息在落地broker落地之事務消息
1. 消息落地
commitLog針對事物消息的處理,消息的第20位開始的八位記錄是的消息在邏輯隊列中的queueoffset,
但是針對事物消息為preparedType和rollbackType的存儲的是事物狀態表的索引偏移量
2. 分發事物消息:
分發消息位置信息到ConsumeQueue: 事物狀態為preparedType和rollbackType的消息不會將請求分發到ConsumeQueue中去,即不處理,所以不會被消息
更新transactionstable table:如果是prepared消息記,通過TransactionStateService服務將消息加到存儲事務狀態的表格tranStateTable的文件中;
如果是commitType和rollbackType消息, 修改事物狀態表格tranStateTable中的消息狀態。
記錄Transaction Redo Log日志: 記錄了 commitLogOffset, msgSize,preapredTransactionOffset, storeTimestamp。
3. 事物狀態表
事物狀態表是有MapedFileQueue將多個文件組成一個連續的隊列,它的存儲單元是定長為24個字節的數據,
tranStateTableOffset可以認為是事物狀態消息的個數,索引偏移量, 它的值是 tranStateTable.getMaxOffset()/ TSStoreUnitSize
3. 事物回查
定時回查線程會定時掃描(默認每分鍾)每個存儲事務狀態的表格文件,遍歷存儲事務狀態的表格記錄
如果是已經提交或者回滾的消息調過過,
如果是prepared狀態的如果消息小於事務回查至少間隔時間(默認是一分鍾)跳出終止遍歷
調transactionCheckExecuter.gotocheck方法向producer回查事物狀態,
根據group隨機選擇一台producer
查詢消息,根據commitLogOffset和msgSize到commitlog查找消息
向Producder發起請求,請求code類型為CHECK_TRANSACTION_STATE,producer的DefaultMQProducerImpl.checkTransactionState()方法
來處理broker定時回調的請求,這里構建一個Runnable任務異步執行producer注冊的回調接口,處理回調,在調endTransactionOneway向broker
發送請求更新事物消息的最終狀態
無Prepared消息,且遍歷完,則終止掃描這個文件的定時任務
4. 事物消息的load&recover
TransactionStateService.load ()事物狀態服務加載, 加載只是建立文件映射
redoLog隊列恢復,加載本地redoLog文件
tranStateTable事物狀態表, 加載本地tranStateTable文件
recover:
正常恢復:
利用tranRedoLog文件的recover
利用tranStateTable文件重建事物狀態表
異常恢復:
先按照正常流程恢復TranRedo Log
commitLog異常恢復,commitLog根據checkpoint時間點重新生成 redolog,重新分發消息DispatchRequest,
分發消息到位置信息到ConsumeQueue
更新Transaction State Table
記錄TransactionRedo Log
刪除事物狀態表tranStateTable
通過RedoLog全量恢復StateTable
重頭掃描RedoLog, 過濾出所有prepared狀態的消息, 將commit或者rollback的消息對應的prepared消息刪除
重建StateTable, 將上面過濾出的prepared消息,添加到事物狀態表文件中
這個事物狀態表transstable的作用是定期(1分鍾)將狀態為prepared事物回查producer端redolog這個隊列其實標記消費到哪了,
事物狀態的恢復根本上是有commitlog來做的