這是一個基於消息的分布式事務的一部分,主要通過消息來實現,生產者把消息發到隊列后,由消費方去執行剩下的邏輯,而當消費方處理失敗后,我們需要進行重試,即為了最現數據的最終一致性,在rabbitmq里,它有消息重試和重試次數的配置,但當你配置之后,你的TTL達到 后,消息不能自動放入死信隊列,所以這塊需要手工處理一下.
rabbitmq關於消息重試的配置
rabbitmq:
host: xxx
port: xxx
username: xxx
password: xxx
virtual-host: xxx
###開啟消息確認機制 confirms
publisher-confirms: true
publisher-returns: true
listener:
simple:
acknowledge-mode: manual #設置確認方式
prefetch: 1 #每次處理1條消息
retry.max-attempts: 3 # 最大重試次數
retry.enabled: true #是否開啟消費者重試(為false時關閉消費者重試,這時消費端代碼異常會一直重復收到消息)
retry.initial-interval: 2000 #重試間隔時間(單位毫秒)
default-requeue-rejected: true #該配置項是決定由於監聽器拋出異常而拒絕的消息是否被重新放回隊列。默認值為true,需要手動basicNack時這些參數諒失效了
手工實現消息重試並放入死信的方式
定義隊列的相關配置
/**
* 創建普通交換機.
*/
@Bean
public TopicExchange lindExchange() {
//消息持久化
return (TopicExchange) ExchangeBuilder.topicExchange(EXCHANGE).durable(true).build();
}
@Bean
public TopicExchange deadExchange() {
return (TopicExchange) ExchangeBuilder.topicExchange(LIND_DL_EXCHANGE).durable(true).build();
}
/**
* 基於消息事務的處理方式,當消費失敗進行重試,有時間間隔,當達到超時時間,就發到死信隊列,等待人工處理.
* @return
*/
@Bean
public Queue testQueue() {
//設置死信交換機
return QueueBuilder.durable(QUEUE).withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)
//毫秒
.withArgument("x-message-ttl", CONSUMER_EXPIRE)
//設置死信routingKey
.withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE).build();
}
@Bean
public Queue deadQueue() {
return new Queue(LIND_DEAD_QUEUE);
}
@Bean
public Binding bindBuildersRouteKey() {
return BindingBuilder.bind(testQueue()).to(lindExchange()).with(ROUTER);
}
@Bean
public Binding bindDeadBuildersRouteKey() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(LIND_DEAD_QUEUE);
}
消費者實現的代碼
/**
* 延時隊列:不應該有RabbitListener訂閱者,應該讓它自己達到超時時間后自動轉到死信里去消費
* 消息異常處理:消費出現異常后,延時幾秒,然后從新入隊列消費,直到達到TTL超時時間,再轉到死信,證明這個信息有問題需要人工干預
*
* @param message
*/
@RabbitListener(queues = MqConfig.QUEUE)
public void testSubscribe(Message message, Channel channel) throws IOException, InterruptedException {
try {
System.out.println(LocalDateTime.now() + ":Subscriber:" + new String(message.getBody(), "UTF-8"));
//當程序處理出現問題時,消息使用basicReject上報
int a = 0;
int b = 1 / a;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
} catch (Exception ex) {
//出現異常手動放回隊列
Thread.sleep(2000);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
/**
* 死信隊列.
*
* @param message
*/
@RabbitListener(queues = MqConfig.LIND_DEAD_QUEUE)
public void dealSubscribe(Message message, Channel channel) throws IOException {
System.out.println("Dead Subscriber:" + new String(message.getBody(), "UTF-8"));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
消費者這塊,也可以直接聲明隊列和綁定交換機,直接在注解上添加 QueueBinding即可.
@RabbitListener(bindings = {@QueueBinding(value = @Queue(
name = MqConfig.QUEUE,
durable = "true",arguments = {@Argument(name = "x-dead-letter-exchange", value = MqConfig.LIND_DL_EXCHANGE),
@Argument(name = "x-message-ttl", value = MqConfig.CONSUMER_EXPIRE,type="java.lang.Long"),
@Argument(name = "x-dead-letter-routing-key", value = MqConfig.LIND_DEAD_QUEUE)}),
exchange = @Exchange(value = MqConfig.EXCHANGE, durable = "true",type="topic")
)})
public void testSubscribe(Message message, Channel channel) throws IOException, InterruptedException {
}
這邊嘗試讓消費者執行出錯,然后走到catch里使用basicNack方法把消息從新放里隊列里,並讓線程讓休息2秒,以避免頻繁操作,之后就是我們希望看到的代碼
2019-12-20T17:21:31.190:Subscriber:send a message to mq
2019-12-20T17:21:33.200:Subscriber:send a message to mq
2019-12-20T17:21:35.206:Subscriber:send a message to mq
2019-12-20T17:21:37.213:Subscriber:send a message to mq
2019-12-20T17:21:39.221:Subscriber:send a message to mq
Dead Subscriber:send a message to mq
這就是一個消息隊列的補償機制,使用死信隊列也可以實現延時消息的機制
,有時間再給大家分享!