RxJava2|Flowable以及背壓


RxJava2 Flowable以及背壓

前述

java-1.8

maven-3

rxjava-2.2.3

背壓

背壓是指在異步場景中,被觀察者發送事件速度遠快於觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發送速度的策略。

----https://www.jianshu.com/p/0cd258eecf60

Flowable的官方介紹:

io.reactivex.Flowable: 0..N flows, supporting Reactive-Streams and backpressure

0...N flows, 支持響應式流和背壓(backpressure)

只有在需要處理背壓問題時,才需要使用Flowable。

由於只有在上下游運行在不同的線程中,且上游發射數據的速度大於下游接收處理數據的速度時,才會產生背壓問題;
所以,如果能夠確定:

  1. 上下游運行在同一個線程中,

  2. 上下游工作在不同的線程中,但是下游處理數據的速度不慢於上游發射數據的速度,

  3. 上下游工作在不同的線程中,但是數據流中只有一條數據
    則不會產生背壓問題,就沒有必要使用Flowable,以免影響性能。

    由於基於Flowable發射的數據流,以及對數據加工處理的各操作符都添加了背壓支持,附加了額外的邏輯,其運行效率要比Observable慢得多。

此段出處: https://www.jianshu.com/p/ff8167c1d191

示例(Flowable簡單使用)

Flowable邏輯類 - HelloFlowable.java

package yag;


import io.reactivex.*;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;


public class HelloFlowable {

    public void helloFlowable(){
        // 基本上和Observable一樣.
        Flowable
                .create((FlowableOnSubscribe<Integer>) flowableEmitter -> {
                    Integer i = 0;
                    while ( i < 7) {
                        i++;
                        flowableEmitter.onNext(i);
                    }
                }, BackpressureStrategy.ERROR/* 背壓 */)

                .subscribe(new Subscriber<Integer>() {

                    private Subscription subscription;
                    @Override
                    public void onSubscribe(Subscription subscription) {
                        subscription.request(Long.MAX_VALUE);
                        this.subscription = subscription;
                    }

                    @Override
                    public void onNext(Integer i) {
                        if (i == 5){
                            // 退出接收
                            subscription.cancel();
                        }else {
                            System.out.println("現在接收到的信號是: 第" + i + "信號");
                        }
                    }

                    @Override
                    public void onError(Throwable throwable) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });


    }
}

執行者 - Runner.java

package yag;

public class Runner {

    public static void main(String[] args){
        HelloFlowable helloFlowable = new HelloFlowable();
        helloFlowable.helloFlowable();
    }
}

執行結果

現在接收到的信號是: 第1信號
現在接收到的信號是: 第2信號
現在接收到的信號是: 第3信號
現在接收到的信號是: 第4信號

Process finished with exit code 0

小結

request()

subscription.request(Long.MAX_VALUE);

這個方法就是用來向生產者申請可以消費的事件數量。這樣我們便可以根據本身的消費能力進行消費事件。

當調用了request()方法后,生產者便發送對應數量的事件供消費者消費。

BackpressureStrategy.ERROR

參考: https://www.jianshu.com/p/1f4867ce3c01

這是一個背壓操作策略. (BackpressureStrategy - 背壓策略)

ERROR策略下,如果緩存池溢出,就會立刻拋出MissingBackpressureException異常。即保證在異步操作中,事件累積不能超過128,超過即出現異常。消費者不能再接收事件了,但生產者並不會停止。

其他

  • BUFFER - 所謂BUFFER就是把RxJava中默認的只能存128個事件的緩存池換成一個大的緩存池,支持存很多很多的數據。消費者通過request()即使傳入一個很大的數字,生產者也會生產事件,並將處理不了的事件緩存。

    比較消耗內存, 除非是我們比較了解消費者的消費能力,能夠把握具體情況,不會產生OOM。(OutOfMemoryError)

  • DROP - 當消費者處理不了事件,就丟棄。

  • LATEST - 消費者通過request()傳入其需求n,然后生產者把n個事件傳遞給消費者供其消費。其他消費不掉的事件就丟掉。
    唯一的區別就是LATEST總能使消費者能夠接收到生產者產生的最后一個事件

個人補充:

  • MISSING - 寫入過程中沒有任何緩沖或丟棄, 即不操作.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM