RabbitMQ消費方式匯總


在學習本章節前,請先學習之前的章節:
Java訪問RabbitMQ:https://www.cnblogs.com/duanjt/p/10057330.html
RabbitMQ消息發布時的權衡:https://www.cnblogs.com/duanjt/p/10075308.html


一、推送Consume


前面我們使用到的都是這種模式,注冊一個消費者后,RabbitMQ會在消息可用時,自動將消息進行推送給消費者。這種方式效率最高最及時。
核心代碼如下:

// 接收消息,第二個參數表示是否自動應答
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
        System.out.println(envelope.getRoutingKey() + " 接收到數據:" + new String(body));
    }
});

 

 

二、拉取Get


屬於一種輪詢模型,發送一次get請求,獲得一個消息。如果此時RabbitMQ中沒有消息,會獲得一個表示空的回復。總的來說,這種方式性能比較差,很明顯,每獲得一條消息,都要和RabbitMQ進行網絡通信發出請求。而且對RabbitMQ來說,RabbitMQ無法進行任何優化,因為它永遠不知道應用程序何時會發出請求。
核心代碼如下:

while(true){
    //如果沒有消息,將返回null
    GetResponse getResponse = channel.basicGet(queueName, true);    
    if(null!=getResponse){
        System.out.println("received["+getResponse.getEnvelope().getRoutingKey()+"]"+new String(getResponse.getBody()));
    }
    Thread.sleep(1000);
}

 

 

三、自動確認


方法channel.basicConsume和方法channel.basicGet表示同步或異步獲取消息,第二個參數都表示是否自動確認。前面我們都設置為了true。這個時候我們只需要處理邏輯,將自動向RabbitMQ進行確認。
當autoAck=true時,一旦消費者接收到了消息,就視為自動確認了消息。如果消費者在處理消息的過程中,出了錯,就沒有什么辦法重新處理這條消息,所以我們很多時候,需要在消息處理成功后,再確認消息,這就需要手動確認。

 

四、手動確認

 

// 接收消息手動確認,第二個參數表示是否自動應答
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
        System.out.println(envelope.getRoutingKey() + " 接收到數據:" + new String(body));
        //手動確認,第一個參數是消息標識,第二個參數表示是否批量確認。這里是一條一條確認,所以設置false
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});

 

 

五、QoS預取模式


在確認消息被接收之前,消費者可以預先要求接收一定數量的消息,在處理完一定數量的消息后,批量進行確認。如果消費者應用程序在確認消息之前崩潰,則所有未確認的消息將被重新發送給其他消費者。所以這里存在着一定程度上的可靠性風險。
這種機制一方面可以實現限速(將消息暫存到RabbitMQ內存中)的作用,一方面可以保證消息確認質量(比如確認了但是處理有異常的情況)
核心代碼:

//參數1表示限制條數,參數2 true=channel,false=消費者
channel.basicQos(100, true);

// 接收消息手動確認,第二個參數表示是否自動應答
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
        System.out.println(envelope.getRoutingKey() + " 接收到數據:" + new String(body));
        //手動確認,第一個參數是消息標識,第二個參數表示是否批量確認。這里是一條一條確認,所以設置false
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});

 

 

注意:
1.消費確認模式必須是非自動ACK機制(這個是使用baseQos的前提條件,否則會Qos不生效),然后設置basicQos的值;另外,還可以基於consume和channel的粒度進行設置(global)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM