Java ActiveMQ 講解(二)Spring ActiveMQ整合+注解消息監聽


對於ActiveMQ消息的發送,原聲的api操作繁瑣,而且如果不進行二次封裝,打開關閉會話以及各種創建操作也是夠夠的了。那么,Spring提供了一個很方便的去收發消息的框架,spring jms。整合Spring后,代碼不僅變得非常優雅,而且易用性和擴展性更好。

廢話不多說,直接開搞。

 

1. maven依賴

        <!-- activemq -->
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>3.16</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${springframework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>${activemq.version}</version>
        </dependency>

 

2.命名空間引入

<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 訪問 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-3.2.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">

 

3. Xml配置

    <amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://${activemq.ip}:61616" userName="${activemq.username}" password="${activemq.password}" />

    <bean id="jmsConnectionFactoryExtend" class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="jmsConnectionFactory" />
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- 消息處理器 -->
    <bean id="jmsMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" />

    <!-- ====Producer side start==== -->

    <!-- 定義JmsTemplate的Queue類型 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="jmsConnectionFactoryExtend" />
        <!-- 非pub/sub模型(發布/訂閱),即隊列模式 -->
        <property name="pubSubDomain" value="false" />
        <property name="messageConverter" ref="jmsMessageConverter"></property>
    </bean>

    <!-- 定義JmsTemplate的Topic類型 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="jmsConnectionFactoryExtend" />
        <!-- pub/sub模型(發布/訂閱) -->
        <property name="pubSubDomain" value="true" />
        <property name="messageConverter" ref="jmsMessageConverter"></property>
    </bean>

    <jms:listener-container destination-type="queue" container-type="default" connection-factory="jmsConnectionFactoryExtend" acknowledge="auto" concurrency="5-10">
        <jms:listener destination="testqueue" ref="queueReciver" />
    </jms:listener-container>
    

第一個是配置我們的mq連接,ip+端口號,帳號密碼的信息。

第二個是引入spring的mq連接池。可以配置緩存的連接數。

第三個是消息處理器,Spring默認提供了基於Jdk Serializable的消息處理和MappingJackson2MessageConventer,其實這兩個挺常用,在Spring Redis中,在Spring MVC中,都有着這幾種conventer的身影。

下面是兩個發送消息的模版類,類似於之前講過的RedisTemplate。向其注入上面定義的消息處理器,代碼中我們會用到。(其實類中已經判斷如果不進行注入就設置一個默認的,但是自己注入的話,方便我們控制)

 listener-container是Spring提供的一個監聽器容器,用於統一控制我們的監聽類來接收處理消息。這里面有一些配置,schema有說明。可以配置響應模式,消費者數量等。開啟多消費者,有助於加快隊列處理速度。

 

4.注解方式的實現

如果要用注解的方式,就不需要在xml中自己定義消息監聽容器了。只需要加入以下的代碼:

<bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
        <property name="connectionFactory" ref="jmsConnectionFactoryExtend"/>
    </bean>
    
    <!-- 監聽注解支持 -->
    <jms:annotation-driven/>

這樣,配置我們消費處理類上的@listener注解,即可監聽對應的queue或者topic消息。

 

5.生產者代碼

隊列消息:

@Resource
@Component("queueSender")
public class QueueSender {

    @Resource(name = "jmsQueueTemplate")
    private JmsTemplate jmsQueueTemplate;// 通過@Qualifier修飾符來注入對應的bean
    

    public void send(String destination, final Object message) {
        jmsQueueTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return jmsQueueTemplate.getMessageConverter().toMessage(message, session);
            }
        });
    }

}

 

 

訂閱消息:

@Component
public class TopicSender {
    
    @Resource(name="jmsTopicTemplate")
    private JmsTemplate jmsTemplate;
    
    
    /**
     * 發送一條消息到指定的隊列(目標)
     * @param queueName 隊列名稱
     * @param message 消息內容
     */
    public void publish(String destination,final Object message){
        jmsTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return jmsTemplate.getMessageConverter().toMessage(message, session);
            }
        });
    }

}

 

 

6.消費者代碼

package cn.test.activemq.consumer.queue;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.stereotype.Component;

import cn.test.MqBean;
import cn.test.activemq.message.types.QueueDefination;

/**
 * @author Han
 */
@Component("spqueueconsumertest")
public class SpringQueueReciverTest extends MessageListenerAdapter{
    private static final Logger log = LoggerFactory.getLogger(SpringQueueReciverTest.class);
    
    
    
    @JmsListener(destination=QueueDefination.TEST_QUEUE,concurrency="5-10")
    public void onMessagehehe(Message message, Session session) throws JMSException {
        try {
            MqBean bean = (MqBean) getMessageConverter().fromMessage(message);
            System.out.println(bean.getName());
            System.out.println(session);
            message.acknowledge();
            message.acknowledge();
        } catch (MessageConversionException | JMSException e) {
            e.printStackTrace();
        }
        
    }
    
}

上面的@JmsListener(destination=QueueDefination.TEST_QUEUE,concurrency="5-10")是在用注解方式監聽的時候加入。如果用xml配置容易,可以忽略。

 

附上MqBean

public class MqBean implements Serializable{
    private Integer age;
    private String name;
    public Integer getAge() {
        return age;
    }
    public void setAge(Integer age) {
        this.age = age;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    
}

 

 

運行效果截圖:

 

 

ActiveMQ的基本用法大概就這些了。后續如果有新的發現包括優化發面的,再繼續發吧。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM