ActiveMq 存在兩種通信方式:點對點、發布訂閱
如果是點對點模式,此消息會默認保存在服務端,直到有消費者將其消費掉。所以此時消息不會丟失。
如果是發布訂閱的通信方式,默認情況下只通知一次,如果沒有接收到此消息就沒有了。這種場景只適合消息送達率要求不高的情況。
1、如果要求消息必須送達,不可以丟失的話,需要配置持久化訂閱。
2、每個訂閱端定義一個id,在訂閱時向activeMq注冊,發布消息和接收消息需要配置發送模式為持久化。
此時如果客戶端接收不到消息的話,消息會持久化在服務端,直到客戶端接收后為止。
消息生產者的配置:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd"> <!--第三方工廠 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://127.0.0.1:61616" /> <property name="userName" value="admin"></property> <property name="password" value="admin"></property> <property name="useAsyncSend" value="true" /> </bean> <!-- ActiveMQ為我們提供了一個PooledConnectionFactory,通過往里面注入一個ActiveMQConnectionFactory 可以用來將Connection、Session和MessageProducer池化,這樣可以大大的減少我們的資源消耗,要依賴於 activemq-pool包 --> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory" ref="targetConnectionFactory" /> <property name="maxConnections" value="100" /> </bean> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="pooledConnectionFactory" /> </bean> <!-- topic目的地配置,其實不管是topic還是queue則他們的底層實現不同但是通過封裝api就差不多了,而在spring中更是簡單 --> <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="spring-topic" /> </bean> <!-- spring 使用jmsTemplate來實現消息的發送和接受 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"></property> <property name="defaultDestination" ref="destinationTopic"></property> <!-- deliveryMode, priority, timeToLive 的開關,要生效,必須配置explicitQosEnabled為true,默認false--> <property name="explicitQosEnabled" value="true" /> <!-- 進行持久化 --> <!-- 發送模式 DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久--> <property name="deliveryMode" value="2" /> </bean> </beans>
消息消費者1:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd"> <!--第三方工廠 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://127.0.0.1:61616" /> <property name="userName" value="admin"></property> <property name="password" value="admin"></property> <property name="useAsyncSend" value="true" /> </bean> <!-- ActiveMQ為我們提供了一個PooledConnectionFactory,通過往里面注入一個ActiveMQConnectionFactory 可以用來將Connection、Session和MessageProducer池化,這樣可以大大的減少我們的資源消耗,要依賴於 activemq-pool包 --> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory" ref="targetConnectionFactory" /> <property name="maxConnections" value="100" /> </bean> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!--消費者標示id --> <property name="clientId" value="clientId_001" /> <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="pooledConnectionFactory" /> </bean> <!-- topic目的地配置,其實不管是topic還是queue則他們的底層實現不同但是通過封裝api就差不多了,而在spring中更是簡單 --> <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="spring-topic" /> </bean> <!--消息消費者監聽類 --> <bean id="myMessageListener" class="springs.activemq.Service.MyMessageListener" /> <!--監聽容器的配置 --> <bean id="myListenerContainer"(可以不必寫id) class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <!--消息目的地 --> <property name="destination" ref="destinationTopic" /> <!--消息監聽類 --> <property name="messageListener" ref="myMessageListener" /> <!-- 發布訂閱模式 --> <property name="pubSubDomain" value="true" /> <!-- 消息持久化值設置為true --> <property name="subscriptionDurable" value="true" /> <!--消息接收超時 --> <property name="receiveTimeout" value="10000" /> <!-- 接收者ID --> <property name="clientId" value="clientId_001" /> <property name="durableSubscriptionName" value="clientId_001" /> </bean> </beans>
消費者2:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd"> <!--第三方工廠 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://127.0.0.1:61616" /> <property name="userName" value="admin"></property> <property name="password" value="admin"></property> <property name="useAsyncSend" value="true" /> </bean> <!-- ActiveMQ為我們提供了一個PooledConnectionFactory,通過往里面注入一個ActiveMQConnectionFactory 可以用來將Connection、Session和MessageProducer池化,這樣可以大大的減少我們的資源消耗,要依賴於 activemq-pool包 --> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory" ref="targetConnectionFactory" /> <property name="maxConnections" value="100" /> </bean> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!--消費者標示id --> <property name="clientId" value="clientId_001" /> <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="pooledConnectionFactory" /> </bean> <!-- topic目的地配置,其實不管是topic還是queue則他們的底層實現不同但是通過封裝api就差不多了,而在spring中更是簡單 --> <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="spring-topic" /> </bean> <!--消息消費者監聽類 --> <bean id="myMessageListener" class="springs.activemq.Service.MyMessageListener" /> <!--監聽容器的配置 --> <bean id="myListenerContainer"(可以不必寫id) class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <!--消息目的地 --> <property name="destination" ref="destinationTopic" /> <!--消息監聽類 --> <property name="messageListener" ref="myMessageListener" /> <!-- 發布訂閱模式 --> <property name="pubSubDomain" value="true" /> <!-- 消息持久化值設置為true --> <property name="subscriptionDurable" value="true" /> <!--消息接收超時 --> <property name="receiveTimeout" value="10000" /> <!-- 接收者ID --> <property name="clientId" value="clientId_002" /> <property name="durableSubscriptionName" value="clientId_002" /> </bean> </beans>