RabbitMQ---9、消息確認機制(事務+Confirm)


轉載至:https://blog.csdn.net/u013256816/article/details/55515234

參考資料:https://www.cnblogs.com/520playboy/p/6925292.html

概述

在使用RabbitMQ的時候,我們可以通過消息持久化操作來解決因為服務器的異常奔潰導致的消息丟失,除此之外我們還會遇到一個問題,當消息的發布者在將消息發送出去之后,消息到底有沒有正確到達broker代理服務器呢?如果不進行特殊配置的話,默認情況下發布操作是不會返回任何信息給生產者的,也就是默認情況下我們的生產者是不知道消息有沒有正確到達broker的,如果在消息到達broker之前已經丟失的話,持久化操作也解決不了這個問題,因為消息根本就沒到達代理服務器,你怎么進行持久化,那么這個問題該怎么解決呢?

RabbitMQ為我們提供了兩種方式:

  1. 通過AMQP事務機制實現,這也是AMQP協議層面提供的解決方案;
  2. 通過將channel設置成confirm模式來實現;

事務機制

這里首先探討下RabbitMQ事務機制。

RabbitMQ中與事務機制有關的方法有三個:txSelect(), txCommit()以及txRollback(), txSelect用於將當前channel設置成transaction模式,txCommit用於提交事務,txRollback用於回滾事務,在通過txSelect開啟事務之后,我們便可以發布消息給broker代理服務器了,如果txCommit提交成功了,則消息一定到達了broker了,如果在txCommit執行之前broker異常崩潰或者由於其他原因拋出異常,這個時候我們便可以捕獲異常通過txRollback回滾事務了。

關鍵代碼:

channel.txSelect(); channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); channel.txCommit();

通過wirkshark抓包(ip.addr==xxx.xxx.xxx.xxx && amqp),可以看到: 
這里寫圖片描述
(注意這里的Tx.Commit與Tx.Commit-Ok之間的時間間隔294ms,由此可見事務還是很耗時的。)

我們先來看看沒有事務的通信過程是什么樣的: 
這里寫圖片描述
可以看到帶事務的多了四個步驟:

  • client發送Tx.Select
  • broker發送Tx.Select-Ok(之后publish)
  • client發送Tx.Commit
  • broker發送Tx.Commit-Ok

下面我們來看下事務回滾是什么樣子的。關鍵代碼如下:

try {
    channel.txSelect(); channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); int result = 1 / 0; channel.txCommit(); } catch (Exception e) { e.printStackTrace(); channel.txRollback(); }

同樣通過wireshark抓包可以看到: 
這里寫圖片描述
代碼中先是發送了消息至broker中但是這時候發生了異常,之后在捕獲異常的過程中進行事務回滾。

事務確實能夠解決producer與broker之間消息確認的問題,只有消息成功被broker接受,事務提交才能成功,否則我們便可以在捕獲異常進行事務回滾操作同時進行消息重發,但是使用事務機制的話會降低RabbitMQ的性能,那么有沒有更好的方法既能保障producer知道消息已經正確送到,又能基本上不帶來性能上的損失呢?從AMQP協議的層面看是沒有更好的方法,但是RabbitMQ提供了一個更好的方案,即將channel信道設置成confirm模式。


Confirm模式

概述

上面我們介紹了RabbitMQ可能會遇到的一個問題,即生成者不知道消息是否真正到達broker,隨后通過AMQP協議層面為我們提供了事務機制解決了這個問題,但是采用事務機制實現會降低RabbitMQ的消息吞吐量,那么有沒有更加高效的解決方式呢?答案是采用Confirm模式。

producer端confirm模式的實現原理

生產者將信道設置成confirm模式,一旦信道進入confirm模式,所有在該信道上面發布的消息都會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊列之后,broker就會發送一個確認給生產者(包含消息的唯一ID),這就使得生產者知道消息已經正確到達目的隊列了,如果消息和隊列是可持久化的,那么確認消息會將消息寫入磁盤之后發出,broker回傳給生產者的確認消息中deliver-tag域包含了確認消息的序列號,此外broker也可以設置basic.ack的multiple域,表示到這個序列號之前的所有消息都已經得到了處理。

confirm模式最大的好處在於他是異步的,一旦發布一條消息,生產者應用程序就可以在等信道返回確認的同時繼續發送下一條消息,當消息最終得到確認之后,生產者應用便可以通過回調方法來處理該確認消息,如果RabbitMQ因為自身內部錯誤導致消息丟失,就會發送一條nack消息,生產者應用程序同樣可以在回調方法中處理該nack消息。

在channel 被設置成 confirm 模式之后,所有被 publish 的后續消息都將被 confirm(即 ack) 或者被nack一次。但是沒有對消息被 confirm 的快慢做任何保證,並且同一條消息不會既被 confirm又被nack 。

開啟confirm模式的方法

生產者通過調用channel的confirmSelect方法將channel設置為confirm模式,如果沒有設置no-wait標志的話,broker會返回confirm.select-ok表示同意發送者將當前channel信道設置為confirm模式(從目前RabbitMQ最新版本3.6來看,如果調用了channel.confirmSelect方法,默認情況下是直接將no-wait設置成false的,也就是默認情況下broker是必須回傳confirm.select-ok的)。 
這里寫圖片描述

已經在transaction事務模式的channel是不能再設置成confirm模式的,即這兩種模式是不能共存的。

編程模式

對於固定消息體大小和線程數,如果消息持久化,生產者confirm(或者采用事務機制),消費者ack那么對性能有很大的影響.

消息持久化的優化沒有太好方法,用更好的物理存儲(SAS, SSD, RAID卡)總會帶來改善。生產者confirm這一環節的優化則主要在於客戶端程序的優化之上。歸納起來,客戶端實現生產者confirm有三種編程方式:

  1. 普通confirm模式:每發送一條消息后,調用waitForConfirms()方法,等待服務器端confirm。實際上是一種串行confirm了。
  2. 批量confirm模式:每發送一批消息后,調用waitForConfirms()方法,等待服務器端confirm。
  3. 異步confirm模式:提供一個回調方法,服務端confirm了一條或者多條消息后Client端會回調這個方法。

從編程實現的復雜度上來看: 
第1種 
普通confirm模式最簡單,publish一條消息后,等待服務器端confirm,如果服務端返回false或者超時時間內未返回,客戶端進行消息重傳。 
關鍵代碼如下:

channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); if(!channel.waitForConfirms()){ System.out.println("send message failed."); }

wirkShark抓包可以看到如下: 
這里寫圖片描述
(注意這里的Publish與Ack的時間間隔:305ms 4ms 4ms 15ms 5ms…. )

第二種 
批量confirm模式稍微復雜一點,客戶端程序需要定期(每隔多少秒)或者定量(達到多少條)或者兩則結合起來publish消息,然后等待服務器端confirm, 相比普通confirm模式,批量極大提升confirm效率,但是問題在於一旦出現confirm返回false或者超時的情況時,客戶端需要將這一批次的消息全部重發,這會帶來明顯的重復消息數量,並且,當消息經常丟失時,批量confirm性能應該是不升反降的。 
關鍵代碼:

channel.confirmSelect(); for(int i=0;i<batchCount;i++){ channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); } if(!channel.waitForConfirms()){ System.out.println("send message failed."); }

第三種 
異步confirm模式的編程實現最復雜,Channel對象提供的ConfirmListener()回調方法只包含deliveryTag(當前Chanel發出的消息序號),我們需要自己為每一個Channel維護一個unconfirm的消息序號集合,每publish一條數據,集合中元素加1,每回調一次handleAck方法,unconfirm集合刪掉相應的一條(multiple=false)或多條(multiple=true)記錄。從程序運行效率上看,這個unconfirm集合最好采用有序集合SortedSet存儲結構。實際上,SDK中的waitForConfirms()方法也是通過SortedSet維護消息序號的。 
關鍵代碼:

 SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() { public void handleAck(long deliveryTag, boolean multiple) throws IOException { if (multiple) { confirmSet.headSet(deliveryTag + 1).clear(); } else { confirmSet.remove(deliveryTag); } } public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple); if (multiple) { confirmSet.headSet(deliveryTag + 1).clear(); } else { confirmSet.remove(deliveryTag); } } }); while (true) { long nextSeqNo = channel.getNextPublishSeqNo(); channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); confirmSet.add(nextSeqNo); }

SDK中waitForConfirms方法實現:

/** Set of currently unconfirmed messages (i.e. messages that have * not been ack'd or nack'd by the server yet. */ private final SortedSet<Long> unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); public boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException { if (nextPublishSeqNo == 0L) throw new IllegalStateException("Confirms not selected"); long startTime = System.currentTimeMillis(); synchronized (unconfirmedSet) { while (true) { if (getCloseReason() != null) { throw Utility.fixStackTrace(getCloseReason()); } if (unconfirmedSet.isEmpty()) { boolean aux = onlyAcksReceived; onlyAcksReceived = true; return aux; } if (timeout == 0L) { unconfirmedSet.wait(); } else { long elapsed = System.currentTimeMillis() - startTime; if (timeout > elapsed) { unconfirmedSet.wait(timeout - elapsed); } else { throw new TimeoutException(); } } } } }

性能測試

Client端機器和RabbitMQ機器配置:CPU:24核,2600MHZ, 64G內存,1TB硬盤。 
Client端發送消息體大小10B,線程數為1即單線程,消息都持久化處理(deliveryMode:2)。 
分別采用事務模式、普通confirm模式,批量confirm模式和異步confirm模式進行producer實驗,比對各個模式下的發送性能。 
這里寫圖片描述

發送平均速率:

  • 事務模式(tx):1637.484
  • 普通confirm模式(common):1936.032
  • 批量confirm模式(batch):10432.45
  • 異步confirm模式(async):10542.06

可以看到事務模式性能是最差的,普通confirm模式性能比事務模式稍微好點,但是和批量confirm模式還有異步confirm模式相比,還是小巫見大巫。批量confirm模式的問題在於confirm之后返回false之后進行重發這樣會使性能降低,異步confirm模式(async)編程模型較為復雜,至於采用哪種方式,那是仁者見仁智者見智了。


消息確認(Consumer端)

為了保證消息從隊列可靠地到達消費者,RabbitMQ提供消息確認機制(message acknowledgment)。消費者在聲明隊列時,可以指定noAck參數,當noAck=false時,RabbitMQ會等待消費者顯式發回ack信號后才從內存(和磁盤,如果是持久化消息的話)中移去消息。否則,RabbitMQ會在隊列中消息被消費后立即刪除它。

采用消息確認機制后,只要令noAck=false,消費者就有足夠的時間處理消息(任務),不用擔心處理消息過程中消費者進程掛掉后消息丟失的問題,因為RabbitMQ會一直持有消息直到消費者顯式調用basicAck為止。

當noAck=false時,對於RabbitMQ服務器端而言,隊列中的消息分成了兩部分:一部分是等待投遞給消費者的消息;一部分是已經投遞給消費者,但是還沒有收到消費者ack信號的消息。如果服務器端一直沒有收到消費者的ack信號,並且消費此消息的消費者已經斷開連接,則服務器端會安排該消息重新進入隊列,等待投遞給下一個消費者(也可能還是原來的那個消費者)。

RabbitMQ不會為未ack的消息設置超時時間,它判斷此消息是否需要重新投遞給消費者的唯一依據是消費該消息的消費者連接是否已經斷開。這么設計的原因是RabbitMQ允許消費者消費一條消息的時間可以很久很久。

RabbitMQ管理平台界面上可以看到當前隊列中Ready狀態和Unacknowledged狀態的消息數,分別對應上文中的等待投遞給消費者的消息數和已經投遞給消費者但是未收到ack信號的消息數。也可以通過命令行來查看上述信息: 
這里寫圖片描述

代碼示例(關閉自動消息確認,進行手動ack):

        QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(ConfirmConfig.queueName, false, consumer); while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); // do something with msg. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }

broker將在下面的情況中對消息進行confirm:

broker發現當前消息無法被路由到指定的queues中(如果設置了mandatory屬性,則broker會發送basic.return) 
非持久屬性的消息到達了其所應該到達的所有queue中(和鏡像queue中) 
持久消息到達了其所應該到達的所有queue中(和鏡像中),並被持久化到了磁盤(fsync) 
持久消息從其所在的所有queue中被consume了(如果必要則會被ack)

basicRecover:是路由不成功的消息可以使用recovery重新發送到隊列中。 
basicReject:是接收端告訴服務器這個消息我拒絕接收,不處理,可以設置是否放回到隊列中還是丟掉,而且只能一次拒絕一個消息,官網中有明確說明不能批量拒絕消息,為解決批量拒絕消息才有了basicNack。 
basicNack:可以一次拒絕N條消息,客戶端可以設置basicNack方法的multiple參數為true,服務器會拒絕指定了delivery_tag的所有未確認的消息(tag是一個64位的long值,最大值是9223372036854775807)。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM