Spring rabbitMq 中 correlationId或CorrelationIdString 消費者獲取為null的問題


問題

在用 Spring Boot 的 spring-boot-starter-amqp 快速啟動 RabbitMQ 時,遇到了個坑

消費者端獲取不到:correlationId或CorrelationIdString

 

問題產生的原因

 

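correlationId 的 byte[] 形式在 spring rabbitmq 2.0 以後會被廢棄,但目前代碼中有些地方還沒有改過來:轉換器的預設策略仍是 CorrelationIdPolicy.BYTES,接收消息時只會填充 byte[] 形式的 correlationId,不會填充 correlationIdString,所以消費者端取到的是 null。這應該算一個 BUG。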

 

/**
 * Abridged excerpt of the default {@code MessagePropertiesConverter}
 * implementation, reduced to the members that matter for the correlation id
 * problem discussed in this article.
 * NOTE(review): the constructors that assign the two {@code final} fields are
 * not shown in this excerpt.
 */
@SuppressWarnings("deprecation")
public class DefaultMessagePropertiesConverter implements MessagePropertiesConverter {

    /**
     * Selects which representation of the correlation id is mapped: the
     * String form, the byte[] form, or both.
     */
    @Deprecated
    public enum CorrelationIdPolicy {
        STRING, BYTES, BOTH
    }

    private static final int DEFAULT_LONG_STRING_LIMIT = 1024;

    private final int longStringLimit;

    private final boolean convertLongLongStrings;

    // Defaults to BYTES. With this default the inbound mapping only fills the
    // byte[] correlation id and leaves the String form unset, which is why a
    // consumer sees a null correlationIdString unless the policy is changed.
    private volatile CorrelationIdPolicy correlationIdPolicy = CorrelationIdPolicy.BYTES;

}


/**
 * Selects which correlation id representation(s) this converter maps.
 * <p>Inbound: controls whether {@code correlationId} (byte[]),
 * {@code correlationIdString}, or both are populated. Outbound: controls
 * which of the two is written when mapping; with
 * {@code CorrelationIdPolicy.BOTH} the String value takes priority and the
 * byte[] value is only used as a fallback.
 * <p>The default is {@code CorrelationIdPolicy.BYTES}.
 * <p>This method simply delegates to {@code setCorrelationIdAsString}.
 * @param correlationIPolicy the policy to apply.
 * @deprecated - the byte[] version of correlation id will be removed in 2.0
 */
@Deprecated
public void setCorrelationIdPolicy(CorrelationIdPolicy correlationIPolicy) {
   this.setCorrelationIdAsString(correlationIPolicy);
}

/**
 * Maps inbound {@code BasicProperties} (and the delivery {@code Envelope},
 * when one is supplied) onto a new {@code MessageProperties} instance.
 * <p>The correlation id is copied according to the configured
 * {@code CorrelationIdPolicy}: the String form is set unless the policy is
 * {@code BYTES}, and the byte[] form is set unless the policy is
 * {@code STRING}.
 * @param source the properties received from the broker.
 * @param envelope the delivery envelope; may be {@code null}, in which case
 * exchange, routing key, redelivered flag and delivery tag are not set.
 * @param charset the charset name used to convert header values and to encode
 * the correlation id as bytes.
 * @return the populated message properties.
 * @throws AmqpUnsupportedEncodingException if {@code charset} is not supported
 * when encoding the correlation id.
 */
@SuppressWarnings("deprecation")
public MessageProperties toMessageProperties(final BasicProperties source, final Envelope envelope,
      final String charset) {
   MessageProperties result = new MessageProperties();

   Map<String, Object> rawHeaders = source.getHeaders();
   if (!CollectionUtils.isEmpty(rawHeaders)) {
      for (Map.Entry<String, Object> header : rawHeaders.entrySet()) {
         String name = header.getKey();
         if (!MessageProperties.X_DELAY.equals(name)) {
            // Ordinary header: copy it across, converting long strings if needed.
            result.setHeader(name, convertLongStringIfNecessary(header.getValue(), charset));
            continue;
         }
         // The delay header is exposed as the received delay, and only when
         // it is an Integer; any other value type is ignored.
         Object delay = header.getValue();
         if (delay instanceof Integer) {
            result.setReceivedDelay((Integer) delay);
         }
      }
   }

   result.setTimestamp(source.getTimestamp());
   result.setMessageId(source.getMessageId());
   result.setReceivedUserId(source.getUserId());
   result.setAppId(source.getAppId());
   result.setClusterId(source.getClusterId());
   result.setType(source.getType());

   Integer rawDeliveryMode = source.getDeliveryMode();
   if (rawDeliveryMode != null) {
      result.setReceivedDeliveryMode(MessageDeliveryMode.fromInt(rawDeliveryMode));
   }
   // The outbound delivery mode is explicitly cleared on inbound properties.
   result.setDeliveryMode(null);

   result.setExpiration(source.getExpiration());
   result.setPriority(source.getPriority());
   result.setContentType(source.getContentType());
   result.setContentEncoding(source.getContentEncoding());

   String correlationId = source.getCorrelationId();
   if (correlationId != null) {
      // String form: populated for every policy except BYTES.
      if (this.correlationIdPolicy != CorrelationIdPolicy.BYTES) {
         result.setCorrelationIdString(correlationId);
      }
      // byte[] form: populated for every policy except STRING.
      if (this.correlationIdPolicy != CorrelationIdPolicy.STRING) {
         try {
            result.setCorrelationId(correlationId.getBytes(charset));
         }
         catch (UnsupportedEncodingException ex) {
            throw new AmqpUnsupportedEncodingException(ex);
         }
      }
   }

   String replyAddress = source.getReplyTo();
   if (replyAddress != null) {
      result.setReplyTo(replyAddress);
   }

   if (envelope == null) {
      return result;
   }
   result.setReceivedExchange(envelope.getExchange());
   result.setReceivedRoutingKey(envelope.getRoutingKey());
   result.setRedelivered(envelope.isRedeliver());
   result.setDeliveryTag(envelope.getDeliveryTag());
   return result;
}

/**
 * Builds outbound {@code BasicProperties} from {@code MessageProperties}.
 * <p>Correlation id selection: unless the policy is {@code BYTES}, a
 * non-blank {@code correlationIdString} is used. Otherwise, unless the policy
 * is {@code STRING}, a non-empty byte[] correlation id is decoded with
 * {@code charset} and used. If neither applies, no correlation id is set.
 * @param source the message properties to convert.
 * @param charset the charset name used to decode the byte[] correlation id.
 * @return the built properties.
 * @throws AmqpUnsupportedEncodingException if {@code charset} is not supported
 * when decoding the byte[] correlation id.
 */
public BasicProperties fromMessageProperties(final MessageProperties source, final String charset) {
   BasicProperties.Builder builder = new BasicProperties.Builder();
   builder.headers(this.convertHeadersIfNecessary(source.getHeaders()))
      .timestamp(source.getTimestamp())
      .messageId(source.getMessageId())
      .userId(source.getUserId())
      .appId(source.getAppId())
      .clusterId(source.getClusterId())
      .type(source.getType());

   MessageDeliveryMode mode = source.getDeliveryMode();
   if (mode != null) {
      builder.deliveryMode(MessageDeliveryMode.toInt(mode));
   }

   builder.expiration(source.getExpiration())
      .priority(source.getPriority())
      .contentType(source.getContentType())
      .contentEncoding(source.getContentEncoding());

   @SuppressWarnings("deprecation")
   byte[] correlationBytes = source.getCorrelationId();
   String correlationText = source.getCorrelationIdString();
   if (this.correlationIdPolicy != CorrelationIdPolicy.BYTES
         && StringUtils.hasText(correlationText)) {
      // The String form wins whenever it is allowed and has text.
      builder.correlationId(correlationText);
   }
   else if (this.correlationIdPolicy != CorrelationIdPolicy.STRING
         && correlationBytes != null && correlationBytes.length > 0) {
      // Fall back to the byte[] form, decoded with the supplied charset.
      try {
         builder.correlationId(new String(correlationBytes, charset));
      }
      catch (UnsupportedEncodingException ex) {
         throw new AmqpUnsupportedEncodingException(ex);
      }
   }

   String replyAddress = source.getReplyTo();
   if (replyAddress != null) {
      builder.replyTo(replyAddress);
   }
   return builder.build();
}

 

解決方法

 

生產者:

 

 
        
/**
 * Publishes {@code content} to the fanout exchange with an empty routing key,
 * using a freshly generated random UUID as the correlation id.
 * <p>The same id is handed to {@code buildMessage} and wrapped in the
 * {@code CorrelationData} passed to the template.
 * NOTE(review): presumably {@code buildMessage} stores the id on the message
 * properties so the consumer can read it back - confirm against its body.
 *
 * @param content the message payload to send
 */
public void topicPublish(String content) {
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    String messageId = correlationData.getId();
    rabbitTemplate.convertAndSend(AmqpDirectExchangeConfig.FANOUT_EXCHANGE, "",
            this.buildMessage(content, messageId), correlationData);
    // Log the id itself rather than the CorrelationData wrapper, so the
    // "message id" placeholder shows the plain id and not the wrapper's toString().
    this.log.info("消息id-{},消息內容為-{},已發送", messageId, content);
}

/**
 * Creates a {@code RabbitTemplate} on the ack-enabled connection factory,
 * configured with the shared message properties converter, the retry
 * template and a 2 second reply timeout.
 *
 * @return the configured template
 */
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
// must be prototype-scoped
public RabbitTemplate rabbitRtryTemplate() {
    final int replyTimeoutMillis = 2000; // reply timeout of 2 seconds

    RabbitTemplate retryingTemplate = new RabbitTemplate(connectionAckFactory());
    retryingTemplate.setMessagePropertiesConverter(defaultMessagePropertiesConverter());
    retryingTemplate.setRetryTemplate(rabbitRetry());
    retryingTemplate.setReplyTimeout(replyTimeoutMillis);
    return retryingTemplate;
}

/**
 * Provides a {@code DefaultMessagePropertiesConverter} whose correlation id
 * policy is switched to {@code STRING}, replacing the converter's
 * {@code BYTES} default so that {@code correlationIdString} is mapped.
 *
 * @return the converter used by both the template and the listener container
 */
@Bean
public MessagePropertiesConverter defaultMessagePropertiesConverter() {
    DefaultMessagePropertiesConverter converter = new DefaultMessagePropertiesConverter();
    converter.setCorrelationIdPolicy(DefaultMessagePropertiesConverter.CorrelationIdPolicy.STRING);
    return converter;
}
 
        

消費者:

/**
 * Builds the listener container for the message consumer.
 * <p>The container listens on {@code queue1()}, uses the shared message
 * properties converter, runs a single consumer, and uses manual
 * acknowledgement: the listener logs each delivery and then acks it.
 *
 * @return the configured listener container
 */
@Bean
public SimpleMessageListenerContainer messageContainer() {
    SimpleMessageListenerContainer listenerContainer =
            new SimpleMessageListenerContainer(connectionAckFactory());
    listenerContainer.setQueues(queue1());
    listenerContainer.setExposeListenerChannel(true);
    listenerContainer.setMessagePropertiesConverter(defaultMessagePropertiesConverter());
    listenerContainer.setMaxConcurrentConsumers(1);
    listenerContainer.setConcurrentConsumers(1);
    // Manual acknowledgement mode: the listener must ack each message itself.
    listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);

    final ChannelAwareMessageListener loggingListener = new ChannelAwareMessageListener() {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            MessageProperties props = message.getMessageProperties();
            byte[] payload = message.getBody();
            log.info("消費者A,從隊列{},訂閱到CorrelationId=[{}],消息body=[{}]",
                    props.getConsumerQueue(),
                    props.getCorrelationIdString(),
                    new String(payload, "utf-8"));
            // Confirm that this single message was consumed successfully.
            channel.basicAck(props.getDeliveryTag(), false);
        }
    };
    listenerContainer.setMessageListener(loggingListener);
    return listenerContainer;
}
 
         
         
        

 

 




 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM