RabbitMQ---4、消息確認Ack


一:消費者確認

消費者確認或者說消費者應答指的是RabbitMQ需要確認消息到底有沒有被收到 
- 自動應答

boolean autoAck = true; channel.basicConsume(QUEUE_NAME, autoAck, consumer);

在訂閱消息的時候可以指定應答模式,當自動應答等於true的時候,表示當消費者一收到消息就表示消費者收到了消息,消費者收到了消息就會立即從隊列中刪除。

生產者

public class Producer { @Test public void testBasicPublish() throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setVirtualHost("/"); factory.setHost("127.0.0.1"); factory.setPort(AMQP.PROTOCOL.PORT); factory.setUsername("mengday"); factory.setPassword("mengday"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String EXCHANGE_NAME = "exchange.direct"; String QUEUE_NAME = "queue_name"; String ROUTING_KEY = "key"; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); String message = "Hello RabbitMQ:"; for (int i = 0; i < 5; i++) { channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, (message + i).getBytes("UTF-8")); } channel.close(); connection.close(); } }

消費者

public class Consumer1 { @Test public void testBasicConsumer1() throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setVirtualHost("/"); factory.setHost("127.0.0.1"); factory.setPort(AMQP.PROTOCOL.PORT); // 5672 factory.setUsername("mengday"); factory.setPassword("mengday"); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); String EXCHANGE_NAME = "exchange.direct"; String QUEUE_NAME = "queue_name"; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key"); // GetResponse response = channel.basicGet(QUEUE_NAME, false); // byte[] body = response.getBody(); // System.out.println(new String(body).toString()); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(message); } }; channel.basicConsume(QUEUE_NAME, true, consumer); Thread.sleep(100000); } }

運行結果:

運行生產者可以看到Ready=5, Unacked=0, Total=5, Total代表隊列中的消息總條數,Ready代表消費者還可以讀到的條數,Unacked:代表還有多少條沒有被應答 
這里寫圖片描述

在消費者端的獲取消息的第一行打個斷點,可以看到,第一次進入到handleDelivery()方法時,隊列瞬間被清空。Ready=0, Unacked=0, Total=0 
這里寫圖片描述

這里寫圖片描述

當消費者連接上隊列了,因為沒有指定消費者一次獲取消息的條數,所以隊列把隊列中的所有消息一下子推送到消費者端,當消費者訂閱的該隊列,消息就會從隊列推到客戶端,當消息從隊列被推出的時的那一刻就表示已經對消息進行自動確認了,消息就會從隊列中刪除。

  • 手動應答

手動應答和自動應答不一樣,需要將autoAck設置為false,當消費者收到消息在合適的時候來顯示的進行確認,說我已經接收到了該消息了,RabbitMQ可以從隊列中刪除該消息了,可以通過顯示調用channel.basicAck(envelope.getDeliveryTag(), false);來告訴消息服務器來刪除消息

boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer);

消費者

Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(message); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, false, consumer); Thread.sleep(100000);

當代碼執行完channel.basicConsume(QUEUE_NAME, false, consumer);還沒有進入到handleDelivery()方法時可以看到Ready=0, Unacked=5, Total=5 
這里寫圖片描述
這里寫圖片描述

當代碼進入handleDelivery()方法沒執行一次channel.basicAck(envelope.getDeliveryTag(), false);Unacked和Total就會減去1,直到兩個值都為0 
這里寫圖片描述

特殊情況:手動應答如果忘記寫channel.basicAck(envelope.getDeliveryTag(), false)這行代碼,現象是消費者仍然能獲取所有消息,不過此時Unacked和Total一直都是5,Ready=0, Unacked=5, Total=5,直到消費者運行結束,Ready=5, Unacked=0, Total=5 
這里寫圖片描述

這里寫圖片描述

這里寫圖片描述

特殊情況2:如果設置消費者每次從隊列中獲取指定的條數channel.basicQos(1);,此時如果沒有應答的話,消費者將不再繼續獲取

這里寫圖片描述

// 因設置了一次獲取一條,所以可讀的為4,未應答的是1 
這里寫圖片描述

// 繼續運行,因為一次只獲取一條,而這一條還沒有應答,就沒有辦法繼續獲取下一條 
這里寫圖片描述

// 消費者運行結束的時候又回到原來的狀態Ready=5, Unacked=0, Total=5 
這里寫圖片描述

注意:如果都沒有手動應答,在沒有指定獲取消息的條數時,消費者可以獲取所有消息,當指定時,只能獲取指定條,下次就只能等待了,沒法繼續獲取下一條了

  • 手動拒絕 
    手動應答是除了確認應答,也可以拒絕應答。
requeue=true,表示將消息重新放入到隊列中,false:表示直接從隊列中刪除,此時和basicAck(long deliveryTag, false)的效果一樣 void basicReject(long deliveryTag, boolean requeue);

消費者代碼示例一:

public class Consumer1 { @Test public void testBasicConsumer1() throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setVirtualHost("/"); factory.setHost("127.0.0.1"); factory.setPort(AMQP.PROTOCOL.PORT); // 5672 factory.setUsername("mengday"); factory.setPassword("mengday"); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); String EXCHANGE_NAME = "exchange.direct"; String QUEUE_NAME = "queue_name"; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(message); if (message.contains(":3")){ // requeue:重新入隊列,false:直接丟棄,相當於告訴隊列可以直接刪除掉 channel.basicReject(envelope.getDeliveryTag(), false); } else { channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(QUEUE_NAME, false, consumer); Thread.sleep(100000); } }

這里寫圖片描述

這里寫圖片描述

消費者代碼示例二:

Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(message); if (message.contains(":3")){ // requeue:重新入隊列,true: 重新放入隊列 channel.basicReject(envelope.getDeliveryTag(), true); } else { channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(QUEUE_NAME, false, consumer); Thread.sleep(100000);

這里寫圖片描述

這里寫圖片描述

這里寫圖片描述

結果解釋:代碼中沒有指定設定消費者一次從隊列中獲取消息的條數,所以消費者一下子拿到了5條消息,消費了0、1、2當消費第i=3時執行channel.basicReject(envelope.getDeliveryTag(), true);會將消息放入到隊列中,然后將消息推送給消費者, 然后消費4,接着再消費3,還會再次放入到隊列,整個過程死循環,Ready=0, Unacked=1, Total=1, 當消費者運行結束了,Ready=1, Unacked=0, Total=1, 這個1就是消息3

  • 重新投遞 
    basicRecover(): 重新投遞並沒有所謂的像basicReject中的basicReject的deliveryTag參數,所以重新投遞好像是將消費者還沒有處理的所有的消息都重新放入到隊列中,而不是將某一條消息放入到隊列中,與basicReject不同的是,重新投遞可以指定投遞的消息是否允許當前消費者消費。
// If true, messages will be requeued and possibly delivered to a different consumer. If false, messages will be redelivered to the same consumer. Basic.RecoverOk basicRecover(boolean requeue);

示例代碼一:

Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(message); if (message.contains(":3")){ channel.basicRecover(true); } else { channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(QUEUE_NAME, false, consumer); Thread.sleep(100000);

這里寫圖片描述

這里寫圖片描述

這里不太明白,true的話表示會被其他消費者消費,不知道3、4又被接收了一次???


false:表示重新遞送的消息還會被當前消費者消費 
這里寫圖片描述

二:生產者確認

當生產者發布消息到RabbitMQ中,生產者需要知道是否真的已經發送到RabbitMQ中,需要RabbitMQ告訴生產者。

  • Confirm機制 
    channel.confirmSelect(); 
    channel.waitForConfirms();

  • 事務機制 
    channel.txSelect(); 
    channel.txRollback();

注意:事務機制是非常非常非常消耗性能的,最好使用Confirm機制,Confirm機制相比事務機制性能上要好很多。

channel.confirmSelect();
String message = "Hello RabbitMQ:"; for (int i = 0; i < 5; i++) { channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, (message + i).getBytes("UTF-8")); } boolean isAllPublished = channel.waitForConfirms();

********

String message = "Hello RabbitMQ:"; try { channel.txSelect(); for (int i = 0; i < 5; i++) { channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, (message + i).getBytes("UTF-8")); } channel.txCommit(); } catch (Exception e) { channel.txRollback(); }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM