消息的簽收機制說明
消息消費成功后,我們在客戶端簽收后,消息就從MQ服務器里面刪除了若消息沒有消費成功,我們讓他回到MQ里面,讓別人再次重試消費。
自動簽收
消息只要被客戶端接收到,無論你客戶端發生了什么,我們服務器都不管你了,直接把消息刪除了,這是它是默認的行為。
手動簽收
創建項目 springboot-rabbitmq,創建方式和之前的方式一樣依賴也是。
修改application.yml配置文件:
server:
port: 8080
spring:
application:
name: Springboot-RabbitMQ
rabbitmq:
username: user
password: 123456
host: 139.196.183.130
port: 5672
virtual-host: v-it6666
# NONE 值是禁用發布確認模式,是默認值
# CORRELATED 值是發布消息成功到交換機后會觸發回調方法
publisher-confirm-type: correlated
# 這個是老版本的用法
# publisher-confirms: true
# 消息由交換機到達隊列失敗時觸發
publisher-returns: true
listener:
simple:
# 自動簽收,這個是默認行為
# acknowledge-mode: auto
# 手動簽收
acknowledge-mode: manual
direct:
# 設置直連交換機的簽收類型
acknowledge-mode: manual
消息投遞的 ID 說明
獲取投遞的 ID
/**
* @author BNTang
*/
@Component
public class MessageReceive {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
key = {"error"},
exchange = @Exchange(value = "directs", type = ExchangeTypes.DIRECT)
)
})
public void receiveMessage(String content, Message message, Channel channel) throws IOException {
// 消息投遞ID
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// messageId 就是消息的唯一的標識,自己定義
String messageId = message.getMessageProperties().getMessageId();
System.out.println("消費者收到消息 → 消息對象:" + message);
System.out.println("消費者收到消息 → 內容為:" + content);
System.out.println("消費者收到消息 → 信道:" + channel);
System.out.println("消息投遞ID → :" + deliveryTag);
System.out.println("消息自定義ID → :" + messageId);
channel.basicAck(deliveryTag, false);
}
}
basicAck方法參數的解釋如下:
- deliveryTag:消息投遞ID,要簽收的投遞ID。
- multiple:是否批量簽收。
投遞 ID 存在的問題及消息永久 ID 設置的問題
什么能代表消息的唯一的標識,顯然投送的 ID 不行,因為一個消息可能會有多個投送的 ID,我們就需要給消息一個唯一的值,這個伴隨消息終身,不會變化!我們需要發送消息時,給消息設置一個 ID,然后保證該 ID 唯一就可以了,如下所示!
@Test
void sendMsg() throws IOException {
for (int i = 0; i < 5; i++) {
this.rabbitTemplate.convertAndSend("directs", "error", "我是一個測試消息" + i,
message -> {
String messageId = UUID.randomUUID().toString().replace("-", "");
// 自己給消息設置自定義的ID
message.getMessageProperties().setMessageId(messageId);
return message;
});
System.out.println("消息發送成功");
System.in.read();
}
}
關於批量簽收消息
若我們此時簽收了編號為4的消息,但是前面的0,1,2,3 都沒有簽收,則MQ若是批量的簽收,它會把0,1,2,3 都簽收,因為MQ認為,比他晚投遞的已經簽收,前面的肯定已經消費成功了。
生產者
static int a = 1;
@Test
public void sendMessage() throws IOException {
for (int i = 0; i <= 3; i++) {
this.rabbitTemplate.convertAndSend("directs", "error", "ABC - " + i, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 自己給消息設置自定義的ID
message.getMessageProperties().setMessageId((a++) + "");
return message;
}
});
}
System.out.println("消息發送成功");
System.in.read();
}
消費者
/**
* @author BNTang
*/
@Component
public class MessageReceive {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue("queue"),
key = {"error"},
exchange = @Exchange(value = "directs")
)
})
public void receiveMessage(String content, Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String messageId = message.getMessageProperties().getMessageId();
System.out.println("消息投遞ID → :" + deliveryTag);
System.out.println("消息自定義ID → :" + messageId);
if (content.equals("ABC - 3")) {
channel.basicAck(deliveryTag, true);
System.out.println("消息簽收成功 → 內容為:" + content);
}
}
}
可以發現只簽收了ABC - 3 但是隊列里面沒有消息了,說明前面的12都被批量簽收了。
不簽收
當我們認為消息不合格時,或不是我們要的消息時,我們可以選擇不簽收它。
生產者
@Test
public void sendMessage() throws IOException {
this.rabbitTemplate.convertAndSend("directs", "error", "1234567", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
String messageId = UUID.randomUUID().toString().replace("-", "");
// 自己給消息設置自定義的ID
message.getMessageProperties().setMessageId(messageId);
return message;
}
});
System.out.println("消息發送成功");
System.in.read();
}
消費者
/**
* @author BNTang
*/
@Component
public class MessageReceive {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue("queue"),
key = {"error"},
exchange = @Exchange(value = "directs")
)
})
public void receiveMessage(String content, Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String messageId = message.getMessageProperties().getMessageId();
System.out.println("消息投遞ID → :" + deliveryTag);
System.out.println("消息自定義ID → :" + messageId);
if (content.equals("1234567")) {
channel.basicAck(deliveryTag, true);
System.out.println("消息簽收成功");
} else {
// 如果不是 1234567 就決絕簽收
channel.basicNack(deliveryTag, false, true);
System.out.println("消息被決絕簽收");
}
}
}
如上的代碼測試方式你先發送一個消息,消息內容為 1234567 這是正常的情況,然后在發送一個 123456 就會發現效果,消息消費死循環了。
我們選擇不簽收,其實是為了保護消息,當消費消息發生異常時,我們可以把消息放在隊列里面,讓它重新投遞,重新讓別人消費!而不是丟了它!
解決不簽收消息的死循環
不簽收,並且讓它回到隊列里面,想法很好,但是很容易造成死循環,因為沒有任何人能消費她! 我們設計一個機制,當一個消息被消費3次還沒有消費成功,我們就直接把它記錄下來,人工處理! 消息消費3次(消息的標識,消息的計數)我們引入Redis,使用Redis計數,若超過3次,直接拒絕消息,並且不回到隊列里面。
引入 Redis 依賴,並使用 Docker 運行 Redis,Redis 依賴如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
Docker 運行 Redis 命令腳本如下所示,當然也可以使用本地的Redis圖方便,我這里是有機子我就用我的機子了:
docker run -d --name myredis -p 6390:6379 redis --requirepass "1234"
修改消費者的配置文件
server:
port: 8002
spring:
application:
name: consumer
rabbitmq:
host: 139.196.183.130
port: 5672
username: user
password: 123456
virtual-host: v-it6666
# Redis的配置
redis:
host: 139.196.183.130
port: 6390
password: 1234
改造消費者,改造之后的代碼如下:
/**
* @author BNTang
*/
@Component
public class MessageReceive {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 消息的前綴
*/
private String MESSAGE = "message:";
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue("queue"),
key = {"error"},
exchange = @Exchange(value = "directs")
)
})
public void receiveMessage(String content, Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String messageId = message.getMessageProperties().getMessageId();
System.out.println("消息投遞ID → :" + deliveryTag);
System.out.println("消息自定義ID → :" + messageId);
if (content.equals("1234567")) {
channel.basicAck(deliveryTag, true);
System.out.println("消息簽收成功");
} else {
String count = this.redisTemplate.opsForValue().get(MESSAGE + messageId);
if (count != null && Long.valueOf(count) >= 3) {
channel.basicNack(deliveryTag, false, false);
System.out.println("該消息消費【3】次都失敗,我們記錄它,人工處理" + content);
} else {
// 如果不是 1234567 就決絕簽收
// 處理業務邏輯【可能邏輯處理的出現了問題啥的】
channel.basicNack(deliveryTag, false, true);
System.out.println("消息被決絕簽收");
// 因為拒絕了,我們把消息ID放到Redis里面
this.redisTemplate.opsForValue().increment(MESSAGE + messageId);
}
}
}
}
如上basicNack方法參數的解釋如下所示:
- deliveryTag:消息的投遞ID,要簽收的投遞ID是多少
- multiple:是否批量簽收
- requeue:true,代表決絕簽收,並把消息重新放回隊列里面,false,直接拒絕簽收
測試注意,因為統計計數時,消息的次數,是通過消息的 ID 來計數的,我們在發送消息時,要設置消息的頭: