回調“地獄”和反應模式


了解更多有關基於反應流的方法以及如何避免回調地獄的信息。

更好地理解基於反應流的方法的有用性的方法之一是它如何簡化非阻塞 IO 調用。

本篇文章將簡要介紹進行同步遠程調用所涉及的代碼類型。然后,我們將演示非阻塞 IO 中的分層如何高效使用資源(尤其是線程),引入了稱為回調地獄帶來的復雜性以及基於反應流方法如何簡化編程模型。

1. 目標服務

客戶端調用表示城市詳細信息的目標服務有兩個端口。當使用類型為——/cityids 的 URI 調用時,返回城市 id 列表,並且示例結果如下所示:

[
    1, 2, 3, 4, 5, 6, 7 ] 復制代碼

一個端口返回給定其 ID 的城市的詳細信息,例如,當使用 ID 為1——“/cities/1” 調用時:

{
    "country": "USA", "id": 1, "name": "Portland", "pop": 1600000 } 復制代碼

客戶端的責任是獲取城市 ID 的列表,然后對於每個城市,根據 ID 獲取城市的詳細信息並將其組合到城市列表中。

2. 同步調用

我正在使用 Spring Framework 的 RestTemplate 進行遠程調用。獲取 cityId 列表的 Kotlin 函數如下所示:

private fun getCityIds(): List<String> { val cityIdsEntity: ResponseEntity<List<String>> = restTemplate .exchange("http://localhost:$localServerPort/cityids", HttpMethod.GET, null, object : ParameterizedTypeReference<List<String>>() {}) return cityIdsEntity.body!! } 復制代碼

獲取城市詳情:

private fun getCityForId(id: String): City { return restTemplate.getForObject("http://localhost:$localServerPort/cities/$id", City::class.java)!! } 復制代碼

鑒於這兩個函數,它們很容易組合,以便於輕松返回城市列表 :

val cityIds: List<String> = getCityIds() val cities: List<City> = cityIds .stream() .map<City> { cityId -> getCityForId(cityId) } .collect(Collectors.toList()) cities.forEach { city -> LOGGER.info(city.toString()) } 復制代碼

代碼很容易理解;但是,涉及八個阻塞調用:

  1. 獲取 7 個城市 ID 的列表,然后獲取每個城市的詳細信息
  2. 獲取 7 個城市的詳細信息

每一個調用都將在不同的線程上。

3. 非阻塞 IO 回調

我將使用 AsyncHttpClient 庫來進行非阻塞 IO 調用。

進行遠程調用時,AyncHttpClient 返回 ListenableFuture 類型。

val responseListenableFuture: ListenableFuture<Response> = asyncHttpClient .prepareGet("http://localhost:$localServerPort/cityids") .execute() 復制代碼

可以將回調附加到 ListenableFuture 以在可用時對響應進行操作。

responseListenableFuture.addListener(Runnable {
    val response: Response = responseListenableFuture.get() val responseBody: String = response.responseBody val cityIds: List<Long> = objectMapper.readValue<List<Long>>(responseBody, object : TypeReference<List<Long>>() {}) .... } 復制代碼

鑒於 cityIds 的列表,我想獲得城市的詳細信息,因此從響應中,我需要進行更多的遠程調用並為每個調用附加回調以獲取城市的詳細信息:

val responseListenableFuture: ListenableFuture<Response> = asyncHttpClient .prepareGet("http://localhost:$localServerPort/cityids") .execute() responseListenableFuture.addListener(Runnable { val response: Response = responseListenableFuture.get() val responseBody: String = response.responseBody val cityIds: List<Long> = objectMapper.readValue<List<Long>>(responseBody, object : TypeReference<List<Long>>() {}) cityIds.stream().map { cityId -> val cityListenableFuture = asyncHttpClient .prepareGet("http://localhost:$localServerPort/cities/$cityId") .execute() cityListenableFuture.addListener(Runnable { val cityDescResp = cityListenableFuture.get() val cityDesc = cityDescResp.responseBody val city = objectMapper.readValue(cityDesc, City::class.java) LOGGER.info("Got city: $city") }, executor) }.collect(Collectors.toList()) }, executor) 復制代碼

這是一段粗糙的代碼;回調中又包含一組回調,很難推理和理解 - 因此它被稱為“回調地獄”。

4. 在 Java CompletableFuture 中使用非阻塞 IO

通過將 Java 的 CompletableFuture 作為返回類型而不是 ListenableFuture 返回,可以稍微改進此代碼。CompletableFuture 提供允許修改和返回類型的運算符。

例如,考慮獲取城市 ID 列表的功能:

private fun getCityIds(): CompletableFuture<List<Long>> { return asyncHttpClient .prepareGet("http://localhost:$localServerPort/cityids") .execute() .toCompletableFuture() .thenApply { response -> val s = response.responseBody val l: List<Long> = objectMapper.readValue(s, object : TypeReference<List<Long>>() {}) l } } 復制代碼

在這里,我使用 thenApply 運算符將 CompletableFuture<Response> 轉換為 CompletableFuture<List<Long>>

同樣的,獲取城市詳情:

private fun getCityDetail(cityId: Long): CompletableFuture<City> { return asyncHttpClient.prepareGet("http://localhost:$localServerPort/cities/$cityId") .execute() .toCompletableFuture() .thenApply { response -> val s = response.responseBody LOGGER.info("Got {}", s) val city = objectMaper.readValue(s, City::class.java) city } } 復制代碼

這是基於回調的方法的改進。但是,在這個特定情況下,CompletableFuture 缺乏有用的運算符,例如,所有城市細節都需要放在一起:

val cityIdsFuture: CompletableFuture<List<Long>> = getCityIds() val citiesCompletableFuture: CompletableFuture<List<City>> = cityIdsFuture .thenCompose { l -> val citiesCompletable: List<CompletableFuture<City>> = l.stream() .map { cityId -> getCityDetail(cityId) }.collect(toList()) val citiesCompletableFutureOfList: CompletableFuture<List<City>> = CompletableFuture.allOf(*citiesCompletable.toTypedArray()) .thenApply { _: Void? -> citiesCompletable .stream() .map { it.join() } .collect(toList()) } citiesCompletableFutureOfList } 復制代碼

使用了一個名為 CompletableFuture.allOf 的運算符,它返回一個“Void”類型,並且必須強制返回所需類型的 CompletableFuture<List<City>>

5. 使用 Reactor 項目

Project ReactorReactive Streams 規范的實現。它有兩種特殊類型可以返回 0/1 項的流和 0/n 項的流 - 前者是 Mono,后者是 Flux。

Project Reactor 提供了一組非常豐富的運算符,允許以各種方式轉換數據流。首先考慮返回城市 ID 列表的函數:

private fun getCityIds(): Flux<Long> { return webClient.get() .uri("/cityids") .exchange() .flatMapMany { response -> LOGGER.info("Received cities..") response.bodyToFlux<Long>() } } 復制代碼

我正在使用 Spring 優秀的 WebClient 庫進行遠程調用並獲得 Project Reactor Mono <ClientResponse> 類型的響應,可以使用 flatMapMany 運算符將其修改為 Flux<Long> 類型。

根據城市 ID,沿着同樣的路線獲取城市的詳情:

private fun getCityDetail(cityId: Long?): Mono<City> { return webClient.get() .uri("/cities/{id}", cityId!!) .exchange() .flatMap { response -> val city: Mono<City> = response.bodyToMono() LOGGER.info("Received city..") city } } 復制代碼

在這里,Project Reactor Mono<ClientResponse> 類型正在使用 flatMap 運算符轉換為 Mono<City> 類型。

以及從中獲取 cityIds,這是 City 的代碼:

val cityIdsFlux: Flux<Long> = getCityIds() val citiesFlux: Flux<City> = cityIdsFlux .flatMap { this.getCityDetail(it) } return citiesFlux 復制代碼

這非常具有表現力 - 對比基於回調的方法的混亂和基於 Reactive Streams 的方法的簡單性。

6. 結束語

在我看來,這是使用基於反應流的方法的最大原因之一,特別是 Project Reactor,用於涉及跨越異步邊界的場景,例如在此實例中進行遠程調用。它清理了回調和回調的混亂,提供了一種使用豐富的運算符進行修改/轉換類型的自然方法。

本文使用的所有示例的工作版本的存儲庫都可以在 GitHub 上找到。

原文:dzone.com/articles/ca…

作者:Biju Kunjummen

譯者:Emma

 


作者:鍋外的大佬
鏈接:https://juejin.im/post/5d1579aee51d45772a49ad77
來源:掘金
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM