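The previous article covered the basic installation and use of ActiveMQ and built a simple producer and consumer. This one focuses on listener-based consumers and on integrating ActiveMQ with Spring.
I. Consumer Listener
The consumers created earlier all received messages by calling consumer.receive() directly. Each call consumes a single message, so every run of the program fetches one message and then exits, which is rigid. Real projects rarely work this way. A consumer normally keeps listening on its target Queue or Topic and, as long as the application is not shut down, keeps consuming messages as they arrive.
A consumer using a JMS listener: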
package com.cfang.mq.simpleCase;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ConsumerListener {

    public static void main(String[] args) {
        ConsumerListener consumerListener = new ConsumerListener();
        consumerListener.listenMessage();
    }

    public void listenMessage() {
        ConnectionFactory factory = null;
        Connection connection = null;
        Session session = null;
        Destination destination = null;
        MessageConsumer consumer = null;
        try {
            factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://172.31.31.160:61616");
            connection = factory.createConnection();
            connection.start();
            // Non-transacted session with automatic acknowledgement
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("tp_simple_queue");
            consumer = session.createConsumer(destination);
            // Register a listener; onMessage is called for every message that arrives
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        // Only text messages are expected on this queue
                        if (message instanceof TextMessage) {
                            TextMessage textMessage = (TextMessage) message;
                            System.out.println(textMessage.getText());
                        }
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            // Block here to simulate an application that keeps running.
            // Once the application exits, the listener stops as well.
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release resources in reverse order of creation
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
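To make the difference between the two consumption styles concrete without needing a broker, here is a minimal sketch that imitates them with a plain in-memory queue. It does not use the JMS API at all: the class name PullVsPushDemo and the helpers receiveOnce and startListener are hypothetical names made up for this illustration, and a java.util.concurrent.BlockingQueue stands in for the ActiveMQ queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class PullVsPushDemo {

    /** Pull style: one call returns at most one message, similar in spirit to receive(timeout). */
    public static String receiveOnce(BlockingQueue<String> queue, long timeoutMillis)
            throws InterruptedException {
        return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Push style: a background thread keeps taking messages and hands each one to the listener. */
    public static Thread startListener(BlockingQueue<String> queue, Consumer<String> listener) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    listener.accept(queue.take()); // blocks until a message is available
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted: stop listening
            }
        }, "listener-thread");
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("m1");
        queue.put("m2");
        queue.put("m3");

        // Pull: the caller asks once and gets one message
        System.out.println("pulled: " + receiveOnce(queue, 100));

        // Push: the remaining messages are delivered to the callback
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(2);
        Thread worker = startListener(queue, msg -> {
            received.add(msg);
            done.countDown();
        });
        done.await(1, TimeUnit.SECONDS);
        worker.interrupt();
        System.out.println("pushed: " + received);
    }
}
```

In this sketch the single pull returns only the first message, while the listener thread keeps delivering whatever is left until it is interrupted. That mirrors why a listener-based consumer suits a long-running application better than repeated one-off receive() calls.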
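II. Spring Integration
Integrating ActiveMQ with Spring is straightforward: Spring provides JmsTemplate for working with the broker. The steps below go from the configuration file to the program code.
1. Spring JMS configuration file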
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.1.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util-3.1.xsd
        http://www.springframework.org/schema/aop
        http://www.springframework.org/schema/aop/spring-aop.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">

    <context:component-scan base-package="com.cfang.amq">
    </context:component-scan>

    <!-- ActiveMQConnectionFactory: the connection factory for the broker -->
    <amq:connectionFactory id="amqConnectionFactory"
        brokerURL="tcp://172.31.31.160:61616" userName="admin" password="admin"/>

    <!-- Connection pool settings for the connection factory -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory" ref="amqConnectionFactory"></property>
        <property name="maxConnections" value="10"></property>
    </bean>

    <!-- Caching connection factory; the Session cache size is configurable -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"></property>
        <property name="sessionCacheSize" value="100"></property>
    </bean>

    <!-- JmsTemplate -->
    <bean id="template" class="org.springframework.jms.core.JmsTemplate">
        <!-- The connection factory to use; it must be the one created by Spring. -->
        <property name="connectionFactory" ref="connectionFactory"></property>
        <!-- Optional: name of the default destination -->
        <property name="defaultDestinationName" value="tp_simple_queue"></property>
    </bean>

    <!-- Producer -->
    <bean id="springProducer" class="com.cfang.amq.SpringProducer"/>

    <!-- Listener container for the consumer -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"></property>
        <property name="destinationName" value="tp_simple_queue"/>
        <property name="messageListener" ref="springConsumer"></property>
        <property name="concurrentConsumers" value="1"/>
    </bean>

    <!-- Consumer -->
    <bean id="springConsumer" class="com.cfang.amq.SpringConsumer"/>
</beans>
2. Unit test
package com.cfang.prebo.activemq;

import java.util.Scanner;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.cfang.amq.SpringProducer;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:applicationContext-jms.xml" })
public class SpringListenerTest {

    // The producer bean declared in applicationContext-jms.xml
    @Autowired
    private SpringProducer springProducer;

    @Test
    public void start() throws Exception {
        System.out.println("======start");
        // Send each line typed on the console as a message; "exit" ends the loop
        Scanner scanner = new Scanner(System.in);
        while (true) {
            System.out.print("producer send msg : ");
            String line = scanner.nextLine();
            if ("exit".equals(line)) {
                break;
            }
            springProducer.sendMsg(null, line);
        }
        // Block
        // System.in.read();
    }
}
3. Run result: