(05)使用DeferredResult多線程異步處理請求


  通常處理HTTP請求時使用同步處理的方式,但有時根據業務場景和性能要求異步處理可能更合適。簡單說下概念。

  同步處理:一個HTTP請求進入一個主線程,主線程處理完后給出一個HTTP響應。

  異步處理:一個HTTP請求進入一個主線程,主線程調用一個副線程,副線程處理業務邏輯,當副線程處理完后,主線程把結果返回給給客戶端。在副線程處理邏輯的同時,主線程可以空閑出來處理其他請求。因為服務器同時處理的線程數量有限,所以異步處理提升了服務器的吞吐量。

  異步處理請求有Runnable和DeferredResult兩種方式,下面舉例說一下。

  1、Runnable異步處理REST服務

@RestController
public class AsyncController {    
    
    private Logger logger=LoggerFactory.getLogger(getClass());
    
    @RequestMapping("/order")
    public Callable<String> order(){
        logger.info("主線程開始");
        Callable<String> result=new Callable<String>() {
            @Override 
            public String call() throws Exception {
                logger.info("副線程開始");
                Thread.sleep(5000);
                logger.info("副線程結束");
                return "success"; 
            }
        };
        logger.info("主線程返回");
        return result;
    }
}

  啟動服務,瀏覽器訪問http://localhost/order,頁面5秒后返回success,查看日志如下 :

  

  日志中顯示了兩個線程,主線程返回后過了5秒副線程結束。

  2、DeferredResult異步處理Rest服務

  通過上面的例子可以看到使用Runnable異步處理,副線程由主線程調起。這樣有些業務場景不能滿足,所以使用DeferredResult。舉例如下:

  

  接受下單請求和處理下單邏輯的應用不在一台服務器上。線程1接受HTTP請求,並把該請求放到一個消息隊列里。應用2監聽消息隊列,當監聽到消息隊列里有下單請求后處理下單邏輯,處理完后把結果再放到這個消息隊列里。應用1中線程2監聽消息隊列,當監聽到訂單處理結果的消息后,會根據這個消息的結果返回一個HTTP響應給客戶端。在這個場景中,線程1和線程2是完全隔離的,誰也不知道誰的存在。

  模擬消息隊列:MockQueue.java,setPlaceOrder方法表示隊列中處理下單請求,創建一個Thread模擬其它應用環境。

@Component
public class MockQueue {
    
    private Logger logger=LoggerFactory.getLogger(getClass());
    
    private String placeOrder;
    private String completeOrder;
    
    public String getPlaceOrder() {
        return placeOrder;
    }
    public void setPlaceOrder(String placeOrder){
        new Thread(()->{
            logger.info("接到下單請求:"+placeOrder);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            this.completeOrder = placeOrder;
            logger.info("下單請求處理完畢:"+placeOrder);
        }).start();
    }
    public String getCompleteOrder() {
        return completeOrder;
    }
    public void setCompleteOrder(String completeOrder) {
        this.completeOrder = completeOrder;
    }
}

  在線程1和線程2之間傳遞DeferredResult對象:DeferredResultHolder.java,Map中key是訂單號,value是處理結果。

@Component
public class DeferredResultHolder {

    private Map<String,DeferredResult<String>> map=new HashMap<String,DeferredResult<String>>();

    public Map<String, DeferredResult<String>> getMap() {
        return map;
    }
    public void setMap(Map<String, DeferredResult<String>> map) {
        this.map = map;
    }
}

  處理請求:AsyncController.java

@RestController
public class AsyncController {    
    
    private Logger logger=LoggerFactory.getLogger(getClass());
    
    @Autowired
    private MockQueue mockQueue;
    @Autowired
    private DeferredResultHolder deferredResultHolder;
    
    @RequestMapping("/order")
    public DeferredResult<String> order(){
        logger.info("主線程開始");
        String orderNumber=RandomStringUtils.randomNumeric(8);
        mockQueue.setPlaceOrder(orderNumber);
        DeferredResult<String> result=new DeferredResult<>();
        deferredResultHolder.getMap().put(orderNumber, result);
        logger.info("主線程返回");
        return result;
    } 
}

  模擬線程2的代碼,監聽器:QueueListener.java。ContextRefreshedEvent事件是整個Spring容器初始化完畢的事件。

@Component
public class QueueListener implements ApplicationListener<ContextRefreshedEvent> {
    
    private Logger logger=LoggerFactory.getLogger(getClass());
    
    @Autowired
    private MockQueue mockQueue;
    @Autowired
    private DeferredResultHolder deferredResultHolder;
    
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        new Thread(()->{//開啟一個線程,否則Spring容器阻塞,無法啟動
            while(true) {
                if(StringUtils.isNotBlank(mockQueue.getCompleteOrder())) {
                    String orderNumber=mockQueue.getCompleteOrder();
                    logger.info("返回訂單處理結果:"+orderNumber);
                    deferredResultHolder.getMap().get(orderNumber).setResult("place order success");
                    mockQueue.setCompleteOrder(null);
                }else {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }
}

  啟動容器:請求3次http://localhost/order,頁面輸出place order success,打印日志如下:

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM