什么是ActiveMQ?
一款開源的JMS具體實現,是一個易於使用的消息中間件,一個消息容器
安裝
下載
官方網站:http://activemq.apache.org/
解壓
linux下的安裝,解壓命令:tar zxvf activemq-x.x.x-bin.tar.gz
啟動
- 前端進程的方式啟動(控制台關閉則服務關閉)
cd [activemq_install_dir]/bin./activemq console
- 后台進程的方式啟動
d [activemq_install_dir]/bin./activemq start
測試是否啟動成功
瀏覽器中輸入 http://127.0.0.1:8161/admin/登錄名/密碼: admin/admin
Linux下ActiveMQ默認監聽的端口號:61616,可以通過netstat -nl|grep 61616 查看
關閉
如果啟動的是前端進程,那么可以直接在控制台 ctrl + C 關閉
如果啟動的是后端進程 cd [activemq_install_dir]/bin./activemq stop
目錄結構
bin存放的是腳本文件
conf存放的是基本配置文件
data存放的是日志文件
docs存放的是說明文檔
examples存放的是簡單的實例
lib存放的是activemq所需jar包
webapps用於存放項目的目錄
與spring的整合
直接上代碼
所需jar包
<dependency> <groupId>javax.jms</groupId> <artifactId>javax.jms-api</artifactId> <version>2.0</version> </dependency> <!-- spring-jms API --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring.version}</version> </dependency> <!-- active-mq核心包 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency>
xml代碼
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:amq="http://activemq.apache.org/schema/core" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <!-- 配置連接ActiveMQ的ConnectionFactory --> <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616"/> </bean> <!--為了提高效率,配置一個spring提供的緩存連接池--> <bean id="cachedConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory" p:targetConnectionFactory-ref="amqConnectionFactory" p:sessionCacheSize="10"/> <!-- 定義JmsTemplate的Topic類型 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 --> <constructor-arg ref="cachedConnectionFactory" /> <!-- pub/sub模型(發布/訂閱) --> <property name="pubSubDomain" value="true" /> <!-- 指定默認的destination --> <property name="defaultDestination" ref="topicDestination"/> <!-- deliveryMode, priority, timeToLive 的開關,要生效,必須配置explicitQosEnabled為true,默認false--> <property name="explicitQosEnabled" value="true" /> <!-- 發送模式 DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久--> <property name="deliveryMode" value="2" /> </bean> <!--Spring JmsTemplate 的消息生產者 end--> <!-- 配置queue的destination目的地--> <!-- 接收者 --> <bean id="activeMqReceiverDestination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 指定隊列的名稱 --> <constructor-arg value="activeMqReceiver"/> </bean> <!-- 評論消息 --> <!-- <bean id="commentMessageDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="commentMessage"/> </bean> --> <!-- 發布任務消息 --> <!-- <bean id="releaseMessageDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="releaseMessage"/> </bean> --> <!-- 發布任務批量保存 --> <!-- <bean id="batchSaveTaskDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="batchSaveTask"/> </bean> --> <!-- 更新評論數量 --> <!-- <bean id="updateCommentNumberDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="updateCommentNumber"/> </bean> --> <!-- 回帖相關 --> <!-- <bean id="repliesDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="repliesDestination"/> </bean> --> <!-- 配置topic的Destination地址 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="myTopic"/> </bean> <!-- Spring JmsTemplate 的消息生產者 start--> <!-- 定義JmsTemplate的Queue類型 --> <bean id="queueTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 --> <constructor-arg ref="cachedConnectionFactory" /> <!-- 非pub/sub模型(發布/訂閱),即隊列模式 --> <property name="pubSubDomain" value="false" /> <!-- 指定默認的destination <property name="defaultDestination" ref="queueDestination"/>--> </bean> <!-- 消息消費者相關配置 start--> <!-- 鮮花消息監聽類 --> <!-- <bean id="flowerMessageConsumerService" class="com.tfedu.discuss.service.mq.FlowerMessageConsumerService"/> 評論消息監聽類 <bean id="commentMessageConsumerService" class="com.tfedu.discuss.service.mq.CommentMessageConsumerService"/> 發布消息監聽類 <bean id="releaseMessageConsumerService" class="com.tfedu.discuss.service.mq.ReleaseMessageConsumerService"/> 批量保存發布任務 <bean id="batchSaveTaskConsumerService" class="com.tfedu.discuss.service.mq.BatchSaveTaskConsumerService"/> 評論數維護監聽類 <bean id="commentNumberMessageConsumerService" class="com.tfedu.discuss.service.mq.CommentNumberMessageConsumerService"/> --> <bean id="activeMqReceiverService" class="com.activemq.ActiveMqReceiverService"></bean> <!-- 定義Queue監聽器 --> <jms:listener-container destination-type="queue" container-type="default" connection-factory="cachedConnectionFactory" acknowledge="transacted"> <!-- <jms:listener destination="flowerMessageDestination" ref="flowerMessageConsumerService"/> <jms:listener destination="commentMessageDestination" ref="commentMessageConsumerService"/> <jms:listener destination="releaseMessageDestination" ref="releaseMessageConsumerService"/> <jms:listener destination="batchSaveTaskDestination" ref="batchSaveTaskConsumerService"/> <jms:listener destination="updateCommentNumber" ref="commentNumberMessageConsumerService"/> <jms:listener destination="repliesDestination" ref="repliesMessageConsumerService"/> --> <jms:listener destination="activeMqReceiverDestination" ref="activeMqReceiverService"/> </jms:listener-container> <!-- 消息消費者相關配置 end--> </beans>
消息生產者代碼
package com.activemq; import javax.annotation.Resource; import org.springframework.jms.core.JmsOperations; import org.springframework.stereotype.Service; @Service public class ActiveMqSenderService { //JmsTemplate為JmsOperations的具體實現,一般注入接口解耦 @Resource(name = "queueTemplate") private JmsOperations queueTemplate; /** * 發送鮮花消息 * <p> * 贈送鮮花時觸發 * * @param messageEntity 消息實體 */ public void sendFlowerMessage(MQMessageEntity messageEntity) { System.out.println("准備發送消息"); queueTemplate.convertAndSend("activeMqReceiverDestination", messageEntity); } }
消息接收者
package com.activemq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import org.springframework.beans.factory.annotation.Autowired; public class ActiveMqReceiverService implements MessageListener{ @Autowired private MessageService messageService; @Override public void onMessage(Message message) { ObjectMessage ObjectMessage = (ObjectMessage) message; MQMessageEntity messageEntity; try { messageEntity = (MQMessageEntity) ObjectMessage.getObject(); messageService.messageFlower(messageEntity.getSourceId(), messageEntity.getSourceType(), messageEntity.getSendId()); } catch (JMSException e) { e.printStackTrace(); } } }