自定義消費者使用
- 我們之前呢都是在代碼中編寫while循環,進行
consumer.nextDelivery
方法進行獲取下一條消息,然后進行消費處理! - 其實我們還可以使用自定義的Consumer,它更加的方便,解耦性更加的強,也是在實際工作中最常用的使用方式!
- 自定義消費端實現只需要繼承
DefaultConsumer
類,重寫handleDelivery
方法即可
自定義消費端演示
public class Producer {
public static void main(String[] args) throws Exception {
//1 創建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
//2 獲取Connection
Connection connection = connectionFactory.newConnection();
//3 通過Connection創建一個新的Channel
Channel channel = connection.createChannel();
String exchange = "test_consumer_exchange";
String routingKey = "consumer.save";
String msg = "Hello RabbitMQ Consumer Message";
//4 發送消息
for(int i =0; i<5; i ++){
channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
}
}
}
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//consumerTag: 內部生成的消費標簽 properties: 消息屬性 body: 消息內容
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
//envelope包含屬性:deliveryTag(標簽), redeliver, exchange, routingKey
//redeliver是一個標記,如果設為true,表示消息之前可能已經投遞過了,現在是重新投遞消息到監聽隊列的消費者
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}
public class Consumer {
public static void main(String[] args) throws Exception {
//1 創建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
//2 獲取Connection
Connection connection = connectionFactory.newConnection();
//3 通過Connection創建一個新的Channel
Channel channel = connection.createChannel();
String exchangeName = "test_consumer_exchange";
String routingKey = "consumer.#";
String queueName = "test_consumer_queue";
//4 聲明交換機和隊列,然后進行綁定設置路由Key
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//5 設置channel,使用自定義消費者
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}
運行說明
先啟動消費端,訪問管控台:http://ip:15672,檢查Exchange和Queue是否設置OK,然后啟動生產端。消費端打印內容如下
消費端的限流策略
限流的場景與機制
- 假設一個場景,我們Rabbitmq服務器有上萬條未處理的消息,我們隨便打開一個消費者客戶端,會出現這種情況:巨量的消息瞬間全部推送過來,但是我們單個客戶端無法同時處理這么多數據!此時很有可能導致服務器崩潰,嚴重的可能導致線上的故障。
- 除了這種場景,還有一些其他的場景,比如說單個生產者一分鍾生產出了幾百條數據,但是單個消費者一分鍾可能只能處理60條數據,這個時候生產端和消費端肯定是不平衡的。通常生產端是沒辦法做限制的。所以消費端肯定需要做一些限流措施,否則如果超出最大負載,可能導致消費端性能下降,服務器卡頓甚至崩潰等一系列嚴重后果。
消費端限流機制
RabbitMQ提供了一種qos
(服務質量保證)功能,即在非自動確認消息的前提下,如果一定數目的消息 (通過基於consume或者channel設置Qos的值) 未被確認前,不進行消費新的消息。
需要注意:
1.不能設置自動簽收功能(autoAck = false)
2.如果消息沒被確認,就不會到達消費端,目的就是給消費端減壓
限流相關API
限流設置 - BasicQos()
void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
prefetchSize:
單條消息的大小限制,消費端通常設置為0,表示不做限制
prefetchCount:
一次最多能處理多少條消息,通常設置為1
global:
是否將上面設置應用於channel,false代表consumer級別
注意事項
prefetchSize
和global
這兩項,rabbitmq沒有實現,暫且不研究
prefetchCount
在 autoAck=false
的情況下生效,即在自動應答的情況下這個值是不生效的
手工ACK - basicAck()
void basicAck(Integer deliveryTag,boolean multiple)
手工ACK,調用這個方法就會主動回送給Broker一個應答,表示這條消息我處理完了,你可以給我下一條了。參數multiple
表示是否批量簽收,由於我們是一次處理一條消息,所以設置為false
限流演示
生產端
生產端就是正常的邏輯
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_qos_exchange";
String routingKey = "qos.save";
String msg = "Hello RabbitMQ QOS Message";
// 發送消息
for (int i = 0; i < 5; i++) {
channel.basicPublish(exchange, routingKey, true, null,
msg.getBytes());
}
}
}
自定義消費者
為了看到限流效果,這里不進行ACK
public class MyConsumer extends DefaultConsumer {
//接收channel
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
//System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
//手工ACK,參數multiple表示不批量簽收
//channel.basicAck(envelope.getDeliveryTag(), false);
}
}
消費端
關閉autoACK,進行限流設置
public class Consumer {
public static void main(String[] args) throws Exception {
//1 創建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
//2 獲取Connection
Connection connection = connectionFactory.newConnection();
//3 通過Connection創建一個新的Channel
Channel channel = connection.createChannel();
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "qos.#";
//4 聲明交換機和隊列,然后進行綁定設置路由Key
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//進行參數設置:單條消息的大小限制,一次最多能處理多少條消息,是否將上面設置應用於channel
channel.basicQos(0, 1, false);
//限流: autoAck設置為 false
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
運行說明
我們先注釋掉手工ACK方法,然后啟動消費端和生產端,此時消費端只打印了一條消息
這是因為我們設置了手工簽收,並且設置了一次只處理一條消息,當我們沒有回送ack應答時,Broker端就認為消費端還沒有處理完這條消息,基於這種限流機制就不會給消費端發送新的消息了,所以消費端只打印了一條消息。
通過管控台也可以看到隊列總共收到了5條消息,有一條消息沒有ack。
將手工簽收代碼取消注釋,再次運行消費端,此時就會打印5條消息的內容。
消費端ACK與重回隊列機制
ACK與NACK
當我們設置 autoACK=false
時,就可以使用手工ACK方式了,那么其實手工方式包括了手工ACK與NACK。
當我們手工 ACK
時,會發送給Broker一個應答,代表消息成功處理了,Broker就可以回送響應給生產端了。NACK
則表示消息處理失敗了,如果設置重回隊列,Broker端就會將沒有成功處理的消息重新發送。
使用方式
- 消費端進行消費的時候,如果由於業務異常我們可以手工
NACK
並進行日志的記錄,然后進行補償!
方法:void basicNack(long deliveryTag, boolean multiple, boolean requeue)
- 如果由於服務器宕機等嚴重問題,那我們就需要手工進行
ACK
保障消費端消費成功!
方法:void basicAck(long deliveryTag, boolean multiple)
重回隊列演示
- 消費端重回隊列是為了對沒有處理成功的消息,把消息重新會遞給Broker!
- 重回隊列,會把消費失敗的消息重新添加到隊列的尾端,供消費者繼續消費。
- 一般我們在實際應用中,都會關閉重回隊列,也就是設置為false
生產端
對消息設置自定義屬性以便進行區分
public class Producer {
public static void main(String[] args) throws Exception {
//1 創建ConnectionFactorys
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
//2 獲取Connection
Connection connection = connectionFactory.newConnection();
//3 通過Connection創建一個新的Channel
Channel channel = connection.createChannel();
String exchange = "test_ack_exchange";
String routingKey = "ack.save";
for(int i =0; i<5; i ++){
//設置消息屬性
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("num", i);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers)
.build();
//發送消息
String msg = "Hello RabbitMQ ACK Message " + i;
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
}
自定義消費
對第一條消息進行NACK,並設置重回隊列
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("body: " + new String(body));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if((Integer)properties.getHeaders().get("num") == 0) {
//NACK,參數三requeue:是否重回隊列
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
消費端
關閉自動簽收功能
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String queueName = "test_ack_queue";
String routingKey = "ack.#";
//聲明交換機和隊列,然后進行綁定設置路由Key
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//手工簽收 必須要設置 autoAck = false
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
運行說明
先啟動消費端,然后啟動生產端,消費端打印如下,顯然第一條消息由於我們調用了NACK,並且設置了重回隊列,所以會導致該條消息一直重復發送,消費端就會一直循環消費。
一般工作中不會設置重回隊列這個屬性,都是自己去做補償或者投遞到延遲隊列里的,然后指定時間去處理即可。
TTL
TTL說明
- TTL是
Time To Live
的縮寫,也就是生存時間 - RabbitMQ支持消息的過期時間,在消息發送時可以進行指定
- RabbitMQ支持為每個隊列設置消息的超時時間,從消息入隊列開始計算,只要超過了隊列的超時時間配置,那么消息會自動的清除
TTL演示
這次演示我們不寫代碼,只通過管控台進行操作,實際測試也會更為方便一些。
1. 創建Exchange
選擇Exchange菜單,找到下面的Add a new exchange
2.創建Queue
選擇Queue菜單,找到下面的Add a new queue
3.建立隊列和交換機的綁定關系
點擊Exchange表格中的test002_exchange
,在下面添加綁定規則
4.發送消息
點擊Exchange表格中的test002_exchange
,在下面找到Publish message
,設置消息進行發送
5.驗證
點擊Queue菜單,查看表格中test002已經有了一條消息,10秒后表格顯示0條,說明過期時間到了消息被自動清除了。
6.設置單條消息過期時間
點擊Exchange表格中的test002_exchange
,在下面找到Publish message
,設置消息的過期時間並進行發送,此時觀察test002隊列,發現消息5s后就過期被清除了,即使隊列設置的過期時間是10s。
TTL代碼設置過期時間
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("10000") //10s過期
.build();
//發送消息
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
隊列過期時間設置
//設置隊列的過期時間10s
Map<String,Object> param = new HashMap<>();
param.put("x-message-ttl", 10000);
//聲明隊列
channel.queueDeclare(queueName, true, false, false, null);
注意事項
- 兩者的區別是設置隊列的過期時間是對該隊列的所有消息生效的。
- 為消息設置TTL有一個問題:RabbitMQ只對處於隊頭的消息判斷是否過期(即不會掃描隊列),所以,很可能隊列中已存在死消息,但是隊列並不知情。這會影響隊列統計數據的正確性,妨礙隊列及時釋放資源。
死信隊列
死信隊列介紹
- 死信隊列:DLX,
dead-letter-exchange
- 利用DLX,當消息在一個隊列中變成死信
(dead message)
之后,它能被重新publish到另一個Exchange,這個Exchange就是DLX
消息變成死信有以下幾種情況
- 消息被拒絕(basic.reject / basic.nack),並且requeue = false
- 消息TTL過期
- 隊列達到最大長度
死信處理過程
- DLX也是一個正常的Exchange,和一般的Exchange沒有區別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性。
- 當這個隊列中有死信時,RabbitMQ就會自動的將這個消息重新發布到設置的Exchange上去,進而被路由到另一個隊列。
- 可以監聽這個隊列中的消息做相應的處理。
死信隊列設置
- 首先需要設置死信隊列的exchange和queue,然后進行綁定:
- 然后需要有一個監聽,去監聽這個隊列進行處理
- 然后我們進行正常聲明交換機、隊列、綁定,只不過我們需要在隊列加上一個參數即可:
arguments.put(" x-dead-letter-exchange","dlx.exchange");
,這樣消息在過期、requeue、 隊列在達到最大長度時,消息就可以直接路由到死信隊列!
死信隊列演示
生產端
public class Producer {
public static void main(String[] args) throws Exception {
//1 創建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
//2 獲取Connection
Connection connection = connectionFactory.newConnection();
//3 通過Connection創建一個新的Channel
Channel channel = connection.createChannel();
String exchange = "test_dlx_exchange";
String routingKey = "dlx.save";
String msg = "Hello RabbitMQ DLX Message";
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.build();
//發送消息
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
自定義消費者
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}
消費端
- 聲明正常處理消息的交換機、隊列及綁定規則
- 在正常交換機上指定死信發送的Exchange
- 聲明死信交換機、隊列及綁定規則
- 監聽死信隊列,進行后續處理,這里省略
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 聲明一個普通的交換機 和 隊列 以及路由
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.#";
String queueName = "test_dlx_queue";
String deadQueueName = "dlx.queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
// 指定死信發送的Exchange
Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", "dlx.exchange");
// 這個agruments屬性,要設置到聲明隊列上
channel.queueDeclare(queueName, true, false, false, agruments);
channel.queueBind(queueName, exchangeName, routingKey);
// 要進行死信隊列的聲明
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare(deadQueueName, true, false, false, null);
channel.queueBind(deadQueueName, "dlx.exchange", "#");
channel.basicConsume(queueName, true, new MyConsumer(channel));
//channel.basicConsume(deadQueueName, true, new MyConsumer(channel));
}
}
運行說明
啟動消費端,此時查看管控台,新增了兩個Exchange,兩個Queue。在test_dlx_queue
上我們設置了DLX,也就代表死信消息會發送到指定的Exchange上,最終其實會路由到dlx.queue
上。
此時關閉消費端,然后啟動生產端,查看管控台隊列的消息情況,test_dlx_queue
的值為1,而dlx_queue
的值為0。
10s后的隊列結果如圖,由於生產端發送消息時指定了消息的過期時間為10s,而此時沒有消費端進行消費,消息便被路由到死信隊列中。
實際環境我們還需要對死信隊列進行一個監聽和處理,當然具體的處理邏輯和業務相關,這里只是簡單演示死信隊列是否生效。