RocketMQ resets message offsets in one of two situations: either consumers of the group are online, or no consumer is online.
Command line
mqadmin.cmd resetOffsetByTime -n localhost:9876 -t topic-zhang -g group-zhang -s yyyy-MM-dd#HH:mm:ss:SSS
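The `-s` argument in the command above follows the pattern `yyyy-MM-dd#HH:mm:ss:SSS`. As a minimal sketch (not RocketMQ's own parsing code), this is how such a string could be converted to epoch milliseconds with `java.time`. The class `ResetTimestamp` and the method `toEpochMillis` are hypothetical names introduced for illustration. Note that `#` is a reserved character in `DateTimeFormatter` patterns, so it has to be quoted.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for illustration; not part of RocketMQ.
final class ResetTimestamp {
    // '#' is reserved in DateTimeFormatter patterns, so it is quoted here.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'#'HH:mm:ss:SSS");

    // Interprets the text as wall-clock time in the given zone
    // and returns the corresponding epoch milliseconds.
    static long toEpochMillis(String text, ZoneId zone) {
        return LocalDateTime.parse(text, FORMAT)
                .atZone(zone)
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        // The printed value depends on the machine's default time zone.
        System.out.println(toEpochMillis("2024-01-02#03:04:05:678", ZoneId.systemDefault()));
    }
}
```

The zone is passed explicitly because the same wall-clock string maps to different instants in different time zones, which matters when the admin tool and the brokers run on differently configured machines.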
First, the case where consumers are online:
1. The command-line program interacts with the nameserver and the brokers
// org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetByTimestamp
public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp,
        boolean isForce, boolean isC)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    // Ask the nameserver which brokers host the topic
    TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
    List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
    Map<MessageQueue, Long> allOffsetTable = new HashMap<MessageQueue, Long>();
    if (brokerDatas != null) {
        for (BrokerData brokerData : brokerDatas) {
            // Prefer the master; if the master is offline, pick a slave at random
            String addr = brokerData.selectBrokerAddr();
            if (addr != null) {
                Map<MessageQueue, Long> offsetTable =
                        this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(
                                addr, topic, group, timestamp, isForce, timeoutMillis, isC);
                if (offsetTable != null) {
                    // Merge the per-broker results into a single table
                    allOffsetTable.putAll(offsetTable);
                }
            }
        }
    }
    return allOffsetTable;
}
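The address selection mentioned in the comment above (master first, otherwise a random slave) can be sketched as follows. This is a simplified stand-in, not the RocketMQ `BrokerData` class. The name `BrokerAddrSelector`, the convention that broker id 0 denotes the master, and the sample addresses are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Simplified stand-in for broker address selection; not the RocketMQ class itself.
final class BrokerAddrSelector {
    // Assumption: broker id 0 denotes the master of a broker group.
    static final long MASTER_ID = 0L;

    // Returns the master address if present, otherwise a random slave address,
    // or null when the broker group has no addresses at all.
    static String select(Map<Long, String> brokerAddrs, Random random) {
        String master = brokerAddrs.get(MASTER_ID);
        if (master != null) {
            return master;
        }
        if (brokerAddrs.isEmpty()) {
            return null;
        }
        List<String> slaves = new ArrayList<>(brokerAddrs.values());
        return slaves.get(random.nextInt(slaves.size()));
    }

    public static void main(String[] args) {
        Map<Long, String> addrs = new HashMap<>();
        addrs.put(1L, "10.0.0.2:10911"); // made-up slave addresses
        addrs.put(2L, "10.0.0.3:10911");
        System.out.println(select(addrs, new Random())); // one of the two slaves
    }
}
```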
2. The broker looks up, for each partition (queue) of the topic, the message offset at the specified time, and notifies the consumers connected to that broker
org.apache.rocketmq.broker.client.net.Broker2Client#resetOffset
// Finds the message by time using a binary search that compares message store timestamps
org.apache.rocketmq.store.ConsumeQueue#getOffsetInQueueByTime
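The idea behind the time-based lookup can be illustrated with a plain lower-bound binary search over store timestamps. This is a sketch on an in-memory array, assuming timestamps are non-decreasing with offset. It is not the logic of `ConsumeQueue#getOffsetInQueueByTime`, whose file handling and boundary behavior may differ. `OffsetByTime` and `firstIndexAtOrAfter` are hypothetical names.

```java
// Illustrative lower-bound search; not RocketMQ's ConsumeQueue implementation.
final class OffsetByTime {
    // Returns the index of the first entry whose store time is >= timestamp,
    // or storeTimes.length if every entry is older than the timestamp.
    // Assumes storeTimes is sorted in non-decreasing order.
    static int firstIndexAtOrAfter(long[] storeTimes, long timestamp) {
        int low = 0;
        int high = storeTimes.length;
        while (low < high) {
            int mid = (low + high) >>> 1; // unsigned shift avoids int overflow
            if (storeTimes[mid] < timestamp) {
                low = mid + 1;  // entry is too old, search the right half
            } else {
                high = mid;     // entry qualifies, keep looking for an earlier one
            }
        }
        return low;
    }

    public static void main(String[] args) {
        long[] storeTimes = {100L, 200L, 300L, 400L};
        System.out.println(firstIndexAtOrAfter(storeTimes, 250L)); // prints 2
    }
}
```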
3. The consumer resets its consume position according to the offsetTable sent by the broker
// org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset
public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
    DefaultMQPushConsumerImpl consumer = null;
    try {
        MQConsumerInner impl = this.consumerTable.get(group);
        if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
            consumer = (DefaultMQPushConsumerImpl) impl;
        } else {
            log.info("[reset-offset] consumer dose not exist. group={}", group);
            return;
        }
        consumer.suspend();

        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable =
                consumer.getRebalanceImpl().getProcessQueueTable();
        for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
                ProcessQueue pq = entry.getValue();
                pq.setDropped(true);
                pq.clear();
            }
        }

        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
        }

        Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
        while (iterator.hasNext()) {
            MessageQueue mq = iterator.next();
            Long offset = offsetTable.get(mq);
            if (topic.equals(mq.getTopic()) && offset != null) {
                try {
                    consumer.updateConsumeOffset(mq, offset);
                    consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
                    iterator.remove();
                } catch (Exception e) {
                    log.warn("reset offset failed. group={}, {}", group, mq, e);
                }
            }
        }
    } finally {
        if (consumer != null) {
            consumer.resume();
        }
    }
}
The two request codes involved
INVOKE_BROKER_TO_RESET_OFFSET
RESET_CONSUMER_CLIENT_OFFSET
When no consumer is online
The broker modifies the offset directly and saves it to a JSON file
The request code involved
UPDATE_CONSUMER_OFFSET
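To make the offline case concrete, here is a rough sketch of an offset table that is updated in memory and then written out as JSON. It is not the broker's implementation. The class `OffsetStoreSketch`, the `topic@group` key format, and the exact JSON layout are assumptions for illustration, and the hand-rolled serialization assumes topic and group names contain no characters that need JSON escaping.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

// Illustrative offset store; key format and JSON layout are assumptions.
final class OffsetStoreSketch {
    // "topic@group" -> (queueId -> offset)
    private final Map<String, Map<Integer, Long>> offsetTable = new TreeMap<>();

    // Overwrites the stored offset for one queue of a topic/group pair.
    void updateOffset(String topic, String group, int queueId, long offset) {
        offsetTable.computeIfAbsent(topic + "@" + group, k -> new TreeMap<>())
                .put(queueId, offset);
    }

    // Renders the table as JSON by hand (names are assumed to need no escaping).
    String toJson() {
        StringJoiner groups = new StringJoiner(",", "{\"offsetTable\":{", "}}");
        for (Map.Entry<String, Map<Integer, Long>> e : offsetTable.entrySet()) {
            StringJoiner queues = new StringJoiner(",", "{", "}");
            e.getValue().forEach((queueId, offset) -> queues.add("\"" + queueId + "\":" + offset));
            groups.add("\"" + e.getKey() + "\":" + queues);
        }
        return groups.toString();
    }

    // Writes the JSON rendering to the given file, replacing previous content.
    void persist(Path file) throws IOException {
        Files.write(file, toJson().getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        OffsetStoreSketch store = new OffsetStoreSketch();
        store.updateOffset("topic-zhang", "group-zhang", 0, 42L);
        store.updateOffset("topic-zhang", "group-zhang", 1, 7L);
        System.out.println(store.toJson());
    }
}
```

Because no consumer holds the queues in this case, overwriting the stored value is sufficient: the next consumer that starts reads the new offset from the broker.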
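Consider a topic with 2 partitions, p0 and p1; p0 resides on broker0 and p1 on broker1
Consumer group group-zhang has 2 consumers, c0 and c1, subscribed to this topic, so c0 is assigned p0 and c1 is assigned p1
c0 holds connections to both broker0 and broker1, even though c0 pulls data only from broker0 and commits offsets only to broker0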
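The assignment in this scenario (c0 gets p0, c1 gets p1) is what an even, contiguous split of the partitions over the consumers produces. The sketch below shows one such split. It assumes every consumer sees the same sorted lists of partitions and consumer ids, and it is an illustration rather than RocketMQ's allocation strategy code. `AverageAllocation` and `allocate` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative even split of partitions across consumers; not RocketMQ's class.
final class AverageAllocation {
    // Returns the contiguous slice of queues owned by the given consumer.
    // Consumers with a lower index receive one extra queue when the
    // division is not exact. Unknown consumers receive nothing.
    static List<String> allocate(List<String> queues, List<String> consumers, String current) {
        List<String> result = new ArrayList<>();
        int index = consumers.indexOf(current);
        if (index < 0 || queues.isEmpty()) {
            return result;
        }
        int base = queues.size() / consumers.size();   // queues every consumer gets
        int extra = queues.size() % consumers.size();  // leftover queues
        int start = index * base + Math.min(index, extra);
        int count = base + (index < extra ? 1 : 0);
        result.addAll(queues.subList(start, start + count));
        return result;
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("p0", "p1");
        List<String> consumers = Arrays.asList("c0", "c1");
        System.out.println(allocate(queues, consumers, "c0")); // prints [p0]
        System.out.println(allocate(queues, consumers, "c1")); // prints [p1]
    }
}
```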