ActiveMQ可以和spring很好的集成,下面我們來看看,如何做個集成的demo。
(1)pom.xml引入相關jar
<!-- spring相關 begin --> <dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <version>3.1.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>4.1.5.RELEASE</version> </dependency> <!-- spring相關 end --> <!-- activeMQ相關 begin--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.11.1</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.1.4.RELEASE</version> </dependency> <!-- activeMQ相關 end-->
(2)添加生產者配置activemq-sender.xml
<description>JMS發布者應用配置</description> <!-- CachingConnectionFactory 連接工廠 (有緩存功能)--> <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- Session緩存數量 --> <property name="sessionCacheSize" value="20" /> <property name="targetConnectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <!-- MQ地址 賬戶名 密碼--> <property name="brokerURL" value="tcp://192.168.56.129:61616" /> <property name="userName" value="parry" /> <property name="password" value="parry123" /> <!-- 是否異步發送 --> <property name="useAsyncSend" value="true"/> </bean> </property> </bean> <!-- 接收消息的目的地(一個主題)點對點隊列 --> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 設置消息主題的名字 --> <constructor-arg index="0" value="messages" /> </bean> <!-- 接收配置JMS模版 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="cachingConnectionFactory" /> <property name="defaultDestination" ref="destination" /> <!-- value為true為發布/訂閱模式; value為false為點對點模式--> <property name="pubSubDomain" value="false"/> </bean>
(3)添加消費者配置activemq-consumer.xml
<description>JMS訂閱者應用配置</description> <!-- CachingConnectionFactory 連接工廠 (有緩存功能)--> <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- Session緩存數量 --> <property name="sessionCacheSize" value="20" /> <property name="targetConnectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <!-- MQ地址 賬戶名 密碼--> <property name="brokerURL" value="tcp://192.168.56.129:61616" /> <property name="userName" value="parry" /> <property name="password" value="parry123" /> <!-- 是否異步發送 --> <property name="useAsyncSend" value="true"/> </bean> </property> </bean> <!-- 接收消息的目的地(一個主題)點對點隊列 --> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 設置消息主題的名字 --> <constructor-arg index="0" value="messages" /> </bean> <!-- 消費者配置 (自己定義) --> <bean id="consumer" class="com.parry.MQ.funcion.Listener" /> <!-- 消息監聽容器 --> <bean id="myListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="cachingConnectionFactory" /> <property name="destination" ref="destination" /> <property name="messageListener" ref="consumer" /> <!-- 如果消息的接收速率,大於消息處理的速率時,可以采取線程池方式 --> <property name="taskExecutor" ref="queueMessageExecutor"/> <!-- 設置固定的線程數 --> <property name="concurrentConsumers" value="30"/> <!-- 設置動態的線程數 --> <property name="concurrency" value="20-50"/> <!-- 設置最大的線程數 --> <property name="maxConcurrentConsumers" value="80"/> </bean> <bean id="queueMessageExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="30" /> <property name="maxPoolSize" value="80" /> <property name="daemon" value="true" /> <property name="keepAliveSeconds" value="120" /> </bean>
(4)新建一個發送消息的方法
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; /** * 發送消息 * @author Administrator * */ @Component public class QueueSender { @Autowired private JmsTemplate myJmsTemplate; /** * 發送一條消息到指定的隊列(目標) * * @param queueName * 隊列名稱 * @param message * 消息內容 */ public void send(String queueName, final String message) { myJmsTemplate.send(queueName, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } }
(5)添加監聽器
package com.parry.MQ.funcion; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 接收者監聽類 * @author Administrator * */ public class Listener implements MessageListener { public void onMessage(Message message) { // 業務處理 try { TextMessage message2 = (TextMessage) message; System.out.println("接收到信息:" + message2.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
(6)寫個一請求測試一下
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import com.parry.MQ.funcion.QueueSender; @Controller public class App { @Autowired private QueueSender sender; @RequestMapping("test") @ResponseBody public String Test() { sender.send("messages", "你好,這是我的第一條消息!"); return "Hello world"; } }
(7)測試結果
在ActiveMQ的管理后台,我們也能看到我們的消息(這里我多測試了幾次):