1,首先,在 Spring MVC 中使用 ActiveMQ 要先學會配置 ActiveMQ,先看看如下配置:
1-1,首先在 source 文件夾新建一個名為 activemq.xml 的配置文件,內容如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-4.0.xsd
           http://www.springframework.org/schema/jms
           http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
           http://activemq.apache.org/schema/core
           http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">

    <!--
        Load both property files through ONE placeholder configurer.
        Declaring two separate property-placeholder elements makes the first
        configurer fail on any ${...} key it cannot resolve itself (unless
        ignore-unresolvable="true" is set), so the jms.* keys below could never
        be resolved if they live only in the second file.
        NOTE(review): the jms.* keys are presumably defined in
        activeMQ.properties; confirm against the actual property files.
    -->
    <context:property-placeholder
            location="classpath:datasource.properties,classpath:activeMQ.properties" />

    <!-- JmsTemplate for the Topic (publish/subscribe) domain -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Sends go through the pooled factory; the former constructor-arg
             pointing at the raw factory was overridden by this property and
             has been removed as redundant. -->
        <property name="connectionFactory" ref="jmsFactory" />
        <!-- true = pub/sub model, i.e. destinations are topics -->
        <property name="pubSubDomain" value="true" />
    </bean>

    <!-- Alternative (disabled): let Spring manage the real ConnectionFactory
         through a CachingConnectionFactory. -->
    <!-- <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        The target ConnectionFactory is the real one that produces JMS Connections
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        Same as above, expressed as a constructor argument
        <constructor-arg ref="amqConnectionFactory" />
        Number of sessions to cache
        <property name="sessionCacheSize" value="100" />
    </bean> -->

    <!-- ActiveMQ connection factory -->
    <!-- The factory that actually produces Connections, supplied by the JMS vendor.
         Over the network use tcp://ip:61616; locally use tcp://localhost:61616,
         together with user name and password. -->
    <!-- <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" /> -->

    <!-- Pooled factory wrapping the raw ActiveMQ factory; used by both JmsTemplates -->
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="maxConnections" value="${jms.maxConnections}" />
        <!-- FIX BUG: Reconnect fail(Session is closed) -->
        <property name="idleTimeout" value="0" />
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>

    <!-- Raw ActiveMQ factory; broker URL and credentials come from properties -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${jms.url}" />
        <property name="userName" value="${jms.userName}" />
        <property name="password" value="${jms.password}" />
    </bean>

    <!-- Spring JmsTemplate message producers: start -->
    <!-- JmsTemplate for the Queue (point-to-point) domain -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Sends go through the pooled factory (redundant constructor-arg removed) -->
        <property name="connectionFactory" ref="jmsFactory" />
        <!-- false = not pub/sub, i.e. destinations are queues -->
        <property name="pubSubDomain" value="false" />
    </bean>
    <!-- Spring JmsTemplate message producers: end -->


    <!-- Message consumers: start -->
    <!-- Queue listeners: both receivers listen on the same queue -->
    <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.queue" ref="queueReceiver1"/>
        <jms:listener destination="test.queue" ref="queueReceiver2"/>
    </jms:listener-container>
    <!-- Queue consumer beans: start -->
    <bean id="queueReceiver1" class="com.eelink.ylt.mq.consumer.queue.QueueReceiver1"/>
    <bean id="queueReceiver2" class="com.eelink.ylt.mq.consumer.queue.QueueReceiver2"/>
    <!-- Queue consumer beans: end -->


    <!-- Topic consumers: start -->
    <!-- Topic listeners: one receiver per topic -->
    <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.topic" ref="topicReceiver1"/>
        <jms:listener destination="ceshi" ref="topicReceiver2"/>
    </jms:listener-container>
    <bean id="topicReceiver1" class="com.eelink.ylt.mq.consumer.topic.TopicReceiver1"/>
    <bean id="topicReceiver2" class="com.eelink.ylt.mq.consumer.topic.TopicReceiver2"/>
    <!-- Topic consumer beans: end -->

</beans>
1-2,構建生產者類。在前面的 activemq.xml 配置中已經寫明,只需引用 jmsTopicTemplate 就可以實現 topic 模式的生產者,例如代碼如下(分別展示兩種發送方法,區別可以在網上或者 JMS 官網 API 查看,這里不做介紹)
1 package com.eelink.ylt.mq.producer.topic; 2 3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.beans.factory.annotation.Qualifier; 5 import org.springframework.jms.core.JmsTemplate; 6 import org.springframework.jms.core.MessageCreator; 7 import org.springframework.stereotype.Component; 8 9 import javax.jms.JMSException; 10 import javax.jms.Message; 11 import javax.jms.Session; 12 13 14 /** 15 * 16 * @author chenhuan 17 * @description Topic生產者發送消息到Topic 18 * 19 */ 20 21 @Component("topicSender") 22 public class TopicSender { 23 24 @Autowired 25 @Qualifier("jmsTopicTemplate") 26 private JmsTemplate jmsTemplate; 27 28 /** 29 * 發送一條消息到指定的topic(目標) 30 * @param topicName 31 * @param message 消息內容 32 */ 33 public void send(String topicName,final String message){ 34 jmsTemplate.send(topicName, new MessageCreator() { 35 @Override 36 public Message createMessage(Session session) throws JMSException { 37 return session.createTextMessage(message); 38 } 39 }); 40 for(int i=0;i<10;i++) { 41 jmsTemplate.convertAndSend(topicName, message); 42 } 43 } 44 45 }
1-3,創建生產者對應的消費者,消費者一直是處於監聽狀態, 當監聽到有生產者,消費者就接收生產者的消息進行消費,所以消費者先啟動
創建消費者類 TopicReceiver1,實現消息監聽接口 MessageListener,通過覆寫 onMessage 方法來獲取消息內容,代碼如下:
1 package com.eelink.ylt.mq.consumer.topic; 2 3 import org.springframework.stereotype.Component; 4 5 import javax.jms.JMSException; 6 import javax.jms.Message; 7 import javax.jms.MessageListener; 8 import javax.jms.TextMessage; 9 10 11 /** 12 * 13 * @author chen-liang 14 * @description Topic消息監聽器 15 * 16 */ 17 //@Component 18 public class TopicReceiver1 implements MessageListener { 19 20 private int count=0; 21 22 @Override 23 public void onMessage(Message message) { 24 try { 25 System.out.println("TopicReceiver1接收到消息:"+",第"+count+"次:"+((TextMessage)message).getText()); 26 count++; 27 } catch (JMSException e) { 28 e.printStackTrace(); 29 } 30 } 31 32 }
1-4,現在編寫測試類進行測試,我們通過controller,來創造生產者,(我這里一次寫完,包含Topic模式和queue模式一起)代碼如下:
package com.eelink.ylt.controller; import com.eelink.ylt.mq.producer.queue.QueueSender; import com.eelink.ylt.mq.producer.topic.TopicSender; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import javax.annotation.Resource; import javax.servlet.http.HttpServletRequest; /** * * @author liang * @description controller測試 */ @Controller // @RequestMapping("activemq") public class ActivemqController { @Resource QueueSender queueSender; @Resource TopicSender topicSender; /** * 發送消息到隊列 Queue隊列:僅有一個訂閱者會收到消息,消息一旦被處理就不會存在隊列中 * * @param message * @return String */ @ResponseBody @RequestMapping("queueSender") public String queueSender(@RequestParam("message") String message) { String opt = ""; for (int i = 0; i < 100; i++) { try { queueSender.send("test.queue", message+i); System.out.println(message+i); opt = "suc"; } catch (Exception e) { opt = e.getCause().toString(); } } return opt; } /** * 發送消息到主題 Topic主題 :放入一個消息,所有訂閱者都會收到 這個是主題目的地是一對多的 * * @param message * @return String */ @ResponseBody @RequestMapping("topicSender") public String topicSender(HttpServletRequest req) { String opt = ""; String message = req.getParameter("message"); //for (int i = 0; i < 100; i++) { try { topicSender.send("test.topic", message); topicSender.send("ceshi", message); opt = "suc"; } catch (Exception e) { opt = e.getCause().toString(); } //} return opt; } }
2,現在我們來編寫queue模式的生產者和消費者
2-1, queue模式的生產者, 注解中也可以使用@Resource(name="jmsQueueTemplate") ,對於不清楚注解的可以網上查區別
1 package com.eelink.ylt.mq.producer.queue; 2 3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.beans.factory.annotation.Qualifier; 5 import org.springframework.jms.core.JmsTemplate; 6 import org.springframework.jms.core.MessageCreator; 7 import org.springframework.stereotype.Component; 8 9 import javax.jms.JMSException; 10 import javax.jms.Message; 11 import javax.jms.Session; 12 13 /** 14 * 15 * @author chen-liang 16 * @description 隊列消息生產者,發送消息到隊列 17 * 18 */ 19 @Component("queueSender") 20 public class QueueSender { 21 22 @Autowired 23 @Qualifier("jmsQueueTemplate") 24 private JmsTemplate jmsTemplate;//通過@Qualifier修飾符來注入對應的bean 25 26 /** 27 * 發送一條消息到指定的隊列(目標) 28 * @param queueName 隊列名稱 29 * @param message 消息內容 30 */ 31 public void send(String queueName,final String message){ 32 jmsTemplate.send(queueName, new MessageCreator() { 33 @Override 34 public Message createMessage(Session session) throws JMSException { 35 return session.createTextMessage(message); 36 } 37 }); 38 } 39 40 }
2-2,再創建queue模式的,消費者
package com.eelink.ylt.mq.consumer.queue;

import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Queue message listener that prints the text of every message it receives.
 *
 * @author liang
 */
@Component
public class QueueReceiver1 implements MessageListener {

    /**
     * Prints the text of the received message.
     *
     * <p>Messages that are not {@link TextMessage}s are reported and skipped
     * instead of failing with a {@link ClassCastException}.
     *
     * @param message the delivered JMS message
     */
    @Override
    public void onMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            System.err.println("QueueReceiver1 ignored non-text message: " + message);
            return;
        }
        try {
            String text = ((TextMessage) message).getText();
            System.out.println("QueueReceiver1接收到消息:" + text);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
3,至此,整個 ActiveMQ 的所有過程的簡單實例展示完畢,有需要更多了解的可以留言