文章很長,而且持續更新,建議收藏起來,慢慢讀!瘋狂創客圈總目錄 博客園版 為您奉上珍貴的學習資源 :
免費贈送 :《尼恩Java面試寶典》 持續更新+ 史上最全 + 面試必備 2000頁+ 面試必備 + 大廠必備 +漲薪必備
免費贈送 經典圖書:《Java高並發核心編程(卷1)加強版》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 經典圖書:《Java高並發核心編程(卷2)加強版》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 經典圖書:《Java高並發核心編程(卷3)加強版》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 經典圖書:《尼恩Java面試寶典 最新版》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 資源寶庫: Java 必備 百度網盤資源大合集 價值>10000元 加尼恩領取
前言
響應式編程用的是越來越多,尤其是在移動端 安卓的應用上邊。
在Java后台服務開發中, 響應式編程用的不是太廣泛,主要原因是, 響應式編程需要一個完整的生態, 包括數據庫、緩存、中間件,都需要配套的響應式組件。 但是這點,其實很多並沒有。
但是,隨着 SpringCloud Gateway 的火爆, 響應式編程又變成了 不可回避, 不得不去學習的技術。
如果要做 SpringCloud Gateway 的開發, 就必須掌握一些響應式編程的知識。 由於最近在做 spring cloud gateway相關的開發工作, 所以:
把響應式編程Flux 和 Mono 的知識梳理一下,形成了此文。
並且,此文會不斷完善。
姊妹篇:關於 SpringCloud Gateway 簡介
SpringCloud Gateway 是 Spring Cloud 的一個全新項目,該項目是基於 Spring 5.0,Spring Boot 2.0 和 Project Reactor 等技術開發的網關,它旨在為微服務架構提供一種簡單有效的統一的 API 路由管理方式。
SpringCloud Gateway 作為 Spring Cloud 生態系統中的網關,目標是替代 Zuul,在Spring Cloud 2.0以上版本中,沒有對新版本的Zuul 2.0以上最新高性能版本進行集成,仍然還是使用的Zuul 2.0之前的非Reactor模式的老版本。而為了提升網關的性能,SpringCloud Gateway是基於WebFlux框架實現的,而WebFlux框架底層則使用了高性能的Reactor模式通信框架Netty。
Spring Cloud Gateway 的目標,不僅提供統一的路由方式,並且基於 Filter 鏈的方式提供了網關基本的功能,例如:安全,監控/指標,和限流。
有關 Spring Cloud Gateway 響應式編程的實戰, 具體請參考本文姊妹篇:
特別說明:
Spring Cloud Gateway 底層使用了高性能的通信框架Netty。
Netty 是高性能中間件的通訊底座, rocketmq 、seata、nacos 、sentinel 、redission 、dubbo 等太多、太多的的大名鼎鼎的中間件,無一例外都是基於netty。
可以毫不誇張的說: netty 是進入大廠、走向高端 的必備技能。
要想深入了解springcloud gateway ,最好是掌握netty 編程。
有關 netty學習 具體請參見機工社出版 、尼恩的暢銷書: 《Java高並發核心編程卷 1》
響應式編程概述
背景知識
為了應對高並發服務器端開發場景,在2009 年,微軟提出了一個更優雅地實現異步編程的方式——Reactive Programming,我們稱之為響應式編程。隨后,Netflix 和LightBend 公司提供了RxJava 和Akka Stream 等技術,使得Java 平台也有了能夠實現響應式編程的框架。
在2017 年9 月28 日,Spring 5 正式發布。Spring 5 發布最大的意義在於,它將響應式編程技術的普及向前推進了一大步。而同時,作為在背后支持Spring 5 響應式編程的框架Spring Reactor,也進入了里程碑式的3.1.0 版本。
什么是響應式編程
響應式編程是一種面向數據流和變化傳播的編程范式。這意味着可以在編程語言中很方便地表達靜態或動態的數據流,而相關的計算模型會自動將變化的值通過數據流進行傳播。
響應式編程基於reactor(Reactor 是一個運行在 Java8 之上的響應式框架)的思想,當你做一個帶有一定延遲的才能夠返回的io操作時,不會阻塞,而是立刻返回一個流,並且訂閱這個流,當這個流上產生了返回數據,可以立刻得到通知並調用回調函數處理數據。
電子表格程序就是響應式編程的一個例子。單元格可以包含字面值或類似"=B1+C1"的公式,而包含公式的單元格的值會依據其他單元格的值的變化而變化。
響應式傳播核心特點之一:變化傳播:一個單元格變化之后,會像多米諾骨牌一樣,導致直接和間接引用它的其他單元格均發生相應變化。
基於Java8實現觀察者模式
Observable類:此類表示可觀察對象,或模型視圖范例中的“數據”。
它可以被子類實現以表示應用程序想要觀察的對象。
//想要觀察的對象 ObserverDemo
public class ObserverDemo extends Observable {
public static void main(String[] args) {
ObserverDemo observerDemo = new ObserverDemo();
//添加觀察者
observerDemo.addObserver((o,arg)->{
System.out.println("數據發生變化A");
});
observerDemo.addObserver((o,arg)->{
System.out.println("數據發生變化B");
});
observerDemo.setChanged();//將此Observable對象標記為已更改
observerDemo.notifyObservers();//如果該對象發生了變化,則通知其所有觀察者
}
}
啟動程序測試:
創建一個Observable
rxjava中,可以使用Observable.create() 該方法接收一個Obsubscribe對象
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
}
});
來個例子:
Observable<Integer> observable=Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for(int i=0;i<5;i++){
subscriber.onNext(i);
}
subscriber.onCompleted();
}
});
//Observable.subscribe(Observer),Observer訂閱了Observable
Subscription subscribe = observable.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "完成");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "異常");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "接收Obsverable中發射的值:" + integer);
}
});
輸出:
接收Obsverable中發射的值:0
接收Obsverable中發射的值:1
接收Obsverable中發射的值:2
接收Obsverable中發射的值:3
接收Obsverable中發射的值:4
從上面的例子可以看出,在Observer訂閱了Observable后,
Observer作為OnSubscribe中call方法的參數傳入,從而調用了Observer的相關方法
基於 Reactor 實現
Reactor 是一個運行在 Java8 之上滿足 Reactice 規范的響應式框架,它提供了一組響應式風格的 API。
Reactor 有兩個核心類: Flux<T>
和 Mono<T>
,這兩個類都實現 Publisher 接口。
- Flux 類似 RxJava 的 Observable,它可以觸發零到多個事件,並根據實際情況結束處理或觸發錯誤。
- Mono 最多只觸發一個事件,所以可以把 Mono 用於在異步任務完成時發出通知。
Flux 和 Mono 都是數據流的發布者,使用 Flux 和 Mono 都可以發出三種數據信號:元素值,錯誤信號,完成信號;錯誤信號和完成信號都代表終止信號,終止信號用於告訴訂閱者數據流結束了,錯誤信號終止數據流同時把錯誤信息傳遞給訂閱者。
三種信號的特點:
- 錯誤信號和完成信號都是終止信號,不能共存
- 如果沒有發送任何元素值,而是直接發送錯誤或者完成信號,表示是空數據流
- 如果沒有錯誤信號,也沒有完成信號,表示是無限數據流
引入依賴
<dependency>
<groupId>org.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>1.1.6.RELEASE</version>
</dependency>
just 和 subscribe方法
just():創建Flux序列,並聲明指定數據流
subscribe():訂閱Flux序列,只有進行訂閱后才回觸發數據流,不訂閱就什么都不會發生
public class TestReactor {
public static void main(String[] args) {
//just():創建Flux序列,並聲明數據流,
Flux<Integer> integerFlux = Flux.just(1, 2, 3, 4);//整形
//subscribe():訂閱Flux序列,只有進行訂閱后才回觸發數據流,不訂閱就什么都不會發生
integerFlux.subscribe(System.out::println);
Flux<String> stringFlux = Flux.just("hello", "world");//字符串
stringFlux.subscribe(System.out::println);
//fromArray(),fromIterable()和fromStream():可以從一個數組、Iterable 對象或Stream 對象中創建Flux序列
Integer[] array = {1,2,3,4};
Flux.fromArray(array).subscribe(System.out::println);
List<Integer> integers = Arrays.asList(array);
Flux.fromIterable(integers).subscribe(System.out::println);
Stream<Integer> stream = integers.stream();
Flux.fromStream(stream).subscribe(System.out::println);
}
}
啟動測試:
響應流的特點
要搞清楚這兩個概念,必須說一下響應流規范。它是響應式編程的基石。他具有以下特點:
-
響應流必須是無阻塞的。
-
響應流必須是一個數據流。
-
它必須可以異步執行。
-
並且它也應該能夠處理背壓。
-
即時響應性: 只要有可能, 系統就會及時地做出響應。 即時響應是可用性和實用性的基石, 而更加重要的是,即時響應意味着可以快速地檢測到問題並且有效地對其進行處理。 即時響應的系統專注於提供快速而一致的響應時間, 確立可靠的反饋上限, 以提供一致的服務質量。 這種一致的行為轉而將簡化錯誤處理、 建立最終用戶的信任並促使用戶與系統作進一步的互動。
-
回彈性:系統在出現失敗時依然保持即時響應性。 這不僅適用於高可用的、 任務關鍵型系統——任何不具備回彈性的系統都將會在發生失敗之后丟失即時響應性。 回彈性是通過復制、 遏制、 隔離以及委托來實現的。 失敗的擴散被遏制在了每個組件內部, 與其他組件相互隔離, 從而確保系統某部分的失敗不會危及整個系統,並能獨立恢復。 每個組件的恢復都被委托給了另一個(外部的)組件, 此外,在必要時可以通過復制來保證高可用性。 (因此)組件的客戶端不再承擔組件失敗的處理。
-
彈性: 系統在不斷變化的工作負載之下依然保持即時響應性。 反應式系統可以對輸入(負載)的速率變化做出反應,比如通過增加或者減少被分配用於服務這些輸入(負載)的資源。 這意味着設計上並沒有爭用點和中央瓶頸, 得以進行組件的分片或者復制, 並在它們之間分布輸入(負載)。 通過提供相關的實時性能指標, 反應式系統能支持預測式以及反應式的伸縮算法。 這些系統可以在常規的硬件以及軟件平台上實現成本高效的彈性。
-
消息驅動:反應式系統依賴異步的消息傳遞,從而確保了松耦合、隔離、位置透明的組件之間有着明確邊界。 這一邊界還提供了將失敗作為消息委托出去的手段。 使用顯式的消息傳遞,可以通過在系統中塑造並監視消息流隊列, 並在必要時應用回壓, 從而實現負載管理、 彈性以及流量控制。 使用位置透明的消息傳遞作為通信的手段, 使得跨集群或者在單個主機中使用相同的結構成分和語義來管理失敗成為了可能。 非阻塞的通信使得接收者可以只在活動時才消耗資源, 從而減少系統開銷。
Publisher/Flux和Mono
由於響應流的特點,我們不能再返回一個簡單的POJO對象來表示結果了。必須返回一個類似Java中的Future
的概念,在有結果可用時通知消費者進行消費響應。
Reactive Stream規范中這種被定義為Publisher
Publisher
Subscriber<? super T>
的需求推送元素。
一個Publisher
下面這個Excel計算就能說明一些Publisher
A1-A9就可以看做Publisher
A10-A13分別是求和函數SUM(A1:A9)
、平均函數AVERAGE(A1:A9)
、最大值函數MAX(A1:A9)
、最小值函數MIN(A1:A9)
,
A10-A13可以看作訂閱者Subscriber
。
假如說我們沒有A10-A13,那么A1-A9就沒有實際意義,它們並不產生計算。
這也是響應式的一個重要特點:當沒有訂閱時發布者什么也不做。而Flux和Mono都是Publisher
Publisher
subscribe
方法,允許消費者在有結果可用時進行消費。
如果沒有消費者Publisher
Publisher
Flux
Flux 是一個發出(emit)0-N
個元素組成的異步序列的Publisher
onComplete
信號或者
onError
信號所終止。
在響應流規范中存在三種給下游消費者調用的方法 onNext
, onComplete
, 和onError
。下面這張圖表示了Flux的抽象模型:
以上的的講解對於初次接觸反應式編程的依然是難以理解的,所以這里有一個循序漸進的理解過程。
有些類比並不是很妥當,但是對於你循序漸進的理解這些新概念還是有幫助的。
傳統數據處理
我們在平常是這么寫的:
public List<ClientUser> allUsers() {
return Arrays.asList(new ClientUser("felord.cn", "reactive"),
new ClientUser("Felordcn", "Reactor"));
}
我們通過迭代返回值List
來get
這些元素進行再處理(消費),不管有沒有消費者, 菜品都會生產出來。
流式數據處理
在Java 8中我們可以改寫為流的表示:
public Stream<ClientUser> allUsers() {
return Stream.of(new ClientUser("felord.cn", "reactive"),
new ClientUser("Felordcn", "Reactor"));
}
反應式數據處理
在Reactor中我們又可以改寫為Flux表示:
public Flux<ClientUser> allUsers(){
return Flux.just(new ClientUser("felord.cn", "reactive"),
new ClientUser("Felordcn", "Reactor"));
}
這時候食客來了,發生了訂閱,廚師才開始做。
Flux 的創建Demo
Flux ints = Flux.range(1, 4);
Flux seq1 = Flux.just("bole1", "bole2", "bole3");
List iterable = Arrays.asList("bole_01", "bole_02", "bole_03");
Flux seq2 = Flux.fromIterable(iterable);
seq2.subscribe(i -> System.out.println(i));
Mono
Mono 是一個發出(emit)0-1
個元素的Publisher
onComplete
信號或者
onError
信號所終止。
mono 整體和Flux差不多,只不過這里只會發出0-1個元素。也就是說不是有就是沒有。
象Flux一樣,我們來看看Mono的演化過程以幫助理解。
傳統數據處理
public ClientUser currentUser () {
return isAuthenticated ? new ClientUser("felord.cn", "reactive") : null;
}
直接返回符合條件的對象或者null`。
Optional的處理方式
public Optional<ClientUser> currentUser () {
return isAuthenticated ? Optional.of(new ClientUser("felord.cn", "reactive"))
: Optional.empty();
}
這個Optional我覺得就有反應式的那種味兒了,當然它並不是反應式。當我們不從返回值Optional取其中具體的對象時,我們不清楚里面到底有沒有,但是Optional是一定客觀存在的,不會出現NPE問題。
反應式數據處理
public Mono<ClientUser> currentUser () {
return isAuthenticated ? Mono.just(new ClientUser("felord.cn", "reactive"))
: Mono.empty();
}
和Optional有點類似的機制,當然Mono不是為了解決NPE問題的,它是為了處理響應流中單個值(也可能是Void
)而存在的。
Mono的創建Demo
Mono data = Mono.just("bole");
Mono noData = Mono.empty();
m.subscribe(i -> System.out.println(i));
Flux和Mono總結
Flux和Mono是Java反應式中的重要概念,但是很多同學包括我在開始都難以理解它們。這其實是規定了兩種流式范式,這種范式讓數據具有一些新的特性,比如基於發布訂閱的事件驅動,異步流、背壓等等。另外數據是推送(Push)給消費者的以區別於平時我們的拉(Pull)模式。同時我們可以像Stream Api一樣使用類似map
、flatmap
等操作符(operator)來操作它們。
函數編程
反應式編程,常常和函數式編程結合,這就是讓大家困擾的地方
函數編程接口
接口函數名 | 說明 |
---|---|
BiConsumer | 表示接收兩個輸入參數和不返回結果的操作。 |
BiFunction | 表示接受兩個參數,並產生一個結果的函數。 |
BinaryOperator | 表示在相同類型的兩個操作數的操作,生產相同類型的操作數的結果。 |
BiPredicate | 代表兩個參數謂詞(布爾值函數)。 |
BooleanSupplier | 代表布爾值結果的提供者。 |
Consumer | 表示接受一個輸入參數和不返回結果的操作。 |
DoubleBinaryOperator | 代表在兩個double值操作數的運算,並產生一個double值結果。 |
DoubleConsumer | 表示接受一個double值參數,不返回結果的操作。 |
DoubleFunction | 表示接受double值參數,並產生一個結果的函數。 |
DoublePredicate | 代表一個double值參數謂詞(布爾值函數)。 |
DoubleSupplier | 表示表示接受double值參數,並產生一個結果的函數。值結果的提供者。 |
DoubleToIntFunction | 表示接受一個double值參數,不返回結果的操作。 |
DoubleFunction | 表示接受double值參數,並產生一個結果的函數。 |
DoublePredicate | 代表一個double值參數謂詞(布爾值函數)。 |
DoubleSupplier | DoubleToIntFunction |
DoubleToIntFunction | 表示接受double值參數,並產生一個int值結果的函數。 |
DoubleToLongFunction | 表示上產生一個double值結果的單個double值操作數的操作。 |
Function | 代表接受一個double值參數,並產生一個long值結果的函數。 |
DoubleUnaryOperator | 表示上產生一個double值結果的單個double值操作數的操作。 |
Function | 表示接受一個參數,並產生一個結果的函數。 |
IntConsumer | 表示接受單個int值的參數並沒有返回結果的操作。 |
IntFunction | 表示接受一個int值參數,並產生一個結果的函數。 |
IntPredicate | 表示一個整數值參數謂詞(布爾值函數)。 |
IntSupplier | 代表整型值的結果的提供者。 |
IntToLongFunction | 表示接受一個int值參數,並產生一個long值結果的函數。 |
IntUnaryOperator | 表示產生一個int值結果的單個int值操作數的運算。 |
LongBinaryOperator | 表示在兩個long值操作數的操作,並產生一個ObjLongConsumer值結果。 |
LongFunction | 表示接受long值參數,並產生一個結果的函數。 |
LongPredicate | 代表一個long值參數謂詞(布爾值函數)。 |
LongSupplier | 表示long值結果的提供者。 |
LongToDoubleFunction | 表示接受double參數,並產生一個double值結果的函數。 |
LongToIntFunction | 表示接受long值參數,並產生一個int值結果的函數。 |
LongUnaryOperator | 表示上產生一個long值結果單一的long值操作數的操作。 |
ObjDoubleConsumer | 表示接受對象值和double值參數,並且沒有返回結果的操作。 |
ObjIntConsumer | 表示接受對象值和整型值參數,並返回沒有結果的操作。 |
ObjLongConsumer | 表示接受對象值和整型值參數,並返回沒有結果的操作。 |
ObjLongConsumer | 表示接受對象值和double值參數,並且沒有返回結果的操作。 |
ObjIntConsumer | 表示接受對象值和整型值參數,並返回沒有結果的操作。 |
ObjLongConsumer | 表示接受對象的值和long值的說法,並沒有返回結果的操作。 |
Predicate | 代表一個參數謂詞(布爾值函數)。 |
Supplier | 表示一個提供者的結果。 |
ToDoubleBiFunction | 表示接受兩個參數,並產生一個double值結果的功能。 |
ToDoubleFunction | 代表一個產生一個double值結果的功能。 |
ToIntBiFunction | 表示接受兩個參數,並產生一個int值結果的函數。 |
ToIntFunction | 代表產生一個int值結果的功能。 |
ToLongBiFunction | 表示接受兩個參數,並產生long值結果的功能。 |
ToLongFunction | 代表一個產生long值結果的功能。 |
UnaryOperator | 表示上產生相同類型的操作數的結果的單個操作數的操作。 |
常用函數編程示例
Consumer 一個輸入 無輸出
Product product=new Product();
//類名+靜態方法 一個輸入T 沒有輸出
Consumer consumer1 = Product->Product.nameOf(product);//lambda
consumer1.accept(product);
Consumer consumer = Product::nameOf;//方法引用
consumer.accept(product);
Funtion<T,R> 一個輸入 一個輸出
//對象+方法 一個輸入T 一個輸出R
Function<Integer, Integer> function = product::reduceStock;
System.out.println("剩余庫存:" + function.apply(10));
//帶參數的構造函數
Function<Integer,Product> function1=Product::new;
System.out.println("新對象:" +function1.apply(200));
Predicate 一個輸入T, 一個輸出 Boolean
//Predicate 一個輸入T 一個輸出Boolean
Predicate predicate= i -> product.isEnough(i);//lambda
System.out.println("庫存是否足夠:"+predicate.test(100));
Predicate predicate1= product::isEnough;//方法引用
System.out.println("庫存是否足夠:"+predicate1.test(100));
UnaryOperator 一元操作符 輸入輸出都是T
//一元操作符 輸入和輸出T
UnaryOperator integerUnaryOperator =product::reduceStock;
System.out.println("剩余庫存:" + integerUnaryOperator.apply(20));
IntUnaryOperator intUnaryOperator = product::reduceStock;
System.out.println("剩余庫存:" + intUnaryOperator.applyAsInt(30));
Supplier 沒有輸入 只有輸出
//無參數構造函數
Supplier supplier = Product::new;
System.out.println("創建新對象:" + supplier.get());
Supplier supplier1=()->product.getStock();
System.out.println("剩余庫存:" + supplier1.get());
BiFunction 二元操作符 兩個輸入<T,U> 一個輸出
//類名+方法
BiFunction<Product, Integer, Integer> binaryOperator = Product::reduceStock;
System.out.println(" 剩余庫存(BiFunction):" + binaryOperator.apply(product, 10));
BinaryOperator 二元操作符 ,二個輸入 一個輸出
//BinaryOperator binaryOperator1=(x,y)->product.reduceStock(x,y);
BinaryOperator binaryOperator1=product::reduceStock;
System.out.println(" 剩余庫存(BinaryOperator):" +binaryOperator1.apply(product.getStock(),10));
Flux類中的靜態方法:
簡單的創建方法
just():
可以指定序列中包含的全部元素。創建出來的Flux序列在發布這些元素之后會自動結束
fromArray(),fromIterable(),fromStream():
可以從一個數組,Iterable對象或Stream對象中穿件Flux對象
empty():
創建一個不包含任何元素,只發布結束消息的序列
error(Throwable error):
創建一個只包含錯誤消息的序列
never():
傳建一個不包含任務消息通知的序列
range(int start, int count):
創建包含從start起始的count個數量的Integer對象的序列
interval(Duration period)和interval(Duration delay, Duration period):
創建一個包含了從0開始遞增的Long對象的序列。其中包含的元素按照指定的間隔來發布。除了間隔時間之外,還可以指定起始元素發布之前的延遲時間
intervalMillis(long period)和intervalMillis(long delay, long period):
與interval()方法相同,但該方法通過毫秒數來指定時間間隔和延遲時間
例子
Flux.just("Hello", "World").subscribe(System.out::println);
Flux.fromArray(new Integer[]{1, 2, 3}).subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
Flux.range(1, 10).subscribe(System.out::println);
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
Flux.intervalMillis(1000).subscirbe(System.out::println);
復雜的序列創建 generate()
當序列的生成需要復雜的邏輯時,則應該使用generate()或create()方法。
generate()方法通過同步和逐一的方式來產生Flux序列。
序列的產生是通過調用所提供的的SynchronousSink對象的next(),complete()和error(Throwable)方法來完成的。
逐一生成的含義是在具體的生成邏輯中,next()方法只能最多被調用一次。
在某些情況下,序列的生成可能是有狀態的,需要用到某些狀態對象,此時可以使用
generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator),
其中stateSupplier用來提供初始的狀態對象。
在進行序列生成時,狀態對象會作為generator使用的第一個參數傳入,可以在對應的邏輯中對改狀態對象進行修改以供下一次生成時使用。
Flux.generate(sink -> {
sink.next("Hello");
sink.complete();
}).subscribe(System.out::println);
final Random random = new Random();
Flux.generate(ArrayList::new, (list, sink) -> {
int value = random.nextInt(100);
list.add(value);
sink.next(value);
if( list.size() ==10 )
sink.complete();
return list;
}).subscribe(System.out::println);
復雜的序列創建 create()
create()方法與generate()方法的不同之處在於所使用的是FluxSink對象。
FluxSink支持同步和異步的消息產生,並且可以在一次調用中產生多個元素。
Flux.create(sink -> {
for(int i = 0; i < 10; i ++)
sink.next(i);
sink.complete();
}).subscribe(System.out::println);
Mono靜態方法
Mono類包含了與Flux類中相同的靜態方法:just(),empty()和never()等。
除此之外,Mono還有一些獨有的靜態方法:
fromCallable(),fromCompletionStage(),fromFuture(),fromRunnable()和fromSupplier():分別從Callable,CompletionStage,CompletableFuture,Runnable和Supplier中創建Mono
delay(Duration duration)和delayMillis(long duration):創建一個Mono序列,在指定的延遲時間之后,產生數字0作為唯一值
ignoreElements(Publisher
justOrEmpty(Optional<? extends T> data)和justOrEmpty(T data):從一個Optional對象或可能為null的對象中創建Mono。只有Optional對象中包含之或對象不為null時,Mono序列才產生對應的元素
Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
Mono.create(sink -> sink.success("Hello")).subscribte(System.out::println);
操作符
操作符buffer和bufferTimeout
這兩個操作符的作用是把當前流中的元素收集到集合中,並把集合對象作為流中的新元素。
在進行收集時可以指定不同的條件:所包含的元素的最大數量或收集的時間間隔。方法buffer()僅使用一個條件,而bufferTimeout()可以同時指定兩個條件。
指定時間間隔時可以使用Duration對象或毫秒數,即使用bufferMillis()或bufferTimeoutMillis()兩個方法。
除了元素數量和時間間隔外,還可以通過bufferUntil和bufferWhile操作符來進行收集。這兩個操作符的參數時表示每個集合中的元素索要滿足的條件的Predicate對象。
bufferUntil會一直收集直到Predicate返回true。
使得Predicate返回true的那個元素可以選擇添加到當前集合或下一個集合中;bufferWhile則只有當Predicate返回true時才會收集。一旦為false,會立即開始下一次收集。
Flux.range(1, 100).buffer(20).subscribe(System.out::println);
Flux.intervalMillis(100).bufferMillis(1001).take(2).toStream().forEach(System.out::println);
Flux.range(1, 10).bufferUntil(i -> i%2 == 0).subscribe(System.out::println);
Flux.range(1, 10).bufferWhile(i -> i%2 == 0).subscribe(System.out::println);
操作符Filter
對流中包含的元素進行過濾,只留下滿足Predicate指定條件的元素。
Flux.range(1, 10).filter(i -> i%2 == 0).subscribe(System.out::println);
操作符zipWith
zipWith操作符把當前流中的元素與另一個流中的元素按照一對一的方式進行合並。在合並時可以不做任何處理,由此得到的是一個元素類型為Tuple2的流;也可以通過一個BiFunction函數對合並的元素進行處理,所得到的流的元素類型為該函數的返回值。
Flux.just("a", "b").zipWith(Flux.just("c", "d")).subscribe(System.out::println);
Flux.just("a", "b").zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2)).subscribe(System.out::println);
操作符take
take系列操作符用來從當前流中提取元素。提取方式如下:
take(long n),take(Duration timespan)和takeMillis(long timespan):按照指定的數量或時間間隔來提取
takeLast(long n):提取流中的最后N個元素
takeUntil(Predicate<? super T> predicate) :提取元素直到Predicate返回true
takeWhile(Predicate<? super T> continuePredicate):當Predicate返回true時才進行提取
takeUntilOther(Publisher<?> other):提取元素知道另外一個流開始產生元素
Flux.range(1, 1000).take(10).subscribe(System.out::println);
Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);
Flux.range(1, 1000).takeWhile(i -> i < 10).subscribe(System.out::println);
Flux.range(1, 1000).takeUntil(i -> i == 10).subscribe(System.out::println);
操作符reduce和reduceWith
reduce和reduceWith操作符對流中包含的所有元素進行累計操作,得到一個包含計算結果的Mono序列。累計操作是通過一個BiFunction來表示的。在操作時可以指定一個初始值。若沒有初始值,則序列的第一個元素作為初始值。
Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);
Flux.range(1, 100).reduceWith(() -> 100, (x + y) -> x + y).subscribe(System.out::println);
操作符merge和mergeSequential
merge和mergeSequential操作符用來把多個流合並成一個Flux序列。merge按照所有流中元素的實際產生序列來合並,而mergeSequential按照所有流被訂閱的順序,以流為單位進行合並。
Flux.merge(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5)).toStream().forEach(System.out::println);
Flux.mergeSequential(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5)).toStream().forEach(System.out::println);
操作符flatMap和flatMapSequential
flatMap和flatMapSequential操作符把流中的每個元素轉換成一個流,再把所有流中的元素進行合並。flatMapSequential和flatMap之間的區別與mergeSequential和merge是一樣的。
Flux.just(5, 10).flatMap(x -> Flux.intervalMillis(x * 10, 100).take(x)).toStream().forEach(System.out::println);
操作符concatMap
concatMap操作符的作用也是把流中的每個元素轉換成一個流,再把所有流進行合並。concatMap會根據原始流中的元素順序依次把轉換之后的流進行合並,並且concatMap堆轉換之后的流的訂閱是動態進行的,而flatMapSequential在合並之前就已經訂閱了所有的流。
Flux.just(5, 10).concatMap(x -> Flux.intervalMillis(x * 10, 100).take(x)).toStream().forEach(System.out::println);
操作符combineLatest
combineLatest操作符把所有流中的最新產生的元素合並成一個新的元素,作為返回結果流中的元素。只要其中任何一個流中產生了新的元素,合並操作就會被執行一次,結果流中就會產生新的元素。
Flux.combineLatest(Arrays::toString, Flux.intervalMillis(100).take(5), Flux.intervalMillis(50, 100).take(5)).toStream().forEach(System.out::println);
消息處理
當需要處理Flux或Mono中的消息時,可以通過subscribe方法來添加相應的訂閱邏輯。
在調用subscribe方法時可以指定需要處理的消息類型。
Flux.just(1, 2).concatWith(Mono.error(new IllegalStateException())).subscribe(System.out::println, System.err::println);
Flux.just(1, 2).concatWith(Mono.error(new IllegalStateException())).onErrorReturn(0).subscribe(System.out::println);
第2種可以通過switchOnError()方法來使用另外的流來產生元素。
Flux.just(1, 2).concatWith(Mono.error(new IllegalStateException())).switchOnError(Mono.just(0)).subscribe(System.out::println);
第三種是通過onErrorResumeWith()方法來根據不同的異常類型來選擇要使用的產生元素的流。
Flux.just(1, 2).concatWith(Mono.error(new IllegalArgumentException())).onErrorResumeWith(e -> {
if(e instanceof IllegalStateException)
return Mono.just(0);
else if(e instanceof IllegalArgumentException)
return Mono.just(-1);
return Mono.epmty();
}).subscribe(System,.out::println);
當出現錯誤時還可以使用retry操作符來進行重試。重試的動作是通過重新訂閱序列來實現的。在使用retry操作時還可以指定重試的次數。
Flux.just(1, 2).concatWith(Mono.error(new IllegalStateException())).retry(1).subscrible(System.out::println);
調度器Scheduler
通過調度器可以指定操作執行的方式和所在的線程。有以下幾種不同的調度器實現
當前線程,通過Schedulers.immediate()方法來創建
單一的可復用的線程,通過Schedulers.single()方法來創建
使用彈性的線程池,通過Schedulers.elastic()方法來創建。線程池中的線程是可以復用的。當所需要時,新的線程會被創建。若一個線程閑置時間太長,則會被銷毀。該調度器適用於I/O操作相關的流的處理
使用對並行操作優化的線程池,通過Schedulers.parallel()方法來創建。其中的線程數量取決於CPU的核的數量。該調度器適用於計算密集型的流的處理
使用支持任務調度的調度器,通過Schedulers.timer()方法來創建
從已有的ExecutorService對象中創建調度器,通過Schedulers.fromExecutorService()方法來創建
通過publishOn()和subscribeOn()方法可以切換執行操作調度器。publishOn()方法切換的是操作符的執行方式,而subscribeOn()方法切換的是產生流中元素時的執行方式
Flux.create(sink -> {
sink.next(Thread.currentThread().getName());
sink.complete();
}).publishOn(Schedulers.single())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.publishOn(Schedulers.elastic())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.subscribeOn(Schedulers.parallel())
.toStream()
.forEach(System.out::println);
測試
StepVerifier的作用是可以對序列中包含的元素進行逐一驗證。通過StepVerifier.create()方法對一個流進行包裝之后再進行驗證。expectNext()方法用來聲明測試時所期待的流中的下一個元素的值,而verifyComplete()方法則驗證流是否正常結束。verifyError()來驗證流由於錯誤而終止。
StepVerifier.create(Flux.just(a, b)).expectNext("a").expectNext("b").verifyComplete();
使用StepVerifier.withVirtualTime()方法可以創建出使用虛擬時鍾的SteoVerifier。通過thenAwait(Duration)方法可以讓虛擬時鍾前進。
StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofHours(4), Duration.ofDays(1)).take(2))
.expectSubscription()
.expectNoEvent(Duration.ofHours(4))
.expectNext(0L)
.thenAwait(Duration.ofDays(1))
.expectNext(1L)
.verifyComplete();
TestPublisher的作用在於可以控制流中元素的產生,甚至是違反反應流規范的情況。通過create()方法創建一個新的TestPublisher對象,然后使用next()方法來產生元素,使用complete()方法來結束流。
final TestPublisher<String> testPublisher = TestPublisher.creater();
testPublisher.next("a");
testPublisher.next("b");
testPublisher.complete();
StepVerifier.create(testPublisher)
.expectNext("a")
.expectNext("b")
.expectComplete();
調試
在調試模式啟用之后,所有的操作符在執行時都會保存額外的與執行鏈相關的信息。當出現錯誤時,這些信息會被作為異常堆棧信息的一部分輸出。
Hooks.onOperator(providedHook -> providedHook.operatorStacktrace());
也可以通過checkpoint操作符來對特定的流處理鏈來啟用調試模式。
Flux.just(1, 0).map(x -> 1/x).checkpoint("test").subscribe(System.out::println);
日志記錄
可以通過添加log操作把流相關的事件記錄在日志中,
Flux.range(1, 2).log("Range").subscribe(System.out::println);
冷熱序列
冷序列的含義是不論訂閱者在何時訂閱該序列,總是能收到序列中產生的全部消息。熱序列是在持續不斷的產生消息,訂閱者只能獲取到在其訂閱之后產生的消息。
final Flux<Long> source = Flux.intervalMillis(1000).take(10).publish.autoConnect();
source.subscribe();
Thread.sleep(5000);
source.toStream().forEach(System.out::println);
ServerWebExchange交換機
ServerWebExchange
與過濾器的關系:
Spring Cloud Gateway同zuul類似,有“pre”和“post”兩種方式的filter。
客戶端的請求先經過“pre”類型的filter,然后將請求轉發到具體的業務服務,收到業務服務的響應之后,再經過“post”類型的filter處理,最后返回響應到客戶端。
引用Spring Cloud Gateway官網上的一張圖:
與zuul不同的是,filter除了分為“pre”和“post”兩種方式的filter外,在Spring Cloud Gateway中,filter從作用范圍可分為另外兩種,
一種是針對於單個路由的gateway filter,它在配置文件中的寫法同predict類似;
一種是針對於所有路由的global gateway filer。
現在從作用范圍划分的維度來講解這兩種filter。
我們在使用Spring Cloud Gateway
的時候,注意到過濾器(包括GatewayFilter
、GlobalFilter
和過濾器鏈GatewayFilterChain
)。
Spring Cloud Gateway根據作用范圍划分為GatewayFilter和GlobalFilter,二者區別如下:
- GatewayFilter : 需要通過spring.cloud.routes.filters 配置在具體路由下,只作用在當前路由上或通過spring.cloud.default-filters配置在全局,作用在所有路由上
- GlobalFilter : 全局過濾器,不需要在配置文件中配置,作用在所有的路由上,最終通過GatewayFilterAdapter包裝成GatewayFilterChain可識別的過濾器,它為請求業務以及路由的URI轉換為真實業務服務的請求地址的核心過濾器,不需要配置,系統初始化時加載,並作用在每個路由上。
Spring Cloud Gateway框架內置的GlobalFilter如下:
上圖中每一個GlobalFilter都作用在每一個router上,能夠滿足大多數的需求。
但是如果遇到業務上的定制,可能需要編寫滿足自己需求的GlobalFilter。
過濾器都依賴到ServerWebExchange
:
public interface GlobalFilter {
Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
}
public interface GatewayFilter extends ShortcutConfigurable {
Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
}
public interface GatewayFilterChain {
Mono<Void> filter(ServerWebExchange exchange);
}
這里的設計和Servlet
中的Filter
是相似的,
當前過濾器可以決定是否執行下一個過濾器的邏輯,由GatewayFilterChain#filter()
是否被調用來決定。
而ServerWebExchange
就相當於當前請求和響應的上下文。
ServerWebExchange
實例不單存儲了Request
和Response
對象,還提供了一些擴展方法,如果想實現改造請求參數或者響應參數,就必須深入了解ServerWebExchange
。
理解ServerWebExchange
先看ServerWebExchange
的注釋:
Contract for an HTTP request-response interaction.
Provides access to the HTTP request and response and also exposes additional server-side processing related properties and features such as request attributes.
翻譯一下大概是:
ServerWebExchange是一個HTTP請求-響應交互的契約。提供對HTTP請求和響應的訪問,並公開額外的服務器端處理相關屬性和特性,如請求屬性。
其實,ServerWebExchange
命名為服務網絡交換器,存放着重要的請求-響應屬性、請求實例和響應實例等等,有點像Context
的角色。
ServerWebExchange接口
ServerWebExchange
接口的所有方法:
public interface ServerWebExchange {
// 日志前綴屬性的KEY,值為org.springframework.web.server.ServerWebExchange.LOG_ID
// 可以理解為 attributes.set("org.springframework.web.server.ServerWebExchange.LOG_ID","日志前綴的具體值");
// 作用是打印日志的時候會拼接這個KEY對飲的前綴值,默認值為""
String LOG_ID_ATTRIBUTE = ServerWebExchange.class.getName() + ".LOG_ID";
String getLogPrefix();
// 獲取ServerHttpRequest對象
ServerHttpRequest getRequest();
// 獲取ServerHttpResponse對象
ServerHttpResponse getResponse();
// 返回當前exchange的請求屬性,返回結果是一個可變的Map
Map<String, Object> getAttributes();
// 根據KEY獲取請求屬性
@Nullable
default <T> T getAttribute(String name) {
return (T) getAttributes().get(name);
}
// 根據KEY獲取請求屬性,做了非空判斷
@SuppressWarnings("unchecked")
default <T> T getRequiredAttribute(String name) {
T value = getAttribute(name);
Assert.notNull(value, () -> "Required attribute '" + name + "' is missing");
return value;
}
// 根據KEY獲取請求屬性,需要提供默認值
@SuppressWarnings("unchecked")
default <T> T getAttributeOrDefault(String name, T defaultValue) {
return (T) getAttributes().getOrDefault(name, defaultValue);
}
// 返回當前請求的網絡會話
Mono<WebSession> getSession();
// 返回當前請求的認證用戶,如果存在的話
<T extends Principal> Mono<T> getPrincipal();
// 返回請求的表單數據或者一個空的Map,只有Content-Type為application/x-www-form-urlencoded的時候這個方法才會返回一個非空的Map -- 這個一般是表單數據提交用到
Mono<MultiValueMap<String, String>> getFormData();
// 返回multipart請求的part數據或者一個空的Map,只有Content-Type為multipart/form-data的時候這個方法才會返回一個非空的Map -- 這個一般是文件上傳用到
Mono<MultiValueMap<String, Part>> getMultipartData();
// 返回Spring的上下文
@Nullable
ApplicationContext getApplicationContext();
// 這幾個方法和lastModified屬性相關
boolean isNotModified();
boolean checkNotModified(Instant lastModified);
boolean checkNotModified(String etag);
boolean checkNotModified(@Nullable String etag, Instant lastModified);
// URL轉換
String transformUrl(String url);
// URL轉換映射
void addUrlTransformer(Function<String, String> transformer);
// 注意這個方法,方法名是:改變,這個是修改ServerWebExchange屬性的方法,返回的是一個Builder實例,Builder是ServerWebExchange的內部類
default Builder mutate() {
return new DefaultServerWebExchangeBuilder(this);
}
interface Builder {
// 覆蓋ServerHttpRequest
Builder request(Consumer<ServerHttpRequest.Builder> requestBuilderConsumer);
Builder request(ServerHttpRequest request);
// 覆蓋ServerHttpResponse
Builder response(ServerHttpResponse response);
// 覆蓋當前請求的認證用戶
Builder principal(Mono<Principal> principalMono);
// 構建新的ServerWebExchange實例
ServerWebExchange build();
}
}
ServerWebExchange#mutate()
方法
注意到ServerWebExchange#mutate()
方法,ServerWebExchange
實例可以理解為不可變實例,
如果我們想要修改它,需要通過mutate()
方法生成一個新的實例,例如這樣:
public class CustomGlobalFilter implements GlobalFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
// 這里可以修改ServerHttpRequest實例
ServerHttpRequest newRequest = ...
ServerHttpResponse response = exchange.getResponse();
// 這里可以修改ServerHttpResponse實例
ServerHttpResponse newResponse = ...
// 構建新的ServerWebExchange實例
ServerWebExchange newExchange = exchange.mutate().request(newRequest).response(newResponse).build();
return chain.filter(newExchange);
}
}
ServerHttpRequest接口
ServerHttpRequest
實例是用於承載請求相關的屬性和請求體,
Spring Cloud Gateway
中底層使用Netty
處理網絡請求,通過追溯源碼,
可以從ReactorHttpHandlerAdapter
中得知ServerWebExchange
實例中持有的ServerHttpRequest
實例的具體實現是ReactorServerHttpRequest
。
之所以列出這些實例之間的關系,是因為這樣比較容易理清一些隱含的問題,例如:
ReactorServerHttpRequest
的父類AbstractServerHttpRequest
中初始化內部屬性headers的時候把請求的HTTP頭部封裝為只讀的實例:
public AbstractServerHttpRequest(URI uri, @Nullable String contextPath, HttpHeaders headers) {
this.uri = uri;
this.path = RequestPath.parse(uri, contextPath);
this.headers = HttpHeaders.readOnlyHttpHeaders(headers);
}
// HttpHeaders類中的readOnlyHttpHeaders方法,
// ReadOnlyHttpHeaders屏蔽了所有修改請求頭的方法,直接拋出UnsupportedOperationException
public static HttpHeaders readOnlyHttpHeaders(HttpHeaders headers) {
Assert.notNull(headers, "HttpHeaders must not be null");
if (headers instanceof ReadOnlyHttpHeaders) {
return headers;
}
else {
return new ReadOnlyHttpHeaders(headers);
}
}
所以, 不能直接從ServerHttpRequest
實例中直接獲取請求頭HttpHeaders
實例並且進行修改。
ServerHttpRequest
接口如下:
public interface HttpMessage {
// 獲取請求頭,目前的實現中返回的是ReadOnlyHttpHeaders實例,只讀
HttpHeaders getHeaders();
}
public interface ReactiveHttpInputMessage extends HttpMessage {
// 返回請求體的Flux封裝
Flux<DataBuffer> getBody();
}
public interface HttpRequest extends HttpMessage {
// 返回HTTP請求方法,解析為HttpMethod實例
@Nullable
default HttpMethod getMethod() {
return HttpMethod.resolve(getMethodValue());
}
// 返回HTTP請求方法,字符串
String getMethodValue();
// 請求的URI
URI getURI();
}
public interface ServerHttpRequest extends HttpRequest, ReactiveHttpInputMessage {
// 連接的唯一標識或者用於日志處理標識
String getId();
// 獲取請求路徑,封裝為RequestPath對象
RequestPath getPath();
// 返回查詢參數,是只讀的MultiValueMap實例
MultiValueMap<String, String> getQueryParams();
// 返回Cookie集合,是只讀的MultiValueMap實例
MultiValueMap<String, HttpCookie> getCookies();
// 遠程服務器地址信息
@Nullable
default InetSocketAddress getRemoteAddress() {
return null;
}
// SSL會話實現的相關信息
@Nullable
default SslInfo getSslInfo() {
return null;
}
// 修改請求的方法,返回一個建造器實例Builder,Builder是內部類
default ServerHttpRequest.Builder mutate() {
return new DefaultServerHttpRequestBuilder(this);
}
interface Builder {
// 覆蓋請求方法
Builder method(HttpMethod httpMethod);
// 覆蓋請求的URI、請求路徑或者上下文,這三者相互有制約關系,具體可以參考API注釋
Builder uri(URI uri);
Builder path(String path);
Builder contextPath(String contextPath);
// 覆蓋請求頭
Builder header(String key, String value);
Builder headers(Consumer<HttpHeaders> headersConsumer);
// 覆蓋SslInfo
Builder sslInfo(SslInfo sslInfo);
// 構建一個新的ServerHttpRequest實例
ServerHttpRequest build();
}
}
注意:
ServerHttpRequest
或者說HttpMessage
接口提供的獲取請求頭方法HttpHeaders getHeaders();
返回結果是一個只讀的實例,具體是ReadOnlyHttpHeaders
類型,
如果要修改ServerHttpRequest
實例,那么需要這樣做:
ServerHttpRequest request = exchange.getRequest();
ServerHttpRequest newRequest = request.mutate().header("key","value").path("/myPath").build();
ServerHttpResponse接口
ServerHttpResponse
實例是用於承載響應相關的屬性和響應體,
Spring Cloud Gateway
中底層使用Netty
處理網絡請求,通過追溯源碼,可以從ReactorHttpHandlerAdapter
中得知ServerWebExchange
實例中持有的ServerHttpResponse
實例的具體實現是ReactorServerHttpResponse
。
之所以列出這些實例之間的關系,是因為這樣比較容易理清一些隱含的問題,例如:
// ReactorServerHttpResponse的父類
public AbstractServerHttpResponse(DataBufferFactory dataBufferFactory, HttpHeaders headers) {
Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null");
Assert.notNull(headers, "HttpHeaders must not be null");
this.dataBufferFactory = dataBufferFactory;
this.headers = headers;
this.cookies = new LinkedMultiValueMap<>();
}
public ReactorServerHttpResponse(HttpServerResponse response, DataBufferFactory bufferFactory) {
super(bufferFactory, new HttpHeaders(new NettyHeadersAdapter(response.responseHeaders())));
Assert.notNull(response, "HttpServerResponse must not be null");
this.response = response;
}
可知ReactorServerHttpResponse
構造函數初始化實例的時候,存放響應Header的是HttpHeaders
實例,也就是響應Header是可以直接修改的。
ServerHttpResponse
接口如下:
public interface HttpMessage {
// 獲取響應Header,目前的實現中返回的是HttpHeaders實例,可以直接修改
HttpHeaders getHeaders();
}
public interface ReactiveHttpOutputMessage extends HttpMessage {
// 獲取DataBufferFactory實例,用於包裝或者生成數據緩沖區DataBuffer實例(創建響應體)
DataBufferFactory bufferFactory();
// 注冊一個動作,在HttpOutputMessage提交之前此動作會進行回調
void beforeCommit(Supplier<? extends Mono<Void>> action);
// 判斷HttpOutputMessage是否已經提交
boolean isCommitted();
// 寫入消息體到HTTP協議層
Mono<Void> writeWith(Publisher<? extends DataBuffer> body);
// 寫入消息體到HTTP協議層並且刷新緩沖區
Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body);
// 指明消息處理已經結束,一般在消息處理結束自動調用此方法,多次調用不會產生副作用
Mono<Void> setComplete();
}
public interface ServerHttpResponse extends ReactiveHttpOutputMessage {
// 設置響應狀態碼
boolean setStatusCode(@Nullable HttpStatus status);
// 獲取響應狀態碼
@Nullable
HttpStatus getStatusCode();
// 獲取響應Cookie,封裝為MultiValueMap實例,可以修改
MultiValueMap<String, ResponseCookie> getCookies();
// 添加響應Cookie
void addCookie(ResponseCookie cookie);
}
這里可以看到除了響應體比較難修改之外,其他的屬性都是可變的。
ServerWebExchangeUtils和上下文屬性
ServerWebExchangeUtils
里面存放了很多靜態公有的字符串KEY值
(這些字符串KEY的實際值是org.springframework.cloud.gateway.support.ServerWebExchangeUtils.
+ 下面任意的靜態公有KEY),
這些字符串KEY值一般是用於ServerWebExchange
的屬性(Attribute
,見上文的ServerWebExchange#getAttributes()
方法)的KEY,這些屬性值都是有特殊的含義,在使用過濾器的時候如果時機適當可以直接取出來使用,下面逐個分析。
PRESERVE_HOST_HEADER_ATTRIBUTE
:是否保存Host屬性,值是布爾值類型,寫入位置是PreserveHostHeaderGatewayFilterFactory
,使用的位置是NettyRoutingFilter
,作用是如果設置為true,HTTP請求頭中的Host屬性會寫到底層Reactor-Netty的請求Header屬性中。CLIENT_RESPONSE_ATTR
:保存底層Reactor-Netty的響應對象,類型是reactor.netty.http.client.HttpClientResponse
。CLIENT_RESPONSE_CONN_ATTR
:保存底層Reactor-Netty的連接對象,類型是reactor.netty.Connection
。URI_TEMPLATE_VARIABLES_ATTRIBUTE
:PathRoutePredicateFactory
解析路徑參數完成之后,把解析完成后的占位符KEY-路徑Path映射存放在ServerWebExchange
的屬性中,KEY就是URI_TEMPLATE_VARIABLES_ATTRIBUTE
。CLIENT_RESPONSE_HEADER_NAMES
:保存底層Reactor-Netty的響應Header的名稱集合。GATEWAY_ROUTE_ATTR
:用於存放RoutePredicateHandlerMapping
中匹配出來的具體的路由(org.springframework.cloud.gateway.route.Route
)實例,通過這個路由實例可以得知當前請求會路由到下游哪個服務。GATEWAY_REQUEST_URL_ATTR
:java.net.URI
類型的實例,這個實例代表直接請求或者負載均衡處理之后需要請求到下游服務的真實URI。GATEWAY_ORIGINAL_REQUEST_URL_ATTR
:java.net.URI
類型的實例,需要重寫請求URI的時候,保存原始的請求URI。GATEWAY_HANDLER_MAPPER_ATTR
:保存當前使用的HandlerMapping
具體實例的類型簡稱(一般是字符串"RoutePredicateHandlerMapping")。GATEWAY_SCHEME_PREFIX_ATTR
:確定目標路由URI中如果存在schemeSpecificPart屬性,則保存該URI的scheme在此屬性中,路由URI會被重新構造,見RouteToRequestUrlFilter
。GATEWAY_PREDICATE_ROUTE_ATTR
:用於存放RoutePredicateHandlerMapping
中匹配出來的具體的路由(org.springframework.cloud.gateway.route.Route
)實例的ID。WEIGHT_ATTR
:實驗性功能(此版本還不建議在正式版本使用)存放分組權重相關屬性,見WeightCalculatorWebFilter
。ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR
:存放響應Header中的ContentType的值。HYSTRIX_EXECUTION_EXCEPTION_ATTR
:Throwable
的實例,存放的是Hystrix執行異常時候的異常實例,見HystrixGatewayFilterFactory
。GATEWAY_ALREADY_ROUTED_ATTR
:布爾值,用於判斷是否已經進行了路由,見NettyRoutingFilter
。GATEWAY_ALREADY_PREFIXED_ATTR
:布爾值,用於判斷請求路徑是否被添加了前置部分,見PrefixPathGatewayFilterFactory
。
ServerWebExchangeUtils
提供的上下文屬性用於Spring Cloud Gateway
的ServerWebExchange
組件處理請求和響應的時候,內部一些重要實例或者標識屬性的安全傳輸和使用,使用它們可能存在一定的風險,
因為沒有人可以確定在版本升級之后,原有的屬性KEY或者VALUE是否會發生改變,如果評估過風險或者規避了風險之后,可以安心使用。
例如我們在做請求和響應日志(類似Nginx的Access Log)的時候,可以依賴到GATEWAY_ROUTE_ATTR
,因為我們要打印路由的目標信息。舉個簡單例子:
@Slf4j
@Component
public class AccessLogFilter implements GlobalFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
String path = request.getPath().pathWithinApplication().value();
HttpMethod method = request.getMethod();
// 獲取路由的目標URI
URI targetUri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
InetSocketAddress remoteAddress = request.getRemoteAddress();
return chain.filter(exchange.mutate().build()).then(Mono.fromRunnable(() -> {
ServerHttpResponse response = exchange.getResponse();
HttpStatus statusCode = response.getStatusCode();
log.info("請求路徑:{},客戶端遠程IP地址:{},請求方法:{},目標URI:{},響應碼:{}",
path, remoteAddress, method, targetUri, statusCode);
}));
}
}
修改請求體
修改請求體是一個比較常見的需求。
例如我們使用Spring Cloud Gateway
實現網關的時候,要實現一個功能:
把存放在請求頭中的JWT解析后,提取里面的用戶ID,然后寫入到請求體中。
我們簡化這個場景,假設我們把userId明文存放在請求頭中的accessToken中,請求體是一個JSON結構:
{
"serialNumber": "請求流水號",
"payload" : {
// ... 這里是有效載荷,存放具體的數據
}
}
我們需要提取accessToken,也就是userId插入到請求體JSON中如下:
{
"userId": "用戶ID",
"serialNumber": "請求流水號",
"payload" : {
// ... 這里是有效載荷,存放具體的數據
}
}
這里為了簡化設計,用全局過濾器GlobalFilter
實現,實際需要結合具體場景考慮:
@Slf4j
@Component
public class ModifyRequestBodyGlobalFilter implements GlobalFilter {
private final DataBufferFactory dataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
@Autowired
private ObjectMapper objectMapper;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
String accessToken = request.getHeaders().getFirst("accessToken");
if (!StringUtils.hasLength(accessToken)) {
throw new IllegalArgumentException("accessToken");
}
// 新建一個ServerHttpRequest裝飾器,覆蓋需要裝飾的方法
ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(request) {
@Override
public Flux<DataBuffer> getBody() {
Flux<DataBuffer> body = super.getBody();
InputStreamHolder holder = new InputStreamHolder();
body.subscribe(buffer -> holder.inputStream = buffer.asInputStream());
if (null != holder.inputStream) {
try {
// 解析JSON的節點
JsonNode jsonNode = objectMapper.readTree(holder.inputStream);
Assert.isTrue(jsonNode instanceof ObjectNode, "JSON格式異常");
ObjectNode objectNode = (ObjectNode) jsonNode;
// JSON節點最外層寫入新的屬性
objectNode.put("userId", accessToken);
DataBuffer dataBuffer = dataBufferFactory.allocateBuffer();
String json = objectNode.toString();
log.info("最終的JSON數據為:{}", json);
dataBuffer.write(json.getBytes(StandardCharsets.UTF_8));
return Flux.just(dataBuffer);
} catch (Exception e) {
throw new IllegalStateException(e);
}
} else {
return super.getBody();
}
}
};
// 使用修改后的ServerHttpRequestDecorator重新生成一個新的ServerWebExchange
return chain.filter(exchange.mutate().request(decorator).build());
}
private class InputStreamHolder {
InputStream inputStream;
}
}
測試一下:
// HTTP
POST /order/json HTTP/1.1
Host: localhost:9090
Content-Type: application/json
accessToken: 10086
Accept: */*
Cache-Control: no-cache
Host: localhost:9090
accept-encoding: gzip, deflate
content-length: 94
Connection: keep-alive
cache-control: no-cache
{
"serialNumber": "請求流水號",
"payload": {
"name": "doge"
}
}
// 日志輸出
最終的JSON數據為:{"serialNumber":"請求流水號","payload":{"name":"doge"},"userId":"10086"}
最重要的是用到了ServerHttpRequest
裝飾器ServerHttpRequestDecorator
,主要覆蓋對應獲取請求體數據緩沖區的方法即可,至於怎么處理其他邏輯需要自行考慮,這里只是做一個簡單的示范。
一般的代碼邏輯如下:
ServerHttpRequest request = exchange.getRequest();
ServerHttpRequestDecorator requestDecorator = new ServerHttpRequestDecorator(request) {
@Override
public Flux<DataBuffer> getBody() {
// 拿到承載原始請求體的Flux
Flux<DataBuffer> body = super.getBody();
// 這里通過自定義方式生成新的承載請求體的Flux
Flux<DataBuffer> newBody = ...
return newBody;
}
}
return chain.filter(exchange.mutate().request(requestDecorator).build());
修改響應體
修改響應體的需求也是比較常見的,具體的做法和修改請求體差不多。
例如我們想要實現下面的功能:第三方服務請求經過網關,原始報文是密文,我們需要在網關實現密文解密,然后把解密后的明文路由到下游服務,下游服務處理成功響應明文,需要在網關把明文加密成密文再返回到第三方服務。
現在簡化整個流程,用AES加密算法,統一密碼為字符串"throwable",假設請求報文和響應報文明文如下:
// 請求密文
{
"serialNumber": "請求流水號",
"payload" : "加密后的請求消息載荷"
}
// 請求明文(僅僅作為提示)
{
"serialNumber": "請求流水號",
"payload" : "{\"name:\":\"doge\"}"
}
// 響應密文
{
"code": 200,
"message":"ok",
"payload" : "加密后的響應消息載荷"
}
// 響應明文(僅僅作為提示)
{
"code": 200,
"message":"ok",
"payload" : "{\"name:\":\"doge\",\"age\":26}"
}
為了方便一些加解密或者編碼解碼的實現,需要引入Apache
的commons-codec
類庫:
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.12</version>
</dependency>
這里定義一個全局過濾器專門處理加解密,實際上最好結合真實的場景決定是否適合全局過濾器,這里只是一個示例:
// AES加解密工具類
public enum AesUtils {
// 單例
X;
private static final String PASSWORD = "throwable";
private static final String KEY_ALGORITHM = "AES";
private static final String SECURE_RANDOM_ALGORITHM = "SHA1PRNG";
private static final String DEFAULT_CIPHER_ALGORITHM = "AES/ECB/PKCS5Padding";
public String encrypt(String content) {
try {
Cipher cipher = Cipher.getInstance(DEFAULT_CIPHER_ALGORITHM);
cipher.init(Cipher.ENCRYPT_MODE, provideSecretKey());
return Hex.encodeHexString(cipher.doFinal(content.getBytes(StandardCharsets.UTF_8)));
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
}
public byte[] decrypt(String content) {
try {
Cipher cipher = Cipher.getInstance(DEFAULT_CIPHER_ALGORITHM);
cipher.init(Cipher.DECRYPT_MODE, provideSecretKey());
return cipher.doFinal(Hex.decodeHex(content));
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
}
private SecretKey provideSecretKey() {
try {
KeyGenerator keyGen = KeyGenerator.getInstance(KEY_ALGORITHM);
SecureRandom secureRandom = SecureRandom.getInstance(SECURE_RANDOM_ALGORITHM);
secureRandom.setSeed(PASSWORD.getBytes(StandardCharsets.UTF_8));
keyGen.init(128, secureRandom);
return new SecretKeySpec(keyGen.generateKey().getEncoded(), KEY_ALGORITHM);
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
}
}
// EncryptionGlobalFilter
@Slf4j
@Component
public class EncryptionGlobalFilter implements GlobalFilter, Ordered {
@Autowired
private ObjectMapper objectMapper;
@Override
public int getOrder() {
return -2;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
// 響應體
ServerHttpResponse response = exchange.getResponse();
DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory();
ServerHttpRequestDecorator requestDecorator = processRequest(request, bufferFactory);
ServerHttpResponseDecorator responseDecorator = processResponse(response, bufferFactory);
return chain.filter(exchange.mutate().request(requestDecorator).response(responseDecorator).build());
}
private ServerHttpRequestDecorator processRequest(ServerHttpRequest request, DataBufferFactory bufferFactory) {
Flux<DataBuffer> body = request.getBody();
DataBufferHolder holder = new DataBufferHolder();
body.subscribe(dataBuffer -> {
int len = dataBuffer.readableByteCount();
holder.length = len;
byte[] bytes = new byte[len];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
String text = new String(bytes, StandardCharsets.UTF_8);
JsonNode jsonNode = readNode(text);
JsonNode payload = jsonNode.get("payload");
String payloadText = payload.asText();
byte[] content = AesUtils.X.decrypt(payloadText);
String requestBody = new String(content, StandardCharsets.UTF_8);
log.info("修改請求體payload,修改前:{},修改后:{}", payloadText, requestBody);
rewritePayloadNode(requestBody, jsonNode);
DataBuffer data = bufferFactory.allocateBuffer();
data.write(jsonNode.toString().getBytes(StandardCharsets.UTF_8));
holder.dataBuffer = data;
});
HttpHeaders headers = new HttpHeaders();
headers.putAll(request.getHeaders());
headers.remove(HttpHeaders.CONTENT_LENGTH);
return new ServerHttpRequestDecorator(request) {
@Override
public HttpHeaders getHeaders() {
int contentLength = holder.length;
if (contentLength > 0) {
headers.setContentLength(contentLength);
} else {
headers.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
}
return headers;
}
@Override
public Flux<DataBuffer> getBody() {
return Flux.just(holder.dataBuffer);
}
};
}
private ServerHttpResponseDecorator processResponse(ServerHttpResponse response, DataBufferFactory bufferFactory) {
return new ServerHttpResponseDecorator(response) {
@SuppressWarnings("unchecked")
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
if (body instanceof Flux) {
Flux<? extends DataBuffer> flux = (Flux<? extends DataBuffer>) body;
return super.writeWith(flux.map(buffer -> {
CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer());
DataBufferUtils.release(buffer);
JsonNode jsonNode = readNode(charBuffer.toString());
JsonNode payload = jsonNode.get("payload");
String text = payload.toString();
String content = AesUtils.X.encrypt(text);
log.info("修改響應體payload,修改前:{},修改后:{}", text, content);
setPayloadTextNode(content, jsonNode);
return bufferFactory.wrap(jsonNode.toString().getBytes(StandardCharsets.UTF_8));
}));
}
return super.writeWith(body);
}
};
}
private void rewritePayloadNode(String text, JsonNode root) {
try {
JsonNode node = objectMapper.readTree(text);
ObjectNode objectNode = (ObjectNode) root;
objectNode.set("payload", node);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
private void setPayloadTextNode(String text, JsonNode root) {
try {
ObjectNode objectNode = (ObjectNode) root;
objectNode.set("payload", new TextNode(text));
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
private JsonNode readNode(String in) {
try {
return objectMapper.readTree(in);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
private class DataBufferHolder {
DataBuffer dataBuffer;
int length;
}
}
先准備一份密文:
Map<String, Object> json = new HashMap<>(8);
json.put("serialNumber", "請求流水號");
String content = "{\"name\": \"doge\"}";
json.put("payload", AesUtils.X.encrypt(content));
System.out.println(new ObjectMapper().writeValueAsString(json));
// 輸出
{"serialNumber":"請求流水號","payload":"144e3dc734743f5709f1adf857bca473da683246fd612f86ac70edeb5f2d2729"}
模擬請求:
POST /order/json HTTP/1.1
Host: localhost:9090
accessToken: 10086
Content-Type: application/json
User-Agent: PostmanRuntime/7.13.0
Accept: */*
Cache-Control: no-cache
Postman-Token: bda07fc3-ea1a-478c-b4d7-754fe6f37200,634734d9-feed-4fc9-ba20-7618bd986e1c
Host: localhost:9090
cookie: customCookieName=customCookieValue
accept-encoding: gzip, deflate
content-length: 104
Connection: keep-alive
cache-control: no-cache
{
"serialNumber": "請求流水號",
"payload": "FE49xzR0P1cJ8a34V7ykc9poMkb9YS+GrHDt618tJyk="
}
// 響應結果
{
"serialNumber": "請求流水號",
"payload": "oo/K1igg2t/S8EExkBVGWOfI1gAh5pBpZ0wyjNPW6e8=" # <--- 解密后:{"name":"doge","age":26}
}
遇到的問題:
- 必須實現
Ordered
接口,返回一個小於-1的order值,這是因為NettyWriteResponseFilter
的order值為-1,我們需要覆蓋返回響應體的邏輯,自定義的GlobalFilter
必須比NettyWriteResponseFilter
優先執行。 - 網關每次重啟之后,第一個請求總是無法從原始的
ServerHttpRequest
讀取到有效的Body,准確來說出現的現象是NettyRoutingFilter
調用ServerHttpRequest#getBody()
的時候獲取到一個空的對象,導致空指針;奇怪的是從第二個請求開始就能正常調用。筆者把**Spring Cloud Gateway**
的版本降低到**Finchley.SR3**
,**Spring Boot**
的版本降低到**2.0.8.RELEASE**
,問題不再出現,初步確定是**Spring Cloud Gateway**
版本升級導致的兼容性問題或者是BUG。
最重要的是用到了ServerHttpResponse
裝飾器ServerHttpResponseDecorator
,主要覆蓋寫入響應體數據緩沖區的部分,至於怎么處理其他邏輯需要自行考慮,這里只是做一個簡單的示范。一般的代碼邏輯如下:
ServerHttpResponse response = exchange.getResponse();
ServerHttpResponseDecorator responseDecorator = new ServerHttpResponseDecorator(response) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
if (body instanceof Flux) {
Flux<? extends DataBuffer> flux = (Flux<? extends DataBuffer>) body;
return super.writeWith(flux.map(buffer -> {
// buffer就是原始的響應數據的緩沖區
// 下面處理完畢之后返回新的響應數據的緩沖區即可
return bufferFactory.wrap(...);
}));
}
return super.writeWith(body);
}
};
return chain.filter(exchange.mutate().response(responseDecorator).build());
參考文獻
https://blog.csdn.net/wpc2018/article/details/122634049
https://www.jianshu.com/p/7d80b94068b3
https://blog.csdn.net/yhj_911/article/details/119540000
http://bjqianye.cn/detail/6845.html
https://blog.csdn.net/hao134838/article/details/110824092
https://blog.csdn.net/hao134838/article/details/110824092
https://blog.csdn.net/weixin_34096182/article/details/91436704
https://blog.csdn.net/fly910905/article/details/121682625