正文
首先来明确一下 Offset 的含义, RocketMQ 中, 一 种类型的消息会放到 一 个 Topic 里,为了能够并行, 一般一个 Topic 会有多个 Message Queue (也可以 设置成一个), Offset是指某个 Topic下的一条消息在某个 Message Queue里的 位置,通过 Offset的值可以定位到这条消息,或者指示 Consumer从这条消息 开始向后继续处理 。
Offset主要分为本地文件类型和 Broker代存 的类型两种 。
Rocketmq集群有两种消费模式
默认是 CLUSTERING 模 式,也就是同一个 Consumer group 里的多个消费者每人消费一部分,各自收到 的消息内容不一样 。 这种情况下,由 Broker 端存储和控制 Offset 的值,使用 RemoteBrokerOffsetStore 结构 。
BROADCASTING模式下,每个 Consumer 都收到这个 Topic 的全部消息,各个 Consumer 间相互没有干扰, RocketMQ 使 用 LocalfileOffsetStore,把 Offset存到本地 。
概念
- message queue 是无限长的数组,一条消息进来下标就会涨1,下标就是 offset,消息在某个 MessageQueue 里的位置,通过 offset 的值可以定位到这条消息,或者指示 Consumer 从这条消息开始向后处理。
- message queue 中的 maxOffset 表示消息的最大 offset,maxOffset 并不是最新的那条消息的 offset,而是最新消息的 offset+1,minOffset 则是现存在的最小 offset。
- fileReserveTime=48 默认消息存储48小时后,消费会被物理地从磁盘删除,message queue 的 minOffset 也就对应增长。所以比 minOffset 还要小的那些消息已经不在 broker上了,就无法被消费
类型(父类是OffsetStore):
- 本地文件类型
- DefaultMQPushConsumer 的 BROADCASTING 广播模式,各个 Consumer 没有互相干扰,使用 LocalFileOffsetStore,把 Offset 存储在本地
- Broker 代存储类型
- DefaultMQPushConsumer 的 CLUSTERING 集群模式,由 Broker 端存储和控制 Offset 的值,使用 RemoteBrokerOffsetStore
作用
- 主要是记录消息的偏移量,有多个消费者进行消费
- 集群模式下采用 RemoteBrokerOffsetStore,broker 控制 offset 的值
- 广播模式下采用 LocalFileOffsetStore,消费端存储
建议采用 pushConsumer,RocketMQ 自动维护 OffsetStore,如果用另外一种 pullConsumer 需要自己进行维护 OffsetStore
消息存储 CommitLog
消息存储是由 ConsumeQueue 和 CommitLog 配合完成
- ConsumeQueue 是逻辑队列,CommitLog 是真正存储消息文件的,ConsumeQueue 存储的是指向物理存储的地址。Topic 下的每个 message queue 都有对应的 ConsumeQueue 文件,内容也会被持久化到磁盘。默认地址:store/consumequeue/{topicName}/{queueid}/fileName
- CommitLog:存储消息真正内容的文件。
-
- 生成规则:
- 每个文件的默认1G =1024 * 1024 * 1024,commitlog 的文件名 fileName,名字长度为20位,左边补零,剩余为起始偏移量;比如 00000000000000000000 代表了第一个文件,起始偏移量为0,文件大小为1G=1 073 741 824 Byte;当这个文件满了,第二个文件名字为00000000001073741824,起始偏移量为1073741824,消息存储的时候会顺序写入文件,当文件满了则写入下一个文件。
- 判断消息存储在哪个 CommitLog 上
- 例如 1073742827 为物理偏移量,则其对应的相对偏移量为 1003 = 1073742827 - 1073741824,并且该偏移量位于第二个 CommitLog。
- 生成规则:
Broker 里面一个 Topic 里面有多个 MesssageQueue,每个 MessageQueue 对应一个 ConsumeQueue,ConsumeQueue 里面记录的是消息在 CommitLog 里面的物理存储地址。
IndexFile 消息索引文件
ConsumerQueue是通过偏移量offset去CommitLog文件中查找消息,但实际工作应用中,我们想查找某条具体的消息,并不知道offset值,那该怎么办呢?那IndexFile作用就来了。
IndexFile是消息索引文件,如果一个生产者发送的消息包含key值的话,会使用IndexFile存储消息索引,主要用于使用key来查询消息。文件的内容结构如图
在Broker端,通过Key来计算Hash槽的位置,从而找到Index索引数据。从Index索引中拿到消息的物理偏移量,最后根据这个物理偏移量,直接到CommitLog文件中去找就可以了。另外说明下,通过IndexFile来查找消息的方法不影响RocketMQ的正常生产-消费流程,它只是查询定位消息的方法而已。
offset
在rocketMQ中,offset用来管理每个消费队列的不同消费组的消费进度。对offset的管理分为本地模式和远程模式,本地模式是以文本文件的形式存储在客户端,而远程模式是将数据保存到broker端,对应的数据结构分别为LocalFileOffsetStore和RemoteBrokerOffsetStore。
默认情况下,当消费模式为广播模式时,offset使用本地模式存储,因为每条消息会被所有的消费者消费,每个消费者管理自己的消费进度,各个消费者之间不存在消费进度的交集;当消费模式为集群消费时,则使用远程模式管理offset,消息会被多个消费者消费,不同的是每个消费者只负责消费其中部分消费队列,添加或删除消费者,都会使负载发生变动,容易造成消费进度冲突,因此需要集中管理。同时,RocketMQ也提供接口供用户自己实现offset管理(实现OffsetStore接口)。
生产环境上一般使用集群模式,本文主要记录集群模式下offset的管理,即RemoteBrokerOffsetStore。
broke端
offset的存储与加载
rocketMQ的broker端中,offset的是以json的形式持久化到磁盘文件中,文件路径为${user.home}/store/config/consumerOffset.json。其内容示例如下:
{ "offsetTable": { "test-topic@test-group": { "0": 88526, "1": 88528 } } }
broker端启动后,会调用BrokerController.initialize()方法,方法中会对offset进行加载,consumerOffsetManager.load()。获取文件内容后,序列化为ConsumerOffsetManager对象,实质是其属性ConcurrentMap<String,ConcurrentMap<Integer, Long>> offsetTable,offsetTable的数据结构为ConcurrentMap,是一个线程安全的容器,key的形式为topic@group(每个topic下不同消费组的消费进度),value也是一个ConcurrentMap,key为queueId,value为消费位移(这里不是offset而是位移)。通过对全局ConsumerOffsetManager对象就可以对各个topic下不同消费组的消费位移进行获取与管理。
/**ConsumerOffsetManager.offsetTable*/ private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512); /**ConsumerOffsetManager.decode*/ public void decode(String jsonString) { if (jsonString != null) { // 序列化成功后复制给全局ConsumerOffsetManager对象 ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class); if (obj != null) { this.offsetTable = obj.offsetTable; } } }
commitLog与offset
如下图所示,producer发送消息到broker之后,会将消息具体内容持久化到commitLog文件中,再分发到topic下的消费队列consume Queue,消费者提交消费请求时,broker从该consumer负责的消费队列中根据请求参数起始offset获取待消费的消息索引信息,再从commitLog中获取具体的消息内容返回给consumer。在这个过程中,consumer提交的offset为本次请求的起始消费位置,即beginOffset;consume Queue中的offset定位了commitLog中具体消息的位置。
consume Queue中每个消息索引信息长度为20bytes,包括8位长度的offset,记录commitLog中消息内容的位移;4位长度的size,记录具体消息内容的长度;8位长度的tagHashCode,记录消息的tag的哈希值(订阅时如果指定tag,会根据HashCode快速查找订阅的消息)

nextBeginOffset
对于consumer的消费请求处理(PullMessageProcessor.processRequest()),除了待消费的消息内容,broker在responseHeader(PullMessageResponseHeader)附带上当前消费队列的最小offset(minOffset)、最大offset(maxOffset)、及下次拉取的起始offset(nextBeginOffset)。
- minOffset、maxOffset是当前消费队列consumeQueue记录的最小及最大的offset信息。
- nextBeginOffset是consumer下次拉取消息的offset信息,即consumer对该consumeQueue的消费进度。
其中nextBeginOffset是consumer在下一轮消息拉取时offset的重要依据,无论当次拉取的消息消费是否正常,nextBeginOffset都不会回滚,这是因为rocketMQ对消费异常的消息的处理是将消息重新发回broker端的重试队列(会为每个topic创建一个重试队列,以%RERTY%开头),达到重试时间后将消息投递到重试队列中进行消费重试。对消费异常的处理不是通过offset回滚,这使得客户端简化了offset的管理。
client端
offset初始化
consumer启动过程中(Consumer主函数默认调用DefaultMQPushConsumer.start()方法)根据MessageModel选择对应的offsetStore,然后调用offsetStore.load()对offset进行加载,LocalFileOffsetStore是对本地文件的加载,而RemotebrokerOffsetStore是没有本地文件的,因此load()方法没有实现。在rebalance完成对messageQueue的分配之后会对messageQueue对应的消费位置offset进行更新。
/** RebalanceImpl */ /** doRebalance() -> rebalanceByTopic() -> updateProcessQueueTableInRebalance() -> computePullFromWhere() */ private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { // (省略部分代码)负载均衡获取当前consumer负责的消息队列后对processQueue进行筛选,删除processQueue不必要的messageQueue // 获取topic下consumer消息拉取列表,List<PullRequest> List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); for (MessageQueue mq : mqSet) { if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } // 删除messageQueue旧的offset信息 this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); // 获取nextOffset,即更新当前messageQueue对应请求的offset long nextOffset = this.computePullFromWhere(mq); if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } } }
Push模式下,computePullFromWhere()方法的实现类为RebalancePushImpl.class。根据配置信息consumeFromWhere进行不同的操作。ConsumeFromWhere的类型枚举如下,其中有三个已经被标记为Deprecated(基于rocketmq-all 4.6.0版本)
public enum ConsumeFromWhere { CONSUME_FROM_LAST_OFFSET, @Deprecated CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST, @Deprecated CONSUME_FROM_MIN_OFFSET, @Deprecated CONSUME_FROM_MAX_OFFSET, CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM_TIMESTAMP, }
- CONSUME_FROM_LAST_OFFSET
从最新的offset开始消费。
获取consumer对当前消息队列messageQueue的消费进度lastOffset,如果lastOffset>=0,从lastOffset开始消费;如果lastOffset小于0说明是first start,没有offset信息,topic为重试topic时从0开始消费,否则请求获取该消息队列对应的消费队列consumeQueue的最大offset(maxOffset),从maxOffset开始消费
- CONSUME_FROM_FIRST_OFFSET
从第一个offset开始消费。
获取consumer对当前消息队列messageQueue的消费进度lastOffset,如果lastOffset>=0,从lastOffset开始消费;
否则从0开始消费。
- CONSUME_FROM_TIMESTAMP
获取consumer对当前消息队列messageQueue的消费进度lastOffset,如果lastOffset>=0,从lastOffset开始消费;
当lastOffset<0,如果为重试topic,获取consumeQueue的最大offset;否则获取ConsumeTimestamp(consumer启动时间),根据时间戳请求查找offset。
上述三种消费位置的设置流程有一个共同点,都请求获取consumer对当前消息队列messageQueue的消费进度lastOffset,如果lastOffset不小于0,则从lastOffset开始消费。这也是有时候设置了CONSUME_FROM_FIRST_OFFSET却不是从0开始重新消费的原因,rocketMQ减少了由于配置原因造成的重复消费。
对于lastOffset、maxOffset、时间戳查找offset都是通过MQClientAPIImpl提供的接口进行查询的,MQClientAPIImplclient对broker请求的封装类,使用Netty进行异步请求,对应的RequestCode分别为RequestCode.QUERY_CONSUMER_OFFSET、RequestCode.GET_MAX_OFFSET、RequestCode.SEARCH_OFFSET_BY_TIMESTAMP。
/** RebalancePushImpl */ public long computePullFromWhere(MessageQueue mq) { long result = -1; final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere(); final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore(); switch (consumeFromWhere) { case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST: case CONSUME_FROM_MIN_OFFSET: case CONSUME_FROM_MAX_OFFSET: case CONSUME_FROM_LAST_OFFSET: { // 从broker获取当前消费队列offset long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { result = lastOffset; } // First start,no offset else if (-1 == lastOffset) { if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { result = 0L; } else { try { // 获取消费队列最大offset result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { result = -1; } } } else { result = -1; } break; } case CONSUME_FROM_FIRST_OFFSET: { // 先查询当前消费队列消费进度 long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { result = lastOffset; } // 当前消费队列消费进度小于0,则从0开始 else if (-1 == lastOffset) { result = 0L; } else { result = -1; } break; } case CONSUME_FROM_TIMESTAMP: { // 同样也是先查询当前消费队列消费进度 long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { result = lastOffset; } else if (-1 == lastOffset) { if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (