Spring Boot 整合 ActiveMQ 實現手動確認和重發消息


    消息隊列中間件是分布式系統中重要的組件,已經逐漸成為企業系統內部通信的核心手段。主要功能包括松耦合、異步消息、流量削鋒、可靠投遞、廣播、流量控制、最終一致性等。實現高性能,高可用,可伸縮和最終一致性架構。消息形式支持點對點和訂閱-發布。

   消息隊列中間件常見的應用場景包括應用解耦、異步處理、流量錯峰與流控、日志處理等等。目前常見的消息隊列中間件有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ和RocketMQ等。本文在Spring Boot整合ActiveMQ的基礎上,以點對點模式為例介紹兩種消息處理機制——手動確認和重發消息。

   之所以介紹消息重發機制,是因為ActiveMQ被消費失敗的消息將會駐留在隊列中,因為沒有進行消息確認,所以,下次還會監聽到這條消息。

軟件環境

  • Apache ActiveMQ 5.15.9
  • Java version 11
  • IntelliJ IDEA 2020.2 (Ultimate Edition)
  • Spring Boot 2.3.1.RELEASE

 

1.引入ActiveMQ pom 坐標和基本配置

   引入pom坐標: 

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
            <version>2.3.3.RELEASE</version>
        </dependency>

2. 修改配置文件

spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.in-memory=true spring.activemq.pool.enabled=false #默認值false,表示點到點模式,true時代表發布訂閱模式 spring.jms.pub-sub-domain=false spring.activemq.user=wiener spring.activemq.password=wiener123

3. SpringBoot 的啟動類配置

   在 SpringBoot 的啟動類上加上一個 @EnableJms 注解。我的項目去掉此注解也可以正常啟動,故沒有添加,你的項目如果啟動失敗,請添加。 

4. ActiveMQ 連接配置

   當我們發送消息的時候,會出現發送失敗的情況,此時我們需要用到ActiveMQ提供的消息重發機制,重新發送消息。那么問題來了,我們怎么知道消息是否發送成功呢?ActiveMQ還有消息確認機制,消費者在接收到消息的時候可以進行確認。本節將學習ActiveMQ消息的確認機制和重發機制。

   消息確認機制有五種,在session對象中定義了如下四種:

  1. SESSION_TRANSACTED= 0:事務提交並確認
  2. AUTO_ACKNOWLEDGE= 1 :自動確認
  3. CLIENT_ACKNOWLEDGE= 2:客戶端手動確認
  4. UPS_OK_ACKNOWLEDGE= 3: 自動批量確認

   在ActiveMQSession類中補充了一個自定義的ACK機制:

   INDIVIDUAL_ACKNOWLEDGE= 4:單條消息確認。

 

   消息確認機制的使用場景分兩種:

   1、帶事務的session

   如果session帶有事務,並且事務成功提交,則消息被自動簽收。如果事務回滾,則消息會被再次傳送。

   2、不帶事務的session

   不帶事務的session的簽收方式,取決於session的配置。

   新建一個配置類ActiveMQConfig用來配置 ActiveMQ 連接屬性,在工廠中設置開啟單條消息確認模式INDIVIDUAL_ACKNOWLEDGE。注意:ActiveMQ 默認是開啟事務的,且消息確認機制與事務機制是沖突的,只能二選一,所以演示消息確認前,請先關閉事務。

   配置代碼如下:

import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.RedeliveryPolicy; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import org.springframework.jms.config.SimpleJmsListenerContainerFactory; /** * 配置 Active MQ * @author Wiener * @date 2020/9/15 */ @Configuration public class ActiveMQConfig { @Value("${spring.activemq.user}") private String userName; @Value("${spring.activemq.password}") private String password; @Value("${spring.activemq.broker-url}") private String brokerUrl; /** * 配置名字為givenConnectionFactory的連接工廠 * @return
     */ @Bean("givenConnectionFactory") public ActiveMQConnectionFactory connectionFactory() { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, brokerUrl); // 設置重發機制
         RedeliveryPolicy policy = new RedeliveryPolicy(); policy.setUseExponentialBackOff(Boolean.TRUE); // 消息處理失敗重新處理次數,默認為6次
         policy.setMaximumRedeliveries(2); // 重發時間間隔,默認為1秒
         policy.setInitialRedeliveryDelay(1000L); policy.setBackOffMultiplier(2); policy.setMaximumRedeliveryDelay(1000L); activeMQConnectionFactory.setRedeliveryPolicy(policy); return activeMQConnectionFactory; } /** * 在Queue模式中,對消息的監聽需要對containerFactory進行配置 * * @param givenConnectionFactory * @return
     */ @Bean("queueListener") public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ActiveMQConnectionFactory givenConnectionFactory) { SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory(); // 關閉事務
         factory.setSessionTransacted(false); // 手動確認消息
 factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); factory.setPubSubDomain(false); factory.setConnectionFactory(givenConnectionFactory); return factory; } @Bean("topicListener") public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory givenConnectionFactory){ //設置為發布訂閱模式, 默認情況下使用生產消費者方式
         DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setPubSubDomain(true); bean.setConnectionFactory(givenConnectionFactory); return bean; } }

這里設置消息發送失敗時重發次數為2次,其系統默認值為6次,源碼如下:

 

5.手動確認和重發消息

   JmsTemplate是消息處理核心類(線程安全),用於發送和接收消息,在發送或者是消費消息的同時也會對所需資源進行創建和釋放。消息消費采用receive方式(同步、阻塞的),這里不做具體描述。關於消息的發送,常用的方式為send()以及convertAndSend()兩種,其中send()發送需要指定消息格式,使用convertAndSend可以根據定義好的MessageConvert自動轉換消息格式。

   消息生產者代碼比較簡單,主要是調用 jmsMessagingTemplate 的 convertAndSend() 方法。發送的消息如果是對象類型的,可以將其轉成 json 字符串。

@Service("producer") public class Producer { /** * 也可以注入JmsTemplate,JmsMessagingTemplate對JmsTemplate進行了封裝 */ @Autowired private JmsMessagingTemplate jmsTemplate; /** * 發送消息 * * @param destination 指定消息目的地 * @param message 待發送的消息 */
    public void sendMessage(Destination destination, final String message) { jmsTemplate.convertAndSend(destination, message); } }

其中,Student 類的定義如下:

/** * @author Wiener */ @Getter @Setter @ToString @Component public class Student implements Serializable { private static final long serialVersionUID = 4481359068243928443L; private Long id; /** 姓名 */
    private String name; private String birthDay; public Student() { } public Student(Long id, String name, String birthDay) { this.id = id; this.name = name; this.birthDay = birthDay; } }

   在消費者監聽類中,我們使用@JmsListener監聽隊列消息。大家請注意,如果我們不在@JmsListener中指定containerFactory,那么將使用默認配置,默認配置中Session是開啟了事務的,即使我們設置了手動Ack也是無效的。

   定義兩個名為 "east7-queue" 的消費者,在消費消息時,二者默認會輪詢消費。在消費者消費完一條消息之后,調用 message.acknowledge() 進行消息的手動確認。如果在消費者中未進行手動確認,由於 ActiveMQ 進行了持久化消息,那么在下次啟動項目的時候還會再次發送該消息。

import com.eg.wiener.dto.Student; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.activemq.command.ActiveMQMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Session; import javax.jms.TextMessage; @Component public class ConsumerListener { private static Logger logger = LoggerFactory.getLogger(ConsumerListener.class); // 記錄重發總次數
    private static int num = 0; /** * east7-queue普通隊列:消費者1 */ @JmsListener(destination = "east7-queue", containerFactory = "queueListener") public void receiveQueueTest1(ActiveMQMessage message, Session session) throws JsonProcessingException, JMSException { logger.info("receiveQueueTest:1-mode {}", session.getAcknowledgeMode()); String text = null; if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; ObjectMapper objectMapper = new ObjectMapper(); Student student = objectMapper.readValue(textMessage.getText(), Student.class); logger.info("queue1接收到student:{}", student); // 手動確認
              try { message.acknowledge(); } catch (JMSException e) { // 此不可省略 重發信息使用
 session.recover(); } } } /** * east7-queue普通隊列:消費者2 */ @JmsListener(destination = "east7-queue", containerFactory = "queueListener") public void receiveQueueTest2(ActiveMQMessage message, Session session) throws JMSException, JsonProcessingException { logger.info("receiveQueueTest:2-transacted mode {}", session.getTransacted()); String msg = null; if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; msg = textMessage.getText(); ObjectMapper objectMapper = new ObjectMapper(); Student student = objectMapper.readValue(msg, Student.class); logger.info("queue2接收到student:{}", student); try { if (msg.contains("典韋5") || msg.contains("典韋6") || msg.contains("典韋7")) { throw new JMSException("故意拋出的異常"); } message.acknowledge(); } catch (JMSException e) { ++ num; System.out.println(String.format("觸發重發機制,num = %s, msg = %s", num, msg)); session.recover(); } } } }

   這里為了驗證消息重發機制,故意在消費消息的時候拋出了一個異常。我們的Junit測試用例如下:

 @Test public void ackMsgTest() throws JsonProcessingException { Destination destination = new ActiveMQQueue("east7-queue"); Student student = new Student(1L, "張三", DateUtils.parseDateSM(new Date())); ObjectMapper objectMapper = new ObjectMapper(); for (int i = 0; i < 10; i++) { student.setId(i * 10L); student.setName("典韋" + i); producer.sendMessage(destination, objectMapper.writeValueAsString(student)); } sleep(5000L); //給消費者足夠的時間消費消息
 } private void sleep(long time) { try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } }

   執行測試函數后,可以發現兩個消費者交替消費消息。而且可以發現,每個錯誤消息重發次數為3次,與我們的預期吻合。

觸發重發機制,num = 1, msg = {"id":500,"name":"典韋5","birthDay":"2020-09-20 16:31:39"} 觸發重發機制,num = 2, msg = {"id":500,"name":"典韋5","birthDay":"2020-09-20 16:31:39"} 觸發重發機制,num = 3, msg = {"id":500,"name":"典韋5","birthDay":"2020-09-20 16:31:39"} 觸發重發機制,num = 4, msg = {"id":700,"name":"典韋7","birthDay":"2020-09-20 16:31:39"} 觸發重發機制,num = 5, msg = {"id":700,"name":"典韋7","birthDay":"2020-09-20 16:31:39"} 觸發重發機制,num = 6, msg = {"id":700,"name":"典韋7","birthDay":"2020-09-20 16:31:39"}

Reference

https://www.cnblogs.com/liuyuan1227/p/10776189.html

https://blog.csdn.net/chinese_cai/article/details/108342611

https://blog.csdn.net/p_programmer/article/details/88384138

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM