activemq的配置和使用范例


1,首先在springmvc中使用activemq要先會配置 activamq,  先看看如下配置:

1-1, 首先在source文件夾新建一個,activemq.xml的配置文件,內容如下:

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3     xmlns:context="http://www.springframework.org/schema/context"
 4     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
 5     xmlns:jms="http://www.springframework.org/schema/jms"
 6     xsi:schemaLocation="http://www.springframework.org/schema/beans   
 7         http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
 8         http://www.springframework.org/schema/context   
 9         http://www.springframework.org/schema/context/spring-context-4.0.xsd
10         http://www.springframework.org/schema/jms
11         http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
12         http://activemq.apache.org/schema/core
13         http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
14 
15     <context:property-placeholder location="classpath:datasource.properties" />
16     <context:property-placeholder location="classpath:activeMQ.properties" />
17     <!-- Spring Caching連接工廠 -->
18     <!-- 定義JmsTemplate的Topic類型 -->
19     <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
20          <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象  --> 
21         <constructor-arg ref="connectionFactory" />
22         <property name="connectionFactory" ref="jmsFactory" />
23         <!-- 非pub/sub模型(發布/訂閱),即隊列模式 -->
24         <property name="pubSubDomain" value="true" />
25     </bean>
26     
27      <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory   -->
28     <!-- <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
29         目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory  
30           <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
31           同上,同理
32         <constructor-arg ref="amqConnectionFactory" />
33         Session緩存數量
34         <property name="sessionCacheSize" value="100" />
35     </bean> -->
36         
37     <!-- ActiveMQ 連接工廠 -->
38      <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供
39     如果連接網絡:tcp://ip:61616;未連接網絡:tcp://localhost:61616 以及用戶名,密碼 -->
40     <!-- <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin"  /> -->
41     
42     <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
43         destroy-method="stop">
44         <property name="maxConnections" value="${jms.maxConnections}" />
45             <!-- FIX BUG: Reconnect fail(Session is closed) -->
46         <property name="idleTimeout" value="0" />
47         <property name="connectionFactory" ref="connectionFactory" />
48     </bean>
49     <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
50         <property name="brokerURL" value="${jms.url}" />
51         <property name="userName" value="${jms.userName}" />
52         <property name="password" value="${jms.password}" />
53     </bean>
54         
55     <!-- Spring JmsTemplate 的消息生產者 start     -->
56     <!-- 定義JmsTemplate的Queue類型 -->
57     <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
58         <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象  --> 
59         <constructor-arg ref="connectionFactory" />
60         <property name="connectionFactory" ref="jmsFactory" />
61         <!-- 非pub/sub模型(發布/訂閱),即隊列模式 -->
62         <property name="pubSubDomain" value="false" />
63     </bean>                        
64     <!-- Spring JmsTemplate 的消息生產者 end -->
65     
66         
67     <!-- 各個消息消費者 start     -->
68     <!-- 定義Queue監聽器 -->
69     <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
70         <jms:listener destination="test.queue" ref="queueReceiver1"/>
71         <jms:listener destination="test.queue" ref="queueReceiver2"/>
72     </jms:listener-container>
73     <!-- Queue消費者的bean start -->        
74     <bean id="queueReceiver1" class="com.eelink.ylt.mq.consumer.queue.QueueReceiver1"/>
75     <bean id="queueReceiver2" class="com.eelink.ylt.mq.consumer.queue.QueueReceiver2"/>
76     <!-- Queue消息消費者 end -->
77 
78     
79     <!-- Topic各個消費者的 start -->
80     <!-- 定義Topic監聽器 -->
81     <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
82         <jms:listener destination="test.topic" ref="topicReceiver1"/>
83         <jms:listener destination="ceshi" ref="topicReceiver2"/>
84     </jms:listener-container>                
85     <bean id="topicReceiver1" class="com.eelink.ylt.mq.consumer.topic.TopicReceiver1"/>
86     <bean id="topicReceiver2" class="com.eelink.ylt.mq.consumer.topic.TopicReceiver2"/>
87     <!-- Topic各個消費者的bean end -->
88 
89 </beans>

1-2,構建生產者類, 在前面activamq.xml配置中已經寫明,只需引用jmsTopicTemplate 就可以發布topic模式的生產者,例如代碼如下(分別展示兩種發送方法,區別可以網上或者jms官網api查看,這里不做介紹)

 1 package com.eelink.ylt.mq.producer.topic;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.beans.factory.annotation.Qualifier;
 5 import org.springframework.jms.core.JmsTemplate;
 6 import org.springframework.jms.core.MessageCreator;
 7 import org.springframework.stereotype.Component;
 8 
 9 import javax.jms.JMSException;
10 import javax.jms.Message;
11 import javax.jms.Session;
12 
13 
14 /**
15  * 
16  * @author chenhuan
17  * @description   Topic生產者發送消息到Topic
18  * 
19  */
20 
21 @Component("topicSender")
22 public class TopicSender {
23     
24     @Autowired
25     @Qualifier("jmsTopicTemplate")
26     private JmsTemplate jmsTemplate;
27     
28     /**
29      * 發送一條消息到指定的topic(目標)
30      * @param topicName
31      * @param message 消息內容
32      */
33     public void send(String topicName,final String message){
34         jmsTemplate.send(topicName, new MessageCreator() {
35             @Override
36             public Message createMessage(Session session) throws JMSException {
37                 return session.createTextMessage(message);
38             }
39         });
40         for(int i=0;i<10;i++) {
41             jmsTemplate.convertAndSend(topicName, message);
42         }
43     }
44 
45 }

1-3,創建生產者對應的消費者,消費者一直是處於監聽狀態, 當監聽到有生產者,消費者就接收生產者的消息進行消費,所以消費者先啟動
   創建消費者類TopicReceiver1 實現消息監聽接口MessageListener  通過復寫onMessage方法來獲取消息內容, 代碼如下:

 1 package com.eelink.ylt.mq.consumer.topic;
 2 
 3 import org.springframework.stereotype.Component;
 4 
 5 import javax.jms.JMSException;
 6 import javax.jms.Message;
 7 import javax.jms.MessageListener;
 8 import javax.jms.TextMessage;
 9 
10 
11 /**
12  * 
13  * @author chen-liang
14  * @description  Topic消息監聽器
15  * 
16  */
17 //@Component
18 public class TopicReceiver1 implements MessageListener {
19     
20     private int count=0;
21 
22     @Override
23     public void onMessage(Message message) {
24         try {
25             System.out.println("TopicReceiver1接收到消息:"+",第"+count+"次:"+((TextMessage)message).getText());
26             count++;
27         } catch (JMSException e) {
28             e.printStackTrace();
29         }
30     }
31     
32 }

1-4,現在編寫測試類進行測試,我們通過controller,來創造生產者,(我這里一次寫完,包含Topic模式和queue模式一起)代碼如下:

package com.eelink.ylt.controller;

import com.eelink.ylt.mq.producer.queue.QueueSender;
import com.eelink.ylt.mq.producer.topic.TopicSender;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;

/**
 * 
 * @author liang
 * @description controller測試
 */
@Controller
// @RequestMapping("activemq")
public class ActivemqController {

    @Resource
    QueueSender queueSender;
    @Resource
    TopicSender topicSender;

    /**
     * 發送消息到隊列 Queue隊列:僅有一個訂閱者會收到消息,消息一旦被處理就不會存在隊列中
     * 
     * @param message
     * @return String
     */
    @ResponseBody
    @RequestMapping("queueSender")
    public String queueSender(@RequestParam("message") String message) {
        String opt = "";
        for (int i = 0; i < 100; i++) {
            try {                
                queueSender.send("test.queue", message+i);
                System.out.println(message+i);
                opt = "suc";
            } catch (Exception e) {
                opt = e.getCause().toString();
            }
        }

        return opt;
    }

    /**
     * 發送消息到主題 Topic主題 :放入一個消息,所有訂閱者都會收到 這個是主題目的地是一對多的
     * 
     * @param message
     * @return String
     */
    @ResponseBody
    @RequestMapping("topicSender")
    public String topicSender(HttpServletRequest req) {
        String opt = "";
        String message = req.getParameter("message");
        //for (int i = 0; i < 100; i++) {
        try {
            topicSender.send("test.topic", message);
            topicSender.send("ceshi", message);            
            opt = "suc";
        } catch (Exception e) {
            opt = e.getCause().toString();
        }
        //}
        return opt;
    }

}

2,現在我們來編寫queue模式的生產者和消費者
2-1, queue模式的生產者, 注解中也可以使用@Resource(name="jmsQueueTemplate") ,對於不清楚注解的可以網上查區別

 1 package com.eelink.ylt.mq.producer.queue;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.beans.factory.annotation.Qualifier;
 5 import org.springframework.jms.core.JmsTemplate;
 6 import org.springframework.jms.core.MessageCreator;
 7 import org.springframework.stereotype.Component;
 8 
 9 import javax.jms.JMSException;
10 import javax.jms.Message;
11 import javax.jms.Session;
12 
13 /**
14  * 
15  * @author chen-liang
16  * @description  隊列消息生產者,發送消息到隊列
17  * 
18  */
19 @Component("queueSender")
20 public class QueueSender {
21     
22     @Autowired
23     @Qualifier("jmsQueueTemplate")
24     private JmsTemplate jmsTemplate;//通過@Qualifier修飾符來注入對應的bean
25     
26     /**
27      * 發送一條消息到指定的隊列(目標)
28      * @param queueName 隊列名稱
29      * @param message 消息內容
30      */
31     public void send(String queueName,final String message){
32             jmsTemplate.send(queueName, new MessageCreator() {
33             @Override
34             public Message createMessage(Session session) throws JMSException {
35                 return session.createTextMessage(message);
36             }
37         });
38     }
39     
40 }

2-2,再創建queue模式的,消費者

package com.eelink.ylt.mq.consumer.queue;

import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 
 * @author liang
 * @description  隊列消息監聽器
 * 
 */
@Component
public class QueueReceiver1 implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("QueueReceiver1接收到消息:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

3,至此整個activemq的,所有過程簡單實力展示完畢,有需要更多了解的可以留言

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM