響應式編程筆記三:一個簡單的HTTP服務器


# 響應式編程筆記三:一個簡單的HTTP服務器

本文我們將繼續前面的學習,但將更多的注意力放在用例和編寫實際能用的代碼上面,而非基本的APIs學習。

我們會看到Reactive是一個有用的抽象 - 對於並發編程來說 - 但它還有一些非常低級別的特性,應該引起我們的注意。

如果我們開始使用這些功能,挖掘其全部潛能,那我們可以控制我們應用中的layers - 那些之前不可見的、被容器|平台|框架隱藏起來的layers!

 

## Bridging from Blocking to Reactive with Spring MVC | 在Spring MVC中,將阻塞式橋接到響應式

響應式會強迫你用不同的眼光去看待世界。

不再是請求和獲取(也許沒有獲取到),所有的東西都作為一個`sequence(Publisher)`被投遞過來,當然你必須先`subscribe`。

不再是一直等待一個響應,你必須注冊一個`callback`。

當你習慣了的時候,就變得很輕松,但是,除非整個世界都變成了響應式的,不然你還是需要與舊式的阻塞式API打交道。

 

假定我們有一個阻塞式方法,會返回一個 `HttpStatus`:

private RestTemplate restTemplate = new RestTemplate();

private HttpStatus block(int value) {
    return this.restTemplate.getForEntity("http://example.com/{value}", String.class, value)
            .getStatusCode();
}

 

我們希望以不同的參數來調用它,並聚合所有的結果。這是一個典型的 **分散-聚集 (scatter-gather)** 用例,假定每次請求你都獲得了一個分頁結果,但最后需要得到所有結果的 top N。由於阻塞式操作與 scatter-gather模式無關,我們將其放到一個 `block()`方法中,並在稍后實現它。現在,先來看一個**壞例子**

Flux.range(1, 10) //(1)
    .log()
    .map(this::block) //(2)
    .collect(Result::new, Result::add) //(3)
    .doOnSuccess(Result::stop) //(4)

1. 發起請求的次數

2. 阻塞式代碼

3. 收集結果,並聚合到一個獨立的對象中

4. 最后結束(結果是 `Mono<Result>`)

不要學習這個壞例子,因為,雖然APIs用的沒問題,但仍然會阻塞住線程!

這個例子和直接循環發起請求沒什么不同。

 

更好的實現 則應該將對`block()`的調用放到一個后台線程中。例如:

private Mono<HttpStatus> fetch(int value) {
    return Mono.fromCallable(()->block(value))  //(1)
        .subscribeOn(this.scheduler);           //(2)
}

1. 阻塞式代碼現在位於Callable中,會延遲執行。

2. 在后台線程訂閱。

 

`scheduler` 需要另外聲明:

Scheduler scheduler = Schedulers.parallel();

然后,我們就可以使用`flatMap()`,而非`map()`:

Flux.range(1, 10)
    .log()
    .flatMap(                           //(1)
        this::fetch, 4)                 //(2)
    .collect(Result::new, Result::add)
    .doOnSuccess(Result::stop);

1. 使用新的publisher來並行處理

2. flatMap 的並發量

 

## Embedding in a Non-Reactive Server 內置一個非響應式服務器

如果你想在一個非響應式服務器中運行上面的代碼,可以使用Spring MVC:

@RequestMapping("/parallel")
public CompletableFuture<Result> parallel() {
    return Flux.range(1, 10)
        .log()
        .flatMap(this::fetch, 4)
        .doOnSuccess(Result::stop)
        .toFuture();
}

 

如果你讀過`@RequestMapping`的Javadocs,那你會發現其方法可以返回一個`CompletableFuture`,這樣,應用會使用這個返回來生成一個真正的返回值 - 在另一個線程中。本例中的這個"另一個線程"是由"scheduler"提供的, scheduler是一個線程池,所以真正的處理是多線程的,上面的代碼,會4線程並發!

 

## No Such Thing as a Free Lunch | 沒有免費的午餐

雖然上面 用后台線程運行 scatter-gather 代碼 是一個有用的模式,但它仍不夠完美 - 雖然沒有阻塞調用者,但阻塞了別的,就是說,它只是轉移了問題。

我們有一個HTTP服務器,可能會帶有NIO handlers,將工作傳回到一個線程池,每個線程處理一個HTTP request - 所有的這些都發生在Servlet容器內部。

request是被異步處理的,因此Tomcat的worker線程沒有被阻塞,scheduler中創建的線程池會用4線程來處理。

我們在處理10個后端請求( 對 `block()`的調用),因此,使用scheduler會有一個最大的、理論的受益,就是降低4倍延遲。

換句話說,如果在一個線程中處理需要 1000ms的話,那現在可能只需要 250ms了。

注意,這里只是可能:只有在沒有競爭的情況下才會那么快。

提示:tomcat默認分配了100個線程來處理HTTP請求。如果所有的請求都經過我們的scheduler線程池的話,那完全超出了線程池的容量。這是一個完全錯誤的搭配:scheduler線程池可能是一個瓶頸!這意味着性能調優會非常艱難,你可能調整了所有配置,然后達到一個很脆弱的平衡 - 隨時可能被破壞。

Tomcat allocates 100 threads for processing HTTP requests by default. That is excessive if we know all the processing is going to be on our scheduler thread pool. There is an impedance mismatch: the scheduler thread pool can be a bottleneck because it has fewer threads than the upstream Tomcat thread pool. This highlights the fact that performance tuning can be very hard, and, while you might have control of all the configuration, it’s a delicate balance.

我們可以使用彈性的線程池,而非固定的。 這對Reactor來說非常簡單,只要使用 `Schedulers.elastic()`即可 - 可以多次調用,但只會有一個實例!

 

## Reactive all the Way Down

從阻塞式到響應式的橋接是一個有用的模式,且很容易在Spring MVC中實現。

下一步就是完全地干掉應用線程中的阻塞式,這得使用新的APIs和新的工具。

極限就是完全響應式,從服務器到客戶端。這是Spring Reactive的目標!

Spring Reactive是一個新的框架,與Spring MVC是完全不同的方向,但會實現同樣的需求,並使用相似的編程模型。

 

>注意,Spring Reactive開始是一個單獨的項目,但已經被打包進Spring Framework了,版本5 。

 

還是拿前面的 scatter-gather例子來說,如果想全響應式,那第一步就是使用`spring-boot-starter-web-reactive`來代替`spring-boot-starter-web`。

org.springframework.boot.experimentalspring-boot-starter-web-reactive ...org.springframework.boot.experimentalspring-boot-dependencies-web-reactive0.1.0.M1pomimport

>注意,上面的版本可能已過時,請自行查找新版本。

<dependencies>
  <dependency>
    <groupId>org.springframework.boot.experimental</groupId>
    <artifactId>spring-boot-starter-web-reactive</artifactId>
  </dependency>
  ...
</dependencies>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot.experimental</groupId>
            <artifactId>spring-boot-dependencies-web-reactive</artifactId>
            <version>0.1.0.M1</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

 

然后,在controller中,只需要這樣:

@RequestMapping("/parallel")
public Mono<Result> parallel(){
    return Flux.range(1, 10)
        .log()
        .flatMap(this::fetch, 4)
        .collect(Result::new, Result::add)
        .doOnSuccess(Result::stop);
}

在Spring Boot中運行,會自動在Tomcat、Jetty或Netty中運行。默認是Tomcat,如果有不同的選擇,需要先exclude tomcat!

 

我們仍然使用阻塞式后端調用 `block()`,因此,我們仍然需要`subscribe()`一個線程池來避免阻塞住調用者。

我們可以使用非阻塞式客戶端WebClient,而非RestTemplate!

private WebClient client = new WebClient(new ReactorHttpClientRequestFactory());

private Mono<HttpStatus> fetch(int value){
    return this.client.perform(HttpRequestBuilders.get("http://example.com"))
               .extract(WebResponseExtractors.response(String.class)) 
               .map(response -> response.getStatusCode());
}

注意 `WebClient.perform()` (或者更確切的說是 `WebResponseExtractor` ) 會有一個響應式返回類型,我們已經將其轉換為 `Mono<HttpStatus>`,但我們沒有`subscribe`它。我們希望框架負責所有的訂閱,這樣我們就是全響應式了。

 

>警告:Spring Reactive中 那些返回`Publisher`的方法,都是非阻塞的!而普通方法返回`Publisher`(或`Flux`|`Mono`|`Observable`) 則可能是非阻塞的!如果你在編寫這樣的方法,最好認真分析、測試一下它們是否阻塞。

 

>注意:上面我們使用了一個非阻塞的客戶端來簡化了HTTP棧,同樣也可以在常規Spring MVC中工作。`fetch()`的結果可被轉換成一個 `CompletableFuture`,從常規的`@RequestMapping`方法中傳出。

 

## Inversion of Control | 控制反轉

現在,我們可以移除並發數量了:

@RequestMapping("/netty")
public Mono<Result> netty() {
    return Flux.range(1, 10)        //(1)
        .log()
        .flatMap(this::fetch)       //(2)
        .collect(Result::new, Result::add)
        .doOnSuccess(Result::stop);
}

1. 發起10個調用

2. 使用新的publisher來並發處理

 

現在我們不需要額外的 callable和 subscriber線程了,代碼簡潔了不要太多。

響應式`WebClient` 返回了一個 `Mono`,這會驅動我們在變形鏈中直接選擇`flatMap()`,然后就可以得到我們需要的code了!

優雅,可讀性更高,更易於維護!

還有,因為沒有了線程池和並發量,也就沒有了魔數 4

當然,還是有一個限制,但不會再影響我們在應用層(application tier)的選擇,也不會局限於服務器容器了。

這不是魔法,因為仍然是物理規律綜合的結果,所以,后端調用仍然會執行 100ms,但競爭很少 - 我們可能看到10個請求完全同時執行。

當服務器的負載增加了延遲時,吞吐率會自然地降級,而降級方式則由緩沖競爭和內核網絡治理,而非應用線程管理。

這是一種控制反轉,由底層控制,而非應用代碼控制。

 

請記住,同樣的應用代碼運行在Tomcat、Jetty或Netty中。

目前,Tomcat和Jetty的支持,是基於Servlet 3.1 異步處理之上的,所以受限於 一個請求一個線程。

當同樣的代碼運行在Netty服務器平台上時,就沒有這個限制了,服務器可以分派請求到web客戶端。

只要客戶端沒有阻塞,所有人都會高興。

netty服務器和客戶端的性能指標是類似的,但Netty服務器不受限於 一個線程處理一個請求,所以,它不使用大的線程池,我們可能期望看到一些不同的資源利用。

我們會在本系列的其他文章中討論它。

 

>提示:在[樣例代碼](https://github.com/dsyer/reactive-notes)中,reactive樣例支持的maven profiles有:tomcat、tomcatNext(for Tomcat 8.5)、jetty、netty。

>注意:很多應用中的阻塞式代碼不是HTTP后端調用,而是數據庫交互。目前很少有數據庫支持非阻塞式客戶端(MongoDB和Couchbase是禮物,但也不如HTTP客戶端成熟)。線程池和 blocking-to-reactive pattern會存在很長時間,一直到所有數據庫能夠跟上。

 

## Still No Free Lunch | 還是沒有免費的午餐

雖然到目前為止我們做的看起來都很好,但很快就會有一些錯誤發生,例如 表現惡劣的網絡連接、后端服務忍受嚴重的延遲。

 

首先,最明顯的就是我們寫的代碼都是聲明式的,所以很難調試。當錯誤發生時,診斷可能很模糊。使用原生的、低級別的APIs,例如不帶Spring的Reactor,或者沒有Reactor的Netty級別,可能會讓情況變得更糟,因為我們必須構建大量錯誤處理,每次與網絡交互,都要重復一些呆板的代碼。起碼,混合使用Spring和Reactor,我們可以看到棧追蹤記錄、未捕獲的異常。它們可能不是那么好理解,因為它們是發生在我們不能控制的線程中;有些時候,它們還給出一些非常低級的信息。

 

另一個痛苦之源則是,如果我們犯了錯誤,並阻塞在我們的響應式callbacks里,我們會停住(hold up)該線程里的**所有requests**。

在基於Servlet的容器中,每個request都被隔離到一個線程中,阻塞不會停住其他的requests,因為它們是在不同的線程上處理。

阻塞所有requests是麻煩的來源,但它僅會在延遲增加(見下面原文) 時出現。在響應式世界里,阻塞一個request會導致所有requests加大延遲,而阻塞所有requests則會讓服務器跪下來唱征服,因為沒有額外的緩沖層和線程去處理。

> Blocking all requests is still a recipe for trouble, but it only shows up as increased latency with roughly a constant factor per request.

 

## Conclusion | 結論

能夠控制異步處理中的每個移動部分,是一件很爽事:每一層都有一個線程池尺寸和一個隊列。

我們可以讓某層使用彈性的線程池,根據它們的工作去調整。

但同時,這也是一種負擔,我們開始尋找更簡單的或者更簡潔的。

大量分析的結論是,移除額外的線程,配合物理硬件的限制來使用,通常是一個更好的選擇。

This is an example of "mechanical sympathy", as is famously exploited by LMAX to great effect in the [Disruptor Pattern](https://lmax-exchange.github.io/disruptor/).

 

我們已經開始看到響應式的強大,但是請記住,強大伴隨着責任。

它是激進的,它也是基礎的。

它是"放下一切,從頭開始"的領域。

你可能希望看到響應式不是所有問題的解決方案。事實上它的確不是,它只是特定一類問題的解決方案。

你的收獲可能遠超學習、修改、維護的代價。

 

## 原文

https://spring.io/blog/2016/07/20/notes-on-reactive-programming-part-iii-a-simple-http-server-application


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM