ActiveMQ與spring集成實現Queue模式


  ActiveMQ可以和spring很好的集成,下面我們來看看,如何做個集成的demo。

  (1)pom.xml引入相關jar

<!-- spring相關 begin -->
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>3.1.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>4.1.5.RELEASE</version>
        </dependency>
        <!-- spring相關 end -->
        <!-- activeMQ相關 begin-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.1.4.RELEASE</version>
        </dependency>
        <!-- activeMQ相關  end-->

  (2)添加生產者配置activemq-sender.xml

<description>JMS發布者應用配置</description>
    
    <!-- CachingConnectionFactory 連接工廠 (有緩存功能)-->
    <bean id="cachingConnectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- Session緩存數量 -->
        <property name="sessionCacheSize" value="20" />
        <property name="targetConnectionFactory">  
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">  
                <!-- MQ地址 賬戶名 密碼-->  
                <property name="brokerURL" value="tcp://192.168.56.129:61616" />
                <property name="userName" value="parry" />
                <property name="password" value="parry123" />
                <!-- 是否異步發送 -->
                <property name="useAsyncSend" value="true"/>
            </bean>  
        </property>  
    </bean>
    
    <!-- 接收消息的目的地(一個主題)點對點隊列 -->
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 設置消息主題的名字 -->
        <constructor-arg index="0" value="messages" />
    </bean>
    
    <!-- 接收配置JMS模版 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory" />
        <property name="defaultDestination" ref="destination" />
        <!-- value為true為發布/訂閱模式; value為false為點對點模式-->
        <property name="pubSubDomain" value="false"/>
    </bean>

  (3)添加消費者配置activemq-consumer.xml

<description>JMS訂閱者應用配置</description>
    <!-- CachingConnectionFactory 連接工廠 (有緩存功能)-->
    <bean id="cachingConnectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- Session緩存數量 -->
        <property name="sessionCacheSize" value="20" />
        <property name="targetConnectionFactory">  
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">  
                <!-- MQ地址 賬戶名 密碼-->  
                <property name="brokerURL" value="tcp://192.168.56.129:61616" />
                <property name="userName" value="parry" />
                <property name="password" value="parry123" />
                <!-- 是否異步發送 -->
                <property name="useAsyncSend" value="true"/>
            </bean>  
        </property>  
    </bean>
    
    <!-- 接收消息的目的地(一個主題)點對點隊列 -->
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 設置消息主題的名字 -->
        <constructor-arg index="0" value="messages" />
    </bean>
    
    <!-- 消費者配置 (自己定義) -->
    <bean id="consumer" class="com.parry.MQ.funcion.Listener" />
    
    <!-- 消息監聽容器 -->
    <bean id="myListenerContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="cachingConnectionFactory" />
        <property name="destination" ref="destination" />
        <property name="messageListener" ref="consumer" />
        <!-- 如果消息的接收速率,大於消息處理的速率時,可以采取線程池方式 -->
        <property name="taskExecutor" ref="queueMessageExecutor"/>
        <!-- 設置固定的線程數 -->
        <property name="concurrentConsumers" value="30"/>
        <!-- 設置動態的線程數 -->
        <property name="concurrency" value="20-50"/>
        <!-- 設置最大的線程數 -->
        <property name="maxConcurrentConsumers" value="80"/>
    </bean>
    <bean id="queueMessageExecutor"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="30" />
        <property name="maxPoolSize" value="80" />
        <property name="daemon" value="true" />
        <property name="keepAliveSeconds" value="120" />
    </bean>

  (4)新建一個發送消息的方法

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
/**
 * 發送消息
 * @author Administrator
 *
 */
@Component
public class QueueSender {
    
    @Autowired
    private JmsTemplate myJmsTemplate;

    /**
     * 發送一條消息到指定的隊列(目標)
     * 
     * @param queueName
     *            隊列名稱
     * @param message
     *            消息內容
     */
    public void send(String queueName, final String message) {
        myJmsTemplate.send(queueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
}

  (5)添加監聽器

package com.parry.MQ.funcion;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 接收者監聽類
 * @author Administrator
 *
 */
public class Listener implements MessageListener {

    public void onMessage(Message message) {
        // 業務處理
        try {
            TextMessage message2 = (TextMessage) message;
            System.out.println("接收到信息:" + message2.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

  (6)寫個一請求測試一下

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import com.parry.MQ.funcion.QueueSender;

@Controller
public class App {
    
    @Autowired
    private QueueSender sender;
    
    @RequestMapping("test")
    @ResponseBody
    public String Test() {
        
        sender.send("messages", "你好,這是我的第一條消息!");
        return "Hello world";
    }
}

  (7)測試結果

  

  在ActiveMQ的管理后台,我們也能看到我們的消息(這里我多測試了幾次):

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM