Rabbitmq消息接收方通知發送方


一、需求

前兩篇文章,我們分別介紹了消息發送方的確認消息接收方的消息確認,由此可知,消息的發送方只關注消息有木有到達隊列,消息的接收方只關注在什么時候告訴隊列這個條消息可以刪除了,那么如果有那樣的需求,發送方想獲取消息的消費情況,例如想修改消息表中消息的狀態,也就是得想一個辦法,如何在消息到達接收方之后通知發送方。

二、思路

消息發送方在發送消息之后,監聽一個返回消息隊列reply,消息接收方消費完后消息再發送到隊列reply,這樣根據唯一的messageId原來的發送方就能獲取返回的消息了。只不過這個時候發送方和接收方的角色就模糊了,原來的發送方變成了即使發送方又是接收方,原來的接收方同理。

三、實現

Spring為我們提供了一個@SendTo("demo.reply-to")注解,跟@RabbitListener一起使用,demo.reply-to 就是一個返回消息隊列,這個隊列不需要綁定交換器,要返回的消息直接return就可以。

3.1、yml

spring:
    rabbitmq:
        host: 192.168.31.70
        port: 5672
        username: guest
        password: guest
        # 發送確認
        publisher-confirms: true
        # 路由失敗回調
        publisher-returns: true
        template:
            # 必須設置成true 消息路由失敗通知監聽者,false 將消息丟棄
            mandatory: true
        #消費端
        listener:
            simple:
                # 每次從RabbitMQ獲取的消息數量
                prefetch: 1
                default-requeue-rejected: false
                # 每個隊列啟動的消費者數量
                concurrency: 1
                # 每個隊列最大的消費者數量
                max-concurrency: 1
                # 簽收模式為手動簽收-那么需要在代碼中手動ACK
                acknowledge-mode: manual
#消費者消費之后向生產者的反饋
reply:
    queue:
        name: demo.reply-to
    exchange:
        name: demoReplyExchange
sms:
    queue:
        name: demo.sms

3.2、RabbitConfig

/**
 * @author DUCHONG
 * @since 2020-09-02 21:24
 **/
@Configuration
public class RabbitConfig {


    @Value("${sms.queue.name}")
    private String smsQueue;
    @Value("${reply.queue.name}")
    private String replyQueue;
    @Value("${reply.exchange.name}")
    private String replyExchange;


    @Bean
    public Queue smsQueue() {
        return new Queue(smsQueue);
    }
    @Bean
    public Queue replyQueue() {
        return new Queue(replyQueue);
    }

    @Bean
    TopicExchange replyExchange() {
        return new TopicExchange(replyExchange);
    }


    @Bean
    Binding bindingReplyQueue() {
        return BindingBuilder.bind(smsQueue()).to(replyExchange()).with(smsQueue+".#");
    }

}

3.3、消息發送方

/**
 * 生產者
 *
 * @author DUCHONG
 * @since 2020-09-02 21:32
 **/
@RestController
@Slf4j
public class ReplyProviderController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Value("${reply.exchange.name}")
    private String replyExchange;

    @GetMapping("/sendReplyMessage")
    public void sendReplyMessage() {

        String msgId = UUID.randomUUID().toString().replace("-","").toUpperCase();
        JSONObject reply=new JSONObject();
        reply.put("messageId",msgId);
        reply.put("content","this is a reply demo message");
        CorrelationData correlationData=new CorrelationData(msgId);
        rabbitTemplate.convertAndSend(replyExchange,"demo.sms.x",reply.toJSONString(),correlationData);
        log.info("---provider發送消息---{}",reply);
    }

    /**
     * 監聽demo.reply-to隊列,接收consumer的反饋
     * @param message
     * @param channel
     * @param headers
     * @throws IOException
     */
    @RabbitListener(queues ="demo.reply-to")
    @RabbitHandler
    public void handleReplyMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {

        try {

            String msg=new String(message.getBody(), CharEncoding.UTF_8);
            log.info("---provider接收到reply消息----{}",msg);
            //業務邏輯代碼
            //.....
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
        catch (Exception e) {
            log.info("ReplyConsumerController.handleReplyMessage error",e);
        }
    }

}

3.4、消息接收方

/**
 * 消費者
 *
 * @author DUCHONG
 * @since 2020-09-02 21:33
 **/
@Component
@Slf4j
public class ReplyConsumerController {


    /**
     * 郵件發送 ack 之后返回消息到demo.reply-to
     * @param message
     * @param channel
     * @param headers
     * @throws IOException
     */
    @RabbitListener(queues ="demo.sms")
    @RabbitHandler
    @SendTo("demo.reply-to")
    public String handleEmailMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {

        try {

            String msg=new String(message.getBody(), CharEncoding.UTF_8);
            log.info("---consumer接收到消息----{}",msg);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            return msg;
        }
        catch (Exception e) {
            log.info("ReplyConsumerController.handleEmailMessage error",e);
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);

        }
        return null;
    }
}

3.5、運行結果

result


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM