RocketMQ源碼 — 十、 RocketMQ順序消息


RocketMQ本身支持順序消息,在使用上發送順序消息和非順序消息有所區別

發送順序消息

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);

send方法帶有參數MessageQueueSelector,MessageQueueSelector是讓用戶自己決定消息發送到哪一個隊列,如果是局部消息的話,用來決定消息與隊列的對應關系。

順序消息消費

consumer.registerMessageListener(new MessageListenerOrderly() {
    AtomicLong consumeTimes = new AtomicLong(0);

    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        context.setAutoCommit(false);
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

從使用上可以推斷順序消息需要從發送到消費整個過程中保證有序,所以順序消息具體表現為

  • 發送消息是順序的
  • broker存儲消息是順序的
  • consumer消費是順序的

下面分別看看rmq怎么實現順序消息

發送順序消息

因為broker存儲消息有序的前提是producer發送消息是有序的,所以這兩個結合在一起說。

消息發布是有序的含義:producer發送消息應該是依次發送的,所以要求發送消息的時候保證:

  • 消息不能異步發送,同步發送的時候才能保證broker收到是有序的。
  • 每次發送選擇的是同一個MessageQueue

同步發送

從剛開始的例子中發送消息的時候,會調用下面的方法

public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
}

CommunicationMode.SYNC表明了producer發送消息的時候是同步發送的。同步發送表示,producer發送消息之后不會立即返回,會等待broker的response。

broker收到producer的請求之后雖然是啟動線程處理的,但是在線程中將消息寫入commitLog中以后會發送response給producer,producer在收到broker的response並且是處理成功之后才算是消息發送成功。

同一個MessageQueue

為了保證broker收到消息也是順序的,所以producer只能向其中一個隊列發送消息。因為只有是同一個隊列才能保證消息是發往同一個broker,只有同一個broker處理發來的消息才能保證順序。所以發送順序消息的時候需要用戶指定MessageQueue,在send調用過程中會調用下面的方法,下面的方法中回調了用戶指定的select queue的方法

private SendResult sendSelectImpl(
    Message msg,
    MessageQueueSelector selector,
    Object arg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);

    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        try {
        	// 調用用戶指定的select方法來選出一個queue,如果是全局順序,用戶必須保證自己選出的queue是同一個
            mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
        } catch (Throwable e) {
            throw new MQClientException("select message queue throwed exception.", e);
        }

        if (mq != null) {
            return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout);
        } else {
            throw new MQClientException("select message queue return null.", null);
        }
    }

    throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}

這樣每次發送的消息都是同一個MessageQueue,也就是都會發送到同一個broker,這個發送消息的過程都保證了順序,也就保證了broker存儲在CommitLog中的消息也是順序的。

順序消費

保證了broker中物理存儲的消息是順序的,只要保證消息消費是順序的就能保證整個過程是順序消息了。

還是開始的例子中,順序消費和普通消費的listener是不一樣的,順序消費需要實現的是下面這個接口

org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly

在consumer啟動的時候會根據listener的類型判斷應該使用哪一個service來消費

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    // 順序消息
    this.consumeOrderly = true;
    this.consumeMessageService =
        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    // 非順序消息
    this.consumeOrderly = false;
    this.consumeMessageService =
        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}

consumer拉取消息是按照offset拉取的,所以consumer能保證拉取到consumer的消息是連續有序的,但是consumer拉取到消息后又啟動了線程去處理消息,所以就不能保證先拉取到的消息先處理

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
// org.apache.rocketmq.client.consumer.PullCallback#onSuccess
// 將拉取到的消息放入ProcessQueue
boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// 這里的consumeMessageService就是ConsumeMessageOrderlyService
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispathToConsume);


// org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#submitConsumeRequest
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispathToConsume) {
    if (dispathToConsume) {
        // 拉取到的消息構造ConsumeRequest,然后放入線程池等待執行
        ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
        this.consumeExecutor.submit(consumeRequest);
    }
}

上面將請求放入線程池了,所以線程執行的順序又不確定了,那么consumer消費就變成無序的了嗎?

答案顯然是不會變成無序的,因為上面有一行關鍵的代碼

processQueue.putMessage(pullResult.getMsgFoundList());

先說一下ProcessQueue這個關鍵的數據結構。一個MessageQueue對應一個ProcessQueue,是一個有序隊列,該隊列記錄一個queueId下所有從brokerpull回來的消息,如果消費成功了就會從隊列中刪除。ProcessQueue有序的原因是維護了一個TreeMap

private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();

msgTreeMap:里面維護了從broker pull回來的所有消息,TreeMap是有序的,key是Long類型的,沒有指定comparator,默認是將key強轉為Comparable,然后進行比較,因為key是當前消息的offset,而Long實現了Comparable接口,所以msgTreeMap里面的消息是按照offset排序的。

所以是ProcessQueue保證了拉取回來的消息是有序的,繼續上面說到的啟動線程執行ConsumeRequest.run方法來消費消息

Override
public void run() {
	// 保證一個隊列只有一個線程訪問,因為順序消息只有一個隊列,也就保證了只有一個線程消費
    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
    synchronized (objLock) {
        if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
            || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
            final long beginTime = System.currentTimeMillis();
            for (boolean continueConsume = true; continueConsume; ) {
                // 從ProcessQueue中獲取消息進行消費,獲取出來的消息也是有序的
                final int consumeBatchSize =
                    ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

                List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                // 省略中間代碼
}

從ProcessQueue中獲取出來的消息是有序的,consumer保證了消費的有序性。

總結

rmq從發送消息、broker存儲消息到consumer消費消息整個過程中,環環相扣,最終保證消息是有序的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM