1、使用異步通信的背景
項目需要接收從電信AEP平台推送過來的消息,而接收這個消息需要有一個監聽器一直監聽。自己又不想這個監聽業務影響到項目其他業務,所以就將這個監聽過程定義為了一個異步任務。
2、異步任務定義過程
(1)使用注解(@EnableAsync)開啟異步任務

(2)編寫異步任務配置文件
@Configuration
@EnableAsync
public class AsyncTaskConfig implements AsyncConfigurer {
// ThredPoolTaskExcutor的處理流程
// 當池子大小小於corePoolSize,就新建線程,並處理請求
// 當池子大小等於corePoolSize,把請求放入workQueue中,池子里的空閑線程就去workQueue中取任務並處理
// 當workQueue放不下任務時,就新建線程入池,並處理請求,如果池子大小撐到了maximumPoolSize,就用RejectedExecutionHandler來做拒絕處理
// 當池子的線程數大於corePoolSize時,多余的線程會等待keepAliveTime長時間,如果無請求可處理就自行銷毀
@Override
@Bean
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
//設置核心線程數
threadPool.setCorePoolSize(10);
//設置最大線程數
threadPool.setMaxPoolSize(100);
//線程池所使用的緩沖隊列
threadPool.setQueueCapacity(10);
//等待任務在關機時完成--表明等待所有線程執行完
threadPool.setWaitForTasksToCompleteOnShutdown(true);
// 等待時間 (默認為0,此時立即停止),並沒等待xx秒后強制停止
threadPool.setAwaitTerminationSeconds(60);
// 線程名稱前綴
threadPool.setThreadNamePrefix("Derry-Async-");
// 初始化線程
threadPool.initialize();
return threadPool;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
(3)編寫異步業務
@Service
public class AsyncTaskServiceImpl implements AsyncTaskService {
@Async
public void executeAsyncTask() {
System.out.println("線程" + Thread.currentThread().getName() + " 開始執行異步任務:");
String server = "xxx"; //消息服務地址
String tenantId = "xxx";//租戶ID
String token = "xxx";//身份認證token串
String certFilePath = ""; //直接填空字符串,CA證書,JDK已經內置相關根證書,無需指定
//創建消息接收Listener
IMsgListener msgListener = new IMsgListener() {
@Override
public void onMessage(String msg) {
//接收消息
System.out.println(msg);
}
};
//創建消息接收類
IMsgConsumer consumer = new MqMsgConsumer();
try {
//初始化
/**
* @param server 消息服務server地址
* @param tenantId 租戶Id
* @param token 用戶認證token
* @param certFilePath 證書文件路徑
* @param topicNames 主題名列表,如果該列表為空或null,則自動消費該租戶所有主題消息
* @param msgListener 消息接收者
* @return 是否初始化成功
*/
consumer.init(server, tenantId, token, certFilePath, null, msgListener);
//開始接收消息
consumer.start();
//程序退出時,停止接收、銷毀
//consumer.stop();
//consumer.destroy();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("start");
}
}
(4)調用業務
@Autowired
AsyncTaskService asyncTaskService;
@Test
public void test(){
asyncTaskService.executeAsyncTask();
}
3、遇到的問題
在異步方法(executeAsyncTask)內調用別的service服務,報空指針異常錯誤。
在網上找到類似的錯誤: new出來的線程類中無法使用@Autowired注入Bean
解決辦法是:我們新增一個類,實現ApplicationContextAware接口
@Service
public class ManageSpringBeans implements ApplicationContextAware {
private static ApplicationContext context;
public static <T> T getBean(final Class<T> requiredType) {
return context.getBean(requiredType);
}
public static <T> T getBean(final String beanName) {
@SuppressWarnings("unchecked")
final T bean = (T) context.getBean(beanName);
return bean;
}
public static <T> Map<String, T> getBeans(final Class<T> requiredType) {
return context.getBeansOfType(requiredType);
}
public static Map<String, Object> getBeansWithAnnotation(final Class<? extends Annotation> annotationType) {
return context.getBeansWithAnnotation(annotationType);
}
@Override
public void setApplicationContext(final ApplicationContext applicationContext) {
context = applicationContext;
}
}
之后就可以在需要某一個bean的時候使用該類的ManageSpringBeans.getBean(beanName)或者ManageSpringBeans.getBean(beanClassName.class)來獲取來獲取Spring的bean對象了。
ISensorHumitureService sensorHumitureService = ManageSpringBeans.getBean(ISensorHumitureService.class);
sensorHumitureService.save(sensorHumiture);
參考鏈接:https://blog.csdn.net/xiegongmiao/article/details/107839041
