RxJava2.0的使用詳解
1,初識RxJava
RxJava就是一種用Java語言實現的響應式編程,來創建基於事件的異步程序
RxJava是一個基於事件訂閱的異步執行的一個類庫,目前比較火的一些技術框架!
參考資料:
Github上RxJava的項目地址:
https://github.com/ReactiveX/RxJava
技術文檔Api:
http://reactivex.io/RxJava/javadoc/
RxAndroid,用於 Android 開發:
https://github.com/ReactiveX/RxAndroid
簡書博客推薦:
http://www.jianshu.com/p/ba61c047c230
1.1使用前所添加的依賴(build.gradle):
compile 'io.reactivex.rxjava2:rxjava:2.1.3'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
1.2作用:
RxJava的目的就是異步。
RxJava的特點就是可以非常簡便的實現異步調用,可以在邏輯復雜的代碼邏輯中以比較輕易的方式實現異步調用。隨着邏輯的復雜,需求的更改,代碼可依然能保持極強的閱讀性
1.3概念:
RxJava是利用觀察者模式來實現一些列的操作,所以對於觀察者模式中的觀察者,被觀察者,以及訂閱、事件需要有一個了解.
Observable:在觀察者模式中稱為“被觀察者”;
Observer:觀察者模式中的“觀察者”,可接收Observable發送的數據;
subscribe:訂閱,觀察者與被觀察者,通過Observable的subscribe()方法進行訂閱;
Subscriber:也是一種觀察者,在2.0中 它與Observer沒什么實質的區別,不同的是 Subscriber要與Flowable(也是一種被觀察者)聯合使用,該部分 內容是2.0新增的,后續文章再介紹。Obsesrver用於訂閱Observable,而Subscriber用於訂閱Flowable.
1.4觀察者模式的理解:
A 對象(觀察者)對 B 對象(被觀察者)的某種變化高度敏感,需要在 B 變化的一瞬間做出反應.
在程序的觀察者模式,觀察者不需要時刻盯着被觀察者,而是采用注冊或者稱為訂閱的方式,告訴被觀察者:我需要你的某某狀態,你要在它變化的時候通知我!
RxJava 有四個基本概念:
Observable (被觀察者)、
Observer (觀察者)、
subscribe (訂閱)
Observable 和 Observer 通過 subscribe() 方法實現訂閱關系,從而 Observable 可以在完成某些操作,獲得一些結果后,回調觸發事件,即 發出事件來通知 Observer。
關於回調,如果理解則可以跳過這一段,如果不理解,在RxJava中可以簡單的理解為:為了方便Observable和Observer交互,在Observable中,將 Observer對象傳入,在完成某些操作后調用Observer對象的方法,此時將觸發Observer中具體實現的對應方法。
注意:Observer是個接口,Observable是個類。
RxJava中定義的事件方法:
onNext(),普通事件,按照隊列依次進行處理.
onComplete(),事件隊列完結時調用該方法
onError(),事件處理過程中出現異常時,onError()觸發,同時隊列終止,不再有事件發出.
onSubscribe(),RxJava 2.0 中新增的,傳遞參數為Disposable,可用於切斷接收事件
讓Observable (被觀察者)開啟子線程執行耗操作,完成耗時操作后,觸發回調,通知Observer (觀察者)進行主線程UI更新
2,簡單使用步驟:
步驟:
創建數據發射源,上游Observable
創建數據接收處,下游Observer
數據源關聯接收處,上游銜接下游!
3,Observable
數據發射源,可觀察的,被觀察的,
Observable有兩種形式啟動形式:
1熱啟動Observable任何時候都會發送消息,即使沒有任何觀察者監聽它。
2冷啟動Observable只有在至少有一個訂閱者的時候才會發送消息
Observable的幾種創建方式:
01,just()方式
使用just( ),將創建一個Observable並自動調用onNext( )發射數據。
也就是通過just( )方式 直接觸發onNext(),just中傳遞的參數將直接在Observer的onNext()方法中接收到。
02,fromIterable()方式
使用fromIterable(),遍歷集合,發送每個item.多次自動調用onNext()方法,每次傳入一個item.
注意:Collection接口是Iterable接口的子接口,所以所有Collection接口的實現類都可以作為Iterable對象直接傳入fromIterable() 方法。
03,defer()方式
當觀察者訂閱時,才創建Observable,並且針對每個觀察者創建都是一個新的Observable.
通過Callable中的回調方法call(),決定使用以何種方式來創建這個Observable對象,當訂閱后,發送事件.
04,interval( )方式
創建一個按固定時間間隔發射整數序列的Observable,可用作定時器。按照固定時間間隔來調用onNext()方法。
05,timer( )方式
通過此種方式創建一個Observable,它在一個給定的延遲后發射一個特殊的值,即表示延遲指定時間后,調用onNext()方法。
06,range( )方式,range(x,y)
創建一個發射特定整數序列的Observable,第一個參數x為起始值,第二個y為發送的個數,如果y為0則不發送,y為負數則拋異常。
range(1,5)
上述表示發射1到5的數。即調用5次Next()方法,依次傳入1-5數字。
07,repeat( )方式
創建一個Observable,該Observable的事件可以重復調用。
部分方法介紹:
表示下游不關心任何事件,你上游盡管發你的數據
Disposable subscribe()
表示下游只關心onNext事件,其他不管
Disposable subscribe(Consumer<? super T> onNext)
表示下游只關心onNext事件,onError事件
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
表示只關心onNext事件,onError事件,onComplete事件
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)
表示處理所有事件
subscribe(Observer<? super T> observer)
4,ObservableEmitter
Emitter是發射器的意思,就是用來發出事件的,它可以發出三種類型的事件
通過調用onNext(T value),發出next事件
通過調用onComplete(),發出complete事件
通過調用onError(Throwable error),發出error事件
注意事項:
onComplete和onError唯一並且互斥
發送多個onComplete, 第一個onComplete接收到,就不再接收了.
發送多個onError, 則收到第二個onError事件會導致程序會崩潰.
不可以隨意亂七八糟發射事件,需要滿足一定的規則:
上游可以發送無限個onNext, 下游也可以接收無限個onNext.
當上游發送了一個onComplete后, 上游onComplete之后的事件將會繼續發送, 而下游收到onComplete事件之后將不再繼續接收事件.
上游發送了一個onError后, 上游onError之后的事件將繼續發送, 而下游收到onError事件之后將不再繼續接收事件.
上游可以不發送onComplete或onError.
最為關鍵的是onComplete和onError必須唯一並且互斥, 即不能發多個onComplete, 也不能發多個onError, 也不能先發一個onComplete, 然后再發一個onError
5,Disposable
一次性,它理解成兩根管道之間的一個機關, 當調用它的dispose()方法時, 它就會將兩根管道切斷, 從而導致下游收不到事件.
在RxJava中,用它來切斷Observer(觀察者)與Observable(被觀察者)之間的連接,當調用它的dispose()方法時, 它就會將Observer(觀察者)與Observable(被觀察者)之間的連接切斷, 從而導致Observer(觀察者)收不到事件。
注意: 調用dispose()並不會導致上游不再繼續發送事件, 上游會繼續發送剩余的事件
我們讓上游依次發送1,2,3,complete,4,在下游收到第二個事件之后, 切斷水管, 看看運行結果
Disposable的對象通過觀察者獲得,具體分為兩種方式
1,Observer接口
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//此方法接收到Disposable的實例!
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
通過創建Observer(觀察者)接口,重寫onSubscribe方法,當訂閱后,建立與Observable(被觀察者)的聯系后,在onSubscribe(Disposable d)方法中便可以獲得Disposable對象。
2.Consumer等其他函數式接口
Disposable disposable = Observable.just("你好").subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
}
});
當調用Observable的subscribe()方法后直接返回一個Disposable 對象
6,線程控制——Scheduler
在不指定線程的情況下, RxJava 遵循的是線程不變的原則,即:在哪個線程調用 subscribe(),就在哪個線程生產事件;在哪個線程生產事件,就在哪個線程消費事件。如果需要切換線程,就需要用到 Scheduler (調度器)。
Schedulers.immediate():
直接在當前線程運行,相當於不指定線程。這是默認的Scheduler。
Schedulers.newThread():
總是啟用新線程,並在新線程執行操作。
Schedulers.io(): I/O
操作(讀寫文件、讀寫數據庫、網絡信息交互等)所使用的Scheduler。行為模式和newThread()差不多,區別在於io()的內部實現是是用一個無數量上限的線程池,可以重用空閑的線程,因此多數情況下io()比newThread()更有效率。不要把計算工作放在io()中,可以避免創建不必要的線程。
Schedulers.computation():
計算所使用的Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個Scheduler使用的固定的線程池,大小為 CPU 核數。不要把 I/O 操作放在computation()中,否則 I/O 操作的等待時間會浪費 CPU。
AndroidSchedulers.mainThread(),
Android專用線程,指定操作在主線程運行。
如何切換線程呢?RxJava中提供了兩個方法:
subscribeOn() 和 observeOn() ,
兩者的不同點在於:
subscribeOn(): 指定subscribe()訂閱所發生的線程,或者叫做事件產生的線程。
observeOn(): 指定Observer所運行在的線程,即onNext()執行的線程。或者叫做事件消費的線程。
7,以Consumer為例,我們可以實現簡便式的觀察者模式
Observable.just("hello").subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
其中Consumer中的accept()方法接收一個來自Observable的單個值。Consumer就是一個觀察者。其他函數式接口可以類似應用
8,RxJava中的操作符
01,操作符就是用於在Observable和最終的Observer之間,通過轉換Observable為其他觀察者對象的過程,修改發出的事件,
最終將最簡潔的數據傳遞給Observer對象.
每次調用一次操作符,就進行一次觀察者對象的改變,同時將需要傳遞的數據進行轉變,最終Observer對象獲得想要的數據。
以網絡加載為例,我們通過Observable開啟子線程,進行一些網絡請求獲取數據的操作,獲得到網絡數據后,然后通過操作符進行轉換,獲得我們想要的形式的數據,然后傳遞給Observer對象
02,比較常用的操作符:
map()操作符
map()操作符,就是把原來的Observable對象轉換成另一個Observable對象,同時將傳輸的數據進行一些靈活的操作,方便Observer獲得想要的數據形式。
舉例:
Observable<Integer> observable = Observable
.just("hello")
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return s.length();
}
});
flatMap()操作符
flatMap()對於數據的轉換比map()更加徹底,如果發送的數據是集合,flatmap()重新生成一個Observable對象,並把數據轉換成Observer想 要的數據形式。它可以返回任何它想返回的Observable對象。
舉例:
Observable.just(list)
.flatMap(new Function<List<String>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
});
filter()操作符
filter()操作符根據它的test()方法中,根據自己想過濾的數據加入相應的邏輯判斷,返回true則表示數據滿足條件,返回false則表示數據需要被過濾。
最后過濾出的數據將加入到新的Observable對象中,方便傳遞給Observer想要的數據形式。
舉例:
Observable
.just(list)
.flatMap(new Function<List<String>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
}).filter(new Predicate<Object>() {
@Override
public boolean test(Object s) throws Exception {
String newStr = (String) s;
if (newStr.charAt(5) - '0' > 5) {
return true;
}
return false;
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
System.out.println((String)o);
}
});
take()操作符
輸出最多指定數量的結果.(接收指定數量的結果)
舉例:
Observable.just(new ArrayList<String>(){
{
for (int i = 0; i < 8; i++) {
add("data"+i);
}
}
}).flatMap(new Function<List<String>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
}).take(5).subscribe(new Consumer<Object>() {
@Override
public void accept(Object s) throws Exception {
DemonstrateUtil.showLogResult(s.toString());
}
});
doOnNext()
允許我們在每次輸出一個元素之前做一些額外的事情
舉例:
Observable.just(new ArrayList<String>(){
{
for (int i = 0; i < 6; i++) {
add("data"+i);
}
}
}).flatMap(new Function<List<String>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
}).take(5).doOnNext(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
DemonstrateUtil.showLogResult("額外的准備工作!");
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object s) throws Exception {
DemonstrateUtil.showLogResult(s.toString());
}
});
8,Flowable的理解
Flowable是一個被觀察者,與Subscriber(觀察者)配合使用,解決Backpressure問題
Backpressure(背壓)。所謂背壓,即生產者的速度大於消費者的速度帶來的問題。
什么情況下才會產生Backpressure問題?
1.如果生產者和消費者在一個線程的情況下,無論生產者的生產速度有多快,每生產一個事件都會通知消費者,等待消費者消費完畢,再生產下一個事件。
所以在這種情況下,根本不存在Backpressure問題。即同步情況下,Backpressure問題不存在。
2.如果生產者和消費者不在同一線程的情況下,如果生產者的速度大於消費者的速度,就會產生Backpressure問題。
即異步情況下,Backpressure問題才會存在。
現象演示說明:
被觀察者是事件的生產者,觀察者是事件的消費者.假如生產者無限生成事件,而消費者以很緩慢的節奏來消費事件,會造成事件無限堆積,形成背壓,最后造成OOM!
Flowable悠然而生,專門用來處理這類問題。
Flowable是為了應對Backpressure而產生的。Flowable是一個被觀察者,
與Subscriber(觀察者)配合使用,解決Backpressure問題。
注意:處理Backpressure的策略僅僅是處理Subscriber接收事件的方式,並不影響Flowable發送事件的方法。
即使采用了處理Backpressure的策略,Flowable原來以什么樣的速度產生事件,現在還是什么樣的速度不會變化,主要處理的是Subscriber接收事件的方式。
處理Backpressure問題的策略,或者來解決Backpressure問題
BackpressureStrategy.ERROR
如果緩存池溢出,就會立刻拋出MissingBackpressureException異常
request()用來向生產者申請可以消費的事件數量,這樣我們便可以根據本身的消費能力進行消費事件.
雖然並不限制向request()方法中傳入任意數字,但是如果消費者並沒有這么多的消費能力,依舊會造成資源浪費,最后產生OOM
at java.lang.OutOfMemoryError.<init>(OutOfMemoryError.java:33)
在異步調用時,RxJava中有個緩存池,用來緩存消費者處理不了暫時緩存下來的數據,緩存池的默認大小為128,即只能緩存128個事件。
無論request()中傳入的數字比128大或小,緩存池中在剛開始都會存入128個事件。
當然如果本身並沒有這么多事件需要發送,則不會存128個事件。
應用舉例:
BackpressureStrategy.BUFFER
是把RxJava中默認的只能存128個事件的緩存池換成一個大的緩存池,支持存更多的數據.
消費者通過request()即使傳入一個很大的數字,生產者也會生產事件,並將處理不了的事件緩存.
注意:
這種方式任然比較消耗內存,除非是我們比較了解消費者的消費能力,能夠把握具體情況,不會產生OOM。
BUFFER要慎用
BackpressureStrategy.DROP
顧名思義,當消費者處理不了事件,就丟棄!
例如,當數據源創建了200個事件,先不進行消費臨時進行緩存實際緩存128個,我們第一次申請消費了100個,再次申請消費100個,
那么實際只消費了128個,而其余的72個被丟棄了!
BackpressureStrategy.LATEST
LATEST與DROP功能基本一致,當消費者處理不了事件,就丟棄!
唯一的區別就是LATEST總能使消費者能夠接收到生產者產生的最后一個事件。
例如,當數據源創建了200個事件,先不進行消費臨時進行緩存,我們第一次申請消費了100個,再次申請消費100個,
那么實際只消費了129個,而其余的71個被丟棄了,但是第200個(最后一個)會被消費.
BackpressureStrategy.MISSING
生產的事件沒有進行緩存和丟棄,下游接收到的事件必須進行消費或者處理!
在RxJava中會經常遇到一種情況就是被觀察者發送消息十分迅速以至於觀察者不能及時的響應這些消息
舉例:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
while (true){
e.onNext(1);
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
System.out.println(integer);
}
});
被觀察者是事件的生產者,觀察者是事件的消費者。上述例子中可以看出生產者無限生成事件,而消費者每2秒才能消費一個事件,這會造成事件無限堆積,最后造成OOM。
Flowable就是由此產生,專門用來處理這類問題
代碼實現:
1 public class RxJavaDemo1Activity extends AppCompatActivity implements View.OnClickListener { 2 3 protected Button btnSend1; 4 protected Button btnSend2; 5 protected Button btnSend3; 6 protected Button btnSend4; 7 protected Button btnSend5; 8 protected Button btnSend6; 9 10 @Override 11 protected void onCreate(Bundle savedInstanceState) { 12 super.onCreate(savedInstanceState); 13 super.setContentView(R.layout.activity_rx_java_demo1); 14 initView(); 15 } 16 17 @Override 18 public void onClick(View view) { 19 if (view.getId() == R.id.btn_send1) { 20 test1();//普通使用 21 } else if (view.getId() == R.id.btn_send2) { 22 test2();//鏈式調用 23 } else if (view.getId() == R.id.btn_send3) { 24 test3();//發送中,中斷. 25 } else if (view.getId() == R.id.btn_send4) { 26 test4();//只關心onnext事件的操作 27 } else if (view.getId() == R.id.btn_send5) { 28 test5();//幾種被觀察者的創建方式 29 } else if (view.getId() == R.id.btn_send6) { 30 test6();//常用的操作符 31 } 32 } 33 34 private void test6() { 35 DialogUtil.showListDialog(this, "rxjava的操作符號使用", new String[]{ 36 "0map()操作符", 37 "1flatMap()操作符", 38 "2filter()操作符", 39 "3take()操作符", 40 "4doOnNext()操作符", 41 }, new DialogInterface.OnClickListener() { 42 @Override 43 public void onClick(DialogInterface dialog, int which) { 44 switch (which) { 45 case 0: 46 map0(); 47 break; 48 case 1: 49 map1(); 50 break; 51 case 2: 52 map2(); 53 break; 54 case 3: 55 map3(); 56 break; 57 case 4: 58 map4(); 59 break; 60 } 61 } 62 }); 63 } 64 65 private void map4() { 66 Observable.just(new ArrayList<String>(){ 67 { 68 for (int i = 0; i < 6; i++) { 69 add("data"+i); 70 } 71 } 72 }).flatMap(new Function<List<String>, ObservableSource<?>>() { 73 @Override 74 public ObservableSource<?> apply(List<String> strings) throws Exception { 75 return Observable.fromIterable(strings); 76 } 77 }).take(5).doOnNext(new Consumer<Object>() { 78 @Override 79 public void accept(Object o) throws Exception { 80 DemonstrateUtil.showLogResult("額外的准備工作!"); 81 } 82 }).subscribe(new Consumer<Object>() { 83 @Override 84 public void accept(Object s) throws Exception { 85 DemonstrateUtil.showLogResult(s.toString()); 86 } 87 }); 88 } 89 90 private void map3() { 91 Observable.just(new ArrayList<String>(){ 92 { 93 for (int i = 0; i < 8; i++) { 94 add("data"+i); 95 } 96 } 97 }).flatMap(new Function<List<String>, ObservableSource<?>>() { 98 @Override 99 public ObservableSource<?> apply(List<String> strings) throws Exception { 100 return Observable.fromIterable(strings); 101 } 102 }).take(10).subscribe(new Consumer<Object>() { 103 @Override 104 public void accept(Object s) throws Exception { 105 DemonstrateUtil.showLogResult(s.toString()); 106 } 107 }); 108 } 109 110 private void map2() { 111 Observable 112 .just(new ArrayList<String>(){ 113 { 114 for (int i = 0; i < 5; i++) { 115 add("data"+i); 116 } 117 } 118 }) 119 .flatMap(new Function<List<String>, ObservableSource<?>>() { 120 @Override 121 public ObservableSource<?> apply(List<String> strings) throws Exception { 122 return Observable.fromIterable(strings); 123 } 124 }).filter(new Predicate<Object>() { 125 @Override 126 public boolean test(Object s) throws Exception { 127 String newStr = (String) s; 128 if (newStr.contains("3")){ 129 return true; 130 } 131 return false; 132 } 133 }).subscribe(new Consumer<Object>() { 134 @Override 135 public void accept(Object o) throws Exception { 136 DemonstrateUtil.showLogResult((String)o); 137 } 138 }); 139 } 140 141 private void map1() { 142 Observable.just(new ArrayList<String>(){ 143 { 144 for (int i = 0; i < 3; i++) { 145 add("data"+i); 146 } 147 } 148 }).flatMap(new Function<List<String>, ObservableSource<?>>() { 149 @Override 150 public ObservableSource<?> apply(List<String> strings) throws Exception { 151 return Observable.fromIterable(strings); 152 } 153 }).subscribe(new Observer<Object>() { 154 @Override 155 public void onSubscribe(Disposable d) { 156 157 } 158 159 @Override 160 public void onNext(Object o) { 161 DemonstrateUtil.showLogResult("flatMap轉換后,接收到的"+o); 162 } 163 164 @Override 165 public void onError(Throwable e) { 166 167 } 168 169 @Override 170 public void onComplete() { 171 172 } 173 }); 174 } 175 176 private void map0() { 177 Observable.just("hellorxjava") 178 .map(new Function<String, Integer>() { 179 @Override 180 public Integer apply(String s) throws Exception { 181 return s.length(); 182 } 183 }).subscribe(new Observer<Integer>() { 184 @Override 185 public void onSubscribe(Disposable d) { 186 187 } 188 189 @Override 190 public void onNext(Integer integer) { 191 DemonstrateUtil.showLogResult("接收到被轉換的數據結果:"+integer); 192 } 193 194 @Override 195 public void onError(Throwable e) { 196 197 } 198 199 @Override 200 public void onComplete() { 201 202 } 203 }); 204 } 205 206 private void test5() { 207 DialogUtil.showListDialog(this, "rxjava的其他操作", new String[]{ 208 "0just()方式創建Observable", 209 "1fromIterable()方式創建Observable", 210 "2defer()方式創建Observable", 211 "3interval( )方式創建Observable", 212 "4timer( )方式創建Observable", 213 "5range( )方式創建Observable", 214 "6repeat( )方式創建Observable", 215 }, new DialogInterface.OnClickListener() { 216 @Override 217 public void onClick(DialogInterface dialog, int which) { 218 switch (which) { 219 case 0: 220 other0(); 221 break; 222 case 1: 223 other1(); 224 break; 225 case 2: 226 other2(); 227 break; 228 case 3: 229 other3(); 230 break; 231 case 4: 232 other4(); 233 break; 234 case 5: 235 other5(); 236 break; 237 case 6: 238 other6(); 239 break; 240 } 241 } 242 }); 243 } 244 245 private void other6() { 246 Observable.just(123).repeat().subscribe(new Observer<Integer>() { 247 @Override 248 public void onSubscribe(Disposable d) { 249 250 } 251 252 @Override 253 public void onNext(Integer integer) { 254 DemonstrateUtil.showLogResult("重復integer" + integer); 255 } 256 257 @Override 258 public void onError(Throwable e) { 259 260 } 261 262 @Override 263 public void onComplete() { 264 265 } 266 }); 267 } 268 269 private void other5() { 270 Observable.range(1, 5).subscribe(new Observer<Integer>() { 271 @Override 272 public void onSubscribe(Disposable d) { 273 274 } 275 276 @Override 277 public void onNext(Integer integer) { 278 DemonstrateUtil.showLogResult("連續收到:" + integer); 279 } 280 281 @Override 282 public void onError(Throwable e) { 283 284 } 285 286 @Override 287 public void onComplete() { 288 289 } 290 }); 291 } 292 293 private void other4() { 294 Observable.timer(5, TimeUnit.SECONDS).subscribe(new Observer<Long>() { 295 @Override 296 public void onSubscribe(Disposable d) { 297 298 } 299 300 @Override 301 public void onNext(Long aLong) { 302 DemonstrateUtil.showLogResult("延遲5s后調用了:onNext"); 303 } 304 305 @Override 306 public void onError(Throwable e) { 307 308 } 309 310 @Override 311 public void onComplete() { 312 313 } 314 }); 315 } 316 317 private void other3() { 318 Observable.interval(3, TimeUnit.SECONDS).subscribe(new Observer<Long>() { 319 @Override 320 public void onSubscribe(Disposable d) { 321 322 } 323 324 @Override 325 public void onNext(Long aLong) { 326 DemonstrateUtil.showLogResult("數字是:" + aLong); 327 //DemonstrateUtil.showToastResult(RxJavaDemo1Activity.this,"數字是:"+aLong); 328 } 329 330 @Override 331 public void onError(Throwable e) { 332 333 } 334 335 @Override 336 public void onComplete() { 337 338 } 339 }); 340 } 341 342 private void other2() { 343 Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() { 344 @Override 345 public ObservableSource<? extends String> call() throws Exception { 346 return Observable.just("hello,defer"); 347 } 348 }); 349 350 //上游銜接下游! 351 observable.subscribe(new Observer<String>() { 352 @Override 353 public void onSubscribe(Disposable d) { 354 355 } 356 357 @Override 358 public void onNext(String s) { 359 DemonstrateUtil.showLogResult(s); 360 DemonstrateUtil.showToastResult(RxJavaDemo1Activity.this, s); 361 } 362 363 @Override 364 public void onError(Throwable e) { 365 366 } 367 368 @Override 369 public void onComplete() { 370 371 } 372 }); 373 } 374 375 private void other1() { 376 Observable.fromIterable(new ArrayList<String>() { 377 { 378 for (int i = 0; i < 5; i++) { 379 add("Hello," + i); 380 } 381 } 382 }).subscribe(new Observer<String>() { 383 @Override 384 public void onSubscribe(Disposable d) { 385 386 } 387 388 @Override 389 public void onNext(String s) { 390 DemonstrateUtil.showToastResult(RxJavaDemo1Activity.this, s); 391 DemonstrateUtil.showLogResult(s); 392 } 393 394 @Override 395 public void onError(Throwable e) { 396 397 } 398 399 @Override 400 public void onComplete() { 401 402 } 403 }); 404 } 405 406 private void other0() { 407 Observable.just("hello,you hao!").subscribe(new Observer<String>() { 408 @Override 409 public void onSubscribe(Disposable d) { 410 411 } 412 413 @Override 414 public void onNext(String s) { 415 DemonstrateUtil.showLogResult(s); 416 DemonstrateUtil.showToastResult(RxJavaDemo1Activity.this, s); 417 } 418 419 @Override 420 public void onError(Throwable e) { 421 422 } 423 424 @Override 425 public void onComplete() { 426 427 } 428 }); 429 } 430 431 private void test4() { 432 Observable.create(new ObservableOnSubscribe<Integer>() { 433 @Override 434 public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { 435 DemonstrateUtil.showLogResult("emitter 1"); 436 emitter.onNext(1); 437 438 DemonstrateUtil.showLogResult("emitter 2"); 439 emitter.onNext(2); 440 441 DemonstrateUtil.showLogResult("emitter 3"); 442 emitter.onNext(3); 443 444 DemonstrateUtil.showLogResult("complete"); 445 emitter.onComplete(); 446 447 DemonstrateUtil.showLogResult("emitter 4"); 448 emitter.onNext(4); 449 } 450 }).subscribe(new Consumer<Integer>() { 451 @Override 452 public void accept(Integer integer) throws Exception { 453 DemonstrateUtil.showLogResult("accept:" + integer); 454 } 455 }); 456 } 457 458 private void test3() { 459 Observable.create(new ObservableOnSubscribe<Integer>() { 460 @Override 461 public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { 462 DemonstrateUtil.showLogResult("emitter 1"); 463 emitter.onNext(1); 464 465 DemonstrateUtil.showLogResult("emitter 2"); 466 emitter.onNext(2); 467 468 DemonstrateUtil.showLogResult("emitter 3"); 469 emitter.onNext(3); 470 471 DemonstrateUtil.showLogResult("complete"); 472 emitter.onComplete(); 473 474 DemonstrateUtil.showLogResult("emitter 4"); 475 emitter.onNext(4); 476 } 477 }).subscribe(new Observer<Integer>() { 478 private Disposable mDisposable; 479 private int i; 480 481 @Override 482 public void onSubscribe(Disposable d) { 483 DemonstrateUtil.showLogResult("subscribe"); 484 mDisposable = d; 485 } 486 487 @Override 488 public void onNext(Integer value) { 489 DemonstrateUtil.showLogResult("onNext:" + value); 490 i++; 491 if (i == 2) { 492 DemonstrateUtil.showLogResult("dispose:" + value); 493 mDisposable.dispose(); 494 DemonstrateUtil.showLogResult("isDisposed : " + mDisposable.isDisposed()); 495 } 496 } 497 498 @Override 499 public void onError(Throwable e) { 500 DemonstrateUtil.showLogResult("error:"); 501 } 502 503 @Override 504 public void onComplete() { 505 DemonstrateUtil.showLogResult("complete"); 506 } 507 }); 508 509 } 510 511 private void test2() { 512 //鏈式調用 513 Observable.create(new ObservableOnSubscribe<Integer>() { 514 @Override 515 public void subscribe(ObservableEmitter<Integer> e) throws Exception { 516 e.onNext(1); 517 e.onNext(2); 518 e.onNext(3); 519 } 520 }).subscribe(new Observer<Integer>() { 521 522 @Override 523 public void onSubscribe(Disposable d) { 524 DemonstrateUtil.showLogResult("onSubscribe"); 525 } 526 527 @Override 528 public void onNext(Integer integer) { 529 DemonstrateUtil.showLogResult("onNext-->integer" + integer); 530 } 531 532 @Override 533 public void onError(Throwable e) { 534 DemonstrateUtil.showLogResult("onError"); 535 } 536 537 @Override 538 public void onComplete() { 539 DemonstrateUtil.showLogResult("onComplete"); 540 } 541 }); 542 } 543 544 private void test1() { 545 546 //創建上游,數據發射源! 547 //ObservableOnSubscribe對象作為參數,它的作用相當於一個計划表,當 Observable被訂閱的時候, 548 // ObservableOnSubscribe的subscribe()方法會自動被調用,事件序列就會依照設定依次觸發 549 //ObservableEmitter,發射器,觸發事件. 550 Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { 551 552 @Override 553 public void subscribe(ObservableEmitter<Integer> e) throws Exception { 554 e.onNext(1); 555 e.onNext(2); 556 e.onNext(3); 557 } 558 }); 559 560 //創建下游,數據接收處! 561 Observer<Integer> observer = new Observer<Integer>() { 562 563 @Override 564 public void onSubscribe(Disposable d) { 565 DemonstrateUtil.showLogResult("onSubscribe"); 566 } 567 568 @Override 569 public void onNext(Integer integer) { 570 DemonstrateUtil.showLogResult("onNext--integer" + integer); 571 } 572 573 @Override 574 public void onError(Throwable e) { 575 DemonstrateUtil.showLogResult("onError"); 576 } 577 578 @Override 579 public void onComplete() { 580 DemonstrateUtil.showLogResult("onComplete"); 581 } 582 }; 583 584 //數據源連接接收處,上游銜接下游! 585 //只有當上游和下游建立連接之后, 上游才會開始發送事件 586 observable.subscribe(observer); 587 } 588 589 private void initView() { 590 btnSend1 = (Button) findViewById(R.id.btn_send1); 591 btnSend1.setOnClickListener(RxJavaDemo1Activity.this); 592 btnSend2 = (Button) findViewById(R.id.btn_send2); 593 btnSend2.setOnClickListener(RxJavaDemo1Activity.this); 594 btnSend3 = (Button) findViewById(R.id.btn_send3); 595 btnSend3.setOnClickListener(RxJavaDemo1Activity.this); 596 btnSend4 = (Button) findViewById(R.id.btn_send4); 597 btnSend4.setOnClickListener(RxJavaDemo1Activity.this); 598 btnSend5 = (Button) findViewById(R.id.btn_send5); 599 btnSend5.setOnClickListener(RxJavaDemo1Activity.this); 600 btnSend6 = (Button) findViewById(R.id.btn_send6); 601 btnSend6.setOnClickListener(RxJavaDemo1Activity.this); 602 } 603 }
1 public class RxJavaDemo2Activity extends AppCompatActivity implements View.OnClickListener { 2 3 protected Button btn; 4 protected ImageView iv; 5 6 @Override 7 protected void onCreate(Bundle savedInstanceState) { 8 super.onCreate(savedInstanceState); 9 super.setContentView(R.layout.activity_rx_java_demo2); 10 initView(); 11 } 12 13 @Override 14 public void onClick(View view) { 15 if (view.getId() == R.id.btn) { 16 DialogUtil.showListDialog(this, "rxJava操作!", new String[]{ 17 "0發送事件io線程並變換主線程接收", 18 "1子線程發送事件主線程接收", 19 "2默認線程發送事件默認線程接收", 20 }, new DialogInterface.OnClickListener() { 21 @Override 22 public void onClick(DialogInterface dialog, int which) { 23 switch (which) { 24 case 0: 25 show0(); 26 break; 27 case 1: 28 show1(); 29 break; 30 case 2: 31 show2(); 32 break; 33 } 34 } 35 }); 36 } 37 } 38 39 private void show2() { 40 Observable.create(new ObservableOnSubscribe<Integer>() { 41 @Override 42 public void subscribe(ObservableEmitter<Integer> e) throws Exception { 43 DemonstrateUtil.showLogResult("發送的線程名稱:" + Thread.currentThread().getName()); 44 DemonstrateUtil.showLogResult("發送的線程id:" + Thread.currentThread().getId()); 45 46 DemonstrateUtil.showLogResult("發送的數據:" + 1); 47 e.onNext(1); 48 } 49 }).subscribe(new Consumer<Integer>() { 50 @Override 51 public void accept(Integer integer) throws Exception { 52 DemonstrateUtil.showLogResult("接收的線程:" + Thread.currentThread().getName()); 53 DemonstrateUtil.showLogResult("接收的線程id:" + Thread.currentThread().getId()); 54 DemonstrateUtil.showLogResult("接收到的數據:-integer:" + integer); 55 } 56 }); 57 } 58 59 private void show1() { 60 Observable.create(new ObservableOnSubscribe<Integer>() { 61 @Override 62 public void subscribe(ObservableEmitter<Integer> e) throws Exception { 63 DemonstrateUtil.showLogResult("發送的線程名稱:" + Thread.currentThread().getName()); 64 DemonstrateUtil.showLogResult("發送的線程id:" + Thread.currentThread().getId()); 65 66 DemonstrateUtil.showLogResult("發送的數據:" + 1); 67 e.onNext(1); 68 } 69 }).subscribeOn(Schedulers.newThread()) 70 .observeOn(AndroidSchedulers.mainThread()) 71 .subscribe(new Consumer<Integer>() { 72 @Override 73 public void accept(Integer integer) throws Exception { 74 DemonstrateUtil.showLogResult("接收的線程:" + Thread.currentThread().getName()); 75 DemonstrateUtil.showLogResult("接收的線程id:" + Thread.currentThread().getId()); 76 DemonstrateUtil.showLogResult("接收到的數據:-integer:" + integer); 77 } 78 }); 79 } 80 81 private void show0() { 82 Observable.create(new ObservableOnSubscribe<Integer>() { 83 @Override 84 public void subscribe(ObservableEmitter<Integer> e) throws Exception { 85 DemonstrateUtil.showLogResult("所在的線程:", Thread.currentThread().getName()); 86 DemonstrateUtil.showLogResult("發送的數據:", 1 + ""); 87 e.onNext(1); 88 } 89 }).subscribeOn(Schedulers.io()) 90 .observeOn(AndroidSchedulers.mainThread()) 91 .subscribe(new Consumer<Integer>() { 92 @Override 93 public void accept(Integer integer) throws Exception { 94 DemonstrateUtil.showLogResult("所在的線程:", Thread.currentThread().getName()); 95 DemonstrateUtil.showLogResult("接收到的數據:", "integer:" + integer); 96 } 97 }); 98 } 99 100 private void initView() { 101 btn = (Button) findViewById(R.id.btn); 102 btn.setOnClickListener(RxJavaDemo2Activity.this); 103 iv = (ImageView) findViewById(R.id.iv); 104 } 105 }
1 public class RxJavaDemo3Activity extends AppCompatActivity implements View.OnClickListener { 2 3 protected Button btnBackpressure; 4 private Flowable mFlowable; 5 private Subscriber mSubscriber; 6 private Subscription mSubscription; 7 private Flowable flowableLATEST; 8 private Subscriber subscriberLatest; 9 private Subscription subscriptionLatest; 10 11 @Override 12 protected void onCreate(Bundle savedInstanceState) { 13 super.onCreate(savedInstanceState); 14 super.setContentView(R.layout.activity_rx_java_demo3); 15 initView(); 16 init4(); 17 init6(); 18 } 19 20 private void init6() { 21 flowableLATEST = Flowable.create(new FlowableOnSubscribe<Integer>() { 22 @Override 23 public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { 24 25 for (int i = 1; i<=200; i++) { 26 emitter.onNext(i); 27 DemonstrateUtil.showLogResult("LATEST生產onNext:"+i); 28 } 29 } 30 }, BackpressureStrategy.LATEST); 31 32 //mSubscription = s; 33 subscriberLatest = new Subscriber<Integer>() { 34 @Override 35 public void onSubscribe(Subscription s) { 36 subscriptionLatest = s; 37 s.request(100); 38 } 39 40 @Override 41 public void onNext(Integer integer) { 42 DemonstrateUtil.showLogResult("Latest消費onNext:" + integer); 43 } 44 45 @Override 46 public void onError(Throwable t) { 47 DemonstrateUtil.showLogResult("onError"); 48 DemonstrateUtil.showLogResult(t.getMessage()); 49 DemonstrateUtil.showLogResult(t.toString()); 50 } 51 52 @Override 53 public void onComplete() { 54 DemonstrateUtil.showLogResult("onComplete"); 55 } 56 }; 57 } 58 59 private void init4() { 60 mFlowable = Flowable.create(new FlowableOnSubscribe<Integer>() { 61 @Override 62 public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { 63 64 for (int i = 1; i <= 200; i++) { 65 emitter.onNext(i); 66 DemonstrateUtil.showLogResult("生產onNext:"+i); 67 } 68 } 69 }, BackpressureStrategy.DROP); 70 71 //mSubscription = s; 72 mSubscriber = new Subscriber<Integer>() { 73 @Override 74 public void onSubscribe(Subscription s) { 75 mSubscription = s; 76 s.request(100); 77 } 78 79 @Override 80 public void onNext(Integer integer) { 81 DemonstrateUtil.showLogResult("消費onNext:" + integer); 82 } 83 84 @Override 85 public void onError(Throwable t) { 86 DemonstrateUtil.showLogResult("onError"); 87 DemonstrateUtil.showLogResult(t.getMessage()); 88 DemonstrateUtil.showLogResult(t.toString()); 89 } 90 91 @Override 92 public void onComplete() { 93 DemonstrateUtil.showLogResult("onComplete"); 94 } 95 }; 96 97 } 98 99 100 @Override 101 public void onClick(View view) { 102 if (view.getId() == R.id.btn_backpressure) { 103 DialogUtil.showListDialog(this, "Flowable的理解使用", new String[]{ 104 "0事件堆積現象", 105 "1正常使用策略ERROR!", 106 "2使用策略ERROR出現的異常!", 107 "3使用策略BUFFER,更大的緩存池", 108 "4使用策略DROP,事件關聯100", 109 "5使用策略DROP,再申請100", 110 "6使用策略LATEST,事件關聯100", 111 "7使用策略LATEST,再申請100", 112 "8使用策略MISSING", 113 }, new DialogInterface.OnClickListener() { 114 @Override 115 public void onClick(DialogInterface dialog, int which) { 116 switch (which) { 117 case 0: 118 show0(); 119 break; 120 case 1: 121 show1(); 122 break; 123 case 2: 124 show2(); 125 break; 126 case 3: 127 show3(); 128 break; 129 case 4: 130 show4(); 131 break; 132 case 5: 133 show5(); 134 break; 135 case 6: 136 show6(); 137 break; 138 case 7: 139 show7(); 140 break; 141 case 8: 142 show8(); 143 break; 144 } 145 } 146 }); 147 } 148 } 149 150 private void show8() { 151 Flowable.create(new FlowableOnSubscribe<Integer>() { 152 @Override 153 public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { 154 for (int i = 0; i < 200; i++) { 155 DemonstrateUtil.showLogResult("MISSING-生成emitter" + i); 156 emitter.onNext(i); 157 } 158 } 159 }, BackpressureStrategy.MISSING).subscribeOn(Schedulers.io()) 160 .observeOn(AndroidSchedulers.mainThread()) 161 .subscribe(new Subscriber<Integer>() { 162 @Override 163 public void onSubscribe(Subscription s) { 164 //mSubscription = s; 165 //s.request(0); 166 } 167 168 @Override 169 public void onNext(Integer integer) { 170 DemonstrateUtil.showLogResult("MISSING-消費onNext" + integer); 171 } 172 173 @Override 174 public void onError(Throwable t) { 175 DemonstrateUtil.showLogResult("onError" + t.getMessage()); 176 DemonstrateUtil.showLogResult("onError" + t.toString()); 177 t.printStackTrace(); 178 } 179 180 @Override 181 public void onComplete() { 182 DemonstrateUtil.showLogResult("onComplete"); 183 } 184 }); 185 } 186 187 private void show7() { 188 subscriptionLatest.request(100); 189 } 190 191 private void show6() { 192 flowableLATEST.subscribeOn(Schedulers.io()) 193 .observeOn(AndroidSchedulers.mainThread()) 194 .subscribe(subscriberLatest); 195 } 196 197 private void show5() { 198 //128-100-100= -72. 199 mSubscription.request(100); 200 } 201 202 private void show4() { 203 mFlowable.subscribeOn(Schedulers.io()) 204 .observeOn(AndroidSchedulers.mainThread()) 205 .subscribe(mSubscriber); 206 } 207 208 209 private void show3() { 210 Flowable.create(new FlowableOnSubscribe<Integer>() { 211 @Override 212 public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { 213 for (int i = 0; i < 200; i++) { 214 DemonstrateUtil.showLogResult("emitter" + i); 215 emitter.onNext(i); 216 } 217 } 218 }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io()) 219 .observeOn(AndroidSchedulers.mainThread()) 220 .subscribe(new Subscriber<Integer>() { 221 @Override 222 public void onSubscribe(Subscription s) { 223 //mSubscription = s; 224 //s.request(0); 225 } 226 227 @Override 228 public void onNext(Integer integer) { 229 DemonstrateUtil.showLogResult("onNext" + integer); 230 } 231 232 @Override 233 public void onError(Throwable t) { 234 DemonstrateUtil.showLogResult("onError" + t.getMessage()); 235 DemonstrateUtil.showLogResult("onError" + t.toString()); 236 t.printStackTrace(); 237 } 238 239 @Override 240 public void onComplete() { 241 DemonstrateUtil.showLogResult("onComplete"); 242 } 243 }); 244 } 245 246 private void show2() { 247 Flowable.create(new FlowableOnSubscribe<Integer>() { 248 @Override 249 public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { 250 for (int i = 0; i < 200; i++) { 251 DemonstrateUtil.showLogResult("emitter" + i); 252 emitter.onNext(i); 253 } 254 } 255 }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) 256 .observeOn(AndroidSchedulers.mainThread()) 257 .subscribe(new Subscriber<Integer>() { 258 @Override 259 public void onSubscribe(Subscription s) { 260 //mSubscription = s; 261 //s.request(0); 262 } 263 264 @Override 265 public void onNext(Integer integer) { 266 DemonstrateUtil.showLogResult("onNext" + integer); 267 } 268 269 @Override 270 public void onError(Throwable t) { 271 DemonstrateUtil.showLogResult("onError" + t.getMessage()); 272 DemonstrateUtil.showLogResult("onError" + t.toString()); 273 t.printStackTrace(); 274 } 275 276 @Override 277 public void onComplete() { 278 DemonstrateUtil.showLogResult("onComplete"); 279 } 280 }); 281 } 282 283 private void show1() { 284 Flowable.create(new FlowableOnSubscribe<Integer>() { 285 @Override 286 public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { 287 for (int i = 0; i < 127; i++) {//128--- 0--->126 288 DemonstrateUtil.showLogResult("emitter " + i); 289 emitter.onNext(i); 290 } 291 DemonstrateUtil.showLogResult("emitter complete"); 292 emitter.onComplete(); 293 } 294 }, BackpressureStrategy.ERROR) //增加了一個參數,設置處理策略. 295 .subscribeOn(Schedulers.io()) 296 .observeOn(AndroidSchedulers.mainThread()) 297 .subscribe(new Subscriber<Integer>() { 298 @Override 299 public void onSubscribe(Subscription s) { 300 DemonstrateUtil.showLogResult("onSubscribe"); 301 //用來向生產者申請可以消費的事件數量,這樣我們便可以根據本身的消費能力進行消費事件. 302 s.request(Long.MAX_VALUE); 303 } 304 305 @Override 306 public void onNext(Integer integer) { 307 DemonstrateUtil.showLogResult("onNext: " + integer); 308 } 309 310 @Override 311 public void onError(Throwable t) { 312 DemonstrateUtil.showLogResult("onError: " + t.getMessage()); 313 DemonstrateUtil.showLogResult("onError: " + t.toString()); 314 t.printStackTrace(); 315 } 316 317 @Override 318 public void onComplete() { 319 DemonstrateUtil.showLogResult("onComplete: "); 320 } 321 }); 322 } 323 324 private void show0() { 325 Observable.create(new ObservableOnSubscribe<Integer>() { 326 @Override 327 public void subscribe(ObservableEmitter<Integer> e) throws Exception { 328 while (true) { 329 for (int i = 0; i < 129; i++) { 330 e.onNext(1); 331 } 332 } 333 } 334 }).subscribeOn(Schedulers.io()) 335 .observeOn(AndroidSchedulers.mainThread()) 336 .subscribe(new Consumer<Integer>() { 337 @Override 338 public void accept(Integer integer) throws Exception { 339 Thread.sleep(5000); 340 DemonstrateUtil.showLogResult("接受到" + integer); 341 } 342 }); 343 } 344 345 private void initView() { 346 btnBackpressure = (Button) findViewById(R.id.btn_backpressure); 347 btnBackpressure.setOnClickListener(RxJavaDemo3Activity.this); 348 } 349 }