RabbitMQ阻塞讀取時數據時,關閉channel引起的問題和解決方案


項目場景:

  最近在項目中使用了RabbitMq,其中有一個功能必須能隨時切斷RabbitMq的coumser。第一時間寫出來的代碼如下:

  偽代碼:

 1 while(flag){
 2     
 3      QueueingConsumer.Delivery delivery=consumer.nextDelivery();
 4      String message = new String(delivery.getBody());
 5      //doing someting strange
 6      //...... 
 7 
 8 }
 9 
10 
11 //另外一個項目開始關閉
12 
13 public void closeConsumer{
14       channel().close();
15       connection().close();
16 }
17 closeConsumer();

 通過關閉channel,消費者自然會關閉。然而,項目開始報錯:

 

channel關閉拋出ShutdownSignalException,拋出異常就表示這種關閉方式是不合理的。有必要去探索一下是否有更優雅的鏈接關閉方式。

 

先看consumer的源碼:

public Delivery nextDelivery()throws InterruptedException, ShutdownSignalException, ConsumerCancelledException{
     return handle(_queue.take());
 }

而這里_queue其實是一個 LinkedBlockingQueue,LinkedBlockingQueue是一個單向鏈表實現的阻塞隊列。nextDelivery()方法使用LinkedBlockingQueue的take方法實現了阻塞。這個地方感覺不好操作。但是QueueingConsumer還有另外一個讀取數據的方法,源碼如下:

public Delivery nextDelivery(long timeout)throws InterruptedException, ShutdownSignalException, ConsumerCancelledException{
       return handle(_queue.poll(timeout, TimeUnit.MILLISECONDS));
}

這邊設定了超時時間。雖然沒想到優雅關閉消費者的方法,但是利用超時時間來修改一下讀取數據的方法還是可以的。代碼如下:

try{
    while(flag){
         QueueingConsumer.Delivery delivery = consumer.getDeliveryMessage(10000);
         if(consumer.getState()==Consumer.Status.Stopped.getValue()){
              break;
         }
         if(delivery==null){
                  continue;
         }
      String message = new String(delivery.getBody());
       //....dosomething
    }
}finally {
    if(consumer!=null){
        consumer.closeConnection();
    }
}


public void closeConnection{  
  channel().close();  
  connection().close(); 
}

//另外一個線程關閉consumer
consumer.setStatus(Consumer.Status.Stopped);

 

 總結:這種關閉方式可以讓Mq關閉連接時不拋出異常,比之前的方式好一點。但可能並不是最好的方法。如果有更好的方案,請留言告訴我,謝謝

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM