ActiveMq數據提交不成功、持久化訂閱


ActiveMq 存在兩種通信方式:點對點、發布訂閱
如果是點對點模式,此消息會默認保存在服務端,直到有消費者將其消費掉。所以此時消息不會丟失。

如果是發布訂閱的通信方式,默認情況下只通知一次,如果沒有接收到此消息就沒有了。這種場景只適合消息送達率要求不高的情況。
1、如果要求消息必須送達,不可以丟失的話,需要配置持久化訂閱。
2、每個訂閱端定義一個id,在訂閱時向activeMq注冊,發布消息和接收消息需要配置發送模式為持久化。
此時如果客戶端接收不到消息的話,消息會持久化在服務端,直到客戶端接收后為止。

消息生產者的配置:

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"  
    xmlns:context="http://www.springframework.org/schema/context"  
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:task="http://www.springframework.org/schema/task"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
                  http://www.springframework.org/schema/beans/spring-beans-3.2.xsd  
                  http://www.springframework.org/schema/context  
                  http://www.springframework.org/schema/context/spring-context-3.2.xsd  
                  http://www.springframework.org/schema/aop   
                  http://www.springframework.org/schema/aop/spring-aop.xsd        
                  http://www.springframework.org/schema/task  
                 http://www.springframework.org/schema/task/spring-task-3.2.xsd  
                 http://www.springframework.org/schema/tx   
                 http://www.springframework.org/schema/tx/spring-tx-3.2.xsd">  
    <!--第三方工廠 -->  
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
        <property name="brokerURL" value="tcp://127.0.0.1:61616" />  
        <property name="userName" value="admin"></property>  
        <property name="password" value="admin"></property>  
        <property name="useAsyncSend" value="true" />  
    </bean>  
    <!-- ActiveMQ為我們提供了一個PooledConnectionFactory,通過往里面注入一個ActiveMQConnectionFactory   
        可以用來將Connection、Session和MessageProducer池化,這樣可以大大的減少我們的資源消耗,要依賴於 activemq-pool包 -->  
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">  
        <property name="connectionFactory" ref="targetConnectionFactory" />  
        <property name="maxConnections" value="100" />  
    </bean>  
  
    <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->  
    <bean id="connectionFactory"  
        class="org.springframework.jms.connection.SingleConnectionFactory">  
        <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="pooledConnectionFactory" />  
    </bean>  
  
    <!-- topic目的地配置,其實不管是topic還是queue則他們的底層實現不同但是通過封裝api就差不多了,而在spring中更是簡單 -->  
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">  
        <constructor-arg index="0" value="spring-topic" />  
    </bean>  
  
    <!-- spring 使用jmsTemplate來實現消息的發送和接受 -->  
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
        <property name="connectionFactory" ref="connectionFactory"></property>  
        <property name="defaultDestination" ref="destinationTopic"></property>  
        <!-- deliveryMode, priority, timeToLive 的開關,要生效,必須配置explicitQosEnabled為true,默認false-->  
        <property name="explicitQosEnabled" value="true" />  
        <!-- 進行持久化 -->  
        <!-- 發送模式  DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久-->      
        <property name="deliveryMode" value="2" />  
    </bean>  
</beans>  


消息消費者1:

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"  
    xmlns:context="http://www.springframework.org/schema/context"  
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:task="http://www.springframework.org/schema/task"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
                  http://www.springframework.org/schema/beans/spring-beans-3.2.xsd  
                  http://www.springframework.org/schema/context  
                  http://www.springframework.org/schema/context/spring-context-3.2.xsd  
                  http://www.springframework.org/schema/aop   
                  http://www.springframework.org/schema/aop/spring-aop.xsd        
                  http://www.springframework.org/schema/task  
                 http://www.springframework.org/schema/task/spring-task-3.2.xsd  
                 http://www.springframework.org/schema/tx   
                 http://www.springframework.org/schema/tx/spring-tx-3.2.xsd">  
    <!--第三方工廠 -->  
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
        <property name="brokerURL" value="tcp://127.0.0.1:61616" />  
        <property name="userName" value="admin"></property>  
        <property name="password" value="admin"></property>  
        <property name="useAsyncSend" value="true" />  
    </bean>  
    <!-- ActiveMQ為我們提供了一個PooledConnectionFactory,通過往里面注入一個ActiveMQConnectionFactory   
        可以用來將Connection、Session和MessageProducer池化,這樣可以大大的減少我們的資源消耗,要依賴於 activemq-pool包 -->  
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">  
        <property name="connectionFactory" ref="targetConnectionFactory" />  
        <property name="maxConnections" value="100" />  
    </bean>  
  
    <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->  
    <bean id="connectionFactory"  
        class="org.springframework.jms.connection.SingleConnectionFactory">  
        <!--消費者標示id -->  
        <property name="clientId" value="clientId_001" />  
        <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="pooledConnectionFactory" />  
    </bean>  
  
  
    <!-- topic目的地配置,其實不管是topic還是queue則他們的底層實現不同但是通過封裝api就差不多了,而在spring中更是簡單 -->  
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">  
        <constructor-arg index="0" value="spring-topic" />  
    </bean>  
  
    <!--消息消費者監聽類 -->  
    <bean id="myMessageListener" class="springs.activemq.Service.MyMessageListener" />  
    <!--監聽容器的配置 -->  
    <bean id="myListenerContainer"(可以不必寫id)
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="connectionFactory" />  
        <!--消息目的地 -->  
        <property name="destination" ref="destinationTopic" />  
        <!--消息監聽類 -->  
        <property name="messageListener" ref="myMessageListener" />  
        <!-- 發布訂閱模式 -->  
        <property name="pubSubDomain" value="true" />  
        <!-- 消息持久化值設置為true -->  
        <property name="subscriptionDurable" value="true" />  
        <!--消息接收超時 -->  
        <property name="receiveTimeout" value="10000" />  
        <!-- 接收者ID -->  
        <property name="clientId" value="clientId_001" />  
        <property name="durableSubscriptionName" value="clientId_001" />  
    </bean>  
</beans>  


消費者2:

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"  
    xmlns:context="http://www.springframework.org/schema/context"  
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:task="http://www.springframework.org/schema/task"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
                  http://www.springframework.org/schema/beans/spring-beans-3.2.xsd  
                  http://www.springframework.org/schema/context  
                  http://www.springframework.org/schema/context/spring-context-3.2.xsd  
                  http://www.springframework.org/schema/aop   
                  http://www.springframework.org/schema/aop/spring-aop.xsd        
                  http://www.springframework.org/schema/task  
                 http://www.springframework.org/schema/task/spring-task-3.2.xsd  
                 http://www.springframework.org/schema/tx   
                 http://www.springframework.org/schema/tx/spring-tx-3.2.xsd">  
    <!--第三方工廠 -->  
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
        <property name="brokerURL" value="tcp://127.0.0.1:61616" />  
        <property name="userName" value="admin"></property>  
        <property name="password" value="admin"></property>  
        <property name="useAsyncSend" value="true" />  
    </bean>  
    <!-- ActiveMQ為我們提供了一個PooledConnectionFactory,通過往里面注入一個ActiveMQConnectionFactory   
        可以用來將Connection、Session和MessageProducer池化,這樣可以大大的減少我們的資源消耗,要依賴於 activemq-pool包 -->  
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">  
        <property name="connectionFactory" ref="targetConnectionFactory" />  
        <property name="maxConnections" value="100" />  
    </bean>  
  
    <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->  
    <bean id="connectionFactory"  
        class="org.springframework.jms.connection.SingleConnectionFactory">  
        <!--消費者標示id -->  
        <property name="clientId" value="clientId_001" />  
        <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="pooledConnectionFactory" />  
    </bean>  
  
  
    <!-- topic目的地配置,其實不管是topic還是queue則他們的底層實現不同但是通過封裝api就差不多了,而在spring中更是簡單 -->  
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">  
        <constructor-arg index="0" value="spring-topic" />  
    </bean>  
  
    <!--消息消費者監聽類 -->  
    <bean id="myMessageListener" class="springs.activemq.Service.MyMessageListener" />  
    <!--監聽容器的配置 -->  
    <bean id="myListenerContainer"(可以不必寫id)
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="connectionFactory" />  
        <!--消息目的地 -->  
        <property name="destination" ref="destinationTopic" />  
        <!--消息監聽類 -->  
        <property name="messageListener" ref="myMessageListener" />  
        <!-- 發布訂閱模式 -->  
        <property name="pubSubDomain" value="true" />  
        <!-- 消息持久化值設置為true -->  
        <property name="subscriptionDurable" value="true" />  
        <!--消息接收超時 -->  
        <property name="receiveTimeout" value="10000" />  
        <!-- 接收者ID -->  
        <property name="clientId" value="clientId_002" />  
        <property name="durableSubscriptionName" value="clientId_002" />  
    </bean>  
</beans>  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM