SpringBoot AMQP线程池的坑, 坑三


之前已经遇到过二次SpringBootAmqp的坑了,但是没有写博客,一个是重试的坑,一个是RabbitListener自动创建的坑

现在这个问题是2021年7月26日出现的

前言:生产环境已经出现设备消息到达慢的问题,需要马上解决

先梳理出来目前的整个链路流程如图

image-20210728212746569

为什么要用这么多队列,因为设备消息是单通道,不是多通道,指令需要串行

发送指令属于优先级的消息,分为三类优先级,高中低,高优先级一般是用户需要等待结果的指令

但是依然会出现发送开门指令之后需要等待20S设备才开门的场景

基于这些问题,拉了个会议,技术部的几个大佬一起讨论一下最优解和最快解是啥,最后得出三个解

  1. 增加监听器的线程池,增大并发处理量
  2. 发送给设备的同步等待修改成异步ACK,增加吞吐量,需要考虑设备不支持双通道
  3. 发送给设备的指令不保证一致性,只保证实时行,给设备提供增量拉取接口

因为在不该动原有的协议情况下,第三个结论被淘汰了,所以最快是1, 最优是2

没有设置线程池,都是走的默认的,消费ListenerContainer使用的是DirectMessageListenerContainer

  DirectMessageListenerContainer container = new DirectMessageListenerContainer(connectionFactory);
  container.setExposeListenerChannel(true);
  container.setConsumersPerQueue(1);
  container.setPrefetchCount(1);
  container.setMessageConverter(new Jackson2JsonMessageConverter());
  container.setAcknowledgeMode(AcknowledgeMode.AUTO);
  container.setMessageListener(new ChannelAwareMessageListener() {
      //message receive
  })

默认的线程池前缀是 pool-2-thread-

接下来坑来了

首先说一下网上博客的坑

说的是默认是5个线程,狗屎, 当然我不能说他的是错的,可能环境不一样

下面看看官方说的

官方说的默认是核心数*2, 实际上查看源码确实也是,并且最大为16, 测试确实也是16个

但是注意,重点来了,他说你可以设置taskExecutor来提供更大的并发量, 这个跟博客倒是一致,但是事实证明 bull shit

不管怎么说,首先肯定是这样尝试一下,于是我把代码改成了

DirectMessageListenerContainer container = new DirectMessageListenerContainer(
        connectionFactory);
container.setExposeListenerChannel(true);
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(60);
taskExecutor.setThreadNamePrefix("device-receive-");
taskExecutor.setThreadGroupName("device-receive-group");
//设置该属性,灵活设置并发 ,多线程运行。
container.setTaskExecutor(taskExecutor);
container.setConsumersPerQueue(1);
container.setPrefetchCount(1);
container.setMessageConverter(new Jackson2JsonMessageConverter());
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setMessageListener()

无效!!!!!!!

依然是走的默认的线程池,搞我心态

然后我准备设置CONNECT MODEL来试试

然后我配置成connection模式,并且设置成60,结果玩具一样,他疯狂一直增加。200个线程了还没停(没截图保存),打住,我不想朝这个方向研究了,反正就是坑,而且使用connection模式,log里面会输出,connection模式不支持动态新增队列,跟我的业务场景就违背了

然后我在想,是不是用@RabbitListener, 然后用DirectRabbitListenerContainerFactory来设置taskExecutor然后给@RabbitListenr提供containerFactory给可以打破这个问题

说干就干

@RabbitListener(containerFactory="xxxxxx") // 因为这里是动态设置的queue,所以不设置
public void messageReceive(Message message){
    //todo
}

@Bean
@Primary
private DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    container.setExposeListenerChannel(true);
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(10);
    taskExecutor.setMaxPoolSize(60);
    taskExecutor.setThreadNamePrefix("device-receive-");
    taskExecutor.setThreadGroupName("device-receive-group");
    //设置该属性,灵活设置并发 ,多线程运行。
    container.setTaskExecutor(taskExecutor);
    container.setConsumersPerQueue(1);
    container.setPrefetchCount(1);
    container.setMessageConverter(new Jackson2JsonMessageConverter());
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    container.setMessageListener()
}

​ 依然还是走的默认线程池,抱着试试砍的态度,把@RabbitListenercontainerFactory 不设置,因为不设置也会走默认的,况且我只配置了一个,当时他竟然变了!!!变成了 SimpleAsyncTaskExecutor 线程池组,这个图片没保存,贴不了了,我特么。。。。。。,不管了,反正是错误的,我也不想去加大async的线程池来这样做,也不想去细究他为什么会这样

最后一步,debug源码吧

万万没想到的是,最后还是解决了,从晚上6点到凌晨3点,我最终还是解决了这个问题,没办法,不解决这个问题,我睡不着

// 仅仅只是把给DirectMessageListenerContainer设置线程池修改成了给ConnectFactory设置
@Bean(name = DeviceMQConstant.DEVICE_COMMAND_CONTAINER)
  public DirectMessageListenerContainer deviceCommadnContainer(
      AbstractConnectionFactory connectionFactory) {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(60);
    taskExecutor.setMaxPoolSize(60);
    taskExecutor.setThreadNamePrefix("mq-receive-");
    taskExecutor.setThreadGroupName("mq-receive-group");
    taskExecutor.initialize();
    connectionFactory.setExecutor(taskExecutor);

这一块当时没有截图,现在也不想回顾的太深了,反正就是如果不给ConnectFactory 设置Executor 他就会走默认的线程池

然后收工,日志正常

线程数正常

然后生产环境也正常了


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM