一、概念
- 異步消息簡介
與遠程調用機制以及REST接口類似,異步消息也是用於應用程序之間通信的。
RMI、Hessian、Burlap、HTTP invoker和Web服務在應用程序之間的通信機制是同步的,即客戶端應用程序直接與遠程服務相交互,並且一直等到遠程過程完成后才繼續執行。而消息是異步發送的,客戶端不需要等待服務處理消息,甚至不需要等待消息投遞完成。客戶端發送消息,然后繼續執行,這是因為客戶端假定服務最終可以收到並處理這條消息。
- 優缺點
優點:
-
- 異步通信。客戶端無需等待服務端的響應,節省時間,提升客戶端的效率。
- 面向消息與解耦。客戶端不需要與特定的方法簽名綁定,任何可以處理數據的隊列或主題訂閱者都可以處理由客戶端發送的消息,而客戶端不必了解遠程服務的任何規范。
- 位置獨立。由於客戶端並不直接與服務端通信,而是把消息交由消息代理。因此,只要服務能夠從隊列或主題中獲取消息即可,消息客戶端根本不需要關注服務來自哪里。而且可以使用服務器集群監聽同一個消息代理提升服務器負載。
缺點:
-
- 增加復雜度。毫無疑問,消息代理這個東西是多出來的,需要維護成本。
- 暫時的不一致性。異步消息方式可以確保最終的一致性,但是可能存在客戶端把消息給了消息隊列,而服務端暫時還沒處理這個隊列導致的暫時不一致性問題。
- 應用場景
-
- 客戶端並不需要服務端的反饋,諸如此類的非核心流程異步化處理。
- 流量削峰。比如很多的秒殺場景,用戶的請求,服務器接收后,首先寫入消息隊列,接着再根據業務做后續處理。
- 日志處理。將消息隊列用在日志處理中,比如Kafka的應用,解決大量日志傳輸的問題。
- 消息通訊。消息隊列一般都內置了高效的通信機制,因此也可以用於單純的消息通訊,比如實現點對點消息隊列或者聊天室等。
- 消息模型
點對點消息模型
在點對點模型中,每一條消息都有一個發送者和一個接收者,如圖17.3所示。當消息代理得到消息時,它將消息放入一個隊列中。當接收者請求隊列中的下一條消息時,消息會從隊列中取出,並投遞給接收者。因為消息投遞后會從隊列中刪除,這樣就可以保證消息只能投遞給一個接收者。
發布-訂閱消息模型
在發布—訂閱消息模型中,消息會發送給一個主題。與隊列類似,多個接收者都可以監聽一個主題。但是,與隊列不同的是,消息不再是只投遞給一個接收者,而是主題的所有訂閱者都會接收到此消息的副本,如圖17.4所示。
二、集成實現JMS
Java消息服務(Java Message Service ,JMS)是一個Java標准,定義了使用消息代理的通用API。借助JMS,所有遵從規范的實現都使用通用的接口,這就類似於JDBC為數據庫操作提供了通用的接口一樣。
Spring通過基於模板的抽象為JMS功能提供了支持,這個模板也就是JmsTemplate。使用JmsTemplate,能夠非常容易地在消息生產方發送隊列和主題消息,在消費消息的那一方,也能夠非常容易地接收這些消息。Spring還提供了消息驅動POJO的理念:這是一個簡單的Java對象,它能夠以異步的方式響應隊列或主題上到達的消息。
接下來讓我們來看看在Spring中如何集成實現JMS:
-
搭建消息代理
我們首先需要一個消息代理,作為客戶端和服務端通信的中介。ActiveMQ是一個偉大的開源消息代理產品,也是使用JMS進行異步消息傳遞的最佳選擇。下載地址:http://activemq.apache.org/ ,下載完成后解壓縮到本地硬盤,在bin目錄下,我們可以看到為各種操作系統所創建的對應子目錄。在這些子目錄下,我們可以找到用於啟動ActiveMQ的腳本。
啟動好ActiveMQ后,添加如下的 pom 依賴:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.3</version> </dependency>
-
建立連接工廠、消息目的地
連接工廠:

<!--1、ActiveMQ 工廠 2、amq命名空間方式 3、默認監聽端口61616 4、默認用戶名:admin 密碼:admin --> <amq:connectionFactory id="connectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin"/>
消息目的地:
消息目的地又分為 隊列 和 主題 兩種:

<!--1、定義消息目的地,可以是隊列或者主題兩種方式 2、借助physicalName屬性指定消息通道的名稱--> <amq:queue id="queueDestination" physicalName="queueName"/> <amq:topic id="topicDestination" physicalName="topicName"/>
-
使用 JmsTemplate
為了消除冗余和重復的JMS代碼,Spring 給出的解決方案就是JmsTemplate。JmsTemplate可以創建連接、獲得會話以及發送和接收消息。這使得我們可以專注於構建要發送的消息或者處理接收到的消息。另外,JmsTemplate可以處理所有拋出的笨拙的JMSException異常。

<!--1、jmsTemplate 2、defaultDestination 定義了默認的消息目的地 3、messageConverter 消息轉換器 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" p:connectionFactory-ref="connectionFactory" p:defaultDestination-ref="queueDestination" p:messageConverter-ref="messageConverter"/> <!--MessageConvert--> <bean id="messageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
JmsTemplate 可以非常的簡單的實現消息的發送和接收功能,讓我們來看看吧!
發送消息(convertAndSend):

@Autowired private JmsOperations jmsOperations; /** * jmsOperations.convertAndSend() 方法,"queueName" 不填寫,用默認的 Destination */ @Test public void convertAndSend(){ Map<String ,Object> map = new HashMap<>(16); map.put("java", "java"); map.put("python", "python"); map.put("c++", "c++"); jmsOperations.convertAndSend("queueName", map); }
接收消息(receiveAndConvert):

@Autowired private JmsOperations jmsOperations; /** * jmsOperations 的 receiveAndConvert() 方法 */ @Test public void receiveAndConvert(){ Map<String, Object> map = (Map) jmsOperations.receiveAndConvert("queueName"); }
這里有幾點需要說明一下;
1、除了 convertAndSend() 和 receiveAndConvert() 方法,JmsTempalte 還支持 send() 和 receive() 方法來發送和接收消息,就是寫起來麻煩點,還要自己處理 JMSException。可參考我的源碼~
2、convertAndSend() 和 receiveAndConvert() 方法 如果不指定 消息通道名稱,即上面的 "queueName"。采用JmsTemplate 默認設置的,即 defaultDestination 關聯的消息目的地中的消息通道。
3、convertAndSend() 和 receiveAndConvert() 方法 能便捷的實現 發送和接收消息功能,原因是 消息轉換器 !發送時,JmsTemplate 先把消息內容轉換成對應Message;接收時,JmsTemplate 再把對應Message 轉換回消息內容。JmsTemplate 定義了多個消息轉換器。如上,我用了 SimpleMessageConverter 轉換器,也就是 JmsTemplate 中默認使用的轉換器(不設置用的就是這個轉換器)。如果需要,還可自定義轉換器呢!
-
創建消息監聽器
使用JmsTemplate接收消息的最大缺點在於receive()和receiveAndConvert()方法都是同步的。這意味着接收者必須耐心等待消息的到來,因此這些方法會一直被阻塞,直到有可用消息(或者直到超時)。同步接收異步發送的消息,是不是感覺很怪異?
如果一發送消息就能被對應的方法處理,豈不美哉?
<jms:listener-container connection-factory="connectionFactory"> <jms:listener destination="queueName" ref="queueMessageHandler" method="handle"/> <jms:listener destination="topicName" ref="topicMessageHandler" method="handle"/> </jms:listener-container>
在這里,我們在消息監聽器容器中包含了消息監聽器。消息監聽器容器(message listener container)是一個特殊的bean,它可以監控JMS目的地並等待消息到達。一旦有消息到達,它取出消息,然后把消息傳給任意一個對此消息感興趣的消息監聽器。注意!關鍵詞 任意一個 !說明即使多個消息監聽器監聽同一個消息通道,仍然只會有一個消息監聽器執行!!另外,destination 指的是消息通道的名稱,並不是JMS目的地的 id 。ref 連接的是 Spring 的 bean 。methon 指的是這個bean中處理這個 消息的方法,需要注意的是 這個方法的形參!如果放入消息通道的數據類型是 字符串的話,那這個方法的形參也要用字符串接收;如果放入消息通道的數據類型是 集合的話,那這個方法的形參也要用對應集合類型接收。
三、使用基於消息的RPC
為了支持基於消息的RPC,Spring提供了JmsInvokerServiceExporter,它可以把bean導出為基於消息的服務;同時,為客戶端提供了JmsInvokerProxyFactoryBean來使用這些服務。
- 導出基於JMS的服務
把bean導出為基於消息的服務,利用的是Spring的 JmsInvokerServiceExporter,如下:
<bean id="jmsServer" class="org.springframework.jms.remoting.JmsInvokerServiceExporter" p:serviceInterface="org.springframework.message.activemq.rpc.JmsServer" p:service-ref="jmsServerImpl"/>
這個bean的屬性描述了導出的服務應該是什么樣子的。service-ref 屬性設置為 jmsServerImpl 的引用,它是遠程服務的實現。同時,serviceInterface 屬性設置為遠程服務對外提供接口的全限定類名。
JmsInvokerServiceExporter 可以充當JMS監聽器來進行服務間的通信。即客戶端 調用這個服務的時候,就可以立即 用這個服務的實現 來處理客戶端的調用啦!因為我們監聽了這個服務!如下:
<jms:listener-container connection-factory="connectionFactory"> <!--利用jms監聽器導出消息服務--> <jms:listener destination="sparta" ref="jmsServer"/> </jms:listener-container>
我們為JMS監聽器容器指定了連接工廠,所以它能夠知道如何連接消息代理,而<jms:listener>聲明指定了遠程消息的目的地。
- 使用基於JMS的服務
JmsInvokerProxyFactoryBean 是一個遠程代理工廠bean,代理了通過JmsInvokerServiceExporter所導出的JMS服務。它隱藏了訪問遠程服務的細節,並提供一個易用的接口,通過該接口客戶端與遠程服務進行交互。
<!--遠程代理工廠 bean ,供客戶端訪問--> <bean id="jmsServerProxy" class="org.springframework.jms.remoting.JmsInvokerProxyFactoryBean" p:serviceInterface="org.springframework.message.activemq.rpc.JmsServer" p:connectionFactory-ref="connectionFactory" p:queueName="sparta"/>
對於serviceInterface,指定了代理應該通過 JmsServer 接口暴露功能。queueName 指定要連接的消息代理的名稱。
測試類:

@Test public void test01(){ ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml"); JmsServer service = (JmsServer)context.getBean("jmsServerProxy"); service.doServer("Hello Message"); }
tips:使用基於消息的RPC,只研究了用法。還沒想到用在什么場景~ 各位指教?
演示代碼下載:https://github.com/JMCuixy/SpringMessage
參考資料:《Spring 實戰第四版》