這次真的忽略了一些ActiveMQ內心的嬌艷


好久沒總結了,內心有點空虛了,所以今天主要給園里的朋友們分享一點兒這幾天使用ActiveMQ過程中踩過的小坑,雖然說這東西簡單易用,代碼幾行配置也就幾行,問題不大但是后果有點嚴重,所以就要必要總結一下了。

首先ActiveMQ有倆種消息隊列模式:點對點和發布訂閱,這倆種都有不可替代的應用場景,前者適用於消息唯一傳遞的業務,后者適用於分布式環境下進行多面數據同步的操作。

其次一些關於它的官方簡介和安裝步驟我就不占博客園數據庫的內存了,寫了也沒啥鳥用,用爛的朋友想要提取點兒精華,沒接觸過的朋友請先安裝一個玩玩點對點和發布訂閱模式吧(http://www.cnblogs.com/1315925303zxz/p/6377551.html),理解一下這倆種機制的區別和出現消息臨界值時的特性,我下面也放一些我前期用於測試的Demo,其中總結了一些他們二者的主要區別,都是實戰中必須要考慮的因素可以參考:

假設:存在一個消息生產者、多個消費者,分別在點對點和發布訂閱模式下進行消息獲取,當出現消息臨界值的時候都有什么現象?這些需要朋友你自己體會,我能做的只有送上代碼供各位測試了。

消息生產者

 1 public class ProducerDemo2 {
 2     
 3     private static Random r = new Random();
 4 
 5     public static void main(String[] args) {
 6         ConnectionFactory connectionFactory; // 連接工廠
 7         Connection connection = null; // 連接
 8         Session session; // 會話 接受或者發送消息的線程
 9         Destination destination; // 消息的目的地
10         MessageProducer messageProducer;//消息生產者
11         // 實例化連接工廠
12         connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://10.0.40.73:61616");    //用戶名、密碼、連接地址
13         try {
14             // 通過連接工廠獲取連接
15             connection = connectionFactory.createConnection();
16             // 啟動連接
17             connection.start();
18             // 創建session
19             session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);        //true:支持事務    false:不支持事務
20             // 創建/選擇一個消息隊列
21             destination = session.createQueue("BMW");    //點對點
22             //destination = session.createTopic("ToyotaYQ");    //發布訂閱
23             //創建消息生產者
24             messageProducer = session.createProducer(destination);
25             //創建一條文本消息 
26             int num = r.nextInt(100);
27             TextMessage message = session.createTextMessage("ActiveMQ 生產者2發送消息"+num);
28             System.out.println("生產者2發送消息:"+num);
29             //通過消息生產者發出消息 
30             messageProducer.send(message);
31             //提交
32             session.commit();
33         } catch (JMSException e) {
34             e.printStackTrace();
35         }
36     }
37 }

消息消費者(可以copy多個進行消息爭搶測試)

 1 public class ConsumerDemo2 {
 2 
 3     // private static final String USERNAME =
 4     // ActiveMQConnection.DEFAULT_USER;//默認連接用戶名(amdin)
 5     // private static final String PASSWORD =
 6     // ActiveMQConnection.DEFAULT_PASSWORD;//默認連接密碼(admin)
 7     // private static final String BROKEURL =
 8     // ActiveMQConnection.DEFAULT_BROKER_URL;//默認連接地址(tcp://localhost:61616)
 9 
10     public static void main(String[] args) {
11         ConnectionFactory connectionFactory; // 連接工廠
12         Connection connection = null; // 連接
13         Session session; // 會話 接受或者發送消息的線程
14         Destination destination; // 消息的目的地
15         MessageConsumer messageConsumer; // 消息的消費者
16         // 實例化連接工廠
17         connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://10.0.40.73:61616");    //用戶名、密碼、連接地址
18         try {
19             // 通過連接工廠獲取連接
20             connection = connectionFactory.createConnection();
21             // 啟動連接
22             connection.start();
23             // 創建session
24             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
25             // 創建/選擇一個消息隊列
26             destination = session.createQueue("BMW");            //點對點模式
27             //destination = session.createTopic("ToyotaYQ");    //發布訂閱模式
28             // 創建消息消費者
29             messageConsumer = session.createConsumer(destination);
30 
31             while (true) {
32                  //設置消費者接收消息的時間(3s)
33                 TextMessage textMessage = (TextMessage)messageConsumer.receive(3000);
34                 if(textMessage == null){
35                     System.out.println("沒有消息");
36                 }else{
37                     System.out.println(textMessage.getText());
38                     //System.exit(0);
39                 }
40             }
41         } catch (JMSException e) {
42             e.printStackTrace();
43         }
44     }
45 }

注意事項:

1、生產者發送消息時必須支持事務,就是創建消息會話的時候設置為true,否則會出現“javax.jms.IllegalStateException: Not a transacted session”異常。消費者可以不支持。

2、點對點模式下,同一時刻只能有一個消費者從隊列中獲取消息內容,如果存在多個消費者,則會出現消息爭搶現象直到消息全部搶完,處於阻塞狀態,如果再有消息被放進來時,接着會進行爭搶,但是只會有一個消費者獲取到消息,不會出現多個消費者搶到消息的情況。

3、點對點模式下,生產者發送消息時,消費者可以處於離線狀態,當消費者再次運行時可以接收到歷史消息;但是在發布訂閱模式下,消費者必須處於運行狀態獲取消息,歷史消息也是不會被獲取到的。

 

實戰上線后踩過的坑以及解決方案:

1、用戶訂單入庫成功后發送到MQ中的訂單消息丟失,出現處理訂單遺漏的情況?

解決方案1:打開消息持久開關。

因為Activemq支持兩種消息傳送模式:

PERSISTENT (持久消息)該模式是activemq默認的傳送方式,此模式下可以保證消息只會被成功傳送一次和成功使用一次,消息具有可靠性。在消息傳遞到目標消費者,在消費者沒有成功應答前,消息不會丟失。所以很自然的,需要一個地方來持久性存儲。如果消息消費者在進行消費過程發生失敗,則消息會被再次投遞;

 NON_PERSISTENT(非持久消息)該模式適用於消息不重要的,可以接受消息丟失的哪一類消息,這種消息只會被投遞一次,消息不會在持久性存儲中存儲,也不會保證消息丟失后的重新投遞。

與spring整合使用ActiveMQ配置文件如下:

  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <!-- deliveryMode, priority, timeToLive 的開關,要生效,必須配置為true,默認false-->
        <property name="explicitQosEnabled" value="true" />  
        <!-- 【發送模式  DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久】--> 
        <property name="deliveryMode" value="2" /> 
    </bean>

解決方案2:設置消息重發機制。

ActiveMQ針對消息丟失情況提供了消息重發機制,假設消息發送失敗,為了解決這一尷尬局面,我們可以在實際項目中配置消息重發機制,以防萬一。

  <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.136.139:61616"/>
        <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />  <!-- 引用重發機制 --> 
    </bean>
    
    <!-- ActiveMQ消息發送失敗后的重發機制 -->
    <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">  
        <!--是否在每次嘗試重新發送失敗后,增長這個等待時間 -->  
        <property name="useExponentialBackOff" value="true"></property>  
        <!--重發次數,默認為6次   這里設置為1次 -->  
        <property name="maximumRedeliveries" value="1"></property>  
        <!--重發時間間隔,默認為1秒 -->  
        <property name="initialRedeliveryDelay" value="1000"></property>  
        <!--第一次失敗后重新發送之前等待500毫秒,第二次失敗再等待500 * 2毫秒,這里的2就是value -->  
        <property name="backOffMultiplier" value="2"></property>  
        <!--最大傳送延遲,只在useExponentialBackOff為true時有效(V5.5),假設首次重連間隔為10ms,倍數為2,那么第   
                                   二次重連時間間隔為 20ms,第三次重連時間間隔為40ms,當重連時間間隔大的最大重連時間間隔時,以后每次重連時間間隔都為最大重連時間間隔。 -->  
        <property name="maximumRedeliveryDelay" value="1000"></property>  
    </bean>

 

前天任務調度服務上線后,出現消息丟失的情況,根據以上解決方案是否能夠根治這種情況,本人也是不太能夠保證,希望稍后能在留言區得到各位老兄的幫助。

 

我的博客即將搬運同步至騰訊雲+社區,邀請大家一同入駐:https://cloud.tencent.com/developer/support-plan

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM