Spring Boot 整合 RabbitMQ


Spring Boot 整合 RabbitMQ

Spring Boot 提供了 spring-boot-starter-amqp 組件對實現了 AMQP 協議的消息隊列(RabbitMQ)的快速整合。

#1. hello world

提示

我們分發送和接收 2 部分來學習 Spring Boot 和 RabbitMQ 的整合。

  1. 在 pom.xml 中引入 spring-boot-starter-amqp

    rabbitmq-01

    <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 
    Copied!

    注意

    雖然你在界面上選擇的是 RabbitMQ ,但是本質上引入的是 AMQP ,因為 RabbitMQ 是 AMQP 的一種實現,也是默認實現。

  2. 啟用自動配置

    老規矩,使用 @EnableRabbit 注解標注於配置類上,以表示使用 RabbitMQ 的注解功能。

  3. 配置文件

    配置 RabbitMQ 的連接地址、端口以及賬戶信息:

    spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=root spring.rabbitmq.password=123456 spring.rabbitmq.virtual-host=hemiao ## ----------------------------------------------- logging.level.root=INFO logging.level.xxx.yyy.zzz=DEBUG logging.pattern.console=${CONSOLE_LOG_PATTERN:\ %clr(${LOG_LEVEL_PATTERN:%5p}) \ %clr([%15.15t]){faint} \ %clr(%-40.40logger{39}){cyan} \ %clr(:){faint} %m%n\ ${LOG_EXCEPTION_CONVERSION_WORD:%wEx}} 
    Copied!
  4. 編寫消息接收者/消費者的代碼:HelloReceiver.java

    @Slf4j @Component public class HelloReceiver { @RabbitListener(queues = "Q1") public void process(String hello) { log.info("Receiver : {}", hello); } } 
    Copied!
  5. 驗證

    在 RabbitMQ 的管理台頁面上,直接向 Exchange 發送消息,確保 Exchange 會把消息轉到 Q1 隊列,隨后,你會發現你寫的代碼自動觸發執行了。

#2. 創建 Exchange、Queue 和 Binding

提示

類似於 Hibernate/JPA 和 spring-data-elasticsearch 的自動建表建庫功能,spring-boot-starter-amqp 可以幫我們去創建 Exchange、Queue 以及它倆之間的 Binding 關系。但是,這個功能有利有弊,有人喜歡,有人不喜歡。

#創建 Exchange

@Bean public Exchange exchange() { // return new TopicExchange("test-exchange-1"); return new TopicExchange("test-exchange-1", true, false); } 
Copied!

參數說明:

參數 說明
name 字符串值,exchange 的名稱。
durable 布爾值,表示該 exchage 是否持久化。
它決定了當 RabbitMQ 重啟后,你是否還能 “看到” 重啟前創建的 exchange 。
autoDelete 布爾值,表示當該 exchange 沒“人”(queue)用時,是否會被自動刪除。
即,實現邏輯上的臨時交換機。項目啟動時連接到 RabbitMQ ,創建交換機;項目停止時斷開連接,RabbitMQ 自動刪除交換機。

不指定 durable 和 autoDelete 時,默認分別是 true 和 false 。表示持久化、不用自動刪除。

補充,這背后調用的是原生 API 中的 Channel 的 .exchangeDeclare() 方法。

#創建 Queue

@Bean public Queue queue() { // return new Queue("test-queue-1"); return new Queue("test-queue-1", true, false, false); } 
Copied!

參數說明:

參數 說明
name 字符串值,queue 的名稱。
durable 布爾值,表示該 queue 是否持久化。
它決定了當 RabbitMQ 重啟后,你是否還能 “看到” 重啟前創建的 queue 。
另外,需要注意的是,queue 的持久化不等於其中的消息也會被持久化。
exclusive 布爾值,表示該 queue 是否排它式使用。排它式使用意味着僅聲明他的連接可見/可用,其它連接不可見/不可用。
autoDelete 布爾值,表示當該 queue 沒“人”(connection)用時,是否會被自動刪除。
即,實現邏輯上的臨時隊列。項目啟動時連接到 RabbitMQ ,創建隊列;項目停止時斷開連接,RabbitMQ 自動刪除隊列。

不指定 durable、exclusive 和 autoDelete 時,默認為 true 、 false 和 false 。表示持久化、非排它、不用自動刪除。

補充,這背后調用的是原生 API 中的 Channel 的 .queueDeclare() 方法。

#創建 Binding

@Bean public Binding binding(Exchange exchange, Queue queue) { return BindingBuilder .bind(queue).to(exchange).with("*.orange.*") .noargs(); } 
Copied!

#3. 發送消息

spring-rabbit 提供了 RabbitTemplate 來簡化原生 API 的消息發送方法。

(最簡單的情況下),你可以直接要求 Spring 給你注入一個 RabbitTemplate,通過它來發送消息:

@Autowired private RabbitTemplate rabbitTemplate; @Test public void demo() { rabbitTemplate.convertAndSend("queue-demo-1", "hello world"); } 
Copied!

.convertAndSend 方法的第一個參數是 routing-key,第二個參數是你所要發送的消息。

在沒有明確指定 Exchange 的情況下,該消息發送給了 RabbitMQ 的 default-exchange。而 default-exchage 是將 routing-key 視為 queue-name 。

也就是說,上述代碼中的 routing-key 是 queue-demo-1,那么該消息最終是發送給 queue-demo-1 隊列。

提示

.convertAndSend 方法是 .send 方法的包裝/簡化。.send 方法的調用相對比較繁瑣。

#4. 接收/消費消息(PUSH 型)

接收/消費消息的方式有兩種:Push 型和 Pull 型。

Push 型表示由 RabbitMQ Broker 負責將消息推送給消費者。消費者在一開始指定/配置監聽哪個隊列的消息后,就無需考慮其它。當該隊列收到消息后,消費者的指定方法就會被觸發執行。

PUSH 消費的配置非常簡單,對你的消費者類的 “被觸發方法” 標注 @RabbitListener 注解。當然,前提是消費者類要托管給 Spring:

@Component public class Consumer1 { private static final Logger log = LoggerFactory.getLogger(Consumer1.class); @RabbitListener(queues = "queue-demo-1") public void process(String message) { log.info("Consumer 1: {}", message); } } 
Copied!

#5. 對象的支持

Spring Boot 已經完美支持對象的發送和接收,不需要額外的配置。

警告

所傳遞的對象必須要實現 Serializable 接口。

!聲明隊列

@Bean public Queue departmentQueue() { return new Queue("hello"); } 
Copied!
@Autowired private RabbitTemplate rabbitTemplate; @Test public void demo() { rabbitTemplate.convertAndSend("hello", LocalDate.now()); } @Slf4j @Component public class MessageReceiver { @RabbitListener(queues = "hello") public void process(LocalDate date) { log.info("Receiver : {}", date); } } 
Copied!

#6. Topic Exchange

Topic 是 RabbitMQ 中最靈活的一種方式,可以根據 routing_key 自由地綁定不同的隊列。

考慮到環境中殘留的之前的相關信息對測試的影響,如果發現測試代碼的執行結果『莫名其妙』,記得在 RabbitMQ 的 web 管理系統中將相關內容清除干凈,構造一個純凈的測試環境測試。

rabbitmq

首先對 Topic 規則配置:

/* 兩個 Queue */ @Bean("Q1") public Queue queue1() { return new Queue("Q1"); } @Bean("Q2") public Queue queue2() { return new Queue("Q2"); } /* 一個 Exchange */ @Bean public TopicExchange topicExchange() { return new TopicExchange("topic-exchange"); } /* 三個 Binding:關聯 Exchange 和 Queue */ @Bean public Binding binding1(@Qualifier("Q1") Queue queue, TopicExchange topicExchange) { return BindingBuilder .bind(queue).to(topicExchange).with("*.orange.*") .noargs(); } @Bean public Binding binding21(@Qualifier("Q2") Queue queue, TopicExchange topicExchange) { return BindingBuilder .bind(queue).to(topicExchange).with("*.*.rabbit") .noargs(); } @Bean public Binding binding22(@Qualifier("Q2") Queue queue, TopicExchange topicExchange) { return BindingBuilder .bind(queue).to(topicExchange).with("lazy.#") .noargs(); } 
Copied!

即便不編寫消費者,你也可以在 15672 管理台頁面上,直接看到各個 Queue 中有多少條消息。

創建兩個消費者:

@Slf4j @Component public class C1 { @RabbitListener(queues = "Q1") public void process(String message) { log.info("C1: {}", message); } } @Slf4j @Component public class C2 { @RabbitListener(queues = "Q2") public void process(String message) { log.info("C2: {}", message); } } 
Copied!

測試:(這里偷了個懶,沒有去創建發送者類,直接在 Junit 中使用了 AmqpTemplate 發送消息)

@Autowired private AmqpTemplate rabbitTemplate; @Test public void demo1() throws InterruptedException { rabbitTemplate.convertAndSend("testTopic", "hello.orange", "hello orange"); rabbitTemplate.convertAndSend("testTopic", "hello.orange.world", "hello orange world"); rabbitTemplate.convertAndSend("testTopic", "hello.world.rabbit", "hello world rabbit"); rabbitTemplate.convertAndSend("testTopic", "lazy", "lazy"); rabbitTemplate.convertAndSend("testTopic", "lazy.good", "good"); rabbitTemplate.convertAndSend("testTopic", "lazy.good.bye", "goodbye"); Thread.sleep(1000L); } 
Copied!

#7. Fanout Exchange

@Bean("green") public Queue greenQueue() { return new Queue("green"); } @Bean("red") public Queue redQueue() { return new Queue("red"); } @Bean("orange") public Queue orangeQueue() { return new Queue("orange"); } @Bean public FanoutExchange exchange() { return new FanoutExchange("testFanout"); } @Bean public Binding binging1(FanoutExchange exchange, @Qualifier("green") Queue queue) { return BindingBuilder .bind(queue).to(exchange).with("") .noargs(); } @Bean public Binding binging2(FanoutExchange exchange, @Qualifier("red") Queue queue) { return BindingBuilder .bind(red).to(exchange).with("") .noargs(); } @Bean public Binding binging3(FanoutExchange exchange, @Qualifier("orange") Queue queue) { return BindingBuilder .bind(orange).to(exchange).with("") .noargs(); } 
Copied!
@Test public void demo2() throws InterruptedException { rabbitTemplate.convertAndSend("blue", "", "green"); rabbitTemplate.convertAndSend("blue", "", "red"); rabbitTemplate.convertAndSend("blue", "", "orange"); Thread.sleep(1000L); } 
Copied!

Customer-A、Customer-B、Customer-C 都會收到這三條消息,即,控制台會打印出 9 條日志。

#8. 接收/消費消息(PULL 型)

PULL 型消費意味着需要消費者主動從 RabbitMQ Broker 上『取』消息。

PULL 型消費『不依靠』@RabbitListener 注解。而是需要在代碼中手動調用 .receiveAndConvert 方法。

.receiveAndConvert 方法是 .receive 方法的簡化版。

@Test public void demo5() { rabbitTemplate.convertAndSend("queue-demo-1", "hello world"); } @Test public void demo4() { log.info("{}", rabbitTemplate.receiveAndConvert("queue-demo-1")); } 
Copied!

#9. 發送者確認

注意

發送者如何知道自己所發送的消費成功抵達了 RabbitMQ Broker 中的 Exchange 中,乃至成功抵達了 RabbitMQ Broker 中的 Queue 中?

生產者確認

#確認消息已到 Exchange

RabbitMQ 有一個配置屬性 spring.rabbitmq.publisher-confirm-type 控制是否開啟確認功能。該屬性默認值是 NONE ,表示不開啟消息確認。

  • publisher-confirm-type = SIMPLE

    當改屬性的值為 SIMPLE 時,表示支持以簡單(同步阻塞等待)方式獲得確認與否的信息。

    這里會調用 Template#waitForConfirms 方法,不過這個方法有個要求,它必須在 Template#invoke 方法中使用。

    String str = rabbitTemplate.invoke((operations) -> { // 參數 operations 實際上就是 Template 。 operations.convertAndSend("red", "hello world"); log.info("{}", operations.waitForConfirms(1000)); // 阻塞等待 1 秒,以獲得確認信息。 return "over"; // lambda 表達式的值將成為 invoke 方法的返回值。 }); log.info("{}", str); 
    Copied!

    你可以向不存在的 Exchange 發送消息已驗證效果。

  • publisher-confirm-type = CORRELATED

    當改屬性的值為 CORRELATED 時,表示支持以異步回調方式獲得確認與否的信息。

    在之前的代碼中,是 spring-rabbit 幫我們創建 ConnectionFactory,再進一步創建 RabbitTemplate,並注入到我們的代碼中進而被我們使用。

    現在由於需要對 RabbitTemplate 進行設置,因此,我們需要自己創建並設置 RabbitTemplate。(不過,還是需要 spring-rabbit 幫我們創建 Connection Factory,並注入)

    @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); // 當 Exchange 收到消息后,這里設置的回調方法會被觸發執行 rabbitTemplate.setConfirmCallback( ... ); return rabbitTemplate; } 
    Copied!

    你可以使用 lamda 表達式來簡化下列匿名實現類。

    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 該方法無論 Exchange 能否收到消息都會執行。 */ @Override public void confirm(CorrelationData correlationData, boolean ack, java.lang.String cause) { if (ack) log.info("消息已發送至 Exchange"); else log.info("消息未能發送到 Exchange。{}", cause); } }); 
    Copied!

#確認消息已到 Message Queue

## 確認消息已發送到隊列(Queue) spring.rabbitmq.publisher-returns=true 
Copied!
@Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); // 設置開啟 Mandatory,才能觸發回調函數,無論消息推送結果怎么樣都強制調用回調函數 rabbitTemplate.setMandatory(true); ... // 當消息『走不到』RabbitMQ 的 Queue 時會被觸發執行。 rabbitTemplate.setReturnCallback( ... ); return rabbitTemplate; } 
Copied!

你可以使用 lamda 表達式來簡化下列匿名實現類。

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 該方法在 Queue 無法收到消息時被觸發執行。Queue 能收到消息,反而不會執行。 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("ReturnCallback 消息:{}", message); log.info("ReturnCallback 回應碼:{}", replyCode); log.info("ReturnCallback 回應信息:{}", replyText); log.info("ReturnCallback 交換機:{}", exchange); log.info("ReturnCallback 路由鍵:{}", routingKey); } }); 
Copied!

你可以向不存在的 Exchange 和 Queue 發送消息已驗證效果。

#10. 消費端的確認與拒絕

默認情況下,RabbitMQ 啟用的是消費端自動(auto)回復。即,當消費端收到消息,就會給 RabbitMQ Broker 作出回復,表示已收到。

只有在消費端回復 RabbitMQ Broker 之后,RabbitMQ Broker 才會將該消息從消息隊列中移除。

回復的行為除了有 AUTO 之外,還有 NONE 和 MANUAL 。

NONE 表示不回復,即,RabbitMQ Broker 永遠不可能知道消費者端到底有沒有收到消息。RabbitMQ Broker 發出

MANUAL 則意味着需要在消費者端手動發送回復信息。在消費者回復前,該消息在消費端未回復前在 RabbitMQ Brocker 上一直處於 Unacked 狀態。如果消費者始終都不回復該消息,那么直到消費者與 RabbitMQ 斷開連接之后,這條消息才會重新變為 Ready 狀態。

啟用消費端的確認功能需要打開配置開關:

spring.rabbitmq.listener.simple.acknowledge-mode=manual spring.rabbitmq.listener.direct.acknowledge-mode=manual 
Copied!

於此同時,消息消費者的處理方法需要改造成以下形式:

@Component public class Consumer2 { @RabbitListener(queues = "queue-demo-1") public void process(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { ... } } 
Copied!

#確認消息

確認消息使用 channel 的 .basicAck 方法:

channel.basicAck(tag, false); 
Copied!

basicAck 方法需要傳遞兩個參數:

  • deliveryTag(唯一標識 ID):當一個消費者向 RabbitMQ 注冊后,會建立起一個 Channel(Channel 是比 Connection 更小的單位),RabbitMQ 通過 Channel 向消費者投遞消息時,都會為該消息分配一個唯一性標識:delivery tag 。同一個 Channel 中的消息的 delivery tag 都是唯一且單調遞增的。

  • multiple:是否批量確認。當參數為 false 時,意味着確認單條消息,RabbitMQ 僅從消息隊列中刪除該消息;當參數為 true 時,意味着批量確認,RabbitMQ 會從消息隊列中刪除編號小於等於該消息的所有信息。

#拒絕消息

拒絕消息使用 channel 的 .basicReject 方法:

channel.basicReject(tag, false); 
Copied!

basicReject 方法也需要傳力兩個參數:

  • deliveryTag(唯一標識 ID):同上。

  • requeue(重入標識):標識該消息是否需要 RabbitMQ Broker 重新入隊。(有可能的話,會被該隊列的其它消費者消費)。

另外,拒絕的方法還有 .basicNack,表示批量拒絕。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM