具體原理不進行深入,會用就好。
第一:當然是先安裝ActiveMQ,選擇操作系統位數,安裝成功以后,輸入網址http://ip:8161/admin/,會出現相關頁面,賬號密碼都是admin。在這個頁面上可以看到消息隊列的信息。consume和send使用較多。
第二:創建produce和consume。
基本的原理都是一樣的,JMS(java message send)的13個規范之一,消息中間件的一些組件拼接連通就行了。具體參考代碼注釋。
容易出錯的地方是消息的格式。
這里有點對點模式和發布/訂閱模式。我使用的時候是在初始化ContextListener的時候初始化ActiveMQ,然后並重載了onMessage事件(MessageConsumer里面定義了一些接口,可以去使用),這樣就不需要一直主動去詢問提供者了,隊列中有消息時會自動觸發該事件。
參考代碼:
package com.enjoyor.soa.traffic.server.nmim.listener; import javax.jms.Message; import javax.jms.MessageListener; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; import com.enjoyor.soa.traffic.server.nmim.util.ActiveMQUtil; import com.enjoyor.soa.traffic.server.nmim.util.Constants; import com.enjoyor.soa.traffic.server.nmim.util.InitConfig; import com.enjoyor.soa.traffic.server.nmim.service.inner.MqManageService; import com.enjoyor.soa.traffic.util.frame.spring.SpringContextUtil; import com.enjoyor.soa.traffic.util.helper.LoggerHelper; import com.enjoyor.soa.traffic.util.pojo.ResultPojo; /** * * ContextListener * 2016-11-25 * */ public class ContextListener implements ServletContextListener { public void contextDestroyed(ServletContextEvent arg0) { System.out.print("contextDestroyed"); } MqManageService manageService; public void contextInitialized(ServletContextEvent arg0) { System.out.print("contextInitialized"); String MQNAME = ""; InitConfig initConfig = new InitConfig(); initConfig.init(); MQNAME = InitConfig.confCache.get(Constants.MQNAME).toString(); ActiveMQUtil.getInstance().receive("queue://" + MQNAME, new MessageListener() { @Override public void onMessage(Message msg) { try { if(null==manageService){ manageService = (MqManageService) SpringContextUtil.getBean("mqManageService"); manageService.setListDic(); } //MqManageService manageService = (MqManageService) SpringContextUtil.getBean("mqManageService"); ResultPojo pojo = manageService.getMqMsg(msg); if(!pojo.getappCode().equals("0")){ LoggerHelper.LOG.error("接收失敗"+pojo.getResultList()); } } catch (Exception e) { LoggerHelper.LOG.error(e); } } }); } } ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ package com.enjoyor.soa.traffic.server.nmim.util; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class ActiveMQUtil { private ActiveMQUtil(){ } private JmsTemplate jmsTemplate; public JmsTemplate getJmsTemplate() { return jmsTemplate; } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } private static ActiveMQUtil mq; public static ActiveMQUtil getInstance(){ if(mq==null){ ApplicationContext context = new ClassPathXmlApplicationContext("activemq.xml"); mq=(ActiveMQUtil)context.getBean("activeMQUtil"); } return mq; } public Destination getDestination(String name){ Destination dest=null; boolean pubSubDomain=name.toLowerCase().indexOf("topic://")==0; String destName = name.replaceFirst("(?i)(topic|queue)://", ""); if(pubSubDomain){ dest= new ActiveMQTopic(destName); }else{ dest= new ActiveMQQueue(destName); } return dest; } public String receive(Destination dest) { try { TextMessage txtmsg = (TextMessage) getInstance().getJmsTemplate().receive(dest); if(null!=txtmsg)return txtmsg.getText(); } catch (Exception e) { e.printStackTrace(); } return null; } public String receive(String name) { return receive(getDestination(name)); } public Connection receive(Destination dest,MessageListener listener){ try{ ConnectionFactory factory = getInstance().getJmsTemplate().getConnectionFactory(); Connection connection = factory.createConnection(); connection.start(); final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(dest); consumer.setMessageListener(listener); return connection; } catch (JMSException e) { } return null; } public Connection receive(String name,MessageListener listener){ return receive(getDestination(name),listener); } public void send(Destination dest,final String msg){ try { jmsTemplate.send(dest,new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage message = session.createTextMessage(msg); return message; } }); } catch (Exception e) { e.printStackTrace(); } } public void send(String name,final String msg){ send(getDestination(name),msg); } }
注意將ContextListener初始化時注入spring容器中。任何類都需要一個入口,只有進入spring容器中,在jvm中被調用,才有存在的價值。