場景:
MQ批量推送消息過來,逐條訂閱,逐條 一開始程序還行,處理能力還可以,因為數據不是很多,涉及的程序處理過程雖然多,但是勉強撐過去。
但是隨着業務增長數據由原來的每分鍾10條數據,增加到了100條數據,由於之前程序處理能力有限,導致mq上出現消息堆積,越堆越多。
解決方案A:
因為是分布式系統,多部署幾個消費者,解決問題。
解決方案B:
采用多線程處理。只要服務器資源夠,那么久可以提高生產效率,開啟10個線程。那么相當於之前的一個程序處理,變成了10個程序處理,效率提高10倍。(其他因素不考慮的情況下,比如數據庫連接數,CPU,內存消耗等)。
經搜,發現采用spring的線程池的居多,因為簡單,采用jdk原生線程池的,用得不好的情況下問題還挺多。。
經測,采用了spring線程池解決問題。
例子:
新建一個配置文件,指定固定線程數,以及其他連個參數即可。
配置類 TaskPoolConfig
這里指定兩個業務類型線程池。
ThreadPoolTaskExecutor 里面有很多參數,但大多有默認值,可不用設置,只需設置我們自己的線程池大小即可。

import java.util.concurrent.Executor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @Configuration
@EnableAsync
public class TaskPoolConfig {
/**msg線程池 * * @return */ @Bean("msgExecutor") public Executor msgExecutor(){ ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); threadPoolTaskExecutor.setCorePoolSize(5); threadPoolTaskExecutor.setThreadNamePrefix(" msgExecutor=="); threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); threadPoolTaskExecutor.setAwaitTerminationSeconds(600); return threadPoolTaskExecutor; } /**money線程池 * * @return */ @Bean("moneyExecutor") public Executor moneyExecutor(){ ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); threadPoolTaskExecutor.setCorePoolSize(5); threadPoolTaskExecutor.setThreadNamePrefix(" moneyExecutor=="); threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); threadPoolTaskExecutor.setAwaitTerminationSeconds(600); return threadPoolTaskExecutor; } }
業務處理類 AsyncService.java
import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import lombok.extern.slf4j.Slf4j; @Slf4j @Service public class AsyncService { @Async("msgExecutor") public void msgExecutor(String num) { log.info("msgExecutor ======"+num+"======="); try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } log.info("msgExecutor ======"+num+"======="); } @Async("moneyExecutor") public void moneyExecutor(String num) { log.info("moneyExecutor ======"+num+"======="); try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } log.info("moneyExecutor ======"+num+"======="); } }
測試類:(注意,調用類與實現業務類必須是兩個分開的類)
import lombok.extern.slf4j.Slf4j; @Slf4j public class TaskPoolTest extends BaseTest { @Autowired private AsyncService asyncService; @Test public void showTest() throws Exception { for (int i=0;i<10;i++) { asyncService.moneyExecutor("" + i); } System.out.println("other==============="); Thread.sleep(10000); for (int i=100;i<110;i++) { asyncService.msgExecutor("" + i); } } }
執行結果:
結果日志中可以看到,執行的程序最終執行完成后 。雖然執行test已完成,都執行了shuting down 了,但程序依舊會等待到所有線程執行完結后才終止。
只是得益於這兩個句:
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); threadPoolTaskExecutor.setAwaitTerminationSeconds(600);
結論:spring提供的線程池簡單好用,提供服務利用率。在多地方可以考慮使用。