項目場景:
最近在項目中使用了RabbitMq,其中有一個功能必須能隨時切斷RabbitMq的coumser。第一時間寫出來的代碼如下:
偽代碼:
1 while(flag){ 2 3 QueueingConsumer.Delivery delivery=consumer.nextDelivery(); 4 String message = new String(delivery.getBody()); 5 //doing someting strange 6 //...... 7 8 } 9 10 11 //另外一個項目開始關閉 12 13 public void closeConsumer{ 14 channel().close(); 15 connection().close(); 16 } 17 closeConsumer();
通過關閉channel,消費者自然會關閉。然而,項目開始報錯:
channel關閉拋出ShutdownSignalException,拋出異常就表示這種關閉方式是不合理的。有必要去探索一下是否有更優雅的鏈接關閉方式。
先看consumer的源碼:
public Delivery nextDelivery()throws InterruptedException, ShutdownSignalException, ConsumerCancelledException{ return handle(_queue.take()); }
而這里_queue其實是一個 LinkedBlockingQueue,LinkedBlockingQueue是一個單向鏈表實現的阻塞隊列。nextDelivery()方法使用LinkedBlockingQueue的take方法實現了阻塞。這個地方感覺不好操作。但是QueueingConsumer還有另外一個讀取數據的方法,源碼如下:
public Delivery nextDelivery(long timeout)throws InterruptedException, ShutdownSignalException, ConsumerCancelledException{ return handle(_queue.poll(timeout, TimeUnit.MILLISECONDS)); }
這邊設定了超時時間。雖然沒想到優雅關閉消費者的方法,但是利用超時時間來修改一下讀取數據的方法還是可以的。代碼如下:
try{ while(flag){ QueueingConsumer.Delivery delivery = consumer.getDeliveryMessage(10000); if(consumer.getState()==Consumer.Status.Stopped.getValue()){ break; } if(delivery==null){ continue; } String message = new String(delivery.getBody()); //....dosomething } }finally { if(consumer!=null){ consumer.closeConnection(); } } public void closeConnection{ channel().close(); connection().close(); } //另外一個線程關閉consumer consumer.setStatus(Consumer.Status.Stopped);
總結:這種關閉方式可以讓Mq關閉連接時不拋出異常,比之前的方式好一點。但可能並不是最好的方法。如果有更好的方案,請留言告訴我,謝謝