1,首先在springmvc中使用activemq要先会配置 activamq, 先看看如下配置:
1-1, 首先在source文件夹新建一个,activemq.xml的配置文件,内容如下:
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" 5 xmlns:jms="http://www.springframework.org/schema/jms" 6 xsi:schemaLocation="http://www.springframework.org/schema/beans 7 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 8 http://www.springframework.org/schema/context 9 http://www.springframework.org/schema/context/spring-context-4.0.xsd 10 http://www.springframework.org/schema/jms 11 http://www.springframework.org/schema/jms/spring-jms-4.0.xsd 12 http://activemq.apache.org/schema/core 13 http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"> 14 15 <context:property-placeholder location="classpath:datasource.properties" /> 16 <context:property-placeholder location="classpath:activeMQ.properties" /> 17 <!-- Spring Caching连接工厂 --> 18 <!-- 定义JmsTemplate的Topic类型 --> 19 <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> 20 <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> 21 <constructor-arg ref="connectionFactory" /> 22 <property name="connectionFactory" ref="jmsFactory" /> 23 <!-- 非pub/sub模型(发布/订阅),即队列模式 --> 24 <property name="pubSubDomain" value="true" /> 25 </bean> 26 27 <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> 28 <!-- <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> 29 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory 30 <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> 31 同上,同理 32 <constructor-arg ref="amqConnectionFactory" /> 33 Session缓存数量 34 <property name="sessionCacheSize" value="100" /> 35 </bean> --> 36 37 <!-- ActiveMQ 连接工厂 --> 38 <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 39 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码 --> 40 <!-- <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" /> --> 41 42 <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" 43 destroy-method="stop"> 44 <property name="maxConnections" value="${jms.maxConnections}" /> 45 <!-- FIX BUG: Reconnect fail(Session is closed) --> 46 <property name="idleTimeout" value="0" /> 47 <property name="connectionFactory" ref="connectionFactory" /> 48 </bean> 49 <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 50 <property name="brokerURL" value="${jms.url}" /> 51 <property name="userName" value="${jms.userName}" /> 52 <property name="password" value="${jms.password}" /> 53 </bean> 54 55 <!-- Spring JmsTemplate 的消息生产者 start --> 56 <!-- 定义JmsTemplate的Queue类型 --> 57 <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> 58 <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> 59 <constructor-arg ref="connectionFactory" /> 60 <property name="connectionFactory" ref="jmsFactory" /> 61 <!-- 非pub/sub模型(发布/订阅),即队列模式 --> 62 <property name="pubSubDomain" value="false" /> 63 </bean> 64 <!-- Spring JmsTemplate 的消息生产者 end --> 65 66 67 <!-- 各个消息消费者 start --> 68 <!-- 定义Queue监听器 --> 69 <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> 70 <jms:listener destination="test.queue" ref="queueReceiver1"/> 71 <jms:listener destination="test.queue" ref="queueReceiver2"/> 72 </jms:listener-container> 73 <!-- Queue消费者的bean start --> 74 <bean id="queueReceiver1" class="com.eelink.ylt.mq.consumer.queue.QueueReceiver1"/> 75 <bean id="queueReceiver2" class="com.eelink.ylt.mq.consumer.queue.QueueReceiver2"/> 76 <!-- Queue消息消费者 end --> 77 78 79 <!-- Topic各个消费者的 start --> 80 <!-- 定义Topic监听器 --> 81 <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> 82 <jms:listener destination="test.topic" ref="topicReceiver1"/> 83 <jms:listener destination="ceshi" ref="topicReceiver2"/> 84 </jms:listener-container> 85 <bean id="topicReceiver1" class="com.eelink.ylt.mq.consumer.topic.TopicReceiver1"/> 86 <bean id="topicReceiver2" class="com.eelink.ylt.mq.consumer.topic.TopicReceiver2"/> 87 <!-- Topic各个消费者的bean end --> 88 89 </beans>
1-2,构建生产者类, 在前面activamq.xml配置中已经写明,只需引用jmsTopicTemplate 就可以发布topic模式的生产者,例如代码如下(分别展示两种发送方法,区别可以网上或者jms官网api查看,这里不做介绍)
1 package com.eelink.ylt.mq.producer.topic; 2 3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.beans.factory.annotation.Qualifier; 5 import org.springframework.jms.core.JmsTemplate; 6 import org.springframework.jms.core.MessageCreator; 7 import org.springframework.stereotype.Component; 8 9 import javax.jms.JMSException; 10 import javax.jms.Message; 11 import javax.jms.Session; 12 13 14 /** 15 * 16 * @author chenhuan 17 * @description Topic生产者发送消息到Topic 18 * 19 */ 20 21 @Component("topicSender") 22 public class TopicSender { 23 24 @Autowired 25 @Qualifier("jmsTopicTemplate") 26 private JmsTemplate jmsTemplate; 27 28 /** 29 * 发送一条消息到指定的topic(目标) 30 * @param topicName 31 * @param message 消息内容 32 */ 33 public void send(String topicName,final String message){ 34 jmsTemplate.send(topicName, new MessageCreator() { 35 @Override 36 public Message createMessage(Session session) throws JMSException { 37 return session.createTextMessage(message); 38 } 39 }); 40 for(int i=0;i<10;i++) { 41 jmsTemplate.convertAndSend(topicName, message); 42 } 43 } 44 45 }
1-3,创建生产者对应的消费者,消费者一直是处于监听状态, 当监听到有生产者,消费者就接收生产者的消息进行消费,所以消费者先启动
创建消费者类TopicReceiver1 实现消息监听接口MessageListener 通过复写onMessage方法来获取消息内容, 代码如下:
1 package com.eelink.ylt.mq.consumer.topic; 2 3 import org.springframework.stereotype.Component; 4 5 import javax.jms.JMSException; 6 import javax.jms.Message; 7 import javax.jms.MessageListener; 8 import javax.jms.TextMessage; 9 10 11 /** 12 * 13 * @author chen-liang 14 * @description Topic消息监听器 15 * 16 */ 17 //@Component 18 public class TopicReceiver1 implements MessageListener { 19 20 private int count=0; 21 22 @Override 23 public void onMessage(Message message) { 24 try { 25 System.out.println("TopicReceiver1接收到消息:"+",第"+count+"次:"+((TextMessage)message).getText()); 26 count++; 27 } catch (JMSException e) { 28 e.printStackTrace(); 29 } 30 } 31 32 }
1-4,现在编写测试类进行测试,我们通过controller,来创造生产者,(我这里一次写完,包含Topic模式和queue模式一起)代码如下:
package com.eelink.ylt.controller; import com.eelink.ylt.mq.producer.queue.QueueSender; import com.eelink.ylt.mq.producer.topic.TopicSender; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import javax.annotation.Resource; import javax.servlet.http.HttpServletRequest; /** * * @author liang * @description controller测试 */ @Controller // @RequestMapping("activemq") public class ActivemqController { @Resource QueueSender queueSender; @Resource TopicSender topicSender; /** * 发送消息到队列 Queue队列:仅有一个订阅者会收到消息,消息一旦被处理就不会存在队列中 * * @param message * @return String */ @ResponseBody @RequestMapping("queueSender") public String queueSender(@RequestParam("message") String message) { String opt = ""; for (int i = 0; i < 100; i++) { try { queueSender.send("test.queue", message+i); System.out.println(message+i); opt = "suc"; } catch (Exception e) { opt = e.getCause().toString(); } } return opt; } /** * 发送消息到主题 Topic主题 :放入一个消息,所有订阅者都会收到 这个是主题目的地是一对多的 * * @param message * @return String */ @ResponseBody @RequestMapping("topicSender") public String topicSender(HttpServletRequest req) { String opt = ""; String message = req.getParameter("message"); //for (int i = 0; i < 100; i++) { try { topicSender.send("test.topic", message); topicSender.send("ceshi", message); opt = "suc"; } catch (Exception e) { opt = e.getCause().toString(); } //} return opt; } }
2,现在我们来编写queue模式的生产者和消费者
2-1, queue模式的生产者, 注解中也可以使用@Resource(name="jmsQueueTemplate") ,对于不清楚注解的可以网上查区别
1 package com.eelink.ylt.mq.producer.queue; 2 3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.beans.factory.annotation.Qualifier; 5 import org.springframework.jms.core.JmsTemplate; 6 import org.springframework.jms.core.MessageCreator; 7 import org.springframework.stereotype.Component; 8 9 import javax.jms.JMSException; 10 import javax.jms.Message; 11 import javax.jms.Session; 12 13 /** 14 * 15 * @author chen-liang 16 * @description 队列消息生产者,发送消息到队列 17 * 18 */ 19 @Component("queueSender") 20 public class QueueSender { 21 22 @Autowired 23 @Qualifier("jmsQueueTemplate") 24 private JmsTemplate jmsTemplate;//通过@Qualifier修饰符来注入对应的bean 25 26 /** 27 * 发送一条消息到指定的队列(目标) 28 * @param queueName 队列名称 29 * @param message 消息内容 30 */ 31 public void send(String queueName,final String message){ 32 jmsTemplate.send(queueName, new MessageCreator() { 33 @Override 34 public Message createMessage(Session session) throws JMSException { 35 return session.createTextMessage(message); 36 } 37 }); 38 } 39 40 }
2-2,再创建queue模式的,消费者
package com.eelink.ylt.mq.consumer.queue; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * * @author liang * @description 队列消息监听器 * */ @Component public class QueueReceiver1 implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("QueueReceiver1接收到消息:"+((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }
3,至此整个activemq的,所有过程简单实力展示完毕,有需要更多了解的可以留言