1.Confirm消息確認機制
消息的確認:是指生產者投遞消息后,如果Broker收到消息,則會給生產者一個應答。
生產者進行接收應答,用來確定這條消息是否正常的發送到Broker,這種方式也是消息的可靠性投遞的核心保障。

生產端
public static void main(String[] args) throws IOException, TimeoutException {
//創建一個連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.10.132");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//創建連接
Connection connection = connectionFactory.newConnection();
//通過連接創建一個Channel
Channel channel = connection.createChannel();
//指定消息的投遞模式:消息的確認模式
channel.confirmSelect();
//通過Channel發送數據
channel.basicPublish("","hello",null,"hello world".getBytes());
//添加一個確認監聽
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("----handleAck---");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("----handleNack---");
}
});
}
消費端:
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//創建一個連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.10.132");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//創建連接
Connection connection = connectionFactory.newConnection();
//通過連接創建一個Channel
Channel channel = connection.createChannel();
//創建一個隊列
String queueName = "hello";
channel.queueDeclare(queueName,true,false,false,null);
//創建一個消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
//設置Channel
channel.basicConsume(queueName,true,consumer);
//獲取消息
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消費端:"+msg);
}
}
運行結果:
消費端:

生產端:

2.Return返回消息機制
某些情況下,如果我們在發送消息的時候,當前的exchange不存在或者指定的路由key路由不到,這時候如果我們需要監聽這種不可達的消息,就需要使用Return Listener
在API中有個一重要配置項:
Mandatory:如果為true,則監聽器會接收到路由不可達的消息,然后進行后續處理,如果為false,則broker端自動刪除該消息。
Return消息機制流程:

消費端跟上文一樣,
生產端:
public static void main(String[] args) throws IOException, TimeoutException {
//創建一個連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.10.132");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//創建連接
Connection connection = connectionFactory.newConnection();
//通過連接創建一個Channel
Channel channel = connection.createChannel();
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
System.out.println(properties);
System.out.println(Arrays.toString(body));
}
});
//通過Channel發送數據
// 在這里要設置Mandatory(第三個參數)為true,否則broker會自動刪除消息
channel.basicPublish("","return",true,null,"hello world".getBytes());
}
打印結果:

