一、廣播模式fanout、不需要指定路由key。
注:與topic和direct區別是:fanout廣播模式會兩個隊列同時發送相同的消息,並非由交換器轉發到某一個隊列
二、實戰(廣播模式)
1、引入maven
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> </dependency>
2、添加配置
spring:
application:
#指定應用的名字
name: rabbit-add
#配置rabbitmq
rabbitmq:
#鏈接主機
host: 127.0.0.1
#端口
port: 5672
#已經授權的用戶賬號密碼
username: user
password: user
#指定的虛擬主機,默認/,
virtual-host: my_vhost
# 自定義配置應用於topic交換器
mq:
config:
#自定義交換器名稱
exchange: log.fanout
queue:
#自定義隊列名稱
smsName: fanout.sms
orderName: fanout.order
3、消費者創建,堅挺着不需要指定路由鍵

package com.niu.fanout; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; /** * @author niunafei * @function * @email niunafei0315@163.com * @date 2020/4/28 7:20 PM * @RabbitListener 自定義監聽事件 * @QueueBinding 綁定交換器與隊列的關系value 指定隊列exchange指定交換器 * value= @Queue 指定配置隊列的信息 value隊列名稱 autoDelete是否是臨時隊列 * exchange= @Exchange 指定交換器 value指定交換器名稱 type交換器類型 * key 指定路由鍵 */ @Component @Slf4j @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${mq.config.queue.orderName}", autoDelete = "true"), exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT)) ) public class OrderReceiver{ /** * 設置監聽方法 * * @param msg * @RabbitHandler 聲明監聽方法是下面的 isDefault屬性是默認false接受的完整對象,true接受body體 */ @RabbitHandler(isDefault = true) public void process(String msg) { log.info("接受到消息:order {}", msg); } }

package com.niu.fanout; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; /** * @author niunafei * @function * @email niunafei0315@163.com * @date 2020/4/28 7:20 PM * @RabbitListener 自定義監聽事件 * @QueueBinding 綁定交換器與隊列的關系value 指定隊列exchange指定交換器 * value= @Queue 指定配置隊列的信息 value隊列名稱 autoDelete是否是臨時隊列 * exchange= @Exchange 指定交換器 value指定交換器名稱 type交換器類型 * key 指定路由鍵 */ @Component @Slf4j @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${mq.config.queue.smsName}", autoDelete = "true"), exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT)) ) public class SmsReceiver { /** * 設置監聽方法 * @RabbitHandler 聲明監聽方法是下面的 isDefault屬性是默認false接受的完整對象,true接受body體 * * @param msg */ @RabbitHandler(isDefault = true) public void process(String msg) { log.info("接受到消息:sms {}", msg); } }
4、生產者創建,發送的時候只需要指定交換器即可,路由鍵默認空字符串

package com.niu.fanout; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * @author niunafei * @function * @email niunafei0315@163.com * @date 2020/4/29 9:44 AM */ @Component public class Sender { /** * spring整合的操作類 * Message 發送的消息對象 * void send(Message var1) throws AmqpException; * <p> * var1 路由鍵 Message 發送的消息對象 * void send(String var1, Message var2) throws AmqpException; * <p> * var1 指定交換器名稱 var2 路由鍵 Message 發送的消息對象 * void send(String var1, String var2, Message var3) throws AmqpException; * * convertAndSend() 方法不需要指定MessageProperties屬性即可發布 */ @Autowired private RabbitTemplate rabbitTemplate; @Value("${mq.config.exchange}") private String exchange; public void send(String msg) { //需要指定交換器和路由鍵就可以轉發 注路由鍵為null rabbitTemplate.convertAndSend(exchange, "", msg); } }
5、結果展示
注意ack確認機制,容易產生數據丟失,和產生內存泄漏,消費者進行死循環,配置這兩個屬性進行確認。
1、autoDelete屬性設置為false
@Queue(value = "${mq.config.queue.orderName}", autoDelete = "false"
2、消費者進行死循環問題
docker安裝rabbitmq:rabbitMQ安裝docker版 /權限管理命令
簡單應用來這里吧: SpringBoot應用操作Rabbitmq
簡單應用來這里吧: SpringBoot應用操作Rabbitmq(direct高級操作)
簡單應用來這里吧:SpringBoot應用操作Rabbitmq(topic交換器高級操作)
簡單應用來這里吧:SpringBoot應用操作Rabbitmq(fanout廣播高級操作)