一、AMQP 概述
AMQP(Advanced Message Queuing Protocol),高級消息隊列協議。
簡單回憶一下JMS的消息模型,可能會有助於理解AMQP的消息模型。在JMS中,有三個主要的參與者:消息的生產者、消息的消費者以及在生產者和消費者之間傳遞消息的通道(隊列或主題)。在JMS中,通道有助於解耦消息的生產者和消費者,但是這兩者依然會與通道相耦合。與之不同的是,AMQP的生產者並不會直接將消息發布到隊列中。AMQP在消息的生產者以及傳遞信息的隊列之間引入了一種間接的機制:Exchange。如下圖:
哈哈,筆主從今天開始也要學着自己畫圖了。
來看看 AMQP 消息的通信過程。首先,生產者把消息發給 Exchange,並帶有一個 routing key。其次,Exchange 和 隊列 之間 通過 binging 通信,binging 上也有 一個 routing key,AMQP定義了四種不同類型的Exchange,每一種都有不同的路由算法,根據Exchange的算法不同,它可能會使用消息的routing key或參數,並與 binding 的routing key或參數進行對比,來決定是否要將信息放到隊列中。然后,消費者從每個隊列中取出消息。
Exchange 的路由算法:
- Direct:如果 消息的routing key 與 binding的routing key 直接匹配的話,消息將會路由到該隊列上;
- Topic:如果 消息的routing key 與 binding的routing key 符合通配符匹配的話,消息將會路由到該隊列上;
- Headers:如果 消息參數表中的頭信息和值 都與 bingding參數表中 相匹配,消息將會路由到該隊列上;
- Fanout:不管消息的routing key和參數表的頭信息/值是什么,消息將會路由到所有隊列上。
AMQP 與 JMS 的區別:
1、AMQP為消息定義了線路層(wire-level protocol)的協議,而JMS所定義的是API規范。JMS的API協議能夠確保所有的實現都能通過通用的API來使用,但是並不能保證某個JMS實現所發送的消息能夠被另外不同的JMS實現所使用。而AMQP的線路層協議規范了消息的格式,消息在生產者和消費者間傳送的時候會遵循這個格式。這樣AMQP在互相協作方面就要優於JMS——它不僅能跨不同的AMQP實現,還能跨語言和平台。
2、JMS 支持TextMessage、MapMessage 等復雜的消息類型;而AMQP 僅支持 byte[] 消息類型(復雜的類型可序列化后發送),個人認為這也是它能夠跨平台和跨語言使用的原因之一。
3、由於Exchange 提供的路由算法,AMQP可以提供多樣化的路由方式來傳遞消息到消息隊列,而 JMS 僅支持 隊列 和 主題/訂閱 方式兩種。
二、Spring 集成 RabbitMQ
RabbitMQ是一個流行的開源消息代理,它實現了AMQP。這里先介紹一下關於 RabbitMQ 的基本概念:
-
- Broker:可以理解為消息隊列服務器的實體,它是一個中間件應用,負責接收生產者的消息,然后將消息發送至消息接收者或者其他的 Broker
- Exchange:消息交換機,是消息第一個到達的地方,消息通過它指定的路由規則,分發到不同的消息隊列中去。
- Queue:消息隊列,消息通過發送和路由之后最終到達的地方,到達Queue的消息即進入邏輯上等待消費的狀態。每個消息都會被發送到一個或多個隊列。
- Binding:綁定,它的作用就是把 Exchange 和 Queue 按照路由規則綁定起來,也就是 Exchange 和 Queue 之間的虛擬連接。
- RoutingKey:路由關鍵字,Exchange 根據這個關鍵字 進行消息投遞。
- Virtual host:虛擬主機,它是對Broker的虛擬划分,將消費者、生產者和它們依賴的AMQP相關結構進行隔離,一般都是為了安全考虛。比如,我們可以在一個Broker中設置多個虛擬主機,對不同用戶進行權限的分離 。
- Connection:連接,代表生產者、消費者、Broker之間進行通信的物理網絡。
- Channel:消息通道,用於連接生產者和消費者的邏輯結構。在客戶端的每個連接里,可建立多個Channel, 每個Channel代表一個會話任務,通過Channel可以隔離同一連接中的不同交互內容。
- Producer:消息生產者,制造消息並發送消息的程序。
- Consumer:消息消費者,接收消息並處理消息的程序。
RabbitMQ 支持消息的持久化,也就是將數據寫在磁盤上。為了數據安全考慮,大多數情況下都會選擇持久化。消息隊列持久化包括三個部分:
- Exchange 持久化,在聲明時指定durable => 1 。
- Queue 待久化,在聲明時指定durable => 1。
- 消息持久化,在投遞時指定delivery_mode => 2 (1是非持久化)。
如果 Exchange 和 Queue 都是持久化的,那么它們之間的 Binding 也是持久化的。如果 Exchange 和 Queue 兩者之間有一個是持久化的,一個是非持久化的,就不允許建立綁定。
Spring Data AMQP 為 RabbitMQ 提供了支持,包括 RabbitMQ連接工廠、模板以及Spring配置命名空間。
首先,需要安裝 RabbitMQ,我們可以在 http://www.rabbitmq.com/download.html 上找到安裝指南,具體怎么安裝,不是這篇博文的重點,請筆友們自行解決。
接下來,讓我們一起來看看,Spring 和 RabbitMQ 的集成:
1、pom 依賴
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.0.3.RELEASE</version> </dependency>
2、連接工廠 和 admin

<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.userName}" password="${rabbitmq.password}"/> <rabbit:admin connection-factory="connectionFactory"/>
admin 元素會自動創建一個RabbitMQ管理組件,它會自動創建隊列、Exchange以及binding
3、聲明隊列、Exchange以及binding
聲明隊列:

<rabbit:queue name="queue1"/> <rabbit:queue name="queue2"/> <rabbit:queue name="queue3"/> <rabbit:queue name="queue4"/> <rabbit:queue name="queue5"/> <rabbit:queue name="queue6"/>
聲明 Exchange 以及 binding:
direct-exchange:

<rabbit:direct-exchange name="directExchange"> <rabbit:bindings> <rabbit:binding key="queue1" queue="queue1"/> <rabbit:binding key="queue2" queue="queue2"/> <rabbit:binding key="queue3" queue="queue3"/> </rabbit:bindings> </rabbit:direct-exchange>
如果消息的routing key 與 routing key 直接匹配的話,消息將會路由到該隊列上。
topic-exchange

<rabbit:topic-exchange name="topicExchange"> <rabbit:bindings> <rabbit:binding pattern="routing.*" queue="queue2"/> <rabbit:binding pattern="routing.*" queue="queue3"/> </rabbit:bindings> </rabbit:topic-exchange>
消息的 routing key 與 binding的routing key 符合通配符匹配的話,消息將會路由到該隊列上。
這個通配符匹配特別坑,賊坑!我本來寫了個 "routing*" ,自以為能匹配 "routingrrr" 這樣的字符,不行!然后我又寫了個"routing?"、"rounting.",預想着能不能匹配單個任意字符,不行!
終於我得出了一個結論,只能使用 "*"(匹配 0 個或任意多個)通配符,並且,並且!"*" 前面一定要有 個 "." ! 太可怕了,不知道我總結的對不對哈!
headers-exchange

<rabbit:headers-exchange name="headersExchange"> <rabbit:bindings> <rabbit:binding queue="queue4" key="betty" value="rubble" /> <rabbit:binding queue="queue5" key="barney" value="rubble" /> </rabbit:bindings> </rabbit:headers-exchange>
消息參數表中的頭信息和值都與bingding參數表中相匹配,消息將會路由到該隊列上。
這個用法比較少用,也比較難用,原因是因為它僅支持 發送 byte[] 的消息類型。
fanout-exchange

<rabbit:fanout-exchange name="fanoutExchange"> <rabbit:bindings> <rabbit:binding queue="queue5"/> <rabbit:binding queue="queue6"/> </rabbit:bindings> </rabbit:fanout-exchange>
這個是最簡單粗暴的匹配規則,不管消息的routing key和參數表的頭信息/值是什么,消息將會路由到所有隊列上。
4、發送和接收消息
還是Spring的那一套,Spring 為我們提供了一個模板 bean(rabbitTemplate) 來發送和接收消息。其中,像前文提到的 jmsTemplate 那樣,rabbitTemplate 也為我們 提供了 convertAndSend() 方法來自動轉換和發送消息,提供了receiveAndConvret() 方法來接收和自動轉換成對象(消息和對象之間默認的消息轉換器是SimpleMessageConverter,它適用於String、Serializable實例以及字節數組)。另外,rabbitTemplate 也照常提供了 send() 和 receive() 方法來發送和接收消息,不過貌似僅支持發送字節數組...
配置 rabbitTemplate:

<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="directExchange" routing-key="queue1"/>
下面僅演示 通配符路由方式 和 header 路由方式 發送和接收消息。其他具體詳細的內容可參考我下面附上的源碼:
通配符路由方式:

@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("classpath:applicationContext.xml") public class TopicExchange { @Autowired private RabbitTemplate rabbitTemplate; @Test public void convertAndSend(){ List<String> list = new ArrayList<>(); list.add("java"); list.add("python"); list.add("c++"); rabbitTemplate.convertAndSend("topicExchange","routing.123", list); } @Test public void receiveAndConvert(){ List<String> queue2List =(List) rabbitTemplate.receiveAndConvert("queue2"); printList(queue2List); System.out.println("----------------華麗的分隔符-----------------"); List<String> queue3List =(List) rabbitTemplate.receiveAndConvert("queue3"); printList(queue3List); } private <E> void printList(List<E> list){ if (list != null && list.size() > 0){ for (Object o : list){ System.out.println("-----------------"+ o +"---------------"); } } } }
header 路由方式:

@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("classpath:applicationContext.xml") public class HeadersExchangeTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void convertAndSend(){ MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("betty", "rubble"); messageProperties.setHeader("fred", "flintstone"); messageProperties.setHeader("barney", "rubble"); String str = new String("Hello RabbitMQ"); Message message = new Message(str.getBytes(), messageProperties); rabbitTemplate.convertAndSend("headersExchange","",message); } @Test public void receiveAndConvert(){ Message queue4 = rabbitTemplate.receive("queue4"); System.out.println("第一個輸出:" + new String(queue4.getBody())); Message queue5 = rabbitTemplate.receive("queue5"); System.out.println("第三個輸出:" + new String(queue5.getBody())); } }
5、定義消息驅動的AMQP POJO
用 receive()和 receiveAndConvert()方法都會立即返回,如果隊列中沒有等待的消息時,將會得到 null。Spring AMQP提供了消息驅動POJO的支持,也就是相當於一個監聽器,監聽某些隊列,當消息到達指定隊列的時候,可以立即調用方法處理該消息。
listener-container 配置:
<rabbit:listener-container connection-factory="connectionFactory" concurrency="2" prefetch="3" type="direct"> <rabbit:listener ref="handlerListener" method="handler" queue-names="queue5,queue6"/> </rabbit:listener-container>
其中,ref 指定Spring bean 的 id,method 指定 該bean中處理隊列中消息的方法,queue-names 指定要監聽哪些隊列,隊列之間用 "," 分隔。
三、結語
祝大家五一節快樂!
演示源碼下載鏈接:https://github.com/JMCuixy/SpringMessageRabbitMQ
參考資料:1、《Spring 實戰第四版》