SpringBoot應用操作Rabbitmq(fanout廣播高級操作)


一、廣播模式fanout、不需要指定路由key。

     注:與topic和direct區別是:fanout廣播模式會兩個隊列同時發送相同的消息,並非由交換器轉發到某一個隊列

 

 

二、實戰(廣播模式)

1、引入maven

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
        </dependency>

2、添加配置

spring:
  application:
    #指定應用的名字
    name: rabbit-add
  #配置rabbitmq
  rabbitmq:
  #鏈接主機
    host: 127.0.0.1
  #端口
    port: 5672
  #已經授權的用戶賬號密碼
    username: user
    password: user
  #指定的虛擬主機,默認/,
    virtual-host: my_vhost
# 自定義配置應用於topic交換器
mq:
  config:
   #自定義交換器名稱
     exchange: log.fanout
     queue:
        #自定義隊列名稱
        smsName: fanout.sms
        orderName: fanout.order

 

3、消費者創建,堅挺着不需要指定路由鍵

package com.niu.fanout;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * @author niunafei
 * @function
 * @email niunafei0315@163.com
 * @date 2020/4/28  7:20 PM
 * @RabbitListener 自定義監聽事件
 * @QueueBinding 綁定交換器與隊列的關系value 指定隊列exchange指定交換器
 * value= @Queue 指定配置隊列的信息 value隊列名稱 autoDelete是否是臨時隊列
 * exchange= @Exchange 指定交換器 value指定交換器名稱 type交換器類型
 * key  指定路由鍵
 */
@Component
@Slf4j
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.orderName}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT))
)
public class OrderReceiver{

    /**
     * 設置監聽方法
     *
     * @param msg
     * @RabbitHandler 聲明監聽方法是下面的 isDefault屬性是默認false接受的完整對象,true接受body體
     */
    @RabbitHandler(isDefault = true)
    public void process(String msg) {
        log.info("接受到消息:order {}", msg);
    }
}
View Code
package com.niu.fanout;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * @author niunafei
 * @function
 * @email niunafei0315@163.com
 * @date 2020/4/28  7:20 PM
 * @RabbitListener 自定義監聽事件
 * @QueueBinding 綁定交換器與隊列的關系value 指定隊列exchange指定交換器
 * value= @Queue 指定配置隊列的信息 value隊列名稱 autoDelete是否是臨時隊列
 * exchange= @Exchange 指定交換器 value指定交換器名稱 type交換器類型
 * key  指定路由鍵
 */
@Component
@Slf4j
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.smsName}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT))
)
public class SmsReceiver {

    /**
     * 設置監聽方法
     *  @RabbitHandler 聲明監聽方法是下面的 isDefault屬性是默認false接受的完整對象,true接受body體
     *
     * @param msg
     */
    @RabbitHandler(isDefault = true)
    public void process(String msg) {
        log.info("接受到消息:sms {}", msg);
    }
}
View Code

 

4、生產者創建,發送的時候只需要指定交換器即可,路由鍵默認空字符串

package com.niu.fanout;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * @author niunafei
 * @function
 * @email niunafei0315@163.com
 * @date 2020/4/29  9:44 AM
 */
@Component
public class Sender {
    /**
     * spring整合的操作類
     * Message 發送的消息對象
     * void send(Message var1) throws AmqpException;
     * <p>
     * var1 路由鍵 Message 發送的消息對象
     * void send(String var1, Message var2) throws AmqpException;
     * <p>
     * var1 指定交換器名稱 var2 路由鍵 Message 發送的消息對象
     * void send(String var1, String var2, Message var3) throws AmqpException;
     *
     * convertAndSend() 方法不需要指定MessageProperties屬性即可發布
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value("${mq.config.exchange}")
    private String exchange;

    public void send(String msg) {
        //需要指定交換器和路由鍵就可以轉發 注路由鍵為null
        rabbitTemplate.convertAndSend(exchange, "", msg);
    }

}
View Code

 

5、結果展示

 

 

注意ack確認機制,容易產生數據丟失,和產生內存泄漏,消費者進行死循環,配置這兩個屬性進行確認。

1、autoDelete屬性設置為false

@Queue(value = "${mq.config.queue.orderName}", autoDelete = "false"

2、消費者進行死循環問題

 

 

 

 

docker安裝rabbitmq:rabbitMQ安裝docker版 /權限管理命令

簡單應用來這里吧: SpringBoot應用操作Rabbitmq

簡單應用來這里吧: SpringBoot應用操作Rabbitmq(direct高級操作)

簡單應用來這里吧:SpringBoot應用操作Rabbitmq(topic交換器高級操作)   

簡單應用來這里吧:SpringBoot應用操作Rabbitmq(fanout廣播高級操作)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM