Rabbitmq 重消費處理
一 處理流程圖:
業務交換機:正常接收發送者,發送過來的消息,交換機類型topic
AE交換機: 當業務交換機無法根據指定的routingkey去路由到隊列的時候,會全部發送到AE交換機.發送到此隊列的消息屬於,業務垃圾消息,或者攻擊消息類型,交換機類型fanout
死信交換機:用於處理消費者,消費失敗回退的消息,根據死信交換機的routingkey發送到死信隊列,交換機類型 topic
EXAMPLE:
業務routingkey: hello/task_queue
生產者發送routingkey: hello task_queue michael
最后業務交換機根據routingkey 會把匹配 hello 和task_queue的發送到業務隊列1和業務隊列2,michael routingkey沒匹配,直接發送到AE交換機,AE交換機的類型為fanout,直接到了告警隊列里面,這時候就說明,生產者有問題,或者有一些非法生產者在猜測routingkey,這時候就可以告警處理.
當正常的消息到了業務隊列后,消費者監聽這個隊列進行處理,在消費者處理的過程中,難免會有一些消息處理失敗,因為業務的種種原因,但是這些消息一旦失敗,那么就會影響性能和后面的消息的消費,這時候就需要一個死信隊列,來存放這個消費不了的消息,進入死信隊列后,在進行其他處理.
消息消費失敗處理方式:
一 進入死信隊列(進入死信的三種方式)
1.消息被拒絕(basic.reject or basic.nack)並且requeue=false
2.消息TTL過期
3.隊列達到最大長度
二 重新發布此消息到對應的隊列(low)
二 處理案例
基礎設置:
業務交換機設置:
設置AE交換機為alter
綁定routingkey到對應的隊列
AE交換機設置,設置為internal,表示內部EXchange到EXchange,綁定需要接受的隊列alter_message
三個隊列都屬於常規的隊列,其中task_queue業務比較重要,當消費者消費失敗的時候要回退到死信隊列在進行處理!
Task_queue設置
設置:x-dead-letter-exchange 指定死信送往的交換機
設置:x-dead-letter-routing-key 指定死信的routingkey
(我在消費者處理失敗的時候已經修改好了routingkey)
死信隊列設置:因為是隊列到exchange的所以不能把交換機設置成internal,設置匹配的routingkey,表示消費失敗的消息隊列,最好可以一眼看出,是什么隊列消費失敗,所以我這里設置了queue_name.fail
下面開始消息發送和觀察現象:
我定義發送的消息和routingkey是一致的,這里我發送了三條消息,其中有兩條設置了routingkey,另外一條沒做routingkey.
可以看到匹配到routingkey的消息已經進入到了匹配的隊列,沒有匹配到的消息也就是michael,直接進入了告警隊列.
我這里打印的信息很多,但是可以看到hello這條消息時成功的被獲取到和被消費掉了.
針對task_queue我做了特殊處理,匹配到了消息時task_queue就直接進入到死信隊列里面,在程序里面消費的時候,可以捕獲expection 然后進行退信處理.我這里用的是basic.reject和requeue=false進行處理的!
進過處理后可以看到task_queue的消息已經進入到了死亡隊列里面,這樣對於消費失敗的消息,存放死信隊列就成功了,
下面就是進入死信隊列的消息,可以看到routingkey已經按照我們設置的
還有queue從哪個隊列進來的和原始的routingkey還有一個就是reason表明消息時rejected.
可以看到在警告隊里里面的消息michael.
所有所設置的功能已經實現!
---------------------
作者:Michael_曾浩
來源:CSDN
原文:https://blog.csdn.net/qq_29778131/article/details/52536965
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!