Spring Boot中通過RabbitTemplate主動pull(get)消息的例子


import java.util.Properties;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.GetResponse;

@Service
public class RabbitAdminServices {

    private static final Logger logger = LoggerFactory.getLogger(RabbitAdminServices.class);

    @Autowired
    AmqpAdmin rabbitAdmin;

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Autowired
    MessageConverter messageConverter;

    private volatile MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();

    public int getCount(String queueName) {

        Properties properties = rabbitAdmin.getQueueProperties(queueName);
        return (Integer)properties.get(RabbitAdmin.QUEUE_MESSAGE_COUNT);
    }

    public <T> void processQueue(String queueName, Integer count, Class<T> clazz, Consumer<T> consumer) {

        int reprocessCount = getCount(queueName);
        int requestCount = reprocessCount;
        if(count != null) {
            requestCount = count;
        }
        for(int i = 0; i < reprocessCount && i < requestCount; i++) {
            rabbitTemplate.execute(new ChannelCallback<T>() {

                @Override
                public T doInRabbit(Channel channel) throws Exception {
                    GetResponse response = channel.basicGet(queueName, false);
                    T result = null;
                    try {
                        MessageProperties messageProps = messagePropertiesConverter.toMessageProperties(response.getProps(), response.getEnvelope(), "UTF-8");
                        if(response.getMessageCount() >= 0) {
                            messageProps.setMessageCount(response.getMessageCount());
                        }
                        Message message = new Message(response.getBody(), messageProps);
                        result = (T)messageConverter.fromMessage(message);
                        consumer.accept(result);
                        channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
                    }
                    catch(Exception e) {
                        channel.basicNack(response.getEnvelope().getDeliveryTag(), false, true);
                    }
                    return result;
                }
            });

        }
    }
}

 

 

 

實現RabbitMQ的消費者有兩種模式,推模式(Push)和拉模式(Pull)。

實現推模式推薦的方式是繼承 DefaultConsumer 基類,也可以使用Spring AMQP的 SimpleMessageListenerContainer 。

推模式是最常用的,但是有些情況下推模式並不適用的,比如說:

  • 由於某些限制,消費者在某個條件成立時才能消費消息

  • 需要批量拉取消息進行處理

 

實現拉模式

RabbitMQ的Channel提供了 basicGet 方法用於拉取消息。

  1.  
    /**
  2.  
    * Retrieve a message from a queue using {@link com.rabbitmq.client.AMQP.Basic.Get}
  3.  
    * @see com.rabbitmq.client.AMQP.Basic.Get
  4.  
    * @see com.rabbitmq.client.AMQP.Basic.GetOk
  5.  
    * @see com.rabbitmq.client.AMQP.Basic.GetEmpty
  6.  
    * @param queue the name of the queue
  7.  
    * @param autoAck true if the server should consider messages
  8.  
    * acknowledged once delivered; false if the server should expect
  9.  
    * explicit acknowledgements
  10.  
    * @return a {@link GetResponse} containing the retrieved message data
  11.  
    * @throws java.io.IOException if an error is encountered
  12.  
    */
  13.  
    GetResponse basicGet( String queue, boolean autoAck) throws IOException;

basicGet 返回 GetResponse 類。

  1.  
    public class GetResponse {
  2.  
    private final Envelope envelope;
  3.  
    private final BasicProperties props;
  4.  
    private final byte[] body;
  5.  
    private final int messageCount;
  6.  
     
  7.  
    // ...
 

public class GetResponse { private final Envelope envelope; private final BasicProperties props; private final byte[] body; private final int messageCount; // ...

rabbitmq-client版本4.0.3

使用 basicGet 拉取消息需要注意:

  1.  
    basicGet
  2.  
    DefaultConsumer

示例代碼:

  1.  
    private void consume(Channel channel) throws IOException, InterruptedException {
  2.  
    while (true) {
  3.  
    if (!isConditionSatisfied()) {
  4.  
    TimeUnit.MILLISECONDS.sleep( 1);
  5.  
    continue;
  6.  
    }
  7.  
    GetResponse response = channel.basicGet(CAOSH_TEST_QUEUE, false);
  8.  
    if (response == null) {
  9.  
    TimeUnit.MILLISECONDS.sleep( 1);
  10.  
    continue;
  11.  
    }
  12.  
    String data = new String(response.getBody());
  13.  
    logger.info( "Get message <= {}", data);
  14.  
    channel.basicAck( response.getEnvelope().getDeliveryTag(), false);
  15.  
    }
  16.  
    }

批量拉取消息

RabbitMQ支持客戶端批量拉取消息,客戶端可以連續調用 basicGet 方法拉取多條消息,處理完成之后一次性ACK。需要注意:

  1.  
    basicGet
  2.  
    basicAck

示例代碼:

  1.  
    String bridgeQueueName = extractorProperties.getBridgeQueueName();
  2.  
    int batchSize = extractorProperties.getBatchSize();
  3.  
    List<GetResponse> responseList = Lists.newArrayListWithCapacity(batchSize);
  4.  
    long tag = 0;
  5.  
    while (responseList.size() < batchSize) {
  6.  
    GetResponse getResponse = channel.basicGet(bridgeQueueName, false);
  7.  
    if (getResponse == null) {
  8.  
    break;
  9.  
    }
  10.  
    responseList.add(getResponse);
  11.  
    tag = getResponse.getEnvelope().getDeliveryTag();
  12.  
    }
  13.  
    if (responseList.isEmpty()) {
  14.  
    TimeUnit.MILLISECONDS.sleep( 1);
  15.  
    } else {
  16.  
    logger.info( "Get <{}> responses this batch", responseList.size());
  17.  
    // handle messages
  18.  
    channel.basicAck(tag, true);
  19.  
    }

關於QueueingConsumer

QueueingConsumer 在客戶端本地使用 BlockingQueue 緩沖消息,其nextDelivery方法也可以用於實現拉模式(其本質上是 BlockingQueue.take ),但是 QueueingConsumer 現在已經標記為Deprecated。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM