一、背景介紹
公司自成立以來,一直以做項目為主,算是經累經驗吧,自去年以來,我們部門准備將以前的項目做成產品,大概細分了幾個小的產品,部們下面又分了幾個團隊,分別負責產品的研發,而我們屬於平台團隊,負責公用組件、開發平台的研發。
前期各個項目組使用的技術、框架等都不一樣,想把技術、框架統一起來比較困難,並且在早期項目研發的時,各自為戰,沒有形成合力,有些共性的東西,都是各自做自己的,現在轉將項目做成產品時,首先就是要將共性的東西,抽取出來,做成組件,通過SOA架構,將組件的服務和能力暴露出來,提高組件的重用性,例如郵件服務,任務一個產品或者系統通過標准的接口,即可發送郵件,不需要重新編寫郵件的代碼,短信服務、權限服務等
由於幾個項目之間有些數據是共有的,例如人員、組織,HR系統已經有人員、組織的功能,在做其它項目時,人員、組織也需要,例如4A平台,這就需要將人員、組織的數據同步,將來的目標,是由ESB同步,由於時間緊,暫時選擇了ActiveMQ的方式,HR系統中的人員、組織的數據項很多,而其它系統需要的很少,可能只需要人員和組織的名稱及其標識列,並且數據量不大,不會一次性發送上百個人員或者組織的信息,基於這個考慮,通過將人員、組織信息的數據放在消息內放到消息中件上,各個系統通過訂閱的方式獲取消息中的數據。
二、實現
1、安裝ActiveMQ
到ActiveMQ的官方網站下載ActiveMQ,我下載的5.7.0版,解壓,例如D盤,打開bin目錄,執行acticemq.bat,啟動ActiveMQ。
我是基於spring編寫的,新建兩個Java工程,將Spring和ActiveMQ的包導入工程中,以下是pom文件:
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.0.5.RELEASE</version> </dependency> <!-- https://mvnrepository.com/artifact/javax.jms/jms --> <dependency> <groupId>javax.jms</groupId> <artifactId>jms</artifactId> <version>1.1</version> </dependency> <!-- https://mvnrepository.com/artifact/log4j/log4j --> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.16</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-core --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.13.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.13.0</version> </dependency> </dependencies>
2、發送接收的前提
為發送和接收方式,把將要發送的信息封裝成對象,分別為用戶和組織的對象,包括了用戶和組織的信息,我們來看看這兩個對象
用戶對象,BaseModel是一個基類,封裝的用ID,創建人,創建時,最后更新人,最后更新時間,這個對象不再單獨列出來
package model; import java.io.Serializable; import java.util.Date; /** * 用戶對象 * * * @author Administrator * @version $Id$ */ public class JmsFaUser extends BaseModel implements Serializable { private static final long serialVersionUID = 1L; private Long id; private String userNo; private String userName; private String userType; private String identity; private String region; private String userStatus; private String officeEmail; private String employeeWorkNo; private Long orgId; private String description; private String attribute1; private String attribute2; private String attribute3; private Long OId; private String userSex; private String mobileTel; private String officeTel; private String selfEmail; private Date lastUpdatedDate; // Constructors /** default constructor */ public JmsFaUser() { } public String getUserNo() { return this.userNo; } public void setUserNo(String userNo) { this.userNo = userNo; } public String getUserName() { return this.userName; } public void setUserName(String userName) { this.userName = userName; } public String getUserType() { return this.userType; } public void setUserType(String userType) { this.userType = userType; } public String getIdentity() { return this.identity; } public void setIdentity(String identity) { this.identity = identity; } public String getRegion() { return this.region; } public void setRegion(String region) { this.region = region; } public String getUserStatus() { return this.userStatus; } public void setUserStatus(String userStatus) { this.userStatus = userStatus; } public String getOfficeEmail() { return this.officeEmail; } public void setOfficeEmail(String officeEmail) { this.officeEmail = officeEmail; } public String getEmployeeWorkNo() { return this.employeeWorkNo; } public void setEmployeeWorkNo(String employeeWorkNo) { this.employeeWorkNo = employeeWorkNo; } public String getDescription() { return this.description; } public void setDescription(String description) { this.description = description; } public String getAttribute1() { return this.attribute1; } public void setAttribute1(String attribute1) { this.attribute1 = attribute1; } public String getAttribute2() { return this.attribute2; } public void setAttribute2(String attribute2) { this.attribute2 = attribute2; } public String getAttribute3() { return this.attribute3; } public void setAttribute3(String attribute3) { this.attribute3 = attribute3; } public String getUserSex() { return this.userSex; } public void setUserSex(String userSex) { this.userSex = userSex; } /** * @return the oId */ public Long getOId() { return OId; } /** * @param oId * the oId to set */ public void setOId(Long oId) { OId = oId; } public String getSelfEmail() { return this.selfEmail; } public void setSelfEmail(String selfEmail) { this.selfEmail = selfEmail; } /** * @param orgId * the orgId to set */ public void setOrgId(Long orgId) { this.orgId = orgId; } /** * @return the orgId */ public Long getOrgId() { return orgId; } /** * @param officeTel * the officeTel to set */ public void setOfficeTel(String officeTel) { this.officeTel = officeTel; } /** * @return the officeTel */ public String getOfficeTel() { return officeTel; } /** * @param mobileTel * the mobileTel to set */ public void setMobileTel(String mobileTel) { this.mobileTel = mobileTel; } /** * @return the mobileTel */ public String getMobileTel() { return mobileTel; } /** * @param id * the id to set */ public void setId(Long id) { this.id = id; } /** * @return the id */ public Long getId() { return id; } public Date getLastUpdatedDate() { return lastUpdatedDate; } public void setLastUpdatedDate(Date lastUpdatedDate) { this.lastUpdatedDate = lastUpdatedDate; } }
組織對象
package model; import java.io.Serializable; import java.util.Date; /** * 組織對象 * * * @author Administrator * @version $Id$ */ public class JmsOrganize extends BaseModel implements Serializable{ private static final long serialVersionUID = 1L; private Long id; private String orgName; //中文名 private String orgFullName; private String orgEngName;//英文名 private Long orgTypeNo; private String orgLevel; private Long parentOrgId; private String orgCode; private String orgDesc; private String isbranch; private Date lastUpdatedDate; // Constructors /** default constructor */ public JmsOrganize() { } /** default constructor */ public JmsOrganize(Long id, String orgName) { this.setId(id); this.orgName = orgName; } public JmsOrganize(Long id, Long parendId) { this.setId(id); this.parentOrgId = parendId; } /** minimal constructor */ public JmsOrganize(String orgName) { this.orgName = orgName; } public String getOrgName() { return this.orgName; } public void setOrgName(String orgName) { this.orgName = orgName; } public String getOrgFullName() { return this.orgFullName; } public void setOrgFullName(String orgFullName) { this.orgFullName = orgFullName; } public Long getOrgTypeNo() { return this.orgTypeNo; } public void setOrgTypeNo(Long orgTypeNo) { this.orgTypeNo = orgTypeNo; } public String getOrgLevel() { return this.orgLevel; } public void setOrgLevel(String orgLevel) { this.orgLevel = orgLevel; } public String getOrgDesc() { return this.orgDesc; } public void setOrgDesc(String orgDesc) { this.orgDesc = orgDesc; } public String getIsbranch() { return this.isbranch; } public void setIsbranch(String isbranch) { this.isbranch = isbranch; } /** * @param parentOrgId the parentOrgId to set */ public void setParentOrgId(Long parentOrgId) { this.parentOrgId = parentOrgId; } /** * @return the parentOrgId */ public Long getParentOrgId() { return parentOrgId; } /** * @param orgCode the orgCode to set */ public void setOrgCode(String orgCode) { this.orgCode = orgCode; } /** * @return the orgCode */ public String getOrgCode() { return orgCode; } /** * @param orgEngName the orgEngName to set */ public void setOrgEngName(String orgEngName) { this.orgEngName = orgEngName; } /** * @return the orgEngName */ public String getOrgEngName() { return orgEngName; } /** * @param id the id to set */ public void setId(Long id) { this.id = id; } /** * @return the id */ public Long getId() { return id; } public Date getLastUpdatedDate() { return lastUpdatedDate; } public void setLastUpdatedDate(Date lastUpdatedDate) { this.lastUpdatedDate = lastUpdatedDate; } }
由於發送的是對象,所以提供一個轉換器,Convertor,該類要繼承Spring的MessageConvertor
package com.wc82.util; import java.text.DateFormat; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.Session; import model.JmsFaUser; import model.JmsOrganize; import org.apache.log4j.Logger; import org.springframework.jms.support.converter.MessageConversionException; import org.springframework.jms.support.converter.MessageConverter; public class FaJmsConverter implements MessageConverter { private static final Logger logger =Logger.getLogger(FaJmsConverter.class); public Message toMessage(Object obj, Session session) throws JMSException, MessageConversionException { ObjectMessage objMsg=session.createObjectMessage(); if(obj instanceof JmsFaUser){ JmsFaUser user = (JmsFaUser)obj; logger.info("The user message's userId is " + user.getId()); objMsg.setStringProperty("dataFlag", "FaUser"); objMsg.setStringProperty("userName", user.getUserName()); objMsg.setLongProperty("userId", user.getId()); objMsg.setStringProperty("officeEmail", user.getOfficeEmail()); objMsg.setStringProperty("selfEmail", user.getSelfEmail()); if(user.getOfficeTel() != null){ objMsg.setStringProperty("officeTel", user.getOfficeTel()); }else{ objMsg.setLongProperty("officeTel", new Long(0)); } if(user.getMobileTel() != null){ objMsg.setStringProperty("mobileTel", user.getMobileTel()); }else{ objMsg.setLongProperty("mobileTel", new Long(0)); } if(user.getLastUpdatedDate() != null){ objMsg.setStringProperty("lastUpdatedDate", DateFormat.getDateTimeInstance().format(user.getLastUpdatedDate())); } }else if(obj instanceof JmsOrganize){ JmsOrganize org = (JmsOrganize)obj; logger.info("The org message's userId is " + org.getId()); objMsg.setStringProperty("dataFlag", "Organize"); objMsg.setLongProperty("orgId", org.getId()); objMsg.setStringProperty("orgName", org.getOrgName()); if(org.getLastUpdatedDate() != null){ objMsg.setStringProperty("lastUpdatedDate", DateFormat.getDateTimeInstance().format(org.getLastUpdatedDate())); } } return objMsg; } public Object fromMessage(Message message) throws JMSException, MessageConversionException { logger.info("from message"); ObjectMessage objMessage = (ObjectMessage)message; String dataFlag = objMessage.getStringProperty("dataFlag"); if("FaUser".equals(dataFlag)){ JmsFaUser user = new JmsFaUser(); user.setId(objMessage.getLongProperty("userId")); user.setUserName(objMessage.getStringProperty("userName")); user.setOfficeEmail(objMessage.getStringProperty("officeEmail")); user.setSelfEmail(objMessage.getStringProperty("selfEmail")); user.setOfficeTel(objMessage.getStringProperty("officeTel")); user.setMobileTel(objMessage.getStringProperty("mobileTel")); String lastDate = objMessage.getStringProperty("lastUpdatedDate"); try { if(lastDate != null){ user.setLastUpdatedDate(DateFormat.getDateTimeInstance().parse(lastDate)); } } catch (Exception e) { e.printStackTrace(); } return user; }else if("Organize".equals(dataFlag)){ Long orgId = objMessage.getLongProperty("orgId"); String orgName = objMessage.getStringProperty("orgName"); JmsOrganize organize = new JmsOrganize(orgId, orgName); String lastDate = objMessage.getStringProperty("lastUpdatedDate"); try { organize.setLastUpdatedDate(DateFormat.getDateTimeInstance().parse(lastDate)); } catch (Exception e) { e.printStackTrace(); } return organize; } return null; } }
3、發送
Sprng為我們提供了JMSTemplate,基於這個發送消息,我們先來看看Spring配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd"> <!-- 外部屬性文件的定義 --> <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <list> <value>classpath:sender.properties</value> </list> </property> </bean> <!-- 配置connectionFactory --> <bean id="jmsSenderFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${jms.sendBrokerURL}"> </property> <property name="useAsyncSend" value="true"></property> <property name="userName" value="admin"></property> <property name="password" value="admin"></property> </bean> </property> <property name="maxConnections" value="100"></property> </bean> <bean id="jmsConverter" class="com.wc82.util.FaJmsConverter" /> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="jmsSenderFactory"></property> <property name="defaultDestination" ref="destination" /> <!-- 區別它采用的模式為false是p2p為true是訂閱 --> <property name="pubSubDomain" value="true" /> <property name="messageConverter" ref="jmsConverter"></property> </bean> <!-- 發送消息的目的地(一個隊列) --> <bean id="destination" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 設置消息隊列的名字 --> <constructor-arg index="0" value="${jms.sendDestinationName}" /> </bean> <bean id="messageProducer" class="message.MessageProducer"> <property name="jmsTemplate" ref="jmsTemplate"></property> </bean> </beans>
根據上面的配置,只需要獲得messageProducer這個Bean,便可以發送,下面我來看看MessageProducer這個類及其接口
package message.face; public interface IJMSMessageProducer { public abstract void converAndSendObjectMessage(Object obj); }
實現類
package message; import message.face.IJMSMessageProducer; import org.apache.log4j.Logger; import org.springframework.jms.core.JmsTemplate; public class MessageProducer implements IJMSMessageProducer { private static final Logger logger =Logger.getLogger(MessageProducer.class); private JmsTemplate jmsTemplate; /** * @param jmsTemplate * the jmsTemplate to set */ public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } /* * (non-Javadoc) * * @see * com.vispractice.faf.jms.IJMSMessageProducer#converAndSendObjectMessage * (java.lang.Object) * * @date 2012-10-25 * * @user */ public void converAndSendObjectMessage(Object obj) { jmsTemplate.convertAndSend(obj); logger.info("The message pub success, the Object is " + obj); } }
發送測試類
package com.wc82.ActivemqTransData; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.util.Properties; import message.face.IJMSMessageProducer; import model.JmsFaUser; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class SendTest { public static void main(String[] args) { Properties pro=new Properties(); try { InputStream input=SendTest.class.getResourceAsStream("/sender.properties"); pro.load(input); } catch (FileNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); pro.setProperty("count", new Integer(0).toString()); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } Object obj=pro.get("jms.jmsSendConverterClass"); ApplicationContext ac = new ClassPathXmlApplicationContext( "jms-sender.xml"); IJMSMessageProducer messageProducer = (IJMSMessageProducer) ac.getBean("messageProducer"); JmsFaUser user = new JmsFaUser(); user.setUserName("ssss"); user.setId(new Long(111)); messageProducer.converAndSendObjectMessage(user); } }
那發送到神馬位置呢?在發送消息的Spring配置文件里面,有一個jms.sendBrokerURL,這個值是在sender.properties文件中配置的,方便修改,我們來看一下關於發送消息時所以配置的參數信息
jms.sendBrokerURL=tcp\://localhost\:61616 jms.sendDestinationName=faJMS
第一個指地址,因為ActiveMQ部署的是我本機,所以使用localhost,端口號在部署的時候,就是默認的
第二個JMS的名稱,這個可以自取
如果發送,可以通過localhost:8161的控制台,看到你所發送的消息,這個地址是ActiveMQ的Web控制台,接下為我們看看接收
4、接收
MQ給我提供一種方式,當接收到消息的時候,自動去執行我們業務代碼,
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd"> <description>jms receiver configuration</description> <!-- 外部屬性文件的定義 --> <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <list> <value>classpath:receiver.properties</value> </list> </property> </bean> <!-- 配置connectionFactory --> <bean id="jmsReceiverFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${jms.receiveBrokerURL}" /> <!-- <property name="userName" value="reader"></property> --> <!-- <property name="password" value="readeryxtech"></property> --> </bean> </property> <property name="maxConnections" value="100"></property> </bean> <bean id="destination" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 設置消息隊列的名字 --> <constructor-arg index="0" value="${jms.receiveDestinationName}" /> </bean> <bean id="jmsConverter" class="com.wc82.util.FaJmsConverter" /> <!--異步調用消息 --> <bean id="receive" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="jmsReceiverFactory"></property> <property name="destination" ref="destination"></property> <property name="messageListener" ref="messageListener"></property> </bean> <bean id="delegate" class="message.MessageConsumer"></bean> <bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <property name="delegate" ref="delegate"></property> <property name="defaultListenerMethod" value="onMessage"></property> <property name="messageConverter" ref="jmsConverter" /> </bean> </beans>
先看一下messageListener,這個監聽,是自己寫的,其中delegate是當你接收消息之后所執行的業務代碼的bean,我這里將這個bean做成配置,方便修改,而defaultListenerMethod是指定執行的方法,這里設置定了,執行omMessage方法,也就是說類你可以指定,但是onMessage方法必須要有,而convertor也是可配的,與發送的convertor是一樣的,這個轉換器發送與接收都必須要使用
注意:這里為神馬要將執行的類的bean做成可配呢?因為我在做這一塊的工作時,我不知道是由那個bean,但是類中的方法我可設置,這樣做的目地就是在消息的消費者不在去關心這個消息,而只需要寫一個類,里面有onMessage方法,在這個方法里做自己的業務邏輯即可,將其關注點放到業務處理上,那腫么樣來設計這個方法的名字呢,我們可以設計了一個接口,如果消費者想消費這個消息,就必須實現這上接口,下面我們來看看這個接口
package message.face; public interface FaJmsReceiveListener { public void onMessage(Object baseModel); }
這個接口中只有一個方法就是onMessage,而實現類則交由具體的消費者,因為消費可能在實現的時候可能引用別的bean,進而處理別的業務,例如入庫,所以我們在做接收消息的配置上,只設計處理消息的bean的名字,不設置具體的class,只要消費者配置了這個bean的名字即可,當然這個名字也是可配的,配置在了init.propertis中,我們來看看這個接口的實現類
package message; import org.apache.log4j.Logger; import message.face.FaJmsReceiveListener; import model.JmsFaUser; import model.JmsOrganize; public class MessageConsumer implements FaJmsReceiveListener{ private static final Logger logger=Logger.getLogger(MessageConsumer.class); public void onMessage(Object baseModel) { if(baseModel instanceof JmsFaUser){ JmsFaUser user = (JmsFaUser)baseModel; logger.info(user.getId()); }else if (baseModel instanceof JmsOrganize){ JmsOrganize org = (JmsOrganize)baseModel; logger.info(org.getId()); } } }
上述的代碼可以共用,當發送的類變更時,只需要編寫Convertor,並在init.properties中配置上即可,而消息的消費者只需要實現該接口即可
測試類
package com.wc82.ActivemqTransData; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class ReceiveTest { public static void main(String[] args) { ApplicationContext a2c = new ClassPathXmlApplicationContext("jms-receiver.xml"); System.out.println("receiver"); } }
接下來,看看接收的receiver.properties
jms.receiveBrokerURL=tcp\://localhost\:61616 jms.receiveDestinationName=faJMS
第一個、第二個、第三個參數不再詳細述說,第四就是執行業務邏輯的bean的名字,這個參數可以設定死也可以設定靈活
上述只是初步的能發送和接收消息,后續是考慮安全、性能的問題。
注意測試時:要先打開消息接收程序,然后再打開消息發送程序。activemq是實時性的,不會將消息保存下來,發送消息的程序在發送消息時,如果接收程序沒有啟動,那么這個接收程序就接收不到這個消息。下一節,就是用來解決這個持久訂閱的問題。