activeMQ的安裝和使用


什么是ActiveMQ?

  一款開源的JMS具體實現,是一個易於使用的消息中間件,一個消息容器

安裝

  下載

    官方網站:http://activemq.apache.org/ 

  解壓

    linux下的安裝,解壓命令:tar zxvf activemq-x.x.x-bin.tar.gz

  啟動

  • 前端進程的方式啟動(控制台關閉則服務關閉)

    cd [activemq_install_dir]/bin./activemq console

 

  • 后台進程的方式啟動

    d [activemq_install_dir]/bin./activemq start

 

  測試是否啟動成功

    瀏覽器中輸入 http://127.0.0.1:8161/admin/登錄名/密碼: admin/admin

     Linux下ActiveMQ默認監聽的端口號:61616,可以通過netstat -nl|grep 61616 查看

  關閉

     如果啟動的是前端進程,那么可以直接在控制台 ctrl + C 關閉

 

      如果啟動的是后端進程 cd [activemq_install_dir]/bin./activemq stop

目錄結構

  

bin存放的是腳本文件
conf存放的是基本配置文件
data存放的是日志文件
docs存放的是說明文檔
examples存放的是簡單的實例
lib存放的是activemq所需jar包
webapps用於存放項目的目錄

與spring的整合

直接上代碼

所需jar包

<dependency>
       <groupId>javax.jms</groupId>
       <artifactId>javax.jms-api</artifactId>
       <version>2.0</version>
    </dependency>
    <!-- spring-jms API -->
    <dependency>
       <groupId>org.springframework</groupId>
       <artifactId>spring-jms</artifactId>
       <version>${spring.version}</version>
    </dependency>
    <!-- active-mq核心包 -->
    <dependency>
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-core</artifactId>
       <version>5.7.0</version>
    </dependency>

xml代碼

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:p="http://www.springframework.org/schema/p"
   xmlns:context="http://www.springframework.org/schema/context"
   xmlns:jms="http://www.springframework.org/schema/jms"
   xmlns:amq="http://activemq.apache.org/schema/core"
   xsi:schemaLocation="http://www.springframework.org/schema/beans
                       http://www.springframework.org/schema/beans/spring-beans.xsd
                       http://www.springframework.org/schema/context
                       http://www.springframework.org/schema/context/spring-context.xsd
                       http://www.springframework.org/schema/jms
                       http://www.springframework.org/schema/jms/spring-jms.xsd
                       http://activemq.apache.org/schema/core
                       http://activemq.apache.org/schema/core/activemq-core.xsd">

        <!-- 配置連接ActiveMQ的ConnectionFactory -->
        <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
             <property name="brokerURL" value="tcp://localhost:61616"/>
         </bean>
              
        <!--為了提高效率,配置一個spring提供的緩存連接池-->
        <bean id="cachedConnectionFactory"
              class="org.springframework.jms.connection.CachingConnectionFactory"
              p:targetConnectionFactory-ref="amqConnectionFactory"
              p:sessionCacheSize="10"/>
              
        <!-- 定義JmsTemplate的Topic類型 -->
        <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
             <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 -->  
            <constructor-arg ref="cachedConnectionFactory" />
            <!-- pub/sub模型(發布/訂閱) -->
            <property name="pubSubDomain" value="true" />
            <!-- 指定默認的destination -->
            <property name="defaultDestination" ref="topicDestination"/>
            <!-- deliveryMode, priority, timeToLive 的開關,要生效,必須配置explicitQosEnabled為true,默認false-->
             <property name="explicitQosEnabled" value="true" />   
            <!-- 發送模式  DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久-->
            <property name="deliveryMode" value="2" /> 
        </bean>
            <!--Spring JmsTemplate 的消息生產者 end-->
        
        <!-- 配置queue的destination目的地-->
        <!-- 接收者 -->
        <bean id="activeMqReceiverDestination" class="org.apache.activemq.command.ActiveMQQueue">
               <!-- 指定隊列的名稱 -->
            <constructor-arg value="activeMqReceiver"/>
        </bean>
        
        <!-- 評論消息 -->
        <!-- <bean id="commentMessageDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="commentMessage"/>
        </bean> -->
        
        <!-- 發布任務消息 -->
        <!-- <bean id="releaseMessageDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="releaseMessage"/>
        </bean> -->
        <!-- 發布任務批量保存 -->
        <!-- <bean id="batchSaveTaskDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="batchSaveTask"/>
        </bean> -->
        <!-- 更新評論數量 -->
        <!-- <bean id="updateCommentNumberDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="updateCommentNumber"/>
        </bean> -->
        <!-- 回帖相關 -->
        <!-- <bean id="repliesDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="repliesDestination"/>
        </bean> -->
        
        <!-- 配置topic的Destination地址 -->
        <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="myTopic"/>
        </bean>
        
        <!-- Spring JmsTemplate 的消息生產者 start-->
        <!-- 定義JmsTemplate的Queue類型 -->
        <bean id="queueTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 -->  
            <constructor-arg ref="cachedConnectionFactory" />
            <!-- 非pub/sub模型(發布/訂閱),即隊列模式 -->
            <property name="pubSubDomain" value="false" />
             <!-- 指定默認的destination 
            <property name="defaultDestination" ref="queueDestination"/>-->
        </bean>
    
        
            
            <!-- 消息消費者相關配置 start-->
            <!-- 鮮花消息監聽類 -->
          <!--   <bean id="flowerMessageConsumerService" class="com.tfedu.discuss.service.mq.FlowerMessageConsumerService"/>
            評論消息監聽類
            <bean id="commentMessageConsumerService" class="com.tfedu.discuss.service.mq.CommentMessageConsumerService"/>
            發布消息監聽類
            <bean id="releaseMessageConsumerService" class="com.tfedu.discuss.service.mq.ReleaseMessageConsumerService"/>
            批量保存發布任務
            <bean id="batchSaveTaskConsumerService" class="com.tfedu.discuss.service.mq.BatchSaveTaskConsumerService"/>
            評論數維護監聽類
            <bean id="commentNumberMessageConsumerService" class="com.tfedu.discuss.service.mq.CommentNumberMessageConsumerService"/> -->
            <bean id="activeMqReceiverService" class="com.activemq.ActiveMqReceiverService"></bean>
             <!-- 定義Queue監聽器 -->
             <jms:listener-container destination-type="queue" container-type="default"  connection-factory="cachedConnectionFactory" acknowledge="transacted">
                  <!-- <jms:listener destination="flowerMessageDestination" ref="flowerMessageConsumerService"/>
                  <jms:listener destination="commentMessageDestination" ref="commentMessageConsumerService"/>
                  <jms:listener destination="releaseMessageDestination" ref="releaseMessageConsumerService"/>
                  <jms:listener destination="batchSaveTaskDestination" ref="batchSaveTaskConsumerService"/>
                  <jms:listener destination="updateCommentNumber" ref="commentNumberMessageConsumerService"/>
                  <jms:listener destination="repliesDestination" ref="repliesMessageConsumerService"/> -->
                  <jms:listener destination="activeMqReceiverDestination" ref="activeMqReceiverService"/>
             </jms:listener-container>
         <!-- 消息消費者相關配置 end-->
</beans>

消息生產者代碼

 

package com.activemq;

import javax.annotation.Resource;

import org.springframework.jms.core.JmsOperations;
import org.springframework.stereotype.Service;

@Service
public class ActiveMqSenderService {
    //JmsTemplate為JmsOperations的具體實現,一般注入接口解耦
        @Resource(name = "queueTemplate")
        private JmsOperations queueTemplate;

        /**
         * 發送鮮花消息
         * <p>
         * 贈送鮮花時觸發
         *
         * @param messageEntity 消息實體
         */
        public void sendFlowerMessage(MQMessageEntity messageEntity) {
            System.out.println("准備發送消息");
            queueTemplate.convertAndSend("activeMqReceiverDestination", messageEntity);
        }
}

 

消息接收者

 

package com.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import org.springframework.beans.factory.annotation.Autowired;

public class ActiveMqReceiverService  implements MessageListener{

    @Autowired
    private MessageService messageService;
    @Override
    public void onMessage(Message message) {
        ObjectMessage ObjectMessage = (ObjectMessage) message;
        MQMessageEntity messageEntity;
        try {
            messageEntity = (MQMessageEntity) ObjectMessage.getObject();
            messageService.messageFlower(messageEntity.getSourceId(), messageEntity.getSourceType(),
                    messageEntity.getSendId());
        } catch (JMSException e) {
            e.printStackTrace();
        } 

    }

}

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM