在上一篇文章講解MQ消息可靠性投遞和冪等性中有提到confirm機制的重要性,現在更詳細的說明一下
一、Confirm機制
Confirm就是消息確認,當Producer發送消息,如果Broker收到消息,會回復一個應答,我們可以以此來確認消息是否成功送達,是保證
消息可靠性投遞的核心保障
Producer代碼如下,只需要修改Producer端,而Consumer端不需要修改
//4 指定我們的消息投遞模式: 消息的確認模式 channel.confirmSelect(); //5 發送一條消息 String msg = "Hello RabbitMQ Send confirm message!"; channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); //6 添加一個確認監聽 channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.err.println("-------no ack!-----------"); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.err.println("-------ack!-----------"); } });
結果:
-------ack!-----------
只要Producer能把消息發送給Broker,就會返回handlerAck中,返回到NAck的可能很小,例如MQ出現異常,queue的容量達到上限
二、Return消息機制
Return Listener用於處理一些不可路由的消息
Producer:

public class Producer { public static void main(String[] args) throws Exception { //1 創建ConnectionFactory ConnectionFactory factory = new ConnectionFactory(); factory.setVirtualHost("/"); factory.setHost("139.196.75.238"); factory.setPort(5672); //2 獲取Connection Connection connection = factory.newConnection(); //3 通過Connection創建一個新的Channel Channel channel = connection.createChannel(); String exchangeName = "exchange_topic"; String routingKey = "fdasfdsafsadf4543453"; //6 添加一個return監聽 channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("---------handle return----------"); System.err.println("replyCode: " + replyCode); System.err.println("replyText: " + replyText); System.err.println("exchange: " + exchange); System.err.println("routingKey: " + routingKey); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }); //5 發送一條消息 String msg = "Hello RabbitMQ Send confirm message!"; channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes()); } }
結果:
---------handle return---------- replyCode: 312 replyText: NO_ROUTE exchange: exchange_topic routingKey: fdasfdsafsadf4543453 properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null) body: Hello RabbitMQ Send confirm message!
注意:
channel.basicPublish參數里面一定要把Mandatory設置為true,才能收到監聽不可達的消息(創建exchange、routingKey不匹配等問題
,導致不可達),然后進行后續處理,如果為false,broker自動刪除該消息,上面例子就是routingKey設置不匹配,Consumer的代碼就不給了
三、消息端限流
限流一般無法從生產端,只能在消費端處理
在Consumer端設置:
channel.basicQos(0, 1, false); channel.basicConsume(queueName, false, new MyConsumer(channel));
qos:
服務質量保證,在非自動確認情況下,一定數目的消息沒有確認,不進行消費新的消息,通過producer/consumer設置qos的值
channel.basicQos(prefetchSize, prefetch_count, global);
注意:
prefetchSize和global,rabbitMQ沒有實現,默認0表示對單條message的大小沒有限制、false(非channel級別,consumer級別)
channel.basicConsume中自動簽收一定要設置成false
prefetch_count表示一次給幾條進行消費,直到返回ack,才能繼續給prefetch_count條message
在MyConsumer中手動簽收
public class MyConsumer extends DefaultConsumer { private Channel channel; public MyConsumer(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("body: " + new String(body)); channel.basicAck(envelope.getDeliveryTag(), false); } }
四、TTL
五、死信隊列
未完待續。。。