1.定義和分類
1.1定義
MQ全稱為Message Queue,即消息隊列。“消息隊列”是在消息的傳輸過程中保存消息的容器。
它是典型的生產者、消費者模型。生產者不斷向消息隊列中生產消息,消費者不斷的從隊列中獲取消息。因為消息的生產和消費都是異步的,而且只關心消息的發送和接收,沒有業務邏輯的侵入,這樣就實現了生產者和消費者的解耦。
1.2分類
其常見的消息隊列產品如下表:
名稱 | 開發語言 | 時效性 | 說明 |
ActiveMQ | java | 毫秒級 | 基於JMS,是Apache出品,最流行的,能力強勁的開源消息總線 |
RabbitMQ | Erlang |
微妙級 |
基於AMQP協議,erlang語言開發,穩定性好 |
Kafka | Scala | 毫秒級 | 分布式消息系統,高吞吐量。是Apache下的一個子項目 |
RocketMQ | java | 毫秒級 | 基於JMS,阿里巴巴產品,目前交由Apache基金會 |
具體的介紹見后續章節。它們的作用是異步處理,應用解耦,流量消峰。
1.3JMS與AMQP的異同
1)JMS全稱是java message service,即java消息服務。 AMQP全稱是Advanced Message Queuing Protocol,即高級消息隊列協議;
2)JMS定義了統一的接口,來對消息操作進行統一;AMQP通過規定協議來統一數據交互的格式;
3)JMS限定了必須使用Java語言;AMQP只是協議,不規定實現方式,因此是跨語言的;
4)JMS規定了兩種消息模型;而AMQP的消息模型更加豐富。
2.ActiveMQ
2.1下載與安裝
官網:http://activemq.apache.org/,
1)打開后點擊最新版本
2)在新的頁面選擇linux對應的文件下載,這里以5.16.2為例說明
3)將apache-activemq-5.16.2-bin.tar.gz 上傳到Linux上,默認Linux上已安裝jdk8及以上版本。
4)解壓
tar zxvf apache-activemq-5.16.2-bin.tar.gz
5)給目錄賦權
chmod 777 apache-activemq-5.16.2 cd apache-activemq-5.16.2\bin chmod 755 activemq
6)修改配置文件,允許外部訪問
cd ..
vi conf/jetty.xml
把127.0.0.1改為本機ip,如下圖:
7)啟動
cd bin
./activemq start
看到類似下圖的結果,說明啟動成功
停止命令
./activemq stop
重啟命令
./activemq restart
8)進入管理頁
假設服務器地址為192.168.86.128 ,打開瀏覽器輸入地址,http://192.168.86.128:8161/ 即可進入ActiveMQ管理頁面。在進入時需要輸入用戶名和密碼,均是admin,進入后如下圖
9)進入主界面。
其中端口8161是管理頁面,用戶名密碼都是admin;61616是服務端頁面,用戶名密碼是guest。
2.2消息模式
1)點對點模式
每個消息只有一個消費者。一旦被消費,消息就不再在消息隊列中。發送者和接收者之間在時間上沒有依賴性,也就是說當發送者發送了消息之后,不管接收者有沒有正在運行,它不會影響到消息被發送到隊列。但接收者在成功接收消息之后需向隊列應答成功。
2)發布/訂閱模式
發布者發送到topic的消息,只有訂閱了topic的訂閱者才會收到消息。topic實現了發布和訂閱,當發布一個消息,所有訂閱這個topic的服務都能得到這個消息,所以從1到N個訂閱者都能得到這個消息的拷貝。也就是說這種模式每個消息可以有多個消費者,發布者和訂閱者之間有時間上的依賴性。另外訂閱者必須保持運行的狀態,才能接受發布者發布的消息。
2.3 基本入門
此代碼不在源碼中呈現,只做說明。
2.2.1環境准備
新建一個普通的maven工程,導入依賴
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.13.4</version> </dependency>
2.2.2點對點模式
1)創建生產者QueueProducer
package com.test; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class QueueConsumer { public static void main(String[] args) throws Exception { //1.創建連接工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.16886.128:61616"); //2.獲取連接 Connection connection = connectionFactory.createConnection(); //3.啟動連接 connection.start(); //4.獲取session (參數1:是否啟動事務,參數2:消息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.創建隊列對象 Queue queue = session.createQueue("test-queue"); //6.創建消息消費 MessageConsumer consumer = session.createConsumer(queue); //7.監聽消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); //8.等待鍵盤輸入 System.in.read(); //9.關閉資源 consumer.close(); session.close(); connection.close(); } }
運行后通過ActiveMQ管理界面Queues菜單查詢,發現有一個消息等待消費
2)創建消費者QueueConsumer
package com.test; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class QueueConsumer { public static void main(String[] args) throws Exception { //1.創建連接工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.86.128:61616"); //2.獲取連接 Connection connection = connectionFactory.createConnection(); //3.啟動連接 connection.start(); //4.獲取session (參數1:是否啟動事務,參數2:消息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.創建隊列對象 Queue queue = session.createQueue("test-queue"); //6.創建消息消費 MessageConsumer consumer = session.createConsumer(queue); //7.監聽消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); //8.等待鍵盤輸入 System.in.read(); //9.關閉資源 consumer.close(); session.close(); connection.close(); } }
運行之后再控制台打印了發送過來的消息,管理頁面的消息也被消費了。
2.2.3發布/訂閱模式
1)創建生產者TopicProducer
package com.test; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class TopicProducer { public static void main(String[] args) throws Exception { //1.創建連接工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.86.128:61616"); //2.獲取連接 Connection connection = connectionFactory.createConnection(); //3.啟動連接 connection.start(); //4.獲取session (參數1:是否啟動事務,參數2:消息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.創建主題對象 Topic topic = session.createTopic("test-topic"); //6.創建消息生產者 MessageProducer producer = session.createProducer(topic); //7.創建消息 TextMessage textMessage = session.createTextMessage("歡迎來到神奇jms世界"); //8.發送消息 producer.send(textMessage); //9.關閉資源 producer.close(); session.close(); connection.close(); } }
2)創建消費者QueueConsumer
package com.test; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class TopicConsumer1 { public static void main(String[] args) throws Exception { //1.創建連接工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.86.128:61616"); //2.獲取連接 Connection connection = connectionFactory.createConnection(); //3.啟動連接 connection.start(); //4.獲取session (參數1:是否啟動事務,參數2:消息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.創建主題對象 Topic topic = session.createTopic("test-topic"); //6.創建消息消費 MessageConsumer consumer = session.createConsumer(topic); //7.監聽消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息1:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); //8.等待鍵盤輸入 System.in.read(); //9.關閉資源 consumer.close(); session.close(); connection.close(); } }
再創建一個消費者,開啟這兩個消費者,然后運行生產者,會發現兩個消費者會接收到消息。如果在生產者開啟之后開啟消費者,那么消費者是收不到消息的。在頁面選擇Topics查詢消息
2.4整合SpringBoot
源碼:https://github.com/zhongyushi-git/mq-collections.git。
1)新建兩個SpringBoot的項目,一個作為服務生產者,一個作為服務消費者
2)在生產者中導入依賴
<!--activemq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>
3)編寫生產者配置文件
# 應用名稱 spring.application.name=springboot-activemq-provider-demo # 端口號 server.port=10010 # 配置服務器地址 spring.activemq.broker-url=tcp://192.168.86.128:61616 # activemq登錄名和密碼 spring.activemq.user=admin spring.activemq.password=admin # activemq模式,false點對點模式,true發布訂閱模式 spring.jms.pub-sub-domain=true # 隊列名稱 activemq.queue=queue-msg # 主題名稱 activemq.topic=topic-msg
4)編寫生產者配置類
package com.zxh.springbootactivemqproviderdemo.config; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.jms.Queue; import javax.jms.Topic; @Configuration public class ActiveMQConfig { @Value("${activemq.queue}") private String activemqQueue; //定義存放消息的隊列 @Bean public Queue queue() { return new ActiveMQQueue(activemqQueue); } }
5)編寫生產者controller接口
package com.zxh.springbootactivemqproviderdemo.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.jms.Queue; import javax.jms.Topic; @RestController public class ProviderController { //注入存放消息的隊列 @Autowired private Queue queue; //注入springboot封裝的工具類 @Autowired private JmsMessagingTemplate jmsMessagingTemplate; /** * 點對點 * @param name */ @GetMapping("queue-send") public void queueSend(String name) { //發送消息 jmsMessagingTemplate.convertAndSend(queue, name); } }
6)編寫消費者配置文件
# 應用名稱 spring.application.name=springboot-activemq-demo # 端口號 server.port=10011 # 配置服務器地址 spring.activemq.broker-url=tcp://192.168.86.128:61616 # activemq登錄名和密碼 spring.activemq.user=admin spring.activemq.password=admin # activemq模式,false點對點模式,true發布訂閱模式 spring.jms.pub-sub-domain=true # 隊列名稱 activemq.queue=queue-msg # 主題名稱 activemq.topic=topic-msg
7)編寫消費者監聽器
package com.zxh.springbootactivemqconsumerdemo.listener; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.TextMessage; @Component public class ConsumerListener { @JmsListener(destination = "${activemq.queue}") public void receiveQueue(Message message) { if(message instanceof TextMessage){ TextMessage textMessage= (TextMessage) message; try { System.out.println("收到的 queue message 是:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }
8)先啟動生產者,再啟動消費者,然后在瀏覽器輸入http://localhost:10010/queue-send?name=123,即可在消費者的控制台看到接收到的消息。
到這為止是點對點模式的消息,那么發布訂閱模式和其極為類似,在上述的代碼上稍作修改便可。
9)同時修改配置文件,把activemq模式改為發布訂閱模式,添加主題名稱
spring.jms.pub-sub-domain=true # 主題名稱 activemq.topic=topic-msg
對於spring.jms.pub-sub-domain的值有兩種,分別是true和false,為true時就指定了是發布訂閱模式,為false時就指定了是點對點模式。
10)在生產者的配置類添加一個Bean
@Value("${activemq.topic}") private String activemqTopic; //定義存放消息的主題 @Bean public Topic topic() { return new ActiveMQTopic(activemqTopic); }
11)在生產者的controller接口添加一個接口用作發布訂閱接口
//注入存放消息的隊列 @Autowired private Topic topic; /** * 發布訂閱 * @param name */ @GetMapping("topic-send") public void topicSend(String name) { //發送消息 jmsMessagingTemplate.convertAndSend(topic, name); }
12)在消費者的監聽器類添加發布訂閱模式的監聽
@JmsListener(destination = "${activemq.topic}") public void receiveTopic(Message message) { if(message instanceof TextMessage){ TextMessage textMessage= (TextMessage) message; try { System.out.println("收到的 topic message 是:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
13)先啟動消費者,再啟動生產者,然后在瀏覽器輸入http://localhost:10010/topic-send?name=454545,即可在消費者的控制台看到接收到的消息,這便是發布/訂閱模式。
2.5 JMS消息組成
2.5.1JMS協議組成
結構 | 說明 |
JMS Provider | 消息中間件 |
JMS Producer | 消息生產者 |
JMS Consumer | 消息消費者 |
JMS Message | 消息 |
其中JMS Message是其最主要的部分,介紹如下。
2.5.2JMS消息組成
其有三部分組成,分別是消息頭、消息體、消息屬性。
1)消息頭
JMS消息頭預定義了若干字段用於客戶端與JMS提供者之間識別和發送消息,預編譯頭如下:
名稱 | 描述 | 設置者 |
JMSDestination | 消息發送的目的地,是一個Topic或Queue | send |
JMSMessagelD | 消息ID,需要以ID:開頭。不可篡改 | send |
JMSDeliveryMode | 消息的發送模式,分為NON_PERSISTENT(持久化的)和PERSISTENT(非持久化的) | send |
JMSTimestamp | 消息發送時的時間,可以理解為調用send()方法時的時間 | send |
JMSCorrelationID | 關聯的消息ID | client |
JMSReplyTo | 消息回復的目的地 | client |
JMSType | 消息的類型 | client |
JMSExpiration | 消息失效的時間,值0表明消息不會過期,默認值為0 | send |
JMSPriority | 息的優先級,0-4為普通的優化級,而5-9為高優先級,通常情況下,高優化級的消息需要優先與普通級發送。不可篡改 | send |
2)消息體
消息體就是真實發送的消息,包含5中格式。下面的演示代碼均以點對點模式為例。在消息生產者的controller中注入JmsTemplate
@Autowired private JmsTemplate jmsTemplate;
原因是JmsMessagingTemplate無法通過指定消息類型來發送消息。
A:TextMessage字符串消息
/** * 文本類型 */ @GetMapping("text") public void textMessage(){ jmsTemplate.send(queue, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage("我是文本消息"); } }); }
這種方式和上述整合SpringBoot時的方式是一樣的,而在消費者端也是只獲取的文本類型的消息,因此接收消息的代碼不用再次編寫。
B:MapMessage鍵值對消息
鍵值對也就是map類型,可以設置key和value,通過set類型設置值,get類型獲取值。
/** * map類型 */ @GetMapping("map") public void mapMessage(){ jmsTemplate.send(queue, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("name","張三"); mapMessage.setInt("age",20); return mapMessage; } }); }
在接收時需要進行類型的判斷,可參考TextMessage,其他類同。詳細代碼見源碼:
MapMessage mapMessage = (MapMessage) message; System.out.println("收到message是:" + mapMessage.getString("name") + "," + mapMessage.getInt("age"));
C:ObjectMessage序列化的java對象消息
在生產者方創建User對象,並把此對象復制到消費者方。需要注意的是,對象一定要進行序列化操作,否則無法發送成功
@Data @AllArgsConstructor @NoArgsConstructor public class User implements Serializable { private String username; private Integer age; private String password; }
發送消息:
/** * object類型 */ @GetMapping("obj") public void ObjectMessage(){ jmsTemplate.send(queue, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { ObjectMessage message = session.createObjectMessage(); User user=new User("admin",20,"1234"); message.setObject(user); return message; } }); }
接收消息:
ObjectMessage objectMessage = (ObjectMessage) message; User user = (User) objectMessage.getObject(); System.out.println("收到message是:" + user.toString());
在向消費者發送消息時,發發生異常,也就是說對象不被activemq信任。
那么就需要在配置文件進行配置,信任自定義對象。需要生產者和消費者方都配置
#配置信任列表 spring.activemq.packages.trust-all=true
D:BytesMessage字節流消息
以圖片的發送為例。發送消息:
/** * byte類型 */ @GetMapping("byte") public void BytesMessage(){ jmsTemplate.send(queue, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { BytesMessage message = session.createBytesMessage(); try { File file = new File("C:\\Users\\zhongyushi\\Pictures\\1.jpg"); FileInputStream stream=new FileInputStream(file); byte[] bytes = new byte[(int)file.length()]; stream.read(bytes); message.writeBytes(bytes); } catch (Exception e) { e.printStackTrace(); } return message; } }); }
接收消息:
BytesMessage bytesMessage = (BytesMessage) message;
byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
bytesMessage.readBytes(bytes);
FileOutputStream fileOutputStream = new FileOutputStream("D://1.jpg");
fileOutputStream.write(bytes);
System.out.println("保存了");
上述的操作相當於圖片的復制。
E:StreamMessage字符流消息
可以發送任何類型的值,只是它沒有key只有value。
發送消息:
/** * stream類型 */ @GetMapping("stream") public void streamMessage(){ jmsTemplate.send(queue, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { StreamMessage streamMessage = session.createStreamMessage(); streamMessage.writeString("張三"); streamMessage.writeInt(20); return streamMessage; } }); }
接收消息:
StreamMessage streamMessage = (StreamMessage) message; System.out.println("收到message是:" + streamMessage.readString() + "," + streamMessage.readInt());
需要注意的是,這種類型的消息,同一種類型(string,int等)可設置多次,后面設置的數據會拼接在前面設置的數據的后面,使用逗號分隔。
3)消息屬性
主要是給消息設置自定義的屬性,實現對消息的過濾。以文本消息為例說明:
在發送消息時設置屬性:
TextMessage textMessage = session.createTextMessage("我是文本消息");
textMessage.setStringProperty("訂單","order");
return textMessage;
在接收消息時獲取屬性:
TextMessage textMessage = (TextMessage) message; System.out.println("自定義屬性:"+textMessage.getStringProperty("訂單")); System.out.println("收到message是:" + textMessage.getText());
2.6消息持久化
消息持久化是保證消息不丟失的重要方式。ActiveMQ提供了三種消息的存儲方式:基於內存的消息存儲、基於日志的消息存儲、基於JDBC的消息存儲。
2.6.1基於內存的消息存儲
會把消息存儲到內存中,當ActiveMQ服務重啟后消息會丟失,不推薦使用。
2.6.2基於日志的消息存儲
對於SpringBoot的架構,ActiveMQ默認就使用日志存儲。KahaDB是ActiveMQ默認的日志存儲方式。
需要在生產者配置文件進行配置
# 消息持久化配置
spring.jms.template.delivery-mode=persistent
配置這種方式后,日志是默認存儲在ActiveMQ安裝目錄下的data/kahadb目錄下。
2.6.3、基於JDBC的消息存儲
可以把消息存儲到數據庫中。同樣也需要在生產者配置文件進行持久化的配置。
1)修改ActiveMQ安裝目錄下的conf/activemq.xml文件,
添加數據源:這里數據庫采用mysql,數據源使用的是windows本地的數據庫(需要開啟遠程訪問,關閉本機防火牆)
<bean id="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://192.168.86.1:3306/db_activemq"/> <property name="username" value="root"/> <property name="password" value="zys123456"/> <property name="poolPreparedStatements" value="true"/> </bean>
指定使用數據源:把原來記錄日志方式改為指定jdbc方式
<persistenceAdapter> <!-- <kahaDB directory="${activemq.data}/kahadb"/> --> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> </persistenceAdapter>
2)復制mysql驅動和數據庫連接池druid的ajr到ActiveMQ的lib目錄,版本可自主選擇
3)在本機新建數據庫db_activemq
4)重啟ActiveMQ服務,然后再啟動生產者,此時會在數據庫自動創建三個表
5)調用接口發送一條消息。打開表active_msgs,看到有一條消息待消費
6)啟動消費者,接收到消息,再次查看表active_msgs,待消費的一條記錄已被刪除
2.7消息事務
消息事務,是保證消息傳遞原子性的一個重要特征。一個事務性發送,其中一組消息要么能夠全部保證到達服務器,要么都不到達服務器。
2.7.1案例說明
先看下面的兩個案例。
1)案例1:循環發送數據,無異常
@GetMapping("text2") public void textMessage2() { for (int i = 0; i < 10; i++) { jmsMessagingTemplate.convertAndSend(queue, "消息" + i); } }
這種情況下10數據都可以發送成功
1)案例2:循環發送數據,有異常
@GetMapping("text2") public void textMessage2() { for (int i = 0; i < 10; i++) { if (i == 5) { int a = i / 0; } jmsMessagingTemplate.convertAndSend(queue, "消息" + i); } }
這種情況下前5條數據都可以發送成功,發生異常后后面5條數據不再發送了。那么這對消息的發送還是有影響了,違背了原子性的原則。
2.7.2生產者事務處理
針對上述的文件,SpringBoot提供了解決辦法。
1)在配置類中添加jms事務管理器
/** * jms事務管理器 * @param connectionFactory * @return */ @Bean public PlatformTransactionManager createTransactionManager(@Qualifier("jmsConnectionFactory") ConnectionFactory connectionFactory){ return new JmsTransactionManager(connectionFactory); }
2)在發送消息的方法上添加注解@Transactional。
一般情況下,發送消息都在service層進行,這里方便演示便在controller層。@Transactional就是事務管理的注解,當發生異常時會自動回滾。它也可以用在數據庫的事務操作上。若同時存在於消息和數據的操作,那么事務會對它們同時生效。
2.7.3消費者事務處理
當在接收消息時,正常接收時應該提交事務,發生異常時回滾事務。在回滾時,MQ會重發6次消息,當6次都失敗時,會把此消息自動添加到死信隊列。以接收文本消息為例,代碼如下:
@JmsListener(destination = "${activemq.queue}") public void receiveQueue(Message message,Session session) { try { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; System.out.println("收到message是:" + textMessage.getText()); //提交事務 session.commit(); } } catch (Exception e) { e.printStackTrace(); try { session.rollback(); } catch (JMSException jmsException) { jmsException.printStackTrace(); } } }
在接收消息時添加了Session參數,來控制事務。
2.8消息確認機制
常用的消息確認機制有兩種,分別是自動確認(Session.AUTO_ACKNOWLEDGE)和手動確認(Session.CLIENT_ACKNOWLEDGE)。
在SpringBoot的架構中,開啟了事務,那么消息是自動確認的,不需要再進行確認。
2.9消息投遞方式
投遞方式有四種,同步、異步、延遲、定時投遞。
2.9.1同步投遞
消息生產者使用持久傳遞模式發送消息時,Producer.send()方法會被阻塞,直到broker發送一個確認消息給生產者,這個確認消息暗示broker已經成功接收到消息並把消息保存到二級存儲中。
2.9.2異步投遞
如果應用程序能夠容忍一些消息的丟失,那么可以使用異步發送。異步發送不會在受到broker的確認之前一直阻塞Producer.send方法。
2.9.3延遲投遞
消息延遲發送到服務器。
2.9.4定時投遞
采用定時方式發送消息到服務器。
2.10死信隊列
用來保存處理失敗或者過期的消息。
3.RabbitMQ
由於篇幅問題,請參考博客https://www.cnblogs.com/zys2019/p/12828152.html
4.Kafka
由於篇幅問題,請參考博客https://www.cnblogs.com/zys2019/p/13202787.html