Skip to main content

Reactor

Spring Cloud 微服务的内容告一段落。在 Spring Cloud 部分,我们学习了使用 Consul,Load Balancer,Spring Interface Client 与 OpenFeign,Resilience4j,MicroMeter,Gateway,Seata。这是现在 Spring Cloud 官方最推荐的一套组件。诚然,还有其它一些组件也很重要,主要是 Spring Cloud Alibaba 的组件,Nacos 与 Sentinel,Seata 也属于 Spring Cloud Alibaba 的组件,但是我们已经介绍了它。Nacos 和 Sentinel 的使用方法与 Consul 和 Resilience4j 类似,所以这里不再介绍,有了前面的基础,你可以很容易的学习它们。

现在,我们将学习一些 Spring 的高级内容,之后再回到微服务,学习基于 kubernetes 的微服务。

这些内容主要包括,

  • WebFlux,异步的 Spring Web
  • Spring Security,Spring 安全框架;KeyCloak,OIDC 认证服务器
  • 消息队列,Rabbit MQ
  • 非 HTTP 协议,包括 graphQL 和 RPC,RPC 将基于 gRPC 进行介绍

首先,我们学习 WebFlux。Spring 框架我们之前已经学过,因此我们重点学习 Reactor,它是 WebFlux 的基础。

WebFlux 简介

前面我们讲过,Spring 自己是一个基于 Bean 的框架。而 Spring Web 是我们实际开发 Web 程序使用的框架,它是个基于 Servlet 和 Spring 的框架,它有自己的 Spring Boot Starter,可以很方便的使用。

其实,Spring Web 的全名是 Spring Web MVC,但 MVC 已经没人关心了。

但是,与 Spring Web 并行的还有一套框架,叫做 Spring WebFlux。Spring WebFlux 是一个基于 Reactor 的框架,它是一个异步的框架,可以用于构建响应式的 Web 应用程序。它也是基于 Spring 容器,但是它的异步引擎是 Reactor,而服务器是 Netty。Reactor 为 WebFlux 提供了异步的能力,Netty 为 WebFlux 提供了异步的服务器。

相比于 Spring Web,Spring WebFlux 有更高的并发能力。然而,由于 WebFlux 较为年轻,Spring 的整个生态都是围绕 Spring Web 的,所以 Spring WebFlux 可能会遇到一些不兼容问题,但并不严重。但整体而言,坐拥整个 Spring 生态的 Spring WebFlux 是一个很有前途的框架。

Java 标准库异步编程

我们先看为什么要引入 Reactor。

Java 在 8 之后引入了 Future,但是 Future 设计的太差了,以至于你必须手动实现 EventLoop,例如,

ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(() -> {
Thread.sleep(1000);
return 1;
});
while (!future.isDone()) {
System.out.println("Waiting...");
Thread.sleep(100);
}
System.out.println(future.get());

这是一个非常糟糕的设计因此,Java 在 8 之后引入了 CompletableFuture,它比 Future 好用多了,例如,

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
});
future.thenAccept(System.out::println);

显然,这个已经很像其它语言的 Promise 了。但是很可惜,Java 至今还没有引入 async-await,因此经典的回调地狱还是会出现。

不过,回调地狱不只有一种解决方案,除了大部分语言采用的 async-await,还有一种解决方案,就是响应式编程,这就是 Reactor。

不过,Java 在后续的 Java 9 之后引入了 Flow,它设计的和 Reactor 类似。具体而言,它的效果是,

Flow.Publisher<Integer> publisher = subscriber -> {
subscriber.onNext(1);
subscriber.onComplete();
};
Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(1);
}

@Override
public void onNext(Integer item) {
System.out.println(item);
}

@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}

@Override
public void onComplete() {
System.out.println("Complete");
}
};
publisher.subscribe(subscriber);

多少有点太啰嗦了。而 Reactor 利用函数式编程的特性,可以更简单的实现这个功能,像是,

Flux.just(1).subscribe(System.out::println);

要简单得多。这就是为什么尽管有了 Flow,但是 Reactor 仍然有市场。

响应式编程的核心概念

响应式编程本质就是经典的流式编程,但是它与传统的流式编程有一些不同。

  • 响应式编程是异步的,而不是同步的。这意味着你可以在等待一个操作完成的时候做其它事情。而在流式编程中,本质只是一个循环。
  • 响应式编程是基于事件的,而不是基于数据的。这意味着你可以在数据到达的时候处理数据,而不是等待数据到达。
  • 响应式编程是惰性的,而不是及时的。这意味着你可以在需要的时候才处理数据,而不是一次性处理所有数据。

响应式编程有两个核心概念,一个是 Publisher,一个是 Subscriber。两者在 Reactor 中都是 Mono 和 Flux,而这两个东西本质上是函数式编程中的 Monad。

下面关于 Monad 的内容,如果不理解可以跳过,不影响工程上的知识和使用。

Monad

Monad 是函数式编程中的一个概念。尽管有些教程写的很复杂,但是其实很简单。Monad 本质上是一种映射。我们用两个空间来诠释 Monad,真值空间和像空间。真值空间是我们熟悉的空间,它包含了所有的真值。而像空间是 Monad 的空间,它包含了 Monad 的值。

Monad 是一个包含三个关键要素的结构:

  • 类型构造器(Type Constructor):将普通的值(真值)映射到像空间中。这类似于在 Java 中的泛型类型,如Optional<T>,它将一个值包裹在一个容器内。类型构造器本质上定义了一个新的数据类型,这个类型包含了原始值及其附加的计算上下文。
  • unit(也称为 return 或 pure):一个函数,用于将真值空间中的元素转换为像空间中的 Monad。换句话说,它将普通的值放入 Monad 中。例如,在 Optional 中,你可以使用Optional.of来将一个值放入 Optional Monad 中。
  • bind(也称为 flatMap,或>>=):一个函数,用于对像空间中的 Monad 进行操作。

也就是说,Monad 先把某个值打包,然后对这个打包好的值进行运算,然后再取出来。这个打包好的值就是像空间中的 Monad。

这种操作看起来可能没有什么意义,但下面这个例子可能会让你明白它的方便之处。即有时候,真值空间内的运算可能没有全局的算子,即没有全局的函数来处理所有情况。还使用 Optional 为例子,例如用户输入一个数字,你想要对这个数字开根号,但是用户可能输入了负数,或者是空值。这时候,你就需要对这个数字进行检查,然后再进行开根号。这个检查的过程就是 Monad 的 bind 过程。

例如,下面的代码可以完成这个 Nomad 的过程。

class NomadOfRoot {
private final Double value;
private final boolean isNegative;

public NomadOfRoot(Double value) {
this.value = value;
this.isNegative = value < 0;
}

public NomadOfRoot root() {
if (isNegative || value == null) {
return new NomadOfRoot(null);
} else {
return new NomadOfRoot((Double) Math.sqrt(value));
}
}

public NomaOfRoot add(NomadOfRoot other) {
if (value == null || other.value == null) {
return new NomadOfRoot(null);
} else {
return new NomadOfRoot(value + other.value);
}
}

public Double getValue() {
return value;
}
}

Double result = (
new NomadOfRoot(1.0)
.add(new NomadOfRoot(2.0).root())
.root()
.add(new NomadOfRoot(-3.0))
.root()
.getValue()
);

这样子,如果我们的所有接口都返回 NomadOfRoot,那么我们就不用再做额外的检查了,因为 NomadOfRoot 会自动检查并返回正确的值。

目前我们的操作还很少。但是如果要链式调用几十个方法呢?这时候 Monad 就显得很方便了。

响应式编程中的 Monad

响应式编程本质就是将数据包装成异步 Monad,这个 Monad 要么处于 Pending 阶段,没有求值;要么已经完成,然后对这个 Monad 进行操作。这个操作是一个管线,它是一个链式的操作,每个操作都是一个 Monad,这个 Monad 会在数据到达的时候进行操作。

在 Reactor 中,这个 Monad 是 Flux 类,Mono 则是 Flux 的特例。对比上面对 Monad 的介绍,我们列出 Flux 的三个要素:

  • 类型构造器:使用Flux.just来构造一个 Flux。但注意,Flux 保存的是一个列表,而不是一个值,因此,例如Flux<Integer>,可以保存多个 Integer,要这样写Flux.just(1, 2, 3)
  • unit:要把 Flux 对象变成普通的值,可以使用block方法,例如Flux.just(1).blockFirst()
  • bind:Flux 的 bind 是mapflatMapfilter等方法。

而 Mono 类是一种特殊的 Flux,它只能保存一个值。显然,Flux 可以被拆成很多个 Mono,而 Mono 也可以被合并成一个 Flux。前者使用next方法,后者使用concat方法。

Publisher, Subscribe 与 Pipe

Publisher 与 Subscriber 是 Reactor 的两个核心接口。Publisher 是一个生产者,它可以生产数据。Subscriber 是一个消费者,它可以消费数据。简单的来说,Publisher 必须产生 Flux,而 Subscriber 必须消费 Flux。

例如,下面的代码是一个 Publisher,

Flux<Integer> publisher = Flux.just(1, 2, 3);

而下面的代码是一个 Subscriber,

publisher.subscribe(System.out::println);

当然,把所有数据取出来也算是一种消费,这时候可以使用block方法。

publisher.blockLast();

而数据之间的转换,则是通过管线方法(Pipe)来实现的。具体而言,map方法是最常用的 Pipe 方法,它可以对数据进行转换。例如,

Flux<Integer> publisher = Flux.just(1, 2, 3);
List<Integer> l = publisher.map(i -> i * 2).collectList().block();

综上,一个完整的管线就是,publisher 从一般的数据转换成 Flux,若干 Pipe 方法对数据进行转换,从 Flux 转成 Flux,最后通过 subscribe方法消费数据。

常用的内建 Publisher 有,

  • Flux.just:将一个或若干个元素转换成 Flux。
  • Flux.fromArray:将一个数组转换成 Flux。
  • Flux.fromIterable:将一个 Iterable 转换成 Flux。
  • Flux.fromStream:将一个 Stream 转换成 Flux。
  • Flux.range:将一个范围内的整数转换成 Flux。
  • Mono.just:将一个元素转换成 Mono。
  • Mono.empty:将一个空值转换成 Mono。
  • Mono.error:将一个错误转换成 Mono。

常用的 Pipe 方法有,

  • flux.map(func):对 Flux 中的每个元素,执行函数,将返回值收集成新的 Flux。
  • flux.flatMap(func):对 Flux 中的每个元素,执行函数,函数应当返回一个元素列表,最后会把每个元素列表连接成一个新的 Flux。
  • flux.filter(func):对 Flux 中的每个元素,执行函数,函数应当返回一个布尔值,最后会把所有返回 true 的元素收集成新的 Flux。
  • flux.collectList():将 Flux 中的所有元素收集成一个列表的 Mono。即Flux<T>变成Mono<List<T>>
  • flux.first():将 Flux 中的第一个元素收集成一个 Mono。
  • flux.last():将 Flux 中的最后一个元素收集成一个 Mono。
  • flux.index(index):将 Flux 中的第 index 个元素收集成一个 Mono。
  • mono.map(func):对 Mono 中的元素,执行函数,将返回值收集成新的 Mono。

常用的 Subscribe 方法有,

  • flux.subscribe():订阅一个 Flux,但是不做任何事情。
  • flux.subscribe(func):订阅一个 Flux,对每个元素执行函数。
  • flux.blockLast():订阅一个 Flux,取出最后一个元素。
  • flux.blockFirst():订阅一个 Flux,取出第一个元素。
  • flux.block():订阅一个 Flux,取出所有元素。
  • mono.block():订阅一个 Mono,取出元素。
  • mono.subscribe():订阅一个 Mono,但是不做任何事情。
  • mono.subscribe(func):订阅一个 Mono,对元素执行函数。

注意,上面的所有方法只是声明管线,并不会真正执行管线。只有 Subscribe 方法才会真正执行管线。执行时,各个元素是异步执行的。

此外,Subscribe 方法不能嵌套。一旦数值进入了 Monad,只能使用 Pipe 方法进行处理。例如,在 Webflux 中,框架会自动为你 Subscibe Monad,因此不能在代码中使用 Subscribe 方法,处理时必须使用响应式语法。