一、线程池配置
@Configuration @EnableAsync public class ExecutorConfig { private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class); @Value("${async.executor.thread.core_pool_size}") private int corePoolSize; @Value("${async.executor.thread.max_pool_size}") private int maxPoolSize; @Value("${async.executor.thread.queue_capacity}") private int queueCapacity; @Value("${async.executor.thread.name.prefix}") private String namePrefix; @Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { logger.info("start asyncServiceExecutor"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(corePoolSize); //配置最大线程数 executor.setMaxPoolSize(maxPoolSize); //配置队列大小 executor.setQueueCapacity(queueCapacity); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix(namePrefix); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化 executor.initialize(); return executor; } }
applications.properties中配置@Value("${}")的值
# 异步线程配置 # 配置核心线程数 async.executor.thread.core_pool_size = 20 # 配置最大线程数 async.executor.thread.max_pool_size = 20 # 配置队列大小 async.executor.thread.queue_capacity =9999999 # 配置线程池中的线程的名称前缀 async.executor.thread.name.prefix = async-service-
二、编写测试类
@Test @Order(2) public void insertMany(){ int num=20; try { //计数器数量就等于线程数量 countDownLatch = new CountDownLatch(num); for(int i=0;i<num;i++){ iAsyncService.executeAsyncInsertUser(countDownLatch,i); } //主线程唤醒 countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("end"); } }
三、service层
public interface IAsyncService { public void executeAsyncInsertUser(CountDownLatch countDownLatch, int i); }
@Service public class AsyncServiceImpl implements IAsyncService { @Override @Async("asyncServiceExecutor") public void executeAsyncInsertUser(CountDownLatch countDownLatch,int i){ Connection con = null; PreparedStatement pstmt = null; try { long start = System.currentTimeMillis(); Class.forName("com.mysql.jdbc.Driver"); con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test_a?setUnicode=true&characterEncoding=utf8&useSSL=false", "root", "bydwx.23"); System.out.println("线程" + Thread.currentThread().getId() + "开始执行"); for (int j=1;j<=5000;j++){ String name= NameUtil.getChineseName(); int age= NumberUtil.getNum(0,100); String sex= SexUtil.getSex(); String address= ProvinceUtil.getProvince(); pstmt = con.prepareStatement("insert into users(name,age,sex,address) values(?,?,?,?)"); pstmt.setString(1,name); pstmt.setInt(2,age); pstmt.setString(3,sex); pstmt.setString(4,address); pstmt.execute(); if (j%100==0){ System.out.println("线程" + Thread.currentThread().getId() + ", j="+j); } } long end = System.currentTimeMillis(); //计算本线程运行完毕使用的时间,单位为秒 long threadUsedTimeSecond=(end-start)/1000; System.out.println("线程" + Thread.currentThread().getId() + "执行结束,用时"+threadUsedTimeSecond+"s"); } catch (Exception e) { e.printStackTrace(); } finally { //关闭资源 try { if(pstmt != null) pstmt.close(); if(con != null) con.close(); } catch (Exception e) { System.err.println(e.getMessage()); } //线程运行完毕,线程计数器-1 countDownLatch.countDown(); } } }
四、执行时间
下表为不同线程数完成插入10万条数据所用时间,本机cup为八核
线程数 | 时间(秒) |
20 | 381 |
30 | 253 |
40 | 193 |
50 | 157 |
100 | 84 |
200 | 44+ |
500 | 22 |
1000 | 16/17/18 |
900 | 16/17/18 |
800 | 17/18/19 |
在线程数为200时,在插入9.5w+数据后,程序报错:
com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Data source rejected establishment of connection, message from server: "Too many connections"
应该是MySQL的默认max_connections属性配置太小,重新配置了MySQL的my.ini设置max_connections=1000,
重启MySQL后,跑500线程数,跟预期结果一样正常运行,并且时间大幅缩短;
1000线程数跑完了但也出现Too many connections错误,但可以看出增加到1000后相比500收益没有很明显。
实际运行的线程数应该小于max_connections。
可以看出来,增加线程数对于大量数据的插入会起到非常大的收益,大幅减少运行时间。
参考:
https://blog.csdn.net/qq_41315539/article/details/107885680