rocketMQ 消息回溯


rocketMQ 重置消息 offset,有两种情形:一种是有消费者在线,另一种则是无消费者在线。

命令行

mqadmin.cmd resetOffsetByTime -n localhost:9876 -t topic-zhang -g group-zhang -s yyyy-MM-dd#HH:mm:ss:SSS

首先描述存在在线消费者的情况:

1. 命令行程序和 nameserver 及 broker 交互

// org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetByTimestamp
public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
    boolean isC)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    // 从 nameserver 获知 topic 分布在哪些 broker 上
    TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
    List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
    Map<MessageQueue, Long> allOffsetTable = new HashMap<MessageQueue, Long>();
    if (brokerDatas != null) {
        for (BrokerData brokerData : brokerDatas) {
            // 默认选择主节点,主节点下线,则随机选一个从
            String addr = brokerData.selectBrokerAddr();
            if (addr != null) {
                Map<MessageQueue, Long> offsetTable =
                    this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce,
                        timeoutMillis, isC);
                if (offsetTable != null) {
                    allOffsetTable.putAll(offsetTable);
                }
            }
        }
    }
    return allOffsetTable;
}

2. broker 查询 topic 分区,指定时间的消息 offset,并通知连接该 broker 的 consumer

org.apache.rocketmq.broker.client.net.Broker2Client#resetOffset
// 根据时间查找消息,是二分查找,比对消息的存储时间
org.apache.rocketmq.store.ConsumeQueue#getOffsetInQueueByTime

3. consumer 根据 broker 返回的 offsetTable 重置位点

// org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset
public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
    DefaultMQPushConsumerImpl consumer = null;
    try {
        MQConsumerInner impl = this.consumerTable.get(group);
        if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
            consumer = (DefaultMQPushConsumerImpl) impl;
        } else {
            log.info("[reset-offset] consumer dose not exist. group={}", group);
            return;
        }
        consumer.suspend();

        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
        for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
                ProcessQueue pq = entry.getValue();
                pq.setDropped(true);
                pq.clear();
            }
        }

        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
        }

        Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
        while (iterator.hasNext()) {
            MessageQueue mq = iterator.next();
            Long offset = offsetTable.get(mq);
            if (topic.equals(mq.getTopic()) && offset != null) {
                try {
                    consumer.updateConsumeOffset(mq, offset);
                    consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
                    iterator.remove();
                } catch (Exception e) {
                    log.warn("reset offset failed. group={}, {}", group, mq, e);
                }
            }
        }
    } finally {
        if (consumer != null) {
            consumer.resume();
        }
    }
}

相关的两个请求码
INVOKE_BROKER_TO_RESET_OFFSET
RESET_CONSUMER_CLIENT_OFFSET

当没有消费者在线时

broker 直接修改 offset 并保存在 json 文件中

相关的请求码

UPDATE_CONSUMER_OFFSET 

 

一个 topic 2 个分区,p0 和 p1,p0分布在 broker0 上,p1 分布在 broker1 上

消费组 group-zhang 有 2 个消费者 c0 和 c1 订阅了该 topic,则 c0 分配到 p0,c1 分配到 p1

c0 和 broker0 及 broker1 都有连接,虽然 c0 只从 broker0 拉取数据,只提交 offset 到 broker0


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM