更多文章請點擊鏈接:http://77blogs.com/?p=162
轉載請標明出處:https://www.cnblogs.com/tangZH/p/12088300.html,http://77blogs.com/?p=162
RxJava究竟是啥,從根本上來講,它就是一個實現異步操作的庫,並且能夠使代碼非常簡潔。它的異步是使用觀察者模式來實現的。
關於觀察者模式的介紹,可以看我這一篇文章:
https://www.cnblogs.com/tangZH/p/11175120.html
這里我主要講RxJava的一些基本用法,基本案例,原理的話暫時不深究:
一、自己構造事件
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter emitter) { int i = getNumber(); if (i < 0) { emitter.onComplete(); return; } else { Log.d(TAG, Thread.currentThread().getName()); emitter.onNext(i); emitter.onComplete(); } } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, Thread.currentThread().getName()); Log.d(TAG, integer + ""); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { } });
RxJava 有四個基本概念:Observable
(可觀察者,即被觀察者)、 Observer
(觀察者)、 subscribe
(訂閱)、事件。Observable
和 Observer
通過 subscribe()
方法實現訂閱關系,從而 Observable
可以在需要的時候發出事件來通知 Observer
。
onNext():方法用來發送事件。
下面看看其他兩個方法:
onCompleted()
: 事件隊列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列。RxJava 規定,當不會再有新的onNext()
發出時,需要觸發onCompleted()
方法作為標志。onError()
: 事件隊列異常。在事件處理過程中出異常時,onError()
會被觸發,同時隊列自動終止,不允許再有事件發出。- 在一個正確運行的事件序列中,
onCompleted()
和onError()
有且只有一個,並且是事件序列中的最后一個。需要注意的是,onCompleted()
和onError()
二者也是互斥的,即在隊列中調用了其中一個,就不應該再調用另一個。
講一下我們上面的例子,上面這個例子是采用簡潔的鏈式調用來寫的:
首先使用 create()
方法來創建一個 Observable ,並為它定義事件觸發規則,然后通過emitter.onNext(i)傳遞出來,.subscribeOn(Schedulers.io())便是指定該事件產生的所在的線程為子線程,.observeOn(AndroidSchedulers.mainThread())指定觀察者執行的線程為主線程。這時候為止返回的對象為Observable對象。
然后該Observable對象subscribe綁定觀察者(也就是觀察者進行訂閱),里面有接收被觀察者發出來的事件,有一個成功的方法,和一個失敗的方法,這樣就實現了由被觀察者向觀察傳遞事件。
二、對集合里的數據進行變換
List<Integer> list = new ArrayList<Integer>() { { add(0); add(1); add(2); } }; Observable.fromIterable(list).map(new Function() { @Override public Object apply(Object o) throws Exception { int i = (int) o + 1; return String.valueOf(i); } }) .toList() .toObservable().subscribeOn(Schedulers.io()) .subscribeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer() { @Override public void accept(Object o) throws Exception { Log.d(TAG, o.toString()); } });
且看,我們需要對某個集合里面的數據一一進行變換,然后發送出來執行其他操作。
上面便是對集合里面的每一項進行加一操作,然后再轉換為String類型,然后toList(),組合成集合發送出來,最后在觀察者方法中打印出每一項。
三、合並執行
定義兩個被觀察者,各自產生事件,然后合並在一起,發送給一個觀察者。
首先定義我們上面第一個例子的被觀察者,用於發送一個數字:
Observable observable1 = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter emitter) { int i = getNumber(); if (i < 0) { emitter.onComplete(); return; } else { Log.d(TAG, Thread.currentThread().getName()); emitter.onNext(i); emitter.onComplete(); } } }) .subscribeOn(Schedulers.io());
其次再定義我們上面第二個例子的被觀察者:
List<Integer> list = new ArrayList<Integer>() { { add(0); add(1); add(2); } }; Observable observable2 = Observable.fromIterable(list).map(new Function() { @Override public Object apply(Object o) { int i = (int) o + 1; return String.valueOf(i); } }) .toList() .toObservable().subscribeOn(Schedulers.io());
最后將這兩個被觀察者的事件合並起來發送給一個觀察者:
Disposable disposable = Observable.zip(observable1, observable2, new BiFunction() { @Override public Object apply(Object o, Object o2) throws Exception { int i = (int) o; String k = (String) ((List) o2).get(0); return k + i; } }) .subscribe(new Consumer() { @Override public void accept(Object o) { Log.d(TAG, (String) o); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) { Log.d(TAG, throwable.getMessage()); } });
zip方法,顧名思義,有點類似與於打包的意思。
o為被觀察者1返回的結果,o2為被觀察2返回的結果,將這兩個結果一起處理后發送給觀察者。打印出來。
現在先介紹這幾個,找個時間再整理一些其他的用法以及原理實現。