本文先定义一个线程池,然后利用 @Async 注解编写 3 个异步任务,并为这些任务指定执行时所使用的线程池。
1、定义一个 ThreadPoolTaskScheduler 线程池
package com.sinosoft.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Created by xushuyi on 2018/4/2.
 *
 * <p>Enables {@code @Async} processing and exposes the executor bean
 * ({@code "taskExecutor"}) that asynchronous methods can select by name.
 */
@EnableAsync
@Configuration
public class ThreadPoolConfig {

    /** Number of threads in the pool. */
    private static final int POOL_SIZE = 100;

    /** Prefix given to the name of every thread created by the pool. */
    private static final String THREAD_NAME_PREFIX = "taskExecutor-";

    /** Upper bound, in seconds, on how long shutdown waits for running tasks. */
    private static final int AWAIT_TERMINATION_SECONDS = 60;

    /**
     * Builds the executor registered under the bean name {@code "taskExecutor"}.
     *
     * @return a {@link ThreadPoolTaskScheduler} configured for graceful shutdown
     */
    @Bean("taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskScheduler pool = new ThreadPoolTaskScheduler();

        // Identity and sizing.
        pool.setThreadNamePrefix(THREAD_NAME_PREFIX);
        pool.setPoolSize(POOL_SIZE);

        // When a task is rejected, run it on the thread that called execute();
        // if the executor has already been shut down the task is discarded.
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // On shutdown, let submitted tasks finish before other beans are destroyed ...
        pool.setWaitForTasksToCompleteOnShutdown(true);
        // ... but stop waiting after the timeout so the application can always
        // terminate instead of blocking forever.
        pool.setAwaitTerminationSeconds(AWAIT_TERMINATION_SECONDS);

        return pool;
    }
}
2、建立异步任务,并让它依赖一个外部资源,比如:Redis
package com.sinosoft.common;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Created by xushuyi on 2018/4/2.
 *
 * <p>Three demo tasks, each annotated with {@code @Async("taskExecutor")},
 * that talk to Redis through {@link StringRedisTemplate}.
 *
 * <p>Redis API reference:
 * https://docs.spring.io/spring-data/data-keyvalue/docs/current/api/org/springframework/data/keyvalue/redis/core/StringRedisTemplate.html
 */
@Slf4j
@Component
public class TaskScheduler {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Task one: walks through a series of Redis operations (string values,
     * counters, expiry, sets and hashes), logging each result and finally the
     * elapsed time in milliseconds.
     */
    @Async("taskExecutor")
    public void doTaskOne() {
        log.info("开始做任务一...");
        long start = System.currentTimeMillis();
        log.info("{}", stringRedisTemplate);

        // Write a value without an expiry.
        stringRedisTemplate.opsForValue().set("username", "xushuyi");
        // Delete a key.
        stringRedisTemplate.delete("username1");
        // Write a value that expires after 60 seconds.
        stringRedisTemplate.opsForValue().set("username2", "zhangsan", 60, TimeUnit.SECONDS);
        // Fetch a random key.
        log.info("获取随机Redis Key值:{}", stringRedisTemplate.randomKey());

        // Store a numeric value and read it back.
        stringRedisTemplate.opsForValue().set("num", "100");
        log.info("获取写入Redis num的值:{}", stringRedisTemplate.opsForValue().get("num"));
        // Decrement by 1 and read it back.
        stringRedisTemplate.boundValueOps("num").increment(-1);
        log.info("获取写入Redis num减去1操作的值:{}", stringRedisTemplate.opsForValue().get("num"));
        // Increment by 2 and read it back.
        stringRedisTemplate.boundValueOps("num").increment(2);
        log.info("获取写入Redis num加上2操作的值:{}", stringRedisTemplate.opsForValue().get("num"));

        // Remaining time-to-live, using the template's default unit.
        log.info("获取写入Redis username的过期时间{}", stringRedisTemplate.getExpire("username"));
        log.info("获取写入Redis username2的过期时间{}", stringRedisTemplate.getExpire("username2"));
        // Remaining time-to-live expressed in an explicit unit (seconds here).
        log.info("获取写入Redis username的过期时间{}",
                stringRedisTemplate.getExpire("username", TimeUnit.SECONDS));
        log.info("获取写入Redis username2的过期时间{}",
                stringRedisTemplate.getExpire("username2", TimeUnit.SECONDS));

        // Key existence checks.
        log.info("判断Redis 是否包含Key username:{}", stringRedisTemplate.hasKey("username"));
        log.info("判断Redis 是否包含Key username1:{}", stringRedisTemplate.hasKey("username1"));

        // Populate a set and give it a 1000 ms expiry.
        stringRedisTemplate.opsForSet().add("set_123", "1", "2", "3");
        stringRedisTemplate.expire("set_123", 1000, TimeUnit.MILLISECONDS);
        log.info("根据key查看集合中是否存在指定数据:{}",
                stringRedisTemplate.opsForSet().isMember("set_123", "1"));
        log.info("查看集合数据:{}", stringRedisTemplate.opsForSet().members("set_123"));
        // Read the set's remaining TTL directly by key; the key holds a set,
        // so going through hash-bound operations would only be misleading.
        Long expire = stringRedisTemplate.getExpire("set_123");
        log.info("集合有效时间:{}S", expire);

        // Store a hash entry: arguments are the Redis key, the hash field and the value.
        User user = new User("xushuyi", "123");
        stringRedisTemplate.opsForHash().putIfAbsent("key1", "xushuyi", user.toString());
        // Read the whole hash, then a single field of it.
        Map<Object, Object> backUser = stringRedisTemplate.opsForHash().entries("key1");
        log.info("获取Redis中的Map对象:{}", backUser);
        log.info("获取Redis Key值下的某一个Map对象:{}",
                stringRedisTemplate.opsForHash().get("key1", "xushuyi"));

        long end = System.currentTimeMillis();
        log.info("完成任务一,共计耗时:{}毫秒.", end - start);
    }

    /**
     * Task two: logs one random Redis key and the elapsed time in milliseconds.
     */
    @Async("taskExecutor")
    public void doTaskTwo() {
        log.info("开始做任务二...");
        long start = System.currentTimeMillis();
        // Pass the key as an argument so it is never treated as the message pattern.
        log.info("{}", stringRedisTemplate.randomKey());
        long end = System.currentTimeMillis();
        log.info("完成任务二,共计耗时:{}毫秒.", end - start);
    }

    /**
     * Task three: logs one random Redis key and the elapsed time in milliseconds.
     */
    @Async("taskExecutor")
    public void doTaskThree() {
        log.info("开始做任务三...");
        long start = System.currentTimeMillis();
        // Pass the key as an argument so it is never treated as the message pattern.
        log.info("{}", stringRedisTemplate.randomKey());
        long end = System.currentTimeMillis();
        log.info("完成任务三,共计耗时:{}毫秒.", end - start);
    }
}

/**
 * Minimal user holder whose {@code toString()} is stored in a Redis hash by
 * {@link TaskScheduler#doTaskOne()}.
 *
 * <p>NOTE(review): the {@code toString()} generated by {@code @Data} includes
 * {@code pwd}, so the password reaches both Redis and the log in plain text.
 * Acceptable for a demo only.
 */
@Data
class User {

    private String username;
    private String pwd;

    public User(String username, String pwd) {
        this.username = username;
        this.pwd = pwd;
    }
}
3、pom.xml 中配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>micro</artifactId>
        <groupId>com.sinosoft</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>threadPool-task</artifactId>
    <packaging>jar</packaging>

    <dependencies>
        <!-- Eureka dependency -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-feign</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!-- Swagger UI -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>${springfox.version}</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>${springfox.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>19.0</version>
        </dependency>
        <!-- Redis starter. "spring-boot-starter-redis" was renamed to
             "spring-boot-starter-data-redis" in Spring Boot 1.4; the old
             artifact id is no longer provided by later Boot versions. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!-- Hystrix integration. Feign already bundles Hystrix; this dependency is
             added mainly for its hystrix-metrics-event-stream, used by the dashboard. -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-hystrix</artifactId>
        </dependency>
        <dependency>
            <groupId>com.sinosoft</groupId>
            <artifactId>common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.sinosoft</groupId>
            <artifactId>interface</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
4、修改单元测试,模拟高并发情况下系统 shutdown 的场景
package com.sinosoft; import com.sinosoft.common.TaskScheduler; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.context.annotation.Import; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import org.springframework.test.context.web.WebAppConfiguration; /** * Created by xushuyi on 2018/4/2. */ @Slf4j @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest @Import(ThreadApplication.class) @ActiveProfiles(profiles = "") @WebAppConfiguration public class TaskSchedulerTest { @Autowired private TaskScheduler taskScheduler; @Before public void before() { log.info("开始单元测试..."); } @Test @SneakyThrows public void taskTest() { taskScheduler.doTaskOne(); for (int i = 0; i < 100; i++) { taskScheduler.doTaskTwo(); taskScheduler.doTaskThree(); // 模拟高并发 情况下 系统 shutdown if (i == 50) { System.exit(0); } } } @After public void after() { log.info("结束单元测试..."); } }