java8流及reactor(stream+flow响应式流)


一个java 8的stream是由三部分组成的。数据源,零个或一个或多个中间操作,一个或零个终止操作。

中间操作是对数据的加工,注意,中间操作是lazy操作,并不会立马启动,需要等待终止操作才会执行。

终止操作是stream的启动操作,只有加上终止操作,stream才会真正的开始执行。

lambda实现惰性求值

什么是惰性求值(惰性计算)

编程语言理论中,惰性求值(英语:Lazy Evaluation),又译为惰性计算、懒惰求值,也称为传需求调用(call-by-need),是一个计算机编程中的一个概念,它的目的是要最小化计算机要做的工作。它有两个相关而又有区别的含意,可以表示为“延迟求值”和“最小化求值”,本条目专注前者,后者请参见最小化计算条目。除可以得到性能的提升外,惰性计算的最重要的好处是它可以构造一个无限的数据类型

惰性求值的相反是及早求值(热情求值),这是在大多数编程语言中随处可见的一种计算方式,例如:

int x = 1;
String name = getUserName();

上面的表达式在绑定了变量后就立即求值,得到计算的结果。

Java中的惰性求值
以下Java代码就是惰性求值的范例。这段代码在定义 nameStream 这个流的时候,System.out.println 语句不会被立即执行。

public static void main(String[] args) {
    // 定义流
    Stream<String> nameStream = Stream.of("Zebe", "July", "Yaha").filter(name -> {
        if (!name.isEmpty()) {
            System.out.println("过滤流,当前名称:" + name);
            return true;
        }
        return false;
    });

    // 取出流的值,这时候才会调用计算
    List<String> names1 = nameStream.collect(Collectors.toList());
    // 流只能被使用一次,下面这行代码会报错,提示流已经被操作或者关闭了
    List<String> names2 = nameStream.collect(Collectors.toList());
}

在jdk8的stream流编程里面,没有调用最终操作的时候,中间操作的方法都不会执行,这也是惰性求值。

stream流编程

stream编程主要是学习API的使用,但前提是学好lambda,基础好了,看这些方法定义非常简单,要是没有打好基础,你会有很多东西需要记忆。

内部迭代和外部迭代

一般来说,我们之前的编码方法,叫外部迭代,stream的写法叫内部迭代。内部迭代代码更加可读更加优雅,关注点是做什么(外部迭代关注是怎么样做),也很容易让我们养成编程小函数的好习惯!这点在编程习惯里面非常重要!看例子:

package com.dxz.stream;
import java.util.stream.IntStream;

public class StreamDemo1 {

  public static void main(String[] args) {
    int[] nums = { 1, 2, 3 };
    // 外部迭代
    int sum = 0;
    for (int i : nums) {
      sum += i;
    }
    System.out.println("结果1为:" + sum);

    // 使用stream的内部迭代
    // map就是中间操作(返回stream的操作)
    // sum就是终止操作
    int sum2 = IntStream.of(nums).map(StreamDemo1::doubleNum).sum();
    System.out.println("结果2为:" + sum2);

    System.out.println("惰性求值就是终止没有调用的情况下,中间操作不会执行,下面的不会打印了");
    IntStream.of(nums).map(StreamDemo1::doubleNum);
  }

  public static int doubleNum(int i) {
    System.out.println("执行了乘以2");
    return i * 2;
  }
}

结果:

结果1为:6
执行了乘以2
执行了乘以2
执行了乘以2
结果2为:12
惰性求值就是终止没有调用的情况下,中间操作不会执行,下面的不会打印了

操作类型

操作类型概念要理清楚。有几个维度。

首先分为 中间操作 和 最终操作,在最终操作没有调用的情况下,所有的中级操作都不会执行。那么那些是中间操作那些是最终操作呢? 简单来说,返回stream流的就是中间操作,可以继续链式调用下去,不是返回stream的就是最终操作。这点很好理解。

最终操作里面分为短路操作非短路操作,短路操作就是limit/findxxx/xxxMatch这种,就是找了符合条件的就终止,其他的就是非短路操作。在无限流里面需要调用短路操作,否则像炫迈口香糖一样根本停不下来!

中间操作又分为 有状态操作 和 无状态操作,怎么样区分呢? 一开始很多同学需要死记硬背,其实你主要掌握了状态这个关键字就不需要死记硬背。状态就是和其他数据有关系。我们可以看方法的参数,如果是一个参数的,就是无状态操作,因为只和自己有关,其他的就是有状态操作。如map/filter方法,只有一个参数就是自己,就是无状态操作;而distinct/sorted就是有状态操作,因为去重和排序都需要和其他数据比较,理解了这点,就不需要死记硬背了!

为什么要知道有状态和无状态操作呢?在多个操作的时候,我们需要把无状态操作写在一起,有状态操作放到最后,这样效率会更加高。

运行机制

我们可以通过下面的代码来理解stream的运行机制

package com.dxz.stream;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
/**
 * 验证stream运行机制
 * 
 * 1. 所有操作是链式调用, 一个元素只迭代一次 
 * 2. 每一个中间操作返回一个新的流. 流里面有一个属性sourceStage指向同一个 地方,就是Head 
 * 3. Head->nextStage->nextStage->... -> null
 * 4. 有状态操作会把无状态操作阶段,单独处理
 * 5. 并行环境下, 有状态的中间操作不一定能并行操作.
 * 
 * 6. parallel/ sequetial 这2个操作也是中间操作(也是返回stream)
 *     但是他们不创建流, 他们只修改 Head的并行标志
 * 
 */
public class RunStream {

    public static void main(String[] args) {
        Random random = new Random();
        // 随机产生数据
        Stream<Integer> stream = Stream.generate(() -> random.nextInt())
                // 产生500个 ( 无限流需要短路操作. )
                .limit(50)
                // 第1个无状态操作
                .peek(s -> print("peek相当于debug操作: " + s))
                // 第2个无状态操作,大于1000000的值留下
                .filter(s -> {
                    print("filter: " + s);
                    return s > 1000000;
                })
                // 有状态操作
                .sorted((i1, i2) -> {
                    print("排序: " + i1 + ", " + i2);
                    return i1.compareTo(i2);
                })
                // 又一个无状态操作
                .peek(s -> {
                    print("peek相当于debug操作2: " + s);
                }).parallel();

        // 终止操作
        long count = stream.count();
        System.out.println("end=" + count);
    }

    /**
     * 打印日志并sleep 5 毫秒
     * 
     * @param s
     */
    public static void print(String s) {
        // System.out.println(s);
        // 带线程名(测试并行情况)
        System.out.println(Thread.currentThread().getName() + " > " + s);
        /*try {
            TimeUnit.MILLISECONDS.sleep(5);
        } catch (InterruptedException e) {
        }*/
    }
}

 

结果:

ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 467408314
ForkJoinPool.commonPool-worker-1 > filter: 467408314
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -694002895
ForkJoinPool.commonPool-worker-1 > filter: -694002895
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 1018018271
ForkJoinPool.commonPool-worker-1 > filter: 1018018271
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -2055711792
ForkJoinPool.commonPool-worker-1 > filter: -2055711792
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 362753392
ForkJoinPool.commonPool-worker-1 > filter: 362753392
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 1420256006
ForkJoinPool.commonPool-worker-1 > filter: 1420256006
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 1759751716
ForkJoinPool.commonPool-worker-1 > filter: 1759751716
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -1451030142
ForkJoinPool.commonPool-worker-1 > filter: -1451030142
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -1091423344
ForkJoinPool.commonPool-worker-1 > filter: -1091423344
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 1377817193
ForkJoinPool.commonPool-worker-1 > filter: 1377817193
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -1159545806
ForkJoinPool.commonPool-worker-1 > filter: -1159545806
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -173182413
ForkJoinPool.commonPool-worker-1 > filter: -173182413
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 600209961
ForkJoinPool.commonPool-worker-1 > filter: 600209961
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 1200504990
ForkJoinPool.commonPool-worker-1 > filter: 1200504990
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 260857935
ForkJoinPool.commonPool-worker-1 > filter: 260857935
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -2078810651
ForkJoinPool.commonPool-worker-1 > filter: -2078810651
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 402189452
ForkJoinPool.commonPool-worker-1 > filter: 402189452
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -946047183
ForkJoinPool.commonPool-worker-1 > filter: -946047183
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -1516256407
ForkJoinPool.commonPool-worker-1 > filter: -1516256407
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -161522096
ForkJoinPool.commonPool-worker-1 > filter: -161522096
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 346222819
ForkJoinPool.commonPool-worker-1 > filter: 346222819
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -1492195164
ForkJoinPool.commonPool-worker-1 > filter: -1492195164
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -823860607
ForkJoinPool.commonPool-worker-1 > filter: -823860607
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -1400321699
ForkJoinPool.commonPool-worker-1 > filter: -1400321699
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -594055512
ForkJoinPool.commonPool-worker-1 > filter: -594055512
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -968008982
ForkJoinPool.commonPool-worker-1 > filter: -968008982
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -538739410
ForkJoinPool.commonPool-worker-1 > filter: -538739410
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -1403417684
ForkJoinPool.commonPool-worker-1 > filter: -1403417684
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -938859220
ForkJoinPool.commonPool-worker-1 > filter: -938859220
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -1962390769
ForkJoinPool.commonPool-worker-1 > filter: -1962390769
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 1952141841
ForkJoinPool.commonPool-worker-1 > filter: 1952141841
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 68659719
ForkJoinPool.commonPool-worker-1 > filter: 68659719
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -397822084
ForkJoinPool.commonPool-worker-1 > filter: -397822084
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 1047484685
ForkJoinPool.commonPool-worker-1 > filter: 1047484685
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 2051123152
ForkJoinPool.commonPool-worker-1 > filter: 2051123152
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 1789203084
ForkJoinPool.commonPool-worker-1 > filter: 1789203084
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -1447008850
ForkJoinPool.commonPool-worker-1 > filter: -1447008850
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 1982526816
ForkJoinPool.commonPool-worker-1 > filter: 1982526816
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 432837685
ForkJoinPool.commonPool-worker-1 > filter: 432837685
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 2139658303
ForkJoinPool.commonPool-worker-1 > filter: 2139658303
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 1386330515
ForkJoinPool.commonPool-worker-1 > filter: 1386330515
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 5070967
ForkJoinPool.commonPool-worker-1 > filter: 5070967
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -2131175714
ForkJoinPool.commonPool-worker-1 > filter: -2131175714
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 390402009
ForkJoinPool.commonPool-worker-1 > filter: 390402009
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -1096668893
ForkJoinPool.commonPool-worker-1 > filter: -1096668893
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -929907211
ForkJoinPool.commonPool-worker-1 > filter: -929907211
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 1859217343
ForkJoinPool.commonPool-worker-1 > filter: 1859217343
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -1526881589
ForkJoinPool.commonPool-worker-1 > filter: -1526881589
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: 1122349191
ForkJoinPool.commonPool-worker-1 > filter: 1122349191
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作: -882024070
ForkJoinPool.commonPool-worker-1 > filter: -882024070
main > 排序: 1018018271, 467408314
main > 排序: 362753392, 1018018271
main > 排序: 362753392, 1018018271
main > 排序: 362753392, 467408314
main > 排序: 1420256006, 467408314
main > 排序: 1420256006, 1018018271
main > 排序: 1759751716, 1018018271
main > 排序: 1759751716, 1420256006
main > 排序: 1377817193, 1018018271
main > 排序: 1377817193, 1759751716
main > 排序: 1377817193, 1420256006
main > 排序: 600209961, 1377817193
main > 排序: 600209961, 467408314
main > 排序: 600209961, 1018018271
main > 排序: 1200504990, 1018018271
main > 排序: 1200504990, 1420256006
main > 排序: 1200504990, 1377817193
main > 排序: 260857935, 1200504990
main > 排序: 260857935, 600209961
main > 排序: 260857935, 467408314
main > 排序: 260857935, 362753392
main > 排序: 402189452, 1018018271
main > 排序: 402189452, 467408314
main > 排序: 402189452, 362753392
main > 排序: 346222819, 1018018271
main > 排序: 346222819, 402189452
main > 排序: 346222819, 362753392
main > 排序: 346222819, 260857935
main > 排序: 1952141841, 600209961
main > 排序: 1952141841, 1377817193
main > 排序: 1952141841, 1759751716
main > 排序: 68659719, 1018018271
main > 排序: 68659719, 402189452
main > 排序: 68659719, 346222819
main > 排序: 68659719, 260857935
main > 排序: 1047484685, 600209961
main > 排序: 1047484685, 1420256006
main > 排序: 1047484685, 1200504990
main > 排序: 1047484685, 1018018271
main > 排序: 2051123152, 1018018271
main > 排序: 2051123152, 1420256006
main > 排序: 2051123152, 1952141841
main > 排序: 1789203084, 1018018271
main > 排序: 1789203084, 1420256006
main > 排序: 1789203084, 1952141841
main > 排序: 1789203084, 1759751716
main > 排序: 1982526816, 1047484685
main > 排序: 1982526816, 1759751716
main > 排序: 1982526816, 1952141841
main > 排序: 1982526816, 2051123152
main > 排序: 432837685, 1047484685
main > 排序: 432837685, 402189452
main > 排序: 432837685, 600209961
main > 排序: 432837685, 467408314
main > 排序: 2139658303, 1047484685
main > 排序: 2139658303, 1789203084
main > 排序: 2139658303, 1982526816
main > 排序: 2139658303, 2051123152
main > 排序: 1386330515, 1047484685
main > 排序: 1386330515, 1789203084
main > 排序: 1386330515, 1420256006
main > 排序: 1386330515, 1377817193
main > 排序: 5070967, 1200504990
main > 排序: 5070967, 432837685
main > 排序: 5070967, 346222819
main > 排序: 5070967, 260857935
main > 排序: 5070967, 68659719
main > 排序: 390402009, 1047484685
main > 排序: 390402009, 402189452
main > 排序: 390402009, 260857935
main > 排序: 390402009, 362753392
main > 排序: 1859217343, 1047484685
main > 排序: 1859217343, 1789203084
main > 排序: 1859217343, 2051123152
main > 排序: 1859217343, 1982526816
main > 排序: 1859217343, 1952141841
main > 排序: 1122349191, 1047484685
main > 排序: 1122349191, 1789203084
main > 排序: 1122349191, 1386330515
main > 排序: 1122349191, 1377817193
main > 排序: 1122349191, 1200504990
main > peek相当于debug操作2: 1386330515
ForkJoinPool.commonPool-worker-3 > peek相当于debug操作2: 1200504990
ForkJoinPool.commonPool-worker-3 > peek相当于debug操作2: 1377817193
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作2: 1982526816
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作2: 2051123152
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作2: 2139658303
ForkJoinPool.commonPool-worker-3 > peek相当于debug操作2: 1122349191
ForkJoinPool.commonPool-worker-3 > peek相当于debug操作2: 1789203084
ForkJoinPool.commonPool-worker-2 > peek相当于debug操作2: 432837685
ForkJoinPool.commonPool-worker-2 > peek相当于debug操作2: 467408314
ForkJoinPool.commonPool-worker-2 > peek相当于debug操作2: 402189452
main > peek相当于debug操作2: 1420256006
main > peek相当于debug操作2: 1759751716
ForkJoinPool.commonPool-worker-2 > peek相当于debug操作2: 1018018271
ForkJoinPool.commonPool-worker-2 > peek相当于debug操作2: 1047484685
ForkJoinPool.commonPool-worker-3 > peek相当于debug操作2: 346222819
ForkJoinPool.commonPool-worker-3 > peek相当于debug操作2: 362753392
ForkJoinPool.commonPool-worker-3 > peek相当于debug操作2: 390402009
ForkJoinPool.commonPool-worker-3 > peek相当于debug操作2: 5070967
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作2: 1859217343
ForkJoinPool.commonPool-worker-1 > peek相当于debug操作2: 1952141841
ForkJoinPool.commonPool-worker-2 > peek相当于debug操作2: 68659719
ForkJoinPool.commonPool-worker-2 > peek相当于debug操作2: 260857935
main > peek相当于debug操作2: 600209961
end=24

 

 

大家自己测试一下代码,能发现stream的调用方法,就像现实中的流水线一样,一个元素只会迭代一次,但如果中间有无状态操作,前后的操作会单独处理(元素就会被多次迭代)。

jdk9的响应式流

就是reactive stream,也就是flow。其实和jdk8的stream没有一点关系。说白了就一个发布-订阅模式,一共只有4个接口,3个对象,非常简单清晰。写一个入门例子就可以掌握。

package jdk9; import java.util.concurrent.Flow.Processor; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.SubmissionPublisher; /** * 带 process 的 flow demo */ /** * Processor, 需要继承SubmissionPublisher并实现Processor接口 * * 输入源数据 integer, 过滤掉小于0的, 然后转换成字符串发布出去 */ class MyProcessor extends SubmissionPublisher<String> implements Processor<Integer, String> { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 保存订阅关系, 需要用它来给发布者响应 this.subscription = subscription; // 请求一个数据 this.subscription.request(1); } @Override public void onNext(Integer item) { // 接受到一个数据, 处理 System.out.println("处理器接受到数据: " + item); // 过滤掉小于0的, 然后发布出去 if (item > 0) { this.submit("转换后的数据:" + item); } // 处理完调用request再请求一个数据 this.subscription.request(1); // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出现了异常(例如处理数据的时候产生了异常) throwable.printStackTrace(); // 我们可以告诉发布者, 后面不接受数据了 this.subscription.cancel(); } @Override public void onComplete() { // 全部数据处理完了(发布者关闭了) System.out.println("处理器处理完了!"); // 关闭发布者 this.close(); } } public class FlowDemo2 { public static void main(String[] args) throws Exception { // 1. 定义发布者, 发布的数据类型是 Integer // 直接使用jdk自带的SubmissionPublisher SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>(); // 2. 定义处理器, 对数据进行过滤, 并转换为String类型 MyProcessor processor = new MyProcessor(); // 3. 发布者 和 处理器 建立订阅关系 publiser.subscribe(processor); // 4. 定义最终订阅者, 消费 String 类型数据 Subscriber<String> subscriber = new Subscriber<String>() { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 保存订阅关系, 需要用它来给发布者响应 this.subscription = subscription; // 请求一个数据 this.subscription.request(1); } @Override public void onNext(String item) { // 接受到一个数据, 处理 System.out.println("接受到数据: " + item); // 处理完调用request再请求一个数据 this.subscription.request(1); // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出现了异常(例如处理数据的时候产生了异常) throwable.printStackTrace(); // 我们可以告诉发布者, 后面不接受数据了 this.subscription.cancel(); } @Override public void onComplete() { // 全部数据处理完了(发布者关闭了) System.out.println("处理完了!"); } }; // 5. 处理器 和 最终订阅者 建立订阅关系 processor.subscribe(subscriber); // 6. 生产数据, 并发布 // 这里忽略数据生产过程 publiser.submit(-111); publiser.submit(111); // 7. 结束后 关闭发布者 // 正式环境 应该放 finally 或者使用 try-resouce 确保关闭 publiser.close(); // 主线程延迟停止, 否则数据没有消费就退出 Thread.currentThread().join(1000);}}

背压

背压依我的理解来说,是指订阅者能和发布者交互(通过代码里面的调用request和cancel方法交互),可以调节发布者发布数据的速率,解决把订阅者压垮的问题。关键在于上面例子里面的订阅关系Subscription这个接口,他有request和cancel 2个方法,用于通知发布者需要数据和通知发布者不再接受数据。

我们重点理解背压在jdk9里面是如何实现的。关键在于发布者Publisher的实现类SubmissionPublisher的submit方法是block方法。订阅者会有一个缓冲池,默认为Flow.defaultBufferSize() = 256。当订阅者的缓冲池满了之后,发布者调用submit方法发布数据就会被阻塞,发布者就会停(慢)下来;订阅者消费了数据之后(调用Subscription.request方法),缓冲池有位置了,submit方法就会继续执行下去,就是通过这样的机制,实现了调节发布者发布数据的速率,消费得快,生成就快,消费得慢,发布者就会被阻塞,当然就会慢下来了。

怎么样实现发布者和多个订阅者之间的阻塞和同步呢?使用的jdk7的Fork/Join的ManagedBlocker,有兴趣的请自己查找相关资料。

reactor

spring webflux是基于reactor来实现响应式的。那么reactor是什么呢?我是这样理解的
reactor = jdk8的stream + jdk9的flow响应式流。理解了这句话,reactor就很容易掌握。
reactor里面Flux和Mono就是stream,他的最终操作就是 subscribe/block 2种。reactor里面说的不订阅将什么也不会方法就是我们最开始学习的惰性求值。

我们来看一段代码,理解一下:

package com.imooc; import java.util.concurrent.TimeUnit; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; public class ReactorDemo { public static void main(String[] args) { // reactor = jdk8 stream + jdk9 reactive stream // Mono 0-1个元素 // Flux 0-N个元素 String[] strs = { "1", "2", "3" }; // 2. 定义订阅者 Subscriber<Integer> subscriber = new Subscriber<Integer>() { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 保存订阅关系, 需要用它来给发布者响应 this.subscription = subscription; // 请求一个数据 this.subscription.request(1); } @Override public void onNext(Integer item) { // 接受到一个数据, 处理 System.out.println("接受到数据: " + item); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } // 处理完调用request再请求一个数据 this.subscription.request(1); // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出现了异常(例如处理数据的时候产生了异常) throwable.printStackTrace(); // 我们可以告诉发布者, 后面不接受数据了 this.subscription.cancel(); } @Override public void onComplete() { // 全部数据处理完了(发布者关闭了) System.out.println("处理完了!"); } }; // 这里就是jdk8的stream Flux.fromArray(strs).map(s -> Integer.parseInt(s)) // 最终操作 // 这里就是jdk9的reactive stream .subscribe(subscriber); } }

上面的例子里面,我们可以把jdk9里面flowdemo的订阅者代码原封不动的copy过来,直接就可以用在reactor的subscribe方法上。订阅就是相当于调用了stream的最终操作。有了 reactor = jdk8 stream + jdk9 reactive stream 概念后,在掌握了jdk8的stream和jkd9的flow之后,reactor也不难掌握。

spring5的webflux

上面的基础和原理掌握之后,学习webflux就水到渠成了!webflux的关键是自己编写的代码里面返回流(Flux/Mono),spring框架来负责处理订阅。 spring框架提供2种开发模式来编写响应式代码,使用mvc之前的注解模式和使用router function模式,都需要我们的代码返回流,spring的响应式数据库spring data jpa,如使用mongodb,也是返回流,订阅都需要交给框架,自己不能订阅。而编写响应式代码之前,我们还需要了解2个重要的概念,就是异步servlet和SSE。

异步servlet

学习异步servlet我们最重要的了解同步servlet阻塞了什么?为什么需要异步servlet?异步servlet能支持高吞吐量的原理是什么?

servlet容器(如tomcat)里面,每处理一个请求会占用一个线程,同步servlet里面,业务代码处理多久,servlet容器的线程就会等(阻塞)多久,而servlet容器的线程是由上限的,当请求多了的时候servlet容器线程就会全部用完,就无法再处理请求(这个时候请求可能排队也可能丢弃,得看如何配置),就会限制了应用的吞吐量!

而异步serlvet里面,servlet容器的线程不会傻等业务代码处理完毕,而是直接返回(继续处理其他请求),给业务代码一个回调函数(asyncContext.complete()),业务代码处理完了再通知我!这样就可以使用少量的线程处理更加高的请求,从而实现高吞吐量!

我们看示例代码:

 import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import javax.servlet.AsyncContext; import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; /** * Servlet implementation class AsyncServlet */ @WebServlet(asyncSupported = true, urlPatterns = { "/AsyncServlet" }) public class AsyncServlet extends HttpServlet { private static final long serialVersionUID = 1L; /** * @see HttpServlet#HttpServlet() */ public AsyncServlet() { super(); } /** * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse * response) */ protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { long t1 = System.currentTimeMillis(); // 开启异步 AsyncContext asyncContext = request.startAsync(); // 执行业务代码 CompletableFuture.runAsync(() -> doSomeThing(asyncContext, asyncContext.getRequest(), asyncContext.getResponse())); System.out.println("async use:" + (System.currentTimeMillis() - t1)); } private void doSomeThing(AsyncContext asyncContext, ServletRequest servletRequest, ServletResponse servletResponse) { // 模拟耗时操作 try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { } // try { servletResponse.getWriter().append("done"); } catch (IOException e) { e.printStackTrace(); } // 业务代码处理完毕, 通知结束 asyncContext.complete(); } /** * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse * response) */ protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { doGet(request, response); } }

大家可以运行上面代码,业务代码花了5秒,但servlet容器的线程几乎没有任何耗时。而如果是同步servlet的,线程就会傻等5秒,这5秒内这个线程只处理了这一个请求。

SSE(server-sent event)

响应式流里面,可以多次返回数据(其实和响应式没有关系),使用的技术就是H5的SSE。我们学习技术,API的使用只是最初级也是最简单的,更加重要的是需要知其然并知其所以然,否则你只能死记硬背不用就忘!我们不满足在spring里面能实现sse效果,更加需要知道spring是如何做到的。其实SSE很简单,我们花一点点时间就可以掌握,我们在纯servlet环境里面实现。我们看代码,这里一个最简单的示例。

 import java.io.IOException; import java.util.concurrent.TimeUnit; import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; /** * Servlet implementation class SSE */ @WebServlet("/SSE") public class SSE extends HttpServlet { private static final long serialVersionUID = 1L; /** * @see HttpServlet#HttpServlet() */ public SSE() { super(); } /** * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse * response) */ protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { response.setContentType("text/event-stream"); response.setCharacterEncoding("utf-8"); for (int i = 0; i < 5; i++) { // 指定事件标识 response.getWriter().write("event:me\n"); // 格式: data: + 数据 + 2个回车 response.getWriter().write("data:" + i + "\n\n"); response.getWriter().flush(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } } } /** * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse * response) */ protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { doGet(request, response); } }

关键是ContentType 是 "text/event-stream",然后返回的数据有固定的要求格式即可。

结束语

经过上面的一步一个脚印的学习,我们的基础已经打牢,障碍已经扫清,现在可以进入轻松愉快的spring flux学习之旅了!Enjoy!

 

原文链接:https://blog.csdn.net/zebe1989/article/details/82692508

https://www.imooc.com/article/27181


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM