正文
首先來明確一下 Offset 的含義, RocketMQ 中, 一 種類型的消息會放到 一 個 Topic 里,為了能夠並行, 一般一個 Topic 會有多個 Message Queue (也可以 設置成一個), Offset是指某個 Topic下的一條消息在某個 Message Queue里的 位置,通過 Offset的值可以定位到這條消息,或者指示 Consumer從這條消息 開始向后繼續處理 。
Offset主要分為本地文件類型和 Broker代存 的類型兩種 。
Rocketmq集群有兩種消費模式
默認是 CLUSTERING 模 式,也就是同一個 Consumer group 里的多個消費者每人消費一部分,各自收到 的消息內容不一樣 。 這種情況下,由 Broker 端存儲和控制 Offset 的值,使用 RemoteBrokerOffsetStore 結構 。
BROADCASTING模式下,每個 Consumer 都收到這個 Topic 的全部消息,各個 Consumer 間相互沒有干擾, RocketMQ 使 用 LocalfileOffsetStore,把 Offset存到本地 。
概念
- message queue 是無限長的數組,一條消息進來下標就會漲1,下標就是 offset,消息在某個 MessageQueue 里的位置,通過 offset 的值可以定位到這條消息,或者指示 Consumer 從這條消息開始向后處理。
- message queue 中的 maxOffset 表示消息的最大 offset,maxOffset 並不是最新的那條消息的 offset,而是最新消息的 offset+1,minOffset 則是現存在的最小 offset。
- fileReserveTime=48 默認消息存儲48小時后,消費會被物理地從磁盤刪除,message queue 的 minOffset 也就對應增長。所以比 minOffset 還要小的那些消息已經不在 broker上了,就無法被消費
類型(父類是OffsetStore):
- 本地文件類型
- DefaultMQPushConsumer 的 BROADCASTING 廣播模式,各個 Consumer 沒有互相干擾,使用 LocalFileOffsetStore,把 Offset 存儲在本地
- Broker 代存儲類型
- DefaultMQPushConsumer 的 CLUSTERING 集群模式,由 Broker 端存儲和控制 Offset 的值,使用 RemoteBrokerOffsetStore
作用
- 主要是記錄消息的偏移量,有多個消費者進行消費
- 集群模式下采用 RemoteBrokerOffsetStore,broker 控制 offset 的值
- 廣播模式下采用 LocalFileOffsetStore,消費端存儲
建議采用 pushConsumer,RocketMQ 自動維護 OffsetStore,如果用另外一種 pullConsumer 需要自己進行維護 OffsetStore
消息存儲 CommitLog
消息存儲是由 ConsumeQueue 和 CommitLog 配合完成
- ConsumeQueue 是邏輯隊列,CommitLog 是真正存儲消息文件的,ConsumeQueue 存儲的是指向物理存儲的地址。Topic 下的每個 message queue 都有對應的 ConsumeQueue 文件,內容也會被持久化到磁盤。默認地址:store/consumequeue/{topicName}/{queueid}/fileName
- CommitLog:存儲消息真正內容的文件。
-
- 生成規則:
- 每個文件的默認1G =1024 * 1024 * 1024,commitlog 的文件名 fileName,名字長度為20位,左邊補零,剩余為起始偏移量;比如 00000000000000000000 代表了第一個文件,起始偏移量為0,文件大小為1G=1 073 741 824 Byte;當這個文件滿了,第二個文件名字為00000000001073741824,起始偏移量為1073741824,消息存儲的時候會順序寫入文件,當文件滿了則寫入下一個文件。
- 判斷消息存儲在哪個 CommitLog 上
- 例如 1073742827 為物理偏移量,則其對應的相對偏移量為 1003 = 1073742827 - 1073741824,並且該偏移量位於第二個 CommitLog。
- 生成規則:
Broker 里面一個 Topic 里面有多個 MesssageQueue,每個 MessageQueue 對應一個 ConsumeQueue,ConsumeQueue 里面記錄的是消息在 CommitLog 里面的物理存儲地址。
IndexFile 消息索引文件
ConsumerQueue是通過偏移量offset去CommitLog文件中查找消息,但實際工作應用中,我們想查找某條具體的消息,並不知道offset值,那該怎么辦呢?那IndexFile作用就來了。
IndexFile是消息索引文件,如果一個生產者發送的消息包含key值的話,會使用IndexFile存儲消息索引,主要用於使用key來查詢消息。文件的內容結構如圖
在Broker端,通過Key來計算Hash槽的位置,從而找到Index索引數據。從Index索引中拿到消息的物理偏移量,最后根據這個物理偏移量,直接到CommitLog文件中去找就可以了。另外說明下,通過IndexFile來查找消息的方法不影響RocketMQ的正常生產-消費流程,它只是查詢定位消息的方法而已。
offset
在rocketMQ中,offset用來管理每個消費隊列的不同消費組的消費進度。對offset的管理分為本地模式和遠程模式,本地模式是以文本文件的形式存儲在客戶端,而遠程模式是將數據保存到broker端,對應的數據結構分別為LocalFileOffsetStore和RemoteBrokerOffsetStore。
默認情況下,當消費模式為廣播模式時,offset使用本地模式存儲,因為每條消息會被所有的消費者消費,每個消費者管理自己的消費進度,各個消費者之間不存在消費進度的交集;當消費模式為集群消費時,則使用遠程模式管理offset,消息會被多個消費者消費,不同的是每個消費者只負責消費其中部分消費隊列,添加或刪除消費者,都會使負載發生變動,容易造成消費進度沖突,因此需要集中管理。同時,RocketMQ也提供接口供用戶自己實現offset管理(實現OffsetStore接口)。
生產環境上一般使用集群模式,本文主要記錄集群模式下offset的管理,即RemoteBrokerOffsetStore。
broke端
offset的存儲與加載
rocketMQ的broker端中,offset的是以json的形式持久化到磁盤文件中,文件路徑為${user.home}/store/config/consumerOffset.json。其內容示例如下:
{ "offsetTable": { "test-topic@test-group": { "0": 88526, "1": 88528 } } }
broker端啟動后,會調用BrokerController.initialize()方法,方法中會對offset進行加載,consumerOffsetManager.load()。獲取文件內容后,序列化為ConsumerOffsetManager對象,實質是其屬性ConcurrentMap<String,ConcurrentMap<Integer, Long>> offsetTable,offsetTable的數據結構為ConcurrentMap,是一個線程安全的容器,key的形式為topic@group(每個topic下不同消費組的消費進度),value也是一個ConcurrentMap,key為queueId,value為消費位移(這里不是offset而是位移)。通過對全局ConsumerOffsetManager對象就可以對各個topic下不同消費組的消費位移進行獲取與管理。
/**ConsumerOffsetManager.offsetTable*/ private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512); /**ConsumerOffsetManager.decode*/ public void decode(String jsonString) { if (jsonString != null) { // 序列化成功后復制給全局ConsumerOffsetManager對象 ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class); if (obj != null) { this.offsetTable = obj.offsetTable; } } }
commitLog與offset
如下圖所示,producer發送消息到broker之后,會將消息具體內容持久化到commitLog文件中,再分發到topic下的消費隊列consume Queue,消費者提交消費請求時,broker從該consumer負責的消費隊列中根據請求參數起始offset獲取待消費的消息索引信息,再從commitLog中獲取具體的消息內容返回給consumer。在這個過程中,consumer提交的offset為本次請求的起始消費位置,即beginOffset;consume Queue中的offset定位了commitLog中具體消息的位置。
consume Queue中每個消息索引信息長度為20bytes,包括8位長度的offset,記錄commitLog中消息內容的位移;4位長度的size,記錄具體消息內容的長度;8位長度的tagHashCode,記錄消息的tag的哈希值(訂閱時如果指定tag,會根據HashCode快速查找訂閱的消息)

nextBeginOffset
對於consumer的消費請求處理(PullMessageProcessor.processRequest()),除了待消費的消息內容,broker在responseHeader(PullMessageResponseHeader)附帶上當前消費隊列的最小offset(minOffset)、最大offset(maxOffset)、及下次拉取的起始offset(nextBeginOffset)。
- minOffset、maxOffset是當前消費隊列consumeQueue記錄的最小及最大的offset信息。
- nextBeginOffset是consumer下次拉取消息的offset信息,即consumer對該consumeQueue的消費進度。
其中nextBeginOffset是consumer在下一輪消息拉取時offset的重要依據,無論當次拉取的消息消費是否正常,nextBeginOffset都不會回滾,這是因為rocketMQ對消費異常的消息的處理是將消息重新發回broker端的重試隊列(會為每個topic創建一個重試隊列,以%RERTY%開頭),達到重試時間后將消息投遞到重試隊列中進行消費重試。對消費異常的處理不是通過offset回滾,這使得客戶端簡化了offset的管理。
client端
offset初始化
consumer啟動過程中(Consumer主函數默認調用DefaultMQPushConsumer.start()方法)根據MessageModel選擇對應的offsetStore,然后調用offsetStore.load()對offset進行加載,LocalFileOffsetStore是對本地文件的加載,而RemotebrokerOffsetStore是沒有本地文件的,因此load()方法沒有實現。在rebalance完成對messageQueue的分配之后會對messageQueue對應的消費位置offset進行更新。
/** RebalanceImpl */ /** doRebalance() -> rebalanceByTopic() -> updateProcessQueueTableInRebalance() -> computePullFromWhere() */ private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { // (省略部分代碼)負載均衡獲取當前consumer負責的消息隊列后對processQueue進行篩選,刪除processQueue不必要的messageQueue // 獲取topic下consumer消息拉取列表,List<PullRequest> List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); for (MessageQueue mq : mqSet) { if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } // 刪除messageQueue舊的offset信息 this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); // 獲取nextOffset,即更新當前messageQueue對應請求的offset long nextOffset = this.computePullFromWhere(mq); if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } } }
Push模式下,computePullFromWhere()方法的實現類為RebalancePushImpl.class。根據配置信息consumeFromWhere進行不同的操作。ConsumeFromWhere的類型枚舉如下,其中有三個已經被標記為Deprecated(基於rocketmq-all 4.6.0版本)
public enum ConsumeFromWhere { CONSUME_FROM_LAST_OFFSET, @Deprecated CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST, @Deprecated CONSUME_FROM_MIN_OFFSET, @Deprecated CONSUME_FROM_MAX_OFFSET, CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM_TIMESTAMP, }
- CONSUME_FROM_LAST_OFFSET
從最新的offset開始消費。
獲取consumer對當前消息隊列messageQueue的消費進度lastOffset,如果lastOffset>=0,從lastOffset開始消費;如果lastOffset小於0說明是first start,沒有offset信息,topic為重試topic時從0開始消費,否則請求獲取該消息隊列對應的消費隊列consumeQueue的最大offset(maxOffset),從maxOffset開始消費
- CONSUME_FROM_FIRST_OFFSET
從第一個offset開始消費。
獲取consumer對當前消息隊列messageQueue的消費進度lastOffset,如果lastOffset>=0,從lastOffset開始消費;
否則從0開始消費。
- CONSUME_FROM_TIMESTAMP
獲取consumer對當前消息隊列messageQueue的消費進度lastOffset,如果lastOffset>=0,從lastOffset開始消費;
當lastOffset<0,如果為重試topic,獲取consumeQueue的最大offset;否則獲取ConsumeTimestamp(consumer啟動時間),根據時間戳請求查找offset。
上述三種消費位置的設置流程有一個共同點,都請求獲取consumer對當前消息隊列messageQueue的消費進度lastOffset,如果lastOffset不小於0,則從lastOffset開始消費。這也是有時候設置了CONSUME_FROM_FIRST_OFFSET卻不是從0開始重新消費的原因,rocketMQ減少了由於配置原因造成的重復消費。
對於lastOffset、maxOffset、時間戳查找offset都是通過MQClientAPIImpl提供的接口進行查詢的,MQClientAPIImplclient對broker請求的封裝類,使用Netty進行異步請求,對應的RequestCode分別為RequestCode.QUERY_CONSUMER_OFFSET、RequestCode.GET_MAX_OFFSET、RequestCode.SEARCH_OFFSET_BY_TIMESTAMP。
/** RebalancePushImpl */ public long computePullFromWhere(MessageQueue mq) { long result = -1; final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere(); final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore(); switch (consumeFromWhere) { case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST: case CONSUME_FROM_MIN_OFFSET: case CONSUME_FROM_MAX_OFFSET: case CONSUME_FROM_LAST_OFFSET: { // 從broker獲取當前消費隊列offset long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { result = lastOffset; } // First start,no offset else if (-1 == lastOffset) { if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { result = 0L; } else { try { // 獲取消費隊列最大offset result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { result = -1; } } } else { result = -1; } break; } case CONSUME_FROM_FIRST_OFFSET: { // 先查詢當前消費隊列消費進度 long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { result = lastOffset; } // 當前消費隊列消費進度小於0,則從0開始 else if (-1 == lastOffset) { result = 0L; } else { result = -1; } break; } case CONSUME_FROM_TIMESTAMP: { // 同樣也是先查詢當前消費隊列消費進度 long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { result = lastOffset; } else if (-1 == lastOffset) { if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (