rabbitMq消費死循環


消費過程發生錯誤容易造成死循環


1.控制重發次數
2.try+catch+手動ack
3.try+catch+手動ack+死信隊列(重試次數就失效了,因為捕捉確認后被打入了相應的死信隊列)

void basicAck(long deliveryTag, boolean multiple) throws IOException;
第一個參數deliveryTag:發布的每一條消息都會獲得一個唯一的deliveryTag,(任何channel上發布的第一條消息的deliveryTag為1,此后的每一條消息都會加1),deliveryTag在channel范圍內是唯一的
第二個參數multiple:批量確認標志。如果值為true,則執行批量確認,此deliveryTag之前收到的消息全部進行確認; 如果值為false,則只對當前收到的消息進行確認


void basicReject(long deliveryTag, boolean requeue) throws IOException;
第一個參數deliveryTag:發布的每一條消息都會獲得一個唯一的deliveryTag,deliveryTag在channel范圍內是唯一的
第二個參數requeue:表示如何處理這條消息,如果值為true,則重新放入RabbitMQ的發送隊列,如果值為false,則通知RabbitMQ銷毀這條消息




channel.basicNack是 channel.basicReject的補充,提供一次對多條消息進行拒絕的功能

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException; 
第一個參數deliveryTag:發布的每一條消息都會獲得一個唯一的deliveryTag,deliveryTag在channel范圍內是唯一的
第二個參數multiple:批量確認標志。如果值為true,包含本條消息在內的、所有比該消息deliveryTag值小的 消息都被拒絕了(除了已經被 ack 的以外);如果值為false,只拒絕三本條消息
第三個參數requeue:表示如何處理這條消息,如果值為true,則重新放入RabbitMQ的發送隊列,如果值為false,則通知RabbitMQ銷毀這條消息

 // 每個客戶端每次最后獲取N個消息 channel.basicQos(1);



 
         
 







 * (Wmxg)表控制層
*
* @author makejava
* @since 2021-03-27 23:02:34 結合producetransation 看文檔的圖片理解
*/
@Component
public class OrderMqConsumer {
/**
* 服務對象
*/
private int count=1;
@Autowired
private DispatcherService dispatcherService;

@RabbitHandler
// @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "order.fanout.exchange",
// durable = "true",autoDelete = "false"),
// exchange = @Exchange(value = "order_fanout_exchange",type = ExchangeTypes.FANOUT)))
@RabbitListener(queues ="order.fanout.exchange")
public void messageconsumer(String mesg, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG)long tag) throws IOException {
try {
//1收到消息是
System.out.println("收到的消息是:" + mesg + ",count=" + count);
//2獲取訂單的信息
Kung kung = JSON.parseObject(mesg, Kung.class);
//3獲取ID
String orderId = kung.getOrderId();
String userId = kung.getUserId();
//4保存運單
Wmxg wmxg = new Wmxg();
wmxg.setOrderId(orderId);
wmxg.setUserId(userId);
          //冪等性的問題,存在則更新,不存在則插入 使用分布式鎖也可以解決 避免重試時重復派單
dispatcherService.insert(wmxg);
System.out.println(1 / 0);

//對當前消息進行應答
//catch進行捕捉
channel.basicAck(tag,false); //只對當前收到的消息進行確認
true對消息進行批量確認

} catch (Exception e) {
//如果出現異常的情況,根據實際情況去進行重發
//重發一次后,丟失還是日記,庫存根據自己的業務場景去定
//參數1:消息的tag
// 參數2false 多條處理
// 參數3requeue重發 fasle 不會重發,會把消息打入死信隊列(自己建立一個死信隊列,如下文書所示) true會進入死循環的重發(造成重復消費),建議true的情況下,不使用try catch 否則造成循環
channel.basicNack(tag,false,false);

}
}


自己建立的死信隊列:
@Bean
public Queue orderQueue() {
Map<String,Object> args=new HashMap<>();
args.put("x-message-ttl",5000);//這里過期時間一定是一個INT類型
args.put("x-dead-letter-exchange","dead_direct_exchange");//綁定死信隊列交換機
args.put("x-max-length",5);//指定最大接受多少條
args.put("x-dead-letter-routing-key","dead");//fanout沒有key
return new Queue("order.fanout.exchange", true,false,false,args);
}
 
 
 
 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM