在使用RabbitMQ的時候,可以通過消息的持久化操作來解決因為服務器的異常崩潰而導致的消息丟失,除此之外,我們還會遇到一個問題,當消息的生產者將消息發送出去以后,消息到底有沒有到達服務器呢?如果不進行特殊的配置,默認情況下發送消息的操作是不會返回任何消息給生產者的,也就是默認情況下是不知道消息有沒有正確地到達服務器。如果在消息到達服務器之前已經丟失,持久化操作也解決不了這個問題,因為消息根本沒有到達服務器,何談持久化?
RabbitMQ針對這個問題,提供了兩種解決方法:
❤ 事務機制
❤ 發送方確認機制
事務機制
RabbitMQ客戶端中與事務機制相關的方法有三個:channel.txSelect、channel.txCommit、channel.txRollback。channel.txSelect用於將當前的信道設置成事務模式,channel.txCommit用於提交事務,channel.txRollback用於事務回滾。
在通過channel.txSelect方法開啟事務之后,我們便可以發布消息給RabbitMQ了,如果事務提交成功,則消息一定到達了RabbitMQ中,如果在事務提交之前由於RabbitMQ異常崩潰或者其他的原因拋出異常,這個時候我們可以將其捕獲,進而通過執行channel.txRollback方法來實現事務回滾。
部分示例代碼如下:
channel.txSelect(); channel.basicPublish("exchange","routingkey",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello".getBytes()); channel.txCommit();
上述代碼是正常的情況下的事務機制的運轉過程,而事務回滾是什么樣子的呢?下面的代碼示例:
try { channel.txSelect(); channel.basicPublish("exchange","routingkey",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello".getBytes()); int result = 1 / 0; channel.txCommit(); }catch (Exception e){ e.printStackTrace(); channel.txRollback(); }
上述代碼中明顯的有一個java.lang.ArithmeticException,在事務提交之前捕獲異常,之后顯示的回滾事務。
如果要發送多條消息,則將channel.basicPublish和channel.txCommit等方法包裹進循環內即可,可以參考以下代碼:
channel.txSelect(); for (int i = 0;i < 1000;i++){ try { channel.basicPublish("exchange","routingkey",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello".getBytes()); int result = 1 / 0; channel.txCommit(); }catch (Exception e){ e.printStackTrace(); channel.txRollback(); } }
事務確實能夠解決消息發送方和RabbitMQ之間消息確認的問題,只有消息成功被RabbitMQ接收,事務才能提交成功,否則便可在捕獲異常之后進行事務回滾,於此同時可以進行消息重發。但是使用事務機制會“吸干”RabbitMQ的性能,那么有沒有更好的方法既能保證確認消息已經正確送達,又能基本上不帶來性能上的損失呢?從AMQP協議層面上看來沒有更好的方法,但是RabbitMQ提供了一個改進方案,即發送方確認機制。
發送方確認機制
生產者將信道設置為confirm(確認)模式,一旦信道進入confirm模式,所有在該信道上面發布的消息都會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊列之后,RabbitMQ就會發送一個確認(Basic.Ack)給生產者(包含消息的唯一ID),這就使得生產者知道消息已經到達正確的目的地了。如果消息和隊列是持久化的,那么確認消息會在消息寫入磁盤之后發出。RabbitMQ回傳給生產者的確認消息中的deliveryTag包含了確認消息的序號,此外RabbitMQ也可以設置channel.basicAck方法中的multiple參數,表示到這個序號之前的所有消息都已經得到了處理,如下圖所示:
事務機制在發送一條消息之后就會使得發送端阻塞,以等待RabbitMQ的回應,之后才能繼續發送下一條消息。相比之下,發送方確認機制最大的好處就是在於它是異步的,一旦發布一條消息,生產者應用程序就可以在等待信道返回確認的同時繼續發送下一條消息,當消息最終得到確認之后,生產者應用程序便可以通過回調方法來處理確認消息,如果RabbitMQ因為自身內部錯誤導致消息丟失,就會發送一條nack(Basic.Nack)命令,生產者應用程序同樣可以在回調方法中處理該nack命令。
生產者通過調用channel.confirmSelect方法(即Confirm.Select命令)將信道設置為confirm模式,之后RabbitMQ會返回Confirm.Select-Ok命令表示同意生產者將當前信道設置為confirm模式,所有被發送的后續消息都被ack或者nack一次,不會出現一條消息既被ack又被nack的情況,並且RabbitMQ也沒有對消息的被confirm的快慢做出任何保證。
通過下面的例子來看一下publisher confirm機制怎么運作,代碼如下:
try { channel.confirmSelect(); channel.basicPublish("exchange","routingkey",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello".getBytes()); if (!channel.waitForConfirms()){ System.out.println("message failed!"); // do something } }catch (Exception e){ e.printStackTrace(); }
如果發送多條消息,只需要將channel.basicPublish和channel.waitForConfirms方法包裹在循環里面即可,可以參考事務機制,不過不需要把channel.confirmSelect方法包裹在循環內部。
對於channel.waitForConfirms而言,在RabbitMQ客戶端有它的4個同類的方法:
(1)boolean waitForConfirms() throws InterruptedException;
(2)boolean waitForConfirms(long timeout) throws InterruptedException;
(3)void waitForConfirmsOrDie() throws IOException,InterruptedException;
(4)void waitForConfirmsOrDie(long timeout) throws IOException,InterruptedException, TimeoutException;
如果信道沒有開啟publisher confirm模式,那么調用任何的waitForConfirms方法都會報錯java.lang.IllegalStateException。對於沒有參數的waitForConfirms方法來說,其返回的條件是客戶端收到了相應的Basic.Ack/.Nack或者被中斷。參數timeout表示超時時間,一旦等待RabbitMQ回應超時就會拋出java.util.concurrent.TimeoutException異常。兩個waitForConfirmsOrDie方法在接收到RabbitMQ返回的Basic.Nack之后拋出java.io.IOException。業務代碼可以根據自身的特性靈活的運用這四種方法來保障消息的可靠發送。
注意:
❤ 事務機制和publisher confirm機制是兩者互斥的,不能共存。如果企圖將已開啟事務模式的信道再設置為publisher confirm模式,RabbitMQ會報錯。或者企圖將已開啟publisher confirm模式的信道再設置為事務模式,RabbitMQ也會報錯;
❤ 事務機制和publisher confirm機制確保的是消息能夠正確的發送至RabbitMQ,這里的“發送至RabbitMQ”的含義是指消息被正確的發送至RabbitMQ的交換器,如果此交換器沒有匹配的隊列,那么消息也會丟失。所以在使用這兩種機制的時候要確保所涉及的交換器能夠有匹配的隊列。更進一步的講,發送方要配合mandatory參數或者備份交換器一起使用來提高消息傳輸的可靠性。
publisher confirm的優勢在於並不一定需要同步確認。可以改進一下使用方式:
(1)批量confirm方法:每發送一批消息后,調用chann.waitForConfirms方法,等待服務器的確認返回
(2)異步confirm方法:提供一個回調方法,服務端確認了一條或者多條消息后客戶端會回調這個方法進行處理
批量confirm方法
在批量confirm方法中,客戶端程序需要定期或者定量(達到多少條),亦或者兩者結合起來調用chann.waitForConfirms來等待RabbitMQ的確認返回。但是存在一個問題就是在返回Basic.Nack或者超時的情況下,客戶端需要將這一批的消息全部重發,這會帶來明顯的重復消息數量,並且當消息經常丟失時,批量confirm的性能是不升反降的。
批量confirm代碼如下:
try { channel.confirmSelect(); int nsgCount = 0; while (true){ channel.basicPublish("exchange","routingkey",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello".getBytes()); //將發送出去的消息存入緩存中,緩存可以是一個ArrayList或者BlockQueue之類的 if (++nsgCount >= 1000){ nsgCount = 0; try { if (channel.waitForConfirms()){ //將緩存中消息清空 continue; } //將緩存中消息重發 }catch (InterruptedException e){ e.printStackTrace(); //將緩存中的消息重發 } } } }catch (Exception e){ e.printStackTrace(); }
異步confirm方法
異步confirm方法的編程實現是最復雜的,在客戶端的channel接口中提供的addConfirmListener方法可以添加ConfirmListener這個回調接口,這個ConfirmListener接口包含兩個方法:handleAck和handleNack,分別用來處理RabbitMQ回傳的Basic.Ack和Basic.Nack。在這兩個方法中都包含有一個參數deliveryTag(在publisher confirm模式下用來標記消息的唯一有序序號)。我們需要為每一個信道維護一個“unconfirm”的消息序號集合,每發送一條消息,集合中的元素就加1.每當調用ConfirmListener中的handleAck方法時,“unconfirm”集合中就刪除相應的一條(multiple設置為false)或者多條(multiple設置為true)記錄。從程序的運行效率來看,這個“unconfirm”集合最好采用有序集合SortedSet的存儲結構。事實上,Java端SDK中的waitForConfirms方法也是通過SortedSet維護消息序號的。
下面的代碼示例:
SortedSet confirmSet = new TreeSet(); channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("Nack,SeqNo: " + deliveryTag + ", multiple: " + multiple); if (multiple){ confirmSet.headSet(deliveryTag + 1); }else { confirmSet.remove(deliveryTag); } } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { if (multiple){ confirmSet.headSet(deliveryTag + 1).clear(); }else { confirmSet.remove(deliveryTag); } //這里添加處理消息重發的場景 } }); //這里模仿一直發送消息的場景 while (true){ long nextSeqNo = channel.getNextPublishSeqNo(); channel.basicPublish("exchange","routingkey",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello".getBytes()); confirmSet.add(nextSeqNo); }
將事務、普通confirm、批量confirm、和異步confirm一起來比較它們的QPS,如下圖所示:
可以看出批量和異步這兩種方式所呈現的性能要比其余兩種好的多。不過異步和批量的編程比較復雜,普通和事務編程較簡單。
不過還是推薦使用批量和異步來實現。
參考:《RabbitMQ實戰指南》 朱忠華 編著;