四種途徑提高RabbitMQ傳輸消息數據的可靠性(一)


 

前言

RabbitMQ雖然有對隊列及消息等的一些持久化設置,但其實光光只是這一個是不能夠保障數據的可靠性的,下面我們提出這樣的質疑:

(1)RabbitMQ生產者是不知道自己發布的消息是否已經正確達到服務器呢,如果中間發生網絡異常等情況呢?消息必然會丟失!

(2)RabbitMQ如果沒有設置隊列持久化,RabbitMQ服務器重后隊列的元數據會丟失,消息自然也會丟失!

(3)RabbitMQ如果消費者設置自動確認,即autoAck為true,那么不管消費者發生什么情況,該消息會自動從隊列中移除,實際上消費者有可能掛掉,消息必然會丟失!

(4)RabbitMQ中的消息如果沒有匹配到隊列時,那么消息也會丟失!

本文其實也就是結合以上四個方面進行講解的,主要參考《RabbitMQ實戰指南》(有需要PDF電子書的可以評論或者私信我),本文截圖也來自其中,另外可以對一些RabbitMQ的概念的認識可以參考我的另外兩篇博文認識RabbitMQ交換機模型RabbitMQ是如何運轉的?

 


 

 

一、設置mandotory參數、AE備份交換器

針對前言中的第(4)個問題,我們可以通過設置mandotory參數與AE備份交換器來解決

1、mandotory參數

  1)當為true時,交換器無法根據自身的類型和路由鍵找到一個符合條件的隊列,此時RabbitMQ會調用Basic.Return命令將消息返回給生產者,消息將不會丟失

  2)當為false時,消息將會被直接丟棄。

  3)RabbitMQ通過addReturnListener添加ReturnLisener監聽器監聽獲取沒有被正確路由到合適隊列的消息

channel.basicPublish(EXCHANGE NAME, "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "mandatory test".getBytes()); 
channel.addReturnListener(new ReturnListener(){ 
  public void handleReturn(int replyCode, String replyText, 
                String exchange, String routingKey, 
                AMQP.BasicProperties basicProperties, 
                byte[] body) throws IOException { 
    String message = new String(body); 
    System.out.println("Basic.Return 返回的結果是: " + message);
  }
});

 

2、AE備份交換器

  Alternate Exchange,簡稱AE,不設置mandatory參數,那么消息將會被丟失,設置mandatory參數的話,需要添加ReturnListner監聽器,增加復雜代碼,如果既不想增加代碼又不想消息丟失,則使用AE,將沒有被路由的消息存儲於RabbitMQ中。當mandatory參數用AE一起使用時,mandatory將失效。在介紹AE之前,也認識RabbitMQ對於消息的過期時間TTL設置以及隊列的過期時間TTL設置

2.1 TTL過期時間設置

  可以對隊列設置TTL與消息設置TTL,其中消息設置TTL經常用於死信隊列、延遲隊列等高級應用中。

  1)設置消息TTL

  設置TTL過期時間一般有兩種當時:一是通過隊列屬性,對隊列中所有消息設置相同的TTL。二就是對消息本身單獨設置,每條消息TTL不同。如果一起使用時候,TTL小的為准,當一旦超過設置的TTL時間時,就會變成“死信”。

  方式一:針對每條消息設置TTL是通過增加expiration的屬性參數實現的,不可能像方式二一樣掃描整個隊列再判斷是否過期,只有當該消息即將被消費時再判定是否過期即可刪除,也就是消息即使已經過期,但不一定立馬被刪除!

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); 
// 持久化消息
builder deliveryMode(2);
// 設置 TTL=60000ms
builder expiration( 60000 ); 
AMQP.BasicProperties properties = builder. build(); 
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "ttlTestMessage".getBytes());

  方式二:通過隊列屬性設置消息TTL是增加x-message-ttl參數實現的,只需要掃描整個隊列頭部即可立即刪除,也就是消息一旦過期就會被刪除!

Map<String, Object> argss = new HashMap<String , Object>(); 
argss.put("x-message-ttl", 6000); 
channel.queueDeclare(queueName, durable, exclusive, autoDelete, argss) ;

 

  2)設置隊列TTL

  通過在隊列中添加參數x-message-ttl參數實現設置隊列被自動刪除前處於未被使用狀態的時間,注意是隊列的使用狀態,並不是消息是否被消費的狀態

  設置ttl=30min的隊列,時間一到RabbitMQ會保證隊列被刪除,但是不會保證刪除的速度有多快。

Map<String, Object> args = new HashMap<String, Object>{); 
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);

 

2.2 AE備份交換器的使用

  聲明交換器的時候,添加alternate-exchange參數實現,或通過策略實現。前者優先級高。從代碼角度需要以下三個步驟,具體代碼如下:

Map<String, Object> args = new HashMap<String, Object>(); 
args.put("a1ternate-exchange", "myAe"); 
channe1.exchangeDec1are("norma1Exchange", "direct", true, fa1se, args); 
channe1.exchangeDec1are("myAe", "fanout", true, fa1se, nu11) ; 
channe1.queueDec1are( "norma1Queue", true, fa1se, fa1se, nu11); 
channe1.queueB nd("norma1Queue", "norma1Exchange", "norma1Key"); 
channe1.queueDec1are("unroutedQueue", true, fa1se, fa1se, nu11);

  1)聲明normalExchange類型為direct的交換器、類型為fanout的myAe備份交換器;並且normalExchange的備份交換器為myAe(備份交換器建議使用fanout類型交換器)

  2)聲明normalQueue隊列,聲明unrouteQueue隊列;

  3)通過路由鍵normalKey綁定normalExchange與normalQueue,不適用路由鍵綁定unrouteQueue與myAe

 

        

 

二、消費者手動確認

針對前言中第(3)個問題,我們需要在消費者消費完消息后手動進行確認,保證消息數據不丟失!

1、autoAck參數設置

  1) 當autoAck參數為false時,手動確認:

  RabbitMQ會等待消費者顯式地回復確認信號后從內存中移去消息(實際上是先標示刪除標記,之后再刪除),這是一般推薦使用的方式,因為使用手動確認有足夠的時間處理消息,不需要擔心消費者進程掛掉之后消息丟失問題。此時的消息就會分為兩個部分:一是等待投遞給消費者的消息;二是已經投遞給消費者但還沒有收到消費者確認信號的消息。

  2) 當autoAck為true時,自動確認:

  RabbitMQ會自動隱式地回復確認信號后從內存中移去消息, RabbitMQ不需要管消費者是否真正消費了這些消息,RabbitMQ會自動把發送出去的消息置為確認,然后直接從內存中刪除。

2、重新投遞

  問:如果選擇手動確認,即autoAck為false時,消費者由於某些原因斷開了,那么消息的確認會受到影響,那么此時的消息會丟失嗎?

 這也就是一開始提出來的問題,其實是不必擔心消息會被丟失,因為RabbitMQ如果一直沒收到消費者的確認信號,並且消費此消息的消費者已經斷開,則RabbitMQ會重新安排消息進入隊列等待給下一個消費者。也就是RabbitMQ不會設置消息的過期時間(當然也可以設置過期時間,但與之有關系方式消息丟失的特性是死信隊列),它只判斷是否需要重新安排入隊列重新投遞,而判斷的唯一標准是消費此消息的消費者連接是否已經斷開,即RabbitMQ會允許消費一條消息的時間很久很久。

3、消費者拒絕消息

  1)使用channel.basicReject方法,但只能拒絕一條。

void basicReject(long deliveryTag, boolean requeue) throws IOException;

  deliveryTag:消息的唯一標識

  requeue:表示是否可以拒絕的消息重新存入隊列

  2)使用channel.basicNack。不同於前者,此方法可以批量拒絕。

void basicNack(long deliveryTag, boolean multiple , boolean requeue) throws IOException;

  multiple:設置為true則表示拒絕deliveryTag編號之前所有未被當前消費者確認的消息。  

  3)問:關鍵在於,消費者拒絕消費消息后怎么處理?是丟棄,還是重新回到隊列呢?

當參數requeue設置為true時候,可以重新進入隊列,投遞給下一個消費者。如果為false,消息就會把隊列中消息立馬移除,再結合啟用“死信隊列”,防止消息丟失並且可以分析異常情況的發生。

 


 

  最后,由於剩下的兩種方式涉及的內容較多,所以在此將分成兩篇繼續介紹,請看下篇四種途徑提高RabbitMQ傳輸數據的可靠性(二)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM