接上一篇:實戰SpringCloud響應式微服務系列教程(第一章)
1.1.2背壓
背壓是響應式編程的核心概念,這一節也是我們了解響應式編程的重點。
1.背壓的機制
在生產者/消費者模型中,我們意識到消費者在消費由生產者生產的數據的同時,也需要有一種能夠向上游反饋流量需求的機制,這種能夠向上游反饋請求的機制稱之為背壓。
如下圖所示
現在我們從一個具體的角度來說背壓的概念。在1.1.1中我們了解了同步消費和異步消費,其中異步消費者會向生產者訂閱消費數據,當有新的數據可用時,消費者會通過之前訂閱時提供的回調函數激活調用過程。
如果生產者發出的數據比消費者能夠處理的數據量大而且快時,消費者可能會被迫一直再獲取或處理數據,消耗越來越多的資源,從而埋下潛在的風險。為了防止這一點,需要有一種機制,消費者可以通知生產者降低生產數據的速度。生產者可以通過多種方式來實現這一要求,這時候我們就會用到背壓機制。
采用背壓機制后,消費者告訴生產者降低生產數據速度並保存元素,知道消費者能夠處理更多的元素。使用背壓可以有效地避免過快的生產者壓制消費者。如果生產者要一直生產和保存元素,使用背壓也可能會要求其擁有無限制的緩沖區。生產者也可以實現有界緩沖區來保存有限數量的元素,如果緩沖區已滿可以選擇放棄。
2.背壓的實現方式
背壓的實現方式有兩種,一種是阻塞式背壓另一種是非阻塞式背壓。
1、阻塞式背壓
阻塞式背壓是比較容易實現的,例如:當生產者和消費者在同一個線程中運行時,其中任何一方都將阻塞其他線程的執行。這就意味着,當消費者被執行時,生產者就不能發出任何新的數據元素。因而也需要一中自然地方式來平衡生產數據和消費數據的過程。
在有些情況下,阻塞式背壓會出現不良的問題,比如:當生產者有多個消費者時,不是所有消費者都能以同樣的速度消費消息。當消費者和生產者在不同環境中運行時,這就達不到降壓的目的了。
2、非阻塞式背壓
背壓機制應該以非阻塞式的方式工作,實現非阻塞式背壓的方法是放棄推策略,采用拉策略。生產者發送消息給消費者等操作都可以保存在拉策略當中,消費者會要求生產者生成多少消息量,而且最多只能發送這些量,然后等到更多消息的請求。(關於推策略和拉策略請回顧1.1.1中的流的概念)
1.1.3 響應式流
響應式編程的另外一個核心概念就是響應式流。響應式流是一種規范,這種規范表現在技術上就是一批被預先定義好的接口。
1.響應式流規范
響應式流規范是提供非阻塞背壓的異步流處理標准倡議的。響應式流的目標是定義將數據流從生產者傳遞到消費者而不需要生產者阻塞。在響應式流模型中,消費者向生產者發送多個元素的異步請求,然后生產者向消費者發送合適數量的數據。
各個響應式開發庫都要遵循響應式流規范,采用規范的好處是顯而易見的。由於各個響應式都遵循一套規范,因而互相兼容,不同的開發庫之間也是可以進行交流的。甚至可以在同一個項目中使用多個開發庫。而Spring WebFlux響應式web是采用Reactor框架來實現的。(其他開發庫可百度了解這里我們只探討reactor)。
雖然響應式流規范用來約束響應式開發庫的實現方式的,但是作為使用者而言,能夠了解這一規范,對我們了解使用開發庫的方法和基本原理很有幫助,因為規范內容都是對響應式編程思想的精髓呈現。
2.響應式流接口
Java API響應式流有4個接口,即Publisher<T>
、Subscription
、Subscriber<T>
和Processor<T,R>
。
1、Publisher<T>
發布者(Publisher)是潛在的包含無限數量的有序元素的生產者,他根據收到的請求向當前訂閱者發送元素。接口定義如下:
public interface Publisher<T>{ public void subscribe (Subscriber<? super T> s); }
2、Subscriber<T>
訂閱者從發布這那里訂閱並且接收元素。發布者想訂閱者發送訂閱令牌。使用訂閱令牌,訂閱者向發布者請求多個元素。當元素准備就緒時,發布者就會向訂閱者發送合適數量的元素。然后訂閱者可以請求更多的元素,發布者也可能有多個來自訂閱者的待處理請求。接口定義如下:
public interface Subscriber<T>{ public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }
當執行發布者的subscribe()
方法時,發布者會回調訂閱者的onSubscribe()
方法。在這個方法中訂閱者通常會借助傳入的Subscription 對象向發布者請求n個數據。然后發布者通過不斷調用onNext()
方法想訂閱者發出最多n個數據。如果數據全部發送完畢,就會調用onComplete()
方法告知訂閱者所需要的n個數據已經發送完畢。如果有錯誤發生就會通過調用onError()
方法發出錯誤數據,這同樣也會終止數據流。
3、Subscription
訂閱(Subscription )表示訂閱者訂閱的一個發布者的令牌,當訂閱請求成功時,發布者將其傳遞給訂閱者,訂閱者使用訂閱令牌與發布者進行交互,比如:請求更多的元素或取消訂閱。接口定義如下:
public interface Subscription{ public void request(Long n); public void cancel(); }
當發布者調用subscribe()
方法注冊訂閱者時,會通過訂閱者的回調方法onSubscribe()
傳入Subscription對象,之后訂閱者就可以使用Subscription對象的request()
方法向發布者請求數據了。
Publisher<T>
、Subscription
、Subscriber<T>
三者的交互如下:
4、Processor<T,R>
處理器(Processor)充當訂閱者和發布者之間的處理媒介,Processor
接口繼承了Publisher
和Subscribe
接口,它用於轉換發布者/訂閱者管道中的元素。
Processor<T,R>
訂閱類型T的數據元素,接收並轉換為R的數據,然后發布該數據。處理器在發布者/訂閱者管道中充當轉換器的角色。接口定義如下:
public interface Processor<T,R> extends Subscriber<T>,Publisher<R>{ }
Processor
集Subscriber
和Publisher
於一身,三者之間的關系如下所示:
這四個接口是實現各個響應式開發庫之間互相兼容的橋梁,響應式流規范也僅僅聚焦於此,而對於轉換、合並、分組等操作並未做要求。也是一個非常抽象且精簡的接口規范。