本文轉自:http://m.blog.csdn.net/article/details?id=54315940
在使用RabbitMQ的時候,我們可以通過消息持久化操作來解決因為服務器的異常奔潰導致的消息丟失,除此之外我們還會遇到一個問題,當消息的發布者在將消息發送出去之后,消息到底有沒有正確到達broker代理服務器呢?如果不進行特殊配置的話,默認情況下發布操作是不會返回任何信息給生產者的,也就是默認情況下我們的生產者是不知道消息有沒有正確到達broker的,如果在消息到達broker之前已經丟失的話,持久化操作也解決不了這個問題,因為消息根本就沒到達代理服務器,你怎么進行持久化,那么這個問題該怎么解決呢?
RabbitMQ為我們提供了兩種方式:
方式一:通過AMQP事務機制實現,這也是從AMQP協議層面提供的解決方案;
方式二:通過將channel設置成confirm模式來實現;
這篇博客我們講解AMQP事務機制,下一篇再探討channel的confirm模式
首先,我們通過實例來看看AMQP的事務模式是怎么使用的:
RabbitMQ中與事務機制有關的方法有三個,分別是Channel里面的txSelect(),txCommit()以及txRollback(),txSelect用於將當前Channel設置成是transaction模式,txCommit用於提交事務,txRollback用於回滾事務,在通過txSelect開啟事務之后,我們便可以發布消息給broker代理服務器了,如果txCommit提交成功了,則消息一定是到達broker了,如果在txCommit執行之前broker異常奔潰或者由於其他原因拋出異常,這個時候我們便可以捕獲異常通過txRollback回滾事務了;
具體實例:
public class ProducerTest { public static void main(String[] args) { String exchangeName = "confirmExchange"; String queueName = "confirmQueue"; String routingKey = "confirmRoutingKey"; String bindingKey = "confirmRoutingKey"; int count = 3; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("172.16.151.74"); factory.setUsername("test"); factory.setPassword("test"); factory.setPort(5672); //創建生產者 Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey); producer.run(); } } class Sender { private ConnectionFactory factory; private int count; private String exchangeName; private String queueName; private String routingKey; private String bindingKey; public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) { this.factory = factory; this.count = count; this.exchangeName = exchangeName; this.queueName = queueName; this.routingKey = routingKey; this.bindingKey = bindingKey; } public void run() { Channel channel = null; try { Connection connection = factory.newConnection(); channel = connection.createChannel(); //創建exchange channel.exchangeDeclare(exchangeName, "direct", true, false, null); //創建隊列 channel.queueDeclare(queueName, true, false, false, null); //綁定exchange和queue channel.queueBind(queueName, exchangeName, bindingKey); //發送持久化消息 for(int i = 0;i < count;i++) { //第一個參數是exchangeName(默認情況下代理服務器端是存在一個""名字的exchange的, //因此如果不創建exchange的話我們可以直接將該參數設置成"",如果創建了exchange的話 //我們需要將該參數設置成創建的exchange的名字),第二個參數是路由鍵 //開啟事務 channel.txSelect(); channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes()); if(i == 1) { int result = 1/0; } //提交事務 channel.txCommit(); } } catch (Exception e) { try { //回滾操作 channel.txRollback(); } catch (IOException e1) { e1.printStackTrace(); } e.printStackTrace(); } } }
在第57行通過channel.txSelect方法開啟事務,第64行通過channel.txCommit提交事務,為了模擬broker代理服務器異常奔潰或者發布過程中拋出異常,我們通過第61行除以0的操作來模擬(實際中第58行的basicPublish方法是有可能會拋出IOException異常),在捕獲到異常之后,第69行調用了channel.txRollback進行事務回滾操作,運行整個程序你會發現在"confirmQueue"這個隊列中只存儲了一條消息,因為在59行i等於1的時候,拋出了異常,調用了第69行進行了事務回滾操作;在實際應用中,可以在回滾操作之后進行消息重發操作;
我們來通過抓包看看程序執行過程中發出了哪些請求:
1:第一條消息調用channel.txSelect開啟事務
2:第一條消息調用channel.txCommit提交事務
3:第二條消息調用channel.txSelect開啟事務
4:因為除以0的操作程序拋出異常,執行catch語句中的channel.txRollback回滾事務
從上面的分析中,我們知道使用事務確實能夠解決發布者與broker代理服務器之間的消息確認,只有消息成功被broker接收事務提交才能成功,否則我們便可以在捕獲異常進行事務回滾操作同時進行消息重發,但是使用事務機制的話會降低RabbitMQ的性能,就拿上面的程序發送1000條消息,使用事務的話需要58244毫秒,而不使用事務的話僅僅需要89毫秒,因此在實際中使用事務會帶來很大的性能損失,那么有沒有更好的方法既能保證發布者知道消息已經正確到達,又能基本上不帶來性能上的損失呢?從AMQP協議的層面看是沒有更好的方法的,但是RabbitMQ提供了一個更好的方案,即將channel信道設置成confirm模式,關於confirm的注意點將在下一篇博客介紹;