消息隊列MQ


1.定義和分類

1.1定義

MQ全稱為Message Queue,即消息隊列。“消息隊列”是在消息的傳輸過程中保存消息的容器。

它是典型的生產者、消費者模型。生產者不斷向消息隊列中生產消息,消費者不斷的從隊列中獲取消息。因為消息的生產和消費都是異步的,而且只關心消息的發送和接收,沒有業務邏輯的侵入,這樣就實現了生產者和消費者的解耦。

1.2分類

其常見的消息隊列產品如下表:

名稱 開發語言 時效性 說明
ActiveMQ java 毫秒級 基於JMS,是Apache出品,最流行的,能力強勁的開源消息總線
RabbitMQ

Erlang

微妙級

基於AMQP協議,erlang語言開發,穩定性好

Kafka Scala 毫秒級 分布式消息系統,高吞吐量。是Apache下的一個子項目
RocketMQ java 毫秒級 基於JMS,阿里巴巴產品,目前交由Apache基金會

具體的介紹見后續章節。它們的作用是異步處理,應用解耦,流量消峰。

1.3JMS與AMQP的異同

1)JMS全稱是java message service,即java消息服務。 AMQP全稱是Advanced Message Queuing Protocol,即高級消息隊列協議;

2)JMS定義了統一的接口,來對消息操作進行統一;AMQP通過規定協議來統一數據交互的格式;

3)JMS限定了必須使用Java語言;AMQP只是協議,不規定實現方式,因此是跨語言的;

4)JMS規定了兩種消息模型;而AMQP的消息模型更加豐富。

2.ActiveMQ

2.1下載與安裝

官網:http://activemq.apache.org/

1)打開后點擊最新版本

 2)在新的頁面選擇linux對應的文件下載,這里以5.16.2為例說明

3)將apache-activemq-5.16.2-bin.tar.gz 上傳到Linux上,默認Linux上已安裝jdk8及以上版本。

4)解壓

tar  zxvf  apache-activemq-5.16.2-bin.tar.gz

5)給目錄賦權

chmod 777 apache-activemq-5.16.2
cd apache-activemq-5.16.2\bin
chmod 755 activemq 

6)修改配置文件,允許外部訪問

cd ..
vi conf/jetty.xml

把127.0.0.1改為本機ip,如下圖:

 7)啟動

cd bin
./activemq start

看到類似下圖的結果,說明啟動成功

停止命令

./activemq stop

重啟命令

./activemq restart

 8)進入管理頁

假設服務器地址為192.168.86.128 ,打開瀏覽器輸入地址,http://192.168.86.128:8161/ 即可進入ActiveMQ管理頁面。在進入時需要輸入用戶名和密碼,均是admin,進入后如下圖

 9)進入主界面。

其中端口8161是管理頁面,用戶名密碼都是admin;61616是服務端頁面,用戶名密碼是guest。

2.2消息模式

1)點對點模式

每個消息只有一個消費者。一旦被消費,消息就不再在消息隊列中。發送者和接收者之間在時間上沒有依賴性,也就是說當發送者發送了消息之后,不管接收者有沒有正在運行,它不會影響到消息被發送到隊列。但接收者在成功接收消息之后需向隊列應答成功。

2)發布/訂閱模式

發布者發送到topic的消息,只有訂閱了topic的訂閱者才會收到消息。topic實現了發布和訂閱,當發布一個消息,所有訂閱這個topic的服務都能得到這個消息,所以從1到N個訂閱者都能得到這個消息的拷貝。也就是說這種模式每個消息可以有多個消費者,發布者和訂閱者之間有時間上的依賴性。另外訂閱者必須保持運行的狀態,才能接受發布者發布的消息。

2.3 基本入門

此代碼不在源碼中呈現,只做說明。

2.2.1環境准備

新建一個普通的maven工程,導入依賴

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.13.4</version>
        </dependency>

2.2.2點對點模式

1)創建生產者QueueProducer

package com.test;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class QueueConsumer {
    public static void main(String[] args) throws Exception {
        //1.創建連接工廠
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.16886.128:61616");
        //2.獲取連接
        Connection connection = connectionFactory.createConnection();
        //3.啟動連接
        connection.start();
        //4.獲取session  (參數1:是否啟動事務,參數2:消息確認模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.創建隊列對象
        Queue queue = session.createQueue("test-queue");
        //6.創建消息消費
        MessageConsumer consumer = session.createConsumer(queue);

        //7.監聽消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到消息:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //8.等待鍵盤輸入
        System.in.read();
        //9.關閉資源
        consumer.close();
        session.close();
        connection.close();
    }
}

運行后通過ActiveMQ管理界面Queues菜單查詢,發現有一個消息等待消費

2)創建消費者QueueConsumer 

package com.test;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class QueueConsumer {
    public static void main(String[] args) throws Exception {
        //1.創建連接工廠
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.86.128:61616");
        //2.獲取連接
        Connection connection = connectionFactory.createConnection();
        //3.啟動連接
        connection.start();
        //4.獲取session  (參數1:是否啟動事務,參數2:消息確認模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.創建隊列對象
        Queue queue = session.createQueue("test-queue");
        //6.創建消息消費
        MessageConsumer consumer = session.createConsumer(queue);

        //7.監聽消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到消息:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //8.等待鍵盤輸入
        System.in.read();
        //9.關閉資源
        consumer.close();
        session.close();
        connection.close();
    }
}

運行之后再控制台打印了發送過來的消息,管理頁面的消息也被消費了。

2.2.3發布/訂閱模式

1)創建生產者TopicProducer

package com.test;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicProducer {
    public static void main(String[] args) throws Exception {
        //1.創建連接工廠
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.86.128:61616");
        //2.獲取連接
        Connection connection = connectionFactory.createConnection();
        //3.啟動連接
        connection.start();
        //4.獲取session  (參數1:是否啟動事務,參數2:消息確認模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.創建主題對象
        Topic topic = session.createTopic("test-topic");
        //6.創建消息生產者
        MessageProducer producer = session.createProducer(topic);
        //7.創建消息
        TextMessage textMessage = session.createTextMessage("歡迎來到神奇jms世界");
        //8.發送消息
        producer.send(textMessage);
        //9.關閉資源
        producer.close();
        session.close();
        connection.close();
    }
}

2)創建消費者QueueConsumer 

package com.test;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicConsumer1 {
    public static void main(String[] args) throws Exception {
        //1.創建連接工廠
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.86.128:61616");
        //2.獲取連接
        Connection connection = connectionFactory.createConnection();
        //3.啟動連接
        connection.start();
        //4.獲取session  (參數1:是否啟動事務,參數2:消息確認模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.創建主題對象
        Topic topic = session.createTopic("test-topic");
        //6.創建消息消費
        MessageConsumer consumer = session.createConsumer(topic);

        //7.監聽消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到消息1:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //8.等待鍵盤輸入
        System.in.read();
        //9.關閉資源
        consumer.close();
        session.close();
        connection.close();
    }
}

 再創建一個消費者,開啟這兩個消費者,然后運行生產者,會發現兩個消費者會接收到消息。如果在生產者開啟之后開啟消費者,那么消費者是收不到消息的。在頁面選擇Topics查詢消息

2.4整合SpringBoot

源碼:https://github.com/zhongyushi-git/mq-collections.git。

1)新建兩個SpringBoot的項目,一個作為服務生產者,一個作為服務消費者

2)在生產者中導入依賴

        <!--activemq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

3)編寫生產者配置文件

# 應用名稱
spring.application.name=springboot-activemq-provider-demo
# 端口號
server.port=10010
# 配置服務器地址
spring.activemq.broker-url=tcp://192.168.86.128:61616
# activemq登錄名和密碼
spring.activemq.user=admin
spring.activemq.password=admin
# activemq模式,false點對點模式,true發布訂閱模式
spring.jms.pub-sub-domain=true
# 隊列名稱
activemq.queue=queue-msg
# 主題名稱
activemq.topic=topic-msg

4)編寫生產者配置類

package com.zxh.springbootactivemqproviderdemo.config;

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.jms.Queue;
import javax.jms.Topic;

@Configuration
public class ActiveMQConfig {

    @Value("${activemq.queue}")
    private String activemqQueue;

    //定義存放消息的隊列
    @Bean
    public Queue queue() {
        return new ActiveMQQueue(activemqQueue);
    }
}

5)編寫生產者controller接口

package com.zxh.springbootactivemqproviderdemo.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.jms.Queue;
import javax.jms.Topic;

@RestController
public class ProviderController {
 
    //注入存放消息的隊列
    @Autowired
    private Queue queue;

 
    //注入springboot封裝的工具類
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    /**
     * 點對點
     * @param name
     */
    @GetMapping("queue-send")
    public void queueSend(String name) {
        //發送消息
        jmsMessagingTemplate.convertAndSend(queue, name);
    }

}

6)編寫消費者配置文件

# 應用名稱
spring.application.name=springboot-activemq-demo
# 端口號
server.port=10011
# 配置服務器地址
spring.activemq.broker-url=tcp://192.168.86.128:61616
# activemq登錄名和密碼
spring.activemq.user=admin
spring.activemq.password=admin
# activemq模式,false點對點模式,true發布訂閱模式
spring.jms.pub-sub-domain=true
# 隊列名稱
activemq.queue=queue-msg
# 主題名稱
activemq.topic=topic-msg

7)編寫消費者監聽器

package com.zxh.springbootactivemqconsumerdemo.listener;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

@Component
public class ConsumerListener {

    @JmsListener(destination = "${activemq.queue}")
    public void receiveQueue(Message message) {
        if(message instanceof TextMessage){
            TextMessage textMessage= (TextMessage) message;
            try {
                System.out.println("收到的 queue message 是:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }

        }
    }

}

8)先啟動生產者,再啟動消費者,然后在瀏覽器輸入http://localhost:10010/queue-send?name=123,即可在消費者的控制台看到接收到的消息。

到這為止是點對點模式的消息,那么發布訂閱模式和其極為類似,在上述的代碼上稍作修改便可。

9)同時修改配置文件,把activemq模式改為發布訂閱模式,添加主題名稱

spring.jms.pub-sub-domain=true
# 主題名稱
activemq.topic=topic-msg

對於spring.jms.pub-sub-domain的值有兩種,分別是true和false,為true時就指定了是發布訂閱模式,為false時就指定了是點對點模式。

10)在生產者的配置類添加一個Bean

@Value("${activemq.topic}")
private String activemqTopic;


 //定義存放消息的主題
@Bean
public Topic topic() {
    return new ActiveMQTopic(activemqTopic);
}

11)在生產者的controller接口添加一個接口用作發布訂閱接口

    //注入存放消息的隊列
    @Autowired
    private Topic topic;  

     /**
     * 發布訂閱
     * @param name
     */
    @GetMapping("topic-send")
    public void topicSend(String name) {
        //發送消息
        jmsMessagingTemplate.convertAndSend(topic, name);
    }

12)在消費者的監聽器類添加發布訂閱模式的監聽

    @JmsListener(destination = "${activemq.topic}")
    public void receiveTopic(Message message) {
        if(message instanceof TextMessage){
            TextMessage textMessage= (TextMessage) message;
            try {
                System.out.println("收到的 topic message 是:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }

        }
    }

13)先啟動消費者,再啟動生產者,然后在瀏覽器輸入http://localhost:10010/topic-send?name=454545,即可在消費者的控制台看到接收到的消息,這便是發布/訂閱模式。

2.5 JMS消息組成

2.5.1JMS協議組成

結構 說明
JMS Provider 消息中間件
JMS Producer  消息生產者
JMS Consumer 消息消費者
JMS Message 消息

其中JMS Message是其最主要的部分,介紹如下。

2.5.2JMS消息組成

其有三部分組成,分別是消息頭、消息體、消息屬性。

1)消息頭

JMS消息頭預定義了若干字段用於客戶端與JMS提供者之間識別和發送消息,預編譯頭如下:

名稱 描述 設置者
JMSDestination 消息發送的目的地,是一個Topic或Queue send
JMSMessagelD 消息ID,需要以ID:開頭。不可篡改 send
JMSDeliveryMode 消息的發送模式,分為NON_PERSISTENT(持久化的)和PERSISTENT(非持久化的) send
JMSTimestamp  消息發送時的時間,可以理解為調用send()方法時的時間 send
JMSCorrelationID 關聯的消息ID client
JMSReplyTo 消息回復的目的地 client
JMSType 消息的類型 client
JMSExpiration  消息失效的時間,值0表明消息不會過期,默認值為0 send 
JMSPriority 息的優先級,0-4為普通的優化級,而5-9為高優先級,通常情況下,高優化級的消息需要優先與普通級發送。不可篡改 send

2)消息體

消息體就是真實發送的消息,包含5中格式。下面的演示代碼均以點對點模式為例。在消息生產者的controller中注入JmsTemplate

 @Autowired
 private JmsTemplate jmsTemplate;

原因是JmsMessagingTemplate無法通過指定消息類型來發送消息。

A:TextMessage字符串消息

    /**
     * 文本類型
     */
    @GetMapping("text")
    public void textMessage(){
        jmsTemplate.send(queue, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("我是文本消息");
            }
        });
    }

這種方式和上述整合SpringBoot時的方式是一樣的,而在消費者端也是只獲取的文本類型的消息,因此接收消息的代碼不用再次編寫。

B:MapMessage鍵值對消息

鍵值對也就是map類型,可以設置key和value,通過set類型設置值,get類型獲取值。

    /**
     * map類型
     */
    @GetMapping("map")
    public void mapMessage(){
        jmsTemplate.send(queue, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                MapMessage mapMessage = session.createMapMessage();
                mapMessage.setString("name","張三");
                mapMessage.setInt("age",20);
                return mapMessage;
            }
        });
    }

在接收時需要進行類型的判斷,可參考TextMessage,其他類同。詳細代碼見源碼:

MapMessage mapMessage = (MapMessage) message;
System.out.println("收到message是:" + mapMessage.getString("name") + "," + mapMessage.getInt("age"));

C:ObjectMessage序列化的java對象消息

在生產者方創建User對象,並把此對象復制到消費者方。需要注意的是,對象一定要進行序列化操作,否則無法發送成功

@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {
    private String username;
    private Integer age;
    private String password;
}

發送消息:

    /**
     * object類型
     */
    @GetMapping("obj")
    public void ObjectMessage(){
        jmsTemplate.send(queue, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                ObjectMessage message = session.createObjectMessage();
                User user=new User("admin",20,"1234");
                message.setObject(user);
                return message;
            }
        });
    }

接收消息:

ObjectMessage objectMessage = (ObjectMessage) message;
User user = (User) objectMessage.getObject();
System.out.println("收到message是:" + user.toString());

在向消費者發送消息時,發發生異常,也就是說對象不被activemq信任。

 那么就需要在配置文件進行配置,信任自定義對象。需要生產者和消費者方都配置

#配置信任列表
spring.activemq.packages.trust-all=true

D:BytesMessage字節流消息

以圖片的發送為例。發送消息:

    /**
     * byte類型
     */
    @GetMapping("byte")
    public void BytesMessage(){
        jmsTemplate.send(queue, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                BytesMessage message = session.createBytesMessage();
                try {
                    File file = new File("C:\\Users\\zhongyushi\\Pictures\\1.jpg");
                    FileInputStream stream=new FileInputStream(file);
                    byte[] bytes = new byte[(int)file.length()];
                    stream.read(bytes);
                    message.writeBytes(bytes);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return message;
            }
        });
    }

接收消息:

BytesMessage bytesMessage = (BytesMessage) message;
byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
bytesMessage.readBytes(bytes);
FileOutputStream fileOutputStream = new FileOutputStream("D://1.jpg");
fileOutputStream.write(bytes);
System.out.println("保存了");

上述的操作相當於圖片的復制。

E:StreamMessage字符流消息

可以發送任何類型的值,只是它沒有key只有value。

發送消息:

   /**
     * stream類型
     */
    @GetMapping("stream")
    public void streamMessage(){
        jmsTemplate.send(queue, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                StreamMessage streamMessage = session.createStreamMessage();
                streamMessage.writeString("張三");
                streamMessage.writeInt(20);
                return streamMessage;
            }
        });
    }

接收消息:

StreamMessage streamMessage = (StreamMessage) message;
System.out.println("收到message是:" + streamMessage.readString() + "," + streamMessage.readInt());

需要注意的是,這種類型的消息,同一種類型(string,int等)可設置多次,后面設置的數據會拼接在前面設置的數據的后面,使用逗號分隔。

3)消息屬性

主要是給消息設置自定義的屬性,實現對消息的過濾。以文本消息為例說明:

在發送消息時設置屬性:

TextMessage textMessage = session.createTextMessage("我是文本消息");
textMessage.setStringProperty("訂單","order");
return textMessage;

在接收消息時獲取屬性:

TextMessage textMessage = (TextMessage) message;
System.out.println("自定義屬性:"+textMessage.getStringProperty("訂單"));
System.out.println("收到message是:" + textMessage.getText());

2.6消息持久化

消息持久化是保證消息不丟失的重要方式。ActiveMQ提供了三種消息的存儲方式:基於內存的消息存儲、基於日志的消息存儲、基於JDBC的消息存儲。

2.6.1基於內存的消息存儲

會把消息存儲到內存中,當ActiveMQ服務重啟后消息會丟失,不推薦使用。

2.6.2基於日志的消息存儲

對於SpringBoot的架構,ActiveMQ默認就使用日志存儲。KahaDB是ActiveMQ默認的日志存儲方式。

需要在生產者配置文件進行配置

# 消息持久化配置
spring.jms.template.delivery-mode=persistent

配置這種方式后,日志是默認存儲在ActiveMQ安裝目錄下的data/kahadb目錄下。

2.6.3、基於JDBC的消息存儲

可以把消息存儲到數據庫中。同樣也需要在生產者配置文件進行持久化的配置。

1)修改ActiveMQ安裝目錄下的conf/activemq.xml文件,

添加數據源:這里數據庫采用mysql,數據源使用的是windows本地的數據庫(需要開啟遠程訪問,關閉本機防火牆)

<bean id="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://192.168.86.1:3306/db_activemq"/>
        <property name="username" value="root"/>
        <property name="password" value="zys123456"/>
        <property name="poolPreparedStatements" value="true"/>
</bean>

指定使用數據源:把原來記錄日志方式改為指定jdbc方式

 <persistenceAdapter>
          <!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
          <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
 </persistenceAdapter>

2)復制mysql驅動和數據庫連接池druid的ajr到ActiveMQ的lib目錄,版本可自主選擇

3)在本機新建數據庫db_activemq

4)重啟ActiveMQ服務,然后再啟動生產者,此時會在數據庫自動創建三個表

 

5)調用接口發送一條消息。打開表active_msgs,看到有一條消息待消費

6)啟動消費者,接收到消息,再次查看表active_msgs,待消費的一條記錄已被刪除

2.7消息事務

消息事務,是保證消息傳遞原子性的一個重要特征。一個事務性發送,其中一組消息要么能夠全部保證到達服務器,要么都不到達服務器。

2.7.1案例說明

先看下面的兩個案例。

1)案例1:循環發送數據,無異常

    @GetMapping("text2")
    public void textMessage2() {
        for (int i = 0; i < 10; i++) {
            jmsMessagingTemplate.convertAndSend(queue, "消息" + i);
        }
    }

這種情況下10數據都可以發送成功

1)案例2:循環發送數據,有異常

    @GetMapping("text2")
    public void textMessage2() {
        for (int i = 0; i < 10; i++) {
            if (i == 5) {
                int a = i / 0;
            }
            jmsMessagingTemplate.convertAndSend(queue, "消息" + i);
        }
    }

這種情況下前5條數據都可以發送成功,發生異常后后面5條數據不再發送了。那么這對消息的發送還是有影響了,違背了原子性的原則。

2.7.2生產者事務處理

針對上述的文件,SpringBoot提供了解決辦法。

1)在配置類中添加jms事務管理器

    /**
     * jms事務管理器
     * @param connectionFactory
     * @return
     */
    @Bean
    public PlatformTransactionManager createTransactionManager(@Qualifier("jmsConnectionFactory") ConnectionFactory connectionFactory){
        return new JmsTransactionManager(connectionFactory);
    }

2)在發送消息的方法上添加注解@Transactional。

一般情況下,發送消息都在service層進行,這里方便演示便在controller層。@Transactional就是事務管理的注解,當發生異常時會自動回滾。它也可以用在數據庫的事務操作上。若同時存在於消息和數據的操作,那么事務會對它們同時生效。

2.7.3消費者事務處理

當在接收消息時,正常接收時應該提交事務,發生異常時回滾事務。在回滾時,MQ會重發6次消息,當6次都失敗時,會把此消息自動添加到死信隊列。以接收文本消息為例,代碼如下:

@JmsListener(destination = "${activemq.queue}")
    public void receiveQueue(Message message,Session session) {
        try {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                System.out.println("收到message是:" + textMessage.getText());
                //提交事務
 session.commit();
            }
        } catch (Exception e) {
            e.printStackTrace();
            try {
                session.rollback();
            } catch (JMSException jmsException) { jmsException.printStackTrace(); }
        }
    }

在接收消息時添加了Session參數,來控制事務。

2.8消息確認機制

常用的消息確認機制有兩種,分別是自動確認(Session.AUTO_ACKNOWLEDGE)和手動確認(Session.CLIENT_ACKNOWLEDGE)。

在SpringBoot的架構中,開啟了事務,那么消息是自動確認的,不需要再進行確認。

2.9消息投遞方式

投遞方式有四種,同步、異步、延遲、定時投遞。

2.9.1同步投遞

消息生產者使用持久傳遞模式發送消息時,Producer.send()方法會被阻塞,直到broker發送一個確認消息給生產者,這個確認消息暗示broker已經成功接收到消息並把消息保存到二級存儲中。

2.9.2異步投遞

如果應用程序能夠容忍一些消息的丟失,那么可以使用異步發送。異步發送不會在受到broker的確認之前一直阻塞Producer.send方法。

2.9.3延遲投遞

消息延遲發送到服務器。

2.9.4定時投遞

采用定時方式發送消息到服務器。

2.10死信隊列

用來保存處理失敗或者過期的消息。

3.RabbitMQ

由於篇幅問題,請參考博客https://www.cnblogs.com/zys2019/p/12828152.html

4.Kafka

由於篇幅問題,請參考博客https://www.cnblogs.com/zys2019/p/13202787.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM