1、使用异步通信的背景
项目需要接收从电信AEP平台推送过来的消息,而接收这个消息需要有一个监听器一直监听。自己又不想这个监听业务影响到项目其他业务,所以就将这个监听过程定义为了一个异步任务。
2、异步任务定义过程
(1)使用注解(@EnableAsync)开启异步任务
(2)编写异步任务配置文件
@Configuration
@EnableAsync
public class AsyncTaskConfig implements AsyncConfigurer {
// ThredPoolTaskExcutor的处理流程
// 当池子大小小于corePoolSize,就新建线程,并处理请求
// 当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去workQueue中取任务并处理
// 当workQueue放不下任务时,就新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize,就用RejectedExecutionHandler来做拒绝处理
// 当池子的线程数大于corePoolSize时,多余的线程会等待keepAliveTime长时间,如果无请求可处理就自行销毁
@Override
@Bean
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
//设置核心线程数
threadPool.setCorePoolSize(10);
//设置最大线程数
threadPool.setMaxPoolSize(100);
//线程池所使用的缓冲队列
threadPool.setQueueCapacity(10);
//等待任务在关机时完成--表明等待所有线程执行完
threadPool.setWaitForTasksToCompleteOnShutdown(true);
// 等待时间 (默认为0,此时立即停止),并没等待xx秒后强制停止
threadPool.setAwaitTerminationSeconds(60);
// 线程名称前缀
threadPool.setThreadNamePrefix("Derry-Async-");
// 初始化线程
threadPool.initialize();
return threadPool;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
(3)编写异步业务
@Service
public class AsyncTaskServiceImpl implements AsyncTaskService {
@Async
public void executeAsyncTask() {
System.out.println("线程" + Thread.currentThread().getName() + " 开始执行异步任务:");
String server = "xxx"; //消息服务地址
String tenantId = "xxx";//租户ID
String token = "xxx";//身份认证token串
String certFilePath = ""; //直接填空字符串,CA证书,JDK已经内置相关根证书,无需指定
//创建消息接收Listener
IMsgListener msgListener = new IMsgListener() {
@Override
public void onMessage(String msg) {
//接收消息
System.out.println(msg);
}
};
//创建消息接收类
IMsgConsumer consumer = new MqMsgConsumer();
try {
//初始化
/**
* @param server 消息服务server地址
* @param tenantId 租户Id
* @param token 用户认证token
* @param certFilePath 证书文件路径
* @param topicNames 主题名列表,如果该列表为空或null,则自动消费该租户所有主题消息
* @param msgListener 消息接收者
* @return 是否初始化成功
*/
consumer.init(server, tenantId, token, certFilePath, null, msgListener);
//开始接收消息
consumer.start();
//程序退出时,停止接收、销毁
//consumer.stop();
//consumer.destroy();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("start");
}
}
(4)调用业务
@Autowired
AsyncTaskService asyncTaskService;
@Test
public void test(){
asyncTaskService.executeAsyncTask();
}
3、遇到的问题
在异步方法(executeAsyncTask)内调用别的service服务,报空指针异常错误。
在网上找到类似的错误: new出来的线程类中无法使用@Autowired注入Bean
解决办法是:我们新增一个类,实现ApplicationContextAware接口
@Service
public class ManageSpringBeans implements ApplicationContextAware {
private static ApplicationContext context;
public static <T> T getBean(final Class<T> requiredType) {
return context.getBean(requiredType);
}
public static <T> T getBean(final String beanName) {
@SuppressWarnings("unchecked")
final T bean = (T) context.getBean(beanName);
return bean;
}
public static <T> Map<String, T> getBeans(final Class<T> requiredType) {
return context.getBeansOfType(requiredType);
}
public static Map<String, Object> getBeansWithAnnotation(final Class<? extends Annotation> annotationType) {
return context.getBeansWithAnnotation(annotationType);
}
@Override
public void setApplicationContext(final ApplicationContext applicationContext) {
context = applicationContext;
}
}
之后就可以在需要某一个bean的时候使用该类的ManageSpringBeans.getBean(beanName)或者ManageSpringBeans.getBean(beanClassName.class)来获取来获取Spring的bean对象了。
ISensorHumitureService sensorHumitureService = ManageSpringBeans.getBean(ISensorHumitureService.class);
sensorHumitureService.save(sensorHumiture);
参考链接:https://blog.csdn.net/xiegongmiao/article/details/107839041