springboot2 已經發布,其中最亮眼的非webflux響應式編程莫屬了!響應式的weblfux可以支持高吞吐量,意味着使用相同的資源可以處理更加多的請求,毫無疑問將會成為未來技術的趨勢,是必學的技術!很多人都看過相關的入門教程,但看完之后總覺得很迷糊,知其然不知道其所以然,包括我本人也有相同的疑惑。后面在研究和學習中發現,是我的學習路徑不對,很多基本概念不熟悉,之前公司主打的jdk版本還是1.6/1.7,直接跳到運行在jdk8上的webflux,跨度太大,迷惑是在所難免的!
在這里我個人推薦的學習途徑如下:先學習jdk8的lambda表達式和stream流編程,了解函數式編程的知識點和思想,接着學習jdk9的響應式流flux,理解響應式流概念,理解背壓和實現機制。這2者學好之后,很容易理解webflux的基石reactor,再學習webflux就水到渠成了!
這里我記錄了自己的學習之路,列出了每一塊的學習重點,除了API的知識點學習之外,更加重要的了解底層運行機制和實現原理。對於我個人來說,學習技術如果不了解原理,知識點需要死記硬背,而了解了底層機制之后,不但不需要死記硬背,還可以把自己的技術點連成面融會貫通,很容易舉一反三,知識點也不會忘記,也能和別人扯扯技術的底層實現了。
下面只講解重點/高級知識和底層原理,入門教程請自行搜索學習
lambda表達式
lambda表達式中的this
lambda表達式最終會返回一個實現了指定接口的實例,看上去和內部匿名類很像,但有一個最大的區別就是代碼里面的this,內部匿名類this指向的就是匿名類,而lambda表達式里面的this指向的當前類。
package jdk8.lambda;
/**
* lambda表達式的this
*
* @author 曉風輕
*
*/
public class ThisDemo {
private String name = "ThisDemo";
public void test() {
// 匿名類實現
new Thread(new Runnable() {
private String name = "Runnable";
@Override
public void run() {
System.out.println("這里的this指向匿名類:" + this.name);
}
}).start();
// lambda實現
new Thread(() -> {
System.out.println("這里的this指向當前的ThisDemo類:" + this.name);
}).start();
}
public static void main(String[] args) {
ThisDemo demo = new ThisDemo();
demo.test();
}
}
輸出
這里的this指向匿名類:Runnable
這里的this指向當前的ThisDemo類:ThisDemo
實現原理
lambda表達式里面,會把lambda表達式在本類中生成一個以lambda$+數字的方法。關鍵點:該方法不一定是static的方法,是static還是非static,取決於lambda表達式里面是否引用了this。這就是為什么lambda表達式里面的this指向的是本地,因為他在本類里面創建了一個方法,然后把lambda表達式里面的代碼放進去。
// lambda實現
// 下面會自動生成lambda$0方法,由於使用了this,所以是非static方法
new Thread(() -> {
System.out.println("這里的this指向當前的ThisDemo類:" + this.name);
}).start();
// lambda實現
// 下面會自動生成lambda$1方法,由於使用了this,所以是static方法
new Thread(() -> {
System.out.println("這里沒有引用this,生成的lambda1方法是static的");
}).start();
上面代碼會自動生成2個lambda$方法
使用javap -s -p 類名, 可以看出一個是static,一個是非staic的

這就是為什么lambda表達式里面的this指向當前類的底層機制!因為代碼就是在本類的一個方法里面執行的。
額外說一句,自動生成的方法是否帶參數取決於lambda是否有參數,例子中表達式沒有參數(箭頭左邊是空的),所以自動生成的也沒有。
實例方法的方法引用
方法引用有多種,靜態方法的方法引用很好理解,但實例對象的方法引用一開始確實讓我有點費解,這和靜態方法引用由啥區別?看上去很像啊。
class DemoClass {
/**
* 這里是一個靜態方法
*/
public static int staticMethod(int i) {
return i * 2;
}
/**
* 這里是一個實例方法
*/
public int normalMethod(int i) {
System.out.println("實例方法可以訪問this:" + this);
return i * 3;
}
}
public class MethodRefrenceDemo {
public static void main(String[] args) {
// 靜態方法的方法引用
IntUnaryOperator methodRefrence1 = DemoClass::staticMethod;
System.out.println(methodRefrence1.applyAsInt(111));
DemoClass demo = new DemoClass();
// 實例方法的方法引用
IntUnaryOperator methodRefrence2 = demo::normalMethod;
System.out.println(methodRefrence2.applyAsInt(111));
}
}
這里牽涉到不同的語言里面對this的實現方法。我們知道靜態方法和實例方法的區別是實例方法有this,靜態方法沒有。java里面是怎么樣實現this的呢?
java里面在默認把this作為參數,放到實例方法的第一個參數。
就是說:
/**
* 這里是一個實例方法
*/
public int normalMethod(int i) {
System.out.println("實例方法可以訪問this:" + this);
return i * 2;
}
編譯之后和下面這樣的代碼編譯之后是一樣的!
/**
* 這里是一個實例方法
*/
public int normalMethod(DemoClass this,int i) {
System.out.println("實例方法可以訪問this:" + this);
return i * 2;
}
如何證明?
第1個證據,看反編譯里面的本地變量表。
靜態方法: 
而實例方法 
第2個證據,下面這樣的代碼能正確執行。
class DemoCl2{
/**
* 這里是一個實例方法, 代碼上2個參數
* 而我們調用的時候只有一個參數
*/
public int normalMethod(DemoClass2 this,int i) {
return i * 2;
}
}
public class MethodRefrenceDemo {
public static void main(String[] args) {
DemoClass2 demo2 = new DemoClass2();
// 代碼定義上有2個參數, 第一個參數為this
// 但實際上調用的時候只需要一個參數
demo2.normalMethod(1);
}
}
所以,我的理解,java里面的所有方法都是靜態方法,只是有些方法有this變量,有些沒有。
所以,成員方法我們也可以寫成靜態方法的方法引用。如下:
public class MethodRefrenceDemo {
public static void main(String[] args) {
// 靜態方法的方法引用
IntUnaryOperator methodRefrence1 = DemoClass::staticMethod;
System.out.println(methodRefrence1.applyAsInt(111));
DemoClass demo = new DemoClass();
// 實例方法normalMethod的方法引用
IntUnaryOperator methodRefrence2 = demo::normalMethod;
System.out.println(methodRefrence2.applyAsInt(111));
// 對同一個實例方法normalMethod也可以使用靜態引用
// 代碼上normalMethod雖然只有一個參數,但實際上有一個隱含的this函數
// 所以使用的是2個參數bifunction函數接口
BiFunction<DemoClass, Integer, Integer> methodRefrence3 = DemoClass::normalMethod;
System.out.println(methodRefrence3.apply(demo, 111));
}
}
上面代碼里面。對同一個實例方法normalMethod,我們既可以使用實例方法引用(實例::方法名),也可以使用靜態方法引用(類名::方法名)。
lambda實現惰性求值
惰性求值在lambda里面非常重要,也非常有用。
舉例,編程規范里面有一條規范,是打印日志前需要判斷日志級別(性能要求高的時候)。如下
// 打印日志前需要先判斷日志級別
if (logger.isLoggable(Level.FINE)) {
logger.fine("打印一些日志:" + this);
}
為什么要加判斷呢?不加判斷會有問題呢? 看如下代碼:
package jdk8.lambda;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* lambda的惰性求值
*
* @author 曉風輕
*/
public class LogDemo {
private static final Logger logger = Logger
.getLogger(LogDemo.class.getName());
@Override
public String toString() {
System.out.println("這個方法執行了, 耗時1秒鍾");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
return "LogDemo";
}
public void test() {
// 如果不加判斷直接打印, 會有額外多余的開銷, 就算最終日志並沒有打印
logger.fine("打印一些日志:" + this);
}
public static void main(String[] args) {
LogDemo demo = new LogDemo();
demo.test();
}
}
執行代碼,發現雖然日志沒有打印,但toString方法還是執行了,屬於多余浪費的開銷。
每一個日志打印都加判斷,看着很別扭,現在有了lambda表達式之后,可以使用lambda的惰性求值,就可以去掉if判斷,如下
// 使用lambda表達式的惰性求值,不需要判斷日志級別
logger.fine(() -> "打印一些日志:" + this);
底層機制
這個現象很好理解,簡單講解一下。就是沒有使用表達式的時候,相當於
String msg = "打印一些日志:" + this logger.fine(msg);
雖然最后沒有打印,但字符串拼接的工作還是執行了。而使用了lambda表達式之后,字符串的拼接放到一個函數里面,fine日志需要打印的時候才去調用這個方法才真正執行!從而實現了惰性求值。
后面我們學習的jdk8的stream流編程里面,沒有調用最終操作的時候,中間操作的方法都不會執行,這也是惰性求值。
stream流編程
stream編程主要是學習API的使用,但前提是學好lambda,基礎好了,看這些方法定義非常簡單,要是沒有打好基礎,你會有很多東西需要記憶。
內部迭代和外部迭代
一般來說,我們之前的編碼方法,叫外部迭代,stream的寫法叫內部迭代。內部迭代代碼更加可讀更加優雅,關注點是做什么(外部迭代關注是怎么樣做),也很容易讓我們養成編程小函數的好習慣!這點在編程習慣里面非常重要!看例子:
import java.util.stream.IntStream;
public class StreamDemo1 {
public static void main(String[] args) {
int[] nums = { 1, 2, 3 };
// 外部迭代
int sum = 0;
for (int i : nums) {
sum += i;
}
System.out.println("結果為:" + sum);
// 使用stream的內部迭代
// map就是中間操作(返回stream的操作)
// sum就是終止操作
int sum2 = IntStream.of(nums).map(StreamDemo1::doubleNum).sum();
System.out.println("結果為:" + sum2);
System.out.println("惰性求值就是終止沒有調用的情況下,中間操作不會執行");
IntStream.of(nums).map(StreamDemo1::doubleNum);
}
public static int doubleNum(int i) {
System.out.println("執行了乘以2");
return i * 2;
}
}
操作類型
操作類型概念要理清楚。有幾個維度。
首先分為 中間操作 和 最終操作,在最終操作沒有調用的情況下,所有的中級操作都不會執行。那么那些是中間操作那些是最終操作呢? 簡單來說,返回stream流的就是中間操作,可以繼續鏈式調用下去,不是返回stream的就是最終操作。這點很好理解。
最終操作里面分為短路操作和非短路操作,短路操作就是limit/findxxx/xxxMatch這種,就是找了符合條件的就終止,其他的就是非短路操作。在無限流里面需要調用短路操作,否則像炫邁口香糖一樣根本停不下來!
中間操作又分為 有狀態操作 和 無狀態操作,怎么樣區分呢? 一開始很多同學需要死記硬背,其實你主要掌握了狀態這個關鍵字就不需要死記硬背。狀態就是和其他數據有關系。我們可以看方法的參數,如果是一個參數的,就是無狀態操作,因為只和自己有關,其他的就是有狀態操作。如map/filter方法,只有一個參數就是自己,就是無狀態操作;而distinct/sorted就是有狀態操作,因為去重和排序都需要和其他數據比較,理解了這點,就不需要死記硬背了!
為什么要知道有狀態和無狀態操作呢?在多個操作的時候,我們需要把無狀態操作寫在一起,有狀態操作放到最后,這樣效率會更加高。
運行機制
我們可以通過下面的代碼來理解stream的運行機制
package stream;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
/**
* 驗證stream運行機制
*
* 1. 所有操作是鏈式調用, 一個元素只迭代一次
* 2. 每一個中間操作返回一個新的流. 流里面有一個屬性sourceStage
* 指向同一個 地方,就是Head
* 3. Head->nextStage->nextStage->... -> null
* 4. 有狀態操作會把無狀態操作階段,單獨處理
* 5. 並行環境下, 有狀態的中間操作不一定能並行操作.
*
* 6. parallel/ sequetial 這2個操作也是中間操作(也是返回stream)
* 但是他們不創建流, 他們只修改 Head的並行標志
*
* @author 曉風輕
*
*/
public class RunStream {
public static void main(String[] args) {
Random random = new Random();
// 隨機產生數據
Stream<Integer> stream = Stream.generate(() -> random.nextInt())
// 產生500個 ( 無限流需要短路操作. )
.limit(500)
// 第1個無狀態操作
.peek(s -> print("peek: " + s))
// 第2個無狀態操作
.filter(s -> {
print("filter: " + s);
return s > 1000000;
})
// 有狀態操作
.sorted((i1, i2) -> {
print("排序: " + i1 + ", " + i2);
return i1.compareTo(i2);
})
// 又一個無狀態操作
.peek(s -> {
print("peek2: " + s);
}).parallel();
// 終止操作
stream.count();
}
/**
* 打印日志並sleep 5 毫秒
*
* @param s
*/
public static void print(String s) {
// System.out.println(s);
// 帶線程名(測試並行情況)
System.out.println(Thread.currentThread().getName() + " > " + s);
try {
TimeUnit.MILLISECONDS.sleep(5);
} catch (InterruptedException e) {
}
}
}
大家自己測試一下代碼,能發現stream的調用方法,就像現實中的流水線一樣,一個元素只會迭代一次,但如果中間有無狀態操作,前后的操作會單獨處理(元素就會被多次迭代)。
jdk9的響應式流
就是reactive stream,也就是flow。其實和jdk8的stream沒有一點關系。說白了就一個發布-訂閱模式,一共只有4個接口,3個對象,非常簡單清晰。寫一個入門例子就可以掌握。
package jdk9;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
/**
* 帶 process 的 flow demo
*/
/**
* Processor, 需要繼承SubmissionPublisher並實現Processor接口
*
* 輸入源數據 integer, 過濾掉小於0的, 然后轉換成字符串發布出去
*/
class MyProcessor extends SubmissionPublisher<String>
implements Processor<Integer, String> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 保存訂閱關系, 需要用它來給發布者響應
this.subscription = subscription;
// 請求一個數據
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
// 接受到一個數據, 處理
System.out.println("處理器接受到數據: " + item);
// 過濾掉小於0的, 然后發布出去
if (item > 0) {
this.submit("轉換后的數據:" + item);
}
// 處理完調用request再請求一個數據
this.subscription.request(1);
// 或者 已經達到了目標, 調用cancel告訴發布者不再接受數據了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出現了異常(例如處理數據的時候產生了異常)
throwable.printStackTrace();
// 我們可以告訴發布者, 后面不接受數據了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部數據處理完了(發布者關閉了)
System.out.println("處理器處理完了!");
// 關閉發布者
this.close();
}
}
public class FlowDemo2 {
public static void main(String[] args) throws Exception {
// 1. 定義發布者, 發布的數據類型是 Integer
// 直接使用jdk自帶的SubmissionPublisher
SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
// 2. 定義處理器, 對數據進行過濾, 並轉換為String類型
MyProcessor processor = new MyProcessor();
// 3. 發布者 和 處理器 建立訂閱關系
publiser.subscribe(processor);
// 4. 定義最終訂閱者, 消費 String 類型數據
Subscriber<String> subscriber = new Subscriber<String>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 保存訂閱關系, 需要用它來給發布者響應
this.subscription = subscription;
// 請求一個數據
this.subscription.request(1);
}
@Override
public void onNext(String item) {
// 接受到一個數據, 處理
System.out.println("接受到數據: " + item);
// 處理完調用request再請求一個數據
this.subscription.request(1);
// 或者 已經達到了目標, 調用cancel告訴發布者不再接受數據了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出現了異常(例如處理數據的時候產生了異常)
throwable.printStackTrace();
// 我們可以告訴發布者, 后面不接受數據了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部數據處理完了(發布者關閉了)
System.out.println("處理完了!");
}
};
// 5. 處理器 和 最終訂閱者 建立訂閱關系
processor.subscribe(subscriber);
// 6. 生產數據, 並發布
// 這里忽略數據生產過程
publiser.submit(-111);
publiser.submit(111);
// 7. 結束后 關閉發布者
// 正式環境 應該放 finally 或者使用 try-resouce 確保關閉
publiser.close();
// 主線程延遲停止, 否則數據沒有消費就退出
Thread.currentThread().join(1000);
}
}
背壓
背壓依我的理解來說,是指訂閱者能和發布者交互(通過代碼里面的調用request和cancel方法交互),可以調節發布者發布數據的速率,解決把訂閱者壓垮的問題。關鍵在於上面例子里面的訂閱關系Subscription這個接口,他有request和cancel 2個方法,用於通知發布者需要數據和通知發布者不再接受數據。
我們重點理解背壓在jdk9里面是如何實現的。關鍵在於發布者Publisher的實現類SubmissionPublisher的submit方法是block方法。訂閱者會有一個緩沖池,默認為Flow.defaultBufferSize() = 256。當訂閱者的緩沖池滿了之后,發布者調用submit方法發布數據就會被阻塞,發布者就會停(慢)下來;訂閱者消費了數據之后(調用Subscription.request方法),緩沖池有位置了,submit方法就會繼續執行下去,就是通過這樣的機制,實現了調節發布者發布數據的速率,消費得快,生成就快,消費得慢,發布者就會被阻塞,當然就會慢下來了。
怎么樣實現發布者和多個訂閱者之間的阻塞和同步呢?使用的jdk7的Fork/Join的ManagedBlocker,有興趣的請自己查找相關資料。
reactor
spring webflux是基於reactor來實現響應式的。那么reactor是什么呢?我是這樣理解的
reactor = jdk8的stream + jdk9的flow響應式流。理解了這句話,reactor就很容易掌握。
reactor里面Flux和Mono就是stream,他的最終操作就是 subscribe/block 2種。reactor里面說的不訂閱將什么也不會方法就是我們最開始學習的惰性求值。
我們來看一段代碼,理解一下:
package com.imooc;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
public class ReactorDemo {
public static void main(String[] args) {
// reactor = jdk8 stream + jdk9 reactive stream
// Mono 0-1個元素
// Flux 0-N個元素
String[] strs = { "1", "2", "3" };
// 2. 定義訂閱者
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 保存訂閱關系, 需要用它來給發布者響應
this.subscription = subscription;
// 請求一個數據
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
// 接受到一個數據, 處理
System.out.println("接受到數據: " + item);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 處理完調用request再請求一個數據
this.subscription.request(1);
// 或者 已經達到了目標, 調用cancel告訴發布者不再接受數據了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出現了異常(例如處理數據的時候產生了異常)
throwable.printStackTrace();
// 我們可以告訴發布者, 后面不接受數據了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部數據處理完了(發布者關閉了)
System.out.println("處理完了!");
}
};
// 這里就是jdk8的stream
Flux.fromArray(strs).map(s -> Integer.parseInt(s))
// 最終操作
// 這里就是jdk9的reactive stream
.subscribe(subscriber);
}
}
上面的例子里面,我們可以把jdk9里面flowdemo的訂閱者代碼原封不動的copy過來,直接就可以用在reactor的subscribe方法上。訂閱就是相當於調用了stream的最終操作。有了 reactor = jdk8 stream + jdk9 reactive stream 概念后,在掌握了jdk8的stream和jkd9的flow之后,reactor也不難掌握。
spring5的webflux
上面的基礎和原理掌握之后,學習webflux就水到渠成了!webflux的關鍵是自己編寫的代碼里面返回流(Flux/Mono),spring框架來負責處理訂閱。 spring框架提供2種開發模式來編寫響應式代碼,使用mvc之前的注解模式和使用router function模式,都需要我們的代碼返回流,spring的響應式數據庫spring data jpa,如使用mongodb,也是返回流,訂閱都需要交給框架,自己不能訂閱。而編寫響應式代碼之前,我們還需要了解2個重要的概念,就是異步servlet和SSE。
異步servlet
學習異步servlet我們最重要的了解同步servlet阻塞了什么?為什么需要異步servlet?異步servlet能支持高吞吐量的原理是什么?
servlet容器(如tomcat)里面,每處理一個請求會占用一個線程,同步servlet里面,業務代碼處理多久,servlet容器的線程就會等(阻塞)多久,而servlet容器的線程是由上限的,當請求多了的時候servlet容器線程就會全部用完,就無法再處理請求(這個時候請求可能排隊也可能丟棄,得看如何配置),就會限制了應用的吞吐量!
而異步serlvet里面,servlet容器的線程不會傻等業務代碼處理完畢,而是直接返回(繼續處理其他請求),給業務代碼一個回調函數(asyncContext.complete()),業務代碼處理完了再通知我!這樣就可以使用少量的線程處理更加高的請求,從而實現高吞吐量!
我們看示例代碼:
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* Servlet implementation class AsyncServlet
*/
@WebServlet(asyncSupported = true, urlPatterns = { "/AsyncServlet" })
public class AsyncServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
/**
* @see HttpServlet#HttpServlet()
*/
public AsyncServlet() {
super();
}
/**
* @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
* response)
*/
protected void doGet(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
long t1 = System.currentTimeMillis();
// 開啟異步
AsyncContext asyncContext = request.startAsync();
// 執行業務代碼
CompletableFuture.runAsync(() -> doSomeThing(asyncContext,
asyncContext.getRequest(), asyncContext.getResponse()));
System.out.println("async use:" + (System.currentTimeMillis() - t1));
}
private void doSomeThing(AsyncContext asyncContext,
ServletRequest servletRequest, ServletResponse servletResponse) {
// 模擬耗時操作
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
//
try {
servletResponse.getWriter().append("done");
} catch (IOException e) {
e.printStackTrace();
}
// 業務代碼處理完畢, 通知結束
asyncContext.complete();
}
/**
* @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
* response)
*/
protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
doGet(request, response);
}
}
大家可以運行上面代碼,業務代碼花了5秒,但servlet容器的線程幾乎沒有任何耗時。而如果是同步servlet的,線程就會傻等5秒,這5秒內這個線程只處理了這一個請求。
SSE(server-sent event)
響應式流里面,可以多次返回數據(其實和響應式沒有關系),使用的技術就是H5的SSE。我們學習技術,API的使用只是最初級也是最簡單的,更加重要的是需要知其然並知其所以然,否則你只能死記硬背不用就忘!我們不滿足在spring里面能實現sse效果,更加需要知道spring是如何做到的。其實SSE很簡單,我們花一點點時間就可以掌握,我們在純servlet環境里面實現。我們看代碼,這里一個最簡單的示例。
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* Servlet implementation class SSE
*/
@WebServlet("/SSE")
public class SSE extends HttpServlet {
private static final long serialVersionUID = 1L;
/**
* @see HttpServlet#HttpServlet()
*/
public SSE() {
super();
}
/**
* @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
* response)
*/
protected void doGet(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
response.setContentType("text/event-stream");
response.setCharacterEncoding("utf-8");
for (int i = 0; i < 5; i++) {
// 指定事件標識
response.getWriter().write("event:me\n");
// 格式: data: + 數據 + 2個回車
response.getWriter().write("data:" + i + "\n\n");
response.getWriter().flush();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
}
}
/**
* @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
* response)
*/
protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
doGet(request, response);
}
}
關鍵是ContentType 是 “text/event-stream”,然后返回的數據有固定的要求格式即可。
結束語
經過上面的一步一個腳印的學習,我們的基礎已經打牢,障礙已經掃清,現在可以進入輕松愉快的spring flux學習之旅了!Enjoy!
個人認為,spring的weblfux響應式編程的高吞吐量特性,將會逐步會成為技術趨勢,成為我們對系統進行垂直擴展的首選。那么應該如何進行spring的weblfux響應式編程呢?請參考我這篇文章:springboot2 webflux 響應式編程學習路徑,個人建議把基礎夯實了在學習,直接學習步子扯的太大會有太多疑惑,這些疑惑遲早你要退回來補。當然更加高效的觀看我的實戰課程 SpringBoot2.0不容錯過的新特性 WebFlux響應式編程,海量的知識點,從簡到難,一個一個知識點底層原理運行機制的講解,最后還直播講解使用IoC/AOP編寫類似feign的聲明式的全響應式框架,相信你一定有能有所獲!聽完課程后,你也可以和面試官扯扯相關知識點的底層實現了!7小時只需要128元,良心課程,絕對物超所值!!:) 去看看吧
