基於ActiveMQ的Topic的數據同步——初步實現


一、背景介紹

公司自成立以來,一直以做項目為主,算是經累經驗吧,自去年以來,我們部門准備將以前的項目做成產品,大概細分了幾個小的產品,部們下面又分了幾個團隊,分別負責產品的研發,而我們屬於平台團隊,負責公用組件、開發平台的研發。

 

前期各個項目組使用的技術、框架等都不一樣,想把技術、框架統一起來比較困難,並且在早期項目研發的時,各自為戰,沒有形成合力,有些共性的東西,都是各自做自己的,現在轉將項目做成產品時,首先就是要將共性的東西,抽取出來,做成組件,通過SOA架構,將組件的服務和能力暴露出來,提高組件的重用性,例如郵件服務,任務一個產品或者系統通過標准的接口,即可發送郵件,不需要重新編寫郵件的代碼,短信服務、權限服務等

 

由於幾個項目之間有些數據是共有的,例如人員、組織,HR系統已經有人員、組織的功能,在做其它項目時,人員、組織也需要,例如4A平台,這就需要將人員、組織的數據同步,將來的目標,是由ESB同步,由於時間緊,暫時選擇了ActiveMQ的方式,HR系統中的人員、組織的數據項很多,而其它系統需要的很少,可能只需要人員和組織的名稱及其標識列,並且數據量不大,不會一次性發送上百個人員或者組織的信息,基於這個考慮,通過將人員、組織信息的數據放在消息內放到消息中件上,各個系統通過訂閱的方式獲取消息中的數據。

 

二、實現

1、安裝ActiveMQ

到ActiveMQ的官方網站下載ActiveMQ,我下載的5.7.0版,解壓,例如D盤,打開bin目錄,執行acticemq.bat,啟動ActiveMQ。

我是基於spring編寫的,新建兩個Java工程,將Spring和ActiveMQ的包導入工程中,以下是pom文件:

<dependencies>
    <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>3.8.1</version>
    <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>4.0.5.RELEASE</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/javax.jms/jms -->
    <dependency>
        <groupId>javax.jms</groupId>
        <artifactId>jms</artifactId>
        <version>1.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/log4j/log4j -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.16</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-core -->

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
        <version>5.13.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.13.0</version>
    </dependency>
    
  </dependencies>

 

 

2、發送接收的前提

 為發送和接收方式,把將要發送的信息封裝成對象,分別為用戶和組織的對象,包括了用戶和組織的信息,我們來看看這兩個對象

用戶對象,BaseModel是一個基類,封裝的用ID,創建人,創建時,最后更新人,最后更新時間,這個對象不再單獨列出來

package model;

import java.io.Serializable;
import java.util.Date;


/**
 * 用戶對象
 * 
 * 
 * @author Administrator
 * @version $Id$
 */
public class JmsFaUser  extends BaseModel implements Serializable {
    private static final long serialVersionUID = 1L;
    private Long id;
    private String userNo;
    private String userName;
    private String userType;
    private String identity;
    private String region;
    private String userStatus;
    private String officeEmail;
    private String employeeWorkNo;
    private Long orgId;
    private String description;
    private String attribute1;
    private String attribute2;
    private String attribute3;
    private Long OId;
    private String userSex;
    private String mobileTel;
    private String officeTel;
    private String selfEmail;
    private  Date  lastUpdatedDate;
    // Constructors

    /** default constructor */
    public JmsFaUser() {
    }

    public String getUserNo() {
        return this.userNo;
    }

    public void setUserNo(String userNo) {
        this.userNo = userNo;
    }

    public String getUserName() {
        return this.userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getUserType() {
        return this.userType;
    }

    public void setUserType(String userType) {
        this.userType = userType;
    }

    public String getIdentity() {
        return this.identity;
    }

    public void setIdentity(String identity) {
        this.identity = identity;
    }

    public String getRegion() {
        return this.region;
    }

    public void setRegion(String region) {
        this.region = region;
    }

    public String getUserStatus() {
        return this.userStatus;
    }

    public void setUserStatus(String userStatus) {
        this.userStatus = userStatus;
    }

    public String getOfficeEmail() {
        return this.officeEmail;
    }

    public void setOfficeEmail(String officeEmail) {
        this.officeEmail = officeEmail;
    }

    public String getEmployeeWorkNo() {
        return this.employeeWorkNo;
    }

    public void setEmployeeWorkNo(String employeeWorkNo) {
        this.employeeWorkNo = employeeWorkNo;
    }

    public String getDescription() {
        return this.description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public String getAttribute1() {
        return this.attribute1;
    }

    public void setAttribute1(String attribute1) {
        this.attribute1 = attribute1;
    }

    public String getAttribute2() {
        return this.attribute2;
    }

    public void setAttribute2(String attribute2) {
        this.attribute2 = attribute2;
    }

    public String getAttribute3() {
        return this.attribute3;
    }

    public void setAttribute3(String attribute3) {
        this.attribute3 = attribute3;
    }

    public String getUserSex() {
        return this.userSex;
    }

    public void setUserSex(String userSex) {
        this.userSex = userSex;
    }

    /**
     * @return the oId
     */
    public Long getOId() {
        return OId;
    }

    /**
     * @param oId
     *            the oId to set
     */
    public void setOId(Long oId) {
        OId = oId;
    }

    public String getSelfEmail() {
        return this.selfEmail;
    }

    public void setSelfEmail(String selfEmail) {
        this.selfEmail = selfEmail;
    }

    /**
     * @param orgId
     *            the orgId to set
     */
    public void setOrgId(Long orgId) {
        this.orgId = orgId;
    }

    /**
     * @return the orgId
     */
    public Long getOrgId() {
        return orgId;
    }

    /**
     * @param officeTel
     *            the officeTel to set
     */
    public void setOfficeTel(String officeTel) {
        this.officeTel = officeTel;
    }

    /**
     * @return the officeTel
     */
    public String getOfficeTel() {
        return officeTel;
    }

    /**
     * @param mobileTel
     *            the mobileTel to set
     */
    public void setMobileTel(String mobileTel) {
        this.mobileTel = mobileTel;
    }

    /**
     * @return the mobileTel
     */
    public String getMobileTel() {
        return mobileTel;
    }

    /**
     * @param id
     *            the id to set
     */
    public void setId(Long id) {
        this.id = id;
    }

    /**
     * @return the id
     */
    public Long getId() {
        return id;
    }

    public Date getLastUpdatedDate() {
        return lastUpdatedDate;
    }

    public void setLastUpdatedDate(Date lastUpdatedDate) {
        this.lastUpdatedDate = lastUpdatedDate;
    }

}

 

組織對象

package model;

import java.io.Serializable;
import java.util.Date;


/**
 * 組織對象
 * 
 * 
 * @author Administrator
 * @version $Id$
 */
public class JmsOrganize extends  BaseModel  implements Serializable{
    private static final long serialVersionUID = 1L;  
    private Long id;  
    private String orgName; //中文名  
    private String orgFullName;  
    private String orgEngName;//英文名  
    private Long orgTypeNo;  
    private String orgLevel;  
    private Long parentOrgId;  
    private String orgCode;  
    private String orgDesc;  
    private String isbranch;
    private  Date lastUpdatedDate;
      
      
    // Constructors  
  
    /** default constructor */  
    public JmsOrganize() {  
    }  
     
    /** default constructor */  
    public JmsOrganize(Long id, String orgName) {  
        this.setId(id);  
        this.orgName = orgName;  
          
    }  
      
    public JmsOrganize(Long id, Long parendId) {  
        this.setId(id);  
        this.parentOrgId = parendId;  
          
    }  
      
      
    /** minimal constructor */  
    public JmsOrganize(String orgName) {  
        this.orgName = orgName;  
    }  
  
  
    public String getOrgName() {  
        return this.orgName;  
    }  
  
    public void setOrgName(String orgName) {  
        this.orgName = orgName;  
    }  
  
    public String getOrgFullName() {  
        return this.orgFullName;  
    }  
  
    public void setOrgFullName(String orgFullName) {  
        this.orgFullName = orgFullName;  
    }  
  
    public Long getOrgTypeNo() {  
        return this.orgTypeNo;  
    }  
  
    public void setOrgTypeNo(Long orgTypeNo) {  
        this.orgTypeNo = orgTypeNo;  
    }  
  
    public String getOrgLevel() {  
        return this.orgLevel;  
    }  
  
    public void setOrgLevel(String orgLevel) {  
        this.orgLevel = orgLevel;  
    }  
  
    public String getOrgDesc() {  
        return this.orgDesc;  
    }  
  
    public void setOrgDesc(String orgDesc) {  
        this.orgDesc = orgDesc;  
    }  
  
  
    public String getIsbranch() {  
        return this.isbranch;  
    }  
  
    public void setIsbranch(String isbranch) {  
        this.isbranch = isbranch;  
    }  
  
    /** 
     * @param parentOrgId the parentOrgId to set 
     */  
    public void setParentOrgId(Long parentOrgId) {  
        this.parentOrgId = parentOrgId;  
    }  
  
    /** 
     * @return the parentOrgId 
     */  
    public Long getParentOrgId() {  
        return parentOrgId;  
    }  
  
    /** 
     * @param orgCode the orgCode to set 
     */  
    public void setOrgCode(String orgCode) {  
        this.orgCode = orgCode;  
    }  
  
    /** 
     * @return the orgCode 
     */  
    public String getOrgCode() {  
        return orgCode;  
    }  
  
    /** 
     * @param orgEngName the orgEngName to set 
     */  
    public void setOrgEngName(String orgEngName) {  
        this.orgEngName = orgEngName;  
    }  
  
    /** 
     * @return the orgEngName 
     */  
    public String getOrgEngName() {  
        return orgEngName;  
    }  
  
    /** 
     * @param id the id to set 
     */  
    public void setId(Long id) {  
        this.id = id;  
    }  
  
    /** 
     * @return the id 
     */  
    public Long getId() {  
        return id;  
    }

    public Date getLastUpdatedDate() {
        return lastUpdatedDate;
    }

    public void setLastUpdatedDate(Date lastUpdatedDate) {
        this.lastUpdatedDate = lastUpdatedDate;
    } 
}

 

由於發送的是對象,所以提供一個轉換器,Convertor,該類要繼承Spring的MessageConvertor

package com.wc82.util;


import java.text.DateFormat;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import model.JmsFaUser;
import model.JmsOrganize;

import org.apache.log4j.Logger;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;


public class FaJmsConverter    implements  MessageConverter {
    
    
    private static final Logger logger =Logger.getLogger(FaJmsConverter.class);
    
    public Message toMessage(Object obj, Session session)
            throws JMSException, MessageConversionException {
         ObjectMessage objMsg=session.createObjectMessage();      
            if(obj instanceof JmsFaUser){  
                JmsFaUser user = (JmsFaUser)obj;
                logger.info("The user message's userId is " + user.getId());  
                objMsg.setStringProperty("dataFlag", "FaUser");  
                objMsg.setStringProperty("userName", user.getUserName());  
                objMsg.setLongProperty("userId", user.getId());  
                objMsg.setStringProperty("officeEmail", user.getOfficeEmail());  
                objMsg.setStringProperty("selfEmail", user.getSelfEmail());  
                if(user.getOfficeTel() != null){  
                    objMsg.setStringProperty("officeTel", user.getOfficeTel());  
                }else{  
                    objMsg.setLongProperty("officeTel", new Long(0));  
                }  
                if(user.getMobileTel() != null){  
                    objMsg.setStringProperty("mobileTel", user.getMobileTel());  
                }else{  
                    objMsg.setLongProperty("mobileTel", new Long(0));  
                }  
                if(user.getLastUpdatedDate() != null){  
                    objMsg.setStringProperty("lastUpdatedDate", DateFormat.getDateTimeInstance().format(user.getLastUpdatedDate()));  
                }  
                  
            }else if(obj instanceof JmsOrganize){  
                JmsOrganize org = (JmsOrganize)obj;  
                logger.info("The org message's userId is " + org.getId());  
                objMsg.setStringProperty("dataFlag", "Organize");  
                objMsg.setLongProperty("orgId", org.getId());  
                objMsg.setStringProperty("orgName", org.getOrgName());  
                if(org.getLastUpdatedDate() != null){  
                    objMsg.setStringProperty("lastUpdatedDate", DateFormat.getDateTimeInstance().format(org.getLastUpdatedDate()));  
                }  
            }  
            return objMsg;  
    }

    public Object fromMessage(Message message) throws JMSException,
            MessageConversionException {
        logger.info("from message");
        ObjectMessage objMessage = (ObjectMessage)message;  
        String dataFlag = objMessage.getStringProperty("dataFlag");  
        if("FaUser".equals(dataFlag)){  
            JmsFaUser user = new JmsFaUser();  
            user.setId(objMessage.getLongProperty("userId"));  
            user.setUserName(objMessage.getStringProperty("userName"));  
            user.setOfficeEmail(objMessage.getStringProperty("officeEmail"));  
            user.setSelfEmail(objMessage.getStringProperty("selfEmail"));  
            user.setOfficeTel(objMessage.getStringProperty("officeTel"));  
            user.setMobileTel(objMessage.getStringProperty("mobileTel"));  
            String lastDate = objMessage.getStringProperty("lastUpdatedDate");  
            try {  
                if(lastDate != null){  
                    user.setLastUpdatedDate(DateFormat.getDateTimeInstance().parse(lastDate));  
                }  
            } catch (Exception e) {  
                e.printStackTrace();  
            }  
            return user;  
        }else if("Organize".equals(dataFlag)){  
            Long orgId = objMessage.getLongProperty("orgId");  
            String orgName = objMessage.getStringProperty("orgName");  
            JmsOrganize organize = new JmsOrganize(orgId, orgName);  
            String lastDate = objMessage.getStringProperty("lastUpdatedDate");  
            try {  
                organize.setLastUpdatedDate(DateFormat.getDateTimeInstance().parse(lastDate));  
            } catch (Exception e) {  
                e.printStackTrace();  
            }  
            return organize;  
        }  
        return null;
    }


}

 

 

3、發送

Sprng為我們提供了JMSTemplate,基於這個發送消息,我們先來看看Spring配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">
    <!-- 外部屬性文件的定義  -->  
    <bean id="propertyConfigurer"  
            class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">  
        <property name="locations">   
            <list>   
                <value>classpath:sender.properties</value>   
            </list>  
        </property>  
    </bean>  
  
    <!-- 配置connectionFactory -->  
    <bean id="jmsSenderFactory" class="org.apache.activemq.pool.PooledConnectionFactory"  
        destroy-method="stop">  
        <property name="connectionFactory">  
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">  
                <property name="brokerURL" value="${jms.sendBrokerURL}">  
                </property>  
                <property name="useAsyncSend" value="true"></property>  
                <property name="userName" value="admin"></property>  
                <property name="password" value="admin"></property> 
            </bean>  
        </property>  
        <property name="maxConnections" value="100"></property>  
    </bean>  
  
    <bean id="jmsConverter" class="com.wc82.util.FaJmsConverter" />  
  
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
        <property name="connectionFactory" ref="jmsSenderFactory"></property>  
        <property name="defaultDestination" ref="destination" />  
        <!-- 區別它采用的模式為false是p2p為true是訂閱 -->  
        <property name="pubSubDomain" value="true" />  
        <property name="messageConverter" ref="jmsConverter"></property>  
    </bean>  
  
    <!-- 發送消息的目的地(一個隊列) -->  
    <bean id="destination" class="org.apache.activemq.command.ActiveMQTopic">  
        <!-- 設置消息隊列的名字 -->  
        <constructor-arg index="0" value="${jms.sendDestinationName}" />  
    </bean>  
  
    <bean id="messageProducer" class="message.MessageProducer">  
        <property name="jmsTemplate" ref="jmsTemplate"></property>  
    </bean>  
    
</beans>

 

根據上面的配置,只需要獲得messageProducer這個Bean,便可以發送,下面我來看看MessageProducer這個類及其接口

package message.face;

public interface IJMSMessageProducer {
     public abstract void converAndSendObjectMessage(Object obj);
}

 

實現類

package message;

import message.face.IJMSMessageProducer;

import org.apache.log4j.Logger;
import org.springframework.jms.core.JmsTemplate;

public class MessageProducer implements IJMSMessageProducer {
    
    private static final Logger logger =Logger.getLogger(MessageProducer.class);
    
    private JmsTemplate jmsTemplate;

    /**
     * @param jmsTemplate
     *            the jmsTemplate to set
     */
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    /*
     * (non-Javadoc)
     * 
     * @see
     * com.vispractice.faf.jms.IJMSMessageProducer#converAndSendObjectMessage
     * (java.lang.Object)
     * 
     * @date 2012-10-25
     * 
     * @user
     */
    public void converAndSendObjectMessage(Object obj) {
        jmsTemplate.convertAndSend(obj);
        logger.info("The message pub success, the Object is " + obj);
    }

}

 

 

發送測試

package com.wc82.ActivemqTransData;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import message.face.IJMSMessageProducer;
import model.JmsFaUser;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SendTest {
        public static void main(String[] args) {
            
            
            Properties pro=new Properties();
            try {
                InputStream input=SendTest.class.getResourceAsStream("/sender.properties");
                pro.load(input);
            } catch (FileNotFoundException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
                pro.setProperty("count", new Integer(0).toString());
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            Object obj=pro.get("jms.jmsSendConverterClass");
            ApplicationContext ac = new ClassPathXmlApplicationContext(  
                    "jms-sender.xml");  
            IJMSMessageProducer messageProducer = (IJMSMessageProducer) ac.getBean("messageProducer");  
            JmsFaUser user = new JmsFaUser();  
            user.setUserName("ssss");  
            user.setId(new Long(111));  
            messageProducer.converAndSendObjectMessage(user);
        }
}

 

那發送到神馬位置呢?在發送消息的Spring配置文件里面,有一個jms.sendBrokerURL,這個值是在sender.properties文件中配置的,方便修改,我們來看一下關於發送消息時所以配置的參數信息

jms.sendBrokerURL=tcp\://localhost\:61616
jms.sendDestinationName=faJMS

 

第一個指地址,因為ActiveMQ部署的是我本機,所以使用localhost,端口號在部署的時候,就是默認的

第二個JMS的名稱,這個可以自取

如果發送,可以通過localhost:8161的控制台,看到你所發送的消息,這個地址是ActiveMQ的Web控制台,接下為我們看看接收

 

4、接收

MQ給我提供一種方式,當接收到消息的時候,自動去執行我們業務代碼,

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">
    <description>jms receiver configuration</description>  
      
      <!-- 外部屬性文件的定義  -->  
    <bean id="propertyConfigurer"  
            class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">  
        <property name="locations">   
            <list>   
                <value>classpath:receiver.properties</value>   
            </list>  
        </property>  
    </bean> 
    
    <!-- 配置connectionFactory -->  
    <bean id="jmsReceiverFactory" class="org.apache.activemq.pool.PooledConnectionFactory"  
        destroy-method="stop">  
        <property name="connectionFactory">  
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">  
                <property name="brokerURL" value="${jms.receiveBrokerURL}" />  
<!--                 <property name="userName" value="reader"></property> -->  
<!--                 <property name="password" value="readeryxtech"></property> -->  
            </bean>  
        </property>  
        <property name="maxConnections" value="100"></property>  
    </bean>  
  
    <bean id="destination" class="org.apache.activemq.command.ActiveMQTopic">  
        <!-- 設置消息隊列的名字 -->  
        <constructor-arg index="0" value="${jms.receiveDestinationName}" />  
    </bean>  
  
    <bean id="jmsConverter" class="com.wc82.util.FaJmsConverter" />  
      
      
    <!--異步調用消息 -->  
    <bean id="receive"  
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="jmsReceiverFactory"></property>  
        <property name="destination" ref="destination"></property>  
        <property name="messageListener" ref="messageListener"></property>
    </bean>  
      
      <bean id="delegate" class="message.MessageConsumer"></bean>
      
    <bean id="messageListener"  
        class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
        <property name="delegate" ref="delegate"></property>  
        <property name="defaultListenerMethod" value="onMessage"></property>  
        <property name="messageConverter" ref="jmsConverter" />  
    </bean>
    
</beans>

 

先看一下messageListener,這個監聽,是自己寫的,其中delegate是當你接收消息之后所執行的業務代碼的bean,我這里將這個bean做成配置,方便修改,而defaultListenerMethod是指定執行的方法,這里設置定了,執行omMessage方法,也就是說類你可以指定,但是onMessage方法必須要有,而convertor也是可配的,與發送的convertor是一樣的,這個轉換器發送與接收都必須要使用

注意:這里為神馬要將執行的類的bean做成可配呢?因為我在做這一塊的工作時,我不知道是由那個bean,但是類中的方法我可設置,這樣做的目地就是在消息的消費者不在去關心這個消息,而只需要寫一個類,里面有onMessage方法,在這個方法里做自己的業務邏輯即可,將其關注點放到業務處理上,那腫么樣來設計這個方法的名字呢,我們可以設計了一個接口,如果消費者想消費這個消息,就必須實現這上接口,下面我們來看看這個接口

package message.face;

public interface FaJmsReceiveListener {
    public void onMessage(Object baseModel);
}

這個接口中只有一個方法就是onMessage,而實現類則交由具體的消費者,因為消費可能在實現的時候可能引用別的bean,進而處理別的業務,例如入庫,所以我們在做接收消息的配置上,只設計處理消息的bean的名字,不設置具體的class,只要消費者配置了這個bean的名字即可,當然這個名字也是可配的,配置在了init.propertis中,我們來看看這個接口的實現類

package message;

import org.apache.log4j.Logger;

import message.face.FaJmsReceiveListener;
import model.JmsFaUser;
import model.JmsOrganize;

public class MessageConsumer   implements  FaJmsReceiveListener{
    
    private  static  final  Logger logger=Logger.getLogger(MessageConsumer.class);
    
    public void onMessage(Object baseModel) {
        if(baseModel instanceof JmsFaUser){  
            JmsFaUser user = (JmsFaUser)baseModel;  
            logger.info(user.getId());
        }else if (baseModel instanceof JmsOrganize){  
            JmsOrganize org = (JmsOrganize)baseModel;  
            logger.info(org.getId());
        }  
    }

}

 

上述的代碼可以共用,當發送的類變更時,只需要編寫Convertor,並在init.properties中配置上即可,而消息的消費者只需要實現該接口即可

 

測試類

package com.wc82.ActivemqTransData;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ReceiveTest {
    public static void main(String[] args) {  
        ApplicationContext a2c = new ClassPathXmlApplicationContext("jms-receiver.xml");  
        System.out.println("receiver");  
    }  
}

接下來,看看接收的receiver.properties

jms.receiveBrokerURL=tcp\://localhost\:61616
jms.receiveDestinationName=faJMS
 

第一個、第二個、第三個參數不再詳細述說,第四就是執行業務邏輯的bean的名字,這個參數可以設定死也可以設定靈活

上述只是初步的能發送和接收消息,后續是考慮安全、性能的問題。

注意測試時:要先打開消息接收程序,然后再打開消息發送程序。activemq是實時性的,不會將消息保存下來,發送消息的程序在發送消息時,如果接收程序沒有啟動,那么這個接收程序就接收不到這個消息。下一節,就是用來解決這個持久訂閱的問題。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM