目錄:
- 細說交換器
- 細說隊列
- 發送消息
- 消費消息
- 確認與拒絕
細說交換器:
1、方法:
public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException
autoDelete:自動刪除;必須有解綁的動作,且需要全部解綁后交換器才會刪除。
internal:內置路由器;客戶端無法直接發送消息到交換器,只能通過交換器路由到內置路由器。
2、其他方法:
)不等待服務通知創建交換器命令:
public void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException
)檢測交換器
public AMQP.Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException
)刪除交換器
public AMQP.Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException
public void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException
public AMQP.Exchange.DeleteOk exchangeDelete(String exchange) throws IOException
ifUnused:為true,交換器沒有被使用時刪除這個交換器;為false,不管三七二十一直接刪除這個交換器。
細說隊列:
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
隊列主要介紹參數:boolean exclusive、boolean autoDelete、Map<String, Object> arguments。
1、exclusive:排它性,true-排它,false-非排它。
設置為排它隊列后,該隊列只對首次聲明它的連接可見,並在連接斷開時自動刪除;適用於一個客戶端同時讀寫消息的場景。
1 public class Product { 2 3 private static final String EXCHANGE_NAME = "exclusive.exchange"; 4 private static final String QUEUE_NAME = "exclusive.queue"; 5 private static final String ROUTING_KEY_NAME = "exclusive.routing.key"; 6 7 public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { 8 Connection connection = RabbitMqUtils.getConnection(); 9 Channel channel = connection.createChannel(); 10 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); 11 channel.queueDeclare(QUEUE_NAME, true, true, false, null); 12 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_NAME); 13 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, 14 "排它隊列".getBytes()); 15 16 Connection connection2 = RabbitMqUtils.getConnection(); 17 Channel channel2 = connection2.createChannel(); 18 channel2.basicConsume(QUEUE_NAME, new DefaultConsumer(channel2) { 19 @Override 20 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 21 System.err.println("收到消息:" + new String(body)); 22 } 23 }); 24 25 // 線程休眠5秒,待消息回調完成 26 TimeUnit.SECONDS.sleep(5); 27 28 RabbitMqUtils.close(connection, channel); 29 } 30 }
代碼中18行用的是channel2,也就是connection2創建的信道,此時不滿足排它隊列的只允許首次創建它的連接使用,所以會拋出異常;將16行注釋掉,17行的connection2替換成connection便能正常運行。
2、autoDelete:自動刪除,true-自動刪除,false-非自動刪除。
設置為自動刪除的隊列后,該隊列在其消費者都斷開連接后,自動刪除,不管隊列里是否存在數據。
3、arguments:隊列的其它參數。
前面幾個參數很簡單,我們這里只講重點的x-dead-letter-exchange、x-dead-letter-routing-key。
1 public class DeadProduct { 2 3 private static final String EXCHANGE_NAME = "normal.exchange"; 4 private static final String DEAD_EXCHANGE_NAME = "dead.exchange"; 5 private static final String QUEUE_NAME = "normal.queue"; 6 private static final String DEAD_QUEUE_NAME = "dead.queue"; 7 private static final String ROUTING_KEY_NAME = "normal.routing.key"; 8 private static final String DEAD_ROUTING_KEY_NAME = "dead.routing.key"; 9 10 public static void main(String[] args) throws IOException, TimeoutException { 11 Connection connection = RabbitMqUtils.getConnection(); 12 Channel channel = connection.createChannel(); 13 14 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); 15 channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); 16 17 channel.queueDeclare(QUEUE_NAME, false, false, false, getArguments()); 18 channel.queueDeclare(DEAD_QUEUE_NAME, false, false, false, null); 19 20 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_NAME); 21 channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, ROUTING_KEY_NAME); 22 for (int i = 0; i < 10; i++) { 23 String message = "死信交換機" + i; 24 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, 25 message.getBytes()); 26 } 27 28 RabbitMqUtils.close(connection, channel); 29 } 30 31 private static Map<String, Object> getArguments() { 32 Map<String, Object> result = new HashMap<String, Object>(5); 33 result.put("x-message-ttl", 5000); 34 result.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME); 35 result.put("x-dead-letter-routing-key", ROUTING_KEY_NAME); 36 return result; 37 } 38 }
x-dead-letter-exchange參數其實只是定義了一個交換機的名稱,實際上你還需要定義一套mq的流程,如代碼exchangeDeclare、queueDeclare、queueBind。
x-dead-letter-routing-key參數的routing-key需要與之前的路由key一致,否則數據將會匹配不到,也就不會走到DEAD_EXCHANGE_NAME里去了。
發送消息:
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
發送消息主要參數:mandatory、immediate。
1、mandatory:當消息無法路由到隊列的處理方式。
2、immediate:當路由的隊列無消費者時如何處理。
消費消息:
消費消息分為兩種,一種是推,一種是主動拉。
1、推:basicConsumer(推,相當於監聽,會消費調當前隊列中所有的消息)
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
autoAck:是否自動確認,true-自動確認,false-非自動確認;當autoAck=true時,消費完時就會自動刪除這條消息。
noLocal:設置為ture時,不能將同一個connection生產的消息在此connection消費,也就是說一個connection不能同時為生產者和消費者。
2、拉:basicGet(拉,只會拉取一條消息)
GetResponse basicGet(String queue, boolean autoAck) throws IOException;
注意:不能用for循環的basicGet代替basicConsumer,因為這樣性能非常差。
確認與拒絕:
1、消息確認:
)概念解釋:
為了保證消息可靠的到達消費者,RabbitMQ提供了消息確認機制。
首先我們要知道除了手動的確認消息,還可以通過basicConsumer指定的autoAck參數來確認消息的到達。
當autoAck=false時,RabbitMQ會顯式的等待消息回復確認后,才會將消息從內存或磁盤中移除(先設置為移除標記,然后在真正的移除);當autoAck=true時,RabbitMQ不管消費者有沒有接收到消息,都會直接將消息移除。
注意:當RabbitMQ一直沒有收到消費者確認消息的通知,並且消費者的連接斷開時,那么此條消息會重新進入隊列。
void basicAck(long deliveryTag, boolean multiple) throws IOException;
multiple:是否確認多條消息;true-確認多條(確認該信道上deliveryTag標記之前所有未經確認的消息),false-確認單條。
)代碼示例:
1 public class AckProduct { 2 3 private static final String EXCHANGE_NAME = "ack.exchange"; 4 private static final String QUEUE_NAME = "ack.queue"; 5 private static final String ROUTING_KEY = "ack.routing-key"; 6 7 public static void main(String[] args) throws IOException, TimeoutException { 8 Connection connection = RabbitMqUtils.getConnection(); 9 Channel channel = connection.createChannel(); 10 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); 11 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 12 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); 13 14 for (int i = 0; i < 20; i++) { 15 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, ("ack" + i).getBytes()); 16 } 17 18 RabbitMqUtils.close(connection, channel); 19 } 20 }
1 public class AckConsumer { 2 3 private static final String QUEUE_NAME = "ack.queue"; 4 5 public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { 6 Connection connection = RabbitMqUtils.getConnection(); 7 final Channel channel = connection.createChannel(); 8 9 // 因為basicConsumer會消費隊列中所有的消息,這樣不方便演示basicAck中multiple參數的效果,故用basicGet來消費消息 10 GetResponse getResponse = channel.basicGet(QUEUE_NAME, false); 11 System.err.println(getResponse.getEnvelope().getDeliveryTag() + ": " + new String(getResponse.getBody())); 12 System.err.println("-------------------------"); 13 14 GetResponse getResponse2 = channel.basicGet(QUEUE_NAME, false); 15 System.err.println(getResponse2.getEnvelope().getDeliveryTag() + ": " + new String(getResponse2.getBody())); 16 System.err.println("-------------------------"); 17 18 TimeUnit.SECONDS.sleep(10); 19 20 GetResponse getResponse3 = channel.basicGet(QUEUE_NAME, false); 21 System.err.println(getResponse3.getEnvelope().getDeliveryTag() + ": " + new String(getResponse3.getBody())); 22 channel.basicAck(getResponse3.getEnvelope().getDeliveryTag(), false); 23 System.err.println("-------------------------"); 24 25 // 休眠10秒,方便看到multiple參數的效果 26 TimeUnit.SECONDS.sleep(10); 27 28 GetResponse getResponse4 = channel.basicGet(QUEUE_NAME, false); 29 System.err.println(getResponse4.getEnvelope().getDeliveryTag() + ": " + new String(getResponse4.getBody())); 30 channel.basicAck(getResponse4.getEnvelope().getDeliveryTag(), true); 31 32 RabbitMqUtils.close(connection, channel); 33 } 34 }
首先AckProduct生產20條消息,AckConsumer先消費兩條消息,再休眠10秒,此時可以看到ready為18,unacked為2,這兩消息未確認(10 - 18行);
然后我們確認一條消息后,發現ready為17,unacked為2(20 - 23行);
此時便是關鍵時刻,若multiple真是如其定義一般,那么在執行28 - 30行后,ready應該是16,而未確認消息unacked應該是0才對,10秒后發現果然如此。
2、消息拒絕:
)拒絕單條消息:
void basicReject(long deliveryTag, boolean requeue) throws IOException;
)拒絕多條消息:
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
)參數說明:
multiple:是否拒絕多條消息;true-拒絕多條(拒絕該信道上deliveryTag標記之前所有未經確認的消息),false-拒絕單條(與basicAck類似)。
requeue:是否將拒絕的消息重新放入隊列中;true-重新放入,false-丟棄。