一:消费者确认
消费者确认或者说消费者应答指的是RabbitMQ需要确认消息到底有没有被收到
- 自动应答
boolean autoAck = true; channel.basicConsume(QUEUE_NAME, autoAck, consumer);
在订阅消息的时候可以指定应答模式,当自动应答等于true的时候,表示当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除。
生产者
public class Producer { @Test public void testBasicPublish() throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setVirtualHost("/"); factory.setHost("127.0.0.1"); factory.setPort(AMQP.PROTOCOL.PORT); factory.setUsername("mengday"); factory.setPassword("mengday"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String EXCHANGE_NAME = "exchange.direct"; String QUEUE_NAME = "queue_name"; String ROUTING_KEY = "key"; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); String message = "Hello RabbitMQ:"; for (int i = 0; i < 5; i++) { channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, (message + i).getBytes("UTF-8")); } channel.close(); connection.close(); } }
消费者
public class Consumer1 { @Test public void testBasicConsumer1() throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setVirtualHost("/"); factory.setHost("127.0.0.1"); factory.setPort(AMQP.PROTOCOL.PORT); // 5672 factory.setUsername("mengday"); factory.setPassword("mengday"); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); String EXCHANGE_NAME = "exchange.direct"; String QUEUE_NAME = "queue_name"; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key"); // GetResponse response = channel.basicGet(QUEUE_NAME, false); // byte[] body = response.getBody(); // System.out.println(new String(body).toString()); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(message); } }; channel.basicConsume(QUEUE_NAME, true, consumer); Thread.sleep(100000); } }
运行结果:
运行生产者可以看到Ready=5, Unacked=0, Total=5, Total代表队列中的消息总条数,Ready代表消费者还可以读到的条数,Unacked:代表还有多少条没有被应答
在消费者端的获取消息的第一行打个断点,可以看到,第一次进入到handleDelivery()方法时,队列瞬间被清空。Ready=0, Unacked=0, Total=0
当消费者连接上队列了,因为没有指定消费者一次获取消息的条数,所以队列把队列中的所有消息一下子推送到消费者端,当消费者订阅的该队列,消息就会从队列推到客户端,当消息从队列被推出的时的那一刻就表示已经对消息进行自动确认了,消息就会从队列中删除。
- 手动应答
手动应答和自动应答不一样,需要将autoAck设置为false,当消费者收到消息在合适的时候来显示的进行确认,说我已经接收到了该消息了,RabbitMQ可以从队列中删除该消息了,可以通过显示调用channel.basicAck(envelope.getDeliveryTag(), false);来告诉消息服务器来删除消息
boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer);
消费者
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(message); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, false, consumer); Thread.sleep(100000);
当代码执行完channel.basicConsume(QUEUE_NAME, false, consumer);还没有进入到handleDelivery()方法时可以看到Ready=0, Unacked=5, Total=5
当代码进入handleDelivery()方法没执行一次channel.basicAck(envelope.getDeliveryTag(), false);Unacked和Total就会减去1,直到两个值都为0
特殊情况:手动应答如果忘记写channel.basicAck(envelope.getDeliveryTag(), false)这行代码,现象是消费者仍然能获取所有消息,不过此时Unacked和Total一直都是5,Ready=0, Unacked=5, Total=5,直到消费者运行结束,Ready=5, Unacked=0, Total=5
特殊情况2:如果设置消费者每次从队列中获取指定的条数channel.basicQos(1);,此时如果没有应答的话,消费者将不再继续获取
// 因设置了一次获取一条,所以可读的为4,未应答的是1
// 继续运行,因为一次只获取一条,而这一条还没有应答,就没有办法继续获取下一条
// 消费者运行结束的时候又回到原来的状态Ready=5, Unacked=0, Total=5
注意:如果都没有手动应答,在没有指定获取消息的条数时,消费者可以获取所有消息,当指定时,只能获取指定条,下次就只能等待了,没法继续获取下一条了
- 手动拒绝
手动应答是除了确认应答,也可以拒绝应答。
requeue=true,表示将消息重新放入到队列中,false:表示直接从队列中删除,此时和basicAck(long deliveryTag, false)的效果一样 void basicReject(long deliveryTag, boolean requeue);
消费者代码示例一:
public class Consumer1 { @Test public void testBasicConsumer1() throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setVirtualHost("/"); factory.setHost("127.0.0.1"); factory.setPort(AMQP.PROTOCOL.PORT); // 5672 factory.setUsername("mengday"); factory.setPassword("mengday"); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); String EXCHANGE_NAME = "exchange.direct"; String QUEUE_NAME = "queue_name"; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(message); if (message.contains(":3")){ // requeue:重新入队列,false:直接丢弃,相当于告诉队列可以直接删除掉 channel.basicReject(envelope.getDeliveryTag(), false); } else { channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(QUEUE_NAME, false, consumer); Thread.sleep(100000); } }
消费者代码示例二:
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(message); if (message.contains(":3")){ // requeue:重新入队列,true: 重新放入队列 channel.basicReject(envelope.getDeliveryTag(), true); } else { channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(QUEUE_NAME, false, consumer); Thread.sleep(100000);
结果解释:代码中没有指定设定消费者一次从队列中获取消息的条数,所以消费者一下子拿到了5条消息,消费了0、1、2当消费第i=3时执行channel.basicReject(envelope.getDeliveryTag(), true);会将消息放入到队列中,然后将消息推送给消费者, 然后消费4,接着再消费3,还会再次放入到队列,整个过程死循环,Ready=0, Unacked=1, Total=1, 当消费者运行结束了,Ready=1, Unacked=0, Total=1, 这个1就是消息3
- 重新投递
basicRecover(): 重新投递并没有所谓的像basicReject中的basicReject的deliveryTag参数,所以重新投递好像是将消费者还没有处理的所有的消息都重新放入到队列中,而不是将某一条消息放入到队列中,与basicReject不同的是,重新投递可以指定投递的消息是否允许当前消费者消费。
// If true, messages will be requeued and possibly delivered to a different consumer. If false, messages will be redelivered to the same consumer. Basic.RecoverOk basicRecover(boolean requeue);
示例代码一:
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(message); if (message.contains(":3")){ channel.basicRecover(true); } else { channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(QUEUE_NAME, false, consumer); Thread.sleep(100000);
这里不太明白,true的话表示会被其他消费者消费,不知道3、4又被接收了一次???
false:表示重新递送的消息还会被当前消费者消费
二:生产者确认
当生产者发布消息到RabbitMQ中,生产者需要知道是否真的已经发送到RabbitMQ中,需要RabbitMQ告诉生产者。
-
Confirm机制
channel.confirmSelect();
channel.waitForConfirms(); -
事务机制
channel.txSelect();
channel.txRollback();
注意:事务机制是非常非常非常消耗性能的,最好使用Confirm机制,Confirm机制相比事务机制性能上要好很多。
channel.confirmSelect();
String message = "Hello RabbitMQ:"; for (int i = 0; i < 5; i++) { channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, (message + i).getBytes("UTF-8")); } boolean isAllPublished = channel.waitForConfirms();
********
String message = "Hello RabbitMQ:"; try { channel.txSelect(); for (int i = 0; i < 5; i++) { channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, (message + i).getBytes("UTF-8")); } channel.txCommit(); } catch (Exception e) { channel.txRollback(); }