用RxJava處理嵌套請求
互聯網應用開發中由於請求網絡數據頻繁,往往后面一個請求的參數是前面一個請求的結果,於是經常需要在前面一個請求的響應中去發送第二個請求,從而造成“請求嵌套”的問題。如果層次比較多,代碼可讀性和效率都是問題。本文首先從感性上介紹下RxJava,然后講解如何通過RxJava中的flatMap操作符來處理“嵌套請求”的問題
內容提要
- RxJava簡單介紹
- 嵌套請求舉例
- 運用flatMap
- map和flatMap
- RxJava與Retrofit配合解決嵌套請求
RxJava簡單介紹
這里並不打算詳細介紹RxJava的用法和原理,這方面的文章已經很多了。這里僅僅闡述本人對於RxJava的感性上的理解。先上一個圖:

我們都知道RxJava是基於觀察者模式的,但是和傳統的觀察者模式又有很大的區別。傳統的觀察者模式更注重訂閱和發布這個動作,而RxJava的重點在於數據的“流動”。
如果我們把RxJava中的Observable看做一個盒子,那么Observable就是把數據或者事件給裝進了這個易於拿取的盒子里面,讓訂閱者(或者下一級別的盒子)可以拿到而處理。這樣一來,原來靜態的數據/事件就被流動起來了。
我們知道人類會在河流中建設大壩,其實我們可以把RxJava中的filter/map/merge等Oberservable操作符看做成數據流中的大壩,經過這個操作符的操作后,大壩數據流被過濾被合並被處理,從而靈活的對數據的流動進行管制,讓最終的使用者靈活的拿到。
以上就是我對RxJava的理解,深入的用法和原理大家請自行看網上的文章。
嵌套請求舉例
這里開始進入正題,開始舉一個嵌套請求的例子。
比如我們下列接口:
- api/students/getAll (傳入班級的id獲得班級的學生數組,返回值是list
) - api/courses/getAll (傳入Student的id獲得這個學生所上的課程,返回值是List
)
我們最終的目的是要打印班上所有同學分別所上的課程(大學,同班級每個學生選上的課不一樣),按照傳統Volley的做法,代碼大概是這樣子(Volley已經被封裝過)
private void getAllStudents(String id) {
BaseRequest baseRequest = new BaseRequest();
baseRequest.setClassId(id);
String url = AppConfig.SERVER_URL + "api/students/getAll";
final GsonRequest request = new GsonRequest<>(url, baseRequest, Response.class, new Response.Listener<Response>() {
@Override
public void onResponse(Response response) {
if (response.getStatus() > 0) {
List<Student> studentList = response.getData();
for (Student student : studentList) {
}
} else {
//error
}
}
}, new Response.ErrorListener() {
@Override
public void onErrorResponse(VolleyError error) {
//error
}
});
MyVolley.startRequest(request);
}
private void getAllCourses(String id) {
BaseRequest baseRequest = new BaseRequest();
baseRequest.setStudentId(id);
String url = AppConfig.SERVER_URL + "api/courses/getAll";
final GsonRequest request = new GsonRequest<>(url, baseRequest, Response.class, new Response.Listener<Response>() {
@Override
public void onResponse(Response response) {
if (response.getStatus() > 0) {
List<Course> courseList = response.getData();
for (Course course : courseList) {
//use
}
} else {
//error
}
}
}, new Response.ErrorListener() {
@Override
public void onErrorResponse(VolleyError error) {
//error
}
});
MyVolley.startRequest(request);
}
顯然第一個請求的響應中獲得的數據是一個List,正對每一個List中的item要再次發送第二個請求,在第二個請求中獲得最終的結果。這就是一個嵌套請求。這會有兩個問題:
- 目前來看並不復雜,如果嵌套層次多了,會造成代碼越來越混亂
- 寫出來的重復代碼太多
運用flatMap
現在我們可以放出RxJava大法了,flatMap是一個Observable的操作符,接受一個Func1閉包,這個閉包的第一個函數是待操作的上一個數據流中的數據類型,第二個是這個flatMap操作完成后返回的數據類型的被封裝的Observable。說白了就是講一個多級數列“拍扁”成了一個一級數列。
按照上面的列子,flatMap將接受student后然后獲取course的這個二維過程給線性化了,變成了一個可觀測的連續的數據流。
於是代碼是:
ConnectionBase.getApiService2()
.getStudents(101)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return ConnectionBase.getApiService2().getAllCourse(student.getId());
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Course>() {
@Override
public void call(Course course) {
//use the Course
}
});
是不是代碼簡潔的讓你看不懂了?別急,這里面的getStutent和getAllCourse是ConnectionBase.getApiService2()的兩個方法,他集成了Retrofit2用來將請求的網絡數據轉化成Observable,最后一節將介紹,這里先不關注。
我們所要關注的是以上代碼的流程。
首先getStudent傳入了班級id(101)返回了Observable
flatMap的作用就是對傳入的對象進行處理,返回下一級所要的對象的Observable包裝。
FuncX和ActionX的區別。FuncX包裝的是有返回值的方法,用於Observable的變換、組合等等;ActionX用於包裝無返回值的方法,用於subscribe方法的閉包參數。Func1有兩個入參,前者是原始的參數類型,后者是返回值類型;而Action1只有一個入參,就是傳入的被消費的數據類型。
subscribeOn(Schedulers.io()).observeOn(AndroidScheduler.mainThread())是最常用的方式,后台讀取數據,主線程更新界面。subScribeOn指在哪個線程發射數據,observeOn是指在哪里消費數據。由於最終的Course要刷新界面,必須要在主線程上調用更新view的方法,所以observeOn(AndroidScheduler.mainThread())是至關重要的。
map和flatMap
運用flatMap的地方也是可以用map的,但是是有區別的。先看下map操作符的用法:
ConnectionBase.getApiService2()
.getStudents(101)
.map(new Func1<Student>, Course>() {
@Override
public Course call(Student student) {
return conventStudentToCourse();// has problem
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Course>() {
@Override
public void call(Course course) {
//use the Course
}
});
可以看到map也是接受一個Func1閉包,但是這個閉包的第二個參數即返回值參數類型並不是一個被包裝的Observable,而是實際的原始類型,由於call的返回值是Course,所以conventStudentToCourse這里就不能用Retrofit2的方式返回一個Observable了。
所以這里是有一個問題的,對於這種嵌套的網絡請求,由於接到上端數據流到處理后將結果數據放入下端數據流是一個異步的過程,而conventStudentToCourse這種直接將Student轉化為Course是沒法做到異步的,因為沒有回調方法。那么這種情況,最好還是用flatMap並通過retrofit的方式來獲取Observable。要知道,Rxjava的一個精髓就是“異步”。
那么到底map和flatMap有什么區別,或者說什么時候使用map什么時候使用flatMap呢?
flatMap() 和 map() 有一個相同點:它也是把傳入的參數轉化之后返回另一個對象。但需要注意,和 map() 不同的是, flatMap() 中返回的是個 Observable 對象,並且這個 Observable 對象並不是被直接發送到了 Subscriber 的回調方法中。
首先,如果你需要將一個類型的對象經過處理(非異步)直接轉化成下一個類型,推薦用map,否則的話就用flatMap。
其次,如果你需要在處理中加入容錯的機制(特別是你自己封裝基於RxJava的網絡請求框架),推薦用flatMap。
比如將一個File[] jsonFile中每個File轉換成String,用map的話代碼為:
Observable.from(jsonFile).map(new Func1<File, String>() {
@Override public String call(File file) {
try {
return new Gson().toJson(new FileReader(file), Object.class);
} catch (FileNotFoundException e) {
// So Exception. What to do ?
}
return null; // Not good :(
}
});
可以看到這里在出現錯誤的時候直接拋出異常,這樣的處理其實並不好,特別如果你自己封裝框架,這個異常不大好去抓取。
如果用flatMap,由於flatMap的閉包返回值是一個Observable,所以我們可以在這個閉包的call中通過Observable.create的方式來創建Observable,而要知道create方法是可以控制數據流下端的Subscriber的,即可以調用onNext/onCompete/onError方法。如果出現異常,我們直接調用subscribe.onError即可,封裝框架也很好感知。代碼大致如下:
Observable.from(jsonFile).flatMap(new Func1<File, Observable<String>>() {
@Override public Observable<String> call(final File file) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override public void call(Subscriber<? super String> subscriber) {
try {
String json = new Gson().toJson(new FileReader(file), Object.class);
subscriber.onNext(json);
subscriber.onCompleted();
} catch (FileNotFoundException e) {
subscriber.onError(e);
}
}
});
}
});
RxJava與Retrofit配合解決嵌套請求
這里該討論Retrofit了。可以說Retrofit就是為了RxJava而生的。如果你的項目之前在網絡請求框架用的是Volley或者自己封裝Http請求和TCP/IP,而現在你看到了Retrofit這個框架后想使用起來,我可以負責任的跟你說,如果你的項目中沒有使用RxJava的話,使用Retrofit和Volley是沒有區別的!要用Retrofit的話,就最好或者說強烈建議也使用RxJava進行編程。
Retrofit有callback和Observable兩種模式,前者就像傳統的Volley一樣,有successs和fail的回調方法,我們在success回調方法中處理結果;而Observable模式是將請求回來的數據由Retrofit框架自動的幫你加了一個盒子,即自動幫你裝配成了含有這個數據的Observable,供你使用RxJava的操作符隨意靈活的進行變換。
callback模式的Retrofit是這樣建立的:
retrofit = new Retrofit.Builder()
.baseUrl(SERVER_URL)
.addConverterFactory(GsonConverterFactory.create(gson))
.build();
Observable模式是這樣子建立的:
retrofit2 = new Retrofit.Builder()
.baseUrl(SERVER_URL)
.addConverterFactory(GsonConverterFactory.create(gson))
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
即addCallAdapterFactory這個方法在起作用,在RxJavaCallAdapterFactory的源碼注釋中可以看到這么一句話:
Response wrapped body (e.g., {@code Observable<Response
>}) calls {@code onNext} with a {@link Response} object for all HTTP responses and calls {@code onError} with {@link IOException} for network errors
即它將返回值body為包裹上了一層“Observable”
