ActiveMQ是一種消息中間件,它實現了JMS規范,提供點對點和訂閱-發布兩種模式。下面介紹下ActiveMQ的使用;
一、環境的搭建
首先我們需要下載ActiveMQ的安裝包,下載地址http://activemq.apache.org/activemq-510-release.html。
直接解壓並拷貝到C盤中。最終的目錄為:C:\Program Files\ActiveMQ。
下面就是啟動ActiveMQ了,方法是進入到bin目錄下,win32目錄下,找到activemq.bat並雙擊;
然后便可以啟動ActiveMQ了,啟動的界面如下圖所示;
然后在瀏覽器中輸入如下地址http://localhost:8161/admin/ (用戶名:admin,密碼:damin)便可以進入到ActiveMQ的控制台中了。
下面就是代碼的創建了。
二、項目代碼
1、maven依賴包:
<!-- ActiveMQ的依賴 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.5.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.13.1</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.13.1</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-broker</artifactId> <version>5.13.1</version> </dependency> <!-- ActiveMQ+Spring的依賴 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-spring</artifactId> <version>5.12.2</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.1.9.RELEASE</version> </dependency>
2、ActiveMQ和Spring的配置文件:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd" default-autowire="byName"> <!--第一步: 配置connectionFactory --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616"/> <property name="userName" value="admin"/> <property name="password" value="admin"/> </bean> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory" ref="targetConnectionFactory"/> <property name="maxConnections" value="10"/> </bean> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="pooledConnectionFactory"/> </bean> <!--第二步:配置JmsTemplate之消息生產者 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 --> <property name="connectionFactory" ref="connectionFactory"/> </bean> <!--隊列目的地,點對點--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="queue"/> </bean> <!--主題目的地,訂閱發布--> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic"/> </bean> <!--第三步:配置JmsTemplate之消息消費者 --> <!-- 消息監聽器 --> <bean id="consumerMessageListener" class="com.zte.ems.springjms.listener.ConsumerMessageListener"/> <!-- com.zte.ems.springjms.listener.ConsumerMessageListener這個類是自己創建的,實現MessageListener 接口 --> <!-- 消息監聽容器 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="queue_destination" ref="queueDestination" /> <property name="topic_destination" ref="topicDestination" /> <property name="messageListener" ref="consumerMessageListener" /> </bean> </beans>
說明:
1)第一步:配置ConnectionFactory
ConnectionFactory是用於產生到JMS服務器的鏈接的,Spring為我們提供了多個ConnectionFactory,有SingleConnectionFactory和CachingConnectionFactory。SingleConnectionFactory對於建立JMS服務器鏈接的請求會一直返回同一個鏈接,並且會忽略Connection的close方法調用。CachingConnectionFactory繼承了SingleConnectionFactory,所以它擁有SingleConnectionFactory的所有功能,同時它還新增了緩存功能,它可以緩存Session、MessageProducer和MessageConsumer。這里我們使用SingleConnectionFactory來作為示例。Spring提供的ConnectionFactory只是Spring用於管理ConnectionFactory的,真正產生到JMS服務器鏈接的ConnectionFactory還得是由JMS服務廠商提供,並且需要把它注入到Spring提供的ConnectionFactory中。我們這里使用的是ActiveMQ實現的JMS,所以在我們這里真正的可以產生Connection的就應該是由ActiveMQ提供的ConnectionFactory。 ActiveMQ為我們提供了一個PooledConnectionFactory,通過往里面注入一個ActiveMQConnectionFactory可以用來將Connection、Session和MessageProducer池化,這樣可以大大的減少我們的資源消耗。
2)第二步:配置消息生成者
配置好ConnectionFactory之后我們就需要配置生產者。生產者負責產生消息並發送到JMS服務器,這通常對應的是我們的一個業務邏輯服務實現類。但是我們的服務實現類是怎么進行消息的發送的呢?這通常是利用Spring為我們提供的JmsTemplate類來實現的,所以配置生產者其實最核心的就是配置進行消息發送的JmsTemplate。對於消息發送者而言,它在發送消息的時候要知道自己該往哪里發,為此,我們在定義JmsTemplate的時候需要往里面注入一個Spring提供的ConnectionFactory對象。在真正利用JmsTemplate進行消息發送的時候,我們需要知道消息發送的目的地,即destination。在Jms中有一個用來表示目的地的Destination接口,它里面沒有任何方法定義,只是用來做一個標識而已。當我們在使用JmsTemplate進行消息發送時沒有指定destination的時候將使用默認的Destination。默認Destination可以通過在定義jmsTemplate bean對象時通過屬性defaultDestination或defaultDestinationName來進行注入,defaultDestinationName對應的就是一個普通字符串。在ActiveMQ中實現了兩種類型的Destination,一個是點對點的ActiveMQQueue,另一個就是支持訂閱/發布模式的ActiveMQTopic。在定義這兩種類型的Destination時我們都可以通過一個name屬性來進行構造。
3)配置消息消費者
生產者往指定目的地Destination發送消息后,接下來就是消費者對指定目的地的消息進行消費了。那么消費者是如何知道有生產者發送消息到指定目的地Destination了呢?這是通過Spring為我們封裝的消息監聽容器MessageListenerContainer實現的,它負責接收信息,並把接收到的信息分發給真正的MessageListener進行處理。每個消費者對應每個目的地都需要有對應的MessageListenerContainer。對於消息監聽容器而言,除了要知道監聽哪個目的地之外,還需要知道到哪里去監聽,也就是說它還需要知道去監聽哪個JMS服務器,這是通過在配置MessageConnectionFactory的時候往里面注入一個ConnectionFactory來實現的。所以我們在配置一個MessageListenerContainer的時候有三個屬性必須指定,一個是表示從哪里監聽的ConnectionFactory;一個是表示監聽什么的Destination;一個是接收到消息以后進行消息處理的MessageListener。Spring一共為我們提供了兩種類型的MessageListenerContainer,SimpleMessageListenerContainer和DefaultMessageListenerContainer。
SimpleMessageListenerContainer會在一開始的時候就創建一個會話session和消費者Consumer,並且會使用標准的JMS MessageConsumer.setMessageListener()方法注冊監聽器讓JMS提供者調用監聽器的回調函數。它不會動態的適應運行時需要和參與外部的事務管理。兼容性方面,它非常接近於獨立的JMS規范,但一般不兼容Java EE的JMS限制。大多數情況下我們還是使用的DefaultMessageListenerContainer,跟SimpleMessageListenerContainer相比,DefaultMessageListenerContainer會動態的適應運行時需要,並且能夠參與外部的事務管理。它很好的平衡了對JMS提供者要求低、先進功能如事務參與和兼容Java EE環境。
3.1)定義處理消息的MessageLisenter
package com.zte.ems.springjim.listener; import javax.jms.Message; import javax.jms.MessageListener; public class ConsumerMessageListener implements MessageListener { public void onMessage(Message arg0) { // 這里我們就是監聽生產者發送過來的消息,然后進行處理 } }
3.2)消息監聽器容器:
消息監聽器容器(message listener container)是一個特殊的bean,它可以監控JMS目的地並等待消息到達。一旦有消息到達,它就取出消息,然后把消息傳遞給任意一個對此消息感興趣的消息監聽器。