webFlux 學習(二)


webFlux

webFlux 是spring5提出的,一個非阻塞,運行在netty或者Servlet3.1之上,

MVC和webFlux 有什么關系呢?

1.阻塞和非阻塞

webflux 是一個非阻塞的模式 可以在一個線程里可以處理更多的請求

傳統的mvc是一個阻塞的開發模式 一個請求對應我們容器里的一個線程 

2.運行環境

mvc 是基於servlet api 所以必須運行servlet 容器上面

webflux是基於響應式流,它可以運行servlet 或者netty 上面

3.數據方面

關系數據暫時不能使用webflux

優勢

1.支持高並發量 水平擴展/垂直擴展,我們可以使用webflux 進行垂直擴展.

servlet

idea 創建servlet https://blog.csdn.net/a376298333/article/details/79121548

為什么要使用異步servlet?

@WebServlet(urlPatterns = { "/AsyncServlet" },asyncSupported = true)
public class AyncServlet extends HttpServlet {
    protected void doPost(javax.servlet.http.HttpServletRequest request, javax.servlet.http.HttpServletResponse response) throws javax.servlet.ServletException, IOException {

    }

    protected void doGet(javax.servlet.http.HttpServletRequest request, javax.servlet.http.HttpServletResponse response) throws javax.servlet.ServletException, IOException {
        long start = System.currentTimeMillis();
        //1.開啟異步
        AsyncContext asyncContext = request.startAsync();
        //2.執行耗時操作
        CompletableFuture.runAsync(()->{
            something(asyncContext,asyncContext.getRequest(),asyncContext.getResponse());
        });


        System.out.println("aynctime use: "+ (System.currentTimeMillis()-start));
    }

    private void something(AsyncContext asyncContext, ServletRequest request, ServletResponse response)  {
        //模擬耗時操作
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            response.getWriter().append("aync done");
        } catch (IOException e) {
            e.printStackTrace();
        }
        //業務代碼處理完.我們要通知他
        asyncContext.complete();
    }
}

aynctime use: 11

這是一個異步的servlet http://localhost:8080/AsyncServlet 訪問在前台來看還是5秒鍾

但是后台的時間是11ms 不會阻塞tomcat線程 可以把一些耗時的操作放在獨立線程池里,我們的serlet線程就可以處理下一個線程達到比較高的吞吐量

同步servlet阻塞了什么?

/**
 * @Created by xiaodao
 */
@WebServlet("/SyncServlet")
public class SyncServlet extends javax.servlet.http.HttpServlet {
    protected void doPost(javax.servlet.http.HttpServletRequest request, javax.servlet.http.HttpServletResponse response) throws javax.servlet.ServletException, IOException {

    }

    protected void doGet(javax.servlet.http.HttpServletRequest request, javax.servlet.http.HttpServletResponse response) throws javax.servlet.ServletException, IOException {
        long start = System.currentTimeMillis();
        System.out.println(start);
        something(request,response)
        ;

        System.out.println("time use: "+ (System.currentTimeMillis()-start));
    }

    private void something(HttpServletRequest request, HttpServletResponse response) throws IOException {
        //模擬耗時操作
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        response.getWriter().append("done");
    }
}

time use :5001

在瀏覽器中訪問可以看到我們返回需要5m中.這時候我們就知道

同步的servlet阻塞了tomcat容器的servlet線程,

當我們的網絡請求->tomcat容器->為每一個請求啟動一個線程去處理->線程里找一個servlet線程來處理

異步servlet怎么工作的?

1.我們開啟異步支持

2.把頁面代碼放到獨立的線程池執行

3.調用異步上下文的comlate方法通知他結束

webFlux 入門

什么是reactor編程呢?就是jdk8 stream + jdk11 reactive stream 

mono: 0-1個元素

flux:0-N個元素

我們來一個示例程序看下:

/**
 * @Created by xiaodao
 */
public class Main {

    public static void main(String[] args) {
        String[] arr = {"1","2","3","4"};
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            private Subscription subscription;
            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription =subscription;
                this.subscription.request(1);
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
                this.subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onComplete() {

            }
        };
        //jdk8 stream 這個時候並沒有執行
        Flux.fromArray(arr).map(s->Integer.parseInt(s))
                .subscribe(subscriber);//jdk9 的 reactive stream
    }
}

jdk8 的流可以看做是一個發布者

jdk9的subscribe可以看做是一個訂閱者.

來看下webflux是如何執行的?

@RestController
@Slf4j
public class TestController {


    @GetMapping("/test1")
    public String test1(){
        log.info("start");
        String str  = createStr();
        log.info("end");
        return  str;
    }

    @GetMapping("/test2")
    public Mono<String> test2(){
        log.info("mono start");
        Mono<String> stringMono = Mono.fromSupplier(() -> createStr());
        log.info("mono end ");
        return stringMono;
    }


    public String createStr(){
        try {

            TimeUnit.SECONDS.sleep(5);


        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello web flux ";
    }
}

上面的代碼是一個簡單的操作我們模擬業務需求讓代碼耗時5秒鍾的時間

在瀏覽器中分別訪問test1 test2 在瀏覽器中都是等待5秒鍾,這點都都一樣,對於瀏覽器來說不存在同步和異步之說,阻塞和非阻塞,同步與異步只存在於服務端

接下來我們來看來在spring中是如何執行的.

 

我們可以看到當我們訪問test1 的時候test1 controller方法就占用了5秒鍾的時間

第二種模式,controller是基本沒有耗時的,我們新模式中返回的mono實際上是返回了一個流,當調用subscibe()方法的時候才會執行 

flux

   @GetMapping(value = "/test3",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> test3(){
        log.info("mono start");

        Flux<String> stringMono = Flux.fromStream(IntStream.range(1,5).mapToObj(i->{
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "flux data "+i+"/n";
        }));
        return stringMono;
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM