未路由的消息
當生產這發送的消息到達指定的交換器后,如果交換器無法根據自身類型、綁定的隊列以及消息的路由鍵找到匹配的隊列,默認情況下消息將被丟棄。可以通過兩種方式
處理這種情況,一是在發送是設置mandatory參數,二是通過備份交換器。
設置mandatory參數
在發送消息是,可以設置mandatory參數未true,這樣當消息在交換器上無法被路由時,服務器將消息返回給生產者,生產者實現回調函數處理被服務端返回的消息。
public class NoRouteMessage {
private static String QUEUE = "unreachable_queue";
private static String EXCHANGE = "unreachable_exchange";
private static String BINDING_KEY = "fake_key";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory cf = new ConnectionFactory();
Connection connection = cf.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT,false);
channel.queueDeclare(QUEUE,false,false,false,null);
channel.queueBind(QUEUE,EXCHANGE,BINDING_KEY);
String message = "an unreachable message";
boolean mandatory = true;
channel.basicPublish(EXCHANGE,"mykey",mandatory,null,message.getBytes());
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("replyCode: " + replyCode);
System.out.println("replyText: " + replyText);
System.out.println("exchange: " + exchange);
System.out.println("routingKey: " + routingKey);
System.out.println("message: " + new String(body));
}
});
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.close();
connection.close();
}
}
如上代碼所示,創建了隊列並和direct類型的交換器使用"fake_key"綁定,發送消息時,設定消息路由鍵為"mykey",這樣消息到達交換器時將無比被路由。由於發送消息時
設置basicPublish的參數為true,並為生產這添加處理返回消息的回調方法,這樣,消息將被服務端返回並在回調中得到處理。
備份交換器
與設置mandatory將無法路由的消息返回給生產者不同,可以為交換器設置一般備份交換器(Alternate Exchange),這樣,消息在交換器上無法路由時,將被直接發送到
備份交換器,由備份交換器再次路由。
在下面到示例中,創建了交換器source_exchange,生產者將消息發送到該交換器。source_exchange並未綁定任何隊列,這將導致消息被丟棄。為了處理這種情況,創建
了交換器ae並綁定了一個隊列,然后將ae作為source_exchange對備份交換器,這是通過創建source_exchange交換器時設定alternate-exchange參數完成的。之后,發送到
source_exchange到消息將被服務端發送到ae交換器中,然后路由到ae_queue等待處理。
public class AlternateExchange {
private static String QUEUE = "ae_queue";
private static String EXCHANGE = "source_exchange";
private static String AE = "ae";
private static String BINDING_KEY = "fake_key";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory cf = new ConnectionFactory();
Connection connection = cf.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(AE, "fanout");
Map<String,Object> exArgs = new HashMap<String, Object>();
exArgs.put("alternate-exchange",AE);
channel.exchangeDeclare(EXCHANGE,"direct",false,false,exArgs);
channel.queueDeclare(QUEUE,false,false,false,null);
channel.queueBind(QUEUE,AE,"");
channel.basicPublish(EXCHANGE,"anyKey",null,"message".getBytes());
}
}
TTL
在RabbitMQ中,可以為消息和隊列設置過期時間。消息過期未被消費后,默認被丟棄;隊列過期也會被刪除。
消息的TTL
可以通過兩種方式來為消息設置TTL,一是在發送消息是設置單條消息的TTL;二是在隊列上通過隊列屬性設置TTL,這種情況下,路由到該隊列到消息都擁有同樣都TTL。
當然,也可以同時使用兩種方式,這時,消息的TTL取兩者中較小的。
-
設置單條消息都TTL
使用basic.Publish發送消息時,通過expiration參數設置消息的TTL。AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("10000"); //ttl 10s AMQP.BasicProperties properties = builder.build(); channel.basicPublish(EXCHANGE,"",properties,"10s TTL message".getBytes());
-
通過隊列屬性設置TTL
創建隊列時,可以通過隊列的x-message-ttl參數來設置隊列中消息的TTL。Map<String,Object> params = new HashMap<String, Object>(); params.put("x-message-ttl",5000); channel.queueDeclare(QUEUE,false,false,false,params);
上述代碼將隊列的消息ttl設置為5s。
對於第二種在隊列上設置消息TTL到方式,消息一旦過期,會立刻被從隊列中刪除;而通過第一種發送消息時設置TTL的方式,消息過期后不一定會立即刪除。這是由內部實現決定的,
對於第二種方式,隊列中消息的TTL都相同,則消息過期順序和入隊順序一致,那么只需要從隊頭定期刪除消息即可;而第一種方式下,每條消息過期時間都不同,要實現"實時"刪除
過期消息,得不斷掃描整個隊列,代價太大,所以等到消息即將被推送給消費者時在判斷是否過期,如果過期就刪除,是一種惰性處理策略。
示例
在以下示例中,創建來一個隊列,並設置其中的消息TTL為20s,然后發送兩條被路由到該隊列的消息。第一條消息發送時設置了TTL為10s,這樣,它到達隊列后的TTL將為10s;
第二條消息發送時未設置TTL,它到達隊列后的TTL為20s。
channel.exchangeDeclare(EXCHANGE,"fanout");
Map<String,Object> params = new HashMap<String, Object>();
params.put("x-message-ttl",20000);
channel.queueDeclare(QUEUE,false,false,false,params);
channel.queueBind(QUEUE,EXCHANGE,"");
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("10000"); //ttl 10s
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(EXCHANGE,"",properties,"10s TTL message".getBytes());
channel.basicPublish(EXCHANGE,"",null,"20s TTL message".getBytes());
可以在RabbitMQ的Web管理頁面或使用rabbitmqctl工具在命令行中看到,隊列中到消息剛開時積攢了兩條,10秒鍾后第一條消息到達TTL未被消費,被從隊列中丟棄,隊列中
只剩第二條消息,在過10s,第二條消息也不丟棄。
隊列的TTL
與消息TTL類型,可以為隊列設置TTL。為隊列中設置了TTL后,如果TTL時間內隊列上沒有消費者,或者隊列沒有被重新聲明,那么隊列將被服務端自動刪除。
使用basic.QueueDeclare(channel.queueDeclare)聲明隊列時,通過x-expires參數可以設置隊列的TTL。
聲明一個ttl為10s的隊列:
Map<String,Object> qArgs = new HashMap<String, Object>();
qArgs.put("x-expires",10000);
channel.queueDeclare(TEMP_QUEUE,false,false,false,qArgs);
死信
如果消息在隊列中到達TTL,將被丟棄。這時候,消息變成死信(dead letter).過期是導致死信的原因之一,在RabbitMQ中,以下情況都會產生死信:
- 消息過期
- 消息被消費着拒絕(reject/nack),並且設置requeue參數為false
- 隊列到達最大長度
消息在隊列中變成死信默認將被丟棄,為了處理死信,可以使用死信交換器(DLX)。
死信交換器可以認為是隊列的備胎,當隊列中產生死信時,死信被發送到死信交換器,由死信交換器重新路由到與之綁定的隊列,這些隊列被成為死信隊列。
聲明隊列時,可以通過x-dead-letter-exchange參數設置該隊列的死信交換器,也可以通過policy方式設定隊列的死信交換器。
Map<String,Object> params = new HashMap<String, Object>();
params.put("x-dead-letter-exchange","dlx-exchange");
channel.queueDeclare("myqueue",false,false,false,params);
這樣,當myquue隊列中產生死信時,死信將被發送到dlx-exchange交換器,與它重新路由。
消息到路由鍵是后生產者發送是設置到,在死信被發送到死信交換器時,我們有機會修改消息到路由鍵。在聲明隊列是,指定x-dead-letter-routing-key參數即可。
params.put("x-dead-letter-routing-key","deadKey");
這樣,當死信被發送到死信交換器時,它到路由鍵變為deadKey,后續在死信交換器中將根據該路由鍵進行路由。通過這種在隊列上為死信統一更新路由鍵到方式,使得在某些
情況下可以統一將死信路由到指定隊列,方便對死信統一處理。