Springboot+ActiveMQ(ActiveMQ消息持久化,保證JMS的可靠性,消費者冪等性)


ActiveMQ 持久化設置:

在redis中提供了兩種持久化機制:RDB和AOF 兩種持久化方式,避免redis宕機以后,能數據恢復,所以持久化的功能 對高可用程序來說 很重要。

同樣在ActiveMQ 中 也提供了持久化的功能,在生產者 生產消息 到隊列中,可以通過設置 該消息在隊列中是否持久化。持久化以后,即使ActiveMQ重啟了,隊列中的消息也不會丟失

java中,在生產者 發送消息的時候可以通過api 設置 

producer.setDeliveryMode(DeliveryMode.PERSISTENT)
package com.example.demo.producter;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMqProducter {
    public static String url = "tcp://127.0.0.1:61616";

    public static void main(String[] args) throws JMSException {
        // 根據用戶名 和密碼,地址,獲取JMS 的連接工廠 61616端口 是mq服務的端口 而8161 是mq提供的管理后端的端口
        ActiveMQConnectionFactory connetionFactory = new ActiveMQConnectionFactory("admin", "admin", url);
        // 從連接工廠創建一條連接
        Connection connection = connetionFactory.createConnection();
        // 開啟連接
        connection.start();
        // 創建session會話,第一參數表示啟用事務處理,第二個參數表示啟動哪種應答模式,這里啟用的是自動應答 一個類似 接受 或者發送的線程
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        // 用session創建一個
        Destination destination = session.createQueue("mq-msg");
        // MessageProducer:消息生產者
        MessageProducer producer = session.createProducer(destination);
        // 設置不持久化 NON_PERSISTENT  PERSISTENT設置持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        // 發送一條消息
        for (int i = 1; i <= 5; i++) {
            sendMsg(session, producer, i);
        }
        connection.close();

    }

    /**
     * 在指定的會話上,通過指定的消息生產者發出一條消息
     * 
     * @param session
     *            消息會話
     * @param producer
     *            消息生產者
     */
    public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
        // 創建一條文本消息
        TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
        // 通過消息生產者發出消息
        producer.send(message);
    }

}

 

如何保證JMS可靠性:

 JMS:java消息服務,JMS服務客戶端之間通過異步的方式進行消息傳遞,在消息傳遞的時候,如何保證傳輸的消息是不是可靠。

 例如:生產者 生產消息發送到隊列的時候,如果有異常拋出。消費者從隊列中拉取消息消費的時候,程序異常,消費失敗。

ActiviMQ的消息簽收機制:客戶端成功接收一條消息的標志是一條消息被簽收,成功應答

消息的成功消費通常包含三個階段:客戶接收消息、客戶處理消息和消息被確認。

1.自動簽收(生產者和消費者 生產消息和消費消息,都是自動完成 )

生產者:

package com.example.demo.producter;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMqProducter {
    public static String url = "tcp://127.0.0.1:61616";

    public static void main(String[] args) throws JMSException {
        // 根據用戶名 和密碼,地址,獲取JMS 的連接工廠 61616端口 是mq服務的端口 而8161 是mq提供的管理后端的端口
        ActiveMQConnectionFactory connetionFactory = new ActiveMQConnectionFactory("admin", "admin", url);
        // 從連接工廠創建一條連接
        Connection connection = connetionFactory.createConnection();
        // 開啟連接
        connection.start();
        // 創建session會話,第一參數表示啟用事務處理,第二個參數表示啟動哪種應答模式,這里啟用的是自動應答   一個類似 接受 或者發送的線程
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        // 用session創建一個
        Destination destination = session.createQueue("mq-msg");
        // MessageProducer:消息生產者
        MessageProducer producer = session.createProducer(destination);
        // 設置不持久化 NON_PERSISTENT  PERSISTENT設置持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        // 發送一條消息
        for (int i = 1; i <= 5; i++) {
            sendMsg(session, producer, i);
        }
        connection.close();

    }

    /**
     * 在指定的會話上,通過指定的消息生產者發出一條消息
     * 
     * @param session
     *            消息會話
     * @param producer
     *            消息生產者
     */
    public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
        // 創建一條文本消息
        TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
        // 通過消息生產者發出消息
        producer.send(message);
    }

}

消費者:

package com.example.demo.consumer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {
    public static void main(String[] args) throws JMSException {
        // ConnectionFactory :連接工廠,JMS 用它創建連接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616");
        // JMS 客戶端到JMS Provider 的連接
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // Session: 一個發送或接收消息的線程
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        // Destination :消息的目的地;消息發送給誰.
        // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置
        Destination destination = session.createQueue("mq-msg");
        // 消費者,消息接收者
        MessageConsumer consumer = session.createConsumer(destination);
        while (true) {
            TextMessage message = (TextMessage) consumer.receive();
            if (null != message) {
                System.out.println("收到消息:" + message.getText());
            } else
                break;
        }
        session.close();
        connection.close();
    }

}

通過:

Session.AUTO_ACKNOWLEDGE 表示自動簽收
 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE)

 

2.手動簽收

生產者:

package com.example.demo.producter;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMqProducter {
    public static String url = "tcp://127.0.0.1:61616";

    public static void main(String[] args) throws JMSException {
        // 根據用戶名 和密碼,地址,獲取JMS 的連接工廠 61616端口 是mq服務的端口 而8161 是mq提供的管理后端的端口
        ActiveMQConnectionFactory connetionFactory = new ActiveMQConnectionFactory("admin", "admin", url);
        // 從連接工廠創建一條連接
        Connection connection = connetionFactory.createConnection();
        // 開啟連接
        connection.start();
        // 創建session會話,第一參數表示啟用事務處理,第二個參數表示啟動哪種應答模式,這里啟用的是自動應答 一個類似 接受 或者發送的線程
        Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
        // 用session創建一個
        Destination destination = session.createQueue("mq-msg");
        // MessageProducer:消息生產者
        MessageProducer producer = session.createProducer(destination);
        // 設置不持久化 NON_PERSISTENT PERSISTENT設置持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        // 發送一條消息
        for (int i = 1; i <= 5; i++) {
            sendMsg(session, producer, i);
        }
        connection.close();

    }

    /**
     * 在指定的會話上,通過指定的消息生產者發出一條消息
     * 
     * @param session
     *            消息會話
     * @param producer
     *            消息生產者
     */
    public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
        // 創建一條文本消息
        TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
        // 通過消息生產者發出消息
        producer.send(message);
    }

}

消費者:

package com.example.demo.consumer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {
    public static void main(String[] args) throws JMSException {
        // ConnectionFactory :連接工廠,JMS 用它創建連接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616");
        // JMS 客戶端到JMS Provider 的連接
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // Session: 一個發送或接收消息的線程
        Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
        // Destination :消息的目的地;消息發送給誰.
        // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置
        Destination destination = session.createQueue("mq-msg");
        // 消費者,消息接收者
        MessageConsumer consumer = session.createConsumer(destination);
        while (true) {
            TextMessage message = (TextMessage) consumer.receive();
            message.acknowledge();//表示 手動簽收,應答,告訴隊列消費成功,可以清除消費的消息
            if (null != message) {
                System.out.println("收到消息:" + message.getText());
            } else
                break;
        }
        session.close();
        connection.close();
    }

}

生產者 和消費者中 需要將應該類型修改成:

Session.CLIENT_ACKNOWLEDGE
    Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);

消費者 還需要手動簽收:

    message.acknowledge();//表示 手動簽收,應答,告訴隊列消費成功,可以清除消費的消息

 

3.事務簽收

生產者:

package com.example.demo.producter;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMqProducter {
    public static String url = "tcp://127.0.0.1:61616";

    public static void main(String[] args) throws JMSException {
        // 根據用戶名 和密碼,地址,獲取JMS 的連接工廠 61616端口 是mq服務的端口 而8161 是mq提供的管理后端的端口
        ActiveMQConnectionFactory connetionFactory = new ActiveMQConnectionFactory("admin", "admin", url);
        // 從連接工廠創建一條連接
        Connection connection = connetionFactory.createConnection();
        // 開啟連接
        connection.start();
        // 創建session會話,第一參數表示啟用事務處理,第二個參數表示啟動哪種應答模式,這里啟用的是自動應答 一個類似 接受 或者發送的線程
        Session session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
        // 用session創建一個
        Destination destination = session.createQueue("mq-msg");
        // MessageProducer:消息生產者
        MessageProducer producer = session.createProducer(destination);
        // 設置不持久化 NON_PERSISTENT PERSISTENT設置持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        // 發送一條消息
        for (int i = 1; i <= 5; i++) {
            sendMsg(session, producer, i);
            session.commit();//生產者提交事務完成以后,該消息才能存放到隊列之中
        }
        connection.close();

    }

    /**
     * 在指定的會話上,通過指定的消息生產者發出一條消息
     * 
     * @param session
     *            消息會話
     * @param producer
     *            消息生產者
     */
    public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
        // 創建一條文本消息
        TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
        // 通過消息生產者發出消息
        producer.send(message);
    }

}

消費者:

 1 package com.example.demo.consumer;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.Destination;
 6 import javax.jms.JMSException;
 7 import javax.jms.MessageConsumer;
 8 import javax.jms.Session;
 9 import javax.jms.TextMessage;
10 
11 import org.apache.activemq.ActiveMQConnection;
12 import org.apache.activemq.ActiveMQConnectionFactory;
13 
14 public class Consumer {
15     public static void main(String[] args) throws JMSException {
16         // ConnectionFactory :連接工廠,JMS 用它創建連接
17         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616");
18         // JMS 客戶端到JMS Provider 的連接
19         Connection connection = connectionFactory.createConnection();
20         connection.start();
21         // Session: 一個發送或接收消息的線程
22         Session session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
23         // Destination :消息的目的地;消息發送給誰.
24         // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置
25         Destination destination = session.createQueue("mq-msg");
26         // 消費者,消息接收者
27         MessageConsumer consumer = session.createConsumer(destination);
28         while (true) {
29             TextMessage message = (TextMessage) consumer.receive();
30             session.commit();//表示成功消費 該條信息,應答完成,從隊列中清除該消息
31             if (null != message) {
32                 System.out.println("收到消息:" + message.getText());
33             } else
34                 break;
35         }
36         session.close();
37         connection.close();
38     }
39 
40 }

 

生產者 和消費者

Session session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);

開啟事務就可以了,至於選擇 哪種簽收類型,嘗試了,自動和 手動都可以,只需要 在生產者和消費者里面 提交事務就可以了。

並不是所有的消息中間件都是以這種方式保證消息的可行性,只是ActiveMQ 是用了這三種應答機制。

 

Springboot+Activemq整合

1 導入整合所需要的依賴:

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

2 創建 application.properties文件

spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.user=admin
spring.activemq.password=admin
server.port=8080
queue=myqueue

3.自定義配置文件QueueConfig 讀取配置文件的隊列名,根據隊列名字創建一個Queue

 

package com.example.demo;

import javax.jms.Queue;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class QueueConfig {

    @Value("${queue}")
    private String queue;

    @Bean
    public Queue logQueue() {
        return new ActiveMQQueue(queue);
    }
}

4.創建生產者,可以直接使用提供的模板 JmsMessagingTemplate 進行消息的發送:

package com.example.demo.producter;

import javax.jms.Queue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

import com.example.demo.SpringbootActivemqApplication;

@Component
public class Producter {
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    @Autowired
    private Queue queue;
    private static Logger logger = LoggerFactory.getLogger(
Producter 
.class); public void send() { String str = "生產者生產數據:" + System.currentTimeMillis(); jmsMessagingTemplate.convertAndSend(queue, str); logger.info("生產者數據:{}", str); } }

 

5.啟動類:

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.scheduling.annotation.EnableScheduling;

import com.example.demo.producter.Producter;
import com.example.demo.producter.consumer.Consumer;

@SpringBootApplication
@EnableScheduling
public class SpringbootActivemqApplication implements ApplicationListener<ContextRefreshedEvent> {
    @Autowired
    public Producter producter;
    @Autowired
    public Consumer consumer;

    public static void main(String[] args) {
        SpringApplication.run(SpringbootActivemqApplication.class, args);
        //onApplicationEvent方法 在啟動springboot的時候 會運行該方法,可根據項目實際情況 選擇合適調用消息發送方法

    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        producter.send();
    }

}

 

6.啟動項目,控制台輸出內容:

 

   

 

7.創建消費者,創建消費者比較容易,只需要監聽隊列就可以:

 

package com.example.demo.producter.consumer;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    @JmsListener(destination = "${queue}")
    public void receive(String msg) {
        System.out.println("監聽器收到msg:" + msg);
    }

}

8.最后結果:

日志顯示先后順序 是因為選擇打印的方式不一樣.可以忽略不計

 

使用ActivetyMQ注意事項

 在Activetymq中,消費者在接受消息消費的時候,如果程序出現了異常,Activemq會有自動重試機制

 例如:

package com.example.demo.producter.consumer;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    @JmsListener(destination = "${queue}")
    public void receive(String msg) {
        System.out.println("監聽器收到msg:" + msg);
        int i = 1 / 0;
    }

}

 

通過 結果可以看出,該消息會一直重試,在項目中可能出現的異常原因 粗略算兩種吧:

1.需要通過修改代碼,重新打包發布的bug,比如空指針,格式異常,這些屬於開發人員程序的bug

2.調用第三方接口的時候,出現超時,或者一段時間內連接不上,如數據庫連接超時,過段時間可能就能連上這種

 

對於第一種出現的情況,即使Acitemq重試N次,程序還是會錯,所以這種情況就可以不需要讓Activemq重試了

對於第二種情況,則有必要讓mq重試,數據庫連接可能過段時間久恢復了,此時在重試 則可以正常消費。

對於第一種情況,如果是程序代碼引起的異常,重試機制 也起不到作用了,則 可以通過 日志記錄 然后人工手動補償數據,或者說定時job健康檢查

 

 

Activemq冪等性和消費者集群:

 什么叫冪等性:一次和多次請求某一個資源對於資源本身應該具有同樣的結果(網絡超時等問題除外)。也就是說,其任意多次執行對資源本身所產生的影響均與一次執行的影響相同。

舉例:在Activemq中,消費者消費消息的時候,數據庫里面的age字段的值是1,消費完消息以后 值變成2 ,此時程序有異常拋出。Activemq重試機制,消費者 在去消費該消息的時候,數據庫里面的值 應該依舊是2 不會變成3,多次消費同一條消息,不影響數據庫的結果.

package com.example.demo.producter.consumer;

import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @JmsListener(destination = "${queue}")
    public void receive(TextMessage message, Session session) throws JMSException, InterruptedException {

        String msgid = message.getJMSMessageID();
        String msg = message.getText();
        System.out.println("監聽器收到msg:" + msg + "消息id" + msgid);
        // 檢測是否存在該消息id的key 如果不存在表示 第一次消費
        if (!redisTemplate.hasKey(msgid)) {
            // 數據庫age+1
            int age = 0;
            age++;
            redisTemplate.opsForValue().set(msgid, msgid);
        } else {
            // 如果存在 表示已經消費成功,在次接受該消息的時候,則手動簽收 避免在次重試
            message.acknowledge();
        }
    }

}

session.recover(); 方法 可以告訴Activemq 重試

消費者集群會不會出現消息被重復消費的情況呢:不會,在一個隊列的情況下,隊列是知道當前有多少個消費者連接,分發消息的時候不會出現重復消費

隊列或者生產者集群的時候 可能會出現重復消費的情況,可以使用zk避免該情況

 

 

以上都是Activemq的一些基本知識,掌握mq是必要掌握的技能

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM