RxJava算是最新最常用的,也是程序員們最喜歡的框架之一了。
RxJava的核心由Observable(被觀察者,事件源)和Subscriber(觀察者)構成,Observable負責發出一系列事件,Subscriber處理這些事件。
一個Observble可以發出零個或多個事件,直到結束或出錯。每發出一個事件,就會調用與之關聯的所有觀察者Subscriber的onNext()方法;如果中途出錯,則會回調這個觀察者的onError()方法;事件發布給所有觀察者之后,會回調最后一個觀察者的onCompleted()方法。
RxJava很像設計模式中的觀察者模式,但有一點不同,就是當一個被觀察者沒有任何與之關聯的觀察者時,這個被觀察者不會發出任何事件。
在Android中使用RxJava,需要先導入RxJava和RxAndroid的依賴:
compile 'io.reactivex:rxjava:1.2.2'
compile 'io.reactivex:rxandroid:1.2.1'
DEMO 1:
Observable<String> observabele = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello RxJava"); subscriber.onCompleted(); } }); Observer<String> observer = new Observer<String>() { @Override public void onCompleted() { System.out.println("-------------------->>>>Completed"); } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println("------------onNext---------->>>>>>>" + s); } }; observabele.subscribe(observer);
通過Observable的create()方法創建一個自定義的Observable被觀察者,在參數傳入的OnSubscribe內部類中通過調用子方法的Subscriber對象參數的onNext()、onError()、onCompleted()三個方法,操作與這個被觀察者綁定的所有觀察者。
通過new Observer()方法來創建一個觀察者,需要實現onNext()、onError()、onCompleted()三個方法。
通過Observeable對象的subscribe()方法綁定一個觀察者,此時這個觀察者就可以接收到被觀察者發送的消息了。
DEMO2:
Observable<String> observable = Observable.just("Hello RxJava", "My", "Name");
Action1<String> next = new Action1<String>() {
@Override
public void call(String s) {
// onNext()中執行的代碼
}
};
Action1<Throwable> error = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
// onError()中執行的代碼
}
};
Action0 completed = new Action0() {
@Override
public void call() {
// onCompleted()中執行的代碼
}
};
observable.subscribe(next, error, completed);
Subscriber中的三個子方法可以拆分成兩個Action1和一個Action0。onNext()和onError()兩個方法對應的Action1,onCompleted()方法對應的是Action0。
Observable對象的subscribe()方法可以傳入三個Action對象,即表示被觀察者已經綁定了一個觀察者,這個觀察者是由這三個Action組成的。subscribe()方法有多個重載,可以只有一個onNext()方法的Action1,可以有一個onNext()方法的Action1和一個onError()方法的Action1,也可以像上面代碼一樣有三個Action。
Observable對象的just()方法可以有任意個參數,表示將這些對象逐個通過onNext()方法發送出去。
DEMO3:
String[] array = {"Hello RxJava", "My", "Name"};
Observable<String> observable = Observable.from(array);
Action1<String> next = new Action1<String>() {
@Override
public void call(String s) {
// onNext()中執行的代碼
System.out.println("----------------------->>>>>>>>>>" + s);
}
};
Action1<Throwable> error = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
// onError()中執行的代碼
}
};
Action0 completed = new Action0() {
@Override
public void call() {
// onCompleted()中執行的代碼
}
};
observable.subscribe(next, error, completed);
Observable.from()方法和Observable.just()方法的效果是一樣的,都是將一組數據逐個發送出去,區別是from()方法是將一個數組中的數據逐個發送,而just()方法是將各個參數中的數據進行逐個發送。
DEMO4:
Observable<String> observable = Observable .just("Hello RxJava", "aaaa", "bbbbb", "aaaaaacccc") .map(new Func1<String, String>() { @Override public String call(String s) { return "string=========" + s; } }); Action1<String> next = new Action1<String>() { @Override public void call(String s) { // onNext()中執行的代碼 System.out.println("----------------------->>>>>>>>>>" + s); } }; Action1<Throwable> error = new Action1<Throwable>() { @Override public void call(Throwable throwable) { // onError()中執行的代碼 } }; Action0 completed = new Action0() { @Override public void call() { // onCompleted()中執行的代碼 } }; observable.subscribe(next, error, completed);
map方法是將just中的數據進行進一步的處理,例如,上面的代碼中就是在每個字符串前面加了另一端字符串。
DEMO5:
Observable.just("https://api.github.com/users/basil2style")
.map(new Func1<String, String>() {
@Override
public String call(String s) {
StringBuffer result = null;
try {
URL url = new URL(s);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("GET");
connection.setDoInput(true);
connection.connect();
int responseCode = connection.getResponseCode();
if (responseCode == 200) {
result = new StringBuffer();
BufferedInputStream bis = new BufferedInputStream(connection.getInputStream());
byte[] b = new byte[1024];
int len = -1;
while ((len = bis.read(b)) != -1) {
result.append(new String(b, 0, len));
}
bis.close();
}
} catch (MalformedURLException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("------------------->>>>>" + result.toString());
return result.toString();
}
})
.map(new Func1<String, InfoData>() {
@Override
public InfoData call(String s) {
InfoData infoData = new InfoData();
try {
JSONObject object = new JSONObject(s);
infoData.setId(object.getInt("id"));
infoData.setUrl(object.getString("url"));
infoData.setType(object.getString("type"));
infoData.setName(object.getString("name"));
} catch (JSONException e) {
e.printStackTrace();
}
return infoData;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<InfoData>() {
@Override
public void onCompleted() {
System.out.println("---------------------->>>>>>Completed");
}
@Override
public void onError(Throwable e) {
Toast.makeText(MainActivity.this, "獲取網絡數據失敗", Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(InfoData infoData) {
Toast.makeText(MainActivity.this, infoData.getName(), Toast.LENGTH_SHORT).show();
}
});
這是用RxJava結合原生的JAVA API完成網絡訪問的代碼,通過map()方法在不同的參數之間進行轉換,最終得到InfoData對象並輸出數據。
現在,開發人員更喜歡將RxJava和Retrofit結合使用,原因是RxJava可以設置一段代碼執行的線程,這樣就可以輕松的、解耦的替換Handler和AsyncTask進行異步數據的訪問。
有關RxJava和Retrofit結合使用的案例我會寫在我的下一個帖子中,敬請期待~~
