java8流及reactor(stream+flow響應式流)


一個java 8的stream是由三部分組成的。數據源,零個或一個或多個中間操作,一個或零個終止操作。

中間操作是對數據的加工,注意,中間操作是lazy操作,並不會立馬啟動,需要等待終止操作才會執行。

終止操作是stream的啟動操作,只有加上終止操作,stream才會真正的開始執行。

lambda實現惰性求值

什么是惰性求值(惰性計算)

編程語言理論中,惰性求值(英語:Lazy Evaluation),又譯為惰性計算、懶惰求值,也稱為傳需求調用(call-by-need),是一個計算機編程中的一個概念,它的目的是要最小化計算機要做的工作。它有兩個相關而又有區別的含意,可以表示為“延遲求值”和“最小化求值”,本條目專注前者,后者請參見最小化計算條目。除可以得到性能的提升外,惰性計算的最重要的好處是它可以構造一個無限的數據類型

惰性求值的相反是及早求值(熱情求值),這是在大多數編程語言中隨處可見的一種計算方式,例如:

int x = 1;
String name = getUserName();

上面的表達式在綁定了變量后就立即求值,得到計算的結果。

Java中的惰性求值
以下Java代碼就是惰性求值的范例。這段代碼在定義 nameStream 這個流的時候,System.out.println 語句不會被立即執行。

public static void main(String[] args) {
    // 定義流
    Stream<String> nameStream = Stream.of("Zebe", "July", "Yaha").filter(name -> {
        if (!name.isEmpty()) {
            System.out.println("過濾流,當前名稱:" + name);
            return true;
        }
        return false;
    });

    // 取出流的值,這時候才會調用計算
    List<String> names1 = nameStream.collect(Collectors.toList());
    // 流只能被使用一次,下面這行代碼會報錯,提示流已經被操作或者關閉了
    List<String> names2 = nameStream.collect(Collectors.toList());
}

在jdk8的stream流編程里面,沒有調用最終操作的時候,中間操作的方法都不會執行,這也是惰性求值。

stream流編程

stream編程主要是學習API的使用,但前提是學好lambda,基礎好了,看這些方法定義非常簡單,要是沒有打好基礎,你會有很多東西需要記憶。

內部迭代和外部迭代

一般來說,我們之前的編碼方法,叫外部迭代,stream的寫法叫內部迭代。內部迭代代碼更加可讀更加優雅,關注點是做什么(外部迭代關注是怎么樣做),也很容易讓我們養成編程小函數的好習慣!這點在編程習慣里面非常重要!看例子:

package com.dxz.stream;
import java.util.stream.IntStream;

public class StreamDemo1 {

  public static void main(String[] args) {
    int[] nums = { 1, 2, 3 };
    // 外部迭代
    int sum = 0;
    for (int i : nums) {
      sum += i;
    }
    System.out.println("結果1為:" + sum);

    // 使用stream的內部迭代
    // map就是中間操作(返回stream的操作)
    // sum就是終止操作
    int sum2 = IntStream.of(nums).map(StreamDemo1::doubleNum).sum();
    System.out.println("結果2為:" + sum2);

    System.out.println("惰性求值就是終止沒有調用的情況下,中間操作不會執行,下面的不會打印了");
    IntStream.of(nums).map(StreamDemo1::doubleNum);
  }

  public static int doubleNum(int i) {
    System.out.println("執行了乘以2");
    return i * 2;
  }
}

結果:

結果1為:6
執行了乘以2
執行了乘以2
執行了乘以2
結果2為:12
惰性求值就是終止沒有調用的情況下,中間操作不會執行,下面的不會打印了

操作類型

操作類型概念要理清楚。有幾個維度。

首先分為 中間操作 和 最終操作,在最終操作沒有調用的情況下,所有的中級操作都不會執行。那么那些是中間操作那些是最終操作呢? 簡單來說,返回stream流的就是中間操作,可以繼續鏈式調用下去,不是返回stream的就是最終操作。這點很好理解。

最終操作里面分為短路操作非短路操作,短路操作就是limit/findxxx/xxxMatch這種,就是找了符合條件的就終止,其他的就是非短路操作。在無限流里面需要調用短路操作,否則像炫邁口香糖一樣根本停不下來!

中間操作又分為 有狀態操作 和 無狀態操作,怎么樣區分呢? 一開始很多同學需要死記硬背,其實你主要掌握了狀態這個關鍵字就不需要死記硬背。狀態就是和其他數據有關系。我們可以看方法的參數,如果是一個參數的,就是無狀態操作,因為只和自己有關,其他的就是有狀態操作。如map/filter方法,只有一個參數就是自己,就是無狀態操作;而distinct/sorted就是有狀態操作,因為去重和排序都需要和其他數據比較,理解了這點,就不需要死記硬背了!

為什么要知道有狀態和無狀態操作呢?在多個操作的時候,我們需要把無狀態操作寫在一起,有狀態操作放到最后,這樣效率會更加高。

運行機制

我們可以通過下面的代碼來理解stream的運行機制

package com.dxz.stream;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
/**
 * 驗證stream運行機制
 * 
 * 1. 所有操作是鏈式調用, 一個元素只迭代一次 
 * 2. 每一個中間操作返回一個新的流. 流里面有一個屬性sourceStage指向同一個 地方,就是Head 
 * 3. Head->nextStage->nextStage->... -> null
 * 4. 有狀態操作會把無狀態操作階段,單獨處理
 * 5. 並行環境下, 有狀態的中間操作不一定能並行操作.
 * 
 * 6. parallel/ sequetial 這2個操作也是中間操作(也是返回stream)
 *     但是他們不創建流, 他們只修改 Head的並行標志
 * 
 */
public class RunStream {

    public static void main(String[] args) {
        Random random = new Random();
        // 隨機產生數據
        Stream<Integer> stream = Stream.generate(() -> random.nextInt())
                // 產生500個 ( 無限流需要短路操作. )
                .limit(50)
                // 第1個無狀態操作
                .peek(s -> print("peek相當於debug操作: " + s))
                // 第2個無狀態操作,大於1000000的值留下
                .filter(s -> {
                    print("filter: " + s);
                    return s > 1000000;
                })
                // 有狀態操作
                .sorted((i1, i2) -> {
                    print("排序: " + i1 + ", " + i2);
                    return i1.compareTo(i2);
                })
                // 又一個無狀態操作
                .peek(s -> {
                    print("peek相當於debug操作2: " + s);
                }).parallel();

        // 終止操作
        long count = stream.count();
        System.out.println("end=" + count);
    }

    /**
     * 打印日志並sleep 5 毫秒
     * 
     * @param s
     */
    public static void print(String s) {
        // System.out.println(s);
        // 帶線程名(測試並行情況)
        System.out.println(Thread.currentThread().getName() + " > " + s);
        /*try {
            TimeUnit.MILLISECONDS.sleep(5);
        } catch (InterruptedException e) {
        }*/
    }
}

 

結果:

ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 467408314
ForkJoinPool.commonPool-worker-1 > filter: 467408314
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -694002895
ForkJoinPool.commonPool-worker-1 > filter: -694002895
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1018018271
ForkJoinPool.commonPool-worker-1 > filter: 1018018271
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -2055711792
ForkJoinPool.commonPool-worker-1 > filter: -2055711792
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 362753392
ForkJoinPool.commonPool-worker-1 > filter: 362753392
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1420256006
ForkJoinPool.commonPool-worker-1 > filter: 1420256006
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1759751716
ForkJoinPool.commonPool-worker-1 > filter: 1759751716
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -1451030142
ForkJoinPool.commonPool-worker-1 > filter: -1451030142
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -1091423344
ForkJoinPool.commonPool-worker-1 > filter: -1091423344
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1377817193
ForkJoinPool.commonPool-worker-1 > filter: 1377817193
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -1159545806
ForkJoinPool.commonPool-worker-1 > filter: -1159545806
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -173182413
ForkJoinPool.commonPool-worker-1 > filter: -173182413
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 600209961
ForkJoinPool.commonPool-worker-1 > filter: 600209961
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1200504990
ForkJoinPool.commonPool-worker-1 > filter: 1200504990
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 260857935
ForkJoinPool.commonPool-worker-1 > filter: 260857935
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -2078810651
ForkJoinPool.commonPool-worker-1 > filter: -2078810651
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 402189452
ForkJoinPool.commonPool-worker-1 > filter: 402189452
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -946047183
ForkJoinPool.commonPool-worker-1 > filter: -946047183
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -1516256407
ForkJoinPool.commonPool-worker-1 > filter: -1516256407
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -161522096
ForkJoinPool.commonPool-worker-1 > filter: -161522096
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 346222819
ForkJoinPool.commonPool-worker-1 > filter: 346222819
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -1492195164
ForkJoinPool.commonPool-worker-1 > filter: -1492195164
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -823860607
ForkJoinPool.commonPool-worker-1 > filter: -823860607
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -1400321699
ForkJoinPool.commonPool-worker-1 > filter: -1400321699
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -594055512
ForkJoinPool.commonPool-worker-1 > filter: -594055512
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -968008982
ForkJoinPool.commonPool-worker-1 > filter: -968008982
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -538739410
ForkJoinPool.commonPool-worker-1 > filter: -538739410
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -1403417684
ForkJoinPool.commonPool-worker-1 > filter: -1403417684
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -938859220
ForkJoinPool.commonPool-worker-1 > filter: -938859220
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -1962390769
ForkJoinPool.commonPool-worker-1 > filter: -1962390769
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1952141841
ForkJoinPool.commonPool-worker-1 > filter: 1952141841
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 68659719
ForkJoinPool.commonPool-worker-1 > filter: 68659719
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -397822084
ForkJoinPool.commonPool-worker-1 > filter: -397822084
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1047484685
ForkJoinPool.commonPool-worker-1 > filter: 1047484685
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 2051123152
ForkJoinPool.commonPool-worker-1 > filter: 2051123152
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1789203084
ForkJoinPool.commonPool-worker-1 > filter: 1789203084
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -1447008850
ForkJoinPool.commonPool-worker-1 > filter: -1447008850
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1982526816
ForkJoinPool.commonPool-worker-1 > filter: 1982526816
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 432837685
ForkJoinPool.commonPool-worker-1 > filter: 432837685
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 2139658303
ForkJoinPool.commonPool-worker-1 > filter: 2139658303
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1386330515
ForkJoinPool.commonPool-worker-1 > filter: 1386330515
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 5070967
ForkJoinPool.commonPool-worker-1 > filter: 5070967
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -2131175714
ForkJoinPool.commonPool-worker-1 > filter: -2131175714
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 390402009
ForkJoinPool.commonPool-worker-1 > filter: 390402009
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -1096668893
ForkJoinPool.commonPool-worker-1 > filter: -1096668893
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -929907211
ForkJoinPool.commonPool-worker-1 > filter: -929907211
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1859217343
ForkJoinPool.commonPool-worker-1 > filter: 1859217343
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -1526881589
ForkJoinPool.commonPool-worker-1 > filter: -1526881589
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1122349191
ForkJoinPool.commonPool-worker-1 > filter: 1122349191
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -882024070
ForkJoinPool.commonPool-worker-1 > filter: -882024070
main > 排序: 1018018271, 467408314
main > 排序: 362753392, 1018018271
main > 排序: 362753392, 1018018271
main > 排序: 362753392, 467408314
main > 排序: 1420256006, 467408314
main > 排序: 1420256006, 1018018271
main > 排序: 1759751716, 1018018271
main > 排序: 1759751716, 1420256006
main > 排序: 1377817193, 1018018271
main > 排序: 1377817193, 1759751716
main > 排序: 1377817193, 1420256006
main > 排序: 600209961, 1377817193
main > 排序: 600209961, 467408314
main > 排序: 600209961, 1018018271
main > 排序: 1200504990, 1018018271
main > 排序: 1200504990, 1420256006
main > 排序: 1200504990, 1377817193
main > 排序: 260857935, 1200504990
main > 排序: 260857935, 600209961
main > 排序: 260857935, 467408314
main > 排序: 260857935, 362753392
main > 排序: 402189452, 1018018271
main > 排序: 402189452, 467408314
main > 排序: 402189452, 362753392
main > 排序: 346222819, 1018018271
main > 排序: 346222819, 402189452
main > 排序: 346222819, 362753392
main > 排序: 346222819, 260857935
main > 排序: 1952141841, 600209961
main > 排序: 1952141841, 1377817193
main > 排序: 1952141841, 1759751716
main > 排序: 68659719, 1018018271
main > 排序: 68659719, 402189452
main > 排序: 68659719, 346222819
main > 排序: 68659719, 260857935
main > 排序: 1047484685, 600209961
main > 排序: 1047484685, 1420256006
main > 排序: 1047484685, 1200504990
main > 排序: 1047484685, 1018018271
main > 排序: 2051123152, 1018018271
main > 排序: 2051123152, 1420256006
main > 排序: 2051123152, 1952141841
main > 排序: 1789203084, 1018018271
main > 排序: 1789203084, 1420256006
main > 排序: 1789203084, 1952141841
main > 排序: 1789203084, 1759751716
main > 排序: 1982526816, 1047484685
main > 排序: 1982526816, 1759751716
main > 排序: 1982526816, 1952141841
main > 排序: 1982526816, 2051123152
main > 排序: 432837685, 1047484685
main > 排序: 432837685, 402189452
main > 排序: 432837685, 600209961
main > 排序: 432837685, 467408314
main > 排序: 2139658303, 1047484685
main > 排序: 2139658303, 1789203084
main > 排序: 2139658303, 1982526816
main > 排序: 2139658303, 2051123152
main > 排序: 1386330515, 1047484685
main > 排序: 1386330515, 1789203084
main > 排序: 1386330515, 1420256006
main > 排序: 1386330515, 1377817193
main > 排序: 5070967, 1200504990
main > 排序: 5070967, 432837685
main > 排序: 5070967, 346222819
main > 排序: 5070967, 260857935
main > 排序: 5070967, 68659719
main > 排序: 390402009, 1047484685
main > 排序: 390402009, 402189452
main > 排序: 390402009, 260857935
main > 排序: 390402009, 362753392
main > 排序: 1859217343, 1047484685
main > 排序: 1859217343, 1789203084
main > 排序: 1859217343, 2051123152
main > 排序: 1859217343, 1982526816
main > 排序: 1859217343, 1952141841
main > 排序: 1122349191, 1047484685
main > 排序: 1122349191, 1789203084
main > 排序: 1122349191, 1386330515
main > 排序: 1122349191, 1377817193
main > 排序: 1122349191, 1200504990
main > peek相當於debug操作2: 1386330515
ForkJoinPool.commonPool-worker-3 > peek相當於debug操作2: 1200504990
ForkJoinPool.commonPool-worker-3 > peek相當於debug操作2: 1377817193
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作2: 1982526816
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作2: 2051123152
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作2: 2139658303
ForkJoinPool.commonPool-worker-3 > peek相當於debug操作2: 1122349191
ForkJoinPool.commonPool-worker-3 > peek相當於debug操作2: 1789203084
ForkJoinPool.commonPool-worker-2 > peek相當於debug操作2: 432837685
ForkJoinPool.commonPool-worker-2 > peek相當於debug操作2: 467408314
ForkJoinPool.commonPool-worker-2 > peek相當於debug操作2: 402189452
main > peek相當於debug操作2: 1420256006
main > peek相當於debug操作2: 1759751716
ForkJoinPool.commonPool-worker-2 > peek相當於debug操作2: 1018018271
ForkJoinPool.commonPool-worker-2 > peek相當於debug操作2: 1047484685
ForkJoinPool.commonPool-worker-3 > peek相當於debug操作2: 346222819
ForkJoinPool.commonPool-worker-3 > peek相當於debug操作2: 362753392
ForkJoinPool.commonPool-worker-3 > peek相當於debug操作2: 390402009
ForkJoinPool.commonPool-worker-3 > peek相當於debug操作2: 5070967
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作2: 1859217343
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作2: 1952141841
ForkJoinPool.commonPool-worker-2 > peek相當於debug操作2: 68659719
ForkJoinPool.commonPool-worker-2 > peek相當於debug操作2: 260857935
main > peek相當於debug操作2: 600209961
end=24

 

 

大家自己測試一下代碼,能發現stream的調用方法,就像現實中的流水線一樣,一個元素只會迭代一次,但如果中間有無狀態操作,前后的操作會單獨處理(元素就會被多次迭代)。

jdk9的響應式流

就是reactive stream,也就是flow。其實和jdk8的stream沒有一點關系。說白了就一個發布-訂閱模式,一共只有4個接口,3個對象,非常簡單清晰。寫一個入門例子就可以掌握。

package jdk9; import java.util.concurrent.Flow.Processor; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.SubmissionPublisher; /** * 帶 process 的 flow demo */ /** * Processor, 需要繼承SubmissionPublisher並實現Processor接口 * * 輸入源數據 integer, 過濾掉小於0的, 然后轉換成字符串發布出去 */ class MyProcessor extends SubmissionPublisher<String> implements Processor<Integer, String> { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 保存訂閱關系, 需要用它來給發布者響應 this.subscription = subscription; // 請求一個數據 this.subscription.request(1); } @Override public void onNext(Integer item) { // 接受到一個數據, 處理 System.out.println("處理器接受到數據: " + item); // 過濾掉小於0的, 然后發布出去 if (item > 0) { this.submit("轉換后的數據:" + item); } // 處理完調用request再請求一個數據 this.subscription.request(1); // 或者 已經達到了目標, 調用cancel告訴發布者不再接受數據了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出現了異常(例如處理數據的時候產生了異常) throwable.printStackTrace(); // 我們可以告訴發布者, 后面不接受數據了 this.subscription.cancel(); } @Override public void onComplete() { // 全部數據處理完了(發布者關閉了) System.out.println("處理器處理完了!"); // 關閉發布者 this.close(); } } public class FlowDemo2 { public static void main(String[] args) throws Exception { // 1. 定義發布者, 發布的數據類型是 Integer // 直接使用jdk自帶的SubmissionPublisher SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>(); // 2. 定義處理器, 對數據進行過濾, 並轉換為String類型 MyProcessor processor = new MyProcessor(); // 3. 發布者 和 處理器 建立訂閱關系 publiser.subscribe(processor); // 4. 定義最終訂閱者, 消費 String 類型數據 Subscriber<String> subscriber = new Subscriber<String>() { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 保存訂閱關系, 需要用它來給發布者響應 this.subscription = subscription; // 請求一個數據 this.subscription.request(1); } @Override public void onNext(String item) { // 接受到一個數據, 處理 System.out.println("接受到數據: " + item); // 處理完調用request再請求一個數據 this.subscription.request(1); // 或者 已經達到了目標, 調用cancel告訴發布者不再接受數據了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出現了異常(例如處理數據的時候產生了異常) throwable.printStackTrace(); // 我們可以告訴發布者, 后面不接受數據了 this.subscription.cancel(); } @Override public void onComplete() { // 全部數據處理完了(發布者關閉了) System.out.println("處理完了!"); } }; // 5. 處理器 和 最終訂閱者 建立訂閱關系 processor.subscribe(subscriber); // 6. 生產數據, 並發布 // 這里忽略數據生產過程 publiser.submit(-111); publiser.submit(111); // 7. 結束后 關閉發布者 // 正式環境 應該放 finally 或者使用 try-resouce 確保關閉 publiser.close(); // 主線程延遲停止, 否則數據沒有消費就退出 Thread.currentThread().join(1000);}}

背壓

背壓依我的理解來說,是指訂閱者能和發布者交互(通過代碼里面的調用request和cancel方法交互),可以調節發布者發布數據的速率,解決把訂閱者壓垮的問題。關鍵在於上面例子里面的訂閱關系Subscription這個接口,他有request和cancel 2個方法,用於通知發布者需要數據和通知發布者不再接受數據。

我們重點理解背壓在jdk9里面是如何實現的。關鍵在於發布者Publisher的實現類SubmissionPublisher的submit方法是block方法。訂閱者會有一個緩沖池,默認為Flow.defaultBufferSize() = 256。當訂閱者的緩沖池滿了之后,發布者調用submit方法發布數據就會被阻塞,發布者就會停(慢)下來;訂閱者消費了數據之后(調用Subscription.request方法),緩沖池有位置了,submit方法就會繼續執行下去,就是通過這樣的機制,實現了調節發布者發布數據的速率,消費得快,生成就快,消費得慢,發布者就會被阻塞,當然就會慢下來了。

怎么樣實現發布者和多個訂閱者之間的阻塞和同步呢?使用的jdk7的Fork/Join的ManagedBlocker,有興趣的請自己查找相關資料。

reactor

spring webflux是基於reactor來實現響應式的。那么reactor是什么呢?我是這樣理解的
reactor = jdk8的stream + jdk9的flow響應式流。理解了這句話,reactor就很容易掌握。
reactor里面Flux和Mono就是stream,他的最終操作就是 subscribe/block 2種。reactor里面說的不訂閱將什么也不會方法就是我們最開始學習的惰性求值。

我們來看一段代碼,理解一下:

package com.imooc; import java.util.concurrent.TimeUnit; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; public class ReactorDemo { public static void main(String[] args) { // reactor = jdk8 stream + jdk9 reactive stream // Mono 0-1個元素 // Flux 0-N個元素 String[] strs = { "1", "2", "3" }; // 2. 定義訂閱者 Subscriber<Integer> subscriber = new Subscriber<Integer>() { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 保存訂閱關系, 需要用它來給發布者響應 this.subscription = subscription; // 請求一個數據 this.subscription.request(1); } @Override public void onNext(Integer item) { // 接受到一個數據, 處理 System.out.println("接受到數據: " + item); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } // 處理完調用request再請求一個數據 this.subscription.request(1); // 或者 已經達到了目標, 調用cancel告訴發布者不再接受數據了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出現了異常(例如處理數據的時候產生了異常) throwable.printStackTrace(); // 我們可以告訴發布者, 后面不接受數據了 this.subscription.cancel(); } @Override public void onComplete() { // 全部數據處理完了(發布者關閉了) System.out.println("處理完了!"); } }; // 這里就是jdk8的stream Flux.fromArray(strs).map(s -> Integer.parseInt(s)) // 最終操作 // 這里就是jdk9的reactive stream .subscribe(subscriber); } }

上面的例子里面,我們可以把jdk9里面flowdemo的訂閱者代碼原封不動的copy過來,直接就可以用在reactor的subscribe方法上。訂閱就是相當於調用了stream的最終操作。有了 reactor = jdk8 stream + jdk9 reactive stream 概念后,在掌握了jdk8的stream和jkd9的flow之后,reactor也不難掌握。

spring5的webflux

上面的基礎和原理掌握之后,學習webflux就水到渠成了!webflux的關鍵是自己編寫的代碼里面返回流(Flux/Mono),spring框架來負責處理訂閱。 spring框架提供2種開發模式來編寫響應式代碼,使用mvc之前的注解模式和使用router function模式,都需要我們的代碼返回流,spring的響應式數據庫spring data jpa,如使用mongodb,也是返回流,訂閱都需要交給框架,自己不能訂閱。而編寫響應式代碼之前,我們還需要了解2個重要的概念,就是異步servlet和SSE。

異步servlet

學習異步servlet我們最重要的了解同步servlet阻塞了什么?為什么需要異步servlet?異步servlet能支持高吞吐量的原理是什么?

servlet容器(如tomcat)里面,每處理一個請求會占用一個線程,同步servlet里面,業務代碼處理多久,servlet容器的線程就會等(阻塞)多久,而servlet容器的線程是由上限的,當請求多了的時候servlet容器線程就會全部用完,就無法再處理請求(這個時候請求可能排隊也可能丟棄,得看如何配置),就會限制了應用的吞吐量!

而異步serlvet里面,servlet容器的線程不會傻等業務代碼處理完畢,而是直接返回(繼續處理其他請求),給業務代碼一個回調函數(asyncContext.complete()),業務代碼處理完了再通知我!這樣就可以使用少量的線程處理更加高的請求,從而實現高吞吐量!

我們看示例代碼:

 import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import javax.servlet.AsyncContext; import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; /** * Servlet implementation class AsyncServlet */ @WebServlet(asyncSupported = true, urlPatterns = { "/AsyncServlet" }) public class AsyncServlet extends HttpServlet { private static final long serialVersionUID = 1L; /** * @see HttpServlet#HttpServlet() */ public AsyncServlet() { super(); } /** * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse * response) */ protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { long t1 = System.currentTimeMillis(); // 開啟異步 AsyncContext asyncContext = request.startAsync(); // 執行業務代碼 CompletableFuture.runAsync(() -> doSomeThing(asyncContext, asyncContext.getRequest(), asyncContext.getResponse())); System.out.println("async use:" + (System.currentTimeMillis() - t1)); } private void doSomeThing(AsyncContext asyncContext, ServletRequest servletRequest, ServletResponse servletResponse) { // 模擬耗時操作 try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { } // try { servletResponse.getWriter().append("done"); } catch (IOException e) { e.printStackTrace(); } // 業務代碼處理完畢, 通知結束 asyncContext.complete(); } /** * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse * response) */ protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { doGet(request, response); } }

大家可以運行上面代碼,業務代碼花了5秒,但servlet容器的線程幾乎沒有任何耗時。而如果是同步servlet的,線程就會傻等5秒,這5秒內這個線程只處理了這一個請求。

SSE(server-sent event)

響應式流里面,可以多次返回數據(其實和響應式沒有關系),使用的技術就是H5的SSE。我們學習技術,API的使用只是最初級也是最簡單的,更加重要的是需要知其然並知其所以然,否則你只能死記硬背不用就忘!我們不滿足在spring里面能實現sse效果,更加需要知道spring是如何做到的。其實SSE很簡單,我們花一點點時間就可以掌握,我們在純servlet環境里面實現。我們看代碼,這里一個最簡單的示例。

 import java.io.IOException; import java.util.concurrent.TimeUnit; import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; /** * Servlet implementation class SSE */ @WebServlet("/SSE") public class SSE extends HttpServlet { private static final long serialVersionUID = 1L; /** * @see HttpServlet#HttpServlet() */ public SSE() { super(); } /** * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse * response) */ protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { response.setContentType("text/event-stream"); response.setCharacterEncoding("utf-8"); for (int i = 0; i < 5; i++) { // 指定事件標識 response.getWriter().write("event:me\n"); // 格式: data: + 數據 + 2個回車 response.getWriter().write("data:" + i + "\n\n"); response.getWriter().flush(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } } } /** * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse * response) */ protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { doGet(request, response); } }

關鍵是ContentType 是 "text/event-stream",然后返回的數據有固定的要求格式即可。

結束語

經過上面的一步一個腳印的學習,我們的基礎已經打牢,障礙已經掃清,現在可以進入輕松愉快的spring flux學習之旅了!Enjoy!

 

原文鏈接:https://blog.csdn.net/zebe1989/article/details/82692508

https://www.imooc.com/article/27181


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM