一、線程池配置
@Configuration @EnableAsync public class ExecutorConfig { private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class); @Value("${async.executor.thread.core_pool_size}") private int corePoolSize; @Value("${async.executor.thread.max_pool_size}") private int maxPoolSize; @Value("${async.executor.thread.queue_capacity}") private int queueCapacity; @Value("${async.executor.thread.name.prefix}") private String namePrefix; @Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { logger.info("start asyncServiceExecutor"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心線程數 executor.setCorePoolSize(corePoolSize); //配置最大線程數 executor.setMaxPoolSize(maxPoolSize); //配置隊列大小 executor.setQueueCapacity(queueCapacity); //配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix(namePrefix); // rejection-policy:當pool已經達到max size的時候,如何處理新任務 // CALLER_RUNS:不在新線程中執行任務,而是有調用者所在的線程來執行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //執行初始化 executor.initialize(); return executor; } }
applications.properties中配置@Value("${}")的值
# 異步線程配置 # 配置核心線程數 async.executor.thread.core_pool_size = 20 # 配置最大線程數 async.executor.thread.max_pool_size = 20 # 配置隊列大小 async.executor.thread.queue_capacity =9999999 # 配置線程池中的線程的名稱前綴 async.executor.thread.name.prefix = async-service-
二、編寫測試類
@Test @Order(2) public void insertMany(){ int num=20; try { //計數器數量就等於線程數量 countDownLatch = new CountDownLatch(num); for(int i=0;i<num;i++){ iAsyncService.executeAsyncInsertUser(countDownLatch,i); } //主線程喚醒 countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("end"); } }
三、service層
public interface IAsyncService { public void executeAsyncInsertUser(CountDownLatch countDownLatch, int i); }
@Service public class AsyncServiceImpl implements IAsyncService { @Override @Async("asyncServiceExecutor") public void executeAsyncInsertUser(CountDownLatch countDownLatch,int i){ Connection con = null; PreparedStatement pstmt = null; try { long start = System.currentTimeMillis(); Class.forName("com.mysql.jdbc.Driver"); con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test_a?setUnicode=true&characterEncoding=utf8&useSSL=false", "root", "bydwx.23"); System.out.println("線程" + Thread.currentThread().getId() + "開始執行"); for (int j=1;j<=5000;j++){ String name= NameUtil.getChineseName(); int age= NumberUtil.getNum(0,100); String sex= SexUtil.getSex(); String address= ProvinceUtil.getProvince(); pstmt = con.prepareStatement("insert into users(name,age,sex,address) values(?,?,?,?)"); pstmt.setString(1,name); pstmt.setInt(2,age); pstmt.setString(3,sex); pstmt.setString(4,address); pstmt.execute(); if (j%100==0){ System.out.println("線程" + Thread.currentThread().getId() + ", j="+j); } } long end = System.currentTimeMillis(); //計算本線程運行完畢使用的時間,單位為秒 long threadUsedTimeSecond=(end-start)/1000; System.out.println("線程" + Thread.currentThread().getId() + "執行結束,用時"+threadUsedTimeSecond+"s"); } catch (Exception e) { e.printStackTrace(); } finally { //關閉資源 try { if(pstmt != null) pstmt.close(); if(con != null) con.close(); } catch (Exception e) { System.err.println(e.getMessage()); } //線程運行完畢,線程計數器-1 countDownLatch.countDown(); } } }
四、執行時間
下表為不同線程數完成插入10萬條數據所用時間,本機cup為八核
線程數 | 時間(秒) |
20 | 381 |
30 | 253 |
40 | 193 |
50 | 157 |
100 | 84 |
200 | 44+ |
500 | 22 |
1000 | 16/17/18 |
900 | 16/17/18 |
800 | 17/18/19 |
在線程數為200時,在插入9.5w+數據后,程序報錯:
com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Data source rejected establishment of connection, message from server: "Too many connections"
應該是MySQL的默認max_connections屬性配置太小,重新配置了MySQL的my.ini設置max_connections=1000,
重啟MySQL后,跑500線程數,跟預期結果一樣正常運行,並且時間大幅縮短;
1000線程數跑完了但也出現Too many connections錯誤,但可以看出增加到1000后相比500收益沒有很明顯。
實際運行的線程數應該小於max_connections。
可以看出來,增加線程數對於大量數據的插入會起到非常大的收益,大幅減少運行時間。
參考:
https://blog.csdn.net/qq_41315539/article/details/107885680