Spring boot 實現高吞吐量異步處理(適用於高並發場景)


 

技術要點

org.springframework.web.context.request.async.DeferredResult<T>

 

示例如下:

1.   新建Maven項目  async

 

2.   pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
        http://maven.apache.org/xsd/maven-4.0.0.xsd">


    <modelVersion>4.0.0</modelVersion>
    <groupId>com.java</groupId>
    <artifactId>async</artifactId>
    <version>1.0.0</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
    </parent>


    <dependencies>

        <!-- Spring Boot -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>


        <!-- 熱部署 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>springloaded</artifactId>
            <version>1.2.8.RELEASE</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>provided</scope>
        </dependency>

    </dependencies>

    <build>
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

 

3.   AsyncStarter.java

package com.java;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class AsyncStarter {

    public static void main(String[] args) {
        SpringApplication.run(AsyncStarter.class, args);
    }

}

 

4.   AsyncVo.java

package com.java.vo;

import org.springframework.web.context.request.async.DeferredResult;

/**
 * 存儲異步處理信息
 * 
 * @author Logen
 *
 * @param <I> 接口輸入參數
 * @param <O> 接口返回參數
 */
public class AsyncVo<I, O> {

    /**
     * 請求參數
     */
    private I params;

    /**
     * 響應結果
     */
    private DeferredResult<O> result;

    public I getParams() {
        return params;
    }

    public void setParams(I params) {
        this.params = params;
    }

    public DeferredResult<O> getResult() {
        return result;
    }

    public void setResult(DeferredResult<O> result) {
        this.result = result;
    }

}

 

5.   RequestQueue.java

package com.java.queue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.springframework.stereotype.Component;

import com.java.vo.AsyncVo;

/**
 * 存放所有異步處理接口請求隊列的對象,一個接口對應一個隊列
 * 
 * @author Logen
 *
 */
@Component
public class RequestQueue {

    /**
     * 處理下訂單接口的隊列,設置緩沖容量為50
     */
    private BlockingQueue<AsyncVo<String, Object>> orderQueue = new LinkedBlockingQueue<>(50);

    public BlockingQueue<AsyncVo<String, Object>> getOrderQueue() {
        return orderQueue;
    }

}

 

6.   OrderTask.java

package com.java.task;

import java.util.HashMap;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.java.queue.RequestQueue;
import com.java.vo.AsyncVo;

/**
 * 處理訂單接口的任務,每個任務類處理一種接口
 * 
 * @author Logen
 *
 */
@Component
public class OrderTask extends Thread {

    @Autowired
    private RequestQueue queue;

    private boolean running = true;

    @Override
    public void run() {
        while (running) {
            try {
                AsyncVo<String, Object> vo = queue.getOrderQueue().take();
                System.out.println("[ OrderTask ]開始處理訂單");

                String params = vo.getParams();
                Thread.sleep(3000);
                Map<String, Object> map = new HashMap<>();
                map.put("params", params);
                map.put("time", System.currentTimeMillis());

                vo.getResult().setResult(map);

                System.out.println("[ OrderTask ]訂單處理完成");
            } catch (InterruptedException e) {
                e.printStackTrace();
                running = false;
            }

        }
    }

    public void setRunning(boolean running) {
        this.running = running;
    }

}

 

7.   QueueListener.java

package com.java.listener;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.java.task.OrderTask;

/**
 * 隊列監聽器,初始化啟動所有監聽任務
 * 
 * @author Logen
 *
 */
@Component
public class QueueListener {

    @Autowired
    private OrderTask orderTask;

    /**
     * 初始化時啟動監聽請求隊列
     */
    @PostConstruct
    public void init() {
        orderTask.start();
    }

    /**
     * 銷毀容器時停止監聽任務
     */
    @PreDestroy
    public void destory() {
        orderTask.setRunning(false);
    }

}

 

8.   OrderController.java

package com.java.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;

import com.java.queue.RequestQueue;
import com.java.vo.AsyncVo;

/**
 * <blockquote>
 * 
 * <pre>
 * 
 * 模擬下單處理,實現高吞吐量異步處理請求
 * 
 * 1、 Controller層接口只接收請求,不進行處理,而是把請求信息放入到對應該接口的請求隊列中
 * 2、 該接口對應的任務類監聽對應接口的請求隊列,從隊列中順序取出請求信息並進行處理
 * 
 * 優點:接口幾乎在收到請求的同時就已經返回,處理程序在后台異步進行處理,大大提高吞吐量
 * 
 * 
 * </pre>
 * 
 * </blockquote>
 * 
 * @author Logen
 *
 */
@RestController
public class OrderController {

    @Autowired
    private RequestQueue queue;

    @GetMapping("/order")
    public DeferredResult<Object> order(String number) throws InterruptedException {
        System.out.println("[ OrderController ] 接到下單請求");
        System.out.println("當前待處理訂單數: " + queue.getOrderQueue().size());

        AsyncVo<String, Object> vo = new AsyncVo<>();
        DeferredResult<Object> result = new DeferredResult<>();

        vo.setParams(number);
        vo.setResult(result);

        queue.getOrderQueue().put(vo);
        System.out.println("[ OrderController ] 返回下單結果");
        return result;
    }

}

 

9.   運行 AsyncStarter.java ,啟動測試

瀏覽器輸入 http://localhost:8080/order?number=10001

正常情況處理3秒返回,返回結果如下

{"time":1548241500718,"params":"10001"}

 

觀察控制台打印日志,如下所示:

[ OrderController ] 接到下單請求
當前待處理訂單數: 0
[ OrderController ] 返回下單結果
[ OrderTask ]開始處理訂單
[ OrderTask ]訂單處理完成

結論Controller層幾乎在接收到請求的同時就已經返回,處理程序在后台異步處理任務。

 

快速多次刷新瀏覽器,目的為了高並發測試,觀察控制台打印信息

現象:Controller層快速返回,待處理請求在隊列中開始增加,異步處理程序在按順序處理請求。

 

優點:對客戶端響應時間不變,但提高了服務端的吞吐量。大大提升高並發處理性能!

 

 

.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM