ActiveMq数据提交不成功、持久化订阅


ActiveMq 存在两种通信方式:点对点、发布订阅
如果是点对点模式,此消息会默认保存在服务端,直到有消费者将其消费掉。所以此时消息不会丢失。

如果是发布订阅的通信方式,默认情况下只通知一次,如果没有接收到此消息就没有了。这种场景只适合消息送达率要求不高的情况。
1、如果要求消息必须送达,不可以丢失的话,需要配置持久化订阅。
2、每个订阅端定义一个id,在订阅时向activeMq注册,发布消息和接收消息需要配置发送模式为持久化。
此时如果客户端接收不到消息的话,消息会持久化在服务端,直到客户端接收后为止。

消息生产者的配置:

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"  
    xmlns:context="http://www.springframework.org/schema/context"  
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:task="http://www.springframework.org/schema/task"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
                  http://www.springframework.org/schema/beans/spring-beans-3.2.xsd  
                  http://www.springframework.org/schema/context  
                  http://www.springframework.org/schema/context/spring-context-3.2.xsd  
                  http://www.springframework.org/schema/aop   
                  http://www.springframework.org/schema/aop/spring-aop.xsd        
                  http://www.springframework.org/schema/task  
                 http://www.springframework.org/schema/task/spring-task-3.2.xsd  
                 http://www.springframework.org/schema/tx   
                 http://www.springframework.org/schema/tx/spring-tx-3.2.xsd">  
    <!--第三方工厂 -->  
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
        <property name="brokerURL" value="tcp://127.0.0.1:61616" />  
        <property name="userName" value="admin"></property>  
        <property name="password" value="admin"></property>  
        <property name="useAsyncSend" value="true" />  
    </bean>  
    <!-- ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory   
        可以用来将Connection、Session和MessageProducer池化,这样可以大大的减少我们的资源消耗,要依赖于 activemq-pool包 -->  
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">  
        <property name="connectionFactory" ref="targetConnectionFactory" />  
        <property name="maxConnections" value="100" />  
    </bean>  
  
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
    <bean id="connectionFactory"  
        class="org.springframework.jms.connection.SingleConnectionFactory">  
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="pooledConnectionFactory" />  
    </bean>  
  
    <!-- topic目的地配置,其实不管是topic还是queue则他们的底层实现不同但是通过封装api就差不多了,而在spring中更是简单 -->  
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">  
        <constructor-arg index="0" value="spring-topic" />  
    </bean>  
  
    <!-- spring 使用jmsTemplate来实现消息的发送和接受 -->  
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
        <property name="connectionFactory" ref="connectionFactory"></property>  
        <property name="defaultDestination" ref="destinationTopic"></property>  
        <!-- deliveryMode, priority, timeToLive 的开关,要生效,必须配置explicitQosEnabled为true,默认false-->  
        <property name="explicitQosEnabled" value="true" />  
        <!-- 进行持久化 -->  
        <!-- 发送模式  DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久-->      
        <property name="deliveryMode" value="2" />  
    </bean>  
</beans>  


消息消费者1:

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"  
    xmlns:context="http://www.springframework.org/schema/context"  
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:task="http://www.springframework.org/schema/task"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
                  http://www.springframework.org/schema/beans/spring-beans-3.2.xsd  
                  http://www.springframework.org/schema/context  
                  http://www.springframework.org/schema/context/spring-context-3.2.xsd  
                  http://www.springframework.org/schema/aop   
                  http://www.springframework.org/schema/aop/spring-aop.xsd        
                  http://www.springframework.org/schema/task  
                 http://www.springframework.org/schema/task/spring-task-3.2.xsd  
                 http://www.springframework.org/schema/tx   
                 http://www.springframework.org/schema/tx/spring-tx-3.2.xsd">  
    <!--第三方工厂 -->  
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
        <property name="brokerURL" value="tcp://127.0.0.1:61616" />  
        <property name="userName" value="admin"></property>  
        <property name="password" value="admin"></property>  
        <property name="useAsyncSend" value="true" />  
    </bean>  
    <!-- ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory   
        可以用来将Connection、Session和MessageProducer池化,这样可以大大的减少我们的资源消耗,要依赖于 activemq-pool包 -->  
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">  
        <property name="connectionFactory" ref="targetConnectionFactory" />  
        <property name="maxConnections" value="100" />  
    </bean>  
  
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
    <bean id="connectionFactory"  
        class="org.springframework.jms.connection.SingleConnectionFactory">  
        <!--消费者标示id -->  
        <property name="clientId" value="clientId_001" />  
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="pooledConnectionFactory" />  
    </bean>  
  
  
    <!-- topic目的地配置,其实不管是topic还是queue则他们的底层实现不同但是通过封装api就差不多了,而在spring中更是简单 -->  
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">  
        <constructor-arg index="0" value="spring-topic" />  
    </bean>  
  
    <!--消息消费者监听类 -->  
    <bean id="myMessageListener" class="springs.activemq.Service.MyMessageListener" />  
    <!--监听容器的配置 -->  
    <bean id="myListenerContainer"(可以不必写id)
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="connectionFactory" />  
        <!--消息目的地 -->  
        <property name="destination" ref="destinationTopic" />  
        <!--消息监听类 -->  
        <property name="messageListener" ref="myMessageListener" />  
        <!-- 发布订阅模式 -->  
        <property name="pubSubDomain" value="true" />  
        <!-- 消息持久化值设置为true -->  
        <property name="subscriptionDurable" value="true" />  
        <!--消息接收超时 -->  
        <property name="receiveTimeout" value="10000" />  
        <!-- 接收者ID -->  
        <property name="clientId" value="clientId_001" />  
        <property name="durableSubscriptionName" value="clientId_001" />  
    </bean>  
</beans>  


消费者2:

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"  
    xmlns:context="http://www.springframework.org/schema/context"  
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:task="http://www.springframework.org/schema/task"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
                  http://www.springframework.org/schema/beans/spring-beans-3.2.xsd  
                  http://www.springframework.org/schema/context  
                  http://www.springframework.org/schema/context/spring-context-3.2.xsd  
                  http://www.springframework.org/schema/aop   
                  http://www.springframework.org/schema/aop/spring-aop.xsd        
                  http://www.springframework.org/schema/task  
                 http://www.springframework.org/schema/task/spring-task-3.2.xsd  
                 http://www.springframework.org/schema/tx   
                 http://www.springframework.org/schema/tx/spring-tx-3.2.xsd">  
    <!--第三方工厂 -->  
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
        <property name="brokerURL" value="tcp://127.0.0.1:61616" />  
        <property name="userName" value="admin"></property>  
        <property name="password" value="admin"></property>  
        <property name="useAsyncSend" value="true" />  
    </bean>  
    <!-- ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory   
        可以用来将Connection、Session和MessageProducer池化,这样可以大大的减少我们的资源消耗,要依赖于 activemq-pool包 -->  
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">  
        <property name="connectionFactory" ref="targetConnectionFactory" />  
        <property name="maxConnections" value="100" />  
    </bean>  
  
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
    <bean id="connectionFactory"  
        class="org.springframework.jms.connection.SingleConnectionFactory">  
        <!--消费者标示id -->  
        <property name="clientId" value="clientId_001" />  
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="pooledConnectionFactory" />  
    </bean>  
  
  
    <!-- topic目的地配置,其实不管是topic还是queue则他们的底层实现不同但是通过封装api就差不多了,而在spring中更是简单 -->  
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">  
        <constructor-arg index="0" value="spring-topic" />  
    </bean>  
  
    <!--消息消费者监听类 -->  
    <bean id="myMessageListener" class="springs.activemq.Service.MyMessageListener" />  
    <!--监听容器的配置 -->  
    <bean id="myListenerContainer"(可以不必写id)
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="connectionFactory" />  
        <!--消息目的地 -->  
        <property name="destination" ref="destinationTopic" />  
        <!--消息监听类 -->  
        <property name="messageListener" ref="myMessageListener" />  
        <!-- 发布订阅模式 -->  
        <property name="pubSubDomain" value="true" />  
        <!-- 消息持久化值设置为true -->  
        <property name="subscriptionDurable" value="true" />  
        <!--消息接收超时 -->  
        <property name="receiveTimeout" value="10000" />  
        <!-- 接收者ID -->  
        <property name="clientId" value="clientId_002" />  
        <property name="durableSubscriptionName" value="clientId_002" />  
    </bean>  
</beans>  

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM