消費端的手工ACK和NACK
消費端進行消費的時候,如果由於業務異常我們可以進行日志的記錄,然后進行補償。
如果由於服務器宕機等嚴重問題,那么我們就需要手工進行ACK保障消費端成功。
消費端重回隊列
為了對沒有處理成功的消息,把消息重新回遞給Broker。
一般我們在實際應用中,都會關閉重回隊列,也就是設置為false。
//生產端代碼
ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "test_ack_exchange"; String routingKey = "ack.save"; for(int i =0; i<5; i ++){ Map<String, Object> headers = new HashMap<String, Object>(); headers.put("num", i); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .headers(headers) .build(); String msg = "Hello RabbitMQ ACK Message " + i; channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes()); }
//消費端代碼
ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_ack_exchange"; String queueName = "test_ack_queue"; String routingKey = "ack.#"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); // 手工簽收 必須要關閉 autoAck = false
channel.basicConsume(queueName, false, new MyConsumer(channel));
//自定義消費者
private Channel channel ; public MyConsumer(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("body: " + new String(body)); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } if((Integer)properties.getHeaders().get("num") == 0) { channel.basicNack(envelope.getDeliveryTag(), false, true); } else { channel.basicAck(envelope.getDeliveryTag(), false); } }
TTL隊列/消息
TTL是time to live的縮寫,也就是生存時間
RabbitMQ支持消息的過期時間,在消息發送時可以進行指定
RabbitMQ支持隊列的過期時間,從消息入隊列開始計算,只要超過了隊列的超過時間配置,那么消息會自動的清除。
消息10s過期,TTL是隊列過期時間。
DLX死信隊列
DLX,Dead-Letter-Exchange
利用DLX,當消息在一個隊列中變成死信之后,它能夠被重新publish到另一個exchange,這個exchange就是DLX。
消息變成死信情況:
消息被拒絕(basic.reject/basic.nack)並且request=false
消息TTL過期
隊列達到最大的長度
DLX也是一個正常的exchange,和一般的exchange沒有區別,他能在任何的隊列上被指定,實際上就是設置某個隊列的屬性。
當這個隊列中有死信時,RabbitMQ就會自動的將這個消息重新發布到設置的exchange上去,進而被路由到另一個隊列。
可以監聽這個隊列中消息做相應的處理,這個特性可以彌補RabbitMQ3.0以前支持的immediate參數的功能。
死信隊列設置:
首先要設置死信隊列的exchange和queue,然后進行綁定:
Exchange:dlx.exchange
Queue:dlx.queue
RoutingKey:#
然后我們進行正常聲明交換機,隊列,綁定,只不過我們需要在隊列加上一個參數:arguments.put("x-dead-letter-exchange","dlx.exchange");
這樣消息在過期、request、隊列子啊達到最大長度時, 消息就可以直接路由到死信隊列。
//生產者端代碼
ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "test_dlx_exchange"; String routingKey = "dlx.save"; String msg = "Hello RabbitMQ DLX Message"; for(int i =0; i<1; i ++){ AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .expiration("10000") .build(); channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes()); }
//消費者端代碼
ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 這就是一個普通的交換機 和 隊列 以及路由
String exchangeName = "test_dlx_exchange"; String routingKey = "dlx.#"; String queueName = "test_dlx_queue"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); Map<String, Object> agruments = new HashMap<String, Object>(); agruments.put("x-dead-letter-exchange", "dlx.exchange"); //這個agruments屬性,要設置到聲明隊列上
channel.queueDeclare(queueName, true, false, false, agruments); channel.queueBind(queueName, exchangeName, routingKey); //要進行死信隊列的聲明:
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null); channel.queueDeclare("dlx.queue", true, false, false, null); channel.queueBind("dlx.queue", "dlx.exchange", "#"); channel.basicConsume(queueName, true, new MyConsumer(channel));
//自定義消費者
public MyConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); System.err.println("envelope: " + envelope); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); }