之前已經遇到過二次SpringBootAmqp的坑了,但是沒有寫博客,一個是重試的坑,一個是RabbitListener自動創建的坑
現在這個問題是2021年7月26日出現的
前言:生產環境已經出現設備消息到達慢的問題,需要馬上解決
先梳理出來目前的整個鏈路流程如圖
為什么要用這么多隊列,因為設備消息是單通道,不是多通道,指令需要串行
發送指令屬於優先級高的消息,分為三類優先級,高中低,高優先級一般是用戶需要等待結果的指令
但是依然會出現發送開門指令之后需要等待20S設備才開門的場景
基於這些問題,拉了個會議,技術部的幾個大佬一起討論一下最優解和最快解是啥,最后得出三個解
- 增加監聽器的線程池,增大並發處理量
- 發送給設備的同步等待修改成異步ACK,增加吞吐量,需要考慮設備不支持雙通道
- 發送給設備的指令不保證一致性,只保證實時行,給設備提供增量拉取接口
因為在不該動原有的協議情況下,第三個結論被淘汰了,所以最快是1, 最優是2
沒有設置線程池,都是走的默認的,消費ListenerContainer使用的是DirectMessageListenerContainer
DirectMessageListenerContainer container = new DirectMessageListenerContainer(connectionFactory);
container.setExposeListenerChannel(true);
container.setConsumersPerQueue(1);
container.setPrefetchCount(1);
container.setMessageConverter(new Jackson2JsonMessageConverter());
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setMessageListener(new ChannelAwareMessageListener() {
//message receive
})
默認的線程池前綴是 pool-2-thread-
接下來坑來了
首先說一下網上博客的坑
說的是默認是5個線程,狗屎, 當然我不能說他的是錯的,可能環境不一樣
下面看看官方說的
官方說的默認是核心數*2, 實際上查看源碼確實也是,並且最大為16, 測試確實也是16個
但是注意,重點來了,他說你可以設置taskExecutor來提供更大的並發量, 這個跟博客倒是一致,但是事實證明 bull shit
不管怎么說,首先肯定是這樣嘗試一下,於是我把代碼改成了
DirectMessageListenerContainer container = new DirectMessageListenerContainer(
connectionFactory);
container.setExposeListenerChannel(true);
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(60);
taskExecutor.setThreadNamePrefix("device-receive-");
taskExecutor.setThreadGroupName("device-receive-group");
//設置該屬性,靈活設置並發 ,多線程運行。
container.setTaskExecutor(taskExecutor);
container.setConsumersPerQueue(1);
container.setPrefetchCount(1);
container.setMessageConverter(new Jackson2JsonMessageConverter());
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setMessageListener()
無效!!!!!!!
依然是走的默認的線程池,搞我心態
然后我准備設置CONNECT MODEL來試試
干
然后我配置成connection模式,並且設置成60,結果玩具一樣,他瘋狂一直增加。200個線程了還沒停(沒截圖保存),打住,我不想朝這個方向研究了,反正就是坑,而且使用connection模式,log里面會輸出,connection模式不支持動態新增隊列,跟我的業務場景就違背了
然后我在想,是不是用@RabbitListener, 然后用DirectRabbitListenerContainerFactory來設置taskExecutor然后給@RabbitListenr提供containerFactory給可以打破這個問題
說干就干
@RabbitListener(containerFactory="xxxxxx") // 因為這里是動態設置的queue,所以不設置
public void messageReceive(Message message){
//todo
}
@Bean
@Primary
private DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
container.setExposeListenerChannel(true);
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(60);
taskExecutor.setThreadNamePrefix("device-receive-");
taskExecutor.setThreadGroupName("device-receive-group");
//設置該屬性,靈活設置並發 ,多線程運行。
container.setTaskExecutor(taskExecutor);
container.setConsumersPerQueue(1);
container.setPrefetchCount(1);
container.setMessageConverter(new Jackson2JsonMessageConverter());
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setMessageListener()
}
依然還是走的默認線程池,抱着試試砍的態度,把@RabbitListener 的containerFactory 不設置,因為不設置也會走默認的,況且我只配置了一個,當時他竟然變了!!!變成了 SimpleAsyncTaskExecutor 線程池組,這個圖片沒保存,貼不了了,我特么。。。。。。,不管了,反正是錯誤的,我也不想去加大async的線程池來這樣做,也不想去細究他為什么會這樣
最后一步,debug源碼吧
萬萬沒想到的是,最后還是解決了,從晚上6點到凌晨3點,我最終還是解決了這個問題,沒辦法,不解決這個問題,我睡不着
// 僅僅只是把給DirectMessageListenerContainer設置線程池修改成了給ConnectFactory設置
@Bean(name = DeviceMQConstant.DEVICE_COMMAND_CONTAINER)
public DirectMessageListenerContainer deviceCommadnContainer(
AbstractConnectionFactory connectionFactory) {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(60);
taskExecutor.setMaxPoolSize(60);
taskExecutor.setThreadNamePrefix("mq-receive-");
taskExecutor.setThreadGroupName("mq-receive-group");
taskExecutor.initialize();
connectionFactory.setExecutor(taskExecutor);
這一塊當時沒有截圖,現在也不想回顧的太深了,反正就是如果不給ConnectFactory 設置Executor 他就會走默認的線程池
然后收工,日志正常
線程數正常
然后生產環境也正常了