使用 RabbitMQ 實現異步調用


引言

除了上篇文章所講的 ActiveMQ,還有一種流行的開源消息中間件叫 RabbitMQ。和 ActiveMQ 相比,它具有更高的性能。

RabbitMQ 不再基於 JMS 規范,也沒有選擇 Java 作為底層實現語言。 它基於另一種消息通信協議,名為 AMQP,並采用 Erlang 語言作為技術實現。 RabbitMQ 提供了眾多語言客戶端,能夠與 Spring 框架整合,Spring Boot 也提供了對 RabbitMQ 的支持。

RabbitMQ 官網: http://www.rabbitmq.com

啟動 RabbitMQ 服務器

運行 rabbitmq 容器

RabbitMQ 官方已經提供了自己的 Docker 容器,先下載 rabbitmq:3-management 鏡像來啟動 RabbitMQ 容器, 之所以選擇這個鏡像是因為它擁有一個 web 控制台,可以通過瀏覽器來訪問。

docker pull rabbitmq:3-management

RabbitMQ 除了控制台,還提供了 HTTP API 方式,可方便應用程序使用。

下面使用如下 Docker 命令啟動 RabbitMQ

 docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq rabbitmq:3-management

在啟動 RabbitMQ 容器時,它對宿主機暴露了兩個端口號

  • 15672: 表示RabbitMQ 控制台端口號,可在瀏覽器中通過控制台來執行 RabbitMQ 的相關操作
  • 5672 表示 RabbitMQ 監聽的TCP 端口號,應用程序可以通過該端口號與 RabbitMQ 建立 TCP 連接,並完成后續的異步消息通信

此外,啟動時還有兩個環境變量

  • RABBITMQ_DEFAULT_USER : 設置控制台默認用戶名, 默認為 guest
  • RABBITMQ_DEFAULT_PASS: 設置控制台默認密碼,默認為 guest

RabbitMQ 控制台

RabbitMQ 容器啟動完畢后,打開瀏覽器,並在地址欄中輸入 http://localhost:15672/ ,並且輸入登錄的用戶名和密碼,就可以看到控制台如下所示

在上面管理界面中,包含 6 個功能菜單

  • Overview: 用於查看 RabbitMQ 基本信息
  • Connections: 用於查看 RabbitMQ 客戶端的連接信息
  • Channels: 用於查看 RabbitMQ 的通道
  • Exchanges:用於查看 RabbitMQ 的交換機
  • Queues: 用於查看 RabbitMQ 的隊列
  • Admin: 用於管理 RabbitMQ 的用戶,虛擬主機,策略等數據

Exchange 和 Queue

RabbitMQ 只有 Queue, 沒有 Topic,因為可通過 Exchange 與 Queue 的組合來實現 Topic 所具備的功能。RabbitMQ 的消息模型如下圖所示

在 Exchange 和 Queue 間有一個 Binding 關系,當消息從 Producer 發送到 Exchange 中時,會根據 Binding 來路由消息的去向。

  • 如果 Binding 各不相同,那么該消息將路由到其中一個 Queue 中,隨后將被一個 Consumer 所消費,此時實現了 "點對點"的消息通信模型。
  • 如果 Binding 完全相同,那么該消息就會路由到每個 Queue 中,隨后將會被每個 Consumer 消費,這樣就實現了 “發布與訂閱” 的消息通信模型

因此可將 Binding 理解為 Exchange 到 Queue 的路由規則,這些規則可通過 RabbitMQ 所提供的客戶端 API 來控制,也可通過 RabbitMQ 提供的控制台來管理。

RabbitMQ 提供了一個默認的 Exchange(AMQP default),在控制台的 Exchange 菜單中就可以看到它,簡單情況下,只需要使用默認的 Exchange 即可,當需要提供發布與訂閱功能時才會使用自定義的 Exchange。

開發服務端和客戶端

下面我們就將 Spring Boot 與 RabbitMQ 進行整合,先開發一個服務端作為消息的消費者,再開發一個客戶端作為消息的生產者,隨后運行客戶端,並查看服務端中接收到的消息。

開發服務端

創建一個名為 rabbitmq-hello-server 的 maven 項目或者 Spring Starter Project, 在 pom.xml 文件中添加下面 Maven 依賴

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.19.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
	</dependencies>

Spring Boot 框架中已經添加了對 RabbitMQ 的支持,只需要依賴 spring-boot-starter-amqp 就可以啟動 RabbitMQ,此時還需要在 application.properties 配置文件中添加 RabbitMQ 的相關配置項

spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

接下來創建 HelloServer 類,封裝服務端代碼

package demo.msa.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class HelloServer {

    @RabbitListener(queues = "hello-queue")
    public void receive(String message) {
        System.out.println(message);
    }
}

只需要在 receive() 方法上定義 @RabbitListener ,並且設置 queues 參數來指定消費者需要監聽的的隊列名稱。

最后,編寫一個 Spring Boot 應用程序啟動類來啟動服務器

package demo.msa.rabbitmq;

import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class RabbitmqHelloServerApplication {

    @Bean
    public Queue helloQueue() {
        return new Queue("hello-queue");
    }
    
	public static void main(String[] args) {
		SpringApplication.run(RabbitmqHelloServerApplication.class, args);
	}

}

在 RabbitMQ 中,必須通過程序來顯式創建隊列。服務端啟動完畢后,將持續監聽 RabbitMQ 的 hello-queue 隊列中即將到來的消息,該消息由客戶端來發送。

開發客戶端

創建一個名為 rabbitmq-hello-client 的 maven 項目或者 Spring Starter Project, pom 中的依賴與服務端一致。客戶端的 application.properties 文件與服務端一致。

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.19.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
	</dependencies>

接下來創建一個名為 HelloClient 的類,將其作為客戶端

package demo.msa.rabbitmq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HelloClient {
    
    @Autowired
    private RabbitTemplate rabbitmqTemplate;
    
    public void send(String message) {
        rabbitmqTemplate.convertAndSend("hello-queue", message);
    }
}

最后編寫 Spring Boot 應用程序啟動類來啟動客戶端

package demo.msa.rabbitmq;

import javax.annotation.PostConstruct;

import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class RabbitmqHelloClientApplication {
    
    @Autowired
    private HelloClient helloClient;
    
    @Bean
    public Queue helloQueue() {
        return new Queue("hello-queue");
    }
    
    @PostConstruct
    public void init() {
        helloClient.send("hello world!");
    }
    
	public static void main(String[] args) {
		SpringApplication.run(RabbitmqHelloClientApplication.class, args).close();
	}

}

與服務端一樣,此處使用 @Bean 注解的 helloQueue() 方法創建一個名為 hello-queue 的隊列,這樣可以保證當客戶端在服務端之前啟動時,也能創建所需的隊列。而且 RabbitMQ 可以確保不會創建同名的隊列,因此可分別在服務端與客戶端創建同名的隊列。

運行 main 方法可以啟動客戶端應用程序,此時將在服務端看到客戶端發送的消息,也可以在 RabbitMQ 控制台中看到消息隊列當前的狀態。

Java Bean 類型傳輸

上面發送和接收的消息只是 String 類型,如果發送的消息是一個普通的 Java Bean 類型,應該如何調用呢?

Java Bean 類型則必須實現 Serializable 序列化接口才能正常調用,這是因為 RabbitMQ 所傳送的消息是 byte[] 類型,當客戶端發送消息需要進行序列化(也就是講 Java 類型轉換為 byte[] 類型),當服務端接收消息前需要先反序列化,因此發送和接收的消息對象必須實現 JDK 的序列化接口。

除了這種序列化方式外,我們也可以使用 Jackson 來實現,而且 RabbitMQ 已經為我們提供了 jackson 序列化的方式,這種方式更加高效。所需要做的是定義一個 Jackson2JsonMessageConverter 的 Spring Bean。

    @Bean
    public Jackson2JsonMessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

結語

RabbitMQ 的性能非常高效和穩定,也能非常方便的與 Spring Boot 應用程序集成,還擁有非常豐富的官方文檔和控制台,因此選擇 RabbitMQ 作為服務之間的異步消息調用平台,將成為整個微服務架構中的 "消息中心"。

參考

  • 《架構探險—輕量級微服務架構》


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM