理解Confirm消息確認機制:
消息的確認,是指生產者投遞消息后,如果Broker收到消息,則會給我們生產這一個應答。
生產者進行接收應答,用來確定這條消息是否正常的發送到Broker,這種方式也是消息的可靠性投遞的核心保障。
如何實現Confirm確認消息?
第一步:在channel上開啟確認模式:channel.confirmSelect()
第二步:在channel上添加監聽:addConfirmListener,監聽成功和失敗的返回結果,
根據具體的結果對消息進行重新發送、或者記錄日志等后續處理。
1 //生產端代碼 2 //1 創建ConnectionFactory 3 ConnectionFactory connectionFactory = new ConnectionFactory(); 4 connectionFactory.setHost("127.0.0.1"); 5 connectionFactory.setPort(5672); 6 connectionFactory.setVirtualHost("/"); 7 8 //2 獲取C onnection 9 Connection connection = connectionFactory.newConnection(); 10 11 //3 通過Connection創建一個新的Channel 12 Channel channel = connection.createChannel(); 13 14 15 //4 指定我們的消息投遞模式: 消息的確認模式 16 channel.confirmSelect(); 17 18 String exchangeName = "test_confirm_exchange"; 19 String routingKey = "confirm.save"; 20 21 //5 發送一條消息 22 String msg = "Hello RabbitMQ Send confirm message!"; 23 channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); 24 25 //6 添加一個確認監聽 26 channel.addConfirmListener(new ConfirmListener() { 27 @Override 28 public void handleNack(long deliveryTag, boolean multiple) throws IOException { 29 System.err.println("-------no ack!-----------"); 30 } 31 32 @Override 33 public void handleAck(long deliveryTag, boolean multiple) throws IOException { 34 System.err.println("-------ack!-----------"); 35 } 36 });
1 //消費端代碼 2 //1 創建ConnectionFactory 3 ConnectionFactory connectionFactory = new ConnectionFactory(); 4 connectionFactory.setHost("127.0.0.1"); 5 connectionFactory.setPort(5672); 6 connectionFactory.setVirtualHost("/"); 7 8 //2 獲取C onnection 9 Connection connection = connectionFactory.newConnection(); 10 11 //3 通過Connection創建一個新的Channel 12 Channel channel = connection.createChannel(); 13 14 String exchangeName = "test_confirm_exchange"; 15 String routingKey = "confirm.#"; 16 String queueName = "test_confirm_queue"; 17 18 //4 聲明交換機和隊列 然后進行綁定設置, 最后制定路由Key 19 channel.exchangeDeclare(exchangeName, "topic", true); 20 channel.queueDeclare(queueName, true, false, false, null); 21 channel.queueBind(queueName, exchangeName, routingKey); 22 23 //5 創建消費者 24 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); 25 channel.basicConsume(queueName, true, queueingConsumer); 26 27 while(true){ 28 Delivery delivery = queueingConsumer.nextDelivery(); 29 String msg = new String(delivery.getBody()); 30 31 System.err.println("消費端: " + msg); 32 }
Return消息機制
Return Listener用於處理一些不可路由的消息。
我們的消息生產者,通過指定一個Exchange和Routingkey,把消息送到某一個隊列中,
然后我們的消費者監聽隊列,進行消息處理操作。
但是在某些情況下,如果我們在發送消息的時候,當前的exchange不存在或者指定的路由key路由不到,
這個時候我們需要監聽這種不可達的消息,就要使用return listener。
在基礎API中有一個關鍵的配置項:
Mandatory:如果為true,則監聽會接收到路由不可達的消息,然后進行后續處理,
如果為false,那么broker端自動刪除該消息。(默認false)
1 //生產端代碼 2 ConnectionFactory connectionFactory = new ConnectionFactory(); 3 connectionFactory.setHost("127.0.0.1"); 4 connectionFactory.setPort(5672); 5 connectionFactory.setVirtualHost("/"); 6 7 Connection connection = connectionFactory.newConnection(); 8 Channel channel = connection.createChannel(); 9 10 String exchange = "test_return_exchange"; 11 String routingKey = "return.save"; 12 String routingKeyError = "abc.save"; 13 14 String msg = "Hello RabbitMQ Return Message"; 15 16 17 channel.addReturnListener(new ReturnListener() { 18 @Override 19 public void handleReturn(int replyCode, String replyText, String exchange, 20 String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { 21 22 System.err.println("---------handle return----------"); 23 System.err.println("replyCode: " + replyCode); 24 System.err.println("replyText: " + replyText); 25 System.err.println("exchange: " + exchange); 26 System.err.println("routingKey: " + routingKey); 27 System.err.println("properties: " + properties); 28 System.err.println("body: " + new String(body)); 29 } 30 }); 31 32 33 channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes()); 34
//消費端代碼 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_return_exchange"; String routingKey = "return.#"; String queueName = "test_return_queue"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, queueingConsumer); while(true){ Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消費者: " + msg); }
運行結果: