ActiveMQ 在java中的使用,通過單例模式、工廠實現
Jms規范里的兩種message傳輸方式Topic和Queue,兩者的對比如下表():
Topic | Queue | |
概要 | Publish Subscribe messaging 發布訂閱消息 | Point-to-Point 點對點 |
有無狀態 | topic數據默認不落地,是無狀態的。 | Queue數據默認會在mq服務器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存儲。 |
完整性保障 | 並不保證publisher發布的每條數據,Subscriber都能接受到。 | Queue保證每條數據都能被receiver接收。 |
消息是否會丟失 | 一般來說publisher發布消息到某一個topic時,只有正在監聽該topic地址的sub能夠接收到消息;如果沒有sub在監聽,該topic就丟失了。 | Sender發送消息到目標Queue,receiver可以異步接收這個Queue上的消息。Queue上的消息如果暫時沒有receiver來取,也不會丟失。 |
消息發布接收策略 | 一對多的消息發布接收策略,監聽同一個topic地址的多個sub都能收到publisher發送的消息。Sub接收完通知mq服務器 | 一對一的消息發布接收策略,一個sender發送的消息,只能有一個receiver接收。receiver接收完后,通知mq服務器已接收,mq服務器對queue里的消息采取刪除或其他操作。 |
一、導jar包
activemq的依賴包
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>jul-to-slf4j</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.13.3</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.3.1.RELEASE</version> <exclusions> <exclusion> <groupId>org.springframework</groupId> <artifactId>spring-messaging</artifactId> </exclusion> <exclusion> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> </exclusion> <exclusion> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> </exclusion> <exclusion> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> </exclusion> <exclusion> <groupId>org.springframework</groupId> <artifactId>spring-tx</artifactId> </exclusion> <exclusion> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> </exclusion> </exclusions> </dependency>
二、java代碼
創建一下四個java文件,成為mq的公共數據連接池
1、連接工廠 配置
package com.broadsense.iov.base.jms; import org.apache.activemq.ActiveMQConnectionFactory; import org.springframework.jms.connection.CachingConnectionFactory; /** * 連接工廠 配置 * * @author flm * 2017年10月13日 */ public class ConnectionFactory { private static final String URL = "tcp://10.10.1.1:61616"; private static final String USERNAME = "hkadmin"; private static final String PASSWORD = "hk667"; private static final int SESSIONCACHESIZE = 20; private javax.jms.ConnectionFactory factory; public static synchronized javax.jms.ConnectionFactory getInstance() { if (SingletonHolder.INSTANCE.factory == null) { SingletonHolder.INSTANCE.build(); } return SingletonHolder.INSTANCE.factory; } private void build() { AMQConfigBean bean = loadConfigure(); this.factory = buildConnectionFactory(bean); } private javax.jms.ConnectionFactory buildConnectionFactory(AMQConfigBean bean) { javax.jms.ConnectionFactory targetFactory = new ActiveMQConnectionFactory(bean.getUserName(), bean.getPassword(), bean.getBrokerURL()); CachingConnectionFactory connectoryFacotry = new CachingConnectionFactory(); connectoryFacotry.setTargetConnectionFactory(targetFactory); connectoryFacotry.setSessionCacheSize(bean.getSessionCacheSize()); return connectoryFacotry; } private AMQConfigBean loadConfigure() { if ("tcp://10.10.1.1:61616" != null) { try { return new AMQConfigBean("tcp://10.10.1.1:61616", "hkadmin", "hk667", 20); } catch (Exception e) { throw new IllegalStateException("load amq config error!"); } } throw new IllegalStateException("load amq config error!"); } private static class AMQConfigBean { private String brokerURL; private String userName; private String password; private int sessionCacheSize; public AMQConfigBean() { } public AMQConfigBean(String brokerURL, String userName, String password, int sessionCacheSize) { this.brokerURL = brokerURL; this.userName = userName; this.password = password; this.sessionCacheSize = sessionCacheSize; } public String getBrokerURL() { return this.brokerURL; } public void setBrokerURL(String brokerURL) { this.brokerURL = brokerURL; } public String getUserName() { return this.userName; } public void setUserName(String userName) { this.userName = userName; } public String getPassword() { return this.password; } public void setPassword(String password) { this.password = password; } public int getSessionCacheSize() { return this.sessionCacheSize; } public void setSessionCacheSize(int sessionCacheSize) { this.sessionCacheSize = sessionCacheSize; } } private static class SingletonHolder { static ConnectionFactory INSTANCE = new ConnectionFactory(null); } }
2、模版
package com.broadsense.iov.base.jms; import org.springframework.jms.core.JmsTemplate;
/**
* 模板廠
*
* @author flm
* 2017年10月13日
*/
public class JmsTemplateFactory { private final javax.jms.ConnectionFactory factory; private JmsTemplate topicJmsTemplate; private JmsTemplate queueJmsTemplate; public static JmsTemplateFactory getInstance() { return SingletonHolder.INSTANCE; } private JmsTemplateFactory() { this.factory = ConnectionFactory.getInstance(); } public synchronized JmsTemplate getTopicJmsTemplate() { if (this.topicJmsTemplate == null) { this.topicJmsTemplate = createTemplate(this.factory, true); } return this.topicJmsTemplate; } public synchronized JmsTemplate getQueueJmsTemplate() { if (this.queueJmsTemplate == null) { this.queueJmsTemplate = createTemplate(this.factory, false); } return this.queueJmsTemplate; } private JmsTemplate createTemplate(javax.jms.ConnectionFactory factory, boolean pubSubDomain) { JmsTemplate template = new JmsTemplate(factory); template.setPubSubDomain(pubSubDomain); return template; } public static class SingletonHolder { static JmsTemplateFactory INSTANCE = new JmsTemplateFactory(null); } }
3、消費者 模版
package com.broadsense.iov.base.jms; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import javax.jms.Destination; import javax.jms.MessageListener; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.listener.SimpleMessageListenerContainer; /** * JMS監聽器 創建消費者 * * @author flm * 2017年10月13日 */ public class JMSListener { private static final Logger LOGGER = LoggerFactory.getLogger(JMSListener.class); private static final Map<String, Destination> MQDESTS = new ConcurrentHashMap();
/**
* 開啟一個 點對點的 消息隊列監聽 的消費者
*
* @param queueName 隊列名稱
* @param subName 訂閱者的名字
* @param listener 監聽
*/
public static synchronized void startJmsQueueListener(String queueName, MessageListener listener)
{ startJmsQueueListener(queueName, null, listener); } public static synchronized void startJmsQueueListener(String queueName, String subName, MessageListener listener) { Destination dst = (Destination)MQDESTS.get("QUEUE_" + queueName); if (dst == null) { ActiveMQQueue mq = new ActiveMQQueue(queueName); startJmsListener(mq, subName, listener); MQDESTS.put("QUEUE_" + queueName, mq); } else { LOGGER.warn(queueName + " already started"); } }
/**
* 開啟 一對多 主題的 消息監聽的消費者
*
* @param topicName 主題消息名稱
* @param subName 訂閱者的名字
* @param listener 監聽
*/
public static synchronized void startJmsTopicListener(String topicName, MessageListener listener) { startJmsTopicListener(topicName, null, listener); } public static synchronized void startJmsTopicListener(String topicName, String subName, MessageListener listener) { ActiveMQTopic mq = new ActiveMQTopic(topicName); startJmsListener(mq, subName, listener); MQDESTS.put("QUEUE_" + topicName, mq); }
/**
* 開始 消息監聽器 消費者
*
* @param dest 目的地
* @param subName 持久訂閱的名字
* @param msgListener 消息監聽器
*/
private static void startJmsListener(Destination dest, String subName, MessageListener msgListener) { javax.jms.ConnectionFactory factory = ConnectionFactory.getInstance(); SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer(); listener.setConnectionFactory(factory); listener.setDestination(dest); listener.setMessageListener(msgListener); if ((subName != null) && (subName != "")) { listener.setDurableSubscriptionName(subName); } listener.start(); } }
4、生產者 模版
package com.broadsense.iov.base.jms; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; /** * 創建 jms生產者 * * @author flm * 2017年10月13日 */ public class JMSPublisher {
/**
* 發送消息
* Topic 生產者
*
* @param dest 目的地
* @param msg 消息內容
*/
public static void sendTopicMessage(String dest, String msg) { JmsTemplateFactory.getInstance().getTopicJmsTemplate().send(dest, new MessageCreator(msg) { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(this.val$msg); } }); }
/**
* 發送消息
* Queue 生產者
*
* @param dest 目的地
* @param msg 消息內容
*/
public static void sendQueueMessage(String dest, String msg) { JmsTemplateFactory.getInstance().getQueueJmsTemplate().send(dest, new MessageCreator(msg) { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(this.val$msg); } }); } }
三、activemq的使用
1、創建一個junit測試,@Test 發布、接受、即可看到消息,mq管理后台也可以看到

package com.broadsense.iov.base.jms; import com.broadsense.iov.base.jms.JMSListener; import com.broadsense.iov.base.jms.JMSPublisher; import java.io.IOException; import java.util.logging.Level; import java.util.logging.Logger; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.junit.Test; /** * * @author flm */ public class JMSPublisherTest { public JMSPublisherTest() { } /** * 生產者 發布消息 * @throws */ @Test public void testSendMessage() throws InterruptedException { for (int idx = 1; idx < 3; idx++) { /* * 生產者 發布 消息到 queue/queue_b 的隊列中 */ JMSPublisher.sendQueueMessage("queue/queue_b", String.valueOf(idx * 1111)); /* * 生產者 發布消息 到 topic/send 的Topic 主題中 */ //JMSPublisher.sendTopicMessage("topic/send", String.valueOf(idx * 1111)); } } /** * 消費者 訂閱接受消息 */ @Test public void receiver() { /* * 消費者 訂閱主題 topic/send 是否有消息發布,有側打印出來 (通過 onMessage 監聽) */ /*JMSListener.startJmsTopicListener("topic/send", new MessageListener() { @Override public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage msg = (TextMessage) message; System.out.println("== 收到一個JMS消息..." + msg.getText()); } } catch (JMSException e) { e.printStackTrace(); } } });*/ /* * 消費者 訂閱隊列 queue/queue_b 是否有消息發布,有側打印出來 (通過 onMessage 監聽) */ JMSListener.startJmsQueueListener("queue/queue_b" ,new MessageListener() { @Override public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage msg = (TextMessage) message; System.out.println("== 收到一個JMS消息..." + msg.getText()); } } catch (JMSException e) { e.printStackTrace(); } } }); try { System.in.read(); } catch (IOException ex) { Logger.getLogger(JMSPublisherTest.class.getName()).log(Level.SEVERE, null, ex); } } }
2、真正的項目實現
在項目的中具體實現,是加載一個類來實現訂閱消息
加載啟動一個訂閱的主題,給一個類MQ()處理

package com.ifengSearch.track.dao; import org.springframework.stereotype.Repository; import com.broadsense.iov.base.jms.JMSListener; /** * 項目啟動即 開啟 * 通過 spring 依賴加載 Lister 訂閱topic/send * @author flm * @2017年10月16日 */ @Repository public class Lister { public Lister(){ try { JMSListener.startJmsTopicListener("topic/send",new QM());// QM() 訂閱 主題 topic/send } catch (Exception e) { } } }
MQ()訂閱消息的處理類,通過實現

package com.ifengSearch.track.dao; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 通過 實現 MessageListener 的 onMessage 來監聽消息 * 接受、處理消息 * @author flm * @2017年10月16日 */ public class MQ implements MessageListener { @Override public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage msg = (TextMessage) message; System.out.println("== 收到一個JMS消息..." + msg.getText()); } } catch (JMSException e) { e.printStackTrace(); } } }