一、需求
前兩篇文章,我們分別介紹了消息發送方的確認和消息接收方的消息確認,由此可知,消息的發送方只關注消息有木有到達隊列,消息的接收方只關注在什么時候告訴隊列這個條消息可以刪除了,那么如果有那樣的需求,發送方想獲取消息的消費情況,例如想修改消息表中消息的狀態,也就是得想一個辦法,如何在消息到達接收方之后通知發送方。
二、思路
消息發送方在發送消息之后,監聽一個返回消息隊列reply,消息接收方消費完后消息再發送到隊列reply,這樣根據唯一的messageId原來的發送方就能獲取返回的消息了。只不過這個時候發送方和接收方的角色就模糊了,原來的發送方變成了即使發送方又是接收方,原來的接收方同理。
三、實現
Spring為我們提供了一個@SendTo("demo.reply-to")注解,跟@RabbitListener一起使用,demo.reply-to 就是一個返回消息隊列,這個隊列不需要綁定交換器,要返回的消息直接return就可以。
3.1、yml
spring:
rabbitmq:
host: 192.168.31.70
port: 5672
username: guest
password: guest
# 發送確認
publisher-confirms: true
# 路由失敗回調
publisher-returns: true
template:
# 必須設置成true 消息路由失敗通知監聽者,false 將消息丟棄
mandatory: true
#消費端
listener:
simple:
# 每次從RabbitMQ獲取的消息數量
prefetch: 1
default-requeue-rejected: false
# 每個隊列啟動的消費者數量
concurrency: 1
# 每個隊列最大的消費者數量
max-concurrency: 1
# 簽收模式為手動簽收-那么需要在代碼中手動ACK
acknowledge-mode: manual
#消費者消費之后向生產者的反饋
reply:
queue:
name: demo.reply-to
exchange:
name: demoReplyExchange
sms:
queue:
name: demo.sms
3.2、RabbitConfig
/**
* @author DUCHONG
* @since 2020-09-02 21:24
**/
@Configuration
public class RabbitConfig {
@Value("${sms.queue.name}")
private String smsQueue;
@Value("${reply.queue.name}")
private String replyQueue;
@Value("${reply.exchange.name}")
private String replyExchange;
@Bean
public Queue smsQueue() {
return new Queue(smsQueue);
}
@Bean
public Queue replyQueue() {
return new Queue(replyQueue);
}
@Bean
TopicExchange replyExchange() {
return new TopicExchange(replyExchange);
}
@Bean
Binding bindingReplyQueue() {
return BindingBuilder.bind(smsQueue()).to(replyExchange()).with(smsQueue+".#");
}
}
3.3、消息發送方
/**
* 生產者
*
* @author DUCHONG
* @since 2020-09-02 21:32
**/
@RestController
@Slf4j
public class ReplyProviderController {
@Autowired
RabbitTemplate rabbitTemplate;
@Value("${reply.exchange.name}")
private String replyExchange;
@GetMapping("/sendReplyMessage")
public void sendReplyMessage() {
String msgId = UUID.randomUUID().toString().replace("-","").toUpperCase();
JSONObject reply=new JSONObject();
reply.put("messageId",msgId);
reply.put("content","this is a reply demo message");
CorrelationData correlationData=new CorrelationData(msgId);
rabbitTemplate.convertAndSend(replyExchange,"demo.sms.x",reply.toJSONString(),correlationData);
log.info("---provider發送消息---{}",reply);
}
/**
* 監聽demo.reply-to隊列,接收consumer的反饋
* @param message
* @param channel
* @param headers
* @throws IOException
*/
@RabbitListener(queues ="demo.reply-to")
@RabbitHandler
public void handleReplyMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {
try {
String msg=new String(message.getBody(), CharEncoding.UTF_8);
log.info("---provider接收到reply消息----{}",msg);
//業務邏輯代碼
//.....
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
catch (Exception e) {
log.info("ReplyConsumerController.handleReplyMessage error",e);
}
}
}
3.4、消息接收方
/**
* 消費者
*
* @author DUCHONG
* @since 2020-09-02 21:33
**/
@Component
@Slf4j
public class ReplyConsumerController {
/**
* 郵件發送 ack 之后返回消息到demo.reply-to
* @param message
* @param channel
* @param headers
* @throws IOException
*/
@RabbitListener(queues ="demo.sms")
@RabbitHandler
@SendTo("demo.reply-to")
public String handleEmailMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {
try {
String msg=new String(message.getBody(), CharEncoding.UTF_8);
log.info("---consumer接收到消息----{}",msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
return msg;
}
catch (Exception e) {
log.info("ReplyConsumerController.handleEmailMessage error",e);
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
return null;
}
}
3.5、運行結果

