在學習webflux 之前我們先要了解一個概念
什么是背壓(back press)
對我們以前使用的發布訂閱模式來說.我們的以前的模式是 消費只能通過提供者提供的數據來持續消費 如果一直發送消息,那么就只能一直消費
我們對背壓做一個詳細的比喻
比如我們每家每戶,都有一個水龍頭.自來水公司相當於發布者者,我們家庭就是個訂閱者,水就是數據,在之前的模式我們的訂閱者是一個被動接受的概念
背壓就是相當於我們家里安裝了一個水龍頭,我們需要的時候就把他打開.不需要的時候就把他關閉
reactive stream
響應式流.這是jdk9 引入的一套標准,他能夠很好的實現背壓,但是我去官網的時候,發現jdk9已經結束.我們看看jdk11吧
jdk11有一個flow接口 里面有4個方法
1.publisher 就是發布者
subscribe:就是和訂閱者產生一個關系
2.subscribe 就是訂閱者
onSubscribe:簽署一個訂閱關系傳入subscription
onNext(): 接受到一條數據
onError(): 就是出錯
onComplete(): 完成
3.Subscription接口中就是其中實現背壓的關鍵 其中request方法就是告訴發布者我需要多少資源,發布者那里 就會發布多少資源
4.Processor 既可以做發布者,也可以做訂閱者,具體是用來中間環節的數據處理工作
簡單的例子我們來運行下
每次處理完之后告訴發布者我還可以處理的數據是多少
public static void main(String[] args) throws InterruptedException { //1.定義發布者 SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(); //2. 定義訂閱者 Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<Integer>() { private Flow.Subscription subscription; int total = 0; @Override public void onSubscribe(Flow.Subscription subscription) { //保存訂閱關系 this.subscription = subscription; //請求一個數據 subscription.request(1); } @Override public void onNext(Integer item) { System.out.println("接受到: "+ item); total++; System.out.println("接受的條數為 : "+ total); this.subscription.request(1); //或者到達一定數量告訴發布者不接受數據了 if(total ==10){ this.subscription.cancel(); System.out.println("接受數據已經足夠"); } } @Override public void onError(Throwable throwable) { throwable.printStackTrace(); //拋出異常就返回 this.subscription.cancel(); } @Override public void onComplete() { System.out.println("數據處理完了."); } }; //3發布和訂閱 建立訂閱關系 publisher.subscribe(subscriber); //4.生產數據 for (int i = 0; i < 100; i++) { publisher.submit(i); } //5.關閉發布者 publisher.close(); Thread.currentThread().join(5000); }
processor
public static void main(String[] args) throws InterruptedException { //1.定義發布者 SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(); //2.定義一個處理器 對數據進行過濾,並轉化為string的類型 MyProcessor myProcessor = new MyProcessor(); //3.發布者與處理器建立關系 publisher.subscribe(myProcessor); //4. 定義最終訂閱者 Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() { private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { //保存訂閱關系 this.subscription = subscription; //請求一個數據 subscription.request(1); } LinkedList<String > list = new LinkedList<>(); @Override public void onNext(String item) { list.add(item); this.subscription.request(1); //或者到達一定數量告訴發布者不接受數據了 System.out.println(item); if(list.size() == 10){ this.subscription.cancel(); System.out.println("接受數據已經足夠"); this.onComplete(); } } @Override public void onError(Throwable throwable) { throwable.printStackTrace(); //拋出異常就返回 this.subscription.cancel(); } @Override public void onComplete() { System.out.println("數據處理完了."+list.toString()); } }; //5 處理器和最終的訂閱者建立關系 myProcessor.subscribe(subscriber); //4.生產數據 for (int i = 0; i < 100; i++) { publisher.submit(i); } //5.關閉發布者 publisher.close(); Thread.currentThread().join(5000); } static class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<Integer,String> { private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(Integer item) { // System.out.println("processor-> 處理器接收到的數據.."+item); if(item % 2 ==0){ this.submit("轉->" +item); } this.subscription.request(1); } @Override public void onError(Throwable throwable) { throwable.printStackTrace(); this.subscription.cancel(); } @Override public void onComplete() { System.out.println("processor 處理器已經處理完成!"); } }
里面的運行機制
publiser.submit():是一個阻塞方法
訂閱者有一個緩沖池.當緩沖池滿了之后 submit()方法就會被阻塞.這樣就不會再去生產數據了
subscription 緩沖的capacity默認是256個.