rocketMQ 消息回溯


rocketMQ 重置消息 offset,有兩種情形:一種是有消費者在線,另一種則是無消費者在線。

命令行

mqadmin.cmd resetOffsetByTime -n localhost:9876 -t topic-zhang -g group-zhang -s yyyy-MM-dd#HH:mm:ss:SSS

首先描述存在在線消費者的情況:

1. 命令行程序和 nameserver 及 broker 交互

// org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetByTimestamp
public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
    boolean isC)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    // 從 nameserver 獲知 topic 分布在哪些 broker 上
    TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
    List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
    Map<MessageQueue, Long> allOffsetTable = new HashMap<MessageQueue, Long>();
    if (brokerDatas != null) {
        for (BrokerData brokerData : brokerDatas) {
            // 默認選擇主節點,主節點下線,則隨機選一個從
            String addr = brokerData.selectBrokerAddr();
            if (addr != null) {
                Map<MessageQueue, Long> offsetTable =
                    this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce,
                        timeoutMillis, isC);
                if (offsetTable != null) {
                    allOffsetTable.putAll(offsetTable);
                }
            }
        }
    }
    return allOffsetTable;
}

2. broker 查詢 topic 分區,指定時間的消息 offset,並通知連接該 broker 的 consumer

org.apache.rocketmq.broker.client.net.Broker2Client#resetOffset
// 根據時間查找消息,是二分查找,比對消息的存儲時間
org.apache.rocketmq.store.ConsumeQueue#getOffsetInQueueByTime

3. consumer 根據 broker 返回的 offsetTable 重置位點

// org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset
public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
    DefaultMQPushConsumerImpl consumer = null;
    try {
        MQConsumerInner impl = this.consumerTable.get(group);
        if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
            consumer = (DefaultMQPushConsumerImpl) impl;
        } else {
            log.info("[reset-offset] consumer dose not exist. group={}", group);
            return;
        }
        consumer.suspend();

        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
        for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
                ProcessQueue pq = entry.getValue();
                pq.setDropped(true);
                pq.clear();
            }
        }

        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
        }

        Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
        while (iterator.hasNext()) {
            MessageQueue mq = iterator.next();
            Long offset = offsetTable.get(mq);
            if (topic.equals(mq.getTopic()) && offset != null) {
                try {
                    consumer.updateConsumeOffset(mq, offset);
                    consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
                    iterator.remove();
                } catch (Exception e) {
                    log.warn("reset offset failed. group={}, {}", group, mq, e);
                }
            }
        }
    } finally {
        if (consumer != null) {
            consumer.resume();
        }
    }
}

相關的兩個請求碼
INVOKE_BROKER_TO_RESET_OFFSET
RESET_CONSUMER_CLIENT_OFFSET

當沒有消費者在線時

broker 直接修改 offset 並保存在 json 文件中

相關的請求碼

UPDATE_CONSUMER_OFFSET 

 

一個 topic 2 個分區,p0 和 p1,p0分布在 broker0 上,p1 分布在 broker1 上

消費組 group-zhang 有 2 個消費者 c0 和 c1 訂閱了該 topic,則 c0 分配到 p0,c1 分配到 p1

c0 和 broker0 及 broker1 都有連接,雖然 c0 只從 broker0 拉取數據,只提交 offset 到 broker0


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM