SpringBoot集成rabbitmq(二)


前言

         在使用rabbitmq時,我們可以通過消息持久化來解決服務器因異常崩潰而造成的消息丟失。除此之外,我們還會遇到一個問題,當消息生產者發消息發送出去后,消息到底有沒有正確到達服務器呢?如果不進行特殊配置,默認情況下發送的消息是不會給生產者返回任何響應的,也就是默認情況下生產者並不知道消息是否正常到達了服務器。對於數據必達的需求,你肯定對消息的來龍去脈都有個了接,這種情況下就需要用到rabbitmq消息確認。

 

消息確認

      rabbitmq消息確認分為生產者確認和消費者確認。

      生產者消費確認提供了兩種機制:

  •       通過事務機制實現
  •       通過confirm機制實現

     事務機制則用到channel.txSelect、channel.txCommit、channel.txRollback。可以參考下面AMQP協議流轉過程(參考Rabbitmq實戰指南)

     事務機制在一條消息發送之后會阻塞發送端,以等待rabbitmq回應,之后才繼續發送下一條消息。所以相對來說事務機制的性能要差一些。事務機制會降低rabbitmq的吞吐量,所以又引入了另一種輕量級的方式:confirm機制。

     生產者通過調用channel.confirmSelect將信道設置為confirm模式,之后Rabbitmq會返回Confirm.Select-Ok命令表示同意生產者將當前信道設置為confirm模式。所有被發送的后續消息都被ack或nack一次。類似如下代碼:

     channel.confirmSelect()

     channel.basicPublish("exchange","routingkey",null,"test".getBytes())

     confirm機制流轉過程參考下圖(參考Rabbitmq實戰指南)

   消費者確認

        消費者在訂閱消息隊列時指定autoAck參數。當參數設置為false時rabbitmq會等待消費者顯式回復確認信號才會從內存或者磁盤種刪除這條消息。參數默認為true。當autoAck設置為false時,對於rabbitmq服務器而言,隊列中的消息分成了兩部分:一部分是等待投遞給消費者的消息、一部分是已經投遞給消費者的消息但是還沒有收到確認信號的消息。可通過RabbitMQ Web平台查看隊列中Ready和UnAck對應的數量。

       消費者消息確認涉及到3個方法:channel.basicAck、channel.basicNack、channel.basicReject

             

SpringBoot集成rabbitmq下實現消息確認

       springboot集成rabbitmq實現消息確認主要涉及兩個回調方法(ReturnCallback、ConfirmCallback)。這里消費者部分我用兩種方式來實現。一種是基於SimpleMessageListenerContainer。 另一種就是用RabbitListener注解實現。

1、application.yml

spring:
  rabbitmq:
    host: 192.168.80.128
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    publisher-confirms: true
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 1
        max-concurrency: 10
        retry:
          enabled: true

 

2、配置文件(這里實現ReturnCallback、ConfirmCallback)

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.Nullable;


@Configuration
public class MqConfig {

    private Logger logger= LoggerFactory.getLogger(MqConfig.class);

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Autowired
    ConnectionFactory connectionFactory;

    @Bean
    public Queue queue(){
        return new Queue("testMq",true); //持久化隊列(默認值也是true)
    }

    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("testMq",true,false);
    }

    @Bean
    Binding binding(Queue queue,DirectExchange directExchange){
        return BindingBuilder.bind(queue).to(directExchange).with("testMq");
    }

    /**
     * i->replyCode
     * s->replyText
     * s1->exchange
     * s2->routingKey
     * **/
    //消息從交換器發送到隊列失敗時觸發
    RabbitTemplate.ReturnCallback msgReturnCallback=new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(Message message, int i, String s, String s1, String s2) {

            logger.info("消息:{},錯誤碼:{},失敗原因:{},交換器:{},路由key:{}",message.getMessageProperties().getCorrelationId(),i,s,s1,s2);
        }
    };

    //消息發送到交換器時觸發
    RabbitTemplate.ConfirmCallback msgConfirmCallback=new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(@Nullable CorrelationData correlationData, boolean b, @Nullable String s) {
            if(b){
                logger.info("消息{}發送exchange成功",correlationData.getId());
            }else{
                logger.info("消息發送到exchange失敗,原因:{}",s);
            }
        }
    };

    /***
     * 消費者確認(方式二)
     * **/
    @Bean
    public SimpleMessageListenerContainer listenerContainer(){
        SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("testMq");
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(10);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                try{
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                    logger.info("接收消息:{}",new String(message.getBody()));
                }catch (Exception ex){

                    //channel.basicReject
                    //channel.basicNack

                }

            }
        });

        return container;
    }


    /**
     * 生產者的回調都在這里
     * **/
    @Autowired
    public RabbitTemplate rabbitTemplate(){
        //消息發送失敗后返回到隊列中
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setReturnCallback(msgReturnCallback);
        rabbitTemplate.setConfirmCallback(msgConfirmCallback);

        return rabbitTemplate;
    }


}

 另一種消費端實現方式

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class MqConsumer {

    private Logger logger= LoggerFactory.getLogger(MqConsumer.class);
    @RabbitListener(queues = "testMq")
    public void handler(Message message,Channel channel){
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            logger.info("接收消息:{}",new String(message.getBody()));
        } catch (IOException e) {


            e.printStackTrace();
        }
    }
}

 

3、消息生產者

     消息發送時注意生成一個消息id。一開始沒用到這個參數,在消息接收時消費者會拋空指針異常

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import java.util.UUID;


@Controller
@RequestMapping("/rabbitMq")
public class MqController {

    private Logger logger= LoggerFactory.getLogger(MqController.class);

    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping("/sendMq")
    @ResponseBody
    public String sendMq(){

        /**
         * 這里exchange、routingkey都叫testMq
         * **/
        Object message=null;
        for(int i=0;i<10;i++){
            logger.info("生產者:第{}條消息",i);
            CorrelationData correlationId=new CorrelationData(UUID.randomUUID().toString());
            message="第"+i+"條消息";
            rabbitTemplate.convertAndSend("testMq","testMq",message,correlationId);
        }

        return "sending...";

    }



}

  

    從運行截圖中可以看到生產者和消費者都收到對應的回調消息。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM