消息中間件解決方案續
上一篇中我們講到了在Spring工程中基本的使用消息中間件,這里就不在繼續贅述。
針對消息中間件,這篇講解兩個我們常遇到的問題。
問題1:如果我們的消息的接收過程中發生異常,怎么解決?
問題2:發布訂閱模式(Topic)下如果消費端宕機引起的消息的丟失,怎么解決?
問題解決方案:
問題1暫時有兩種解決方案:第一種是開啟消息確認機制,第二種開啟事務。下面會在點對點模式下進行演示。
問題2的解決方案:實現發布訂閱消息的持久化保存。
根據上面的解決方案搭建工程如下:測試消息的重發使用的是點對點模式(queue)
問題一解決方案如下:
方案一:消息確認機制
1.消息的生產類QueueProducer
1 @Component 2 public class QueueProducer { 3 4 @Autowired 5 private JmsTemplate jmsTemplate; 6 7 @Autowired // 注意Destination是javax.jms.Destination; 8 private Destination queueTextDestination; 9 10 /** 11 * 點對點方式發送文本信息 12 * @param message 13 */ 14 public void sendTestMessage(final String message){ 15 jmsTemplate.send(queueTextDestination, new MessageCreator() { 16 @Override 17 public Message createMessage(Session session) throws JMSException { 18 return session.createTextMessage(message); 19 } 20 }); 21 } 22 }
2.消息的監聽類MyMessageListenerQueueAcknowledge
1 public class MyMessageListenerQueueAcknowledge implements SessionAwareMessageListener { 2 3 @Override 4 public void onMessage(Message message, Session session) throws JMSException { 5 // 為了在點對點模式情況下記錄消息發送的次數 6 System.out.println(System.currentTimeMillis()+"請接收開啟了消息確認機制的消息"); 7 8 try { // 模擬發生異常 9 if(1==1){ 10 throw new RuntimeException("出異常了"); 11 } 12 TextMessage textMessage = (TextMessage)message; 13 System.out.println(textMessage); 14 System.out.println("queue模式接收到新消息"+textMessage.getText()); 15 16 message.acknowledge();// 接收完成,通知activeMq我們正常消費完 17 } catch (JMSException e) { 18 session.recover();// 發生異常,通知activeMQ需要恢復消息發送重新消費 19 e.printStackTrace(); 20 } 21 } 22 }
3.消息生產者的配置文件producer-queue.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 5 xmlns:amq="http://activemq.apache.org/schema/core" 6 xmlns:jms="http://www.springframework.org/schema/jms" 7 xsi:schemaLocation="http://www.springframework.org/schema/beans 8 http://www.springframework.org/schema/beans/spring-beans.xsd 9 http://www.springframework.org/schema/context 10 http://www.springframework.org/schema/context/spring-context.xsd"> 11 12 <!--包掃描--> 13 <context:component-scan base-package="com.buwei"></context:component-scan> 14 15 <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供--> 16 <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 17 <property name="brokerURL" value="tcp://192.168.25.128:61616"/> 18 </bean> 19 20 <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> 21 <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> 22 <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> 23 <property name="targetConnectionFactory" ref="targetConnectionFactory"/> 24 </bean> 25 26 <!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 --> 27 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 28 <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 --> 29 <property name="connectionFactory" ref="connectionFactory"/> 30 </bean> 31 32 <!--這個是隊列目的地,點對點的 文本信息--> 33 <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue"> 34 <constructor-arg value="jms-queue"/> 35 </bean> 36 37 </beans>
4.消息消費者的配置文件consumer-queue-acknowledge.xml
這是需要主要第53行配置的消息的確認模式為CLIENT_ACKNOWLEDGE,有兩種配置在注釋用有指出。
其中第19行以及22-25注釋掉的配置是對於消息重發的一些規則的配置,為了不影響測試效果,這里先注釋掉。
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 5 xmlns:amq="http://activemq.apache.org/schema/core" 6 xmlns:jms="http://www.springframework.org/schema/jms" 7 xsi:schemaLocation="http://www.springframework.org/schema/beans 8 http://www.springframework.org/schema/beans/spring-beans.xsd 9 http://www.springframework.org/schema/context 10 http://www.springframework.org/schema/context/spring-context.xsd"> 11 12 <!--包掃描--> 13 <context:component-scan base-package="com.buwei"></context:component-scan> 14 15 <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供--> 16 <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 17 <property name="brokerURL" value="tcp://192.168.25.128:61616"/> 18 <!--配置消息重發的是一些設置--> 19 <!--<property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy"/>--> 20 </bean> 21 22 <!--實現消息重發的bean--> 23 <!--<bean id="activeMQRedeliveryPolicy" class = "org.apache.activemq.RedeliveryPolicy"> 24 <property name="maximumRedeliveries" value="2"></property> 25 </bean>--> 26 27 <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> 28 <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> 29 <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> 30 <property name="targetConnectionFactory" ref="targetConnectionFactory"/> 31 </bean> 32 33 <!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 --> 34 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 35 <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 --> 36 <property name="connectionFactory" ref="connectionFactory"/> 37 </bean> 38 39 <!--這個是消息目的地,點對點的 文本信息--> 40 <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue"> 41 <constructor-arg value="jms-queue"/> 42 </bean> 43 44 <!--我的監聽類--> 45 <bean id="myMessageListenerQueue" class="com.buwei.MyMessageListenerQueueAcknowledge"></bean> 46 47 <!--消息監聽容器--> 48 <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer" > 49 <property name="connectionFactory" ref="connectionFactory" /> 50 <property name="destination" ref="queueTextDestination" /> 51 <property name="messageListener" ref="myMessageListenerQueue" /> 52 <!--設置消息的確認模式,數字2對應為ClIENT_ACKNOWLEDGE模式,也可以設置屬性sessionAcknowledgeModeName的名稱來實現--> 53 <property name="sessionAcknowledgeMode" value="2"/> 54 </bean> 55 </beans>
5.配置消息生產者的測試類QueueProducerTest
1 @RunWith(SpringJUnit4ClassRunner.class) 2 @ContextConfiguration(locations = "classpath:producer-queue.xml") 3 public class QueueProducerTest { 4 5 @Autowired 6 private QueueProducer queueProducer; 7 8 @Test 9 public void queueSendTest(){ 10 queueProducer.sendTestMessage("SpringJms-queue模式,吃了嘛?"); 11 } 12 }
6.配置消息消費者測試類
1 @RunWith(SpringJUnit4ClassRunner.class) 2 @ContextConfiguration(locations = "classpath:consumer-queue-acknowledge.xml") 3 public class QueueConsumerAcknowledgeTest { 4 @Test 5 public void queueAcknowledgeReceiveTest(){ 6 try { 7 // 這里是為了使消息監聽持續進行 8 System.in.read(); 9 } catch (IOException e) { 10 e.printStackTrace(); 11 } 12 } 13 }
7.執行測試
首先開啟消息消費者的測試類中的測試方法,然后開啟消息生產者的測試類中的測試方法
控制台打印如下:
也就是在消息接收發生異常的情況下,消息的確認機制讓消息生產者再次重發了6次消息,這個也是消息中間件默認的重發次數,我們可以通過我在consumer配置文件中的注釋掉的activeMQRedeliveryPolicy這個bean來設置重發的次數。
方案二:開啟事務管理
消息的生產者QueueProducer類、配置文件producer-queue.xml、生產者測試類QueueProducerTest繼續沿用上面方案一中的,其他的配置如下:
1.消息的監聽類MyMessageListenerQueueTransaction
1 public class MyMessageListenerQueueTransaction implements SessionAwareMessageListener { 2 3 @Override 4 public void onMessage(Message message, Session session) throws JMSException { 5 6 // 為了在點對點模式情況下記錄消息發送的次數 7 System.out.println(System.currentTimeMillis()+"請接收開啟了事務管理機制的消息"); 8 9 try { 10 if (1 == 1) { 11 throw new RuntimeException("出異常了"); 12 } 13 TextMessage textMessage = (TextMessage) message; 14 System.out.println(textMessage); 15 System.out.println("queue模式接收到新消息" + textMessage.getText()); 16 17 session.commit();// 接收成功,提交事務 18 } catch (JMSException e) { 19 session.rollback();// 發生異常,消息回滾重新發送 20 e.printStackTrace(); 21 } 22 } 23 }
2.配置了事務管理的消費者配置文件
— —這里主要的不同的是第54行以及58-60行,配置了事務管理相關的內容
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 5 xmlns:amq="http://activemq.apache.org/schema/core" 6 xmlns:jms="http://www.springframework.org/schema/jms" 7 xsi:schemaLocation="http://www.springframework.org/schema/beans 8 http://www.springframework.org/schema/beans/spring-beans.xsd 9 http://www.springframework.org/schema/context 10 http://www.springframework.org/schema/context/spring-context.xsd"> 11 12 <!--包掃描--> 13 <context:component-scan base-package="com.buwei"></context:component-scan> 14 15 <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供--> 16 <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 17 <property name="brokerURL" value="tcp://192.168.25.128:61616"/> 18 <!--配置消息重發的是一些設置--> 19 <!--<property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy"/>--> 20 </bean> 21 22 <!--對消息重發進行屬性設置的bean--> 23 <!--<bean id="activeMQRedeliveryPolicy" class = "org.apache.activemq.RedeliveryPolicy"> 24 <property name="maximumRedeliveries" value="2"></property> 25 </bean>--> 26 27 <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> 28 <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> 29 <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> 30 <property name="targetConnectionFactory" ref="targetConnectionFactory"/> 31 </bean> 32 33 <!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 --> 34 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 35 <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 --> 36 <property name="connectionFactory" ref="connectionFactory"/> 37 </bean> 38 39 <!--這個是消息目的地,點對點的 文本信息--> 40 <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue"> 41 <constructor-arg value="jms-queue"/> 42 </bean> 43 44 <!--我的監聽類--> 45 <bean id="myMessageListenerQueue" class="com.buwei.MyMessageListenerQueueTransaction"></bean> 46 47 <!--消息監聽容器--> 48 <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer" > 49 <property name="connectionFactory" ref="connectionFactory" /> 50 <property name="destination" ref="queueTextDestination" /> 51 <property name="messageListener" ref="myMessageListenerQueue" /> 52 53 <!--設置開啟事務管理--> 54 <property name="transactionManager" ref="transactionManager"/> 55 </bean> 56 57 <!--配置事務管理的bean--> 58 <bean id="transactionManager" class = "org.springframework.jms.connection.JmsTransactionManager"> 59 <property name="connectionFactory" ref="connectionFactory"/> 60 </bean> 61 </beans>
3.配置了事務管理的消息消費者的測試類
1 @RunWith(SpringJUnit4ClassRunner.class) 2 @ContextConfiguration(locations = "classpath:consumer-queue-transaction.xml") 3 public class QueueConsumerTransactionTest { 4 5 @Test 6 public void queueTransactionReceiveTest(){ 7 try { // 這里是為了使消息監聽持續進行 8 System.in.read(); 9 } catch (IOException e) { 10 e.printStackTrace(); 11 } 12 } 13 }
4.執行測試
同樣的先開啟消費端的測試類中的方法,再開啟生產者的測試類中的測試方法,控制台打印如下:
一樣的,在默認情況下,如果發生異常,消息會回滾6次。
總結:消息的重發可以通過設置消息確認機制或者事務管理的方式來實現,系統默認的可重發次數是6次,加上原來的1次總共是發送7次。可以通過配置RedeliveryPolicy類來修改默認值。
問題二解決方案如下:
在消費端開啟持久的消息訂閱服務,主要的也是在消費者的配置文件中進行配置
1.消息的生產者TopicProducer類
1 @Component 2 public class TopicProducer { 3 @Autowired 4 private JmsTemplate jmsTemplate; 5 6 @Autowired // 注意Destination是javax.jms.Destination; 7 private Destination topicTextDestination; 8 9 /** 10 * 發布訂閱方式發送 11 * @param message 12 */ 13 public void sendTestMessage(final String message){ 14 jmsTemplate.send(topicTextDestination, new MessageCreator() { 15 @Override 16 public Message createMessage(Session session) throws JMSException { 17 return session.createTextMessage(message); 18 } 19 }); 20 } 21 }
2.消息的監聽類MyMessageListenerTopic
1 public class MyMessageListenerTopic implements MessageListener { 2 3 @Override 4 public void onMessage(Message message) { 5 TextMessage textMessage = (TextMessage)message; 6 try { 7 System.out.println("topic模式接收到新消息"+textMessage.getText()); 8 } catch (JMSException e) { 9 e.printStackTrace(); 10 } 11 } 12 }
3.消息的生產者的配置文件producer-topic.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 5 xmlns:amq="http://activemq.apache.org/schema/core" 6 xmlns:jms="http://www.springframework.org/schema/jms" 7 xsi:schemaLocation="http://www.springframework.org/schema/beans 8 http://www.springframework.org/schema/beans/spring-beans.xsd 9 http://www.springframework.org/schema/context 10 http://www.springframework.org/schema/context/spring-context.xsd"> 11 12 <!--包掃描--> 13 <context:component-scan base-package="com.buwei"></context:component-scan> 14 15 <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供--> 16 <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 17 <property name="brokerURL" value="tcp://192.168.25.128:61616"/> 18 </bean> 19 20 <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> 21 <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> 22 <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> 23 <property name="targetConnectionFactory" ref="targetConnectionFactory"/> 24 </bean> 25 26 <!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 --> 27 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 28 <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 --> 29 <property name="connectionFactory" ref="connectionFactory"/> 30 </bean> 31 32 <!--這個是發布、訂閱模式 文本信息--> 33 <bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic"> 34 <constructor-arg value="jms-topic"/> 35 </bean> 36 37 </beans>
4.消息的消費者的配置文件consumer-topic.xml
主要在第25行以及第49行的配置,聲明消息的訂閱者的id,以及將訂閱者id加入到到消息的監聽容器中的持久化主體訂閱者中
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 5 xmlns:amq="http://activemq.apache.org/schema/core" 6 xmlns:jms="http://www.springframework.org/schema/jms" 7 xsi:schemaLocation="http://www.springframework.org/schema/beans 8 http://www.springframework.org/schema/beans/spring-beans.xsd 9 http://www.springframework.org/schema/context 10 http://www.springframework.org/schema/context/spring-context.xsd"> 11 12 <!--包掃描--> 13 <context:component-scan base-package="com.buwei"></context:component-scan> 14 15 <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供--> 16 <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 17 <property name="brokerURL" value="tcp://192.168.25.128:61616"/> 18 </bean> 19 20 <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> 21 <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> 22 <property name="targetConnectionFactory" ref="targetConnectionFactory"/> 23 <property name="sessionCacheSize" value="100"/> 24 <!--聲明消息的訂閱者的id--> 25 <property name="clientId" value="buwei"/> 26 </bean> 27 28 <!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 --> 29 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 30 <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 --> 31 <property name="connectionFactory" ref="connectionFactory"/> 32 </bean> 33 34 <!--這個消息目的地,發布訂閱的,文本信息--> 35 <bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic"> 36 <constructor-arg value="jms-topic"/> 37 </bean> 38 39 <!--我的監聽類--> 40 <bean id="myMessageListenerTopic" class="com.buwei.MyMessageListenerTopic"></bean> 41 42 <!--消息監聽容器--> 43 <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 44 <property name="connectionFactory" ref="connectionFactory" /> 45 <property name="destination" ref="topicTextDestination" /> 46 <property name="messageListener" ref="myMessageListenerTopic" /> 47 48 <!--指明持久化消息的訂閱者的名稱,對應connectionFactory中的clientId--> 49 <property name="durableSubscriptionName" value="buwei"></property> 50 </bean> 51 </beans>
5.生產者的測試類TopicProducerTest
1 @RunWith(SpringJUnit4ClassRunner.class) 2 @ContextConfiguration(locations= "classpath:producer-topic.xml ") 3 public class TopicProducerTest { 4 @Autowired 5 private TopicProducer topicProducer; 6 7 @Test 8 public void topicSendTest(){ 9 topicProducer.sendTestMessage("SpringJms-topic模式,吃好了"); 10 } 11 }
6.消費者的測試類TopicConsumerTest
1 @RunWith(SpringJUnit4ClassRunner.class) 2 @ContextConfiguration(locations = "classpath:consumer-topic.xml") 3 public class TopicConsumerTest { 4 5 @Test 6 public void topicReceiveTest(){ 7 try { 8 System.in.read(); 9 } catch (IOException e) { 10 e.printStackTrace(); 11 } 12 } 13 }
7.開啟測試
這里為了實現持久化,我們需要先運行消息的消費者測試類中的方法以實現消息的訂閱,然后停止消費者的方法,再來執行兩次生產者測試類中的方法,再次運行消費者測試類中的方法,控制台打印如下:
我們可以發現在實現了訂閱機制之后,即使消費者宕機,只要再上線仍然可以收到在宕機期間生產者發送的消息。
但是如果沒有實現訂閱的話那在Topic模式下進行的就是廣播形式,即生產者發送消息時,消費端在線即可收到消息,如果錯過了就是一輩子。。。。
補充:
死信隊列:沒有被我們正常消費的消息就會存入到死信隊列當中。
activeMQ中的消息數據其實都是存放在文件中的,可以通過修改activeMQ安裝目錄config下的activemq.xml進行配置。例如如果我們需要重新發送死信隊列當中的消息就可以重新進行讀取來進行重發。
當然這里只是簡單的介紹一下,死信隊列的用處如果有了解的話希望大家能夠補充。:)
查看了一下網上的一些資源發現解決問題的配置很多種,這里僅作為自己學習的一種記錄,大家共勉。