參考文檔: https://xie.infoq.cn/article/fba37afd9bda31fb10eec651f
順序消息的使用場景
日常項目中需要保證順序的應用場景非常多,比如交易場景中的訂單創建、支付、退款等流程,先創建訂單才能支付,支付完成的訂單才能退款,這需要保證先進先出。又例如數據庫的 BinLog 消息,數據庫執行新增語句、修改語句,BinLog 消息得到順序也必須保證是新增消息、修改消息。
如何發送和消費順序消息
我們使用 RocketMQ 順序消息來模擬一下訂單的場景,順序消息分為兩部分:順序發送、順序消費。
-
順序發消息
server.port=8080
spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876
spring.cloud.stream.bindings.output.destination=TopicTest
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
#設置同步發送
spring.cloud.stream.rocketmq.bindings.output.producer.sync=true
@RestController
public class OrderlyController {
@Autowired
private Source source;
@GetMapping("/orderly")
public String orderly() {
List<String> types = Arrays.asList("創建訂單", "支付", "退款");
types.forEach(type -> {
MessageBuilder builder = MessageBuilder.withPayload(type).setHeader(BinderHeaders.PARTITION_HEADER, 0);
Message message = builder.build();
source.output().send(message);
});
return "OK";
}
}
上面代碼模擬了按順序依次發送創建、支付、退款消息到 TopicTest 中。在 application.properties 配置文件中指定 producer.sync=true,默認是異步發送,此處改為同步發送。
MessageBuilder 設置 Header 信息頭,表示這是一條順序消息,將消息固定地發送到第 0 個消息隊列。
-
順序收消息
server.port=8081
spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876
spring.cloud.stream.bindings.output.destination=TopicTest
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
#設置同步發送
spring.cloud.stream.rocketmq.bindings.output.producer.sync=true
@EnableBinding({Sink.class})
@SpringBootApplication
public class App
{
public static void main( String[] args )
{
SpringApplication.run(App.class);
}
@StreamListener(Sink.INPUT)
public void receive(String msg) {
System.out.println("TopicTest receive: " + msg + ", receiveTime= " + System.currentTimeMillis());
}
}
程序運行后,可以在控制台看到日志輸出,也是按照順序打印出來的
TopicTest receive: 創建訂單, receiveTime= 1590503510075
TopicTest receive: 支付, receiveTime= 1590503510076
TopicTest receive: 退款, receiveTime= 1590503510077
順序發送的技術原理
RocketMQ 的順序消息分為 2 種情況:局部有序和全局有序。前面的例子是局部有序場景。
RocketMQ 中消息發送有三種方式:同步、異步、單項。
-
同步:發送網絡請求后會同步等待 Broker 服務器返回結果,支持發送失敗重試,適用於比較重要的消息通知場景。
-
異步:異步發送網絡請求,不會阻塞當前線程,不支持失敗重試,適用於對響應時間要求更高的場景。
-
單向:單向發送原理和異步一致,但不支持回調。適用於響應時間非常端,對可靠性要求不高的場景,例如日志收集。
順序消息發送的原理比較簡單,同一類消息發送到相同的隊列即可。為了保證先發送的消息先存儲到消息隊列,必須使用同步發送的方式,否則可能出現先發送的消息后到消息隊列中,此時消息就亂序了。
RocketMQ 的核心代碼如下:
public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey) {
return this.syncSendOrderly(destination, message, hashKey, (long)this.producer.getSendMsgTimeout());
}
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
if (!Objects.isNull(message) && !Objects.isNull(message.getPayload())) {
try {
long now = System.currentTimeMillis();
org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(this.objectMapper, this.charset, destination, message);
SendResult sendResult = this.producer.send(rocketMsg, this.messageQueueSelector, hashKey, timeout);
long costTime = System.currentTimeMillis() - now;
log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
return sendResult;
} catch (Exception var12) {
log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(var12.getMessage(), var12);
}
} else {
log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}
}
}
選擇隊列的過程由 messageQueueSelector 和 hashKey 在實現類 SelectMessageQueueByHash 中完成
public class SelectMessageQueueByHash implements MessageQueueSelector {
public SelectMessageQueueByHash() {
}
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode();
if (value < 0) {
value = Math.abs(value);
}
value %= mqs.size();
return (MessageQueue)mqs.get(value);
}
}
-
根據 hashKey 計算 hash 值,hashKey 是我們前面例子中訂單 ID,因此相同訂單 ID 的 hash 值相同。
-
用 hash 值和隊列數 mqs.size()取模,得到一個索引值,結果小於隊列數。
-
根據索引值從隊列列表中取出一個隊列 mqs.get(value),hash 值相同則隊列相同。
在隊列列表的獲取過程中,由 Producer 從 NameServer 根據 Topic 查詢 Broker 列表,緩存在本地內存中,以便下次從緩存中讀取。
普通發送的技術原理
RocketMQ 中除了順序消息外,還支持事務消息和延遲消息,非這三種特殊的消息稱為普通消息。日常開發中最常用的是普通消息,這是因為最常用的場景就是系統間的異步解耦和流量的削峰填谷,這些場景下盡量保證消息高性能收發即可。
從普通消息與順序消息的對比來看,普通消息在發送時選擇消息隊列的策略不同。普通消息發送選擇隊列有兩種機制:輪詢機制和故障規避機制。默認使用輪詢機制,一個 Topic 有多個隊列,輪詢選擇其中一個隊列。
輪詢機制的原理是路由信息 TopicPublishInfo 中維護了一個計數器 sendWhichQueue,每發送一次消息需要查詢一次路由,計算器就進行“+1”,通過計數器的值 index 與隊列的數量取模計算來實現輪詢算法。
public class TopicPublishInfo {
public MessageQueue selectOneMessageQueue(String lastBrokerName) {
if (lastBrokerName == null) {
return this.selectOneMessageQueue();
} else {
int index = this.sendWhichQueue.getAndIncrement();
for(int i = 0; i < this.messageQueueList.size(); ++i) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0) {
pos = 0;
}
MessageQueue mq = (MessageQueue)this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return this.selectOneMessageQueue();
}
}
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0) {
pos = 0;
}
return (MessageQueue)this.messageQueueList.get(pos);
}
}
輪詢算法簡單好用,但是有個弊端,如果輪詢選擇的隊列是在宕機的 Broker 上,會導致消息發送失敗,即使消息發送重試的時候重新選擇隊列,也可能還是在宕機的 Broker 上,無法規避發送失敗的情況,因此就有了故障規避機制。
順序消費的技術原理
RocketMQ 支持兩種消費模式:集群消費和廣播消費。兩者的區別是,在廣播消費模式下每條消息會被 ConsumerGroup 的每個 Consumer 消費,在集群消費模式下每條消息只會被 ConsumerGroup 的一個 Consumer 消費。
多數場景都使用集群消費,消息每次消費代表一次業務處理,集群消費表示每條消息由業務應用集群中任意一個服務實例來處理。少數場景使用廣播消費,例如數據發生變化,更新業務應用集群中每個服務的本地緩存,這就需要一條消息被整個集群都消費一次,默認是集群消費。
順序消費也叫做有序消費,原理是同一個消息隊列只允許 Consumer 中的一個消費線程拉取消費,Consumer 中有個消費線程池,多個線程會同時消費消息。在順序消費的場景下消費線程請求到 Broker 時會先申請獨占鎖,獲得鎖的請求則允許消費。
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
class ConsumeRequest implements Runnable {
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;
public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
this.processQueue = processQueue;
this.messageQueue = messageQueue;
}
public ProcessQueue getProcessQueue() {
return this.processQueue;
}
public MessageQueue getMessageQueue() {
return this.messageQueue;
}
public void run() {
try {
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
ConsumeMessageOrderlyService.log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
status = ConsumeMessageOrderlyService.this.messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable var23) {
ConsumeMessageOrderlyService.log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", new Object[]{RemotingHelper.exceptionSimpleDesc(var23), ConsumeMessageOrderlyService.this.consumerGroup, msgs, this.messageQueue});
hasException = true;
} finally {
this.processQueue.getLockConsume().unlock();
}
}
}
消息消費成功后,會向 Broker 提交消費進度,更新消費位點信息,避免下次拉取到已消費的消息,順序消費中如果消費線程在監聽器中進行業務處理時拋出異常,則不會提交消費進度,消費進度會阻塞在當前這條消息,並不會繼續消費該隊列中的后續消息,從而保證順序消費。
在順序消費的場景下,特別需要注意對異常的處理,如果重試也失敗,會一直阻塞在當前消息,直到超出最大重試次數,從而在很長一段時間內無法消費后續消息造成隊列消息堆積。
並發消費的原理
RocketMQ 支持兩種消費方式:順序消費和並發消費。並發消費是默認的消費方式,日常開發過程中最常用的方式,除了順序消費就是並發消費。
並發消費也稱為亂序消費,其原理是同一個消息隊列提供給 Consumer 中的多個消費線程拉取消費。Consumer 中會維護一個消費線程池,多個消費線程可以並發去同一個消息隊列中拉取消息進行消費。如果某個消費線程在監聽器中進行業務處理時拋出異常,當前線程會進行重試,不影響其它消費線程和消費隊列的消費進度,消費成功的線程正常提交消費進度。
並發消費相比於順序消費沒有資源爭搶上鎖的過程,消費消息的速度比順序消費要快很多。
消息的冪等性
說到消息消費不得不提到消息的冪等性,業務代碼中通常收到一條消息進行一次業務邏輯處理,如果一條相同的消息被重復收到幾次,是否會導致業務重復處理?Consumer 能夠不重復接收消息?
RocketMQ 不保證消息不被重復消費,如果業務對消息重復消費非常敏感,必須要在業務層面進行冪等性處理,具體實現可以通過分布式鎖來完成。
在所有消息系統中消費消息有三種模式:at-most-once(最多一次)、at-least-once(最少一次)和 exactly-only-once(精確僅一次),分布式消息系統都是在三者間取平衡,前兩者是可行的並且被廣泛使用。
-
at-most-once:消息投遞后不論消息是否被消費成功,不會再重復投遞,有可能會導致消息未被消費,RocketMQ 未使用該方式。
-
at-lease-once:消息投遞后,消費完成后,向服務器返回 ACK,沒有消費則一定不會返回 ACK 消息。由於網絡異常、客戶端重啟等原因,服務器未能收到客戶端返回的 ACK,服務器則會再次投遞,這就會導致可能重復消費,RocketMQ 通過 ACK 來確保消息至少被消費一次。
-
exactly-only-once:必須下面兩個條件都滿足,才能認為消息是"Exactly Only Once"。 發送消息階段,不允許發送重復消息;消費消息階段,不允許消費重復的消息。在分布式系統環境下,如果要實現該模式,巨大的開銷不可避免。RocketMQ 沒有保證此特性,無法避免消息重復,由業務上進行冪等性處理。