Appearance
Spring In Action 6th:响应式编程
在这一章中,我们将暂时离开 Tacos Cloud 应用程序,来探讨 Project Reactor。Reactor 是 Spring 项目系列中的响应式编程库。因为它是 Spring 对响应式编程支持的基础,所以在我们研究如何使用 Spring 构建响应式控制器和存储库之前,了解 Reactor 是很重要的。在我们开始使用 Reactor 之前,让我们快速了解一下响应式编程的基本概念。
1. 理解响应式编程
如果你和我以及许多开发者一样,是从命令式编程开始入门的。很可能你今天写的大部分(或全部)代码仍然是命令式的。命令式编程足够直观,以至于年轻的学生可以在学校的 STEM(科学、技术、工程和数学)课程中轻松学习,它也足够强大,以至于构成了驱动最大企业的大部分代码。
其原理很简单:你编写的代码是一系列要按顺序执行的指令。执行一个任务后,程序会等待它完成才会继续执行下一个任务。在每一步中,要处理的数据必须全部可用,以便作为整体进行处理。
这种方式在某些情况下是适用的,但在某些情况下不适用。当执行任务时(特别是如果是 I/O 任务,比如将数据写入数据库或从远程服务器获取数据时),调用该任务的线程会被阻塞,无法执行其他任务,直到该任务完成。简而言之,阻塞线程是低效的。
大多数编程语言,包括 Java,都支持并发编程。在 Java 中很容易启动另一个线程,并让它执行一些工作,同时调用线程可以继续执行其他任务。但尽管创建线程很容易,这些线程很可能最终也会被阻塞。在多个线程中管理并发是具有挑战性的。更多的线程意味着更多的复杂性。
相比之下,响应式编程具有功能性和声明性。与描述要按顺序执行的一组步骤不同,响应式编程涉及描述数据流经过的管道或流。与要求数据可用并作为整体进行处理不同,响应式流会在数据可用时处理数据。实际上,传入的数据可能是无限的(例如,某个位置的实时温度数据的不断流入)。
1.1. 定义响应式流
响应式流(Reactive Streams)是由 Netflix、Lightbend 和 Pivotal(Spring 的背后公司)的工程师在 2013 年末发起的一项倡议。响应式流旨在提供异步流处理和非阻塞背压(Nonblocking Backpressure)的标准。
Note
Java streams 和 Reactive Streams 之间有很多相似之处。首先,它们的名称中都包含了 “streams” 这个词。它们还都提供了用于处理数据的函数式 API。事实上,当我们在后面看到 Reactor 时,你会发现它们甚至共享了许多相同的操作。
然而,Java streams 通常是同步的,并且适用于有限的数据集。它们本质上是用于使用函数迭代集合的一种方式。
Reactive Streams 支持任意大小的数据集的异步处理,包括无限数据集。它们会实时处理数据,只要数据可用,就会进行处理,并通过背压来避免压倒消费者。
另一方面,JDK 9 的 Flow API 对应于 Reactive Streams。在 JDK 9 中,
Flow.Publisher、Flow.Subscriber、Flow.Subscription和Flow.Processor类型直接映射到 Reactive Streams 中的Publisher、Subscriber、Subscription和Processor。尽管如此,JDK 9 的 Flow API 并不是 Reactive Streams 的实际实现。
我们已经提到了响应式编程的异步特性。它使我们能够并行执行任务,从而实现更好的可伸缩性。背压是数据的消费者可以通过建立对处理数据量的限制来避免被过快的数据源压倒的一种手段。
响应式流规范可以通过四个接口定义来概括:Publisher、Subscriber、Subscription 和 Processor。Publisher 产生数据并通过 Subscription 发送给 Subscriber。Publisher 接口声明了一个方法 subscribe(),通过这个方法 Subscriber 可以订阅 Publisher,如下所示:
Java
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}1
2
3
2
3
一旦订阅者(Subscriber)订阅了发布者(Publisher),它就可以通过 Subscriber 接口的方法接收来自发布者的事件,这些方法如下所示:
Java
public interface Subscriber<T> {
void onSubscribe(Subscription sub);
void onNext(T item);
void onError(Throwable ex);
void onComplete();
}1
2
3
4
5
6
2
3
4
5
6
订阅者将接收到的第一个事件是通过调用 onSubscribe() 方法实现的。当发布者调用 onSubscribe() 方法时,它会向订阅者传递一个 Subscription 对象。通过 Subscription,订阅者可以管理它的订阅,具体如下所示:
Java
public interface Subscription {
void request(long n);
void cancel();
}1
2
3
4
2
3
4
订阅者可以调用 request() 方法来请求发送数据,也可以调用 cancel() 方法来表示它不再对接收数据感兴趣,并取消订阅。在调用 request() 方法时,订阅者传递一个 long 值,表示它愿意接受多少个数据项。这就是背压的作用所在,它防止发布者发送的数据超过订阅者的处理能力。在发布者发送了订阅者请求的数据项之后,订阅者可以再次调用 request() 方法来请求更多的数据。
一旦订阅者请求了数据,数据就开始通过流进行传输。对于发布者发布的每个项目,都会调用 onNext() 方法将数据传递给订阅者。如果出现任何错误,将调用 onError() 方法。如果发布者没有更多数据要发送,也不会再产生更多数据,它将调用 onComplete() 方法来告知订阅者它已经完成了任务。
至于 Processor 接口,它是 Subscriber 和 Publisher 的结合体,如下所示:
Java
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}作为一个订阅者,处理器(Processor)会接收数据并对其进行某种处理。然后,它会切换角色,充当发布者,将处理结果发布给它的订阅者。
正如你所看到的,Reactive Streams 规范相当简单直接。你很容易就能看出如何构建一个数据处理流水线,从一个发布者开始,通过零个或多个处理器传输数据,然后将最终结果发送给一个订阅者。
然而,Reactive Streams 接口本身并不适合以函数式方式组合这样的流。Project Reactor 是 Reactive Streams 规范的一个实现,它提供了一个用于组合 Reactive Streams 的函数式 API。正如你将在接下来的章节中看到的,Reactor 是 Spring 响应式编程模型的基础。在本章的剩余部分,我们将探讨 Project Reactor。
2. 使用 Reactor
响应式编程要求我们以一种与命令式编程非常不同的方式思考。与描述一系列要执行的步骤不同,响应式编程意味着构建一个数据流水线。当数据通过流水线时,它可以被修改或以某种方式使用。
例如,假设你想要获取一个人的姓名,将所有字母改为大写,用它创建一个问候消息,然后最终打印出来。在命令式编程模型中,代码可能如下所示:
Java
String name = "Craig";
String capitalName = name.toUpperCase();
String greeting = "Hello, " + capitalName + "!";
System.out.println(greeting);1
2
3
4
2
3
4
在命令式模型中,每行代码执行一个步骤,依次进行,并且肯定在同一个线程中执行。每个步骤都会阻塞执行线程,直到完成。
相比之下,函数式、响应式代码可以通过以下方式实现相同的功能:
Java
Mono.just("Craig")
.map(n -> n.toUpperCase())
.map(cn -> "Hello, " + cn + "!")
.subscribe(System.out::println);1
2
3
4
2
3
4
不要太担心这个例子的细节;我们很快就会详细讨论 just()、map() 和 subscribe() 操作。现在重要的是要理解,尽管响应式示例似乎仍然遵循一个逐步进行的模式,但实际上它是一个数据流经过的管道。在管道的每个阶段,数据都会以某种方式进行调整,但不能假设任何操作都是在哪个线程上执行的。它们可能是同一个线程,也可能不是。
在这个例子中,Mono 是 Reactor 的两种核心类型之一。另一种是 Flux。它们都是 Reactive Streams 的 Publisher 实现。Flux 表示零个、一个或多个(可能是无限的)数据项的流水线。而 Mono 是一种专门用于当数据集已知不超过一个数据项时进行优化的响应式类型。
Note
如果你已经熟悉 RxJava 或 ReactiveX,你可能会觉得
Mono和Flux听起来很像Observable和Single。事实上,它们在语义上是大致等价的。它们甚至提供了许多相同的操作。虽然我们在这本书中专注于 Reactor,但你可能会高兴地知道,在 Reactor 和 RxJava 类型之间是可以进行转换的。此外,正如你将在接下来的章节中看到的那样,Spring 也可以与 RxJava 类型一起使用。
前面的例子实际上包含了三个 Mono 对象。just() 操作创建了第一个 Mono。当 Mono 发出一个值时,该值会传递给 map() 操作进行大写处理,并用于创建另一个 Mono。当第二个 Mono 发布其数据时,它会传递给第二个 map() 操作进行一些字符串连接,其结果被用来创建第三个 Mono。最后,调用 subscribe() 订阅了 Mono,接收数据并将其打印出来。
2.1. 图解响应式流
响应式流经常用弹珠图来进行说明。在最简单的形式中,弹珠图描述了数据在 Flux 或 Mono 中流动的时间线,从顶部开始,中间是操作,底部是结果。图 2.1 展示了一个 Flux 的弹珠图模板。正如你所看到的,随着数据通过原始 Flux,它经过某些操作处理,最终形成一个新的 Flux。

图 2.2 展示了一个类似的弹珠图,但是针对的是 Mono。正如你所看到的,关键区别在于 Mono 可能有零个或一个数据项,或者是一个错误。

在通用响应式操作实战小节,我们将探讨 Flux 和 Mono 支持的许多操作,并使用弹珠图来可视化它们的工作原理。
2.2. 添加 Reactor 依赖
要开始使用 Reactor,请将以下依赖项添加到项目构建中:
XML
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>1
2
3
4
2
3
4
Reactor 还提供了一些很棒的测试支持。由于你将会编写大量围绕 Reactor 代码的测试,所以你肯定会想要将下一个依赖项添加到你的构建中:
XML
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>1
2
3
4
5
2
3
4
5
我假设你是在一个 Spring Boot 项目中添加这些依赖项,Spring Boot 会为你处理依赖管理,所以不需要为依赖项指定 <version> 元素。但如果你想在非 Spring Boot 项目中使用 Reactor,你需要在构建中设置 Reactor 的 BOM(材料清单)。以下依赖管理条目将 Reactor 的 2020.0.4 版本添加到构建中:
XML
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>2020.0.4</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
本章中我们将使用的示例是独立的,与我们之前一直在处理的 Taco Cloud 项目无关。因此,最好是创建一个全新的 Spring 项目,其中包含 Reactor 的依赖项,并从那里开始工作。
现在 Reactor 已经添加到你的项目构建中,你可以开始使用 Mono 和 Flux 创建响应式流水线了。在本章的其余部分,我们将逐步介绍 Mono 和 Flux 提供的几个操作。
3. 通用响应式操作实战
Flux 和 Mono 是 Reactor 提供的最基本的构建模块,而这两种响应式类型提供的操作则是将它们绑定在一起,创建数据流的粘合剂。Flux 和 Mono 提供了 500 多个操作,可以大致分类如下:
- 创建(Creation)
- 合并(Combination)
- 转换(Transformation)
- 逻辑(Logic)
虽然逐个探究这 500 多个操作会很有趣,但在本章中时间和空间有限。我选择了一些最有用的操作来在本节进行实验。我们将从创建操作开始。
Note:Mono 示例在哪里?
Mono和Flux共享许多相同的操作,因此没必要为Mono和Flux分别展示相同的操作。而且,虽然Mono的操作很有用,但相比之下,它们可能不如应用到Flux上那么有趣。我们将主要使用Flux来进行示例。只需知道,Mono通常具有相应的操作即可。
3.1. 创建响应式类型
在 Spring 中使用响应式类型时,通常会从存储库或服务中获得 Flux 或 Mono,因此不需要自己创建。但偶尔你可能需要创建一个新的响应式发布者。
Reactor 提供了几种用于创建 Flux 或 Mono 的操作。在本节中,我们将看一些最有用的创建操作。
3.1.1. 从对象创建
如果你有一个或多个对象,想要用它们来创建一个 Flux 或 Mono,你可以使用 Flux 或 Mono 上的静态 just() 方法来创建一个由这些对象驱动数据的响应式类型。例如,以下测试方法从五个 String 对象创建了一个 Flux:
Java
@Test
public void createAFlux_just() {
Flux<String> fruitFlux = Flux
.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
}1
2
3
4
5
2
3
4
5
此时,Flux 已经被创建,但还没有订阅者。没有订阅者,数据就不会流动。要添加一个订阅者,你可以在 Flux 上调用 subscribe() 方法,如下所示:
Java
fruitFlux.subscribe(
f -> System.out.println("Here's some fruit: " + f)
);1
2
3
2
3
传递给 subscribe() 的 lambda 实际上是一个 java.util.Consumer,它用于创建一个 Reactive Streams 订阅者。调用 subscribe() 后,数据开始流动。在这个示例中,没有中间操作,因此数据直接从 Flux 流向订阅者。
将 Flux 或 Mono 的条目打印到控制台是查看响应式类型的工作方式的好方法。但实际上测试 Flux 或 Mono 的更好方法是使用 Reactor 的 StepVerifier。给定一个 Flux 或 Mono,StepVerifier 订阅了这个响应式类型,然后对数据进行断言,最后验证流是否按预期完成。
例如,要验证预定的数据是否通过 fruitFlux 流动,你可以编写如下的测试:
Java
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();1
2
3
4
5
6
7
2
3
4
5
6
7
在这种情况下,StepVerifier 订阅了 Flux,然后断言每个条目是否与预期的水果名称匹配。最后,它验证在 Flux 产生了 Strawberry 后,Flux 是否已经完成。
在本章的其余示例中,你将使用 StepVerifier 编写学习测试 —— 验证行为并帮助你了解某些 Reactor 最有用的操作,以此来更好地了解它们的工作原理。
3.1.2. 从集合创建
Flux 也可以从数组、Iterable 或 Java Stream 创建。图 3.1 用一个弹珠图说明了这是如何工作的:

Iterable 或 Stream 创建一个 Flux要从数组创建一个 Flux,可以调用静态的 fromArray() 方法,将源数组传递进去,像这样:
Java
@Test
public void createAFlux_fromArray() {
String[] fruits = new String[]{
"Apple", "Orange", "Grape", "Banana", "Strawberry"};
Flux<String> fruitFlux = Flux.fromArray(fruits);
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
如果你需要从 java.util.List、java.util.Set 或 java.lang.Iterable 的任何其他实现中创建一个 Flux,你可以将它传递给静态的 fromIterable() 方法,如下所示:
Java
@Test
public void createAFlux_fromIterable() {
List<String> fruitList = new ArrayList<>();
fruitList.add("Apple");
fruitList.add("Orange");
fruitList.add("Grape");
fruitList.add("Banana");
fruitList.add("Strawberry");
Flux<String> fruitFlux = Flux.fromIterable(fruitList);
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
如果你碰巧有一个 Java Stream,想要将它作为 Flux 的源,你可以使用 fromStream() 方法,如下所示:
Java
@Test
public void createAFlux_fromStream() {
Stream<String> fruitStream =
Stream.of("Apple", "Orange", "Grape", "Banana", "Strawberry");
Flux<String> fruitFlux = Flux.fromStream(fruitStream);
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
3.1.3. 生成 Flux 数据
有时候你没有任何数据可用,只需要 Flux 充当一个计数器,每次发出的值都会递增。要创建一个计数器 Flux,你可以使用静态的 range() 方法。图 3.2 中的图示了 range() 的工作原理:

range 创建一个 Flux 会以计数器的形式发布消息以下测试方法演示了如何创建一个范围 Flux:
Java
@Test
public void createAFlux_range() {
Flux<Integer> intervalFlux =
Flux.range(1, 5);
StepVerifier.create(intervalFlux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(4)
.expectNext(5)
.verifyComplete();
}1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
在这个例子中,范围 Flux 是从 1 开始到 5 结束创建的。StepVerifier 证明它将发布五个项,分别是整数 1 到 5。
另一个类似于 range() 创建 Flux 的方法是 interval()。像 range() 方法一样,interval() 创建一个发出递增值的 Flux。但 interval() 特殊之处在于,你不需要给它一个起始值和结束值,而是指定一个持续时间或者值应该被发出的频率。图 3.3 展示了 interval() 创建方法的弹珠图:

interval 创建的 Flux 具有定期发布的条目例如,要创建一个每秒发出一个值的间隔 Flux,你可以使用静态的 interval() 方法,如下所示:
Java
@Test
public void createAFlux_interval() {
Flux<Long> intervalFlux =
Flux.interval(Duration.ofSeconds(1))
.take(5);
StepVerifier.create(intervalFlux)
.expectNext(0L)
.expectNext(1L)
.expectNext(2L)
.expectNext(3L)
.expectNext(4L)
.verifyComplete();
}1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
请注意,间隔 Flux 发出的值从 0 开始,并在每个连续的项上递增。此外,由于 interval() 没有给定最大值,它可能会无限运行。因此,你还可以使用 take() 操作来限制结果只包括前五个条目。我们将在下一节更详细地讨论 take() 操作。
3.2. 响应式类型结合
你可能会发现自己有两种需要以某种方式合并在一起的响应式类型。或者,在其他情况下,你可能需要将一个 Flux 拆分成多个响应式类型。在本节中,我们将研究合并和拆分 Reactor Flux 和 Mono 的操作。
3.2.1. 合并响应式类型
假设你有两个 Flux 流,并且需要创建一个单一的结果 Flux,它将从两个上游 Flux 流中的任意一个产生数据。要将一个 Flux 与另一个合并,你可以使用 mergeWith() 操作,如图 3.4 中的弹珠图所示:

Flux 流会将它们的消息交错到一个新的 Flux 中例如,假设你有一个值是电视和电影角色名称的 Flux,还有一个值是这些角色喜欢吃的食物名称的第二个 Flux。下面的测试方法展示了如何使用 mergeWith() 方法合并这两个 Flux 对象:
Java
@Test
public void mergeFluxes() {
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa")
.delayElements(Duration.ofMillis(500));
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples")
.delaySubscription(Duration.ofMillis(250))
.delayElements(Duration.ofMillis(500));
Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);
StepVerifier.create(mergedFlux)
.expectNext("Garfield")
.expectNext("Lasagna")
.expectNext("Kojak")
.expectNext("Lollipops")
.expectNext("Barbossa")
.expectNext("Apples")
.verifyComplete();
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
通常,Flux 会尽可能快地发布数据。因此,你可以在这两个创建的 Flux 流上使用 delayElements() 操作来稍微减慢它们的速度,以每 500 毫秒发出一个条目。此外,为了让食物 Flux 在角色 Flux 后开始流动,你可以对食物 Flux 应用 delaySubscription() 操作,这样它在订阅后的 250 毫秒内不会发出任何数据。
在合并了这两个 Flux 对象之后,就会创建一个新的合并 Flux。当 StepVerifier 订阅合并的 Flux 时,它将依次订阅两个源 Flux 流,从而开始数据流动。
从合并的 Flux 发出的项的顺序与它们从源中发出的时间顺序一致。由于两个 Flux 对象都设置为以固定的速度发出,值将通过合并的 Flux 交错。所以先是一个角色,然后是一个食物,再然后是一个角色,依此类推。如果任何一个 Flux 的时间发生变化,可能会看到两个角色项或两个食物项依次发布。
因为 mergeWith() 无法保证其源之间的完美交替,所以你可能会考虑使用 zip() 操作。当两个 Flux 对象被压缩在一起时,会产生一个新的 Flux,它会生成一个包含每个源 Flux 中的一个项的元组。图 3.5 说明了两个 Flux 对象如何被压缩在一起:

Flux 流进行压缩会产生一个包含每个 Flux 中一个元素的元组的 Flux要看到 zip() 操作的实际效果,考虑下面的测试方法,它将角色 Flux 和食物 Flux 进行了压缩:
Java
@Test
public void zipFluxes() {
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples");
Flux<Tuple2<String, String>> zippedFlux =
Flux.zip(characterFlux, foodFlux);
StepVerifier.create(zippedFlux)
.expectNextMatches(p ->
p.getT1().equals("Garfield") &&
p.getT2().equals("Lasagna"))
.expectNextMatches(p ->
p.getT1().equals("Kojak") &&
p.getT2().equals("Lollipops"))
.expectNextMatches(p ->
p.getT1().equals("Barbossa") &&
p.getT2().equals("Apples"))
.verifyComplete();
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
请注意,与 mergeWith() 不同,zip() 操作是一个静态创建操作。创建的 Flux 在角色和他们喜欢的食物之间有着完美的对齐。从压缩的 Flux 中发出的每个项都是一个 Tuple2(一个包含两个其他对象的容器对象),其中包含来自每个源 Flux 的项,按照它们发布的顺序。
如果你不想使用 Tuple2,而是想使用其他类型,你可以为 zip() 提供一个函数,根据两个项生成任何你想要的对象(如图 3.6 中的弹珠图所示):

zip() 操作的另一种形式可以生成任意对象的 Flux例如,以下测试方法展示了如何将角色 Flux 与食物 Flux 进行压缩,使其产生一个 String 对象的 Flux:
Java
@Test
public void zipFluxesToObject() {
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples");
Flux<String> zippedFlux =
Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);
StepVerifier.create(zippedFlux)
.expectNext("Garfield eats Lasagna")
.expectNext("Kojak eats Lollipops")
.expectNext("Barbossa eats Apples")
.verifyComplete();
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
zip() 方法中提供的函数(在这里使用 lambda 表达式表示)简单地将这两个项连接成一个句子,然后由压缩后的 Flux 发出。
3.2.2. 选择第一个要发布的响应式类型
假设你有两个 Flux 对象,而不是将它们合并在一起,你只是想创建一个新的 Flux,它会发出第一个产生值的 Flux 的值。如图 3.7 所示,firstWithSignal() 操作会选择两个 Flux 对象中的第一个,并回显它发布的值。

first() 操作选择第一个发出消息的 Flux,然后只从该 Flux 中产生消息以下测试方法创建了一个快速 Flux 和一个慢速 Flux(这里的 “慢速” 意味着它在订阅后 100 毫秒才会发布一项)。使用 firstWithSignal(),它创建了一个新的 Flux,它只会发布来自第一个发布值的源 Flux 的值。
Java
@Test
public void firstWithSignalFlux() {
Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth")
.delaySubscription(Duration.ofMillis(100));
Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");
Flux<String> firstFlux = Flux.firstWithSignal(slowFlux, fastFlux);
StepVerifier.create(firstFlux)
.expectNext("hare")
.expectNext("cheetah")
.expectNext("squirrel")
.verifyComplete();
}1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
在这种情况下,因为慢速 Flux 在快速 Flux 开始发布后 100 毫秒才会发布任何值,所以新创建的 Flux 将简单地忽略慢速 Flux,并且只从快速 Flux 发布值。
3.3. 转换和过滤响应式流
当数据通过流动时,你可能需要过滤掉一些值并修改其他值。在这一节中,我们将看看如何对流中流动的数据进行转换和过滤的操作。
3.3.1. 从响应式类型中过滤数据
在数据从 Flux 流中流动时,最基本的过滤方法之一是简单地忽略前面的一些条目。skip() 操作正是做这件事。

skip() 操作会跳过指定数量的消息给定一个包含多个条目的 Flux,skip() 操作将创建一个新的 Flux,它在发出源 Flux 中剩余的项之前,会跳过指定数量的项。以下测试方法展示了如何使用 skip():
Java
@Test
public void skipAFew() {
Flux<String> countFlux = Flux.just(
"one", "two", "skip a few", "ninety nine", "one hundred")
.skip(3);
StepVerifier.create(countFlux)
.expectNext("ninety nine", "one hundred")
.verifyComplete();
}1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
在这种情况下,你有一个包含五个字符串项的 Flux。在这个 Flux 上调用 skip(3) 会产生一个新的 Flux,它会跳过前三个项,只发布最后两个项。
但也许你不想跳过特定数量的项,而是需要在经过一段时间后再开始发布项。skip() 操作的另一种形式会在指定的时间段过去后开始发布来自源 Flux 的项。

skip() 操作的另一种形式会在经过一段时间后才开始传递消息接下来的测试方法使用 skip() 创建了一个在发出任何值之前等待 4 秒的 Flux。因为这个 Flux 是从一个在项之间有 1 秒延迟的 Flux(使用 delayElements())创建的,所以只会发出最后两个项。
Java
@Test
public void skipAFewSeconds() {
Flux<String> countFlux = Flux.just(
"one", "two", "skip a few", "ninety nine", "one hundred")
.delayElements(Duration.ofSeconds(1))
.skip(Duration.ofSeconds(4));
StepVerifier.create(countFlux)
.expectNext("ninety nine", "one hundred")
.verifyComplete();
}1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
你已经看到了 take() 操作的一个例子,但考虑到 skip() 操作,take() 可以被视为 skip() 的相反操作。skip() 跳过前几个项,而 take() 只会发出前面的几个项(如图 3.10 中的弹珠图所示):
Java
@Test
public void take() {
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Acadia")
.take(3);
StepVerifier.create(nationalParkFlux)
.expectNext("Yellowstone", "Yosemite", "Grand Canyon")
.verifyComplete();
}1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9

take() 操作仅传递来自传入 Flux 的前几个消息,然后取消订阅与 skip() 类似,take() 也有一种基于时间而不是项数的替代形式。它会获取并发出通过源 Flux 的尽可能多的项,直到经过了一段时间之后完成 Flux。这在图 3.11 中有所说明:

take() 操作的另一种形式只会在一段时间内传递消息给结果 Flux以下测试方法使用 take() 的另一种形式,在订阅后的前 3.5 秒内尽可能多地发出项:
Java
@Test
public void takeForAwhile() {
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Grand Teton")
.delayElements(Duration.ofSeconds(1))
.take(Duration.ofMillis(3500));
StepVerifier.create(nationalParkFlux)
.expectNext("Yellowstone", "Yosemite", "Grand Canyon")
.verifyComplete();
}1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
skip() 和 take() 操作可以被视为基于计数或持续时间的过滤操作。对于更通用的 Flux 值过滤,你会发现 filter() 操作非常有用。
filter() 操作给定一个断言(Predicate),它决定了一个项是否会通过 Flux。它允许你根据你想要的任何条件进行选择性地发布。图 3.12 中的弹珠图展示了 filter() 的工作原理:

Flux 进行过滤,以便结果 Flux 只接收与给定断言匹配的消息要查看 filter() 的使用方法,请考虑以下测试方法:
Java
@Test
public void filter() {
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Grand Teton")
.filter(np -> !np.contains(" "));
StepVerifier.create(nationalParkFlux)
.expectNext("Yellowstone", "Yosemite", "Zion")
.verifyComplete();
}1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
在这里,filter() 接收了一个 Predicate 作为 lambda 表达式,它只接受没有任何空格的字符串值。因此,Grand Canyon 和 Grand Teton 被过滤出了结果 Flux。
也许你需要的过滤是过滤掉你已经收到的任何项。distinct() 操作,如图 3.13 所示,会导致一个 Flux,它只发布来自源 Flux 中尚未发布的项。

distinct() 操作会过滤掉任何重复的消息在下面的测试中,只会从 distinct 的 Flux 中发出唯一的字符串值:
Java
@Test
public void distinct() {
Flux<String> animalFlux = Flux.just(
"dog", "cat", "bird", "dog", "bird", "anteater")
.distinct();
StepVerifier.create(animalFlux)
.expectNext("dog", "cat", "bird", "anteater")
.verifyComplete();
}1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
虽然 dog 和 bird 分别从源 Flux 发布了两次,但是 distinct 的 Flux 只会发布它们一次。
3.3.2. 映射响应式数据
在 Flux 或 Mono 上最常见的操作之一是将发布的项转换为其他形式或类型。Reactor 提供了 map() 和 flatMap() 操作来实现这一目的。
map() 操作会创建一个 Flux,它会在重新发布对象之前,根据给定的 Function 对每个接收到的对象执行一次转换。图 3.14 说明了 map() 操作的工作原理。

map() 操作会将传入的消息转换为新的消息,并在结果流中进行处理在下面的测试方法中,一个代表篮球运动员的字符串值的 Flux 被映射为一个新的 Flux,其中包含 Player 对象:
Java
@Test
public void map() {
Flux<Player> playerFlux = Flux
.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
.map(n -> {
String[] split = n.split("\\s");
return new Player(split[0], split[1]);
});
StepVerifier.create(playerFlux)
.expectNext(new Player("Michael", "Jordan"))
.expectNext(new Player("Scottie", "Pippen"))
.expectNext(new Player("Steve", "Kerr"))
.verifyComplete();
}
@Data
private static class Player {
private final String firstName;
private final String lastName;
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
map() 接收的函数(作为 lambda 表达式)将传入的字符串在空格处拆分,并使用生成的字符串数组创建 Player 对象。虽然使用 just() 创建的 Flux 包含字符串对象,但使用 map() 生成的 Flux 包含 Player 对象。
关于 map() 的重要理解是,映射是同步执行的,即在源 Flux 发布每个项时都会执行映射。如果想要执行异步映射,应该考虑使用 flatMap() 操作。
flatMap() 操作需要一些思考和实践才能完全掌握。如图 3.15 所示,与 map() 不同,flatMap() 将每个对象映射到一个新的 Mono 或 Flux。Mono 或 Flux 的结果被展平为一个新的 Flux。当与 subscribeOn() 一起使用时,flatMap() 可以释放 Reactor 类型的异步能力。

flatMap() 操作使用中间的 Flux 来执行转换,因此允许进行异步转换以下测试方法演示了如何使用 flatMap() 和 subscribeOn():
Java
@Test
public void flatMap() {
Flux<Player> playerFlux = Flux
.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
.flatMap(n -> Mono.just(n)
.map(p -> {
String[] split = p.split("\\s");
return new Player(split[0], split[1]);
})
.subscribeOn(Schedulers.parallel())
);
List<Player> playerList = Arrays.asList(
new Player("Michael", "Jordan"),
new Player("Scottie", "Pippen"),
new Player("Steve", "Kerr"));
StepVerifier.create(playerFlux)
.expectNextMatches(p -> playerList.contains(p))
.expectNextMatches(p -> playerList.contains(p))
.expectNextMatches(p -> playerList.contains(p))
.verifyComplete();
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
请注意,flatMap() 接收了一个 lambda 函数,将传入的字符串转换为类型为 String 的 Mono。然后,对 Mono 应用了 map() 操作,将字符串转换为 Player。在每个内部 Flux 中将字符串映射为 Player 后,它们被发布到由 flatMap() 返回的单个 Flux 中,从而完成了结果的展平。
如果仅仅到这里,生成的 Flux 将携带 Player 对象,按照与 map() 示例相同的顺序同步生成。但是你对 Mono 执行了 subscribeOn(),指示每个订阅应在一个并行线程中进行。因此,多个传入字符串对象的映射操作可以异步并行地执行。
虽然 subscribeOn() 的名称与 subscribe() 类似,但它们是完全不同的。subscribe() 是一个动作,用于订阅一个响应式流并有效地启动它;而 subscribeOn() 更具描述性,指定了订阅应如何并发处理。Reactor 并不强制任何特定的并发模型;通过 subscribeOn(),你可以使用 Schedulers 中的静态方法之一指定你想要使用的并发模型。在这个例子中,你使用了 parallel(),它使用了一个固定大小的工作线程池(大小与 CPU 核心数相同)。但是 Schedulers 支持多种并发模型,比如表 3.1 中描述的那些:
Schedulers 方法 | 描述 |
|---|---|
.immediate() | 在当前线程中执行订阅 |
.single() | 在单个可重用线程中执行订阅。对所有调用者重用同一个线程 |
.newSingle() | 在每次调用中专用线程中执行订阅 |
.elastic() | 在从无界弹性池中获取的工作线程中执行订阅。根据需要创建新的工作线程,并且空闲的工作线程会在一段时间后(默认为 60 秒)被销毁 |
.parallel() | 在从固定大小的池中获取的工作线程中执行订阅,池的大小与 CPU 核心数相同 |
Schedulers 的并发模型使用 flatMap() 和 subscribeOn() 的优点是可以通过将工作分配到多个并行线程中来提高流的吞吐量。但是因为工作是并行进行的,没有任何关于哪个会先完成的保证,所以无法知道生成的 Flux 中发出的项目顺序。因此,StepVerifier 只能验证每个发出的项目是否存在于预期的 Player 对象列表中,并且在 Flux 完成之前会有三个这样的项目。
3.3.3. 在响应式流上进行数据缓冲
在处理通过 Flux 流动的数据时,你可能会发现将数据流分成易于处理的小块很有帮助。buffer() 操作可以帮助实现这一点,如图 3.16 所示:

buffer() 操作将产生一个由特定最大大小的列表组成的 Flux假设有一个包含水果名称的 String 值的 Flux,你可以按照以下方式创建一个新的 Flux,其中每个列表的集合都不超过指定的元素数量:
Java
@Test
public void buffer() {
Flux<String> fruitFlux = Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry");
Flux<List<String>> bufferedFlux = fruitFlux.buffer(3);
StepVerifier
.create(bufferedFlux)
.expectNext(Arrays.asList("apple", "orange", "banana"))
.expectNext(Arrays.asList("kiwi", "strawberry"))
.verifyComplete();
}1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
在这种情况下,String 元素的 Flux 被缓冲成一个新的 Flux,其中每个列表集合都不超过三个元素。因此,原始的 Flux 发出五个 String 值,将被转换为发出两个列表集合的 Flux,一个包含三个水果,另一个包含两个水果。
这样做有什么用呢?将来自响应式 Flux 的值缓冲到非响应式的列表集合中似乎是得不偿失的。但是当你将 buffer() 与 flatMap() 结合使用时,它使得每个列表集合都可以并行处理,如下所示:
Java
@Test
public void bufferAndFlatMap() throws Exception {
Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry")
.buffer(3)
.flatMap(x ->
Flux.fromIterable(x)
.map(y -> y.toUpperCase())
.subscribeOn(Schedulers.parallel())
.log()
).subscribe();
}1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
在这个新的例子中,你依然将包含五个 String 值的 Flux 缓冲成一个新的列表集合 Flux。然后你将 flatMap() 应用于这个列表集合的 Flux。这将每个列表缓冲区取出,并从其中的元素创建一个新的 Flux,然后对其应用 map() 操作。因此,每个缓冲的列表都会在各自的线程中并行处理。
为了证明它的工作原理,我还包含了一个 log() 操作应用于每个子 Flux。log() 操作只是简单地记录所有的响应式流事件,这样你就可以看到实际发生的情况。因此,以下条目将被写入日志中(为了简洁起见,省略了时间组件):
Text
[main] INFO reactor.Flux.SubscribeOn.1 - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
[main] INFO reactor.Flux.SubscribeOn.1 - request(32)
[main] INFO reactor.Flux.SubscribeOn.2 - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
[main] INFO reactor.Flux.SubscribeOn.2 - request(32)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(APPLE)
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onNext(KIWI)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(ORANGE)
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onNext(STRAWBERRY)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(BANANA)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onComplete()
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onComplete()1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
正如日志条目清楚地显示的那样,第一个缓冲区(apple、orange 和 banana)在 parallel-1 线程中处理。与此同时,第二个缓冲区(kiwi 和 strawberry)在 parallel-2 线程中处理。由于从每个缓冲区的日志条目交织在一起的事实,可以明显看出这两个缓冲区是并行处理的。
如果出于某种原因,你需要将 Flux 发出的所有内容收集到一个 List 中,可以按以下方式调用不带参数的 buffer():
Java
Flux<List<String>> bufferedFlux = fruitFlux.buffer();这将产生一个新的 Flux,它会发出一个包含源 Flux 发出的所有项的 List。你也可以使用 collectList() 操作来实现相同的功能,如图 3.17 中所示:

collectList() 操作会产生一个 Mono,其中包含了来自输入 Flux 发出的所有消息组成的列表与生成发布 List 的 Flux 不同,collectList() 生成发布 List 的 Mono。下面的测试方法展示了它的使用方式:
Java
@Test
public void collectList() {
Flux<String> fruitFlux = Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry");
Mono<List<String>> fruitListMono = fruitFlux.collectList();
StepVerifier
.create(fruitListMono)
.expectNext(Arrays.asList(
"apple", "orange", "banana", "kiwi", "strawberry"))
.verifyComplete();
}1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
将 Flux 发出的项收集到 Map 中是一种更加有趣的方式。如图 3.18 所示,collectMap() 操作返回一个发布 Map 的 Mono,其中 Map 包含了由给定函数计算得出的键的条目。

collectMap() 操作会产生一个 Mono要查看 collectMap() 的实际效果,请看以下测试方法:
Java
@Test
public void collectMap() {
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Map<Character, String>> animalMapMono =
animalFlux.collectMap(a -> a.charAt(0));
StepVerifier
.create(animalMapMono)
.expectNextMatches(map -> {
return
map.size() == 3 &&
map.get('a').equals("aardvark") &&
map.get('e').equals("eagle") &&
map.get('k').equals("kangaroo");
})
.verifyComplete();
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
源 Flux 发出了几种动物的名称。你使用 collectMap() 从该 Flux 中创建一个新的 Mono,它会发出一个 Map,其中键由动物名的首字母确定,值是动物的名称本身。
如果有两个动物名称以相同的字母开头(例如 elephant 和 eagle,或者 koala 和 kangaroo),则流中的最后一个条目将覆盖任何之前的条目。
3.4. 对响应类型执行逻辑操作
有时你只需要知道 Mono 或 Flux 发出的条目是否满足某些条件。all() 和 any() 操作执行这样的逻辑。图 3.19 和 3.20 描述了 all() 和 any() 的工作原理。

all() 操作可以测试 Flux,以确保所有的消息都满足某个条件
any() 操作可以测试 Flux,以确保至少有一个消息满足某个条件假设你想知道 Flux 发布的每个字符串是否包含字母 a 或字母 k。以下示例展示了如何使用 all() 来检查这个条件:
Java
@Test
public void all() {
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasAMono = animalFlux.all(a -> a.contains("a"));
StepVerifier.create(hasAMono)
.expectNext(true)
.verifyComplete();
Mono<Boolean> hasKMono = animalFlux.all(a -> a.contains("k"));
StepVerifier.create(hasKMono)
.expectNext(false)
.verifyComplete();
}1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
在第一个 StepVerifier 中,你检查了字母 a。all 操作应用于源 Flux,结果是一个类型为 Boolean 的 Mono。在这种情况下,所有动物名称都包含字母 a,因此从生成的 Mono 中发出 true。但是在第二个 StepVerifier 中,生成的 Mono 将发出 false,因为并非所有动物名称都包含字母 k。
与其执行全有或全无的检查,也许只要有一个条目匹配就可以了。在这种情况下,any() 操作就是你想要的。这个新的测试用例使用 any() 来检查字母 t 和 z:
Java
public void any() {
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasTMono = animalFlux.any(a -> a.contains("t"));
StepVerifier.create(hasTMono)
.expectNext(true)
.verifyComplete();
Mono<Boolean> hasZMono = animalFlux.any(a -> a.contains("z"));
StepVerifier.create(hasZMono)
.expectNext(false)
.verifyComplete();
}1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
在第一个 StepVerifier 中,你可以看到生成的 Mono 发出 true,因为至少有一个动物名称包含字母 t(具体来说是 elephant)。在第二种情况下,生成的 Mono 发出 false,因为没有一个动物名称包含 z。
4. 总结
响应式编程涉及创建数据流管道;
响应式流规范定义了四种类型:
Publisher、Subscriber、Subscription和Processor(它是Publisher和Subscriber的组合);Project Reactor 实现了响应式流,并将流定义抽象为两种主要类型:
Flux和Mono,每种类型都提供了数百个操作;Spring 利用 Reactor 创建了响应式控制器、存储库、REST 客户端和其他响应式框架支持;