部分內容來自:http://blog.csdn.net/hzw19920329/article/details/54315940 http://blog.csdn.net/hzw19920329/article/details/54340711
持久化
rabbitmq默認沒有開啟消息的持久化,消息存儲在內存中,如果此時重啟服務器,那么消息江湖丟失。
開啟持久化會犧牲性能。響應時間和吞吐量。
如果需要在崩潰中恢復,那么開啟持久化需要做一下3步:
- 生產者在生產消息的時候,將消息的投遞模式設置為2(持久)
- 發送到持久化的交換器(配置了durable=true)
- 到達持久化的隊列(配置了durable=true)
所以,持久化在集群下工作的並不好,在集群的時候就講述。
最好只為關鍵消息做持久化。
事務
在使用RabbitMQ的時候,我們可以通過消息持久化操作來解決因為服務器的異常奔潰導致的消息丟失,除此之外我們還會遇到一個問題,當消息的發布者在將消息發送出去之后,消息到底有沒有正確到達broker代理服務器呢?如果不進行特殊配置的話,默認情況下發布操作是不會返回任何信息給生產者的,也就是默認情況下我們的生產者是不知道消息有沒有正確到達broker的,如果在消息到達broker之前已經丟失的話,持久化操作也解決不了這個問題,因為消息根本就沒到達代理服務器,你怎么進行持久化,那么這個問題該怎么解決呢?
RabbitMQ為我們提供了兩種方式:
方式一:通過AMQP事務機制實現,這也是從AMQP協議層面提供的解決方案;
方式二:通過將channel設置成confirm模式來實現;
這篇博客我們講解AMQP事務機制,下一篇再探討channel的confirm模式
首先,我們通過實例來看看AMQP的事務模式是怎么使用的:
RabbitMQ中與事務機制有關的方法有三個,分別是Channel里面的txSelect(),txCommit()以及txRollback(),txSelect用於將當前Channel設置成是transaction模式,txCommit用於提交事務,txRollback用於回滾事務,在通過txSelect開啟事務之后,我們便可以發布消息給broker代理服務器了,如果txCommit提交成功了,則消息一定是到達broker了,如果在txCommit執行之前broker異常奔潰或者由於其他原因拋出異常,這個時候我們便可以捕獲異常通過txRollback回滾事務了;
具體實例:
-
public class ProducerTest {
-
public static void main(String[] args) {
-
String exchangeName = "confirmExchange";
-
String queueName = "confirmQueue";
-
String routingKey = "confirmRoutingKey";
-
String bindingKey = "confirmRoutingKey";
-
int count = 3;
-
-
ConnectionFactory factory = new ConnectionFactory();
-
factory.setHost("172.16.151.74");
-
factory.setUsername("test");
-
factory.setPassword("test");
-
factory.setPort(5672);
-
-
//創建生產者
-
Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);
-
producer.run();
-
}
-
}
-
-
class Sender
-
{
-
private ConnectionFactory factory;
-
private int count;
-
private String exchangeName;
-
private String queueName;
-
private String routingKey;
-
private String bindingKey;
-
-
public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {
-
this.factory = factory;
-
this.count = count;
-
this.exchangeName = exchangeName;
-
this.queueName = queueName;
-
this.routingKey = routingKey;
-
this.bindingKey = bindingKey;
-
}
-
-
public void run() {
-
Channel channel = null;
-
try {
-
Connection connection = factory.newConnection();
-
channel = connection.createChannel();
-
//創建exchange
-
channel.exchangeDeclare(exchangeName, "direct", true, false, null);
-
//創建隊列
-
channel.queueDeclare(queueName, true, false, false, null);
-
//綁定exchange和queue
-
channel.queueBind(queueName, exchangeName, bindingKey);
-
//發送持久化消息
-
for(int i = 0;i < count;i++)
-
{
-
//第一個參數是exchangeName(默認情況下代理服務器端是存在一個""名字的exchange的,
-
//因此如果不創建exchange的話我們可以直接將該參數設置成"",如果創建了exchange的話
-
//我們需要將該參數設置成創建的exchange的名字),第二個參數是路由鍵
-
//開啟事務
-
channel.txSelect();
-
channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes());
-
if(i == 1)
-
{
-
int result = 1/0;
-
}
-
//提交事務
-
channel.txCommit();
-
}
-
} catch (Exception e) {
-
try {
-
//回滾操作
-
channel.txRollback();
-
} catch (IOException e1) {
-
e1.printStackTrace();
-
}
-
e.printStackTrace();
-
}
-
}
-
}
在第57行通過channel.txSelect方法開啟事務,第64行通過channel.txCommit提交事務,為了模擬broker代理服務器異常奔潰或者發布過程中拋出異常,我們通過第61行除以0的操作來模擬(實際中第58行的basicPublish方法是有可能會拋出IOException異常),在捕獲到異常之后,第69行調用了channel.txRollback進行事務回滾操作,運行整個程序你會發現在"confirmQueue"這個隊列中只存儲了一條消息,因為在59行i等於1的時候,拋出了異常,調用了第69行進行了事務回滾操作;在實際應用中,可以在回滾操作之后進行消息重發操作;
我們來通過抓包看看程序執行過程中發出了哪些請求:
1:第一條消息調用channel.txSelect開啟事務
2:第一條消息調用channel.txCommit提交事務
3:第二條消息調用channel.txSelect開啟事務
4:因為除以0的操作程序拋出異常,執行catch語句中的channel.txRollback回滾事務
從上面的分析中,我們知道使用事務確實能夠解決發布者與broker代理服務器之間的消息確認,只有消息成功被broker接收事務提交才能成功,否則我們便可以在捕獲異常進行事務回滾操作同時進行消息重發,但是使用事務機制的話會降低RabbitMQ的性能,就拿上面的程序發送1000條消息,使用事務的話需要58244毫秒,而不使用事務的話僅僅需要89毫秒,因此在實際中使用事務會帶來很大的性能損失,那么有沒有更好的方法既能保證發布者知道消息已經正確到達,又能基本上不帶來性能上的損失呢?從AMQP協議的層面看是沒有更好的方法的,但是RabbitMQ提供了一個更好的方案,即將channel信道設置成confirm模式。
發送確認模式
采用事務機制實現會降低RabbitMQ的消息吞吐量,那么有沒有更加高效的解決方式呢?RabbitMQ團隊為我們拿出了更好的方案,即采用發送方確認模式;
生產者確認模式實現原理:
生產者將信道設置成confirm模式,一旦信道進入confirm模式,所有在該信道上面發布的消息都將會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊列之后,broker就會發送一個確認給生產者(包含消息的唯一ID),這就使得生產者知道消息已經正確到達目的隊列了,如果消息和隊列是可持久化的,那么確認消息會在將消息寫入磁盤之后發出,broker回傳給生產者的確認消息中delivery-tag域包含了確認消息的序列號,此外broker也可以設置basic.ack的multiple域,表示到這個序列號之前的所有消息都已經得到了處理;
confirm模式最大的好處在於他是異步的,一旦發布一條消息,生產者應用程序就可以在等信道返回確認的同時繼續發送下一條消息,當消息最終得到確認之后,生產者應用便可以通過回調方法來處理該確認消息,如果RabbitMQ因為自身內部錯誤導致消息丟失,就會發送一條nack消息,生產者應用程序同樣可以在回調方法中處理該nack消息;
開啟confirm模式的方法:
生產者通過調用channel的confirmSelect方法將channel設置為confirm模式,(注意一點,已經在transaction事務模式的channel是不能再設置成confirm模式的,即這兩種模式是不能共存的),如果沒有設置no-wait標志的話,broker會返回confirm.select-ok表示同意發送者將當前channel信道設置為confirm模式(從目前RabbitMQ最新版本3.6來看,如果調用了channel.confirmSelect方法,默認情況下是直接將no-wait設置成false的,也就是默認情況下broker是必須回傳confirm.select-ok的,而且我也沒找到我們自己能夠設置no-wait標志的方法);
生產者實現confiem模式有三種編程方式:
(1):普通confirm模式,每發送一條消息,調用waitForConfirms()方法等待服務端confirm,這實際上是一種串行的confirm,每publish一條消息之后就等待服務端confirm,如果服務端返回false或者超時時間內未返回,客戶端進行消息重傳;
(2):批量confirm模式,每發送一批消息之后,調用waitForConfirms()方法,等待服務端confirm,這種批量確認的模式極大的提高了confirm效率,但是如果一旦出現confirm返回false或者超時的情況,客戶端需要將這一批次的消息全部重發,這會帶來明顯的重復消息,如果這種情況頻繁發生的話,效率也會不升反降;
講完了基本的原理之后,代碼級別我們該怎么設置channel信道為confirm模式呢?以及我們該怎么獲取broker返回給我們的確認消息呢?
測試1:普通confirm模式
首先從最簡單的開始,僅僅將channel設置成confirm模式,並且生產者每發送一條消息就等待broker回應確認消息,至於確認消息是什么我們不去做任何處理,為了測試方便,此處生產者只發送了5條消息,實現代碼如下:
-
public class ProducerTest {
-
public static void main(String[] args) {
-
String exchangeName = "confirmExchange";
-
String queueName = "confirmQueue";
-
String routingKey = "confirmRoutingKey";
-
String bindingKey = "confirmRoutingKey";
-
int count = 5;
-
-
ConnectionFactory factory = new ConnectionFactory();
-
factory.setHost("172.16.151.74");
-
factory.setUsername("test");
-
factory.setPassword("test");
-
factory.setPort(5672);
-
-
//創建生產者
-
Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);
-
producer.run();
-
}
-
}
-
-
class Sender
-
{
-
private ConnectionFactory factory;
-
private int count;
-
private String exchangeName;
-
private String queueName;
-
private String routingKey;
-
private String bindingKey;
-
-
public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {
-
this.factory = factory;
-
this.count = count;
-
this.exchangeName = exchangeName;
-
this.queueName = queueName;
-
this.routingKey = routingKey;
-
this.bindingKey = bindingKey;
-
}
-
-
public void run() {
-
Channel channel = null;
-
try {
-
Connection connection = factory.newConnection();
-
channel = connection.createChannel();
-
//創建exchange
-
channel.exchangeDeclare(exchangeName, "direct", true, false, null);
-
//創建隊列
-
channel.queueDeclare(queueName, true, false, false, null);
-
//綁定exchange和queue
-
channel.queueBind(queueName, exchangeName, bindingKey);
-
channel.confirmSelect();
-
//發送持久化消息
-
for(int i = 0;i < count;i++)
-
{
-
//第一個參數是exchangeName(默認情況下代理服務器端是存在一個""名字的exchange的,
-
//因此如果不創建exchange的話我們可以直接將該參數設置成"",如果創建了exchange的話
-
//我們需要將該參數設置成創建的exchange的名字),第二個參數是路由鍵
-
channel.basicPublish(exchangeName, routingKey,MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes());
-
if(channel.waitForConfirms())
-
{
-
System.out.println("發送成功");
-
}
-
}
-
final long start = System.currentTimeMillis();
-
System.out.println("執行waitForConfirmsOrDie耗費時間: "+(System.currentTimeMillis()-start)+"ms");
-
} catch (Exception e) {
-
e.printStackTrace();
-
}
-
}
-
}
在第50行調用Channel信道的confirmSelect方法將當前信道設置成了confirm模式,第57行通過for循環調用Channel的basicPublish方法發送了5條消息到消息隊列中,第58行調用waitForConfirms方法等待broker服務端返回ack或者nack消息,這種模式每發送一條消息就會等待broker代理服務器返回消息,這點我們可以從抓包的角度觀察結果:
可以看到上面生產者通過Confirm.Select將當前Channel信道設置成confirm模式,broker代理服務器收到之后回傳Confirm.Select-Ok同一將當前Channel設置成confirm模式,此外看到返回5條Basic.Ack消息;
測試2:批量confirm模式
這種模式生產者不是每發送一條就等待broker確認,而是發送一批,實現代碼見下:
-
public class ProducerTest {
-
public static void main(String[] args) {
-
String exchangeName = "confirmExchange";
-
String queueName = "confirmQueue";
-
String routingKey = "confirmRoutingKey";
-
String bindingKey = "confirmRoutingKey";
-
int count = 100;
-
-
ConnectionFactory factory = new ConnectionFactory();
-
factory.setHost("172.16.151.74");
-
factory.setUsername("test");
-
factory.setPassword("test");
-
factory.setPort(5672);
-
-
//創建生產者
-
Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);
-
producer.run();
-
}
-
}
-
-
class Sender
-
{
-
private ConnectionFactory factory;
-
private int count;
-
private String exchangeName;
-
private String queueName;
-
private String routingKey;
-
private String bindingKey;
-
-
public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {
-
this.factory = factory;
-
this.count = count;
-
this.exchangeName = exchangeName;
-
this.queueName = queueName;
-
this.routingKey = routingKey;
-
this.bindingKey = bindingKey;
-
}
-
-
public void run() {
-
Channel channel = null;
-
try {
-
Connection connection = factory.newConnection();
-
channel = connection.createChannel();
-
//創建exchange
-
channel.exchangeDeclare(exchangeName, "direct", true, false, null);
-
//創建隊列
-
channel.queueDeclare(queueName, true, false, false, null);
-
//綁定exchange和queue
-
channel.queueBind(queueName, exchangeName, bindingKey);
-
channel.confirmSelect();
-
//發送持久化消息
-
for(int i = 0;i < count;i++)
-
{
-
//第一個參數是exchangeName(默認情況下代理服務器端是存在一個""名字的exchange的,
-
//因此如果不創建exchange的話我們可以直接將該參數設置成"",如果創建了exchange的話
-
//我們需要將該參數設置成創建的exchange的名字),第二個參數是路由鍵
-
channel.basicPublish(exchangeName, routingKey,MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes());
-
}
-
long start = System.currentTimeMillis();
-
channel.waitForConfirmsOrDie();
-
System.out.println("執行waitForConfirmsOrDie耗費時間: "+(System.currentTimeMillis()-start)+"ms");
-
} catch (Exception e) {
-
e.printStackTrace();
-
}
-
}
-
}
第50行調用channel.confirmSelect將當前channel信道設置成confirm模式,接着在第57行通過for循環發送了100條消息,第60行調用了channel的waitForConfirmsOrDie,從waitForConfirmsOrDie方法的注釋上可以看出,該方法會等到最后一條消息得到確認或者得到nack才會結束,也就是說在waitForConfirmsOrDie處會造成當前程序的阻塞,以測試1程序發送100條消息為例,阻塞時間是135ms,我們再來看看對測試1的抓包情況:
從紅色箭頭的標號1出可以看到:首先是24向74發送了Confirm.Select消息表示請求將當前信道設置為confirm模式,接着74向24回送了Confirm.Select-Ok消息表示同意將信道設置成confirm模式,從紅色標號2處NoWait字段的值為false也印證了我們如果直接調用Channel信道的confirmSelect()方法的話,實際上默認是開啟broker回傳Confirm.Select-Ok確認消息的;
接下來我們看看broker回傳給客戶端的確認消息數據包是什么樣子的呢?同樣通過抓包看看結果:
你會發現,在上面測試1中我們通過for循環發送了100條消息,但是在抓包的時候我們僅僅看到有兩個Basic.Ack確認消息回傳回來,原因在於上面截圖的標號3處,你會發現Multiple域的值是True的,之前我們已經講過broker可以設置Multiple域表示broker已經收到當前確認消息的Delivery-Tag域之前標號的消息,以上面截圖為例的話表示broker告訴發送者編號4之前的消息已經全部收到了,從這點我們看出broker端默認情況下是進行批量回復的,並不是針對每條消息都發送一條ack消息;
測試2:
測試1我們僅僅是測試發送者能夠收到broker的確認消息以及知道了broker對消息默認是采用批量回復方式的,那么在程序中我們該怎么獲取到broker回傳回來的確認消息呢,假如我們有時候需要在收到確認消息之后做一些提示性操作該怎么辦呢?測試1中,我們采用的是Channel信道的waitForConfirmsOrDie等待broker端回傳回ack確認消息的,但我們沒法拿到這個ack消息進行后期操作,要想拿到ack消息的話,我們可以給當前Channel信道綁定監聽器,具體來說就是調用Channel信道的addConfirmListener方法進行設置,Channel信道在收到broker的ack消息之后會回調設置在該信道監聽器上的handleAck方法,在收到nack消息之后會回調設置在該信道監聽器上的handleNack方法。
實現代碼:
-
public class ProducerTest {
-
public static void main(String[] args) {
-
String exchangeName = "confirmExchange";
-
String queueName = "confirmQueue";
-
String routingKey = "confirmRoutingKey";
-
String bindingKey = "confirmRoutingKey";
-
int count = 100;
-
-
ConnectionFactory factory = new ConnectionFactory();
-
factory.setHost("172.16.151.74");
-
factory.setUsername("test");
-
factory.setPassword("test");
-
factory.setPort(5672);
-
-
//創建生產者
-
Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);
-
producer.run();
-
}
-
}
-
-
class Sender
-
{
-
private ConnectionFactory factory;
-
private int count;
-
private String exchangeName;
-
private String queueName;
-
private String routingKey;
-
private String bindingKey;
-
-
public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {
-
this.factory = factory;
-
this.count = count;
-
this.exchangeName = exchangeName;
-
this.queueName = queueName;
-
this.routingKey = routingKey;
-
this.bindingKey = bindingKey;
-
}
-
-
public void run() {
-
Channel channel = null;
-
try {
-
Connection connection = factory.newConnection();
-
channel = connection.createChannel();
-
//創建exchange
-
channel.exchangeDeclare(exchangeName, "direct", true, false, null);
-
//創建隊列
-
channel.queueDeclare(queueName, true, false, false, null);
-
//綁定exchange和queue
-
channel.queueBind(queueName, exchangeName, bindingKey);
-
channel.confirmSelect();
-
//發送持久化消息
-
for(int i = 0;i < count;i++)
-
{
-
//第一個參數是exchangeName(默認情況下代理服務器端是存在一個""名字的exchange的,
-
//因此如果不創建exchange的話我們可以直接將該參數設置成"",如果創建了exchange的話
-
//我們需要將該參數設置成創建的exchange的名字),第二個參數是路由鍵
-
channel.basicPublish(exchangeName, routingKey,MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes());
-
}
-
long start = System.currentTimeMillis();
-
channel.addConfirmListener(new ConfirmListener() {
-
-
@Override
-
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
-
System.out.println("nack: deliveryTag = "+deliveryTag+" multiple: "+multiple);
-
}
-
-
@Override
-
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
-
System.out.println("ack: deliveryTag = "+deliveryTag+" multiple: "+multiple);
-
}
-
});
-
System.out.println("執行waitForConfirmsOrDie耗費時間: "+(System.currentTimeMillis()-start)+"ms");
-
} catch (Exception e) {
-
e.printStackTrace();
-
}
-
}
-
}
第60行我們調用了Channel信道的addConfirmListener設置了監聽器,並且在監聽器的handleAck和handleNack方法中打印了信息,運行程序查看輸出:
可以看到,雖然我們還是發送了100條消息,同樣我們並沒有收到100個ack消息 ,只收到兩個ack消息,並且這兩個ack消息的multiple域都為true,這點和測試1是相同的,你多次運行程序會發現每次發送回來的ack消息中的deliveryTag域的值並不是一樣的,說明broker端批量回傳給發送者的ack消息並不是以固定的批量大小回傳的;
也就是我們通過信道Channel的waitForConfirmsOrDie方法或者為信道設置監聽器都可以保證發送者收到broker回傳的ack或者nack消息,那么這兩種方式有什么區別呢?從測試一的第61行代碼以及測試2的第72行代碼處你就能找到答案啦,測試1中調用waitForConfirmsOrDie方法發送100條消息並且全部收到確認需要135ms,測試2中通過監聽器的方式僅僅需要1ms,說明調用waitForConfirmsOrDie會造成程序的阻塞,通過監聽器並不會造成程序的阻塞
如果把上面的發送消息部分改為:
for(int i = 0;i < count;i++) {
channel.basicPublish(exchangeName, routingKey,MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes()); } TimeUnit.SECONDS.sleep(1);
for(int i = 0;i < count;i++) { channel.basicPublish(exchangeName, routingKey,MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes()); } |
最終ConfirmListener的打印為:
ack: deliveryTag = 1 multiple: false
ack: deliveryTag = 100 multiple: true
ack: deliveryTag = 106 multiple: true
ack: deliveryTag = 200 multiple: true
我的理解如下:
ack: deliveryTag = 1 multiple: false :第1條投遞成功 至少已經持久化在交換器部分
ack: deliveryTag = 100 multiple: true:第2-100條投遞成功 至少已經持久化在交換器部分
ack: deliveryTag = 106 multiple: true:第101-106條投遞成功 至少已經持久化在交換器部分
ack: deliveryTag = 200 multiple: true:第107-200條投遞成功 至少已經持久化在交換器部分
