ActiveMQ (三):項目實踐


1. 簡單項目demo 

    com.hoo.mq 路徑下(除了 com.hoo.mq.spring)是在普通 Java 項目中使用 ActiveMQ 的示例。

    com.hoo.mq.spring 路徑下是非 web 環境下 Spring 集成 ActiveMQ 的示例。

    com.tgb.SpringActivemq 路徑下是 web 環境下 Spring MVC 集成 ActiveMQ 的示例。

  

    目錄結構如下:

    

 

  

    github下載地址:https://github.com/Monkey-mi/TestActiveMQ.git

 

2. web項目

  2.1 生產者

    spring-activemq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
    
    <!-- Producer-side Spring context: declares the ActiveMQ connection factory, a Spring
         caching wrapper around it, and two JmsTemplate beans (one for queues, one for topics). -->

    <!-- Load the ActiveMQ properties file.
         NOTE(review): no ${...} placeholder is referenced anywhere in this file; the broker URL
         and credentials below are hard-coded. Confirm whether the properties file is still needed. -->
    <context:property-placeholder location="classpath:config/activemq.properties" />

    <!-- ActiveMQ connection factory -->
     <!-- The real ConnectionFactory that produces Connections; supplied by the JMS vendor. -->
    <!-- Remote broker: tcp://ip:61616; local broker: tcp://localhost:61616; plus user name and password.
         NOTE(review): the URL configured below uses port 61618, not 61616; confirm it matches
         the broker's transport connector. -->
    <amq:connectionFactory id="amqConnectionFactory"
        brokerURL="tcp://localhost:61618" userName="admin" password="topsun"  />

    <!-- Spring caching connection factory -->
     <!-- The Spring ConnectionFactory that manages the real ConnectionFactory. Candidate classes:
     org.springframework.jms.connection.CachingConnectionFactory
     org.apache.activemq.pool.PooledConnectionFactory
     -->  
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- targetConnectionFactory is the real factory that actually produces JMS Connections -->  
          <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
          <!-- Equivalent alternative: pass the target factory as a constructor argument -->
        <!-- <constructor-arg ref="amqConnectionFactory" /> -->
        <!-- Number of Sessions to cache -->
        <property name="sessionCacheSize" value="100" />
        <!-- Receiver ID, used for durable Topic subscriptions. clientId identifies this client;
             clients connected to the same broker must not share the same value. -->
        <property name="clientId" value="client-B" /> 
    </bean>
    
    <!-- Spring JmsTemplate message producers: start -->
    
    <!-- JmsTemplate for Queue destinations -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- This connectionFactory is the Spring-provided ConnectionFactory bean defined above -->  
        <constructor-arg ref="connectionFactory" />
        <!-- Not the pub/sub (publish/subscribe) model, i.e. queue mode -->
        <property name="pubSubDomain" value="false" />
        <!-- Persistent message delivery.
             NOTE(review): per the JmsTemplate Javadoc, QoS settings such as deliveryPersistent are
             only applied when explicitQosEnabled is true, which is not set here; confirm intent. -->
        <property name="deliveryPersistent" value="true" />
        <!-- Alternative way to configure persistence, equivalent to deliveryPersistent:
        <property name="deliveryMode" value="2" />
        -->
    </bean>
    
    <!-- JmsTemplate for Topic destinations -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
         <!-- This connectionFactory is the Spring-provided ConnectionFactory bean defined above -->  
        <constructor-arg ref="connectionFactory" />
        <!-- pub/sub (publish/subscribe) model -->
        <property name="pubSubDomain" value="true" />
        <!-- Persistent message delivery.
             NOTE(review): per the JmsTemplate Javadoc, QoS settings such as deliveryPersistent are
             only applied when explicitQosEnabled is true, which is not set here; confirm intent. -->
        <property name="deliveryPersistent" value="true" />
        <!-- Alternative way to configure persistence, equivalent to deliveryPersistent:
        <property name="deliveryMode" value="2" />
        -->
    </bean>
    
    <!-- Spring JmsTemplate message producers: end -->

</beans>  

 

  QSender.java

package com.outsideasy.activemq.service;

import java.io.Serializable;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

/**
 * Spring component that publishes serializable payloads to the {@code outside.order} queue
 * through the queue-mode {@link JmsTemplate}.
 */
@Component
public class QSender {

    /** Name of the destination queue. */
    private static final String QUEUENAME = "outside.order";

    /** Template injected by bean name; {@code @Qualifier} selects the "jmsQueueTemplate" bean. */
    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsTemplate;

    /**
     * Sends one message to the configured queue.
     *
     * @param s serializable payload, wrapped in a JMS object message; it is the value the
     *          receiver reads back with {@code getObject()}
     */
    public void orderSend(final Serializable s) {
        // Build the creator first so the send call itself stays a one-liner.
        final MessageCreator objectMessageCreator = new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createObjectMessage(s);
            }
        };
        jmsTemplate.send(QUEUENAME, objectMessageCreator);
    }
}

 

  PurchaseOrderService.java

    /**
     * Handles acceptance of a purchase order and publishes the accepted order to ActiveMQ.
     *
     * <p>Nothing is done unless {@code getPurchaseOrderStatus(params)} returns 10. In that case
     * {@code "order_status"} is set to 20 in {@code params}, {@code mapper.updateOrderStatus} and
     * {@code purchaseOrderOperatingService.addPurchaseOrderOperating} are called, the order
     * details are loaded and copied into a {@code PurchaseOrderSender}, and that object is sent
     * through {@code qSender}.
     * NOTE(review): presumably 10 means "awaiting acceptance" and 20 means "accepted"; confirm.
     *
     * @param params request parameters identifying the order; mutated when the status is 10
     */
    public void saveAccpetOrderByID(Map<String,Object> params){
        final int currentStatus = getPurchaseOrderStatus(params);
        if (currentStatus != 10) {
            // Any other status: nothing to do.
            return;
        }

        params.put("order_status", 20);
        mapper.updateOrderStatus(params);
        purchaseOrderOperatingService.addPurchaseOrderOperating(params);

        // Trigger the "PO accepted" notification.
        PurchaseOrderVo orderDetails = mapper.getOrderDetailsByID(params);

        // Build the outgoing payload. This class must live under the same package path in the
        // consumer project, otherwise the consumer cannot deserialize it.
        PurchaseOrderSender payload = new PurchaseOrderSender();
        payload.setPur_order_id(orderDetails.getPur_order_id());
        payload.setOrder_bh(orderDetails.getOrder_bh());
        payload.setOrder_status(orderDetails.getOrder_status());
        payload.setAgreement_bh(orderDetails.getAgreement_bh());
        payload.setR_opreate_dt(orderDetails.getR_opreate_dt());
        payload.setSource_type(orderDetails.getSource_type());

        // Hand the payload to ActiveMQ.
        qSender.orderSend(payload);
    }

 

  2.2 消費者

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">

    <!-- Consumer-side Spring context: declares the ActiveMQ connection factory, a Spring caching
         wrapper around it, and the listener containers that deliver messages to qReceiver. -->

    <!-- Load the ActiveMQ properties file.
         NOTE(review): no ${...} placeholder is referenced anywhere in this file; confirm whether
         the properties file is still needed. -->
    <context:property-placeholder location="classpath:sysconfig/activemq.properties" />
    <!-- The listener bean referenced by the queue listener below.
         NOTE(review): QReceiver is also annotated with @Component; if component scanning covers
         its package the class may be registered twice. Confirm. -->
    <bean id="qReceiver" class="com.outsideasy.activemq.service.QReceiver"/>

    <!-- ActiveMQ connection factory -->
     <!-- The real ConnectionFactory that produces Connections; supplied by the JMS vendor. -->
    <!-- Remote broker: tcp://ip:61616; local broker: tcp://localhost:61616; plus user name and password.
         NOTE(review): the URL configured below uses port 61618, not 61616; confirm it matches
         the broker's transport connector. -->
    <amq:connectionFactory id="amqConnectionFactory"
        brokerURL="tcp://localhost:61618" userName="admin" password="topsun" /> 

    <!-- Spring caching connection factory -->
     <!-- The Spring ConnectionFactory that manages the real ConnectionFactory. Candidate classes:
     org.springframework.jms.connection.CachingConnectionFactory
     org.apache.activemq.pool.PooledConnectionFactory
     -->  
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- targetConnectionFactory is the real factory that actually produces JMS Connections -->  
          <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
          <!-- Equivalent alternative: pass the target factory as a constructor argument -->
        <!-- <constructor-arg ref="amqConnectionFactory" /> -->
        <!-- Number of Sessions to cache -->
        <property name="sessionCacheSize" value="100" />
        <!-- Receiver ID, used for durable Topic subscriptions -->
        <property name="clientId" value="client-C" /> 
    </bean>

    
    <!-- Message consumers: start -->
    <!-- Queue listener: delivers messages from the outside.order queue to qReceiver -->
    <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="outside.order" ref="qReceiver"/>
        <!-- <jms:listener destination="test.queue" ref="queueReceiver2"/> -->
    </jms:listener-container>
    
    <!-- Topic listener (durable subscriptions).
         NOTE(review): every listener inside this container is commented out, so it currently
         registers no subscriber. -->
    <jms:listener-container destination-type="durableTopic" container-type="default" connection-factory="connectionFactory" acknowledge="auto" client-id="client-C">
        <!-- Note: "subscription" (i.e. durableSubscriptionName) sets the durable subscription name -->
        <!-- <jms:listener destination="test.topic" subscription="topic_receiver1" ref="topicReceiver1"/>
        <jms:listener destination="test.topic" subscription="topic_receiver2" ref="topicReceiver2"/> -->
    </jms:listener-container>
    
    <!-- Message consumers: end -->
</beans>  

 

  QReceiver.java

package com.outsideasy.activemq.service;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import org.springframework.stereotype.Component;

import com.outsideasy.activemq.model.PurchaseOrderSender;

/**
 * JMS listener that receives purchase-order object messages and prints them to standard output.
 */
@Component
public class QReceiver implements MessageListener{

    /**
     * Unwraps the {@link PurchaseOrderSender} payload from the incoming message and prints its
     * purchase-order id and its {@code toString()} form.
     *
     * <p>The message is cast to {@link ObjectMessage} without a type check, so any other message
     * type raises a {@link ClassCastException}. A {@link JMSException} raised while reading the
     * payload is printed and otherwise ignored.
     *
     * @param message the incoming JMS message; expected to be an {@code ObjectMessage}
     */
    @Override
    public void onMessage(Message message) {
        try {
            ObjectMessage objmsg = (ObjectMessage) message;
            // The payload class must have the same package path as the class the sender serialized.
            PurchaseOrderSender purchaseOrderSender = (PurchaseOrderSender) objmsg.getObject();
            // Fixed: the original referenced an undefined variable (purchaseOrderVo) here.
            System.out.println("QueueReceiver1接收到消息:"+purchaseOrderSender.getPur_order_id());
            System.out.println("****:"+purchaseOrderSender.toString());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

 

3. 注意事項

  3.1. 序列化與反序列化問題;

    傳輸為對象類型(ObjectMessage):

      生產者和消費者對象的包路徑必須一致,否則無法反序列化。

    傳輸為Map類型:

      value 值必須為 Integer、Double、String 等基本類型的包裝對象(或 Map、List),自定義對象無法作為 value 傳輸,否則提示如下錯誤:

      Only objectified primitive objects, String, Map and List types are allowed

 

  3.2.權限配置

    3.2.1 服務訪問權限及賬號密碼設置

    在 %ACTIVEMQ_HOME%\conf 目錄下的 activemq.xml 中配置(XML 元素參見:ActiveMQ Xml Reference):

<broker ...>
        <plugins>
            <!-- Authentication: user names, passwords and the groups each user belongs to. -->
            <simpleAuthenticationPlugin>
                <users>         
                    <authenticationUser username="admin" password="topsun"
                    groups="admins,publishers,consumers"/>
                    
                    <authenticationUser username="publisher" password="topsun"
                    groups="publishers,consumers"/>
                    
                    <authenticationUser username="consumer" password="topsun"
                    groups="consumers"/>
                    
                    <authenticationUser username="guest" password="topsun"
                    groups="guests"/>
                </users>
            </simpleAuthenticationPlugin>
            
            <!-- Authorization: which groups may read, write and administer each destination.
                 In ActiveMQ destination names "." separates name parts and ">" is the wildcard
                 that matches everything from that point on.
                 NOTE(review): only "admins" is granted access to topics here, which includes the
                 ActiveMQ.Advisory.> topics; the ActiveMQ security documentation says clients
                 normally need rights on those topics. Confirm non-admin users can still connect. -->
            <authorizationPlugin>
                <map>
                    <authorizationMap>
                        <authorizationEntries>
                          <!-- Default for every queue and every topic: admins only -->
                          <authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
                          <authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
                          <!-- Queues whose names start with "publishers." -->
                          <authorizationEntry queue="publishers.>" read="publishers" write="publishers" admin="admins" />
                          <!-- Queues whose names start with "consumers." (fixed: the pattern was
                               "consumers." without the ">" wildcard, which only matches a queue
                               literally named "consumers.") -->
                          <authorizationEntry queue="consumers.>" read="consumers" write="consumers" admin="admins" /> 
                          <!-- The single queue named "test"; no admin attribute is specified -->
                          <authorizationEntry queue="test" read="guests" write="guests" />
                        </authorizationEntries>
                    </authorizationMap>
                </map>
            </authorizationPlugin>                      
        </plugins>

</broker>

  

    3.2.2 ActiveMQ 服務網頁控制台(web console)賬號密碼設置

      需要在 %ACTIVEMQ_HOME%\conf 目錄下的 jetty-realm.properties 中配置,如下:

## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements.  See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License.  You may obtain a copy of the License at
## 
## http://www.apache.org/licenses/LICENSE-2.0
## 
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------

# Defines users that can access the web (console, demo, etc.)
# username: password [,rolename ...]
admin: admin, admin
user: user, user

 

  3.3 clientId 標簽值

    連接到同一個服務(broker)的每個客戶端都必須使用唯一的 clientId,重複使用會報錯。

 

 

  參考資料:

    1. http://www.cnblogs.com/kszit/p/3596366.html

    2. http://shmilyaw-hotmail-com.iteye.com/blog/1897635

    3. http://www.cnblogs.com/hoojo/p/active_mq_jms_apache_activeMQ.html

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM