DefaultMQPushConsumerImpl 的并发消费和顺序消费


DefaultMQPushConsumerImpl 拉取消息,放入 processQueue 的 TreeMap 中

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);

consumeMessageService 分为并发消费和顺序消费

顺序消费,指同一时刻,一个 queue 只有一个线程在消费。只让一个线程消费,由加锁来实现,而顺序则由 TreeMap 来实现。

有一个事实是,DefaultMQPushConsumer#consumeMessageBatchMaxSize = 1,即默认的批量消费个数是 1,什么意思呢?

concurrently 消费,一个 queue 拉取到 32 条消息,则创建 32 个 ConsumeRequest 对象,1 个 ConsumeRequest 只有 1 条消息,提交到线程池中,运行 ConsumeRequest.run。

而 orderly 消费,一个 queue 拉取到 32 条消息,则创建一个 ConsumeRequest 对象,提交到线程池中,在 ConsumeRequest.run 方法中,一直 take offset 最小的消息,直到 TreeMap 空。

concurrently 创建 ConsumeRequest

public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }

            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}

orderly 创建 ConsumeRequest

public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispathToConsume) {
    if (dispathToConsume) {
        ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
        this.consumeExecutor.submit(consumeRequest);
    }
}

concurrently ConsumeRequest#run 消费主体逻辑

// 是的,就是这么简单
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);

orderly ConsumeRequest#run 消费主体逻辑

// 获取锁
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
    for (boolean continueConsume = true; continueConsume; ) {
        // 从 TreeMap 中获得消息
        List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
        if (!msgs.isEmpty()) {
            status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
        } else {
            continueConsume = false;
        }
    }
    ...
}

public class MessageQueueLock {
    private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>();

    public Object fetchLockObject(final MessageQueue mq) {
        Object objLock = this.mqLockTable.get(mq);
        if (null == objLock) {
            objLock = new Object();
            Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
            if (prevLock != null) {
                objLock = prevLock;
            }
        }

        return objLock;
    }
}

 

关于 offset 提交
offset 是消费者从 broker 拉取的下一条消息的偏移量

顺序消费
take 消息时,把消息从 msgTreeMap 取出来,放入 consumingMsgOrderlyTreeMap 中
消费完成后,表示 consumingMsgOrderlyTreeMap 中的消息完全消费,清空 consumingMsgOrderlyTreeMap,设置 offset = this.consumingMsgOrderlyTreeMap.lastKey() + 1

// org.apache.rocketmq.client.impl.consumer.ProcessQueue#commit
public long commit() {
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
            msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
            for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
                msgSize.addAndGet(0 - msg.getBody().length);
            }
            this.consumingMsgOrderlyTreeMap.clear();
            if (offset != null) {
                return offset + 1;
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("commit exception", e);
    }

    return -1;
}

并发消费

直接从 msgTreeMap 中删除消息,并返回 msgTreeMap 中第一条消息的 queue offset 值

// org.apache.rocketmq.client.impl.consumer.ProcessQueue#removeMessage
public long removeMessage(final List<MessageExt> msgs) {
    long result = -1;
    final long now = System.currentTimeMillis();
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        this.lastConsumeTimestamp = now;
        try {
            if (!msgTreeMap.isEmpty()) {
                result = this.queueOffsetMax + 1;
                int removedCnt = 0;
                for (MessageExt msg : msgs) {
                    MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                    if (prev != null) {
                        removedCnt--;
                        msgSize.addAndGet(0 - msg.getBody().length);
                    }
                }
                msgCount.addAndGet(removedCnt);

                if (!msgTreeMap.isEmpty()) {
                    result = msgTreeMap.firstKey();
                }
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (Throwable t) {
        log.error("removeMessage exception", t);
    }

    return result;
}

消费失败

顺序消费,处理消息失败,如果重试次数小于阈值,则把消息从 consumingMsgOrderlyTreeMap 取出,重新放入 msgTreeMap,如果重试次数超过阈值,则把消息发送回 broker,broker 会根据重试次数把消息发往 SCHDULE_TOPIC_XXXX 或死信队列

并发消费,处理消息失败,会发送回 broker,发送失败,则继续消费。


提交 ConsumeRequest 的两个时机,一是拉取到消息,二是处理过程出现异常后延迟提交

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM