MQ
Message Queue,消息隊列,FIFO 結構。
例如電商平台,在用戶支付訂單后執行對應的操作;
優點:
- 異步
- 削峰
- 解耦
缺點
- 增加系統復雜性
- 數據一致性
- 可用性
JMS
Java Message Service,Java消息服務,類似 JDBC 提供了訪問數據庫的標准,JMS 也制定了一套系統間消息通信的規范;
區別於 JDBC,JDK 原生包中並未定義 JMS 相關接口。
-
ConnectionFactory
-
Connection
-
Destination
-
Session
-
MessageConsumer
-
MessageProducer
-
Message
協作方式圖示為;
業界產品
ActiveMQ | RabbitMQ | RocketMQ | kafka | |
---|---|---|---|---|
單機吞吐量 | 萬級 | 萬級 | 10 萬級 | 10 萬級 |
可用性 | 高 | 高 | 非常高 | 非常高 |
可靠性 | 較低概率丟失消息 | 基本不丟 | 可以做到 0 丟失 | 可以做到 0 丟失 |
功能支持 | 較為完善 | 基於 erlang,並發強,性能好,延時低 | 分布式,拓展性好,支持分布式事務 | 較為簡單,主要應用與大數據實時計算,日志采集等 |
社區活躍度 | 低 | 中 | 高 | 高 |
ActiveMQ
作為 Apache 下的開源項目,完全支持 JMS 規范。並且 Spring Boot 內置了 ActiveMQ 的自動化配置,作為入門再適合不過。
快速開始
添加依賴;
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
消息發送;
// 1. 創建連接工廠
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 2. 工廠創建連接
Connection connection = factory.createConnection();
// 3. 啟動連接
connection.start();
// 4. 創建連接會話session,第一個參數為是否在事務中處理,第二個參數為應答模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5. 根據session創建消息隊列目的地
Destination queue = session.createQueue("test-queue");
// 6. 根據session和目的地queue創建生產者
MessageProducer producer = session.createProducer(queue);
// 7. 根據session創建消息實體
Message message = session.createTextMessage("hello world!");
// 8. 通過生產者producer發送消息實體
producer.send(message);
// 9. 關閉連接
connection.close();
Spring Boot 集成
自動注入參考:org.springframework.boot.autoconfigure.jms.activemq.ActiveMQConnectionFactoryConfiguration.SimpleConnectionFactoryConfiguration
添加依賴;
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
添加 yaml 配置;
spring:
activemq:
broker-url: tcp://localhost:61616
jms:
#消息模式 true:廣播(Topic),false:隊列(Queue),默認時false
pub-sub-domain: true
收發消息;
@Autowired
private JmsTemplate jmsTemplate;
// 接收消息
@JmsListener(destination = "test")
public void receiveMsg(String msg) {
System.out.println(msg);
}
// 發送消息
public void sendMsg(String destination, String msg) {
jmsTemplate.convertAndSend(destination, msg);
}
高可用
基於 zookeeper 實現主從架構,修改 activemq.xml 節點 persistenceAdapter 配置;
<persistenceAdapter>
<replicatedLevelDB
directory="${activemq.data}/levelDB"
replicas="3"
bind="tcp://0.0.0.0:0"
zkAddress="172.17.0.4:2181,172.17.0.4:2182,172.17.0.4:2183"
zkPath="/activemq/leveldb-stores"
hostname="localhost"
/>
</persistenceAdapter>
broker 地址為:failover:(tcp://192.168.4.19:61616,tcp://192.168.4.19:61617,tcp://192.168.4.19:61618)?randomize=false
負載均衡
在高可用集群節點 activemq.xml 添加節點 networkConnectors;
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.0.103:61616,tcp://192.168.0.103:61617,tcp://192.168.0.103:61618)" duplex="false"/>
</networkConnectors>
更多詳細信息可參考:https://blog.csdn.net/haoyuyang/article/details/53931710
集群消費
由於發布訂閱模式,所有訂閱者都會接收到消息,在生產環境,消費者集群會產生消息重復消費問題。
ActiveMQ 提供 VirtualTopic 功能,解決多消費端接收同一條消息的問題。於生產者而言,VirtualTopic 就是一個 topic,對消費而言則是 queue。
在 activemq.xml 添加節點 destinationInterceptors;
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name="testTopic" prefix="consumer.*." selectorAware="false"/>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
生產者正常往 testTopic 中發送消息,訂閱者可修改訂閱主題為類似 consumer.A.testTopic 這樣來消費。
更多詳細信息可參考:https://blog.csdn.net/java_collect/article/details/82154829
RocketMQ
是一個隊列模型的消息中間件,具有高性能、高可靠、高實時、分布式特點。
架構圖示
-
Name Server
名稱服務器,類似於 Zookeeper 注冊中心,提供 Broker 發現;
-
Broker
RocketMQ 的核心組件,絕大部分工作都在 Broker 中完成,接收請求,處理消費,消息持久化等;
-
Producer
消息生產方;
-
Consumer
消息消費方;
快速開始
安裝后,依次啟動 nameserver 和 broker,可以用 mqadmin 管理主題、集群和 broker 等信息;
添加依賴;
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
消息發送;
DefaultMQProducer producer = new DefaultMQProducer("producer-group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setInstanceName("producer");
producer.start();
Message msg = new Message(
"producer-topic",
"msg",
"hello world".getBytes()
);
//msg.setDelayTimeLevel(1);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult.toString());
producer.shutdown();
delayLevel 從 1 開始默認依次是:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。
參考 org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel。
消息接收;
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setInstanceName("consumer");
consumer.subscribe("producer-topic", "msg");
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
for (MessageExt msg : list) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
.\mqadmin.cmd sendMessage -t producer-topic -c msg -p "hello rocketmq" -n localhost:9876
Spring Boot 集成
添加依賴;
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
添加 yaml 配置;
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: producer
發送消息;
@Autowired
private RocketMQTemplate mqTemplate;
public void sendMessage(String topic, String tag, String message) {
SendResult result = mqTemplate.syncSend(topic + ":" + tag, message);
System.out.println(JSON.toJSONString(result));
}
接收消息;
@Component
@RocketMQMessageListener(consumerGroup = "consumer", topic = "topic-test", selectorExpression = "tag-test")
public class MsgListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println(message);
}
}
Console 控制台
RocketMQ 拓展包提供了管理控制台;
https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
重復消費
產生原因:
- 生產者重復投遞;
- 消息隊列異常;
- 消費者異常消費;
怎么解決重復消費的問題,換句話怎么保證消息消費的冪等性。
通常基於本地消息表的方案實現,消息處理過便不再處理。
順序消息
消息錯亂的原因:
- 一個消息隊列 queue,多個 consumer 消費;
- 一個 queue 對應一個 consumer,但是 consumer 多線程消費;
要保證消息的順序消費,有三個關鍵點:
- 消息順序發送
- 消息順序存儲
- 消息順序消費
參考 RocketMq 中的 MessageQueueSelector 和 MessageListenerOrderly。
分布式事務
在分布式系統中,一個事務由多個本地事務組成。這里介紹一個基於 MQ 的分布式事務解決方案。
通過 broker 的 HA 高可用,和定時回查 prepare 消息的狀態,來保證最終一致性。