使用springboot線程池,插入十萬條數據


一、線程池配置

@Configuration
@EnableAsync
public class ExecutorConfig {

    private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);

    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;

    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        logger.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心線程數
        executor.setCorePoolSize(corePoolSize);
        //配置最大線程數
        executor.setMaxPoolSize(maxPoolSize);
        //配置隊列大小
        executor.setQueueCapacity(queueCapacity);
        //配置線程池中的線程的名稱前綴
        executor.setThreadNamePrefix(namePrefix);

        // rejection-policy:當pool已經達到max size的時候,如何處理新任務
        // CALLER_RUNS:不在新線程中執行任務,而是有調用者所在的線程來執行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //執行初始化
        executor.initialize();
        return executor;
    }
}

  applications.properties中配置@Value("${}")的值

# 異步線程配置
# 配置核心線程數
async.executor.thread.core_pool_size = 20
# 配置最大線程數
async.executor.thread.max_pool_size = 20
# 配置隊列大小
async.executor.thread.queue_capacity =9999999
# 配置線程池中的線程的名稱前綴
async.executor.thread.name.prefix = async-service-

二、編寫測試類

  @Test
    @Order(2)
    public void insertMany(){
        int num=20;
        try {
            //計數器數量就等於線程數量
            countDownLatch = new CountDownLatch(num);
            for(int i=0;i<num;i++){
                iAsyncService.executeAsyncInsertUser(countDownLatch,i);
            }
            //主線程喚醒
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            System.out.println("end");
        }
    }

三、service層

public interface IAsyncService {
    public void executeAsyncInsertUser(CountDownLatch countDownLatch, int i);
}
@Service
public class AsyncServiceImpl implements IAsyncService {
    
    @Override
    @Async("asyncServiceExecutor")
    public void executeAsyncInsertUser(CountDownLatch countDownLatch,int i){
        Connection con = null;
        PreparedStatement pstmt = null;
        try {
            long start = System.currentTimeMillis();
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test_a?setUnicode=true&characterEncoding=utf8&useSSL=false", "root", "bydwx.23");
            System.out.println("線程" + Thread.currentThread().getId() + "開始執行");
            for (int j=1;j<=5000;j++){
                String name= NameUtil.getChineseName();
                int age= NumberUtil.getNum(0,100);
                String sex= SexUtil.getSex();
                String address= ProvinceUtil.getProvince();

                pstmt = con.prepareStatement("insert into users(name,age,sex,address) values(?,?,?,?)");
                pstmt.setString(1,name);
                pstmt.setInt(2,age);
                pstmt.setString(3,sex);
                pstmt.setString(4,address);
                pstmt.execute();

                if (j%100==0){
                    System.out.println("線程" + Thread.currentThread().getId() + ",  j="+j);
                }
            }
            long end = System.currentTimeMillis();
            //計算本線程運行完畢使用的時間,單位為秒
            long threadUsedTimeSecond=(end-start)/1000;
            System.out.println("線程" + Thread.currentThread().getId() + "執行結束,用時"+threadUsedTimeSecond+"s");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //關閉資源
            try {
                if(pstmt != null) pstmt.close();
                if(con != null) con.close();
            } catch (Exception e) {
                System.err.println(e.getMessage());
            }
            //線程運行完畢,線程計數器-1
            countDownLatch.countDown();
        }
    }
}

四、執行時間

  下表為不同線程數完成插入10萬條數據所用時間,本機cup為八核

線程數 時間(秒)
20  381
30 253
40 193
50 157
100 84
200 44+
500 22
1000 16/17/18
900 16/17/18
800 17/18/19

  在線程數為200時,在插入9.5w+數據后,程序報錯:

com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Data source rejected establishment of connection,  message from server: "Too many connections"

  應該是MySQL的默認max_connections屬性配置太小,重新配置了MySQL的my.ini設置max_connections=1000,

  重啟MySQL后,跑500線程數,跟預期結果一樣正常運行,並且時間大幅縮短;

  1000線程數跑完了但也出現Too many connections錯誤,但可以看出增加到1000后相比500收益沒有很明顯。

  實際運行的線程數應該小於max_connections。

  可以看出來,增加線程數對於大量數據的插入會起到非常大的收益,大幅減少運行時間。

參考:

  https://blog.csdn.net/qq_41315539/article/details/107885680


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM