ActiveMQ詳細入門使用教程


ActiveMQ介紹

     MQ是消息中間件,是一種在分布式系統中應用程序借以傳遞消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的開源項目,完全支持JMS1.1和J2EE1.4規范的JMS Provider實現。 
特點: 
1、支持多種語言編寫客戶端 
2、對spring的支持,很容易和spring整合 
3、支持多種傳輸協議:TCP,SSL,NIO,UDP等 
4、支持AJAX 
消息形式: 
1、點對點(queue) 
2、一對多(topic) 

ActiveMQ安裝

這里寫圖片描述
我這里提供一個安裝好的虛擬機:http://download.csdn.net/download/liuyuanq123/10217892 
服務器運行后,我們可以直接訪問到activeMQ的界面: 
這里寫圖片描述
然后點擊queues可以看到現在沒有一條消息: 
這里寫圖片描述

ActiveMQ測試


      編寫一個測試類對ActiveMQ進行測試,首先得向pom文件中添加ActiveMQ相關的jar包:

 

     <dependency>  
         <groupId>org.apache.activemq</groupId>  
         <artifactId>activemq-all</artifactId>  
    </dependency>  

 

queue的發送代碼如下:

    public void testMQProducerQueue() throws Exception{
        //1、創建工廠連接對象,需要制定ip和端口號
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
        //2、使用連接工廠創建一個連接對象
        Connection connection = connectionFactory.createConnection();
        //3、開啟連接
        connection.start();
        //4、使用連接對象創建會話(session)對象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、使用會話對象創建目標對象,包含queue和topic(一對一和一對多)
        Queue queue = session.createQueue("test-queue");
        //6、使用會話對象創建生產者對象
        MessageProducer producer = session.createProducer(queue);
        //7、使用會話對象創建一個消息對象
        TextMessage textMessage = session.createTextMessage("hello!test-queue");
        //8、發送消息
        producer.send(textMessage);
        //9、關閉資源
        producer.close();
        session.close();
        connection.close();
    }

接收代碼:

  public void TestMQConsumerQueue() throws Exception{
        //1、創建工廠連接對象,需要制定ip和端口號
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
        //2、使用連接工廠創建一個連接對象
        Connection connection = connectionFactory.createConnection();
        //3、開啟連接
        connection.start();
        //4、使用連接對象創建會話(session)對象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、使用會話對象創建目標對象,包含queue和topic(一對一和一對多)
        Queue queue = session.createQueue("test-queue");
        //6、使用會話對象創建生產者對象
        MessageConsumer consumer = session.createConsumer(queue);
        //7、向consumer對象中設置一個messageListener對象,用來接收消息
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                // TODO Auto-generated method stub
                if(message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage)message;
                    try {
                        System.out.println(textMessage.getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        });
        //8、程序等待接收用戶消息
        System.in.read();
        //9、關閉資源
        consumer.close();
        session.close();
        connection.close();
    }

然后當我們運行queue發送的時候可以看到隊列里已經有一條消息了,但沒有發送出去: 
這里寫圖片描述
然后在運行queue 的接收端,可以看到消息已經發出了: 
這里寫圖片描述
這里寫圖片描述
接着對topic進行測試,發送代碼如下:

public void TestTopicProducer() throws Exception{
        //1、創建工廠連接對象,需要制定ip和端口號
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
        //2、使用連接工廠創建一個連接對象
        Connection connection = connectionFactory.createConnection();
        //3、開啟連接
        connection.start();
        //4、使用連接對象創建會話(session)對象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、使用會話對象創建目標對象,包含queue和topic(一對一和一對多)
        Topic topic = session.createTopic("test-topic");
        //6、使用會話對象創建生產者對象
        MessageProducer producer = session.createProducer(topic);
        //7、使用會話對象創建一個消息對象
        TextMessage textMessage = session.createTextMessage("hello!test-topic");
        //8、發送消息
        producer.send(textMessage);
        //9、關閉資源
        producer.close();
        session.close();
        connection.close();
    }

接收代碼:

    public void TestTopicConsumer() throws Exception{
        //1、創建工廠連接對象,需要制定ip和端口號
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
        //2、使用連接工廠創建一個連接對象
        Connection connection = connectionFactory.createConnection();
        //3、開啟連接
        connection.start();
        //4、使用連接對象創建會話(session)對象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、使用會話對象創建目標對象,包含queue和topic(一對一和一對多)
        Topic topic = session.createTopic("test-topic");
        //6、使用會話對象創建生產者對象
        MessageConsumer consumer = session.createConsumer(topic);
        //7、向consumer對象中設置一個messageListener對象,用來接收消息
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                // TODO Auto-generated method stub
                if(message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage)message;
                    try {
                        System.out.println(textMessage.getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        });
        //8、程序等待接收用戶消息
        System.in.read();
        //9、關閉資源
        consumer.close();
        session.close();
        connection.close();
    }

然后運行topic發送: 
這里寫圖片描述
可以看到消息已經發送出去。再運行topic接收: 
這里寫圖片描述
可以看到有了一個消費者,但是沒有接收的消息,這是因為正常情況下我們的topic消息不會再服務器持久化,所以要先打開消費者,再打開生產者,這個時候我們再運行生產者發送一條消息看到消息已經接收到了: 
這里寫圖片描述
這里寫圖片描述

ActiveMQ整合spring及項目中運用

      activeMQ與spring看一整合到一起使用,除了添加ActiveMQ相關的jar包外,還需要添加spring的jar包:

    <dependency>  
        <groupId>org.springframework</groupId>  
        <artifactId>spring-context</artifactId>  
    </dependency>  

然后編寫applicationContext-activemq.xml文件, 
代碼如下:

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"  
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"  
    xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd  
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd  
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd  
    http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd  
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">  

    <!-- 配置能夠產生connection的connectionfactory,由JMS對應的服務廠商提供 -->
    <bean id="tagertConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg name="brokerURL" value="tcp://192.168.156.44:61616"/>
    </bean>
    <!-- 配置spring管理真正connectionfactory的connectionfactory,相當於spring對connectionfactory的一層封裝 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="tagertConnectionFactory"/>
    </bean>
    <!-- 配置生產者 -->
    <!-- Spring使用JMS工具類,可以用來發送和接收消息 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 這里是配置的spring用來管理connectionfactory的connectionfactory -->
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>
    <!-- 配置destination -->
    <!-- 隊列目的地 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="spring-queue"/>
    </bean>
    <!-- 話題目的地 -->
    <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="item-add-topic"/>
    </bean>
</beans>  

然后在我們淘淘商城中,商品添加到數據庫的時候,對應也要添加數據到我們的solr索引中,所以生產者應該在插入數據后創建: 
這里寫圖片描述
當然,在xml文件中配置好的jmstemplate和destination也要注入進來:

    @Autowired
    private JmsTemplate jmsTemplate;
    @Resource(name="itemAddTopic")
    private Destination destination;

然后消費者應該寫在我們的搜索工程中,首先添加spring和activeMQ的jar包,然后配置xml文件,再編寫一個監聽器,當接收到消息時,就講數據存入索引庫,xml文件代碼如下:

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"  
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"  
    xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd  
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd  
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd  
    http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd  
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">  

    <!-- 配置能夠產生connection的connectionfactory,由JMS對應的服務廠商提供 -->
    <bean id="tagertConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg name="brokerURL" value="tcp://192.168.156.44:61616"/>
    </bean>
    <!-- 配置spring管理真正connectionfactory的connectionfactory,相當於spring對connectionfactory的一層封裝 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="tagertConnectionFactory"/>
    </bean>
    <!-- 配置destination -->
    <!-- 隊列目的地 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="spring-queue"/>
    </bean>
    <!-- 話題目的地 -->
    <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="item-add-topic"/>
    </bean>
    <!-- 配置監聽器 -->
    <bean id="myListener" class="com.taotao.search.listener.MyListener"/>
    <bean id="itemAddListener" class="com.taotao.search.listener.ItemAddListener"/>
    <!-- 系統監聽器 -->
<!--    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="queueDestination"/>
        <property name="messageListener" ref="myListener"/>
    </bean> -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="itemAddTopic"/>
        <property name="messageListener" ref="itemAddListener"/>
    </bean>
</beans>  

接收消息代碼: 
這里寫圖片描述
最后同時打開測試即可。

 

轉載地址:https://blog.csdn.net/liuyuanq123/article/details/79109218

 

ActiveMQ介紹

     MQ是消息中間件,是一種在分布式系統中應用程序借以傳遞消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的開源項目,完全支持JMS1.1和J2EE1.4規范的JMS Provider實現。 
特點: 
1、支持多種語言編寫客戶端 
2、對spring的支持,很容易和spring整合 
3、支持多種傳輸協議:TCP,SSL,NIO,UDP等 
4、支持AJAX 
消息形式: 
1、點對點(queue) 
2、一對多(topic) 

ActiveMQ安裝

這里寫圖片描述
我這里提供一個安裝好的虛擬機:http://download.csdn.net/download/liuyuanq123/10217892 
服務器運行后,我們可以直接訪問到activeMQ的界面: 
這里寫圖片描述
然后點擊queues可以看到現在沒有一條消息: 
這里寫圖片描述

ActiveMQ測試


      編寫一個測試類對ActiveMQ進行測試,首先得向pom文件中添加ActiveMQ相關的jar包:

     <dependency>  
         <groupId>org.apache.activemq</groupId>  
         <artifactId>activemq-all</artifactId>  
    </dependency>  
  • 1
  • 2
  • 3
  • 4

queue的發送代碼如下:

    public void testMQProducerQueue() throws Exception{
        //1、創建工廠連接對象,需要制定ip和端口號
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
        //2、使用連接工廠創建一個連接對象
        Connection connection = connectionFactory.createConnection();
        //3、開啟連接
        connection.start();
        //4、使用連接對象創建會話(session)對象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、使用會話對象創建目標對象,包含queue和topic(一對一和一對多)
        Queue queue = session.createQueue("test-queue");
        //6、使用會話對象創建生產者對象
        MessageProducer producer = session.createProducer(queue);
        //7、使用會話對象創建一個消息對象
        TextMessage textMessage = session.createTextMessage("hello!test-queue");
        //8、發送消息
        producer.send(textMessage);
        //9、關閉資源
        producer.close();
        session.close();
        connection.close();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

接收代碼:

    public void TestMQConsumerQueue() throws Exception{
        //1、創建工廠連接對象,需要制定ip和端口號
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
        //2、使用連接工廠創建一個連接對象
        Connection connection = connectionFactory.createConnection();
        //3、開啟連接
        connection.start();
        //4、使用連接對象創建會話(session)對象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、使用會話對象創建目標對象,包含queue和topic(一對一和一對多)
        Queue queue = session.createQueue("test-queue");
        //6、使用會話對象創建生產者對象
        MessageConsumer consumer = session.createConsumer(queue);
        //7、向consumer對象中設置一個messageListener對象,用來接收消息
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                // TODO Auto-generated method stub
                if(message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage)message;
                    try {
                        System.out.println(textMessage.getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        });
        //8、程序等待接收用戶消息
        System.in.read();
        //9、關閉資源
        consumer.close();
        session.close();
        connection.close();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

然后當我們運行queue發送的時候可以看到隊列里已經有一條消息了,但沒有發送出去: 
這里寫圖片描述
然后在運行queue 的接收端,可以看到消息已經發出了: 
這里寫圖片描述
這里寫圖片描述
接着對topic進行測試,發送代碼如下:

    public void TestTopicProducer() throws Exception{
        //1、創建工廠連接對象,需要制定ip和端口號
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
        //2、使用連接工廠創建一個連接對象
        Connection connection = connectionFactory.createConnection();
        //3、開啟連接
        connection.start();
        //4、使用連接對象創建會話(session)對象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、使用會話對象創建目標對象,包含queue和topic(一對一和一對多)
        Topic topic = session.createTopic("test-topic");
        //6、使用會話對象創建生產者對象
        MessageProducer producer = session.createProducer(topic);
        //7、使用會話對象創建一個消息對象
        TextMessage textMessage = session.createTextMessage("hello!test-topic");
        //8、發送消息
        producer.send(textMessage);
        //9、關閉資源
        producer.close();
        session.close();
        connection.close();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

接收代碼:

    public void TestTopicConsumer() throws Exception{
        //1、創建工廠連接對象,需要制定ip和端口號
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
        //2、使用連接工廠創建一個連接對象
        Connection connection = connectionFactory.createConnection();
        //3、開啟連接
        connection.start();
        //4、使用連接對象創建會話(session)對象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、使用會話對象創建目標對象,包含queue和topic(一對一和一對多)
        Topic topic = session.createTopic("test-topic");
        //6、使用會話對象創建生產者對象
        MessageConsumer consumer = session.createConsumer(topic);
        //7、向consumer對象中設置一個messageListener對象,用來接收消息
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                // TODO Auto-generated method stub
                if(message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage)message;
                    try {
                        System.out.println(textMessage.getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        });
        //8、程序等待接收用戶消息
        System.in.read();
        //9、關閉資源
        consumer.close();
        session.close();
        connection.close();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

然后運行topic發送: 
這里寫圖片描述
可以看到消息已經發送出去。再運行topic接收: 
這里寫圖片描述
可以看到有了一個消費者,但是沒有接收的消息,這是因為正常情況下我們的topic消息不會再服務器持久化,所以要先打開消費者,再打開生產者,這個時候我們再運行生產者發送一條消息看到消息已經接收到了: 
這里寫圖片描述
這里寫圖片描述

ActiveMQ整合spring及項目中運用


      activeMQ與spring看一整合到一起使用,除了添加ActiveMQ相關的jar包外,還需要添加spring的jar包:

    <dependency>  
        <groupId>org.springframework</groupId>  
        <artifactId>spring-context</artifactId>  
    </dependency>  
  • 1
  • 2
  • 3
  • 4

然后編寫applicationContext-activemq.xml文件, 
代碼如下:

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">  

    <!-- 配置能夠產生connection的connectionfactory,由JMS對應的服務廠商提供 -->
    <bean id="tagertConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg name="brokerURL" value="tcp://192.168.156.44:61616"/>
    </bean>
    <!-- 配置spring管理真正connectionfactory的connectionfactory,相當於spring對connectionfactory的一層封裝 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="tagertConnectionFactory"/>
    </bean>
    <!-- 配置生產者 -->
    <!-- Spring使用JMS工具類,可以用來發送和接收消息 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 這里是配置的spring用來管理connectionfactory的connectionfactory -->
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>
    <!-- 配置destination -->
    <!-- 隊列目的地 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="spring-queue"/>
    </bean>
    <!-- 話題目的地 -->
    <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="item-add-topic"/>
    </bean>
</beans>  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

然后在我們淘淘商城中,商品添加到數據庫的時候,對應也要添加數據到我們的solr索引中,所以生產者應該在插入數據后創建: 
這里寫圖片描述
當然,在xml文件中配置好的jmstemplate和destination也要注入進來:

    @Autowired
    private JmsTemplate jmsTemplate;
    @Resource(name="itemAddTopic")
    private Destination destination;
  • 1
  • 2
  • 3
  • 4

然后消費者應該寫在我們的搜索工程中,首先添加spring和activeMQ的jar包,然后配置xml文件,再編寫一個監聽器,當接收到消息時,就講數據存入索引庫,xml文件代碼如下:

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">  

    <!-- 配置能夠產生connection的connectionfactory,由JMS對應的服務廠商提供 -->
    <bean id="tagertConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg name="brokerURL" value="tcp://192.168.156.44:61616"/>
    </bean>
    <!-- 配置spring管理真正connectionfactory的connectionfactory,相當於spring對connectionfactory的一層封裝 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="tagertConnectionFactory"/>
    </bean>
    <!-- 配置destination -->
    <!-- 隊列目的地 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="spring-queue"/>
    </bean>
    <!-- 話題目的地 -->
    <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="item-add-topic"/>
    </bean>
    <!-- 配置監聽器 -->
    <bean id="myListener" class="com.taotao.search.listener.MyListener"/>
    <bean id="itemAddListener" class="com.taotao.search.listener.ItemAddListener"/>
    <!-- 系統監聽器 -->
<!-- <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="queueDestination"/> <property name="messageListener" ref="myListener"/> </bean> -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="itemAddTopic"/>
        <property name="messageListener" ref="itemAddListener"/>
    </bean>
</beans>  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

接收消息代碼: 
這里寫圖片描述
最后同時打開測試即可。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM