深入學習RabbitMQ(四):channel的confirm模式


轉自:http://m.blog.csdn.net/article/details?id=54340711

上一篇博客我們介紹了使用RabbitMQ可能會遇到的一個問題,即生產者不知道消息是否真正到達broker代理服務器,隨后通過AMQP協議層面為我們提供的事務機制解決了這個問題,但是采用事務機制實現會降低RabbitMQ的消息吞吐量,那么有沒有更加高效的解決方式呢?RabbitMQ團隊為我們拿出了更好的方案,即采用發送方確認模式;

       生產者確認模式實現原理:

       生產者將信道設置成confirm模式,一旦信道進入confirm模式,所有在該信道上面發布的消息都將會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊列之后,broker就會發送一個確認給生產者(包含消息的唯一ID),這就使得生產者知道消息已經正確到達目的隊列了,如果消息和隊列是可持久化的,那么確認消息會在將消息寫入磁盤之后發出,broker回傳給生產者的確認消息中delivery-tag域包含了確認消息的序列號,此外broker也可以設置basic.ack的multiple域,表示到這個序列號之前的所有消息都已經得到了處理;

       confirm模式最大的好處在於他是異步的,一旦發布一條消息,生產者應用程序就可以在等信道返回確認的同時繼續發送下一條消息,當消息最終得到確認之后,生產者應用便可以通過回調方法來處理該確認消息,如果RabbitMQ因為自身內部錯誤導致消息丟失,就會發送一條nack消息,生產者應用程序同樣可以在回調方法中處理該nack消息;

       開啟confirm模式的方法:

       生產者通過調用channel的confirmSelect方法將channel設置為confirm模式,(注意一點,已經在transaction事務模式的channel是不能再設置成confirm模式的,即這兩種模式是不能共存的),如果沒有設置no-wait標志的話,broker會返回confirm.select-ok表示同意發送者將當前channel信道設置為confirm模式(從目前RabbitMQ最新版本3.6來看,如果調用了channel.confirmSelect方法,默認情況下是直接將no-wait設置成false的,也就是默認情況下broker是必須回傳confirm.select-ok的,而且我也沒找到我們自己能夠設置no-wait標志的方法);

       生產者實現confiem模式有三種編程方式:

       (1):普通confirm模式,每發送一條消息,調用waitForConfirms()方法等待服務端confirm,這實際上是一種串行的confirm,每publish一條消息之后就等待服務端confirm,如果服務端返回false或者超時時間內未返回,客戶端進行消息重傳;

       (2):批量confirm模式,每發送一批消息之后,調用waitForConfirms()方法,等待服務端confirm,這種批量確認的模式極大的提高了confirm效率,但是如果一旦出現confirm返回false或者超時的情況,客戶端需要將這一批次的消息全部重發,這會帶來明顯的重復消息,如果這種情況頻繁發生的話,效率也會不升反降;

       

       講完了基本的原理之后,代碼級別我們該怎么設置channel信道為confirm模式呢?以及我們該怎么獲取broker返回給我們的確認消息呢?

       測試1:普通confirm模式

       首先從最簡單的開始,僅僅將channel設置成confirm模式,並且生產者每發送一條消息就等待broker回應確認消息,至於確認消息是什么我們不去做任何處理,為了測試方便,此處生產者只發送了5條消息,實現代碼如下:

 

public class ProducerTest {
    public static void main(String[] args) {
        String exchangeName = "confirmExchange";
        String queueName = "confirmQueue";
        String routingKey = "confirmRoutingKey";
        String bindingKey = "confirmRoutingKey";
        int count = 5;
        
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("172.16.151.74");
        factory.setUsername("test");
        factory.setPassword("test");
        factory.setPort(5672);
        
        //創建生產者
        Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);
        producer.run();
    }
}

class Sender
{
    private ConnectionFactory factory;
    private int count;
    private String exchangeName;
    private String     queueName;
    private String routingKey;
    private String bindingKey;
    
    public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {
        this.factory = factory;
        this.count = count;
        this.exchangeName = exchangeName;
        this.queueName = queueName;
        this.routingKey = routingKey;
        this.bindingKey = bindingKey;
    }
    
    public void run() {
        Channel channel = null;
        try {
            Connection connection = factory.newConnection();
            channel = connection.createChannel();
            //創建exchange
            channel.exchangeDeclare(exchangeName, "direct", true, false, null);
            //創建隊列
            channel.queueDeclare(queueName, true, false, false, null);
            //綁定exchange和queue
            channel.queueBind(queueName, exchangeName, bindingKey);
            channel.confirmSelect();
            //發送持久化消息
            for(int i = 0;i < count;i++)
            {
                //第一個參數是exchangeName(默認情況下代理服務器端是存在一個""名字的exchange的,
                //因此如果不創建exchange的話我們可以直接將該參數設置成"",如果創建了exchange的話
                //我們需要將該參數設置成創建的exchange的名字),第二個參數是路由鍵
                channel.basicPublish(exchangeName, routingKey,MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes());
                if(channel.waitForConfirms())
                {
                    System.out.println("發送成功");
                }
            }
            final long start = System.currentTimeMillis();
            System.out.println("執行waitForConfirmsOrDie耗費時間: "+(System.currentTimeMillis()-start)+"ms");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

       在第50行調用Channel信道的confirmSelect方法將當前信道設置成了confirm模式,第57行通過for循環調用Channel的basicPublish方法發送了5條消息到消息隊列中,第58行調用waitForConfirms方法等待broker服務端返回ack或者nack消息,這種模式每發送一條消息就會等待broker代理服務器返回消息,這點我們可以從抓包的角度觀察結果:

       可以看到上面生產者通過Confirm.Select將當前Channel信道設置成confirm模式,broker代理服務器收到之后回傳Confirm.Select-Ok同一將當前Channel設置成confirm模式,此外看到返回5條Basic.Ack消息;

        測試2:批量confirm模式

        這種模式生產者不是每發送一條就等待broker確認,而是發送一批,實現代碼見下:

 

public class ProducerTest {
    public static void main(String[] args) {
        String exchangeName = "confirmExchange";
        String queueName = "confirmQueue";
        String routingKey = "confirmRoutingKey";
        String bindingKey = "confirmRoutingKey";
        int count = 100;
        
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("172.16.151.74");
        factory.setUsername("test");
        factory.setPassword("test");
        factory.setPort(5672);
        
        //創建生產者
        Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);
        producer.run();
    }
}

class Sender
{
    private ConnectionFactory factory;
    private int count;
    private String exchangeName;
    private String     queueName;
    private String routingKey;
    private String bindingKey;
    
    public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {
        this.factory = factory;
        this.count = count;
        this.exchangeName = exchangeName;
        this.queueName = queueName;
        this.routingKey = routingKey;
        this.bindingKey = bindingKey;
    }
    
    public void run() {
        Channel channel = null;
        try {
            Connection connection = factory.newConnection();
            channel = connection.createChannel();
            //創建exchange
            channel.exchangeDeclare(exchangeName, "direct", true, false, null);
            //創建隊列
            channel.queueDeclare(queueName, true, false, false, null);
            //綁定exchange和queue
            channel.queueBind(queueName, exchangeName, bindingKey);
            channel.confirmSelect();
            //發送持久化消息
            for(int i = 0;i < count;i++)
            {
                //第一個參數是exchangeName(默認情況下代理服務器端是存在一個""名字的exchange的,
                //因此如果不創建exchange的話我們可以直接將該參數設置成"",如果創建了exchange的話
                //我們需要將該參數設置成創建的exchange的名字),第二個參數是路由鍵
                channel.basicPublish(exchangeName, routingKey,MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes());
            }
            long start = System.currentTimeMillis();
            channel.waitForConfirmsOrDie();
            System.out.println("執行waitForConfirmsOrDie耗費時間: "+(System.currentTimeMillis()-start)+"ms");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

       第50行調用channel.confirmSelect將當前channel信道設置成confirm模式,接着在第57行通過for循環發送了100條消息,第60行調用了channel的waitForConfirmsOrDie,從waitForConfirmsOrDie方法的注釋上可以看出,該方法會等到最后一條消息得到確認或者得到nack才會結束,也就是說在waitForConfirmsOrDie處會造成當前程序的阻塞,以測試1程序發送100條消息為例,阻塞時間是135ms,我們再來看看對測試1的抓包情況:

   

       從紅色箭頭的標號1出可以看到:首先是24向74發送了Confirm.Select消息表示請求將當前信道設置為confirm模式,接着74向24回送了Confirm.Select-Ok消息表示同意將信道設置成confirm模式,從紅色標號2處NoWait字段的值為false也印證了我們如果直接調用Channel信道的confirmSelect()方法的話,實際上默認是開啟broker回傳Confirm.Select-Ok確認消息的;  

       接下來我們看看broker回傳給客戶端的確認消息數據包是什么樣子的呢?同樣通過抓包看看結果:

  

       你會發現,在上面測試1中我們通過for循環發送了100條消息,但是在抓包的時候我們僅僅看到有兩個Basic.Ack確認消息回傳回來,原因在於上面截圖的標號3處,你會發現Multiple域的值是True的,之前我們已經講過broker可以設置Multiple域表示broker已經收到當前確認消息的Delivery-Tag域之前標號的消息,以上面截圖為例的話表示broker告訴發送者編號4之前的消息已經全部收到了,從這點我們看出broker端默認情況下是進行批量回復的,並不是針對每條消息都發送一條ack消息;

       測試2:

       測試1我們僅僅是測試發送者能夠收到broker的確認消息以及知道了broker對消息默認是采用批量回復方式的,那么在程序中我們該怎么獲取到broker回傳回來的確認消息呢,假如我們有時候需要在收到確認消息之后做一些提示性操作該怎么辦呢?測試1中,我們采用的是Channel信道的waitForConfirmsOrDie等待broker端回傳回ack確認消息的,但我們沒法拿到這個ack消息進行后期操作,要想拿到ack消息的話,我們可以給當前Channel信道綁定監聽器,具體來說就是調用Channel信道的addConfirmListener方法進行設置,Channel信道在收到broker的ack消息之后會回調設置在該信道監聽器上的handleAck方法,在收到nack消息之后會回調設置在該信道監聽器上的handleNack方法。

       實現代碼:

 

public class ProducerTest {
    public static void main(String[] args) {
        String exchangeName = "confirmExchange";
        String queueName = "confirmQueue";
        String routingKey = "confirmRoutingKey";
        String bindingKey = "confirmRoutingKey";
        int count = 100;
        
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("172.16.151.74");
        factory.setUsername("test");
        factory.setPassword("test");
        factory.setPort(5672);
        
        //創建生產者
        Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);
        producer.run();
    }
}

class Sender
{
    private ConnectionFactory factory;
    private int count;
    private String exchangeName;
    private String     queueName;
    private String routingKey;
    private String bindingKey;
    
    public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {
        this.factory = factory;
        this.count = count;
        this.exchangeName = exchangeName;
        this.queueName = queueName;
        this.routingKey = routingKey;
        this.bindingKey = bindingKey;
    }
    
    public void run() {
        Channel channel = null;
        try {
            Connection connection = factory.newConnection();
            channel = connection.createChannel();
            //創建exchange
            channel.exchangeDeclare(exchangeName, "direct", true, false, null);
            //創建隊列
            channel.queueDeclare(queueName, true, false, false, null);
            //綁定exchange和queue
            channel.queueBind(queueName, exchangeName, bindingKey);
            channel.confirmSelect();
            //發送持久化消息
            for(int i = 0;i < count;i++)
            {
                //第一個參數是exchangeName(默認情況下代理服務器端是存在一個""名字的exchange的,
                //因此如果不創建exchange的話我們可以直接將該參數設置成"",如果創建了exchange的話
                //我們需要將該參數設置成創建的exchange的名字),第二個參數是路由鍵
                channel.basicPublish(exchangeName, routingKey,MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes());
            }
            long start = System.currentTimeMillis();
            channel.addConfirmListener(new ConfirmListener() {
                
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("nack: deliveryTag = "+deliveryTag+" multiple: "+multiple);
                }
                
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("ack: deliveryTag = "+deliveryTag+" multiple: "+multiple);
                }
            });
            System.out.println("執行waitForConfirmsOrDie耗費時間: "+(System.currentTimeMillis()-start)+"ms");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

       第60行我們調用了Channel信道的addConfirmListener設置了監聽器,並且在監聽器的handleAck和handleNack方法中打印了信息,運行程序查看輸出:

  

       可以看到,雖然我們還是發送了100條消息,同樣我們並沒有收到100個ack消息 ,只收到兩個ack消息,並且這兩個ack消息的multiple域都為true,這點和測試1是相同的,你多次運行程序會發現每次發送回來的ack消息中的deliveryTag域的值並不是一樣的,說明broker端批量回傳給發送者的ack消息並不是以固定的批量大小回傳的;

       也就是我們通過信道Channel的waitForConfirmsOrDie方法或者為信道設置監聽器都可以保證發送者收到broker回傳的ack或者nack消息,那么這兩種方式有什么區別呢?從測試一的第61行代碼以及測試2的第72行代碼處你就能找到答案啦,測試1中調用waitForConfirmsOrDie方法發送100條消息並且全部收到確認需要135ms,測試2中通過監聽器的方式僅僅需要1ms,說明調用waitForConfirmsOrDie會造成程序的阻塞,通過監聽器並不會造成程序的阻塞,下一篇博客我會試着從RabbitMQ的源碼層面來分析這兩種方式造成這種區別的原因啦啦;

       參考資料:

       RabbitMQ官網

       RabbitMQ不同Confirm模式下的性能對比


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM