體驗RxJava


RxJava是 ReactiveX在 Java上的開源的實現,簡單概括,它就是一個實現異步操作的庫,使用時最直觀的感受就是在使用一個觀察者模式的框架來完成我們的業務需求;
其實java已經有了現成的觀察者模式實現:java.util.Observable和java.util.Observer,那么為何還要RxJava呢?

java.util.Observable是典型的觀察者模式實現,而RxJava主要功能如下:

  1. 生產者加工數據,然后發布給觀察者;
  2. 觀察者處理數據;
  3. 從生產者生產數據到觀察者處理數據,這之間傳遞的數據可以被處理;
  4. 線程切換,生產者發布數據和觀察者處理數據可以在指定線程中處理;

RxJava還有個特點就是支持鏈式編碼,再配合lambda,可以保持簡潔和清晰的邏輯(注意是邏輯簡潔,代碼是否簡潔只能取決於實際業務);

看得出,除了實現觀察者模式,RxJava還提供了更豐富的能力,純文字太枯燥了,我們來編碼實戰吧!

源碼下載

如果您不打算寫代碼,也可以從GitHub上下載本次實戰的源碼,地址和鏈接信息如下表所示:

名稱 鏈接 備注
項目主頁 https://github.com/zq2599/blog_demos 該項目在GitHub上的主頁
git倉庫地址(https) https://github.com/zq2599/blog_demos.git 該項目源碼的倉庫地址,https協議
git倉庫地址(ssh) git@github.com:zq2599/blog_demos.git 該項目源碼的倉庫地址,ssh協議


這個git項目中有多個文件夾,本章的應用在rxdemo文件夾下,如下圖紅框所示:

這里寫圖片描述

源碼僅用來參考,建議自己把代碼寫出來,才能印象深刻;

准備工作之一:日志

本次詩函通過打印日志來觀察代碼執行情況,會打印時間和執行線程,這里用的是slf4j+log4j的方式;

工程創建完畢后,結構如下:

這里寫圖片描述

  • log4j.propertieds文件的位置請注意,需要放在上圖紅框位置;
  • 為了在日志中打印當前線程,log4j的配置如上圖綠框所示, %t表示當前線程, %r表示程序已經執行的時間;
  • 在pom文件中,對日志的依賴為:
<dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.8.0-alpha2</version>
    </dependency>

准備工作之二:單元測試

驗證代碼是通過單元測試實現的,pom.xml文件中,對單元測試的依賴為:

<dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

單元測試代碼在如下圖紅框位置:

這里寫圖片描述

准備工作之三:支持lambda

支持lambda表達式表現在maven支持和intellij idea工具支持兩個方面,具體設置請參照《設置Intellij idea和maven,支持lambda表達式》

准備工作結束,可以正式開發了

RxJava的依賴庫

依賴庫選用1.0.10版本,如下:

<dependency>
      <groupId>io.reactivex</groupId>
      <artifactId>rxjava</artifactId>
      <version>1.0.10</version>
    </dependency>

最簡單的觀察者模式實現

第一個例子,我們實踐最簡單的用法:

  1. 創建App.java類,聲明日志服務:
public class App 
{
    private static final Logger logger = LoggerFactory.getLogger(App.class);
  1. 開發doExecute方法實現基於Rxjava的觀察者模式:
public void doExecute(){
        logger.debug("start doExecute");

        //聲明一個觀察者,用來響應被觀察者發布的事件
        Observer<String> observer = new Observer<String>() {
            /**
             * 被觀察者發布結束事件的時候,該方法會被調用
             */
            public void onCompleted() {
                logger.debug("start onCompleted");
            }

            /**
             * 被觀察者發布事件期間,和觀察者處理事件期間,發生異常的時候,該方法都會被調用
             */
            public void onError(Throwable throwable) {
                logger.debug("start onError : " + throwable);
            }

            /**
             * 被觀察者發布事件后,該方法會被調用
             * @param s
             */
            public void onNext(String s) {
                logger.debug("start onNext [" + s + "]");
            }
        };

        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            public void call(Subscriber<? super String> subscriber) {
                //向觀察者發布事件
                subscriber.onNext("Hello");
                //再次向觀察者發布事件
                subscriber.onNext("world");
                //通知觀察者,訂閱結束
                subscriber.onCompleted();
            }
        });

        logger.debug("try subscribe");
        
        //執行訂閱
        observable.subscribe(observer);

        logger.debug("finish doExecute");
    }

代碼的邏輯很簡單,定義觀察者(observer),被觀察者(observable),執行訂閱;
3. 本次測試用junit來執行,在test目錄下創建一個AppTest類,具體的目錄和內容如下圖:

這里寫圖片描述

打開控制台,在pom.xml文件所在目錄下執行mvn test,即可看到日志:

2017-06-10 10:02:02  [ main:0 ] - [ DEBUG ]  start doExecute
2017-06-10 10:02:02  [ main:19 ] - [ DEBUG ]  try subscribe
2017-06-10 10:02:02  [ main:22 ] - [ DEBUG ]  start onNext [Hello]
2017-06-10 10:02:02  [ main:22 ] - [ DEBUG ]  start onNext [world]
2017-06-10 10:02:02  [ main:22 ] - [ DEBUG ]  start onCompleted
2017-06-10 10:02:02  [ main:23 ] - [ DEBUG ]  finish doExecute

執行的代碼是observable.subscribe,此代碼執行后,觀察者的onNext和onCompleted被回調;

簡化的觀察者

在上面的doExecute方法中,我們創建的被觀察者實現了onNext,onError,onCompleted這三個方法,有的場景下我們只關注onNext,對onError和onCompleted都不關心,此時我們可以使用Action1對象來替代Observer,代碼如下:

public void doAction(){
        logger.debug("start doAction");

        Action1<String> onNextAction = new Action1<String>() {
            public void call(String s) {
                logger.debug("start Action1 onNextAction [" + s + "]");
            }
        };

        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Hello");
                subscriber.onNext("world");
                subscriber.onCompleted();
            }
        });

        logger.debug("try subscribe");

        observable.subscribe(onNextAction);

        logger.debug("finish doAction");
    }

可以看到,只要一個Action1對象即可;

另外,對於錯誤回調也可以用Action1來實現,事件完成的回調用Action0,Action0的特點是方法沒有返回,對於的這些Action,observable.subscribe方法提供了各種重載,我們可以按照自己需要來決定使用哪種,傳入哪些Action;

簡化的被觀察者

在上面的doExecute方法中,被觀察者發布了兩個事件:onNext("Hello")和onNext("world"),我們創建被觀察者是通過Observable.create,然后在call方法中寫入onNext("Hello"),onNext("world")最后在寫上subscriber.onCompleted(),對於這種發布確定的對象事件的場景,rxjava已經做了簡化,直接上代碼:

public void doFromChain(){
        logger.debug("start doFromChain");


        //聲明一個觀察者,用來響應被觀察者發布的事件
        Action1<String> observer = new Action1<String>() {
            /**
             * 被觀察者發布事件后,該方法會被調用
             * @param s
             */
            public void call(String s) {
                logger.debug("start onNext [" + s + "]");
            }
        };


        String[] array = {"Hello", "world"};

        //from方法可以直接創建被觀察者,並且發布array中的元素對應的事件
        Observable.from(array).subscribe(observer);

        logger.debug("finish doFromChain");
    }

如上代碼,之前我們創建被觀察者,並且在call方法中依次執行onNext的操作,這些事情都被Observable.from(array)簡化了;

進一步簡化的被觀察者

Observable.from接受的是一個數組,而Observable.just可以直接接受多個元素,我們連創建數組的步驟都省略掉了,再把Action1簡化為lambda,可以得到更加簡化的代碼:

public void doJustChain(){
        logger.debug("start doJustChain");

        Observable.just("Hello", "world")
                .subscribe(s -> logger.debug("start onNext [" + s + "]"));

        logger.debug("finish doJustChain");
    }

經歷了以上的實戰,我們對Rxjava的基本能力有了了解,下面了解一些更復雜的用法;

基本變換

試想,如果被觀察者發布的事件是int型,但是觀察者是處理String型事件的,那么此觀察者如何才能處理被觀察者發布的事件呢,除了修改觀察者或者被觀察者的代碼,我們還可以使用Rxjava的變換方法-map:

public void doMap(){
        logger.debug("start doMap");

        Observable.just(1001, 1002)
        .map(intValue -> "int[" + intValue + "]")
        .subscribe(s -> logger.debug("Action1 call invoked [" + s + "]"));


        logger.debug("finish doMap");
    }

代碼中可以看到,map方法接受的是Func1接口的實現,由於此接口只聲明了一個方法,所以這里被簡化成了lambda表達式,lambda表達式的入參由just的入參類型推斷而來,是int型,返回的是字符串,后面的代碼就可以直接用String型的消費者來處理事件了;

更自由的變換

map方法提供了一對一的映射,但是實際場景中未必是一對一的,例如一個int數字要發起兩個String事件,map就不合適了,RxJava還有個flatMap方法,可以提供這種能力,此處沒用lambda來簡化,可以看的更清楚:

public void doFlatMap(){
        logger.debug("start doFlatMap");

        Observable.just(101, 102, 103)
                .flatMap(new Func1<Integer, Observable<String>>() {
                    public Observable<String> call(final Integer integer) {
                        return Observable.create(new Observable.OnSubscribe<String>() {
                            public void call(Subscriber<? super String> subscriber) {
                                subscriber.onNext("after flatMap (" + integer + ")");
                                subscriber.onNext("after flatMap (" + (integer+1000) + ")");
                            }
                        });
                    }
                })
                .subscribe(s -> logger.debug("Action1 call invoked [" + s + "]"));

        logger.debug("finish doFlatMap");
    }

可以看到,被觀察者發布了三個int事件:101, 102, 103,在flatMap中訂閱了這三個事件,每個事件都可以新建一個被觀察者,這個被觀察者拿到了101,102,103,然后可以按實際需求,選擇發布一個或者多個String事件,甚至不發布,這里發布的事件,都會被觀察者收到;

線程調度

Rxjava可以指定被觀察者發布事件的線程,也可以制定觀察者處理事件的線程:

public void doSchedule(){
        logger.debug("start doSchedule");

        Observable.create(subscriber -> {
            logger.debug("enter subscribe");
            subscriber.onNext("Hello");
            subscriber.onCompleted();
        })
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .flatMap(str -> {
            logger.debug("enter flatMap");
            return Observable.create(
                    subscriber -> subscriber.onNext("after flatMap (" + str + ")")
            );
            }
        )
        .observeOn(Schedulers.newThread())
        .subscribe(s -> logger.debug("Observer's onNext invoked [" + s + "]"));
        logger.debug("finish doSchedule");
    }

subscribeOn()方法指定了被觀察者發布事件的時候使用io類型的線程處理,參數Schedulers.io()表示指定的線程來自內部實現的一個無數量上限的線程池,可以重用空閑的線程,適合處理io相關的業務,特點是等待時間長,cup占用低;

observeOn()方法表示觀察者處理事件的時候使用新線程處理,Schedulers.newThread()表示總是啟用新線程,並在新線程執行操作;
上面代碼用了兩次observeOn,分別用來指定flatMap中處理事件以及觀察者中處理事件的線程;

執行代碼的結果:

2017-06-10 12:15:42  [ main:0 ] - [ DEBUG ]  start doSchedule
2017-06-10 12:15:42  [ RxCachedThreadScheduler-1:156 ] - [ DEBUG ]  enter subscribe
2017-06-10 12:15:42  [ main:156 ] - [ DEBUG ]  finish doSchedule
2017-06-10 12:15:42  [ RxNewThreadScheduler-2:157 ] - [ DEBUG ]  enter flatMap
2017-06-10 12:15:42  [ RxNewThreadScheduler-1:164 ] - [ DEBUG ]  Observer's onNext invoked [after flatMap (Hello)]

RxCachedThreadScheduler-1:156表示來自線程池的緩存線程;
RxNewThreadScheduler-2:157和RxNewThreadScheduler-1:164表示新的線程;

常用的參數類型還有:
Schedulers.immediate(): 直接在當前線程運行,相當於不指定線程;
Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。

以上就是Rxjava基礎入門的實戰,希望大家一起實踐並用到日常工作中,簡化邏輯,提升效率;

歡迎關注我的公眾號

在這里插入圖片描述


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM