consumer 1.啟動
有別於其他消息中間件由broker做負載均衡並主動向consumer投遞消息,RocketMq是基於拉模式拉取消息,consumer做負載均衡並通過長輪詢向broker拉消息。
Consumer消費拉取的消息的方式有兩種
1. Push方式:rocketmq已經提供了很全面的實現,consumer通過長輪詢拉取消息后回調MessageListener接口實現完成消費,應用系統只要MessageListener完成業務邏輯即可
2. Pull方式:完全由業務系統去控制,定時拉取消息,指定隊列消費等等,當然這里需要業務系統去根據自己的業務需求去實現
下面介紹默認以push方式為主,因為絕大多數是由push消費方式來使用rocketmq的。
consumer啟動流程
指定group
訂閱topic
注冊消息監聽處理器,當消息到來時消費消息
消費端Start
復制訂閱關系
初始化rebalance變量
構建offsetStore消費進度存儲對象
啟動消費消息服務
向mqClientFactory注冊本消費者
啟動client端遠程通信
啟動定時任務
定時獲取nameserver地址
定時從nameserver獲取topic路由信息
定時清理下線的borker
定時向所有broker發送心跳信息,(包括訂閱關系)
定時持久化Consumer消費進度(廣播存儲到本地,集群存儲到Broker)
統計信息打點
動態調整消費線程池
啟動拉消息服務PullMessageService
啟動消費端負載均衡服務RebalanceService
從namesrv更新topic路由信息
向所有broker發送心跳信息,(包括訂閱關系)
喚醒Rebalance服務線程

consumer 2.消費端負載均衡
消費端負載均衡
消費端會通過RebalanceService線程,10秒鍾做一次基於topic下的所有隊列負載
消費端遍歷自己的所有topic,依次調rebalanceByTopic 根據topic獲取此topic下的所有queue 選擇一台broker獲取基於group的所有消費端(有心跳向所有broker注冊客戶端信息) 選擇隊列分配策略實例AllocateMessageQueueStrategy執行分配算法,獲取隊列集合Set<MessageQueue>mqSet
1) 平均分配算法,其實是類似於分頁的算法 將所有queue排好序類似於記錄 將所有消費端consumer排好序,相當於頁數 然后獲取當前consumer所在頁面應該分配到的queue 2) 按照配置來分配隊列, 也就是說在consumer啟動的時候指定了queue 3) 按照機房來配置隊列 Consumer啟動的時候會指定在哪些機房的消息 獲取指定機房的queue 然后在執行如1)平均算法 根據分配隊列的結果更新ProccessQueueTable<MessageQueue,ProcessQueue> 1) 比對mqSet 將多余的隊列刪除,當broker當機或者添加,會導致分配到mqSet變化, a) 將不在被本consumer消費的messagequeue的ProcessQueue刪除,其實是設置ProcessQueue的droped屬性為true b) 將超過兩份中沒有拉取動作ProcessQueue刪除 //TODO 為什么要刪除掉,兩分鍾后來了消息怎么辦? // 2) 添加新增隊列,比對mqSet,給新增的messagequeue 構建長輪詢對象PullRequest對象,會從broker獲取消費的進度 構建這個隊列的ProcessQueue 將PullRequest對象派發到長輪詢拉消息服務(單線程異步拉取) 注:ProcessQueue正在被消費的隊列, (1) 長輪詢拉取到消息都會先存儲到ProcessQueue的TreeMap<Long, MessageExt>集合中,消費調后會刪除掉,用來控制consumer消息堆積, TreeMap<Long, MessageExt> key是消息在此ConsumeQueue隊列中索引 (2) 對於順序消息消費處理 locked屬性:當consumer端向broker申請鎖隊列成功后設置true,只有被鎖定的processqueue才能被執行消費 rollback: 將消費在msgTreeMapTemp中的消息,放回msgTreeMap重新消費 commit: 將臨時表msgTreeMapTemp數據清空,代表消費完成,放回最大偏移值 (3) 這里是個TreeMap,對key即消息的offset進行排序,這個樣可以使得消息進行順序消費
consumer 3.長輪詢
Rocketmq的消息是由consumer端主動到broker拉取的, consumer向broker發送拉消息請求, PullMessageService服務通過一個線程將阻塞隊列LinkedBlockingQueue<PullRequest>中的PullRequest到broker拉取消息
DefaultMQPushConsumerImpl的pullMessage(pullRequest)方法執行向broker拉消息動作
1. 獲取ProcessQueue判讀是否drop的, drop為true返回
2. 給ProcessQueue設置拉消息時間戳
3. 流量控制,正在消費隊列中消息(未被消費的)超過閥值,稍后在執行拉消息
4. 流量控制,正在消費隊列中消息的跨度超過閥值(默認2000),稍后在消費
5. 根據topic獲取訂閱關系
6. 構建拉消息回調對象PullBack, 從broker拉取消息(異步拉取)返回結果是回調
7. 從內存中獲取commitOffsetValue //TODO 這個值跟pullRequest.getNextOffset區別
8. 構建sysFlag pull接口用到的flag
9. 調底層通信層向broker發送拉消息請求
如果master壓力過大,會建議去slave拉取消息
如果是到broker拉取消息清楚實時提交標記位,因為slave不允許實時提交消費進度,可以定時提交
//TODO 關於master拉消息實時提交指的是什么?
10. 拉到消息后回調PullCallback
處理broker返回結果pullResult
更新從哪個broker(master 還是slave)拉取消息
反序列化消息
消息過濾
消息中放入隊列最大最小offset,方便應用來感知消息堆積度
將消息加入正在處理隊列ProcessQueue
將消息提交到消費消息服務ConsumeMessageService
流控處理, 如果pullInterval參數大於0 (拉消息間隔,如果為了降低拉取速度,可以設置大於0的值),延遲再執行拉消息, 如果pullInterval為0立刻在執行拉消息動作
序列圖
1. 向broker發送長輪詢請求
2. Broker接收長輪詢請求
3. Consumer接收broker響應
長輪詢活動圖:
一張圖畫不下,再來一張
consumer 4.長輪詢push消息-並發消息
通過長輪詢拉取到消息后會提交到消息服務ConsumeMessageConcurrentlyService,
ConsumeMessageConcurrentlyServic的submitConsumeRequest方法構建ConsumeRequest任務提交到線程池。
長輪詢向broker拉取消息是批量拉取的, 默認設置批量的值為pullBatchSize= 32,可配置
消費端consumer構建一個消費消息任務ConsumeRequest消費一批消息的個數是可配置的consumeMessageBatchMaxSize = 1, 默認批量個數為一個
ConsumeRequest 任務run方法執行
判斷proccessQueue是否被droped的, 廢棄直接返回,不在消費消息
構建並行消費上下文
給消息設置消費失敗時候的retrytopic,當消息發送失敗的時候發送到topic為%RETRY%groupname的隊列中
調MessageListenerConcurrently監聽器的consumeMessage方法消費消息,返回消費結果
如果ProcessQueue的droped為true,不處理結果,不更新offset, 但其實這里消費端是消費了消息的,這種情況感覺有被重復消費的風險
處理消費結果
消費成功, 對於批次消費消息,返回消費成功並不代表所有消息都消費成功,但是消費消息的時候一旦遇到消費消息失敗直接放回,根據ackIndex來標記成功消費到哪里了
消費失敗, ackIndex設置為-1
廣播模式發送失敗的消息丟棄, 廣播模式對於失敗重試代價過高,對整個集群性能會有較大影響,失敗重試功能交由應用處理
集群模式, 將消費失敗的消息一條條的發送到broker的重試隊列中去,如果此時還有發送到重試隊列發送失敗的消息,那就在cosumer的本地線程定時5秒鍾以后重試重新消費消息,在走一次上面的消費流程。
刪除正在消費的隊列processQueue中本次消費的消息,放回消費進度
更新消費進度,這里只是一個內存offsettable的更新,后面有定時任務更新到broker上去
consumer 5.長輪詢push消息-順序消費消息
順序消費服務ConsumeMessageConcurrentlyService構建的時候 構建一個線程池來接收消費請求ConsumeRequest 構建一個單線程的本地線程,用來稍后定時重新消費ConsumeRequest, 用來執行定時周期性(一秒)鍾鎖隊列任務 周期性鎖隊列lockMQPeriodically 獲取正在消費隊列列表ProcessQueueTable所有MesssageQueue, 構建根據broker歸類成MessageQueue集合Map<brokername,Set<MessageQueue>> 遍歷Map<brokername,Set<MessageQueue>>的brokername, 獲取broker的master機器地址,將brokerName的Set<MessageQueue>發送到broker請求鎖定這些隊列。
在broker端鎖定隊列,其實就是在broker的queue中標記一下消費端,表示這個queue被某個client鎖定。 Broker會返回成功鎖定隊列的集合,
根據成功鎖定的MessageQueue,設置對應的正在處理隊列ProccessQueue的locked屬性為true沒有鎖定設置為false 通過長輪詢拉取到消息后會提交到消息服務ConsumeMessageOrderlyService, ConsumeMessageOrderlyService的submitConsumeRequest方法構建ConsumeRequest任務提交到線程池。ConsumeRequest是由ProcessQueue和Messagequeue組成。 ConsumeRequest任務的run方法 判斷proccessQueue是否被droped的, 廢棄直接返回,不在消費消息 每個messagequeue都會生成一個隊列鎖來保證在當前consumer內,同一個隊列串行消費, 判斷processQueue的lock屬性是否為true,lock屬性是否過期,如果為false或者過期,放到本地線程稍后鎖定在消費。 如果lock為true且沒有過期,開始消費消息 計算任務執行的時間如果大於一分鍾且線程數小於隊列數情況下,將processqueue, messagequeue重新構建ConsumeRequest加到線程池10ms后在消費,這樣防止個別隊列被餓死 獲取客戶端的消費批次個數,默認一批次為一條 從proccessqueue獲取批次消息, processqueue.takeMessags(batchSize), 從msgTreeMap中移除消息放到臨時map中msgTreeMapTemp,這個臨時map用來回滾消息和commit消息來實現事物消費 調回調接口消費消息,返回狀態對象ConsumeOrderlyStatus 根據消費狀態,處理結果 1) 非事物方式,自動提交 消息消息狀態為success:調用processQueue.commit方法 獲取msgTreeMapTemp的最后一個key,表示提交的 offset 清空msgTreeMapTemp的消息,已經成功消費 2) 事物提交,由用戶來控制提交回滾(精衛專用) 更新消費進度, 這里的更新只是一個內存offsetTable的更新,后面有定時任務定時更新到broker上去