通常處理HTTP請求時使用同步處理的方式,但有時根據業務場景和性能要求異步處理可能更合適。簡單說下概念。
同步處理:一個HTTP請求進入一個主線程,主線程處理完后給出一個HTTP響應。
異步處理:一個HTTP請求進入一個主線程,主線程調用一個副線程,副線程處理業務邏輯,當副線程處理完后,主線程把結果返回給給客戶端。在副線程處理邏輯的同時,主線程可以空閑出來處理其他請求。因為服務器同時處理的線程數量有限,所以異步處理提升了服務器的吞吐量。
異步處理請求有Runnable和DeferredResult兩種方式,下面舉例說一下。
1、Runnable異步處理REST服務
@RestController public class AsyncController { private Logger logger=LoggerFactory.getLogger(getClass()); @RequestMapping("/order") public Callable<String> order(){ logger.info("主線程開始"); Callable<String> result=new Callable<String>() { @Override public String call() throws Exception { logger.info("副線程開始"); Thread.sleep(5000); logger.info("副線程結束"); return "success"; } }; logger.info("主線程返回"); return result; } }
啟動服務,瀏覽器訪問http://localhost/order,頁面5秒后返回success,查看日志如下 :
日志中顯示了兩個線程,主線程返回后過了5秒副線程結束。
2、DeferredResult異步處理Rest服務
通過上面的例子可以看到使用Runnable異步處理,副線程由主線程調起。這樣有些業務場景不能滿足,所以使用DeferredResult。舉例如下:
接受下單請求和處理下單邏輯的應用不在一台服務器上。線程1接受HTTP請求,並把該請求放到一個消息隊列里。應用2監聽消息隊列,當監聽到消息隊列里有下單請求后處理下單邏輯,處理完后把結果再放到這個消息隊列里。應用1中線程2監聽消息隊列,當監聽到訂單處理結果的消息后,會根據這個消息的結果返回一個HTTP響應給客戶端。在這個場景中,線程1和線程2是完全隔離的,誰也不知道誰的存在。
模擬消息隊列:MockQueue.java,setPlaceOrder方法表示隊列中處理下單請求,創建一個Thread模擬其它應用環境。
@Component public class MockQueue { private Logger logger=LoggerFactory.getLogger(getClass()); private String placeOrder; private String completeOrder; public String getPlaceOrder() { return placeOrder; } public void setPlaceOrder(String placeOrder){ new Thread(()->{ logger.info("接到下單請求:"+placeOrder); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } this.completeOrder = placeOrder; logger.info("下單請求處理完畢:"+placeOrder); }).start(); } public String getCompleteOrder() { return completeOrder; } public void setCompleteOrder(String completeOrder) { this.completeOrder = completeOrder; } }
在線程1和線程2之間傳遞DeferredResult對象:DeferredResultHolder.java,Map中key是訂單號,value是處理結果。
@Component public class DeferredResultHolder { private Map<String,DeferredResult<String>> map=new HashMap<String,DeferredResult<String>>(); public Map<String, DeferredResult<String>> getMap() { return map; } public void setMap(Map<String, DeferredResult<String>> map) { this.map = map; } }
處理請求:AsyncController.java
@RestController public class AsyncController { private Logger logger=LoggerFactory.getLogger(getClass()); @Autowired private MockQueue mockQueue; @Autowired private DeferredResultHolder deferredResultHolder; @RequestMapping("/order") public DeferredResult<String> order(){ logger.info("主線程開始"); String orderNumber=RandomStringUtils.randomNumeric(8); mockQueue.setPlaceOrder(orderNumber); DeferredResult<String> result=new DeferredResult<>(); deferredResultHolder.getMap().put(orderNumber, result); logger.info("主線程返回"); return result; } }
模擬線程2的代碼,監聽器:QueueListener.java。ContextRefreshedEvent事件是整個Spring容器初始化完畢的事件。
@Component public class QueueListener implements ApplicationListener<ContextRefreshedEvent> { private Logger logger=LoggerFactory.getLogger(getClass()); @Autowired private MockQueue mockQueue; @Autowired private DeferredResultHolder deferredResultHolder; @Override public void onApplicationEvent(ContextRefreshedEvent event) { new Thread(()->{//開啟一個線程,否則Spring容器阻塞,無法啟動 while(true) { if(StringUtils.isNotBlank(mockQueue.getCompleteOrder())) { String orderNumber=mockQueue.getCompleteOrder(); logger.info("返回訂單處理結果:"+orderNumber); deferredResultHolder.getMap().get(orderNumber).setResult("place order success"); mockQueue.setCompleteOrder(null); }else { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } }
啟動容器:請求3次http://localhost/order,頁面輸出place order success,打印日志如下: