RabbitMQ在SpringBoot中的使用


SpringBoot應用可以完成自動配置及依賴注入——可以通過Spring直接提供與MQ的連接對象

 

6.1 消息生產者

  • 創建SpringBoot應用,添加依賴

  • 配置application.yml

  

server:
  port: 9001
spring:
  application:
    name: producer
  rabbitmq:
    host: 47.96.11.185
    port: 5672
    virtual-host: host1
    username: ytao
    password: admin123

 

 

發送消息

@Service
public class TestService {

    @Resource
    private AmqpTemplate amqpTemplate;

    public void sendMsg(String msg){

        //1. 發送消息到隊列
        amqpTemplate.convertAndSend("queue1",msg);

        //2. 發送消息到交換機(訂閱交換機)
        amqpTemplate.convertAndSend("ex1","",msg);

        //3. 發送消息到交換機(路由交換機)
        amqpTemplate.convertAndSend("ex2","a",msg);
        
    }

}

 

 

6.2 消息消費者

  • 創建項目添加依賴

  • 配置yml

  • 接收消息

@Service
//@RabbitListener(queues = {"queue1","queue2"})
@RabbitListener(queues = "queue1")
public class ReceiveMsgService {

    @RabbitHandler
    public void receiveMsg(String msg){
        System.out.println("接收MSG:"+msg);
    }

    //@RabbitHandler
    //public void receiveMsg(byte[] bs){
    //
    //}

}

 

 

 

二、使用RabbitMQ傳遞對象

RabbitMQ是消息隊列,發送和接收的都是字符串/字節數組類型的消息

2.1 使用序列化對象

  

  要求:

  •   傳遞的對象實現序列化接口

  • 傳遞的對象的包名、類名、屬性名必須一致

 

 

消息提供者

@Service
public class MQService {

    @Resource
    private AmqpTemplate amqpTemplate;

    public void sendGoodsToMq(Goods goods){
        //消息隊列可以發送 字符串、字節數組、序列化對象
        amqpTemplate.convertAndSend("","queue1",goods);
    }

}

 

消息消費者

@Component
@RabbitListener(queues = "queue1")
public class ReceiveService {

    @RabbitHandler
    public void receiveMsg(Goods goods){
        System.out.println("Goods---"+goods);
    }

}

2.2 使用序列化字節數組

要求:

  • 傳遞的對象實現序列化接口

  • 傳遞的對象的包名、類名、屬性名必須一致

 

 

消息提供者

@Service
public class MQService {

    @Resource
    private AmqpTemplate amqpTemplate;

    public void sendGoodsToMq(Goods goods){
        //消息隊列可以發送 字符串、字節數組、序列化對象
        byte[] bytes = SerializationUtils.serialize(goods);
        amqpTemplate.convertAndSend("","queue1",bytes);
    }

}

 

消息消費者

@Component
@RabbitListener(queues = "queue1")
public class ReceiveService {

    @RabbitHandler
    public void receiveMsg(byte[] bs){
        Goods goods = (Goods) SerializationUtils.deserialize(bs);
        System.out.println("byte[]---"+goods);
    }

}

 

2.3 使用JSON字符串傳遞

要求:對象的屬性名一直

  • 消息提供者

@Service
public class MQService {

    @Resource
    private AmqpTemplate amqpTemplate;

    public void sendGoodsToMq(Goods goods) throws JsonProcessingException {
        //消息隊列可以發送 字符串、字節數組、序列化對象
        ObjectMapper objectMapper = new ObjectMapper();
        String msg = objectMapper.writeValueAsString(goods);
        amqpTemplate.convertAndSend("","queue1",msg);
    }

}

 

消息消費者

@Component
@RabbitListener(queues = "queue1")
public class ReceiveService {

    @RabbitHandler
    public void receiveMsg(String msg) throws JsonProcessingException {
        ObjectMapper objectMapper = new ObjectMapper();
        Goods goods = objectMapper.readValue(msg,Goods.class);
        System.out.println("String---"+msg);
    }
}

 

三、基於Java的交換機與隊列創建

我們使用消息隊列,消息隊列和交換機可以通過管理系統完成創建,也可以在應用程序中通過Java代碼來完成創建

3.1 普通Maven項目交換機及隊列創建

  • 使用Java代碼新建隊列

  • //1.定義隊列 (使用Java代碼在MQ中新建一個隊列)
    //參數1:定義的隊列名稱
    //參數2:隊列中的數據是否持久化(如果選擇了持久化)
    //參數3: 是否排外(當前隊列是否為當前連接私有)
    //參數4:自動刪除(當此隊列的連接數為0時,此隊列會銷毀(無論隊列中是否還有數據))
    //參數5:設置當前隊列的參數
    channel.queueDeclare("queue7",false,false,false,null);

     

  • 新建交換機
  • //定義一個“訂閱交換機”
    channel.exchangeDeclare("ex3", BuiltinExchangeType.FANOUT);
    //定義一個“路由交換機”
    channel.exchangeDeclare("ex4", BuiltinExchangeType.DIRECT);

     

  • 綁定隊列到交換機
  • //綁定隊列
    //參數1:隊列名稱
    //參數2:目標交換機
    //參數3:如果綁定訂閱交換機參數為"",如果綁定路由交換機則表示設置隊列的key
    channel.queueBind("queue7","ex4","k1");
    channel.queueBind("queue8","ex4","k2");

     

 

3.2 SpringBoot應用中通過配置完成隊列的創建

@Configuration
public class RabbitMQConfiguration {

    //聲明隊列
    @Bean
    public Queue queue9(){
        Queue queue9 = new Queue("queue9");
        //設置隊列屬性
        return queue9;
    }
    @Bean
    public Queue queue10(){
        Queue queue10 = new Queue("queue10");
        //設置隊列屬性
        return queue10;
    }

    //聲明訂閱模式交換機
    @Bean
    public FanoutExchange ex5(){
        return new FanoutExchange("ex5");
    }

    //聲明路由模式交換機
    @Bean
    public DirectExchange ex6(){
        return new DirectExchange("ex6");
    }

    //綁定隊列
    @Bean
    public Binding bindingQueue9(Queue queue9, DirectExchange ex6){
        return BindingBuilder.bind(queue9).to(ex6).with("k1");
    }
    @Bean
    public Binding bindingQueue10(Queue queue10, DirectExchange ex6){
        return BindingBuilder.bind(queue10).to(ex6).with("k2");
    }
}

 

 

四、消息的可靠性

4.1 RabbitMQ事務

當在消息發送過程中添加了事務,處理效率降低幾十倍甚至上百倍

channel.txSelect();  //開啟事務
try{
    channel.basicPublish("ex4", "k1", null, msg.getBytes());
    System.out.println("發送:" + msg);
    channel.txCommit(); //提交事務
}catch (Exception e){
    channel.txRollback(); //事務回滾
}

 

4.2 RabbitMQ消息確認和return機制

 

 

消息確認機制:確認消息提供者是否成功發送消息到交換機

return機制:確認消息是否成功的從交換機分發到隊列

 

 

 

4.2.1 普通Maven項目的消息確認
  • 普通confirm方式

  • //1.發送消息之前開啟消息確認
    channel.confirmSelect();
    
    channel.basicPublish("ex1", "a", null, msg.getBytes());
    
    //2.接收消息確認
    boolean b = channel.waitForConfirms(); 
    
    System.out.println("發送:" +(b?"成功":"失敗"));

     

  • 批量confirm方式
  • //1.發送消息之前開啟消息確認
    channel.confirmSelect();
    
    //2.批量發送消息
    for (int i=0 ; i<10 ; i++){
        channel.basicPublish("ex1", "a", null, msg.getBytes());
    }
    
    //3.接收批量消息確認:發送的所有消息中,如果有一條是失敗的,則所有消息發送直接失敗,拋出IO異常
    boolean b = channel.waitForConfirms(); 

     

  • 異步confirm方式
  • //發送消息之前開啟消息確認
    channel.confirmSelect();
    
    //批量發送消息
    for (int i=0 ; i<10 ; i++){
        channel.basicPublish("ex1", "a", null, msg.getBytes());
    }
    
    //假如發送消息需要10s,waitForConfirms會進入阻塞狀態
    //boolean b = channel.waitForConfirms();
    
    //使用監聽器異步confirm
    channel.addConfirmListener(new ConfirmListener() {
        //參數1: long l  返回消息的表示
        //參數2: boolean b 是否為批量confirm
        public void handleAck(long l, boolean b) throws IOException {
            System.out.println("~~~~~消息成功發送到交換機");
        }
        public void handleNack(long l, boolean b) throws IOException {
            System.out.println("~~~~~消息發送到交換機失敗");
        }
    });

     

  

 

4.2.2 普通Maven項目的return機制
  • 添加return監聽器

  • 發送消息是指定第三個參數為true

  • 由於監聽器監聽是異步處理,所以在消息發送之后不能關閉channel

String msg = "Hello HuangDaoJun!";
Connection connection = ConnectionUtil.getConnection();     //相當於JDBC操作的數據庫連接
Channel channel = connection.createChannel();               //相當於JDBC操作的statement

//return機制:監控交換機是否將消息分發到隊列
channel.addReturnListener(new ReturnListener() {
    public void handleReturn(int i, String s, String s1, String s2,AMQP.BasicProperties basicProperties,byte[] bytes) throws IOException {
        //如果交換機分發消息到隊列失敗,則會執行此方法(用來處理交換機分發消息到隊列失敗的情況)
        System.out.println("*****"+i);  //標識
        System.out.println("*****"+s);  //
        System.out.println("*****"+s1); //交換機名
        System.out.println("*****"+s2); //交換機對應的隊列的key
        System.out.println("*****"+new String(bytes));  //發送的消息
    }
});

//發送消息
//channel.basicPublish("ex2", "c", null, msg.getBytes());
channel.basicPublish("ex2", "c", true, null, msg.getBytes());

 

 

4.3 在SpringBoot應用實現消息確認與return監聽

4.3.1 配置application.yml,開啟消息確認和return監聽
spring:
  rabbitmq:
    publisher-confirm-type: simple  ## 開啟消息確認模式
    publisher-returns: true        ##使用return監聽機制

 

4.3.2 創建confirm和return監聽

@Component
public class MsgConfirmAndReturn implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    Logger logger = LoggerFactory.getLogger(MsgConfirmAndReturn.class);

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        //此方法用於監聽消息確認結果(消息是否發送到交換機)
        if(b){
            logger.info("-------消息成功發送到交換機");
        }else{
            logger.warn("-------消息發送到交換機失敗");
        }
    }

    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        //此方法用於return監聽(當交換機分發消息到隊列失敗時執行)
        logger.warn("~~~~~~~交換機分發消息到隊列失敗");
    }
}

 

 

 

 

 

五、延遲機制

5.1 延遲隊列

  • 延遲隊列——消息進入到隊列之后,延遲指定的時間才能被消費者消費

  • AMQP協議和RabbitMQ隊列本身是不支持延遲隊列功能的,但是可以通過TTL(Time To Live)特性模擬延遲隊列的功能

  • TTL就是消息的存活時間。RabbitMQ可以分別對隊列和消息設置存活時間

 

 

 

  • 在創建隊列的時候可以設置隊列的存活時間,當消息進入到隊列並且在存活時間內沒有消費者消費,則此消息就會從當前隊列被移除;

  • 創建消息隊列沒有設置TTL,但是消息設置了TTL,那么當消息的存活時間結束,也會被移除;

  • 當TTL結束之后,我們可以指定將當前隊列的消息轉存到其他指定的隊列

 

5.2 使用延遲隊列實現訂單支付監控

5.2.1 實現流程圖

 

 

 

 

 

 

5.2.2 創建交換機和隊列

 

 

2.創建消息隊列

 

 

 

 

3.創建死信隊列

 

 

 

4.隊列綁定

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM