github代碼地址: https://github.com/showkawa/springBoot_2017/tree/master/spb-demo/spb-brian-query-service
假設一個需求用戶點擊某個頁面,我們后台需要向MQ推送信信息
1,模擬的MQ服務,我這邊使用RabbitMQ (關於MQ 發送和監聽消息可以參考我的博客:SpringBoot消息中間件RabbitMQ)
//后台監聽消息
@RabbitListener(queues = "brian.test") public void receiveMessage(User user){ logger.info("接收到MQ的消息體: " + user); }
2.向IOC容器中注冊一個ThreadPoolTaskExecutor實例
@Bean public ThreadPoolTaskExecutor brianThreadPool(){ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //核心線程數 executor.setCorePoolSize(8); //最大線程數 executor.setMaxPoolSize(16); //隊列中最大的數 executor.setQueueCapacity(8); //縣城名稱前綴 executor.setThreadNamePrefix("brianThreadPool_"); //rejectionPolicy:當pool已經達到max的時候,如何處理新任務 //callerRuns:不在新線程中執行任務,而是由調用者所在的線程來執行 //對拒絕task的處理策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //線程空閑后最大的存活時間 executor.setKeepAliveSeconds(60); //初始化加載 executor.initialize(); return executor; }
3.實現線程池並發推送消息
/** * 多線程推送消息到MQ服務 */ public void sendMessageByThredPool(User user) throws ExecutionException, InterruptedException { Future<String> future = executor.submit(() -> { sendMessageService.sendMessage("brian","mymq",user);
logger.info("線程 [ " + Thread.currentThread().getName() + " ] 推送消息到MQ成功! " + new Date()); return Thread.currentThread().getName(); }); }
4. Controller層的調用
@PostMapping("/loop/sendMsg/userInfo")
public ResponseEntity addUserInfo2MQ(@RequestBody User user) throws ExecutionException, InterruptedException {
brianService.sendMessageByThredPool(user);
return new ResponseEntity(user, HttpStatus.OK);
}
5.利用postman 做壓力測試,測試接口
5.1 postman做loop壓力測試,需要單創建一個Collections來測試,並且當前Collections值允許放一個測試用例,比如我下面的loopSendUserInfo

5.2 設置測試規則


點擊Preview ,可以預覽測試數據

6.查看測試結果
6.1 rabbitmq

6.2 log中可以發現多個線程在推送消息

博客參考來源:1. 可取消的異步任務——FutureTask用法及解析
2. 多線程並發執行任務,取結果歸集。終極總結:Future、FutureTask、CompletionService、CompletableFuture
