RxJava開發精要6 - 組合Observables


上一章中。我們學到怎樣轉換可觀測序列。我們也看到了map(),scan(),groupBY(),以及很多其它實用的函數的實際樣例,它們幫助我們操作Observable來創建我們想要的Observable。

本章中,我們將研究組合函數並學習怎樣同一時候處理多個Observables來創建我們想要的Observable。

Merge

在異步的世界常常會創建這種場景。我們有多個來源可是僅僅想有一個結果:多輸入,單輸出。RxJava的merge()方法將幫助你把兩個甚至很多其它的Observables合並到他們發射的數據里。下圖給出了把兩個序列合並在一個終於發射的Observable。

正如你看到的那樣,發射的數據被交叉合並到一個Observable里面。

注意假設你同步的合並Observable,它們將連接在一起而且不會交叉。

像通常一樣。我們用我們的App和已安裝的App列表來創建了一個“真實世界”的樣例。我們還須要第二個Observable。我們能夠創建一個單獨的應用列表然后逆序。當然沒有實際的意義,僅僅是為了這個樣例。第二個列表,我們的loadList()函數像以下這樣:

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    List reversedApps = Lists.reverse(apps);
    Observable<AppInfo> observableApps =Observable.from(apps);
    Observable<AppInfo> observableReversedApps =Observable.from(reversedApps);
    Observable<AppInfo> mergedObserbable = Observable.merge(observableApps,observableReversedApps);

    mergedObserbable.subscribe(new Observer<AppInfo>(){
        @Override
        public void onCompleted() {
            mSwipeRefreshLayout.setRefreshing(false);
            Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
        }

        @Override
        public void onError(Throwable e) {
            Toast.makeText(getActivity(), "One of the two Observable threw an error!", Toast.LENGTH_SHORT).show();
            mSwipeRefreshLayout.setRefreshing(false);
        }

        @Override
        public void onNext(AppInfoappInfo) {
            mAddedApps.add(appInfo);
            mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
        } 
    });
}

我們創建了Observable和observableApps數據以及新的observableReversedApps逆序列表。使用Observable.merge(),我們能夠創建新的ObservableMergedObservable在單個可觀測序列中發射源Observables發出的全部數據。

正如你能看到的,每一個方法簽名都是一樣的,因此我們的觀察者無需在意不論什么不同就能夠復用代碼。結果例如以下:

注意錯誤時的toast消息。你能夠覺得每一個Observable拋出的錯誤將會打斷合並。假設你須要避免這種情況。RxJava提供了mergeDelayError(),它能從一個Observable中繼續發射數據即便是當中有一個拋出了錯誤。當全部的Observables都完畢時,mergeDelayError()將會發射onError()。例如以下圖所看到的:

ZIP

我們在處理多源時可能會帶來這樣一種場景:多從個Observables接收數據,處理它們。然后將它們合並成一個新的可觀測序列來使用。RxJava有一個特殊的方法能夠完畢:zip()合並兩個或者多個Observables發射出的數據項,依據指定的函數Func*變換它們。並發射一個新值。下圖展示了zip()方法怎樣處理發射的“numbers”和“letters”然后將它們合並一個新的數據項:

對於“真實世界”的樣例來說,我們將使用已安裝的應用列表和一個新的動態的Observable來讓樣例變得有點有趣味。

Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);

tictocObservable變量使用interval()函數每秒生成一個Long類型的數據:簡單且高效。正如之前所說的,我們須要一個Func對象。

由於它須要傳兩個參數,所以是Func2:

private AppInfo updateTitle(AppInfoappInfo, Long time) {
    appInfo.setName(time + " " + appInfo.getName());
    return appInfo;
}

如今我們的loadList()函數變成這樣:

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    Observable<AppInfo> observableApp = Observable.from(apps);

    Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);

    Observable.zip(observableApp, tictoc,
    (AppInfo appInfo, Long time) -> updateTitle(appInfo, time))
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<AppInfo>() {
        @Override
        public void onCompleted() {
            Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
        }

        @Override
        public void onError(Throwable e) {
            mSwipeRefreshLayout.setRefreshing(false);
            Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
        }

        @Override
        public void onNext(AppInfoappInfo) {
            if (mSwipeRefreshLayout.isRefreshing()) {
                mSwipeRefreshLayout.setRefreshing(false);
            } 
            mAddedApps.add(appInfo);
            int position = mAddedApps.size() - 1;
            mAdapter.addApplication(position, appInfo);
            mRecyclerView.smoothScrollToPosition(position);
        }
    });
}

正如你看到的那樣。zip()函數有三個參數:兩個Observables和一個Func2

細致一看會發現observeOn()函數。

它將在下一章中解說:如今我們能夠小試一下。

結果例如以下:

Join

前面兩個方法。zip()merge()方法作用在發射數據的范疇內,在決定怎樣操作值之前有些場景我們須要考慮時間的。RxJava的join()函數基於時間窗體將兩個Observables發射的數據結合在一起。

為了正確的理解上一張圖。我們解釋下join()須要的參數:

  • 第二個Observable和源Observable結合。

  • Func1參數:在指定的由時間窗體定義時間間隔內,源Observable發射的數據和從第二個Observable發射的數據相互配合返回的Observable。
  • Func1參數:在指定的由時間窗體定義時間間隔內,第二個Observable發射的數據和從源Observable發射的數據相互配合返回的Observable。
  • Func2參數:定義已發射的數據怎樣與新發射的數據項相結合。

  • 例如以下練習的樣例。我們能夠改動loadList()函數像以下這樣:
private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);

    Observable<AppInfo> appsSequence =
    Observable.interval(1000, TimeUnit.MILLISECONDS)
                .map(position -> {
                    return apps.get(position.intValue());
                });

    Observable<Long> tictoc = Observable.interval(1000,TimeUnit.MILLISECONDS);

    appsSequence.join(
        tictoc, 
        appInfo -> Observable.timer(2,TimeUnit.SECONDS),
        time -> Observable.timer(0, TimeUnit.SECONDS),
        this::updateTitle)
        .observeOn(AndroidSchedulers.mainThread())
        .take(10)
        .subscribe(new Observer<AppInfo>() {
            @Override
            public void onCompleted() {
                Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
            }

            @Override
            public void onError(Throwable e) {
                mSwipeRefreshLayout.setRefreshing(false); 
                Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
            }

            @Override
            public void onNext(AppInfoappInfo) {
                if (mSwipeRefreshLayout.isRefreshing()) {
                    mSwipeRefreshLayout.setRefreshing(false);
                } 
                mAddedApps.add(appInfo);
                int position = mAddedApps.size() - 1;
                mAdapter.addApplication(position, appInfo);
                mRecyclerView.smoothScrollToPosition(position);
            } 
        });
}

我們有一個新的對象appsSequence,它是一個每秒從我們已安裝的app列表發射app數據的可觀測序列。

tictoc這個Observable數據每秒僅僅發射一個新的Long型整數。為了合並它們,我們須要指定兩個Func1變量:

appInfo -> Observable.timer(2, TimeUnit.SECONDS)

time -> Observable.timer(0, TimeUnit.SECONDS)

上面描寫敘述了兩個時間窗體。

以下一行描寫敘述我們怎樣使用Func2將兩個發射的數據結合在一起。

this::updateTitle

結果例如以下:

它看起來有點亂,可是注意app的名字和我們指定的時間窗體,我們能夠看到:一旦第二個數據發射了我們就會將它與源數據結合,但我們用同一個源數據有2秒鍾。這就是為什么標題反復數字累加的原因。

值得一提的是,為了簡單起見,也有一個join()操作符作用於字符串然后簡單的和發射的字符串連接成終於的字符串。

combineLatest

RxJava的combineLatest()函數有點像zip()函數的特殊形式。

正如我們已經學習的,zip()作用於近期未打包的兩個Observables。相反。combineLatest()作用於近期發射的數據項:假設Observable1發射了A而且Observable2發射了B和C,combineLatest()將會分組處理AB和AC,例如以下圖所看到的:

combineLatest()函數接受二到九個Observable作為參數,假設有須要的話或者單個Observables列表作為參數。

從之前的樣例中把loadList()函數借用過來,我們能夠改動一下來用於combineLatest()實現“真實世界”這個樣例:

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    Observable<AppInfo> appsSequence = Observable.interval(1000, TimeUnit.MILLISECONDS)
              .map(position ->apps.get(position.intValue()));
    Observable<Long> tictoc = Observable.interval(1500, TimeUnit.MILLISECONDS);
    Observable.combineLatest(appsSequence, tictoc,
               this::updateTitle)
       .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<AppInfo>() {

        @Override
        public void onCompleted() {
            Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
        }

        @Override
        public void onError(Throwable e) {
            mSwipeRefreshLayout.setRefreshing(false);
            Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
        }

        @Override
        public void onNext(AppInfoappInfo) {
            if (mSwipeRefreshLayout.isRefreshing()) {
                mSwipeRefreshLayout.setRefreshing(false);
            } 
            mAddedApps.add(appInfo);
            int position = mAddedApps.size() - 1;
            mAdapter.addApplication(position, appInfo);
            mRecyclerView.smoothScrollToPosition(position);
        } 
    });
}

這我們使用了兩個Observables:一個是每秒鍾從我們已安裝的應用列表發射一個App數據,第二個是每隔1.5秒發射一個Long型整數。

我們將他們結合起來並運行updateTitle()函數,結果例如以下:

正如你看到的。由於不同的時間間隔,AppInfo對象如我們所預料的那樣有時候會反復。

And,Then和When

在將來另一些zip()滿足不了的場景。

如復雜的架構,或者是僅僅為了個人愛好。你能夠使用And/Then/When解決方式。

它們在RxJava的joins包下,使用Pattern和Plan作為中介,將發射的數據集合並到一起。

我們的loadList()函數將會被改動從這樣:

private void loadList(List<AppInfo> apps) {

    mRecyclerView.setVisibility(View.VISIBLE);

    Observable<AppInfo> observableApp = Observable.from(apps);

    Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);

    Pattern2<AppInfo, Long> pattern = JoinObservable.from(observableApp).and(tictoc); 

    Plan0<AppInfo> plan = pattern.then(this::updateTitle);

    JoinObservable
        .when(plan)
        .toObservable()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<AppInfo>() {

            @Override
            public void onCompleted() {
                Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
            }

            @Override
            public void onError(Throwable e) {
                mSwipeRefreshLayout.setRefreshing(false); 
                Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
            }

            @Override
            public void onNext(AppInfoappInfo) {
                if (mSwipeRefreshLayout.isRefreshing()) { 
                mSwipeRefreshLayout.setRefreshing(false);
                } 
                mAddedApps.add(appInfo);
                int position = mAddedApps.size() - 1;
                mAdapter.addApplication(position, appInfo); mRecyclerView.smoothScrollToPosition(position);
            } 
        });
}

和通常一樣。我們有兩個發射的序列。observableApp,發射我們安裝的應用列表數據。tictoc每秒發射一個Long型整數。如今我們用and()連接源Observable和第二個Observable。

JoinObservable.from(observableApp).and(tictoc);

這里創建一個pattern對象,使用這個對象我們能夠創建一個Plan對象:”我們有兩個發射數據的Observables,then()是做什么的?”

pattern.then(this::updateTitle);

如今我們有了一個Plan對象而且當plan發生時我們能夠決定接下來發生的事情。

.when(plan).toObservable()

這時候,我們能夠訂閱新的Observable,正如我們總是做的那樣。

Switch

有這樣一個復雜的場景就是在一個subscribe-unsubscribe的序列里我們能夠從一個Observable自己主動取消訂閱來訂閱一個新的Observable。

RxJava的switch(),正如定義的,將一個發射多個Observables的Observable轉換成另一個單獨的Observable。后者發射那些Observables近期發射的數據項。

給出一個發射多個Observables序列的源Observable,switch()訂閱到源Observable然后開始發射由第一個發射的Observable發射的一樣的數據。當源Observable發射一個新的Observable時,switch()馬上取消訂閱前一個發射數據的Observable(因此打斷了從它那里發射的數據流)然后訂閱一個新的Observable,並開始發射它的數據。

StartWith

我們已經學到怎樣連接多個Observables並追加指定的值到一個發射序列里。

RxJava的startWith()concat()的相應部分。正如concat()向發射數據的Observable追加數據那樣,在Observable開始發射他們的數據之前。 startWith()通過傳遞一個參數來先發射一個數據序列。

總結

這章中。我們學習了怎樣將兩個或者很多其它個Observable結合來創建一個新的可觀測序列。

我們將能夠merge Observable。join Observables 。zip Observables 並在幾種情況下把他們結合在一起。

下一章。我們將介紹調度器,它將非常easy的幫助我們創建主線程以及提高我們應用程序的性能。我們也將學習怎樣正確的運行長任務或者I/O任務來獲得更好的性能。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM