RabbitMQ刪除隊列不重啟消費者,動態重啟


項目框架使用的是springboot,關於springboot整合rabbitMQ博客一搜一大堆這里就不做贅述,此篇博客主要是記錄一次生產bug

首先說明一下bug:

老大跑過來說,生產消息隊列怎么沒有收到別人推得消息,你看下

我:“好”,

我查看了日志,嗯確實沒有收到,然后查看配置,mq地址也沒有收到,這就很奇怪了,嘗試本地起生產服務,然后重新發一條,能消費啊,我百思不得其解;對着配置看了好多遍,確定沒有配錯,這tm的就尷尬了

於是各種找原因嘗試場景,最后發現是別同時把隊列刪除了,這就很尷尬了,看生產報錯日志有這么一段

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[topic.woman]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:733) [spring-rabbit-2.3.6.jar:2.3.6]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:608) [spring-rabbit-2.3.6.jar:2.3.6]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:595) [spring-rabbit-2.3.6.jar:2.3.6]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1347) [spring-rabbit-2.3.6.jar:2.3.6]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1192) [spring-rabbit-2.3.6.jar:2.3.6]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_291]
Caused by: java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:1012) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:46) ~[amqp-client-5.10.0.jar:5.10.0]
    at org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl.queueDeclarePassive(PublisherCallbackChannelImpl.java:358) ~[spring-rabbit-2.3.6.jar:2.3.6]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_291]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_291]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_291]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_291]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1157) ~[spring-rabbit-2.3.6.jar:2.3.6]
    at com.sun.proxy.$Proxy133.queueDeclarePassive(Unknown Source) ~[na:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:711) [spring-rabbit-2.3.6.jar:2.3.6]
    ... 5 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'topic.woman' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141) ~[amqp-client-5.10.0.jar:5.10.0]
    ... 15 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'topic.woman' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:739) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666) ~[amqp-client-5.10.0.jar:5.10.0]
    ... 1 common frames omitted

2022-01-16 16:58:10.977  WARN 4892 --- [ntContainer#0-2] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue: topic.woman
2022-01-16 16:58:10.977  WARN 4892 --- [ntContainer#0-2] o.s.a.r.listener.BlockingQueueConsumer   : Queue declaration failed; retries left=2

最終2022-01-16 16:58:21.055 ERROR 4892 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer 有這句話是消費者已經停止了

原因是因為這個BlockingQueueConsumer 配置了當找不到隊列時只嘗試3次,且間隔是5秒也就是意味着15秒沒有找到隊列就會宕機每次執行也會提示剩余嘗試次數這個次數配置更改不了,

 

 

 

 

盡管手動添加或者是生產者重啟,消費者沒有重啟消費者是收不到消息的后台隊列也是沒有消費者在線的

 

 要想重新監聽,只能重啟服務,但是如果以后隊列又被刪除難道每次重啟生產服務?顯然這不是最好的解決方案,想解決就是想辦法讓其一一直重試或者監聽不到就重啟服務

寫個配置類

package com.blackfish.rabbitMQ.config;

import java.util.Arrays;

import org.springframework.amqp.rabbit.listener.BlockingQueueConsumer;
import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerFailedEvent;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import lombok.extern.slf4j.Slf4j;

/**
 * MQ消費者失敗事件監聽器
 */
@Slf4j
@Component
public class ListenerContainerConsumerFailedEventListener implements ApplicationListener<ListenerContainerConsumerFailedEvent> {

    @Override
    public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
        log.error("消費者失敗事件發生:{}", event);
//        BlockingQueueConsumer
        if (event.isFatal()) {
            log.error(String.format("Stopping container from aborted consumer. Reason::%s.",
                    event.getReason()), event.getThrowable());
            SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) event.getSource();
            String queueNames = Arrays.toString(container.getQueueNames());
            // 重啟
            try {
                restart(container);
                log.info("重啟隊列%s的監聽成功!", queueNames);
            } catch (Exception e) {
                log.error(String.format("重啟隊列%s的監聽失敗!", queueNames), e);
            }
        }
    }

    /**
     * 重啟監聽
     * @param container
     * @return
     */
    private void restart(SimpleMessageListenerContainer container) {
        // 暫停30s
        try {
            Thread.sleep(30000);
        } catch (Exception e) {
            log.error(e.getMessage());
        }

        Assert.state(!container.isRunning(), String.format("監聽容器%s正在運行!", container));
        container.start();
    }

}

加上這個配置類就不需要手動重啟了,感覺寫個配置類有點笨重,如果能通配置可以更改重試次數或者間隔事件,這是最好的,等細細研究是否能簡便些,如果有更好的請不要吝嗇哦

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM