amqp-client 3.x之前的rabbitmq版本有個消費者的寫法是借助於Queueingconsumer的:
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicQos(1);
channel.basicConsume(QUEUE_NAME, false, "consumer_test",consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [X] Received '" + message + "'");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
break;
}
這個應該是5.x之前的經典寫法。但是在4.x的版本QueueingConsumer被標記廢止5.x被移除。移除的原因是什么呢?
原來QueueingConsumer內部用LinkedBlockingQueue來存放消息的內容,而LinkedBlockingQueue:一個由鏈表結構組成的有界隊列,照先進先出的順序進行排序 ,未指定長度的話,默認 此隊列的長度為Integer.MAX_VALUE,那么問題來了,如果生產者的速度遠遠大於消費者的速度,也許沒等到隊列阻塞的條件產生(長度達到Integer.MAX_VALUE)內存就完蛋了,在老的版本你可以通過設置 rabbitmq的prefetch屬性channel.basicQos(prefetch)來處理這個問題如果不設置可能出現內存問題(比如因為網絡問題只能向rabbitmq生產不能消費,消費者恢復網絡之后就會有大量的數據涌入,出現內存問題,oom fgc等)。
而且最上面的寫法很不合理不符合事件驅動,什么時候停止while循環也不能寫的很優雅,所以在更高的版本直接被移除。取而代之的是DefaultConsumer,你可以通過擴展DefaultConsumer來實現消費者:
消費的代碼:(RabbitMqMessageConsumer是對DefaultConsumer的擴展)
RabbitMqMessageConsumer rpcMessageConsumer = new RabbitMqMessageConsumer(channel,cores);
channel.basicQos(cores);
channel.basicConsume(QUEUE_NAME, true, rpcMessageConsumer);
RabbitMqMessageConsumer代碼:
public class RabbitMqMessageConsumer extends DefaultConsumer{
public RabbitMqMessageConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
//TODO someting
}
}
其中handleDelivery是處理消息的邏輯。
高版本的解決方案給出了,那么回到我們題目的問題,老代碼是按照4.x之前的寫的,由於某種原因升級到了5.x了如何做?釜底抽薪的辦法就是按照上面的事件驅動的方式重寫消費者。折中的辦法呢(不想改變老代碼的邏輯和結構)。
我就碰到了這樣的問題,老代碼寫了很多的輪子,導致這塊代碼很難重寫。那就只能按照原來QueueingConsumer的寫法繼續做。解決思路如下:
首先消費的過程還是按照最開始那樣:
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicQos(1);
channel.basicConsume(QUEUE_NAME, false, "consumer_test",consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [X] Received '" + message + "'");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
break;
}
然后實現自己的QueueingConsumer(QueueingConsumer已經移除):
public class QueueingConsumer extends DefaultConsumer{
private LinkedBlockingQueue<Delivery> queue;
public QueueingConsumer(Channel channel) {
super(channel);
queue = new LinkedBlockingQueue<RabbitMqMessageConsumer.Delivery>();
}
public QueueingConsumer(Channel channel,int size) {
super(channel);
queue = new LinkedBlockingQueue<RabbitMqMessageConsumer.Delivery>(size);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
Delivery delivery = new Delivery();
delivery.setBody(body);
delivery.setProperties(properties);
delivery.setEnvelope(envelope);
try {
queue.put(delivery);
} catch (InterruptedException e) {
LogUtils.error(e);
}
}
public Delivery nextDelivery()throws InterruptedException, ShutdownSignalException, ConsumerCancelledException{
return queue.take();
}
public Delivery nextDelivery(long timeout)throws InterruptedException, ShutdownSignalException, ConsumerCancelledException{
return queue.poll(timeout, TimeUnit.MILLISECONDS);
}
public class Delivery{
private BasicProperties properties;
private byte[] body;
private Envelope envelope;
public BasicProperties getProperties() {
return properties;
}
public void setProperties(BasicProperties properties) {
this.properties = properties;
}
public byte[] getBody() {
return body;
}
public void setBody(byte[] body) {
this.body = body;
}
public Envelope getEnvelope() {
return envelope;
}
public void setEnvelope(Envelope envelope) {
this.envelope = envelope;
}
}
}
這樣你就能在不修改之前的老代碼的情況下升級版本了,當然最好還是重寫,這個只能起到個過度
