一、入門
1. 消息中間件的定義
沒有標准定義,一般認為,采用消息傳送機制/消息隊列 的中間件技術,進行數據交流,用在分布式系統的集成
2. 為什么要用消息中間件
解決分布式系統之間消息的傳遞。
電商場景:
用戶下單減庫存,調用物流系統。隨着業務量的增大,需要對系統進行拆分(服務化和業務拆分),拆分后的系統之間的交互一般用RPC(遠程過程調用)。如果系統擴充到有幾十個接口,就需要用消息中間件來解決問題。
3. 消息中間件和RPC有什么區別
3.1 功能特點:
在架構上,RPC和Message的差異點:Message有一個中間結點Message Queue,可以把消息存儲。
3.2 消息的特點
Message Queue把請求的壓力保存一下,逐漸釋放出來,讓處理者按照自己的節奏來處理。
Message Queue引入一下新的結點,讓系統的可靠性會受Message Queue結點的影響。
Message Queue是異步單向的消息。發送消息設計成是不需要等待消息處理的完成。
所以對於有同步返回需求,用Message Queue則變得麻煩了。
3.3 PRC的特點
同步調用,對於要等待返回結果/處理結果的場景,RPC是可以非常自然直覺的使用方式。
# RPC也可以是異步調用。
由於等待結果,Consumer(Client)會有線程消耗。
如果以異步RPC的方式使用,Consumer(Client)線程消耗可以去掉。但不能做到像消息一樣暫存消息/請求,壓力會直接傳導到服務Provider。
3.4 適用場合說明
希望同步得到結果的場合,RPC合適。
希望使用簡單,則RPC;RPC操作基於接口,使用簡單,使用方式模擬本地調用。異步的方式編程比較復雜。
不希望發送端(RPC Consumer、Message Sender)受限於處理端(RPC Provider、Message Receiver)的速度時,使用Message Queue。
隨着業務增長,有的處理端處理量會成為瓶頸,會進行同步調用到異步消息的改造。
這樣的改造實際上有調整業務的使用方式。
比如原來一個操作頁面提交后就下一個頁面會看到處理結果;改造后異步消息后,下一個頁面就會變成“操作已提交,完成后會得到通知”。
4. 消息中間件的使用場景
4.1 異步處理
用戶注冊(50ms),還需發送郵件(50ms)和短信(50ms)
串行:(150ms)用戶注冊—》發送郵件----》發送短信
並行(100ms):用戶注冊—》發送郵件----》發送短信
消息中間件(56ms):
用戶注冊(50ms)—》(6ms)消息中間件《-----發送郵件《-----發送短信
說明:
用戶注冊時,可能還需要同時發送郵件和短信,使用串行的方式進行處理時花費的時間比較久;這時就會考慮並行處理,即用戶注冊完以后,同時啟動兩個兩個線程去發送郵件和短信,這樣時間就會花費得更少。
如果引入消息中間件的話就會比並行處理更快,即用戶注冊時,把注冊信息放到消息中間件里面,然后發送郵件和短信的程序自己去消息中間件里面那用戶注冊的消息來消費
4.2 應用的解耦
訂單系統---》庫存系統(強耦合)
消息中間件:訂單系統---》消息中間件《----庫存系統(解耦)
原因:下訂單以后還要同步去修改庫存,沒有必要耦合在一起
說明:
用戶下訂單時,可能還需要去更新庫存,這個時候下訂單的后,還需要同步去更新庫存,這樣兩個系統之間就會有很強的耦合。
所以這個時候引入消息中間件,當用戶下訂單后,直接把訂單信息放入消息中間件里面,接着就不用管了,庫存系統直接去消息中間件里面那訂單信息來消費就行了,這樣訂單系統和庫存系統之間就解耦了
4.3 流量的削峰
用戶請求-----》秒殺應用
應用的前端加入消息隊列
用戶請求-----》消息隊列《----秒殺應用
原因:用戶訪問太多,服務器承受不了
說明:
在做秒殺的時候會有許多訪問,可能導致系統承受不住。這個時候就需要在應用的前端加入消息隊列,然后秒殺系統就可以直接去消息隊列里面拿消息來消費就行了,秒殺系統是可以選擇性的拿消息過來消費的,如果消息太多就會選擇性的丟棄一些消息
4.4 日志處理
錯誤日志---》消息隊列《----日志處理
用戶行為日志--》消息隊列(kafka)《-----日志的存儲或流式處理
原因:機器太多,日志查看不方便
說明:
當系統太多的時候,部署了很多機器,每台機器上面都有日志,定位問題的時候不可能去每一台機器去看日志。
這個時候就需要引入消息中間件,把所有的日志放到消息中間件里面,然后在通過一個應用去讀取日志存庫或者展示
4.5 純粹的消息通信
點對點通信
5. 常見消息中間件比較
說明:
kafka和RabbitMQ的比較
1)RabbitMq比kafka成熟,在可用性上,穩定性上,可靠性上,RabbitMq超過kafka
2)Kafka設計的初衷就是處理日志的,可以看做是一個日志系統,針對性很強,所以它並沒有具備一個成熟MQ應該具備的特性
3)Kafka的性能(吞吐量、tps)比RabbitMq要強
二、JMS規范
1. 什么是JMS規范
JMS(Java Messaging Service)規范,本質是API,Java平台消息中間件的規范,java應用程序之間進行消息交換。並且通過提供標准的產生、發送、接收消息的接口簡化企業應用的開發。對應的實現ActiveMQ
2. JMS對象模型包含如下幾個要素
1)連接工廠:創建一個JMS連接
2)JMS連接:客戶端和服務器之間的一個連接。
3)JMS會話:客戶端和服務器會話的狀態,建立在連接之上的
4)JMS目的:消息隊列
5)JMS生產者:消息的生成
6)JMS消費者:接收和消費消息
7)Broker:消息中間件的實例(ActiveMQ)
3. JMS規范中的點對點模式
隊列,一個消息只有一個消費者(即使有多個接受者監聽隊列),消費者是要向隊列應答成功
4. JMS規范中的主題模式(發布訂閱)
發布到Topic的消息會被當前主題所有的訂閱者消費
5. JMS規范中的消息類型
TextMessage,MapMessage,ObjectMessage,BytesMessage,StreamMessage
三、ActiveMQ使用
1. 原生ActiveMQ的API的使用
編碼過程參考JMS對象模型的幾個要素:
1)連接工廠:創建一個JMS連接
2)JMS連接:客戶端和服務器之間的一個連接。
3)JMS會話:客戶端和服務器會話的狀態,建立在連接之上的
4)JMS目的:消息隊列
5)JMS生產者:消息的生成
6)JMS消費者:接收和消費消息
7)Broker:消息中間件的實例(ActiveMQ)
1.1 准備工作:
1)在ActiveMQ官網下載windows版的ActiveMQ,下載地址:http://activemq.apache.org/activemq-580-release.html
下載后啟動,
在瀏覽器中輸入地址http://127.0.0.1:8161/admin訪問,用戶名和密碼為admin/admin
2)新建一名為OriginalActiveMQ的maven工程,結構如下:
1.2 在pom.xml文件里面引入如下依賴
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.8.0</version> </dependency>
1.3 新建一個JMS消息生產者
package com.study.demo; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * * @Description: java原生ActiveMQ的API的使用-JMS消息生產者 * @author leeSmall * @date 2018年9月13日 * */ public class JmsProducer { //默認連接用戶名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //默認連接密碼 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //默認連接地址 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; //發送的消息數量 private static final int SENDNUM = 10; //編碼過程參考JMS對象模型的幾個要素 public static void main(String[] args) { //1.連接工廠:創建一個JMS連接 ConnectionFactory connectionFactory; //2.JMS連接:客戶端和服務器之間的一個連接 Connection connection = null; //3.JMS會話:客戶端和服務器會話的狀態,建立在JMS連接之上的 Session session; //4.JMS目的:消息隊列 Destination destination; //5.JMS生產者:消息的生成 MessageProducer messageProducer; //創建一個ActiveMQ的連接工廠 connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL); try { //通過連接工廠創建一個JMS連接 connection = connectionFactory.createConnection(); //開啟JMS連接 connection.start(); /* * 通過JMS連接創建一個JMS會話 * * createSession參數取值說明: * 第一個參數:為true表示啟用事務 * 第二個參數:消息的確認模式: * AUTO_ACKNOWLEDGE 自動簽收 * CLIENT_ACKNOWLEDGE 客戶端自行調用 * ACKNOWLEDGE 方法簽收 * DUPS_OK_ACKNOWLEDGE 不是必須簽收 * 消息可能會重復發送 在第二次重新傳送消息的時候,消息頭的JmsDelivered會被置為true標示當前消息已經傳送過一次, * 客戶端需要進行消息的重復處理控制。 */ session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); //通過JMS會話創建一個JMS目的,即消息隊列 destination = session.createQueue("firstMSG"); //通過JMS會話和JMS目的創建一個JMS生產者,即消息生產者 messageProducer = session.createProducer(destination); //發送10條消息 for(int i=0;i<SENDNUM;i++){ //生成消息 String msg = "發送消息"+i+" "+System.currentTimeMillis(); TextMessage message = session.createTextMessage(msg); System.out.println("發送消息:"+msg); //發送消息 messageProducer.send(message); } //提交JMS會話 session.commit(); } catch (JMSException e) { e.printStackTrace(); }finally { if(connection!=null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
1.4 新建一個JMS消息消費者
package com.study.demo; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * * @Description: java原生ActiveMQ的API的使用-JMS消息消費者 * @author leeSmall * @date 2018年9月13日 * */ public class JmsConsumer { //默認連接用戶名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //默認連接密碼 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //默認連接地址 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; //編碼過程參考JMS對象模型的幾個要素 public static void main(String[] args) { //1.連接工廠:創建一個JMS連接 ConnectionFactory connectionFactory; //2.JMS連接:客戶端和服務器之間的一個連接 Connection connection = null; //3.JMS會話:客戶端和服務器會話的狀態,建立在JMS連接之上的 Session session; //4.JMS目的:消息隊列 Destination destination; //5.JMS消費者:接收和消費消息 MessageConsumer messageConsumer; //創建一個ActiveMQ的連接工廠 connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL); try { //通過連接工廠創建一個JMS連接 connection = connectionFactory.createConnection(); //開啟JMS連接 connection.start(); /* * 通過JMS連接創建一個JMS會話 * * createSession參數取值說明: * 第一個參數:為true表示啟用事務 * 第二個參數:消息的確認模式: * AUTO_ACKNOWLEDGE 自動簽收 * CLIENT_ACKNOWLEDGE 客戶端自行調用 * ACKNOWLEDGE 方法簽收 * DUPS_OK_ACKNOWLEDGE 不是必須簽收 * 消息可能會重復發送 在第二次重新傳送消息的時候,消息頭的JmsDelivered會被置為true標示當前消息已經傳送過一次, * 客戶端需要進行消息的重復處理控制。 */ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //通過JMS會話創建一個JMS目的,即消息隊列 destination = session.createQueue("firstMSG"); //通過JMS會話和JMS目的創建一個JMS消費者,即消息消費者 messageConsumer = session.createConsumer(destination); //讀取消息 while(true){ //使用receive方法消費一個消息,如果超過10s沒有得到消息就跳過 TextMessage textMessage = (TextMessage)messageConsumer.receive(10000); if(textMessage != null){ System.out.println("Accept msg : "+textMessage.getText()); }else{ break; } } } catch (JMSException e) { e.printStackTrace(); } } }
1.5 啟動JMS消息生產者,查看ActiveMQ管理界面
說明:
可以看到有10條消息待消費,0個消費者,10條消息入隊,0條消息出隊
1.6 啟動JMS消息消費者,查看ActiveMQ管理界面
說明:
可以看到,此時有0條消息待消費,1個消費者,10條消息入隊,10條消息出隊
2. Spring提供的ActiveMQ的使用
2.1 新建1個名為ActiveMQProducer的maven web工程,結構如下
2.2 在pom.xml文件里面引入如下依賴
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.study.demo</groupId> <artifactId>ActiveMQProducer</artifactId> <packaging>war</packaging> <version>0.0.1-SNAPSHOT</version> <name>ActiveMQProducer Maven Webapp</name> <url>http://maven.apache.org</url> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>javax</groupId> <artifactId>javaee-web-api</artifactId> <version>7.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>4.3.11.RELEASE</version> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>jstl</artifactId> <version>1.2</version> </dependency> <!--日志 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.5</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.16</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> <version>1.7.5</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.0.13</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.0.13</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-access</artifactId> <version>1.0.13</version> </dependency> <!--JSON --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.9.6</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.6</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> <version>2.9.6</version> </dependency> <!-- xbean --> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>3.16</version> </dependency> <dependency> <groupId>com.thoughtworks.xstream</groupId> <artifactId>xstream</artifactId> <version>1.3.1</version> </dependency> <!--ActiveMq --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.8.0</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.3.11.RELEASE</version> </dependency> </dependencies> <build> <finalName>ActiveMQProducer</finalName> <resources> <resource> <directory>${basedir}/src/main/java</directory> <includes> <include>**/*.xml</include> </includes> </resource> </resources> </build> </project>
2.3 新建一個/ActiveMQProducer/src/main/java/applicationContext.xml文件,並在里面添加Spring的ActiveMQ相關配置
<?xml version="1.0" encoding="UTF-8"?> <!-- 查找最新的schemaLocation 訪問 http://www.springframework.org/schema/ --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"> <!-- 配置掃描路徑 --> <context:component-scan base-package="com.study.demo"> <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/> </context:component-scan> <!-- ActiveMQ 連接工廠 --> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" /> <!-- Spring Caching連接工廠 --> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <property name="sessionCacheSize" value="100"></property> </bean> <!-- Spring JmsTemplate 的消息生產者 start--> <!-- 定義JmsTemplate的Queue類型 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="connectionFactory"></constructor-arg> <!-- 隊列模式--> <property name="pubSubDomain" value="false"></property> </bean> <!-- 定義JmsTemplate的Topic類型 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="connectionFactory"></constructor-arg> <!-- 發布訂閱模式--> <property name="pubSubDomain" value="true"></property> </bean> <!--Spring JmsTemplate 的消息生產者 end--> </beans>
說明:
要在Spring的配置文件中增加命名空間
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms=http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"
2.4 新建一個隊列消息生產者
package com.study.demo.mq.producer.queue; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; /** * * @Description: 隊列消息生產者,發送消息到隊列 * @author leeSmall * @date 2018年9月13日 * */ @Component("queueSender") public class QueueSender { @Autowired @Qualifier("jmsQueueTemplate") private JmsTemplate jmsTemplate; /*@Autowired private GetResponse getResponse;*/ public void send(String queueName,final String message){ jmsTemplate.send(queueName, new MessageCreator() { public Message createMessage(Session session) throws JMSException { Message msg = session.createTextMessage(message); return msg; } }); } }
2.5 新建一個Topic生產者
package com.study.demo.mq.producer.topic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; /** * * @Description: Topic生產者發送消息到Topic * @author leeSmall * @date 2018年9月13日 * */ @Component("topicSender") public class TopicSender { @Autowired @Qualifier("jmsTopicTemplate") private JmsTemplate jmsTemplate; public void send(String queueName,final String message){ jmsTemplate.send(queueName, new MessageCreator() { public Message createMessage(Session session) throws JMSException { Message msg = session.createTextMessage(message); return msg; } }); } }
2.6 新建一個控制器來接收從頁面發送的消息和調用消息生產者發送消息
package com.study.demo.controller; import com.study.demo.mq.producer.queue.QueueSender; import com.study.demo.mq.producer.topic.TopicSender; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import javax.annotation.Resource; /** * * @Description: controller測試 * @author leeSmall * @date 2018年9月13日 * */ @Controller public class ActivemqController { @Resource QueueSender queueSender; @Resource TopicSender topicSender; /** * 發送消息到隊列 * Queue隊列:僅有一個訂閱者會收到消息,消息一旦被處理就不會存在隊列中 * @param message * @return String */ @ResponseBody @RequestMapping("queueSender") public String queueSender(@RequestParam("message")String message){ String opt=""; try { queueSender.send("test.queue",message); opt = "suc"; } catch (Exception e) { opt = e.getCause().toString(); } return opt; } /** * 發送消息到主題 * Topic主題 :放入一個消息,所有訂閱者都會收到 * 這個是主題目的地是一對多的 * @param message * @return String */ @ResponseBody @RequestMapping("topicSender") public String topicSender(@RequestParam("message")String message){ String opt = ""; try { topicSender.send("test.topic",message); opt = "suc"; } catch (Exception e) { opt = e.getCause().toString(); } return opt; } }
2.7 新增/ActiveMQProducer/src/main/java/spring-mvc.xml配置文件
<?xml version="1.0" encoding="UTF-8"?> <!-- 查找最新的schemaLocation 訪問 http://www.springframework.org/schema/ --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"> <!-- <mvc:default-servlet-handler />--> <mvc:resources mapping="/js/**" location="/js/"/> <mvc:annotation-driven content-negotiation-manager="contentNegotiationManager" /> <context:component-scan base-package="com.study.demo"> <context:include-filter type="annotation" expression="org.springframework.stereotype.Controller" /> </context:component-scan> <bean id="stringHttpMessageConverter" class="org.springframework.http.converter.StringHttpMessageConverter"> <property name="supportedMediaTypes"> <list> <bean class="org.springframework.http.MediaType"> <constructor-arg index="0" value="text" /> <constructor-arg index="1" value="plain" /> <constructor-arg index="2" value="UTF-8" /> </bean> </list> </property> </bean> <bean id="mappingJackson2HttpMessageConverter" class="org.springframework.http.converter.json.MappingJackson2HttpMessageConverter" /> <bean class="org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter"> <property name="messageConverters"> <list> <ref bean="stringHttpMessageConverter" /> <ref bean="mappingJackson2HttpMessageConverter" /> </list> </property> </bean> <bean id="contentNegotiationManager" class="org.springframework.web.accept.ContentNegotiationManagerFactoryBean"> <property name="mediaTypes"> <map> <entry key="html" value="text/html" /> <entry key="pdf" value="application/pdf" /> <entry key="xsl" value="application/vnd.ms-excel" /> <entry key="xml" value="application/xml" /> <entry key="json" value="application/json" /> </map> </property> <property name="defaultContentType" value="text/html" /> </bean> <bean id="viewResolver" class="org.springframework.web.servlet.view.ContentNegotiatingViewResolver"> <property name="order" value="0" /> <property name="contentNegotiationManager" ref="contentNegotiationManager" /> <property name="viewResolvers"> <list> <bean class="org.springframework.web.servlet.view.BeanNameViewResolver" /> <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"> <property name="viewClass" value="org.springframework.web.servlet.view.JstlView" /> <property name="prefix" value="/WEB-INF/pages/" /> <property name="suffix" value=".jsp"></property> </bean> </list> </property> <property name="defaultViews"> <list> <bean class="org.springframework.web.servlet.view.json.MappingJackson2JsonView"> <property name="extractValueFromSingleKeyModel" value="true" /> </bean> </list> </property> </bean> </beans>
2.8 新增一個頁面/ActiveMQProducer/src/main/webapp/index.jsp來發送消息
<%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%> <% String path = request.getContextPath(); System.out.println(path); String basePath = request.getScheme() + "://" + request.getServerName() + ":" + request.getServerPort() + path + "/"; System.out.println(basePath); %> <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"> <html> <head> <base href="<%=basePath%>"> <title>ActiveMQ Demo程序</title> <meta http-equiv="pragma" content="no-cache"> <meta http-equiv="cache-control" content="no-cache"> <meta http-equiv="expires" content="0"> <script type="text/javascript" src="<%=basePath%>js/jquery-1.11.0.min.js"></script> <style type="text/css"> .h1 { margin: 0 auto; } #producer{ width: 48%; border: 1px solid blue; height: 80%; align:center; margin:0 auto; } body{ text-align :center; } div { text-align :center; } textarea{ width:80%; height:100px; border:1px solid gray; } button{ background-color: rgb(62, 156, 66); border: none; font-weight: bold; color: white; height:30px; } </style> <script type="text/javascript"> function send(controller){ if($("#message").val()==""){ $("#message").css("border","1px solid red"); return; }else{ $("#message").css("border","1px solid gray"); } $.ajax({ type: 'post', url:'<%=basePath%>/'+controller, dataType:'text', data:{"message":$("#message").val()}, success:function(data){ if(data=="suc"){ $("#status").html("<font color=green>發送成功</font>"); setTimeout(clear,1000); }else{ $("#status").html("<font color=red>"+data+"</font>"); setTimeout(clear,5000); } }, error:function(data){ $("#status").html("<font color=red>ERROR:"+data["status"]+","+data["statusText"]+"</font>"); setTimeout(clear,5000); } }); } function clear(){ $("#status").html(""); } </script> </head> <body> <h1>Hello ActiveMQ</h1> <div id="producer"> <h2>Producer</h2> <textarea id="message"></textarea> <br> <button onclick="send('queueSender')">發送Queue消息</button> <button onclick="send('topicSender')">發送Topic消息</button> <br> <span id="status"></span> </div> </body> </html>
到此生產者代碼編寫完成!
2.9 在8080端口的Tomcat啟動ActiveMQProducer,在瀏覽器輸入http://localhost:8080/ActiveMQProducer/地址訪問
2.10 新建一個名為ActiveMQConsumer的maven web工程,結構如下:
2.11 在pom.xml文件里面引入如下依賴
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.study.demo</groupId> <artifactId>ActiveMQConsumer</artifactId> <packaging>war</packaging> <version>0.0.1-SNAPSHOT</version> <name>ActiveMQConsumer Maven Webapp</name> <url>http://maven.apache.org</url> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>javax</groupId> <artifactId>javaee-web-api</artifactId> <version>7.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>4.3.11.RELEASE</version> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>jstl</artifactId> <version>1.2</version> </dependency> <!--日志 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.5</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.16</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> <version>1.7.5</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.0.13</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.0.13</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-access</artifactId> <version>1.0.13</version> </dependency> <!--JSON --> <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-mapper-asl</artifactId> <version>1.9.9</version><!-- 1.9.13 --> </dependency> <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-core-asl</artifactId> <version>1.9.9</version><!-- 1.9.13 --> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.9.6</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.6</version> </dependency> <!-- xbean --> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>3.16</version> </dependency> <dependency> <groupId>com.thoughtworks.xstream</groupId> <artifactId>xstream</artifactId> <version>1.3.1</version> </dependency> <!--ActiveMq --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.8.0</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.3.11.RELEASE</version> </dependency> </dependencies> <build> <finalName>ActiveMQConsumer</finalName> <resources> <resource> <directory>${basedir}/src/main/java</directory> <includes> <include>**/*.xml</include> </includes> </resource> </resources> </build> </project>
2.12 新建一個/ActiveMQConsumer/src/main/java/applicationContext.xml文件,並在里面添加Spring的ActiveMQ相關配置
<?xml version="1.0" encoding="UTF-8"?> <!-- 查找最新的schemaLocation 訪問 http://www.springframework.org/schema/ --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"> <!-- 配置掃描路徑 --> <context:component-scan base-package="com.study.demo"> <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/> </context:component-scan> <!-- ActiveMQ 連接工廠 --> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" /> <!-- Spring Caching連接工廠 --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <property name="sessionCacheSize" value="100" /> </bean> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="connectionFactory"></constructor-arg> <!-- 隊列模式--> <property name="pubSubDomain" value="false"></property> </bean> </beans>
2.13 新建兩個隊列消息監聽器,並在applicationContext.xml里面配置
隊列消息監聽器1:
package com.study.demo.mq.consumer.queue; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.springframework.stereotype.Component; /** * * @Description: 隊列消息監聽器 * @author leeSmall * @date 2018年9月13日 * */ @Component public class QueueReceiver1 implements MessageListener { public void onMessage(Message message) { try { String textMsg = ((TextMessage)message).getText(); System.out.println("QueueReceiver1 accept msg : "+textMsg); } catch (JMSException e) { e.printStackTrace(); } } }
隊列消息監聽器2:
package com.study.demo.mq.consumer.queue; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * * @Description: 隊列消息監聽器 * @author leeSmall * @date 2018年9月13日 * */ @Component public class QueueReceiver2 implements MessageListener { public void onMessage(Message message) { try { String textMsg = ((TextMessage)message).getText(); System.out.println("QueueReceiver2 accept msg : "+textMsg); } catch (JMSException e) { e.printStackTrace(); } } }
applicationContext.xml配置:
<!-- 定義Queue監聽器 --> <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <jms:listener destination="test.queue" ref="queueReceiver1"></jms:listener> <jms:listener destination="test.queue" ref="queueReceiver2"></jms:listener> </jms:listener-container>
2.14 新建兩個Topic消息監聽器,並在applicationContext.xml里面配置
Topic消息監聽器1:
package com.study.demo.mq.consumer.topic; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * * @Description: Topic消息監聽器 * @author leeSmall * @date 2018年9月13日 * */ @Component public class TopicReceiver1 implements MessageListener { public void onMessage(Message message) { try { String textMsg = ((TextMessage)message).getText(); System.out.println("TopicReceiver1 accept msg : "+textMsg); } catch (JMSException e) { e.printStackTrace(); } } }
Topic消息監聽器2:
package com.study.demo.mq.consumer.topic; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * * @Description: Topic消息監聽器 * @author leeSmall * @date 2018年9月13日 * */ @Component public class TopicReceiver2 implements MessageListener { public void onMessage(Message message) { try { String textMsg = ((TextMessage)message).getText(); System.out.println("TopicReceiver2 accept msg : "+textMsg); } catch (JMSException e) { e.printStackTrace(); } } }
applicationContext.xml配置:
<!-- 定義Topic監聽器 --> <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <jms:listener destination="test.topic" ref="topicReceiver1"></jms:listener> <jms:listener destination="test.topic" ref="topicReceiver2"></jms:listener> </jms:listener-container>
2.15 新增/ActiveMQConsumer/src/main/java/spring-mvc.xml配置
<?xml version="1.0" encoding="UTF-8"?> <!-- 查找最新的schemaLocation 訪問 http://www.springframework.org/schema/ --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"> <!-- <mvc:default-servlet-handler />--> <mvc:resources mapping="/js/**" location="/js/"/> <mvc:annotation-driven content-negotiation-manager="contentNegotiationManager" /> <context:component-scan base-package="com.study.demo"> <context:include-filter type="annotation" expression="org.springframework.stereotype.Controller" /> </context:component-scan> <bean id="stringHttpMessageConverter" class="org.springframework.http.converter.StringHttpMessageConverter"> <property name="supportedMediaTypes"> <list> <bean class="org.springframework.http.MediaType"> <constructor-arg index="0" value="text" /> <constructor-arg index="1" value="plain" /> <constructor-arg index="2" value="UTF-8" /> </bean> </list> </property> </bean> <bean id="mappingJacksonHttpMessageConverter" class="org.springframework.http.converter.json.MappingJackson2HttpMessageConverter" /> <bean class="org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter"> <property name="messageConverters"> <list> <ref bean="stringHttpMessageConverter" /> <ref bean="mappingJacksonHttpMessageConverter" /> </list> </property> </bean> <bean id="contentNegotiationManager" class="org.springframework.web.accept.ContentNegotiationManagerFactoryBean"> <property name="mediaTypes"> <map> <entry key="html" value="text/html" /> <entry key="pdf" value="application/pdf" /> <entry key="xsl" value="application/vnd.ms-excel" /> <entry key="xml" value="application/xml" /> <entry key="json" value="application/json" /> </map> </property> <property name="defaultContentType" value="text/html" /> </bean> <bean id="viewResolver" class="org.springframework.web.servlet.view.ContentNegotiatingViewResolver"> <property name="order" value="0" /> <property name="contentNegotiationManager" ref="contentNegotiationManager" /> <property name="viewResolvers"> <list> <bean class="org.springframework.web.servlet.view.BeanNameViewResolver" /> <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"> <property name="viewClass" value="org.springframework.web.servlet.view.JstlView" /> <property name="prefix" value="/WEB-INF/pages/" /> <property name="suffix" value=".jsp"></property> </bean> </list> </property> <property name="defaultViews"> <list> <bean class="org.springframework.web.servlet.view.json.MappingJackson2JsonView"> <property name="extractValueFromSingleKeyModel" value="true" /> </bean> </list> </property> </bean> </beans>
到此消費者代碼編寫完成!
2.16 在Tomcat8081下啟動ActiveMQConsumer
2.17 測試
在生產者的消息發送頁面分別發送隊列消息和topic消息查看效果
發送隊列消息:
查看消費者的狀態:
發送topic消息
查看消費者的狀態:
可以看到一個隊列消息只能被一個隊列消費者消費,一個topic消息可以被多個topic消費者消費
參考文章: