【知識整理】這可能是最好的RxJava 2.x 入門教程(四)


 

 

這可能是最好的RxJava 2.x入門教程系列專欄

文章鏈接:

這可能是最好的RxJava 2.x 入門教程(完結版)【強力推薦】

這可能是最好的RxJava 2.x 入門教程(一)

這可能是最好的RxJava 2.x 入門教程(二)

這可能是最好的RxJava 2.x 入門教程(三)

這可能是最好的RxJava 2.x 入門教程(四)

GitHub 代碼同步更新:https://github.com/nanchen2251/RxJava2Examples

為了滿足大家的飢渴難耐,GitHub將同步更新代碼,主要包含基本的代碼封裝,RxJava 2.x所有操作符應用場景介紹和實際應用場景,后期除了RxJava可能還會增添其他東西,總之,GitHub上的Demo專為大家傾心打造。傳送門:https://github.com/nanchen2251/RxJava2Examples

 

一、前言

      最近很多小伙伴私信我,說自己很懊惱,對於RxJava 2.x 系列一看就能明白,但自己寫卻又寫不出來。如果 LZ 能放上實戰情景教程就最好不過了。也是哈,單講我們的操作符,也讓我們的教程不溫不火,但 LZ 自己選擇的路,那跪着也要走完呀。所以,也就讓我可憐的小伙伴們忍忍了,操作符馬上就講完了。

二、正題

16、Single

顧名思義,Single 只會接收一個參數,而SingleObserver 只會調用onError 或者onSuccess。

 1 Single.just(new Random().nextInt())
 2                 .subscribe(new SingleObserver<Integer>() {
 3                     @Override
 4                     public void onSubscribe(@NonNull Disposable d) {
 5 
 6                     }
 7 
 8                     @Override
 9                     public void onSuccess(@NonNull Integer integer) {
10                         mRxOperatorsText.append("single : onSuccess : "+integer+"\n");
11                         Log.e(TAG, "single : onSuccess : "+integer+"\n" );
12                     }
13 
14                     @Override
15                     public void onError(@NonNull Throwable e) {
16                         mRxOperatorsText.append("single : onError : "+e.getMessage()+"\n");
17                         Log.e(TAG, "single : onError : "+e.getMessage()+"\n");
18                     }
19                 });

輸出:

 

17、distinct

去重操作符,簡單的作用就是去重。

1 Observable.just(1, 1, 1, 2, 2, 3, 4, 5)
2                 .distinct()
3                 .subscribe(new Consumer<Integer>() {
4                     @Override
5                     public void accept(@NonNull Integer integer) throws Exception {
6                         mRxOperatorsText.append("distinct : " + integer + "\n");
7                         Log.e(TAG, "distinct : " + integer + "\n");
8                     }
9                 });

輸出:

很明顯,發射器發送的事件,在接收的時候被去重了。

 

18、debounce

去除發送頻率過快的項,看起來好像沒啥用處,但你信我,后面絕對有地方很有用武之地。

 

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
                // send events with simulated time wait
                emitter.onNext(1); // skip
                Thread.sleep(400);
                emitter.onNext(2); // deliver
                Thread.sleep(505);
                emitter.onNext(3); // skip
                Thread.sleep(100);
                emitter.onNext(4); // deliver
                Thread.sleep(605);
                emitter.onNext(5); // deliver
                Thread.sleep(510);
                emitter.onComplete();
            }
        }).debounce(500, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("debounce :" + integer + "\n");
                        Log.e(TAG,"debounce :" + integer + "\n");
                    }
                });

輸出:

代碼很清晰,去除發送間隔時間小於500毫秒的發射事件,所以1 和 3 被去掉了。

 

19、defer

簡單地時候就是每次訂閱都會創建一個新的Observable,並且如果沒有被訂閱,就不會產生新的Observable

 1 Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() {
 2             @Override
 3             public ObservableSource<Integer> call() throws Exception {
 4                 return Observable.just(1, 2, 3);
 5             }
 6         });
 7 
 8 
 9         observable.subscribe(new Observer<Integer>() {
10             @Override
11             public void onSubscribe(@NonNull Disposable d) {
12 
13             }
14 
15             @Override
16             public void onNext(@NonNull Integer integer) {
17                 mRxOperatorsText.append("defer : " + integer + "\n");
18                 Log.e(TAG, "defer : " + integer + "\n");
19             }
20 
21             @Override
22             public void onError(@NonNull Throwable e) {
23                 mRxOperatorsText.append("defer : onError : " + e.getMessage() + "\n");
24                 Log.e(TAG, "defer : onError : " + e.getMessage() + "\n");
25             }
26 
27             @Override
28             public void onComplete() {
29                 mRxOperatorsText.append("defer : onComplete\n");
30                 Log.e(TAG, "defer : onComplete\n");
31             }
32         });

輸出:

 

20、last

last 操作符僅取出可觀察到的最后一個值,或者是滿足某些條件的最后一項。

1 Observable.just(1, 2, 3)
2                 .last(4)
3                 .subscribe(new Consumer<Integer>() {
4                     @Override
5                     public void accept(@NonNull Integer integer) throws Exception {
6                         mRxOperatorsText.append("last : " + integer + "\n");
7                         Log.e(TAG, "last : " + integer + "\n");
8                     }
9                 });

輸出:

 

21、merge

merge 顧名思義,熟悉版本控制工具的你一定不會不知道 merge 命令,而在 Rx 操作符中,merge 的作用是把多個 Observable 結合起來,接受可變參數,也支持迭代器集合。注意它和 concat 的區別在於,不用等到 發射器 A 發送完所有的事件再進行發射器 B 的發送。

1 Observable.merge(Observable.just(1, 2), Observable.just(3, 4, 5))
2                 .subscribe(new Consumer<Integer>() {
3                     @Override
4                     public void accept(@NonNull Integer integer) throws Exception {
5                         mRxOperatorsText.append("merge :" + integer + "\n");
6                         Log.e(TAG, "accept: merge :" + integer + "\n" );
7                     }
8                 });

輸出:

 

 

22、reduce

reduce 操作符每次用一個方法處理一個值,可以有一個 seed 作為初始值。

 1 Observable.just(1, 2, 3)
 2                 .reduce(new BiFunction<Integer, Integer, Integer>() {
 3                     @Override
 4                     public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
 5                         return integer + integer2;
 6                     }
 7                 }).subscribe(new Consumer<Integer>() {
 8             @Override
 9             public void accept(@NonNull Integer integer) throws Exception {
10                 mRxOperatorsText.append("reduce : " + integer + "\n");
11                 Log.e(TAG, "accept: reduce : " + integer + "\n");
12             }
13         });

輸出:

可以看到,代碼中,我們中間采用 reduce ,支持一個 function 為兩數值相加,所以應該最后的值是:1 + 2 = 3 + 3 = 6 , 而Log 日志完美解決了我們的問題。

 

23、scan

scan 操作符作用和上面的 reduce 一致,唯一區別是 reduce 是個只追求結果的壞人,而 scan 會始終如一地把每一個步驟都輸出。

 1 Observable.just(1, 2, 3)
 2                 .scan(new BiFunction<Integer, Integer, Integer>() {
 3                     @Override
 4                     public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
 5                         return integer + integer2;
 6                     }
 7                 }).subscribe(new Consumer<Integer>() {
 8             @Override
 9             public void accept(@NonNull Integer integer) throws Exception {
10                 mRxOperatorsText.append("scan " + integer + "\n");
11                 Log.e(TAG, "accept: scan " + integer + "\n");
12             }
13         });

輸出:

看日志,沒毛病。

 

24、window

按照實際划分窗口,將數據發送給不同的Observable

 1 mRxOperatorsText.append("window\n");
 2         Log.e(TAG, "window\n");
 3         Observable.interval(1, TimeUnit.SECONDS) // 間隔一秒發一次
 4                 .take(15) // 最多接收15個
 5                 .window(3, TimeUnit.SECONDS)
 6                 .subscribeOn(Schedulers.io())
 7                 .observeOn(AndroidSchedulers.mainThread())
 8                 .subscribe(new Consumer<Observable<Long>>() {
 9                     @Override
10                     public void accept(@NonNull Observable<Long> longObservable) throws Exception {
11                         mRxOperatorsText.append("Sub Divide begin...\n");
12                         Log.e(TAG, "Sub Divide begin...\n");
13                         longObservable.subscribeOn(Schedulers.io())
14                                 .observeOn(AndroidSchedulers.mainThread())
15                                 .subscribe(new Consumer<Long>() {
16                                     @Override
17                                     public void accept(@NonNull Long aLong) throws Exception {
18                                         mRxOperatorsText.append("Next:" + aLong + "\n");
19                                         Log.e(TAG, "Next:" + aLong + "\n");
20                                     }
21                                 });
22                     }
23                 });

輸出:

 

三、寫在最后

      至此,大部分 RxJava 2.x 的操作符就告一段落了,當然還有一些沒有提到的操作符,不是說它們不重要,而是 LZ 也要考慮大家的情況,接下來就會根據實際應用場景來對 RxJava 2.x 發起沖鋒。如果想看更多的數據,請移步 GitHub:https://github.com/nanchen2251/RxJava2Examples

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM