聲明:原創在這里https://blog.csdn.net/u011677147/article/details/80271174,在此也謝謝哥們。
1、目錄結構
2、BusinessThread.java
package com.cn.commodity.config; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; @Component @Scope("prototype")//spring 多例 public class BusinessThread implements Runnable{ private String acceptStr; public BusinessThread(String acceptStr) { this.acceptStr = acceptStr; } public String getAcceptStr() { return acceptStr; } public void setAcceptStr(String acceptStr) { this.acceptStr = acceptStr; } @Override public void run() { //業務操作 System.out.println("多線程已經處理訂單插入系統,訂單號:"+acceptStr); //線程阻塞 /*try { Thread.sleep(1000); System.out.println("多線程已經處理訂單插入系統,訂單號:"+acceptStr); } catch (InterruptedException e) { e.printStackTrace(); }*/ } }
3、TestThreadPoolManager.java
package com.cn.commodity.studyTest; import com.cn.commodity.config.BusinessThread; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.stereotype.Component; import java.util.Map; import java.util.Queue; import java.util.concurrent.*; @Component public class TestThreadPoolManager implements BeanFactoryAware { //用於從IOC里取對象 private BeanFactory factory; //如果實現Runnable的類是通過spring的application.xml文件進行注入,可通過 factory.getBean()獲取,這里只是提一下 // 線程池維護線程的最少數量 private final static int CORE_POOL_SIZE = 2; // 線程池維護線程的最大數量 private final static int MAX_POOL_SIZE = 10; // 線程池維護線程所允許的空閑時間 private final static int KEEP_ALIVE_TIME = 0; // 線程池所使用的緩沖隊列大小 private final static int WORK_QUEUE_SIZE = 50; @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { factory = beanFactory; } /** * 用於儲存在隊列中的訂單,防止重復提交,在真實場景中,可用redis代替 驗證重復 */ Map<String, Object> cacheMap = new ConcurrentHashMap<>(); /** * 訂單的緩沖隊列,當線程池滿了,則將訂單存入到此緩沖隊列 */ Queue<Object> msgQueue = new LinkedBlockingQueue<Object>(); /** * 當線程池的容量滿了,執行下面代碼,將訂單存入到緩沖隊列 */ final RejectedExecutionHandler handler = new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //訂單加入到緩沖隊列 msgQueue.offer(((BusinessThread) r).getAcceptStr()); System.out.println("系統任務太忙了,把此訂單交給(調度線程池)逐一處理,訂單號:" + ((BusinessThread) r).getAcceptStr()); } }; /**創建線程池*/ final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler); /**將任務加入訂單線程池*/ public void addOrders(String orderId){ System.out.println("此訂單准備添加到線程池,訂單號:" + orderId); //驗證當前進入的訂單是否已經存在 if (cacheMap.get(orderId) == null) { cacheMap.put(orderId, new Object()); BusinessThread businessThread = new BusinessThread(orderId); threadPool.execute(businessThread); } } /** * 線程池的定時任務----> 稱為(調度線程池)。此線程池支持 定時以及周期性執行任務的需求。 */ final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5); /** * 檢查(調度線程池),每秒執行一次,查看訂單的緩沖隊列是否有 訂單記錄,則重新加入到線程池 */ final ScheduledFuture scheduledFuture = scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run() { //判斷緩沖隊列是否存在記錄 if(!msgQueue.isEmpty()){ //當線程池的隊列容量少於WORK_QUEUE_SIZE,則開始把緩沖隊列的訂單 加入到 線程池 if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) { String orderId = (String) msgQueue.poll(); BusinessThread businessThread = new BusinessThread(orderId); threadPool.execute(businessThread); System.out.println("(調度線程池)緩沖隊列出現訂單業務,重新添加到線程池,訂單號:"+orderId); } } } }, 0, 1, TimeUnit.SECONDS); /**獲取消息緩沖隊列*/ public Queue<Object> getMsgQueue() { return msgQueue; } /**終止訂單線程池+調度線程池*/ public void shutdown() { //true表示如果定時任務在執行,立即中止,false則等待任務結束后再停止 System.out.println("終止訂單線程池+調度線程池:"+scheduledFuture.cancel(false)); scheduler.shutdown(); threadPool.shutdown(); } }
4、TestController.java
package com.cn.commodity.controller; import com.cn.commodity.studyTest.TestThreadPoolManager; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; import java.util.Queue; import java.util.UUID; @RestController public class TestController { @Autowired TestThreadPoolManager testThreadPoolManager; /** * 測試模擬下單請求 入口 * @param id * @return */ @GetMapping("/start/{id}") public String start(@PathVariable Long id) { //模擬的隨機數 String orderNo = System.currentTimeMillis() + UUID.randomUUID().toString(); testThreadPoolManager.addOrders(orderNo); return "Test ThreadPoolExecutor start"; } /** * 停止服務 * @param id * @return */ @GetMapping("/end/{id}") public String end(@PathVariable Long id) { testThreadPoolManager.shutdown(); Queue q = testThreadPoolManager.getMsgQueue(); System.out.println("關閉了線程服務,還有未處理的信息條數:" + q.size()); return "Test ThreadPoolExecutor start"; } }
5、使用Jmeter測試,下載地址為:https://jmeter.apache.org/download_jmeter.cgi,下載完成后,解壓點擊bin/下面的ApacheJMeter.jar文件,就會出現界面,啟動springboot,按以下配置,就可以執行,模擬高並發。