原文:https://blog.csdn.net/qq_38439885/article/details/88982373
進入正題,本文會介紹兩種實現rabbitmq的ack模式的方法,分別為:
一、通過配置文件配置。
二、通過手動注冊 SimpleMessageListenerContainer容器實現。
先介紹方法一:
通過配置文件配置。
此類實現起來較為方便,通過springboot的配置文件以及注解的形式即可完成。
1.首先引入依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.編寫配置文件
# rabbitmq基本配置
spring.rabbitmq.host=***
spring.rabbitmq.port=5672
spring.rabbitmq.username=***
spring.rabbitmq.password=***
spring.rabbitmq.virtual-host=/
# 開啟發送確認
spring.rabbitmq.publisher-confirms=true
# 開啟發送失敗退回
spring.rabbitmq.publisher-returns=true
# 全局開啟ACK
spring.rabbitmq.listener.simple.acknowledge-mode=manual
在配置文件中使用
spring.rabbitmq.listener.simple.acknowledge-mode
來配置ack模式,這個配置有三種配置方式,分別為NONE、MANUAL、AUTO。
I:NONE:默認為NONE,也就是自動ack模式,在消費者接受到消息后無需手動ack,消費者會自動將消息ack掉。
II:MANUAL:即為手動ack模式,消費者在接收到消息后需要手動ack消息,不然消息將一直處於uncheck狀態,在應用下次啟動的時候會再次對消息進行消費。使用該配置需要注意的是,配置開啟后即項目全局開啟手動ack模式,所有的消費者都需要在消費信息后手動ack消息,否則在重啟應用的時候將會有大量的消息無法被消費掉而重復消費。
III:AUTO:自動確認ack 如果此時消費者拋出異常,不同的異常會有不同的處理方式。
3.編寫MQConfig的代碼,實現相應的queue和exchange的注冊以及綁定。
/**
* ACK 測試
*/
public static final String ACK_QUEUE_A = "ack.test.queue.A";
public static final String ACK_QUEUE_B = "ack.test.queue.B";
public static final String ACK_EXCHANGE = "ack.test.exchange";
/**
* ACK TEST
*/
@Bean
public Queue ackQueueA() {
return new Queue(ACK_QUEUE_A);
}
@Bean
public Queue ackQueueB() {
return new Queue(ACK_QUEUE_B);
}
@Bean
public FanoutExchange ackFanoutExchange() {
return new FanoutExchange(ACK_EXCHANGE);
}
@Bean
public Binding ackBindingA() {
return BindingBuilder.bind(ackQueueA()).to(ackFanoutExchange());
}
@Bean
public Binding ackBindingB() {
return BindingBuilder.bind(ackQueueB()).to(ackFanoutExchange());
}
上方代碼中做了三件事:
I.注冊了兩個queue,分別為ackQueueA以及ackQueueB。
II.注冊了一個fanout類型的exchange。
III.將兩個queue和其綁定。
4. 生產者代碼編寫
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author hsw
* @since 9:26 2019/4/2
*/
@Slf4j
@Service
public class MQAckSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void ackMQSender(String msg) {
log.info("send ack message :" + msg);
// 生產者發送消息到exchange后沒有綁定的queue時將消息退回
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("ackMQSender 發送消息被退回" + exchange + routingKey);
});
// 生產者發送消息confirm檢測
this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
log.info("ackMQSender 消息發送失敗" + cause + correlationData.toString());
} else {
log.info("ackMQSender 消息發送成功 ");
}
});
this.rabbitTemplate.convertAndSend(MQConfig.ACK_EXCHANGE, "", msg);
}
}
這里使用了RabbitTemplate而沒有使用AmqpTemplate,可以將RabbitTemplate看作一個實現了AmqpTemplate的工具類,其中定義了更多方法供開發者使用。
在第一步的配置文件中定義了MANUAL的ack模式的同時,也配置了發送確認以及發送失敗退回,所以在上述生產者代碼中,分別配置了這兩項。具體回調時間見注釋。
5.消費者代碼編寫
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
/**
* @author hsw
* @since 9:39 2019/4/2
*/
@Slf4j
@Service
public class MQAckReceive {
@RabbitListener(queues = MQConfig.ACK_QUEUE_A)
public void process(String msg, Channel channel, Message message) throws IOException {
log.info("ACK_QUEUE_A 收到 : " + msg);
try {
// 框架容器,是否開啟手動ack按照框架配置
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("ACK_QUEUE_A 接受信息成功");
} catch (Exception e) {
e.printStackTrace();
//丟棄這條消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
log.info("ACK_QUEUE_A 接受信息異常");
}
}
@RabbitListener(queues = MQConfig.ACK_QUEUE_B)
public void process2(String msg, Channel channel, Message message) throws IOException {
log.info("ACK_QUEUE_B 收到 : " + msg);
try {
//告訴服務器收到這條消息 已經被我消費了 可以在隊列刪掉 這樣以后就不會再發了 否則消息服務器以為這條消息沒處理掉 重啟應用后還會在發
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("ACK_QUEUE_B 接受信息成功");
} catch (Exception e) {
e.printStackTrace();
//丟棄這條消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
log.info("ACK_QUEUE_B 接受信息異常");
}
}
}
上述代碼定義了兩個消費者,即為之前定義的ackQueueA以及ackQueueB的消費者。
與默認ack模式的消費者不同的是,在消費者消費信息的時候,需要手動ack掉信息,即為上述代碼中的:
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
該方法有兩個參數,分別為long類型和boolean類型:
/**
* Acknowledge one or several received
* messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
* containing the received message being acknowledged.
* @see com.rabbitmq.client.AMQP.Basic.Ack
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param multiple true to acknowledge all messages up to and
* including the supplied delivery tag; false to acknowledge just
* the supplied delivery tag.
* @throws java.io.IOException if an error is encountered
*/
void basicAck(long deliveryTag, boolean multiple) throws IOException;
第一個deliveryTag參數為每條信息帶有的tag值,第二個multiple參數為布爾類型,為true時會將小於等於此次tag的所有消息都確認掉,如果為false則只確認當前tag的信息,可根據實際情況進行選擇。
再看下另外兩個拒絕消息的函數:
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
第一個方法 basicNack 有三個參數,分別為long類型、boolean類型和boolean類型:
/**
* Reject one or several received messages.
*
* Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
* @see com.rabbitmq.client.AMQP.Basic.Nack
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param multiple true to reject all messages up to and including
* the supplied delivery tag; false to reject just the supplied
* delivery tag.
* @param requeue true if the rejected message(s) should be requeued rather
* than discarded/dead-lettered
* @throws java.io.IOException if an error is encountered
*/
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;
前兩個參數和接受方法 basicAck 的參數相似,第一個deliveryTag參數為每條信息帶有的tag值,第二個multiple參數為true時會將小於等於此次tag的所有消息都拒絕掉,如果為false則只拒絕當前tag的信息,可根據實際情況進行選擇。
第三個參數為requeue,為true的時候會將消息重新發送到當前隊列。可根據具體業務需求中不同的異常捕捉實現不同的拒絕方式。
第二個方法 basicReject 和 basicAck 方法類似,但是只能拒絕/重發當前tag的信息。
6.項目測試
@GetMapping("/ack")
public void springAck() {
try {
mqAckSender.ackMQSender("this is a ack msg");
} catch (Exception e) {
e.printStackTrace();
}
}
調用接口后返回:
2019-04-03 10:18:07.018 INFO 7352 --- [nio-8081-exec-3] c.h.a.rabbitmq.amqp.MQAckSender : send ack message :this is a ack msg
2019-04-03 10:18:07.028 INFO 7352 --- [cTaskExecutor-9] c.h.a.rabbitmq.amqp.MQAckReceive : ACK_QUEUE_B 收到 : this is a ack msg
2019-04-03 10:18:07.028 INFO 7352 --- [cTaskExecutor-9] c.h.a.rabbitmq.amqp.MQAckReceive : ACK_QUEUE_B 接受信息成功
2019-04-03 10:18:07.028 INFO 7352 --- [cTaskExecutor-1] c.h.a.rabbitmq.amqp.MQAckReceive : ACK_QUEUE_A 收到 : this is a ack msg
2019-04-03 10:18:07.028 INFO 7352 --- [cTaskExecutor-1] c.h.a.rabbitmq.amqp.MQAckReceive : ACK_QUEUE_A 接受信息成功
2019-04-03 10:18:07.035 INFO 7352 --- [2.20.4.100:5672] c.h.a.rabbitmq.amqp.MQAckSender : ackMQSender 消息發送成功
若在queueA消費者ack消息前打上斷點,可在rabbitmq管理后台看到:
第一種方式的手動ack模式開啟成功!
接下來介紹方法二:
通過手動注冊 SimpleMessageListenerContainer容器實現。
方法一通過注解方式開啟ack模式固然方便,但通過注解方式開啟后,項目全局的ack模式都將被修改,那怎么樣做到只修改單個消費者的ack模式呢?這里就需要手動注冊相應容器來修改ack模式。話不多說,先上代碼。
MQConfig和MQSender端代碼不變。
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(ACK_QUEUE_A);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
log.info(ACK_QUEUE_A + "get msg:" +new String(message.getBody()));
if(message.getMessageProperties().getHeaders().get("error") == null){
// 消息手動ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info("消息確認");
}else {
// 消息重新回到隊列
//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
// 拒絕消息(刪除)
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
log.info("消息拒絕");
}
});
return container;
}
與第一種方法的不同點:
1、配置文件配置的ack模式不會影響。
2、消費者需要配置在setMessageListener中。
上述代碼中,手動注冊了一個SimpleMessageListenerContainer容器,並將對應的queueName、需要修改的ack模式以及消費者收到消息后的處理一並注入到spring中。
由於是手動注冊容器,不受到配置文件的影響,所以可以實現對單個queue的ack模式修改。
需要注意的是,如果消費者依舊使用@RabbitListener注解進行消費信息,手動注冊容器中修改的ack模式是無效的。
---------------------
作者:hhsway
來源:CSDN
原文:https://blog.csdn.net/qq_38439885/article/details/88982373
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!