参考文档: https://xie.infoq.cn/article/fba37afd9bda31fb10eec651f
顺序消息的使用场景
日常项目中需要保证顺序的应用场景非常多,比如交易场景中的订单创建、支付、退款等流程,先创建订单才能支付,支付完成的订单才能退款,这需要保证先进先出。又例如数据库的 BinLog 消息,数据库执行新增语句、修改语句,BinLog 消息得到顺序也必须保证是新增消息、修改消息。
如何发送和消费顺序消息
我们使用 RocketMQ 顺序消息来模拟一下订单的场景,顺序消息分为两部分:顺序发送、顺序消费。
-
顺序发消息
server.port=8080
spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876
spring.cloud.stream.bindings.output.destination=TopicTest
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
#设置同步发送
spring.cloud.stream.rocketmq.bindings.output.producer.sync=true
@RestController
public class OrderlyController {
@Autowired
private Source source;
@GetMapping("/orderly")
public String orderly() {
List<String> types = Arrays.asList("创建订单", "支付", "退款");
types.forEach(type -> {
MessageBuilder builder = MessageBuilder.withPayload(type).setHeader(BinderHeaders.PARTITION_HEADER, 0);
Message message = builder.build();
source.output().send(message);
});
return "OK";
}
}
上面代码模拟了按顺序依次发送创建、支付、退款消息到 TopicTest 中。在 application.properties 配置文件中指定 producer.sync=true,默认是异步发送,此处改为同步发送。
MessageBuilder 设置 Header 信息头,表示这是一条顺序消息,将消息固定地发送到第 0 个消息队列。
-
顺序收消息
server.port=8081
spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876
spring.cloud.stream.bindings.output.destination=TopicTest
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
#设置同步发送
spring.cloud.stream.rocketmq.bindings.output.producer.sync=true
@EnableBinding({Sink.class})
@SpringBootApplication
public class App
{
public static void main( String[] args )
{
SpringApplication.run(App.class);
}
@StreamListener(Sink.INPUT)
public void receive(String msg) {
System.out.println("TopicTest receive: " + msg + ", receiveTime= " + System.currentTimeMillis());
}
}
程序运行后,可以在控制台看到日志输出,也是按照顺序打印出来的
TopicTest receive: 创建订单, receiveTime= 1590503510075
TopicTest receive: 支付, receiveTime= 1590503510076
TopicTest receive: 退款, receiveTime= 1590503510077
顺序发送的技术原理
RocketMQ 的顺序消息分为 2 种情况:局部有序和全局有序。前面的例子是局部有序场景。
RocketMQ 中消息发送有三种方式:同步、异步、单项。
-
同步:发送网络请求后会同步等待 Broker 服务器返回结果,支持发送失败重试,适用于比较重要的消息通知场景。
-
异步:异步发送网络请求,不会阻塞当前线程,不支持失败重试,适用于对响应时间要求更高的场景。
-
单向:单向发送原理和异步一致,但不支持回调。适用于响应时间非常端,对可靠性要求不高的场景,例如日志收集。
顺序消息发送的原理比较简单,同一类消息发送到相同的队列即可。为了保证先发送的消息先存储到消息队列,必须使用同步发送的方式,否则可能出现先发送的消息后到消息队列中,此时消息就乱序了。
RocketMQ 的核心代码如下:
public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey) {
return this.syncSendOrderly(destination, message, hashKey, (long)this.producer.getSendMsgTimeout());
}
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
if (!Objects.isNull(message) && !Objects.isNull(message.getPayload())) {
try {
long now = System.currentTimeMillis();
org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(this.objectMapper, this.charset, destination, message);
SendResult sendResult = this.producer.send(rocketMsg, this.messageQueueSelector, hashKey, timeout);
long costTime = System.currentTimeMillis() - now;
log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
return sendResult;
} catch (Exception var12) {
log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(var12.getMessage(), var12);
}
} else {
log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}
}
}
选择队列的过程由 messageQueueSelector 和 hashKey 在实现类 SelectMessageQueueByHash 中完成
public class SelectMessageQueueByHash implements MessageQueueSelector {
public SelectMessageQueueByHash() {
}
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode();
if (value < 0) {
value = Math.abs(value);
}
value %= mqs.size();
return (MessageQueue)mqs.get(value);
}
}
-
根据 hashKey 计算 hash 值,hashKey 是我们前面例子中订单 ID,因此相同订单 ID 的 hash 值相同。
-
用 hash 值和队列数 mqs.size()取模,得到一个索引值,结果小于队列数。
-
根据索引值从队列列表中取出一个队列 mqs.get(value),hash 值相同则队列相同。
在队列列表的获取过程中,由 Producer 从 NameServer 根据 Topic 查询 Broker 列表,缓存在本地内存中,以便下次从缓存中读取。
普通发送的技术原理
RocketMQ 中除了顺序消息外,还支持事务消息和延迟消息,非这三种特殊的消息称为普通消息。日常开发中最常用的是普通消息,这是因为最常用的场景就是系统间的异步解耦和流量的削峰填谷,这些场景下尽量保证消息高性能收发即可。
从普通消息与顺序消息的对比来看,普通消息在发送时选择消息队列的策略不同。普通消息发送选择队列有两种机制:轮询机制和故障规避机制。默认使用轮询机制,一个 Topic 有多个队列,轮询选择其中一个队列。
轮询机制的原理是路由信息 TopicPublishInfo 中维护了一个计数器 sendWhichQueue,每发送一次消息需要查询一次路由,计算器就进行“+1”,通过计数器的值 index 与队列的数量取模计算来实现轮询算法。
public class TopicPublishInfo {
public MessageQueue selectOneMessageQueue(String lastBrokerName) {
if (lastBrokerName == null) {
return this.selectOneMessageQueue();
} else {
int index = this.sendWhichQueue.getAndIncrement();
for(int i = 0; i < this.messageQueueList.size(); ++i) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0) {
pos = 0;
}
MessageQueue mq = (MessageQueue)this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return this.selectOneMessageQueue();
}
}
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0) {
pos = 0;
}
return (MessageQueue)this.messageQueueList.get(pos);
}
}
轮询算法简单好用,但是有个弊端,如果轮询选择的队列是在宕机的 Broker 上,会导致消息发送失败,即使消息发送重试的时候重新选择队列,也可能还是在宕机的 Broker 上,无法规避发送失败的情况,因此就有了故障规避机制。
顺序消费的技术原理
RocketMQ 支持两种消费模式:集群消费和广播消费。两者的区别是,在广播消费模式下每条消息会被 ConsumerGroup 的每个 Consumer 消费,在集群消费模式下每条消息只会被 ConsumerGroup 的一个 Consumer 消费。
多数场景都使用集群消费,消息每次消费代表一次业务处理,集群消费表示每条消息由业务应用集群中任意一个服务实例来处理。少数场景使用广播消费,例如数据发生变化,更新业务应用集群中每个服务的本地缓存,这就需要一条消息被整个集群都消费一次,默认是集群消费。
顺序消费也叫做有序消费,原理是同一个消息队列只允许 Consumer 中的一个消费线程拉取消费,Consumer 中有个消费线程池,多个线程会同时消费消息。在顺序消费的场景下消费线程请求到 Broker 时会先申请独占锁,获得锁的请求则允许消费。
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
class ConsumeRequest implements Runnable {
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;
public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
this.processQueue = processQueue;
this.messageQueue = messageQueue;
}
public ProcessQueue getProcessQueue() {
return this.processQueue;
}
public MessageQueue getMessageQueue() {
return this.messageQueue;
}
public void run() {
try {
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
ConsumeMessageOrderlyService.log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
status = ConsumeMessageOrderlyService.this.messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable var23) {
ConsumeMessageOrderlyService.log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", new Object[]{RemotingHelper.exceptionSimpleDesc(var23), ConsumeMessageOrderlyService.this.consumerGroup, msgs, this.messageQueue});
hasException = true;
} finally {
this.processQueue.getLockConsume().unlock();
}
}
}
消息消费成功后,会向 Broker 提交消费进度,更新消费位点信息,避免下次拉取到已消费的消息,顺序消费中如果消费线程在监听器中进行业务处理时抛出异常,则不会提交消费进度,消费进度会阻塞在当前这条消息,并不会继续消费该队列中的后续消息,从而保证顺序消费。
在顺序消费的场景下,特别需要注意对异常的处理,如果重试也失败,会一直阻塞在当前消息,直到超出最大重试次数,从而在很长一段时间内无法消费后续消息造成队列消息堆积。
并发消费的原理
RocketMQ 支持两种消费方式:顺序消费和并发消费。并发消费是默认的消费方式,日常开发过程中最常用的方式,除了顺序消费就是并发消费。
并发消费也称为乱序消费,其原理是同一个消息队列提供给 Consumer 中的多个消费线程拉取消费。Consumer 中会维护一个消费线程池,多个消费线程可以并发去同一个消息队列中拉取消息进行消费。如果某个消费线程在监听器中进行业务处理时抛出异常,当前线程会进行重试,不影响其它消费线程和消费队列的消费进度,消费成功的线程正常提交消费进度。
并发消费相比于顺序消费没有资源争抢上锁的过程,消费消息的速度比顺序消费要快很多。
消息的幂等性
说到消息消费不得不提到消息的幂等性,业务代码中通常收到一条消息进行一次业务逻辑处理,如果一条相同的消息被重复收到几次,是否会导致业务重复处理?Consumer 能够不重复接收消息?
RocketMQ 不保证消息不被重复消费,如果业务对消息重复消费非常敏感,必须要在业务层面进行幂等性处理,具体实现可以通过分布式锁来完成。
在所有消息系统中消费消息有三种模式:at-most-once(最多一次)、at-least-once(最少一次)和 exactly-only-once(精确仅一次),分布式消息系统都是在三者间取平衡,前两者是可行的并且被广泛使用。
-
at-most-once:消息投递后不论消息是否被消费成功,不会再重复投递,有可能会导致消息未被消费,RocketMQ 未使用该方式。
-
at-lease-once:消息投递后,消费完成后,向服务器返回 ACK,没有消费则一定不会返回 ACK 消息。由于网络异常、客户端重启等原因,服务器未能收到客户端返回的 ACK,服务器则会再次投递,这就会导致可能重复消费,RocketMQ 通过 ACK 来确保消息至少被消费一次。
-
exactly-only-once:必须下面两个条件都满足,才能认为消息是"Exactly Only Once"。 发送消息阶段,不允许发送重复消息;消费消息阶段,不允许消费重复的消息。在分布式系统环境下,如果要实现该模式,巨大的开销不可避免。RocketMQ 没有保证此特性,无法避免消息重复,由业务上进行幂等性处理。