多個rabbitmq的消費問題


最近碰到一個項目中需要使用多個rabbitmq,連接信息很好配置,配置多連接工廠就可以了,消費者的demo好像不多,做下簡單記錄
1.隊列信息都是需要指定AmqpAdmin 進行關聯,交換機,隊列,Binder

     @Autowired
    @Qualifier("customAmqpAdmin1")
    private AmqpAdmin marketAdmin;

    public static final String QUEUE_NAME = "market.luck.award.push.message.queue.ttl";

    /**
     * 消息中心延遲消費交換配置
     *
     * @return
     */
    @Bean
    CustomExchange messagePushTtlDirect() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        CustomExchange exchange = new CustomExchange(MarketQueueEnum.PUSH_MESSAGE_TTL_QUEUE.getExchange()
                , "x-delayed-message", true, false, args);
        exchange.setAdminsThatShouldDeclare(marketAdmin);
        return exchange;
    }
    /**
     * 消息中心實際消費隊列配置
     *
     * @return
     */
    @Bean(QUEUE_NAME)
    public Queue messagePushQueue() {
        Map<String, Object> arguments = new HashMap<>(4);
        Queue queue = new Queue(MarketQueueEnum.PUSH_MESSAGE_TTL_QUEUE.getQueueName()
                , true, false, false, arguments);
        queue.setAdminsThatShouldDeclare(marketAdmin);
        return queue;
    }


    /**
     * 消息中心TTL綁定實際消息中心實際消費交換機
     *
     * @return
     */
    @Bean
    public Binding messageTtlBinding(@Qualifier(QUEUE_NAME) Queue queue) {
        Binding binding = BindingBuilder
                .bind(queue)
                .to(messagePushTtlDirect())
                .with(MarketQueueEnum.PUSH_MESSAGE_TTL_QUEUE.getRouteKey()).noargs();
        binding.setAdminsThatShouldDeclare(marketAdmin);
        return binding;
    }
2.消費監聽需要指定連接工廠即可
~~~java
 @RabbitListener(queues = PushAddressRabbitConfig.QUEUE_NAME, containerFactory = "customListenerContainerFactory1")
    public void handPushObject(String messageContent, Channel channel, @Headers Map<String, Object> headers) throws IOException {
        log.info("market.luck.award.push.message.queue.ttl收到消息:{}", messageContent);
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        try {
            String key = MessageConstant.REDIS_PUSH_KEY_PRE + messageContent;
            String id = (String) cacheCommonService.get(key);
            if (org.apache.commons.lang3.StringUtils.isNotBlank(id)) {
                AwardRecord awardRecord = awardRecordService.getById(messageContent);
                //地址已經填寫 則無需推送消息
                if (!Objects.isNull(awardRecord) &&
                        org.apache.commons.lang3.StringUtils.isBlank(awardRecord.getReceiveAddress())) {
                    PushMessageRequest pushMessageRequest = new PushMessageRequest();
                    pushMessageRequest.setPushMessageContentTypeEnum(PushMessageContentTypeEnum.FILL_IN_ADDRESS);
                    pushMessageRequest.setAwardRecordId(Long.valueOf(messageContent));
                    pushMessageFacade.pushMessage(pushMessageRequest);
                } else {
                    log.info("中獎地址已經填寫 無需推送填寫地址消息");
                }
                cacheCommonService.remove(key);

            }
            //false不支持批量簽收
            channel.basicAck(deliveryTag, false);
            log.info("market.luck.award.push.message.queue.ttl消費完成:" + 
  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd HH:mm:ss")));
        } catch (IOException e) {
            log.error("消費券記錄的消息異常", e);
            // multiple:是否批量. true:將一次性拒絕所有小於deliveryTag的消息
            //requeue:被拒絕的是否重新入隊列 注意:如果設置為true ,則會添加在隊列的末端
            channel.basicNack(deliveryTag, false, true);
        }
    }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM