之前已经遇到过二次SpringBootAmqp的坑了,但是没有写博客,一个是重试的坑,一个是RabbitListener自动创建的坑
现在这个问题是2021年7月26日出现的
前言:生产环境已经出现设备消息到达慢的问题,需要马上解决
先梳理出来目前的整个链路流程如图
为什么要用这么多队列,因为设备消息是单通道,不是多通道,指令需要串行
发送指令属于优先级高的消息,分为三类优先级,高中低,高优先级一般是用户需要等待结果的指令
但是依然会出现发送开门指令之后需要等待20S设备才开门的场景
基于这些问题,拉了个会议,技术部的几个大佬一起讨论一下最优解和最快解是啥,最后得出三个解
- 增加监听器的线程池,增大并发处理量
- 发送给设备的同步等待修改成异步ACK,增加吞吐量,需要考虑设备不支持双通道
- 发送给设备的指令不保证一致性,只保证实时行,给设备提供增量拉取接口
因为在不该动原有的协议情况下,第三个结论被淘汰了,所以最快是1, 最优是2
没有设置线程池,都是走的默认的,消费ListenerContainer使用的是DirectMessageListenerContainer
DirectMessageListenerContainer container = new DirectMessageListenerContainer(connectionFactory);
container.setExposeListenerChannel(true);
container.setConsumersPerQueue(1);
container.setPrefetchCount(1);
container.setMessageConverter(new Jackson2JsonMessageConverter());
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setMessageListener(new ChannelAwareMessageListener() {
//message receive
})
默认的线程池前缀是 pool-2-thread-
接下来坑来了
首先说一下网上博客的坑
说的是默认是5个线程,狗屎, 当然我不能说他的是错的,可能环境不一样
下面看看官方说的
官方说的默认是核心数*2, 实际上查看源码确实也是,并且最大为16, 测试确实也是16个
但是注意,重点来了,他说你可以设置taskExecutor来提供更大的并发量, 这个跟博客倒是一致,但是事实证明 bull shit
不管怎么说,首先肯定是这样尝试一下,于是我把代码改成了
DirectMessageListenerContainer container = new DirectMessageListenerContainer(
connectionFactory);
container.setExposeListenerChannel(true);
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(60);
taskExecutor.setThreadNamePrefix("device-receive-");
taskExecutor.setThreadGroupName("device-receive-group");
//设置该属性,灵活设置并发 ,多线程运行。
container.setTaskExecutor(taskExecutor);
container.setConsumersPerQueue(1);
container.setPrefetchCount(1);
container.setMessageConverter(new Jackson2JsonMessageConverter());
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setMessageListener()
无效!!!!!!!
依然是走的默认的线程池,搞我心态
然后我准备设置CONNECT MODEL来试试
干
然后我配置成connection模式,并且设置成60,结果玩具一样,他疯狂一直增加。200个线程了还没停(没截图保存),打住,我不想朝这个方向研究了,反正就是坑,而且使用connection模式,log里面会输出,connection模式不支持动态新增队列,跟我的业务场景就违背了
然后我在想,是不是用@RabbitListener, 然后用DirectRabbitListenerContainerFactory来设置taskExecutor然后给@RabbitListenr提供containerFactory给可以打破这个问题
说干就干
@RabbitListener(containerFactory="xxxxxx") // 因为这里是动态设置的queue,所以不设置
public void messageReceive(Message message){
//todo
}
@Bean
@Primary
private DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
container.setExposeListenerChannel(true);
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(60);
taskExecutor.setThreadNamePrefix("device-receive-");
taskExecutor.setThreadGroupName("device-receive-group");
//设置该属性,灵活设置并发 ,多线程运行。
container.setTaskExecutor(taskExecutor);
container.setConsumersPerQueue(1);
container.setPrefetchCount(1);
container.setMessageConverter(new Jackson2JsonMessageConverter());
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setMessageListener()
}
依然还是走的默认线程池,抱着试试砍的态度,把@RabbitListener 的containerFactory 不设置,因为不设置也会走默认的,况且我只配置了一个,当时他竟然变了!!!变成了 SimpleAsyncTaskExecutor 线程池组,这个图片没保存,贴不了了,我特么。。。。。。,不管了,反正是错误的,我也不想去加大async的线程池来这样做,也不想去细究他为什么会这样
最后一步,debug源码吧
万万没想到的是,最后还是解决了,从晚上6点到凌晨3点,我最终还是解决了这个问题,没办法,不解决这个问题,我睡不着
// 仅仅只是把给DirectMessageListenerContainer设置线程池修改成了给ConnectFactory设置
@Bean(name = DeviceMQConstant.DEVICE_COMMAND_CONTAINER)
public DirectMessageListenerContainer deviceCommadnContainer(
AbstractConnectionFactory connectionFactory) {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(60);
taskExecutor.setMaxPoolSize(60);
taskExecutor.setThreadNamePrefix("mq-receive-");
taskExecutor.setThreadGroupName("mq-receive-group");
taskExecutor.initialize();
connectionFactory.setExecutor(taskExecutor);
这一块当时没有截图,现在也不想回顾的太深了,反正就是如果不给ConnectFactory 设置Executor 他就会走默认的线程池
然后收工,日志正常
线程数正常
然后生产环境也正常了