RabbitMQ重回隊列機制(六)


      在RabbitMQ的生產端把消息發送到Exchange后,然后Exchange與Queue來建立映射關系從而

保障消費端能夠接收到消息,保障在業務端的消息可靠性,這是正常情況的一種邏輯思維。在異常的

情況下,消息到隊列中消費端並不能夠收到消息,那么就需要重試的機制,也就是重回隊列的機制。

其實重試的機制在服務端的業務保障性體系中是必須需要考慮的,因為總有特殊的情況導致發送的請

求在請求方並沒有收到請求,比如服務這層出現TimeOut,以及連接數出現瓶頸,那么這個時候整體

程序的瓶頸是在服務這層,那么既然涉及到重試的機制,一般重試是幾次了?另外需要思考的是重試

的間隔是需要多少秒之間?其實重試的間隔以及重試的次數就需要和具體技術的負責人根據業務的形態

來進行考慮,這中間也是需要考慮到冪等性的問題。但是作為服務端質量體系保障的一個部分,質量負

責人以及對應測試這部分的同學必須得有這個技術底蘊和測試場景的意識,需要更加系統宏觀的站在全局

的角度來考慮服務這層重試以及不重試給產品帶來的風險管控。當然,在本文章體系中重點核心探討的是

RabbitMQ的重回隊列的機制應用。

一、重回隊列場景

         在實際的產品設計和應用中,有如下幾個場景是需要考慮使用重回隊列的機制,具體如下:

  • 消費端在進行消費的時候,由於異常導致應該消費的消息沒有消費到,那么就需要補償的機制
  • 服務層這這邊由於TimeOut,連接數等瓶頸導致服務端這層崩潰,那么就也就重回隊列的機制

其實不管什么場景,具體總結來說就是在接收端,也就是服務這層,由於服務這邊技術上的問題導致

無法正常的消費應該消費的數據,那么就需要重回隊列的機制來保障消費端這層把應該消費的消息消

費掉。

二、怎么理解重回隊列

         消費者的重回隊列機制是對沒有處理成功的消息,消費者端這邊為了消息的可靠性,那么就會

把沒有消費的消息重新會發送給Broker,通過這樣的技術來保障消息的可靠性。那么在消費消息的時

候我們需要把autoKey的參數設置為false。

三、重回隊列案例

3.1、生產者代碼

package com.example.rabbitmq.ack;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

public class ProducerAck
{
    private  static  final  String exchangeName="test_retry_exchange";
    private  static  final  String routyKey="retry.test";

    public static void main(String[] args) throws  Exception
    {
        //創建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //配置連接mq的地址信息
        connectionFactory.setHost("101.**.***.84");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wuya");
        connectionFactory.setPassword("java");
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        for(int i=0;i<3;i++)
        {

            Map<String,Object>  headers=new HashMap<String,Object>();
            headers.put("num",i);

           AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
                   .deliveryMode(2)
                   .contentEncoding("UTF_8")
                   .headers(headers)
                   .build();
            String msg = "Hello RabbitMQ Ack Message"+i;
            channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());
        }

    }
}

3.2、消費者代碼

package com.example.rabbitmq.ack;

import com.example.rabbitmq.MyConsumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConsumerAck
{
    private static final String exchangeName = "test_retry_exchange";
    private  static  final String queueName="test_retry_queue";
    private  static  final  String routingKey="retry.#";

    public static void main(String[] args) throws  Exception
    {
        try{
            ConnectionFactory connectionFactory=new ConnectionFactory();
            connectionFactory.setHost("101.**.***.84");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("wuya");
            connectionFactory.setPassword("java");
            connectionFactory.setVirtualHost("/");

            Connection connection=connectionFactory.newConnection();
            Channel channel=connection.createChannel();
            channel.exchangeDeclare(exchangeName,"topic",true,false,null);
            channel.queueDeclare(queueName,true,false,false,null);
            channel.queueBind(queueName,exchangeName,routingKey);

            
            channel.basicConsume(queueName,false,new MyConsumer(channel=channel));
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

在如上的代碼中,特別是在方法basicConsum中,參數autoKey我們需要設置為false,也就是手工簽收的方式。

3.3、自定義消費者代碼

package com.example.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MyConsumer extends DefaultConsumer
{
    private  Channel channel;

    /**
     * Constructs a new instance and records its association to the passed-in channel.
     *
     * @param channel the channel to which this consumer is attached
     */
    public MyConsumer(Channel channel)
    {
        super(channel);
        this.channel=channel;
    }

    @Override
    public void handleDelivery(
            String consumerTag,
            Envelope envelope,
            AMQP.BasicProperties properties,
            byte[] body) throws IOException
    {

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //獲取properties的值,然后判斷
        if((Integer) properties.getHeaders().get("num")==0)
        {
            /*
            * multiple:是否接收批量
            * requeue:重回隊列的參數,如果開啟,消費失敗的消息會再次重回到隊列里面來進行發送
            * */
            channel.basicNack(envelope.getDeliveryTag(),false,true);
        }
        else
        {
            channel.basicAck(envelope.getDeliveryTag(),false);
        }

        System.err.println("the message received:"+new String(body));

    }
}

在如上的代碼中,設置了等待2秒接收消息,方法basicNack就會負責把失敗的消息會再次發送到

Broker里面,也就是重新發送到隊列中等待消費者來進行消費。

3.4、執行結果信息

        如上的代碼執行成功后,在消費端就會看到num為0的消費失敗被再次發送到了隊列中等待

消費,具體如下所示:

 在如上中可以看到消息ID為0的,被再次重回到隊列了。重回隊列有它的優勢但是也是存在它的缺陷,

而這個缺陷可以通過死信隊列來進行補充和解決。感謝您的閱讀!

 

     


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM