RocketMQ本身支持順序消息,在使用上發送順序消息和非順序消息有所區別
發送順序消息
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
send方法帶有參數MessageQueueSelector,MessageQueueSelector是讓用戶自己決定消息發送到哪一個隊列,如果是局部消息的話,用來決定消息與隊列的對應關系。
順序消息消費
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(false);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeOrderlyStatus.SUCCESS;
}
});
從使用上可以推斷順序消息需要從發送到消費整個過程中保證有序,所以順序消息具體表現為
- 發送消息是順序的
- broker存儲消息是順序的
- consumer消費是順序的
下面分別看看rmq怎么實現順序消息
發送順序消息
因為broker存儲消息有序的前提是producer發送消息是有序的,所以這兩個結合在一起說。
消息發布是有序的含義:producer發送消息應該是依次發送的,所以要求發送消息的時候保證:
- 消息不能異步發送,同步發送的時候才能保證broker收到是有序的。
- 每次發送選擇的是同一個MessageQueue
同步發送
從剛開始的例子中發送消息的時候,會調用下面的方法
public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
}
CommunicationMode.SYNC表明了producer發送消息的時候是同步發送的。同步發送表示,producer發送消息之后不會立即返回,會等待broker的response。
broker收到producer的請求之后雖然是啟動線程處理的,但是在線程中將消息寫入commitLog中以后會發送response給producer,producer在收到broker的response並且是處理成功之后才算是消息發送成功。
同一個MessageQueue
為了保證broker收到消息也是順序的,所以producer只能向其中一個隊列發送消息。因為只有是同一個隊列才能保證消息是發往同一個broker,只有同一個broker處理發來的消息才能保證順序。所以發送順序消息的時候需要用戶指定MessageQueue,在send調用過程中會調用下面的方法,下面的方法中回調了用戶指定的select queue的方法
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
try {
// 調用用戶指定的select方法來選出一個queue,如果是全局順序,用戶必須保證自己選出的queue是同一個
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
} catch (Throwable e) {
throw new MQClientException("select message queue throwed exception.", e);
}
if (mq != null) {
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout);
} else {
throw new MQClientException("select message queue return null.", null);
}
}
throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}
這樣每次發送的消息都是同一個MessageQueue,也就是都會發送到同一個broker,這個發送消息的過程都保證了順序,也就保證了broker存儲在CommitLog中的消息也是順序的。
順序消費
保證了broker中物理存儲的消息是順序的,只要保證消息消費是順序的就能保證整個過程是順序消息了。
還是開始的例子中,順序消費和普通消費的listener是不一樣的,順序消費需要實現的是下面這個接口
org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly
在consumer啟動的時候會根據listener的類型判斷應該使用哪一個service來消費
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
// 順序消息
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
// 非順序消息
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
consumer拉取消息是按照offset拉取的,所以consumer能保證拉取到consumer的消息是連續有序的,但是consumer拉取到消息后又啟動了線程去處理消息,所以就不能保證先拉取到的消息先處理
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
// org.apache.rocketmq.client.consumer.PullCallback#onSuccess
// 將拉取到的消息放入ProcessQueue
boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// 這里的consumeMessageService就是ConsumeMessageOrderlyService
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispathToConsume);
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#submitConsumeRequest
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) {
// 拉取到的消息構造ConsumeRequest,然后放入線程池等待執行
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}
上面將請求放入線程池了,所以線程執行的順序又不確定了,那么consumer消費就變成無序的了嗎?
答案顯然是不會變成無序的,因為上面有一行關鍵的代碼
processQueue.putMessage(pullResult.getMsgFoundList());
先說一下ProcessQueue這個關鍵的數據結構。一個MessageQueue對應一個ProcessQueue,是一個有序隊列,該隊列記錄一個queueId下所有從brokerpull回來的消息,如果消費成功了就會從隊列中刪除。ProcessQueue有序的原因是維護了一個TreeMap
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
msgTreeMap:里面維護了從broker pull回來的所有消息,TreeMap是有序的,key是Long類型的,沒有指定comparator,默認是將key強轉為Comparable,然后進行比較,因為key是當前消息的offset,而Long實現了Comparable接口,所以msgTreeMap里面的消息是按照offset排序的。
所以是ProcessQueue保證了拉取回來的消息是有序的,繼續上面說到的啟動線程執行ConsumeRequest.run方法來消費消息
Override
public void run() {
// 保證一個隊列只有一個線程訪問,因為順序消息只有一個隊列,也就保證了只有一個線程消費
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
// 從ProcessQueue中獲取消息進行消費,獲取出來的消息也是有序的
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
// 省略中間代碼
}
從ProcessQueue中獲取出來的消息是有序的,consumer保證了消費的有序性。
總結
rmq從發送消息、broker存儲消息到consumer消費消息整個過程中,環環相扣,最終保證消息是有序的。