RocketMQ - 如何實現順序消息


參考文檔: https://xie.infoq.cn/article/fba37afd9bda31fb10eec651f

順序消息的使用場景

日常項目中需要保證順序的應用場景非常多,比如交易場景中的訂單創建、支付、退款等流程,先創建訂單才能支付,支付完成的訂單才能退款,這需要保證先進先出。又例如數據庫的 BinLog 消息,數據庫執行新增語句、修改語句,BinLog 消息得到順序也必須保證是新增消息、修改消息。

如何發送和消費順序消息

我們使用 RocketMQ 順序消息來模擬一下訂單的場景,順序消息分為兩部分:順序發送、順序消費。

  1. 順序發消息

server.port=8080spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876spring.cloud.stream.bindings.output.destination=TopicTestspring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
#設置同步發送spring.cloud.stream.rocketmq.bindings.output.producer.sync=true
 
@RestControllerpublic class OrderlyController {
@Autowired private Source source;
@GetMapping("/orderly") public String orderly() { List<String> types = Arrays.asList("創建訂單", "支付", "退款"); types.forEach(type -> { MessageBuilder builder = MessageBuilder.withPayload(type).setHeader(BinderHeaders.PARTITION_HEADER, 0); Message message = builder.build(); source.output().send(message); }); return "OK"; }}
 

上面代碼模擬了按順序依次發送創建、支付、退款消息到 TopicTest 中。在 application.properties 配置文件中指定 producer.sync=true,默認是異步發送,此處改為同步發送。

MessageBuilder 設置 Header 信息頭,表示這是一條順序消息,將消息固定地發送到第 0 個消息隊列。

  1. 順序收消息

server.port=8081spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876spring.cloud.stream.bindings.output.destination=TopicTestspring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
#設置同步發送spring.cloud.stream.rocketmq.bindings.output.producer.sync=true
 
@EnableBinding({Sink.class})@SpringBootApplicationpublic class App {    public static void main( String[] args )    {        SpringApplication.run(App.class);    }
@StreamListener(Sink.INPUT) public void receive(String msg) { System.out.println("TopicTest receive: " + msg + ", receiveTime= " + System.currentTimeMillis()); }}
 

程序運行后,可以在控制台看到日志輸出,也是按照順序打印出來的

 
TopicTest receive: 創建訂單, receiveTime= 1590503510075TopicTest receive: 支付, receiveTime= 1590503510076TopicTest receive: 退款, receiveTime= 1590503510077
 

順序發送的技術原理

RocketMQ 的順序消息分為 2 種情況:局部有序和全局有序。前面的例子是局部有序場景。

  • 局部有序:指發送同一個隊列的消息有序,可以在發送消息時指定隊列,在消費消息時也按順序消費。例如同一個訂單 ID 的消息要保證有序,不同訂單的消息沒有約束,相互不影響,不同訂單 ID 之間的消息時並行的。

  • 全局有序:設置 Topic 只有一個隊列可以實現全局有序,創建 Topic 時手動設置。此類場景極少,性能差,通常不推薦使用。

RocketMQ 中消息發送有三種方式:同步、異步、單項。

  • 同步:發送網絡請求后會同步等待 Broker 服務器返回結果,支持發送失敗重試,適用於比較重要的消息通知場景。

  • 異步:異步發送網絡請求,不會阻塞當前線程,不支持失敗重試,適用於對響應時間要求更高的場景。

  • 單向:單向發送原理和異步一致,但不支持回調。適用於響應時間非常端,對可靠性要求不高的場景,例如日志收集。

順序消息發送的原理比較簡單,同一類消息發送到相同的隊列即可。為了保證先發送的消息先存儲到消息隊列,必須使用同步發送的方式,否則可能出現先發送的消息后到消息隊列中,此時消息就亂序了。

RocketMQ 的核心代碼如下:

 
public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey) { return this.syncSendOrderly(destination, message, hashKey, (long)this.producer.getSendMsgTimeout()); }
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) { if (!Objects.isNull(message) && !Objects.isNull(message.getPayload())) { try { long now = System.currentTimeMillis(); // 轉成RocketMQ API中的Message對象 org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(this.objectMapper, this.charset, destination, message); // 調用發送消息接口 SendResult sendResult = this.producer.send(rocketMsg, this.messageQueueSelector, hashKey, timeout); long costTime = System.currentTimeMillis() - now; log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId()); return sendResult; } catch (Exception var12) { log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message); throw new MessagingException(var12.getMessage(), var12); } } else { log.error("syncSendOrderly failed. destination:{}, message is null ", destination); throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); } }}

選擇隊列的過程由 messageQueueSelector 和 hashKey 在實現類 SelectMessageQueueByHash 中完成

 
public class SelectMessageQueueByHash implements MessageQueueSelector {    public SelectMessageQueueByHash() {    }
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int value = arg.hashCode(); if (value < 0) { value = Math.abs(value); }
value %= mqs.size(); return (MessageQueue)mqs.get(value); }}
 
  • 根據 hashKey 計算 hash 值,hashKey 是我們前面例子中訂單 ID,因此相同訂單 ID 的 hash 值相同。

  • 用 hash 值和隊列數 mqs.size()取模,得到一個索引值,結果小於隊列數。

  • 根據索引值從隊列列表中取出一個隊列 mqs.get(value),hash 值相同則隊列相同。

在隊列列表的獲取過程中,由 Producer 從 NameServer 根據 Topic 查詢 Broker 列表,緩存在本地內存中,以便下次從緩存中讀取。

普通發送的技術原理

RocketMQ 中除了順序消息外,還支持事務消息和延遲消息,非這三種特殊的消息稱為普通消息。日常開發中最常用的是普通消息,這是因為最常用的場景就是系統間的異步解耦和流量的削峰填谷,這些場景下盡量保證消息高性能收發即可。

從普通消息與順序消息的對比來看,普通消息在發送時選擇消息隊列的策略不同。普通消息發送選擇隊列有兩種機制:輪詢機制和故障規避機制。默認使用輪詢機制,一個 Topic 有多個隊列,輪詢選擇其中一個隊列。

輪詢機制的原理是路由信息 TopicPublishInfo 中維護了一個計數器 sendWhichQueue,每發送一次消息需要查詢一次路由,計算器就進行“+1”,通過計數器的值 index 與隊列的數量取模計算來實現輪詢算法。

 
public class TopicPublishInfo {	public MessageQueue selectOneMessageQueue(String lastBrokerName) {		// 第一次執行時,lastBrokerName = null        if (lastBrokerName == null) {            return this.selectOneMessageQueue();        } else {            int index = this.sendWhichQueue.getAndIncrement();
for(int i = 0; i < this.messageQueueList.size(); ++i) { int pos = Math.abs(index++) % this.messageQueueList.size(); if (pos < 0) { pos = 0; } MessageQueue mq = (MessageQueue)this.messageQueueList.get(pos); // 當前選中的Queue所在Broker,不是上次發送的Broker if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return this.selectOneMessageQueue(); } } public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) { pos = 0; } return (MessageQueue)this.messageQueueList.get(pos); }}
 

輪詢算法簡單好用,但是有個弊端,如果輪詢選擇的隊列是在宕機的 Broker 上,會導致消息發送失敗,即使消息發送重試的時候重新選擇隊列,也可能還是在宕機的 Broker 上,無法規避發送失敗的情況,因此就有了故障規避機制。

順序消費的技術原理

RocketMQ 支持兩種消費模式:集群消費和廣播消費。兩者的區別是,在廣播消費模式下每條消息會被 ConsumerGroup 的每個 Consumer 消費,在集群消費模式下每條消息只會被 ConsumerGroup 的一個 Consumer 消費。

多數場景都使用集群消費,消息每次消費代表一次業務處理,集群消費表示每條消息由業務應用集群中任意一個服務實例來處理。少數場景使用廣播消費,例如數據發生變化,更新業務應用集群中每個服務的本地緩存,這就需要一條消息被整個集群都消費一次,默認是集群消費。

順序消費也叫做有序消費,原理是同一個消息隊列只允許 Consumer 中的一個消費線程拉取消費,Consumer 中有個消費線程池,多個線程會同時消費消息。在順序消費的場景下消費線程請求到 Broker 時會先申請獨占鎖,獲得鎖的請求則允許消費。

 
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
class ConsumeRequest implements Runnable { private final ProcessQueue processQueue; private final MessageQueue messageQueue;
public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) { this.processQueue = processQueue; this.messageQueue = messageQueue; }
public ProcessQueue getProcessQueue() { return this.processQueue; }
public MessageQueue getMessageQueue() { return this.messageQueue; }
public void run() { // 省略 try { this.processQueue.getLockConsume().lock(); if (this.processQueue.isDropped()) { ConsumeMessageOrderlyService.log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; } status = ConsumeMessageOrderlyService.this.messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable var23) { ConsumeMessageOrderlyService.log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", new Object[]{RemotingHelper.exceptionSimpleDesc(var23), ConsumeMessageOrderlyService.this.consumerGroup, msgs, this.messageQueue}); hasException = true; } finally { this.processQueue.getLockConsume().unlock(); } }}
 

消息消費成功后,會向 Broker 提交消費進度,更新消費位點信息,避免下次拉取到已消費的消息,順序消費中如果消費線程在監聽器中進行業務處理時拋出異常,則不會提交消費進度,消費進度會阻塞在當前這條消息,並不會繼續消費該隊列中的后續消息,從而保證順序消費。

在順序消費的場景下,特別需要注意對異常的處理,如果重試也失敗,會一直阻塞在當前消息,直到超出最大重試次數,從而在很長一段時間內無法消費后續消息造成隊列消息堆積。

並發消費的原理

RocketMQ 支持兩種消費方式:順序消費和並發消費。並發消費是默認的消費方式,日常開發過程中最常用的方式,除了順序消費就是並發消費。

並發消費也稱為亂序消費,其原理是同一個消息隊列提供給 Consumer 中的多個消費線程拉取消費。Consumer 中會維護一個消費線程池,多個消費線程可以並發去同一個消息隊列中拉取消息進行消費。如果某個消費線程在監聽器中進行業務處理時拋出異常,當前線程會進行重試,不影響其它消費線程和消費隊列的消費進度,消費成功的線程正常提交消費進度。

並發消費相比於順序消費沒有資源爭搶上鎖的過程,消費消息的速度比順序消費要快很多。

消息的冪等性

說到消息消費不得不提到消息的冪等性,業務代碼中通常收到一條消息進行一次業務邏輯處理,如果一條相同的消息被重復收到幾次,是否會導致業務重復處理?Consumer 能夠不重復接收消息?

RocketMQ 不保證消息不被重復消費,如果業務對消息重復消費非常敏感,必須要在業務層面進行冪等性處理,具體實現可以通過分布式鎖來完成。

在所有消息系統中消費消息有三種模式:at-most-once(最多一次)、at-least-once(最少一次)和 exactly-only-once(精確僅一次),分布式消息系統都是在三者間取平衡,前兩者是可行的並且被廣泛使用。

  • at-most-once:消息投遞后不論消息是否被消費成功,不會再重復投遞,有可能會導致消息未被消費,RocketMQ 未使用該方式。

  • at-lease-once:消息投遞后,消費完成后,向服務器返回 ACK,沒有消費則一定不會返回 ACK 消息。由於網絡異常、客戶端重啟等原因,服務器未能收到客戶端返回的 ACK,服務器則會再次投遞,這就會導致可能重復消費,RocketMQ 通過 ACK 來確保消息至少被消費一次。

  • exactly-only-once:必須下面兩個條件都滿足,才能認為消息是"Exactly Only Once"。 發送消息階段,不允許發送重復消息;消費消息階段,不允許消費重復的消息。在分布式系統環境下,如果要實現該模式,巨大的開銷不可避免。RocketMQ 沒有保證此特性,無法避免消息重復,由業務上進行冪等性處理。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM