RabbitMQ從入門到精通(三)


自定義消費者使用

  • 我們之前呢都是在代碼中編寫while循環,進行 consumer.nextDelivery 方法進行獲取下一條消息,然后進行消費處理!
  • 其實我們還可以使用自定義的Consumer,它更加的方便,解耦性更加的強,也是在實際工作中最常用的使用方式!
  • 自定義消費端實現只需要繼承 DefaultConsumer 類,重寫 handleDelivery 方法即可

 

自定義消費端演示

public class Producer {
	 public static void main(String[] args) throws Exception {
	        //1 創建ConnectionFactory
		 	ConnectionFactory connectionFactory = new ConnectionFactory();
	        connectionFactory.setHost("192.168.244.11");
			connectionFactory.setPort(5672);
			connectionFactory.setVirtualHost("/");
			connectionFactory.setHandshakeTimeout(20000);
	        //2 獲取Connection
	        Connection connection = connectionFactory.newConnection();
	        //3 通過Connection創建一個新的Channel
	        Channel channel = connection.createChannel();
	        
	        String exchange = "test_consumer_exchange";
	        String routingKey = "consumer.save";
	        
	        String msg = "Hello RabbitMQ Consumer Message";
	        //4 發送消息
	        for(int i =0; i<5; i ++){
	            channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
	        }
	    }
}

 

public class MyConsumer extends DefaultConsumer {

    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        //consumerTag: 內部生成的消費標簽  properties: 消息屬性  body: 消息內容  
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        //envelope包含屬性:deliveryTag(標簽), redeliver, exchange, routingKey
        //redeliver是一個標記,如果設為true,表示消息之前可能已經投遞過了,現在是重新投遞消息到監聽隊列的消費者
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }
}

 

public class Consumer {
    public static void main(String[] args) throws Exception {
        //1 創建ConnectionFactory
	 	ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		connectionFactory.setHandshakeTimeout(20000);
        //2 獲取Connection
        Connection connection = connectionFactory.newConnection();
        //3 通過Connection創建一個新的Channel
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_consumer_exchange";
        String routingKey = "consumer.#";
        String queueName = "test_consumer_queue";
        //4 聲明交換機和隊列,然后進行綁定設置路由Key
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //5 設置channel,使用自定義消費者
        channel.basicConsume(queueName, true, new MyConsumer(channel));
    }
}

 

運行說明

先啟動消費端,訪問管控台:http://ip:15672,檢查Exchange和Queue是否設置OK,然后啟動生產端。消費端打印內容如下

 

消費端的限流策略

限流的場景與機制

  • 假設一個場景,我們Rabbitmq服務器有上萬條未處理的消息,我們隨便打開一個消費者客戶端,會出現這種情況:巨量的消息瞬間全部推送過來,但是我們單個客戶端無法同時處理這么多數據!此時很有可能導致服務器崩潰,嚴重的可能導致線上的故障。
  • 除了這種場景,還有一些其他的場景,比如說單個生產者一分鍾生產出了幾百條數據,但是單個消費者一分鍾可能只能處理60條數據,這個時候生產端和消費端肯定是不平衡的。通常生產端是沒辦法做限制的。所以消費端肯定需要做一些限流措施,否則如果超出最大負載,可能導致消費端性能下降,服務器卡頓甚至崩潰等一系列嚴重后果。

 

消費端限流機制

RabbitMQ提供了一種qos (服務質量保證)功能,即在非自動確認消息的前提下,如果一定數目的消息 (通過基於consume或者channel設置Qos的值) 未被確認前,不進行消費新的消息。

需要注意:

1.不能設置自動簽收功能(autoAck = false)

2.如果消息沒被確認,就不會到達消費端,目的就是給消費端減壓

 

限流相關API

限流設置 - BasicQos()

void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
prefetchSize: 單條消息的大小限制,消費端通常設置為0,表示不做限制
prefetchCount: 一次最多能處理多少條消息,通常設置為1
global: 是否將上面設置應用於channel,false代表consumer級別

注意事項

prefetchSizeglobal這兩項,rabbitmq沒有實現,暫且不研究
prefetchCountautoAck=false 的情況下生效,即在自動應答的情況下這個值是不生效的
 
手工ACK - basicAck()

void basicAck(Integer deliveryTag,boolean multiple)
手工ACK,調用這個方法就會主動回送給Broker一個應答,表示這條消息我處理完了,你可以給我下一條了。參數multiple表示是否批量簽收,由於我們是一次處理一條消息,所以設置為false

 

限流演示

生產端

生產端就是正常的邏輯

public class Producer {
	public static void main(String[] args) throws Exception {

		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.244.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		connectionFactory.setHandshakeTimeout(20000);

		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();

		String exchange = "test_qos_exchange";
		String routingKey = "qos.save";

		String msg = "Hello RabbitMQ QOS Message";
		// 發送消息
		for (int i = 0; i < 5; i++) {
			channel.basicPublish(exchange, routingKey, true, null,
					msg.getBytes());
		}
	}
}

 

自定義消費者

為了看到限流效果,這里不進行ACK

public class MyConsumer extends DefaultConsumer {

    //接收channel
    private Channel channel ;
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        //System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
        //手工ACK,參數multiple表示不批量簽收
        //channel.basicAck(envelope.getDeliveryTag(), false);   
    }
}

 

消費端

關閉autoACK,進行限流設置

public class Consumer {

    public static void main(String[] args) throws Exception {
        //1 創建ConnectionFactory
    	ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.244.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		connectionFactory.setHandshakeTimeout(20000);
        //2 獲取Connection
        Connection connection = connectionFactory.newConnection();
        //3 通過Connection創建一個新的Channel
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_qos_exchange";
        String queueName = "test_qos_queue";
        String routingKey = "qos.#";
        //4 聲明交換機和隊列,然后進行綁定設置路由Key
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //進行參數設置:單條消息的大小限制,一次最多能處理多少條消息,是否將上面設置應用於channel
        channel.basicQos(0, 1, false);
        
        //限流: autoAck設置為 false
        channel.basicConsume(queueName, false, new MyConsumer(channel));
    }
}

 

運行說明

我們先注釋掉手工ACK方法,然后啟動消費端和生產端,此時消費端只打印了一條消息2019-06-09_163747

這是因為我們設置了手工簽收,並且設置了一次只處理一條消息,當我們沒有回送ack應答時,Broker端就認為消費端還沒有處理完這條消息,基於這種限流機制就不會給消費端發送新的消息了,所以消費端只打印了一條消息。

通過管控台也可以看到隊列總共收到了5條消息,有一條消息沒有ack。

將手工簽收代碼取消注釋,再次運行消費端,此時就會打印5條消息的內容。

 

消費端ACK與重回隊列機制

ACK與NACK

當我們設置 autoACK=false 時,就可以使用手工ACK方式了,那么其實手工方式包括了手工ACK與NACK。

當我們手工 ACK 時,會發送給Broker一個應答,代表消息成功處理了,Broker就可以回送響應給生產端了。NACK 則表示消息處理失敗了,如果設置重回隊列,Broker端就會將沒有成功處理的消息重新發送。

 

使用方式

  1. 消費端進行消費的時候,如果由於業務異常我們可以手工 NACK 並進行日志的記錄,然后進行補償!
    方法:void basicNack(long deliveryTag, boolean multiple, boolean requeue)
  2. 如果由於服務器宕機等嚴重問題,那我們就需要手工進行 ACK 保障消費端消費成功!
    方法:void basicAck(long deliveryTag, boolean multiple)

 

重回隊列演示

  • 消費端重回隊列是為了對沒有處理成功的消息,把消息重新會遞給Broker!
  • 重回隊列,會把消費失敗的消息重新添加到隊列的尾端,供消費者繼續消費。
  • 一般我們在實際應用中,都會關閉重回隊列,也就是設置為false

 

生產端

對消息設置自定義屬性以便進行區分

public class Producer {

    public static void main(String[] args) throws Exception {
        //1 創建ConnectionFactorys
    	ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.244.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		connectionFactory.setHandshakeTimeout(20000);
        //2 獲取Connection
        Connection connection = connectionFactory.newConnection();
        //3 通過Connection創建一個新的Channel
        Channel channel = connection.createChannel();
        
        String exchange = "test_ack_exchange";
        String routingKey = "ack.save";
        
        for(int i =0; i<5; i ++){
            //設置消息屬性
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("num", i);
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .headers(headers)
                    .build();
            //發送消息
            String msg = "Hello RabbitMQ ACK Message " + i;
            channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
        }   
    }
}

 

自定義消費

對第一條消息進行NACK,並設置重回隊列

public class MyConsumer extends DefaultConsumer {

    private Channel channel ;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("body: " + new String(body));
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if((Integer)properties.getHeaders().get("num") == 0) {
            //NACK,參數三requeue:是否重回隊列
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        } else {
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
}

 

消費端

關閉自動簽收功能

public class Consumer {
    
    public static void main(String[] args) throws Exception {
        
    	ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.244.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		connectionFactory.setHandshakeTimeout(20000);
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_ack_exchange";
        String queueName = "test_ack_queue";
        String routingKey = "ack.#";
        //聲明交換機和隊列,然后進行綁定設置路由Key
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //手工簽收 必須要設置 autoAck = false
        channel.basicConsume(queueName, false, new MyConsumer(channel));
    }
}

 

運行說明

先啟動消費端,然后啟動生產端,消費端打印如下,顯然第一條消息由於我們調用了NACK,並且設置了重回隊列,所以會導致該條消息一直重復發送,消費端就會一直循環消費。

 

一般工作中不會設置重回隊列這個屬性,都是自己去做補償或者投遞到延遲隊列里的,然后指定時間去處理即可。

 

TTL

TTL說明

  • TTL是Time To Live的縮寫,也就是生存時間
  • RabbitMQ支持消息的過期時間,在消息發送時可以進行指定
  • RabbitMQ支持為每個隊列設置消息的超時時間,從消息入隊列開始計算,只要超過了隊列的超時時間配置,那么消息會自動的清除

 

TTL演示

這次演示我們不寫代碼,只通過管控台進行操作,實際測試也會更為方便一些。
 

1. 創建Exchange

選擇Exchange菜單,找到下面的Add a new exchange

 

2.創建Queue

選擇Queue菜單,找到下面的Add a new queue

 

3.建立隊列和交換機的綁定關系

點擊Exchange表格中的test002_exchange,在下面添加綁定規則

 

4.發送消息

點擊Exchange表格中的test002_exchange,在下面找到Publish message,設置消息進行發送

 

5.驗證

點擊Queue菜單,查看表格中test002已經有了一條消息,10秒后表格顯示0條,說明過期時間到了消息被自動清除了。

 

6.設置單條消息過期時間

點擊Exchange表格中的test002_exchange,在下面找到Publish message,設置消息的過期時間並進行發送,此時觀察test002隊列,發現消息5s后就過期被清除了,即使隊列設置的過期時間是10s。

 
TTL代碼設置過期時間

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .expiration("10000") //10s過期
                .build();
        //發送消息
        channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());

 

隊列過期時間設置

        //設置隊列的過期時間10s
        Map<String,Object> param = new HashMap<>();
        param.put("x-message-ttl", 10000);
        //聲明隊列
        channel.queueDeclare(queueName, true, false, false, null);

 
注意事項

  1. 兩者的區別是設置隊列的過期時間是對該隊列的所有消息生效的。
  2. 為消息設置TTL有一個問題:RabbitMQ只對處於隊頭的消息判斷是否過期(即不會掃描隊列),所以,很可能隊列中已存在死消息,但是隊列並不知情。這會影響隊列統計數據的正確性,妨礙隊列及時釋放資源。

 

死信隊列

死信隊列介紹

  • 死信隊列:DLX,dead-letter-exchange
  • 利用DLX,當消息在一個隊列中變成死信 (dead message) 之后,它能被重新publish到另一個Exchange,這個Exchange就是DLX

 

消息變成死信有以下幾種情況

  • 消息被拒絕(basic.reject / basic.nack),並且requeue = false
  • 消息TTL過期
  • 隊列達到最大長度

 

死信處理過程

  • DLX也是一個正常的Exchange,和一般的Exchange沒有區別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性。
  • 當這個隊列中有死信時,RabbitMQ就會自動的將這個消息重新發布到設置的Exchange上去,進而被路由到另一個隊列。
  • 可以監聽這個隊列中的消息做相應的處理。

 

死信隊列設置

  1. 首先需要設置死信隊列的exchange和queue,然后進行綁定:

  1. 然后需要有一個監聽,去監聽這個隊列進行處理
  2. 然后我們進行正常聲明交換機、隊列、綁定,只不過我們需要在隊列加上一個參數即可:arguments.put(" x-dead-letter-exchange","dlx.exchange");,這樣消息在過期、requeue、 隊列在達到最大長度時,消息就可以直接路由到死信隊列!

 

死信隊列演示

生產端

public class Producer {
    public static void main(String[] args) throws Exception {
        //1 創建ConnectionFactory
    	ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.244.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		connectionFactory.setHandshakeTimeout(20000);
        //2 獲取Connection
        Connection connection = connectionFactory.newConnection();
        //3 通過Connection創建一個新的Channel
        Channel channel = connection.createChannel();
        
        String exchange = "test_dlx_exchange";
        String routingKey = "dlx.save";
        
        String msg = "Hello RabbitMQ DLX Message";
        
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                .expiration("10000")
                .build();
        //發送消息
        channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
    }
}

 

自定義消費者

public class MyConsumer extends DefaultConsumer {

    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }
}

 

消費端

  • 聲明正常處理消息的交換機、隊列及綁定規則
  • 在正常交換機上指定死信發送的Exchange
  • 聲明死信交換機、隊列及綁定規則
  • 監聽死信隊列,進行后續處理,這里省略
public class Consumer {
	public static void main(String[] args) throws Exception {

		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.244.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		connectionFactory.setHandshakeTimeout(20000);

		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();

		// 聲明一個普通的交換機 和 隊列 以及路由
		String exchangeName = "test_dlx_exchange";
		String routingKey = "dlx.#";
		String queueName = "test_dlx_queue";
		String deadQueueName = "dlx.queue";
		
		channel.exchangeDeclare(exchangeName, "topic", true, false, null);
		// 指定死信發送的Exchange
		Map<String, Object> agruments = new HashMap<String, Object>();
		agruments.put("x-dead-letter-exchange", "dlx.exchange");
		// 這個agruments屬性,要設置到聲明隊列上
		channel.queueDeclare(queueName, true, false, false, agruments);
		channel.queueBind(queueName, exchangeName, routingKey);

		// 要進行死信隊列的聲明
		channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
		channel.queueDeclare(deadQueueName, true, false, false, null);
		channel.queueBind(deadQueueName, "dlx.exchange", "#");

		channel.basicConsume(queueName, true, new MyConsumer(channel));
		//channel.basicConsume(deadQueueName, true, new MyConsumer(channel));

	}
}

 

運行說明

啟動消費端,此時查看管控台,新增了兩個Exchange,兩個Queue。在test_dlx_queue上我們設置了DLX,也就代表死信消息會發送到指定的Exchange上,最終其實會路由到dlx.queue上。

此時關閉消費端,然后啟動生產端,查看管控台隊列的消息情況,test_dlx_queue的值為1,而dlx_queue的值為0。
10s后的隊列結果如圖,由於生產端發送消息時指定了消息的過期時間為10s,而此時沒有消費端進行消費,消息便被路由到死信隊列中。

實際環境我們還需要對死信隊列進行一個監聽和處理,當然具體的處理邏輯和業務相關,這里只是簡單演示死信隊列是否生效。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM