webFlux 學習(一)


在學習webflux 之前我們先要了解一個概念

什么是背壓(back press)

對我們以前使用的發布訂閱模式來說.我們的以前的模式是 消費只能通過提供者提供的數據來持續消費 如果一直發送消息,那么就只能一直消費

我們對背壓做一個詳細的比喻

比如我們每家每戶,都有一個水龍頭.自來水公司相當於發布者者,我們家庭就是個訂閱者,水就是數據,在之前的模式我們的訂閱者是一個被動接受的概念

背壓就是相當於我們家里安裝了一個水龍頭,我們需要的時候就把他打開.不需要的時候就把他關閉

 

reactive stream

響應式流.這是jdk9 引入的一套標准,他能夠很好的實現背壓,但是我去官網的時候,發現jdk9已經結束.我們看看jdk11吧

jdk11有一個flow接口 里面有4個方法 

1.publisher 就是發布者

  subscribe:就是和訂閱者產生一個關系

2.subscribe 就是訂閱者

  onSubscribe:簽署一個訂閱關系傳入subscription

  onNext(): 接受到一條數據

  onError(): 就是出錯

  onComplete(): 完成

3.Subscription接口中就是其中實現背壓的關鍵 其中request方法就是告訴發布者我需要多少資源,發布者那里 就會發布多少資源

4.Processor  既可以做發布者,也可以做訂閱者,具體是用來中間環節的數據處理工作

 簡單的例子我們來運行下

每次處理完之后告訴發布者我還可以處理的數據是多少

  public static void main(String[] args) throws InterruptedException {
      //1.定義發布者

        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        //2. 定義訂閱者

        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<Integer>() {

            private Flow.Subscription subscription;
            int total = 0;
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                //保存訂閱關系
                this.subscription = subscription;
                //請求一個數據
                subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                System.out.println("接受到: "+ item);
                total++;
                System.out.println("接受的條數為  : "+ total);
                this.subscription.request(1);
                //或者到達一定數量告訴發布者不接受數據了
                if(total ==10){
                    this.subscription.cancel();
                    System.out.println("接受數據已經足夠");

                }





            }

            @Override
            public void onError(Throwable throwable) {
            throwable.printStackTrace();
            //拋出異常就返回
            this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                System.out.println("數據處理完了.");
            }
        };

        //3發布和訂閱 建立訂閱關系

        publisher.subscribe(subscriber);


        //4.生產數據
        for (int i = 0; i < 100; i++) {
            publisher.submit(i);
        }

        //5.關閉發布者
        publisher.close();

        Thread.currentThread().join(5000);

    }

processor

  public static void main(String[] args) throws InterruptedException {
      //1.定義發布者

        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        //2.定義一個處理器 對數據進行過濾,並轉化為string的類型
        MyProcessor myProcessor = new MyProcessor();
        //3.發布者與處理器建立關系
        publisher.subscribe(myProcessor);

        //4. 定義最終訂閱者

        Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {

            private Flow.Subscription subscription;
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                //保存訂閱關系
                this.subscription = subscription;
                //請求一個數據
                subscription.request(1);
            }
            LinkedList<String > list = new LinkedList<>();
            @Override
            public void onNext(String item) {
                list.add(item);
                this.subscription.request(1);
                //或者到達一定數量告訴發布者不接受數據了
                System.out.println(item);
                if(list.size() == 10){
                    this.subscription.cancel();
                    System.out.println("接受數據已經足夠");
                    this.onComplete();

                }





            }

            @Override
            public void onError(Throwable throwable) {
            throwable.printStackTrace();
            //拋出異常就返回
            this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                System.out.println("數據處理完了."+list.toString());
            }
        };

        //5 處理器和最終的訂閱者建立關系

       myProcessor.subscribe(subscriber);


        //4.生產數據
        for (int i = 0; i < 100; i++) {
            publisher.submit(i);
        }

        //5.關閉發布者
        publisher.close();

        Thread.currentThread().join(5000);



    }
    static   class  MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<Integer,String> {
        private Flow.Subscription subscription;
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;

            subscription.request(1);

        }

        @Override
        public void onNext(Integer item) {
//            System.out.println("processor-> 處理器接收到的數據.."+item);
            if(item % 2 ==0){
                this.submit("轉->" +item);

            }
            this.subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
        throwable.printStackTrace();
        this.subscription.cancel();
        }

        @Override
        public void onComplete() {
            System.out.println("processor 處理器已經處理完成!");
        }
    }

里面的運行機制

publiser.submit():是一個阻塞方法

訂閱者有一個緩沖池.當緩沖池滿了之后 submit()方法就會被阻塞.這樣就不會再去生產數據了

subscription 緩沖的capacity默認是256個.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM