Message Broker是一種消息驗證、傳輸、路由的架構模式,其設計目標主要應用於下面這些場景:
- 消息路由到一個或多個目的地
- 消息轉化為其他的表現方式
- 執行消息的聚集、消息的分解,並將結果發送到他們的目的地,然后重新組合相應返回給消息用戶
- 調用Web服務來檢索數據
- 響應事件或錯誤
- 使用發布-訂閱模式來提供內容或基於主題的消息路由
AMQP 是 Advanced Message Queuing Protocol 的簡稱,它是一個面向消息中間件的開放式標准應用層協議。AMQP定義了這些特性:
- 消息方向
- 消息隊列
- 消息路由(包括:點到點和發布-訂閱模式)
- 可靠性
- 安全性
RabbitMQ 是以 AMQP 協議實現的一種中間件產品,也稱為面向消息的中間件,它可以支持多種操作系統,多種編程語言,幾乎可以覆蓋所有主流的企業級技術平台。下面介紹 RabbitMQ 的基本概念:
- Broker:可以理解為消息隊列服務器的實體,是一個中間件應用,負責接收消息生產者的消息,然后將消息發送至消息接收者或者其他的 Broker
-
Exchange:消息交換機,是消息第一個到達的地方,消息通過他指定的路由規則分發到不同的消息隊列中去,有如下幾種類型:
- Direct:完全按照 key 進行投遞,比如,綁定時設置了 Routing Key 為 abc,那么客戶端提交消息,只有設置了 Key 為 abc 的才會被投遞到隊列
- Topic:對於 Key 進行模式匹配后進行投遞,可以使用符號 # 匹配一個或多個詞,符號 * 匹配正好一個詞。比如, abc.# 可以匹配 abc.def.ghi ,而 abc.* 只能匹配 abc.def
- Fanout:不需要任何 Key,采取廣播的模式,一個消息進來時,投遞到與該交換機綁定的所以隊列
- Queue:消息隊列,消息通過發送和路由之后最終到達的地方,到達 Queue 的消息即進入邏輯上等待消費的狀態。每個消息都會被發送到一個或多個隊列。
- Binding:綁定,將 Exchange 和 Queue 按照路由規則綁定起來
- Routing Key:路由關鍵字,Exchange 根據該關鍵字進行消息投遞
- Virtual Host:虛擬主機,他是對 Broker 的虛擬划分,將消費者、生產者和依賴的 AMQP 相關結構進行隔離,一般情況都是為了安全考慮。
- Connection:連接,代表生產者、消費者、Broker 之間進行通信的物理網絡
- Channel:消息通道,用於連接生產者和消費者的邏輯結構,在客戶端的每個連接里,可以建立多個 Channel,每個 Channel 代表一個會話任務,通過 Channel 可以隔離同一個連接的不同交互內容。
- Producer:消息生產者,制造消息並發送消息的程序
- Consumer:消息消費者,接收消息並處理消息的程序
快速入門
我們通過在 Spring Boot 應用中整合 RabbitMQ ,實現一個簡單的發送、接收消息的示例:
-
創建項目
創建一個 Spring Boot 項目,命名為 spring-boot-rabbitmq,並增加 spring-boot-starter-amqp 依賴,pom.xml 文件內容如下:
<?xmlversion="1.0"encoding="UTF-8"?>
<projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.lixue.bus</groupId>
<artifactId>spring-boot-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring-boot-rabbitmq</name>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.12.RELEASE</version>
<relativePath/><!--lookupparentfromrepository-->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
-
創建配置
在 src/main/resources 目錄中創建 application.yml 配置文件,並增加 RabbitMQ 相關配置,內容如下:
#配置應用名稱
spring:
application:
name:rabbitmq-bus
#配置RabbitMQ信息
rabbitmq:
#配置連接信息
addresses:192.168.2.215:5672
username:lixue
password:liyong
virtual-host:/
#開啟發送確認模式
publisher-confirms:true
-
消息生產者
創建消息生產者 Sender 類,通過注入 AmqpTemplate 接口的實例(org.springframework.amqp.rabbit.core.RabbitTemplate)來實現消息的發送,AmqpTemplate 接口定義了一套針對 AMQP 協議的基礎操作,Spring Boot 會根據配置來注入其具體實現。
package org.lixue.bus;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class Sender{
@Autowired
private AmqpTemplate amqpTemplate;
public void send(){
String context="Hello"+new Date();
//將消息發送到路由Key=send的隊列
amqpTemplate.convertAndSend("send",context);
}
}
-
消息消費者
創建消息消費者 Receiver 類,通過注入 AmqpTemplate 接口的實例(org.springframework.amqp.rabbit.core.RabbitTemplate)來實現消息的接收處理,也可以使用 @RabbitListener 和 @RabbitHandler 注解 來指定消息接收的隊列和消息處理方法,使用注解處理消息如下:
package org.lixue.bus;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues="send")
public class Receiver{
@RabbitHandler
public void process(String message){
System.out.println("receive:"+message);
}
}
如果不使用注解,可以通過 AmqpTemplate 接口的實例來獲取隊列的消息,代碼如下:
package org.lixue.bus;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Receiver{
@Autowired
private AmqpTemplate amqpTemplate;
public void receive(){
String val=(String)amqpTemplate.receiveAndConvert("send");
if(val!=null){
System.out.println("receive:"+message);
}
}
}
-
創建配置類
創建 RabbitMQ 的配置類 RabbitMQConfig,用來配置隊列、交換器、路由等高級信息,代碼如下:
package org.lixue.bus;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig{
/**
*配置隊列相關
*/
@Bean
public Queue newQueue(){
return new Queue("send");
}
@Bean
public Queue newQueue2(){
return new Queue("send2");
}
/**
*配置交換器相關
*/
@Bean
public Exchange newDirectExchange(){
return new DirectExchange("directExchange",true,true);
}
@Bean
public Exchange newTopicExchange(){
return new TopicExchange("topicExchange",true,true);
}
@Bean
public Exchange newFanoutExchange(){
return new FanoutExchange("fanoutExchange",true,true);
}
/**
*配置隊列和交換器綁定
*/
@Bean
public Binding newDirectBinding(){
return BindingBuilder.bind(newQueue()).to(newDirectExchange()).with("send").noargs();
}
@Bean
public Binding newDirectBinding1(){
return BindingBuilder.bind(newQueue2()).to(newDirectExchange()).with("send2").noargs();
}
}
-
測試驗證
創建消息提供者的單元測試,在單元測試中執行消息發送方法,並執行單元測試,如果使用注解的方式接收消息,則不需要做額外處理,Spring Boot 啟動后會接收消息,如下:
package org.lixue.bus;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.*;
@RunWith(SpringRunner.class)
@SpringBootTest
publi cclass SenderTest{
@Autowired
private Sender sender;
@Test
public void send() throws Exception{
for(inti=0;i<1000000;i++){
sender.send(i);
}
}
}
日志輸出如下:
2018-05-04 16:23:39.085 INFO 20176 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.2.215:5672]
2018-05-04 16:23:39.294 INFO 20176 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#232532cf:0/SimpleConnection@7c7f9a81 [delegate=amqp://lixue@192.168.2.215:5672/, localPort= 63144]
2018-05-04 16:23:39.300 INFO 20176 --- [cTaskExecutor-1] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable or auto-delete Exchange (directExchange) durable:true, auto-delete:true. It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.
2018-05-04 16:23:39.300 INFO 20176 --- [cTaskExecutor-1] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable or auto-delete Exchange (topicExchange) durable:true, auto-delete:true. It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.
2018-05-04 16:23:39.301 INFO 20176 --- [cTaskExecutor-1] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable or auto-delete Exchange (fanoutExchange) durable:true, auto-delete:true. It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.
2018-05-04 16:23:39.476 INFO 20176 --- [ main] org.lixue.bus.SenderTest : Started SenderTest in 4.536 seconds (JVM running for 6.026)
receive:Hello Fri May 04 16:23:39 CST 2018
receive:Hello Fri May 04 16:23:39 CST 2018
receive:Hello Fri May 04 16:23:39 CST 2018
receive:Hello Fri May 04 16:23:39 CST 2018
