【rabbitmq】Queueingconsumer被廢止后老代碼如何做的解決方案


amqp-client 3.x之前的rabbitmq版本有個消費者的寫法是借助於Queueingconsumer的:

QueueingConsumer consumer = new QueueingConsumer(channel);

channel.basicQos(1);

channel.basicConsume(QUEUE_NAME, false, "consumer_test",consumer);

while (true) {

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println(" [X] Received '" + message + "'");

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

break;

這個應該是5.x之前的經典寫法。但是在4.x的版本QueueingConsumer被標記廢止5.x被移除。移除的原因是什么呢?

原來QueueingConsumer內部用LinkedBlockingQueue來存放消息的內容,而LinkedBlockingQueue:一個由鏈表結構組成的有界隊列,照先進先出的順序進行排序 ,未指定長度的話,默認 此隊列的長度為Integer.MAX_VALUE,那么問題來了,如果生產者的速度遠遠大於消費者的速度,也許沒等到隊列阻塞的條件產生(長度達到Integer.MAX_VALUE)內存就完蛋了,在老的版本你可以通過設置 rabbitmq的prefetch屬性channel.basicQos(prefetch)來處理這個問題如果不設置可能出現內存問題(比如因為網絡問題只能向rabbitmq生產不能消費,消費者恢復網絡之后就會有大量的數據涌入,出現內存問題,oom fgc等)。

而且最上面的寫法很不合理不符合事件驅動,什么時候停止while循環也不能寫的很優雅,所以在更高的版本直接被移除。取而代之的是DefaultConsumer,你可以通過擴展DefaultConsumer來實現消費者:

消費的代碼:(RabbitMqMessageConsumer是對DefaultConsumer的擴展)

RabbitMqMessageConsumer rpcMessageConsumer = new RabbitMqMessageConsumer(channel,cores);
channel.basicQos(cores);
channel.basicConsume(QUEUE_NAME, true, rpcMessageConsumer);

RabbitMqMessageConsumer代碼:

public class RabbitMqMessageConsumer extends DefaultConsumer{
      public RabbitMqMessageConsumer(Channel channel) {
          super(channel);
     }
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
  //TODO someting
 }
}

其中handleDelivery是處理消息的邏輯。

高版本的解決方案給出了,那么回到我們題目的問題,老代碼是按照4.x之前的寫的,由於某種原因升級到了5.x了如何做?釜底抽薪的辦法就是按照上面的事件驅動的方式重寫消費者。折中的辦法呢(不想改變老代碼的邏輯和結構)。

我就碰到了這樣的問題,老代碼寫了很多的輪子,導致這塊代碼很難重寫。那就只能按照原來QueueingConsumer的寫法繼續做。解決思路如下:

首先消費的過程還是按照最開始那樣:

QueueingConsumer consumer = new QueueingConsumer(channel);

channel.basicQos(1);

channel.basicConsume(QUEUE_NAME, false, "consumer_test",consumer);

while (true) {

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println(" [X] Received '" + message + "'");

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

break;

 然后實現自己的QueueingConsumer(QueueingConsumer已經移除):

public class QueueingConsumer extends DefaultConsumer{
  private LinkedBlockingQueue<Delivery> queue;
  public QueueingConsumer(Channel channel) {
    super(channel);
    queue = new LinkedBlockingQueue<RabbitMqMessageConsumer.Delivery>();
  }
  public QueueingConsumer(Channel channel,int size) {
    super(channel);
    queue = new LinkedBlockingQueue<RabbitMqMessageConsumer.Delivery>(size);
  }
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
    Delivery delivery = new Delivery();
    delivery.setBody(body);
    delivery.setProperties(properties);
    delivery.setEnvelope(envelope);
    try {
      queue.put(delivery);
    } catch (InterruptedException e) {
      LogUtils.error(e);
    }
  }
  public Delivery nextDelivery()throws InterruptedException, ShutdownSignalException, ConsumerCancelledException{
    return queue.take();
  }
  public Delivery nextDelivery(long timeout)throws InterruptedException, ShutdownSignalException, ConsumerCancelledException{
    return queue.poll(timeout, TimeUnit.MILLISECONDS);
  }
  public class Delivery{
    private BasicProperties properties;
    private byte[] body;
    private Envelope envelope;
    public BasicProperties getProperties() {
      return properties;
    }
    public void setProperties(BasicProperties properties) {
      this.properties = properties;
    }
    public byte[] getBody() {
      return body;
    }
    public void setBody(byte[] body) {
      this.body = body;
    }
    public Envelope getEnvelope() {
      return envelope;
    }
    public void setEnvelope(Envelope envelope) {
      this.envelope = envelope;
    }

  }
}

這樣你就能在不修改之前的老代碼的情況下升級版本了,當然最好還是重寫,這個只能起到個過度


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM