多線程 采用spring線程池ThreadPoolTaskExecutor提高程序處理能力 筆記


場景:

MQ批量推送消息過來,逐條訂閱,逐條 一開始程序還行,處理能力還可以,因為數據不是很多,涉及的程序處理過程雖然多,但是勉強撐過去。

但是隨着業務增長數據由原來的每分鍾10條數據,增加到了100條數據,由於之前程序處理能力有限,導致mq上出現消息堆積,越堆越多。

 

解決方案A:

因為是分布式系統,多部署幾個消費者,解決問題。

 

解決方案B:

采用多線程處理。只要服務器資源夠,那么久可以提高生產效率,開啟10個線程。那么相當於之前的一個程序處理,變成了10個程序處理,效率提高10倍。(其他因素不考慮的情況下,比如數據庫連接數,CPU,內存消耗等)。

經搜,發現采用spring的線程池的居多,因為簡單,采用jdk原生線程池的,用得不好的情況下問題還挺多。。

 

經測,采用了spring線程池解決問題。

例子:

新建一個配置文件,指定固定線程數,以及其他連個參數即可。


配置類 TaskPoolConfig

這里指定兩個業務類型線程池。

ThreadPoolTaskExecutor  里面有很多參數,但大多有默認值,可不用設置,只需設置我們自己的線程池大小即可。

 

 

 
import java.util.concurrent.Executor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration

@EnableAsync

public class TaskPoolConfig {
/**msg線程池 * * @return */ @Bean("msgExecutor") public Executor msgExecutor(){ ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); threadPoolTaskExecutor.setCorePoolSize(5); threadPoolTaskExecutor.setThreadNamePrefix(" msgExecutor=="); threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); threadPoolTaskExecutor.setAwaitTerminationSeconds(600); return threadPoolTaskExecutor; } /**money線程池 * * @return */ @Bean("moneyExecutor") public Executor moneyExecutor(){ ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); threadPoolTaskExecutor.setCorePoolSize(5); threadPoolTaskExecutor.setThreadNamePrefix(" moneyExecutor=="); threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); threadPoolTaskExecutor.setAwaitTerminationSeconds(600); return threadPoolTaskExecutor; } }

 

 

 

業務處理類 AsyncService.java
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
public class AsyncService {
    @Async("msgExecutor")
    public void msgExecutor(String num) {
        log.info("msgExecutor ======"+num+"=======");
        try {
            Thread.sleep(3000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        log.info("msgExecutor ======"+num+"=======");
    }
    
    

    @Async("moneyExecutor")
    public void moneyExecutor(String num) {
        log.info("moneyExecutor ======"+num+"=======");
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        log.info("moneyExecutor ======"+num+"=======");
    }
}

 

測試類:(注意,調用類與實現業務類必須是兩個分開的類)

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TaskPoolTest extends BaseTest {
    
    @Autowired
    private AsyncService asyncService;
    
    @Test
    public void showTest() throws Exception {
        for (int i=0;i<10;i++) {
            asyncService.moneyExecutor("" + i);
        }
        System.out.println("other===============");
        Thread.sleep(10000);
        for (int i=100;i<110;i++) {
            asyncService.msgExecutor("" + i);
        }
    }
}

 

執行結果:

結果日志中可以看到,執行的程序最終執行完成后 。雖然執行test已完成,都執行了shuting down 了,但程序依舊會等待到所有線程執行完結后才終止。

只是得益於這兩個句:

        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        threadPoolTaskExecutor.setAwaitTerminationSeconds(600);

 

 

 

 

結論:spring提供的線程池簡單好用,提供服務利用率。在多地方可以考慮使用。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM