引言
除了上篇文章所講的 ActiveMQ,還有一種流行的開源消息中間件叫 RabbitMQ。和 ActiveMQ 相比,它具有更高的性能。
RabbitMQ 不再基於 JMS 規范,也沒有選擇 Java 作為底層實現語言。 它基於另一種消息通信協議,名為 AMQP,並采用 Erlang 語言作為技術實現。 RabbitMQ 提供了眾多語言客戶端,能夠與 Spring 框架整合,Spring Boot 也提供了對 RabbitMQ 的支持。
RabbitMQ 官網: http://www.rabbitmq.com
啟動 RabbitMQ 服務器
運行 rabbitmq 容器
RabbitMQ 官方已經提供了自己的 Docker 容器,先下載 rabbitmq:3-management 鏡像來啟動 RabbitMQ 容器, 之所以選擇這個鏡像是因為它擁有一個 web 控制台,可以通過瀏覽器來訪問。
docker pull rabbitmq:3-management
RabbitMQ 除了控制台,還提供了 HTTP API 方式,可方便應用程序使用。
下面使用如下 Docker 命令啟動 RabbitMQ
docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq rabbitmq:3-management
在啟動 RabbitMQ 容器時,它對宿主機暴露了兩個端口號
- 15672: 表示RabbitMQ 控制台端口號,可在瀏覽器中通過控制台來執行 RabbitMQ 的相關操作
- 5672 表示 RabbitMQ 監聽的TCP 端口號,應用程序可以通過該端口號與 RabbitMQ 建立 TCP 連接,並完成后續的異步消息通信
此外,啟動時還有兩個環境變量
- RABBITMQ_DEFAULT_USER : 設置控制台默認用戶名, 默認為 guest
- RABBITMQ_DEFAULT_PASS: 設置控制台默認密碼,默認為 guest
RabbitMQ 控制台
RabbitMQ 容器啟動完畢后,打開瀏覽器,並在地址欄中輸入 http://localhost:15672/
,並且輸入登錄的用戶名和密碼,就可以看到控制台如下所示
在上面管理界面中,包含 6 個功能菜單
- Overview: 用於查看 RabbitMQ 基本信息
- Connections: 用於查看 RabbitMQ 客戶端的連接信息
- Channels: 用於查看 RabbitMQ 的通道
- Exchanges:用於查看 RabbitMQ 的交換機
- Queues: 用於查看 RabbitMQ 的隊列
- Admin: 用於管理 RabbitMQ 的用戶,虛擬主機,策略等數據
Exchange 和 Queue
RabbitMQ 只有 Queue, 沒有 Topic,因為可通過 Exchange 與 Queue 的組合來實現 Topic 所具備的功能。RabbitMQ 的消息模型如下圖所示
在 Exchange 和 Queue 間有一個 Binding 關系,當消息從 Producer 發送到 Exchange 中時,會根據 Binding 來路由消息的去向。
- 如果 Binding 各不相同,那么該消息將路由到其中一個 Queue 中,隨后將被一個 Consumer 所消費,此時實現了 "點對點"的消息通信模型。
- 如果 Binding 完全相同,那么該消息就會路由到每個 Queue 中,隨后將會被每個 Consumer 消費,這樣就實現了 “發布與訂閱” 的消息通信模型
因此可將 Binding 理解為 Exchange 到 Queue 的路由規則,這些規則可通過 RabbitMQ 所提供的客戶端 API 來控制,也可通過 RabbitMQ 提供的控制台來管理。
RabbitMQ 提供了一個默認的 Exchange(AMQP default),在控制台的 Exchange 菜單中就可以看到它,簡單情況下,只需要使用默認的 Exchange 即可,當需要提供發布與訂閱功能時才會使用自定義的 Exchange。
開發服務端和客戶端
下面我們就將 Spring Boot 與 RabbitMQ 進行整合,先開發一個服務端作為消息的消費者,再開發一個客戶端作為消息的生產者,隨后運行客戶端,並查看服務端中接收到的消息。
開發服務端
創建一個名為 rabbitmq-hello-server 的 maven 項目或者 Spring Starter Project, 在 pom.xml 文件中添加下面 Maven 依賴
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.19.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
Spring Boot 框架中已經添加了對 RabbitMQ 的支持,只需要依賴 spring-boot-starter-amqp
就可以啟動 RabbitMQ,此時還需要在 application.properties
配置文件中添加 RabbitMQ 的相關配置項
spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
接下來創建 HelloServer 類,封裝服務端代碼
package demo.msa.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class HelloServer {
@RabbitListener(queues = "hello-queue")
public void receive(String message) {
System.out.println(message);
}
}
只需要在 receive()
方法上定義 @RabbitListener
,並且設置 queues
參數來指定消費者需要監聽的的隊列名稱。
最后,編寫一個 Spring Boot 應用程序啟動類來啟動服務器
package demo.msa.rabbitmq;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class RabbitmqHelloServerApplication {
@Bean
public Queue helloQueue() {
return new Queue("hello-queue");
}
public static void main(String[] args) {
SpringApplication.run(RabbitmqHelloServerApplication.class, args);
}
}
在 RabbitMQ 中,必須通過程序來顯式創建隊列。服務端啟動完畢后,將持續監聽 RabbitMQ 的 hello-queue 隊列中即將到來的消息,該消息由客戶端來發送。
開發客戶端
創建一個名為 rabbitmq-hello-client 的 maven 項目或者 Spring Starter Project, pom 中的依賴與服務端一致。客戶端的 application.properties 文件與服務端一致。
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.19.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
接下來創建一個名為 HelloClient 的類,將其作為客戶端
package demo.msa.rabbitmq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class HelloClient {
@Autowired
private RabbitTemplate rabbitmqTemplate;
public void send(String message) {
rabbitmqTemplate.convertAndSend("hello-queue", message);
}
}
最后編寫 Spring Boot 應用程序啟動類來啟動客戶端
package demo.msa.rabbitmq;
import javax.annotation.PostConstruct;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class RabbitmqHelloClientApplication {
@Autowired
private HelloClient helloClient;
@Bean
public Queue helloQueue() {
return new Queue("hello-queue");
}
@PostConstruct
public void init() {
helloClient.send("hello world!");
}
public static void main(String[] args) {
SpringApplication.run(RabbitmqHelloClientApplication.class, args).close();
}
}
與服務端一樣,此處使用 @Bean
注解的 helloQueue()
方法創建一個名為 hello-queue
的隊列,這樣可以保證當客戶端在服務端之前啟動時,也能創建所需的隊列。而且 RabbitMQ 可以確保不會創建同名的隊列,因此可分別在服務端與客戶端創建同名的隊列。
運行 main 方法可以啟動客戶端應用程序,此時將在服務端看到客戶端發送的消息,也可以在 RabbitMQ 控制台中看到消息隊列當前的狀態。
Java Bean 類型傳輸
上面發送和接收的消息只是 String 類型,如果發送的消息是一個普通的 Java Bean 類型,應該如何調用呢?
Java Bean 類型則必須實現 Serializable
序列化接口才能正常調用,這是因為 RabbitMQ 所傳送的消息是 byte[]
類型,當客戶端發送消息需要進行序列化(也就是講 Java 類型轉換為 byte[] 類型),當服務端接收消息前需要先反序列化,因此發送和接收的消息對象必須實現 JDK 的序列化接口。
除了這種序列化方式外,我們也可以使用 Jackson 來實現,而且 RabbitMQ 已經為我們提供了 jackson 序列化的方式,這種方式更加高效。所需要做的是定義一個 Jackson2JsonMessageConverter
的 Spring Bean。
@Bean
public Jackson2JsonMessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
結語
RabbitMQ 的性能非常高效和穩定,也能非常方便的與 Spring Boot 應用程序集成,還擁有非常豐富的官方文檔和控制台,因此選擇 RabbitMQ 作為服務之間的異步消息調用平台,將成為整個微服務架構中的 "消息中心"。
參考
- 《架構探險—輕量級微服務架構》