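I ran into this problem while working on a project. We used RabbitMQ for peak shaving, and under high concurrency the number of channels hit its limit, so no more channels could be created. Many of you have probably run into this too.
Normally this error is fairly rare. But the project had to guarantee message reliability, so we used publisher confirms and manual consumer acknowledgements. That lowered concurrent throughput and eventually led to this problem.
Here is the conclusion first, for anyone in a hurry to fix the bug.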
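Conclusion: when the RabbitMQ Java client opens a connection, it sends a request to the server and gets back the server's channelMax value. The client then negotiates the final value itself. If both its own requested value and the server's value are non-zero, it takes the smaller one. If you have not changed channel_max in the rabbitmq.conf file, the client ends up with the default of 2047 (or less). That is why the error still occurs even though you configured channelMax on the client connection (say, 4095), and why the web management UI still shows a maximum of 2047.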
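First attempt: the configuration change did not take effect
This situation often comes with message loss, and the loss was severe, reaching a loss rate of about 20%. The rate also varies with concurrency, the number of messages consumed per batch, and other settings.
The project is based on Spring Boot 2.2, where the yml configuration cannot yet set requestedChannelMax. The only option was to set the value directly through its setter.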
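import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class RabbitMQConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        CachingConnectionFactory cachingConnectionFactory = (CachingConnectionFactory) connectionFactory;
        // I explicitly set 4095 here. After starting the app and load testing it, the exception
        // still occurred, and when it did, the RabbitMQ web management UI still showed a channel
        // limit of 2047, so I had no choice but to dig into the source code.
        cachingConnectionFactory.getRabbitConnectionFactory().setRequestedChannelMax(4095);
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
        rabbitTemplate.setMessageConverter(jackson2MessageConverter());
        // Hand unroutable messages to the ReturnCallback instead of silently dropping them
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (!ack) {
                log.error("confirmCallBack data that failed to send: {}", correlationData);
                log.error("confirmCallBack ack: {}", ack);
                log.error("confirmCallBack failure cause: {}", cause);
            }
        });
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.error("returnCallBack message: {}", message);
            log.error("returnCallBack reply code: {}", replyCode);
            log.error("returnCallBack reply text: {}", replyText);
            log.error("returnCallBack exchange: {}", exchange);
            log.error("returnCallBack routing key: {}", routingKey);
        });
        return rabbitTemplate;
    }

    @Bean
    public Jackson2JsonMessageConverter jackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}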
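Analyzing the source code
First, reproduce the error, then step into the class that throws the exception.
It turns out that this.delegate.createChannel() returns a null channel object, so let's step into that method.
Inside there is a ChannelManager object. As the name suggests, it manages channels and is responsible for creating them. So what values does this object hold?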
com.rabbitmq.client.impl.AMQConnection#createChannel()
public Channel createChannel() throws IOException {
    this.ensureIsOpen();
    ChannelManager cm = this._channelManager;
    if (cm == null) {
        return null;
    } else {
        // ChannelManager#createChannel(AMQConnection) returns null when no channel number is left
        Channel channel = cm.createChannel(this);
        this.metricsCollector.newChannel(channel);
        return channel;
    }
}
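To make the failure mode concrete, the sketch below models how a null channel coming out of the client ends up as the exception in the error log. It is a simplified, hypothetical model. FakeChannelManager and createChannelOrThrow are made-up stand-ins for the client's ChannelManager and Spring's SimpleConnection. IllegalStateException replaces Spring's AmqpResourceNotAvailableException so that the example needs nothing beyond the JDK.

```java
// Hypothetical, simplified model of the call path in the stack trace:
// Spring SimpleConnection.createChannel() -> AMQConnection.createChannel() -> ChannelManager.
// None of these classes are the real ones.
class NullChannelDemo {

    // Stand-in for the client's channel manager: hands out channel numbers
    // until channelMax is reached, then returns null instead of throwing.
    static final class FakeChannelManager {
        private final int channelMax;
        private int next = 1;

        FakeChannelManager(int channelMax) {
            this.channelMax = channelMax;
        }

        Integer createChannel() {
            if (next > channelMax) {
                return null; // no channel number left: the caller gets a null channel
            }
            return next++;
        }
    }

    // Stand-in for Spring's wrapper, which turns a null channel into an exception
    // (IllegalStateException substitutes for AmqpResourceNotAvailableException here).
    static Integer createChannelOrThrow(FakeChannelManager cm) {
        Integer channel = cm.createChannel();
        if (channel == null) {
            throw new IllegalStateException("The channelMax limit is reached. Try later.");
        }
        return channel;
    }

    public static void main(String[] args) {
        FakeChannelManager cm = new FakeChannelManager(2047);
        int opened = 0;
        try {
            while (true) {
                createChannelOrThrow(cm);
                opened++;
            }
        } catch (IllegalStateException e) {
            System.out.println("opened " + opened + " channels, then: " + e.getMessage());
        }
    }
}
```

Run as-is, it prints "opened 2047 channels, then: The channelMax limit is reached. Try later." This mirrors the situation in this post, where channels are never closed fast enough under load.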
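Only part of the code is shown below. There is an int field, _channelMax, which holds the maximum number of channels. There is also a constructor, and the value is clearly passed in through it. I set a breakpoint and traced the container's initialization. At this point channelMax was still 2047, which further confirms that the value is overridden or processed before this class is invoked.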
com.rabbitmq.client.impl.ChannelManager
public class ChannelManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChannelManager.class);
    private final Object monitor;
    private final Map<Integer, ChannelN> _channelMap;
    private final IntAllocator channelNumberAllocator;
    private final ConsumerWorkService workService;
    private final Set<CountDownLatch> shutdownSet;
    private final int _channelMax;
    private ExecutorService shutdownExecutor;
    private final ThreadFactory threadFactory;
    private int channelShutdownTimeout;
    protected final MetricsCollector metricsCollector;

    public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory, MetricsCollector metricsCollector) {
        this.monitor = new Object();
        this._channelMap = new HashMap();
        this.shutdownSet = new HashSet();
        this.channelShutdownTimeout = 63000;
        // A negotiated channelMax of 0 means "no limit", which is mapped to 65535
        if (channelMax == 0) {
            channelMax = 65535;
        }
        this._channelMax = channelMax;
        // Channel numbers are allocated from 1 to channelMax
        this.channelNumberAllocator = new IntAllocator(1, channelMax);
        this.workService = workService;
        this.threadFactory = threadFactory;
        this.metricsCollector = metricsCollector;
    }

    // ... (remaining members omitted)
}
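The constructor above maps a channelMax of 0 to 65535 and hands out channel numbers from 1 to channelMax through an IntAllocator. Here is a rough, JDK-only approximation of that allocation behavior. It assumes a BitSet-based allocator; the real IntAllocator is implemented differently, and ChannelNumberAllocatorSketch is a hypothetical name. Once every number is taken, allocate() returns -1. That is the state in which channel creation comes back empty.

```java
import java.util.BitSet;

// Hypothetical stand-in for the client's IntAllocator (not the real class):
// hands out the lowest free integer in [lo, hi] and returns -1 when none is left.
class ChannelNumberAllocatorSketch {
    private final int lo;
    private final int hi;
    private final BitSet used = new BitSet();

    ChannelNumberAllocatorSketch(int lo, int hi) {
        this.lo = lo;
        this.hi = hi;
    }

    // Mirrors the ChannelManager constructor: 0 means "no limit" and becomes 65535,
    // and numbering starts at 1.
    static ChannelNumberAllocatorSketch forChannelMax(int channelMax) {
        if (channelMax == 0) {
            channelMax = 65535;
        }
        return new ChannelNumberAllocatorSketch(1, channelMax);
    }

    int allocate() {
        int candidate = used.nextClearBit(lo); // lowest unused number >= lo
        if (candidate > hi) {
            return -1; // every number in [lo, hi] is in use
        }
        used.set(candidate);
        return candidate;
    }

    // Closing a channel gives its number back
    void free(int channelNumber) {
        used.clear(channelNumber);
    }

    public static void main(String[] args) {
        ChannelNumberAllocatorSketch alloc = forChannelMax(2047);
        int count = 0;
        while (alloc.allocate() != -1) {
            count++;
        }
        System.out.println(count);            // 2047
        alloc.free(10);
        System.out.println(alloc.allocate()); // 10
    }
}
```

With channelMax negotiated down to 2047, only 2047 channels can be open at once on a connection, however many the client asked for.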
Tracing further, I found that the instantiateChannelManager() method in the AMQConnection class calls this constructor, so I kept tracing upward.
com.rabbitmq.client.impl.AMQConnection#instantiateChannelManager
protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {
    ChannelManager result = new ChannelManager(this._workService, channelMax, threadFactory, this.metricsCollector);
    this.configureChannelManager(result);
    return result;
}
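I finally found where the value changes, in the start() method of the AMQConnection class.
this.requestedChannelMax is the 4095 I set in the configuration class.
connTune.getChannelMax() is 2047.
In other words, negotiateChannelMax() combined these two values and ended up choosing 2047.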
int channelMax = this.negotiateChannelMax(this.requestedChannelMax, connTune.getChannelMax());
this._channelManager = this.instantiateChannelManager(channelMax, this.threadFactory);
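Eventually I found this piece of logic: if neither number is 0, it takes the smaller one; otherwise it takes the larger one. That explains what the processing does, but one question remained: where does the 2047 come from?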
com.rabbitmq.client.impl.AMQConnection#negotiateChannelMax
/**
 * Private API, allows for easier simulation of bogus clients.
 */
protected int negotiateChannelMax(int requestedChannelMax, int serverMax) {
    return negotiatedMaxValue(requestedChannelMax, serverMax);
}

private static int negotiatedMaxValue(int clientValue, int serverValue) {
    return (clientValue == 0 || serverValue == 0) ?
        Math.max(clientValue, serverValue) :
        Math.min(clientValue, serverValue);
}
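To see what this rule does with the numbers from this post, here is a stand-alone copy of negotiatedMaxValue with a few cases. Only the method body comes from the client source above. The class name ChannelMaxNegotiationDemo is made up for the example.

```java
// Stand-alone copy of the client's negotiation rule, exercised with this post's numbers.
class ChannelMaxNegotiationDemo {

    // 0 means "no limit" on that side: if either side is 0, the other value wins,
    // otherwise the smaller of the two wins.
    static int negotiatedMaxValue(int clientValue, int serverValue) {
        return (clientValue == 0 || serverValue == 0)
                ? Math.max(clientValue, serverValue)
                : Math.min(clientValue, serverValue);
    }

    public static void main(String[] args) {
        // Client requests 4095, server still at 2047: the case in this post
        System.out.println(negotiatedMaxValue(4095, 2047)); // 2047
        // Both sides raised to 4095
        System.out.println(negotiatedMaxValue(4095, 4095)); // 4095
        // Server raised to 4095, but client still requests 2047
        System.out.println(negotiatedMaxValue(2047, 4095)); // 2047
        // Client requests 0 ("no limit"): the server's value is used
        System.out.println(negotiatedMaxValue(0, 2047));    // 2047
    }
}
```

The third case is worth noting. Because the smaller non-zero value wins, raising channel_max on the server alone is not enough if the client still requests a smaller value. Both sides have to allow the higher limit.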
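Following connTune back, I found the code below, and debugging confirmed that this is indeed where the value 2047 comes from.
Both the method name rpc() and the variable name serverResponse show that a request is being made, and it is obvious who receives it: the RabbitMQ server. The client sends a request to the server to obtain the server's channelMax, frameMax, heartbeat and other values.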
Tune connTune = null;
try {
    ......
    try {
        Method serverResponse = this._channel0.rpc((Method) method, this.handshakeTimeout / 2).getMethod();
        if (serverResponse instanceof Tune) {
            connTune = (Tune) serverResponse;
        }
        ......
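The Tune received here is the server's side of the connection-tuning step in the AMQP 0-9-1 handshake, where start-ok is followed by tune and then tune-ok. The sketch below is a hypothetical model of that exchange, not the client's real API. TuneParams, brokerReplyToStartOk and clientTuneOk are invented names, and the numbers are example values. Applying the same negotiation rule to frameMax and heartbeat is an assumption of this sketch. It shows why a value set only on the client cannot rise above what the server puts into Tune.

```java
// Hypothetical model of the tuning step of the AMQP 0-9-1 connection handshake.
// Only the message order (start-ok -> tune -> tune-ok) follows the protocol;
// the classes and methods are made up for illustration.
class TuneHandshakeSketch {

    // Values carried by connection.tune (server proposal) or tune-ok (client answer)
    static final class TuneParams {
        final int channelMax;
        final int frameMax;
        final int heartbeat;

        TuneParams(int channelMax, int frameMax, int heartbeat) {
            this.channelMax = channelMax;
            this.frameMax = frameMax;
            this.heartbeat = heartbeat;
        }
    }

    // Fake broker: answers start-ok with the limits from its own configuration
    static TuneParams brokerReplyToStartOk(int confChannelMax, int confFrameMax, int confHeartbeat) {
        return new TuneParams(confChannelMax, confFrameMax, confHeartbeat);
    }

    // 0 = "no limit": if either side is 0 take the other value, otherwise the smaller one
    static int negotiate(int client, int server) {
        return (client == 0 || server == 0) ? Math.max(client, server) : Math.min(client, server);
    }

    // Client side: combine its requested values with the broker's proposal;
    // the result is what it answers in tune-ok and uses for the rest of the connection
    static TuneParams clientTuneOk(TuneParams fromBroker, int reqChannelMax, int reqFrameMax, int reqHeartbeat) {
        return new TuneParams(
                negotiate(reqChannelMax, fromBroker.channelMax),
                negotiate(reqFrameMax, fromBroker.frameMax),
                negotiate(reqHeartbeat, fromBroker.heartbeat));
    }

    public static void main(String[] args) {
        // Broker still at channel_max 2047, client asks for 4095 (example values)
        TuneParams fromBroker = brokerReplyToStartOk(2047, 131072, 60);
        TuneParams agreed = clientTuneOk(fromBroker, 4095, 0, 60);
        System.out.println(agreed.channelMax + " " + agreed.frameMax + " " + agreed.heartbeat);
    }
}
```

This prints "2047 131072 60". The 2047 never comes from the client's configuration. It arrives in the server's Tune and wins the negotiation.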
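At this point it is clear that changing the limit on the client alone has no effect. The server-side configuration, i.e. the rabbitmq.conf file, must be changed as well, and the RabbitMQ node restarted so that the new value is picked up.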
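## Set the max permissible number of channels per connection.
## 0 means "no limit".
## Add the following parameter with the value you want to the config file. If you never need more than 2047 channels, there is no need to set it.
# channel_max = 128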
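The problem itself is not a big one. It mostly came down to not understanding how a RabbitMQ client establishes its connection, and that cost me a lot of time.
My advice: search first (I used Baidu). If the first page of results does not have the right solution, go to StackOverflow. If that still fails, use the ultimate approach and either read the official documentation line by line or walk through the source code. It is also good practice for building your own problem-solving skills.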
2020-09-09 14:02:36.549  INFO 6 --- [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#10-60623] c.r.c.impl.AbstractMetricsCollector : Error while computing metrics in newChannel: null
2020-09-09 14:02:36.549 ERROR 6 --- [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#10-60623] o.s.a.r.l.SimpleMessageListenerContainer : Failed to check/redeclare auto-delete queue(s).
org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later.
	at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:60) ~[spring-rabbit-2.2.10.RELEASE.jar!/:2.2.10.RELEASE]
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:1434) ~[spring-rabbit-2.2.10.RELEASE.jar!/:2.2.10.RELEASE]
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$200(CachingConnectionFactory.java:1420) ~[spring-rabbit-2.2.10.RELEASE.jar!/:2.2.10.RELEASE]
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:723) ~[spring-rabbit-2.2.10.RELEASE.jar!/:2.2.10.RELEASE]
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:706) ~[spring-rabbit-2.2.10.RELEASE.jar!/:2.2.10.RELEASE]
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:676) ~[spring-rabbit-2.2.10.RELEASE.jar!/:2.2.10.RELEASE]
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:567) ~[spring-rabbit-2.2.10.RELEASE.jar!/:2.2.10.RELEASE]
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1600(CachingConnectionFactory.java:102) ~[spring-rabbit-2.2.10.RELEASE.jar!/:2.2.10.RELEASE]
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1439) ~[spring-rabbit-2.2.10.RELEASE.jar!/:2.2.10.RELEASE]
	at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2095) ~[spring-rabbit-2.2.10.RELEASE.jar!/:2.2.10.RELEASE]
	at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2062) ~[spring-rabbit-2.2.10.RELEASE.jar!/:2.2.10.RELEASE]
	at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2042) ~[spring-rabbit-2.2.10.RELEASE.jar!/:2.2.10.RELEASE]
	at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueInfo(RabbitAdmin.java:407) ~[spring-rabbit-2.2.10.RELEASE.jar!/:2.2.10.RELEASE]
	at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:391) ~[spring-rabbit-2.2.10.RELEASE.jar!/:2.2.10.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.attemptDeclarations(AbstractMessageListenerContainer.java:1842) ~[spring-rabbit-2.2.10.RELEASE.jar!/:2.2.10.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1823) ~[spring-rabbit-2.2.10.RELEASE.jar!/:2.2.10.RELEASE]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1349) [spring-rabbit-2.2.10.RELEASE.jar!/:2.2.10.RELEASE]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195) [spring-rabbit-2.2.10.RELEASE.jar!/:2.2.10.RELEASE]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
2020-09-09 14:02:36.549  INFO 6 --- [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#10-60623] c.r.c.impl.AbstractMetricsCollector : Error while computing metrics in newChannel: null