Appearance
Spring In Action 6th:集成 Spring
就像我们需要连接互联网才能提高生产力一样,许多应用程序必须连接到外部系统才能完成它们的工作。一个应用程序可能需要读取或发送电子邮件,与外部 API 交互,或者对数据库写入的数据做出反应。而且,当数据从这些外部系统中被摄取或写入时,应用程序可能需要以某种方式处理数据,将其转换为应用程序自己的领域,或者从应用程序的领域转换出来。
在本章中,你将看到如何使用 Spring Integration 来实现常见的集成模式。Spring Integration 是 Gregor Hohpe 和 Bobby Woolf 在《企业集成模式》(Addison-Wesley,2003)中编目的许多集成模式的现成实现。每个模式都被实现为一个组件,通过这个组件,消息在管道中传送数据。使用 Spring 配置,你可以将这些组件组装成一个数据流动的管道。让我们开始定义一个简单的集成流,介绍一下使用 Spring Integration 的许多特性和特点。
1. 声明简单的集成流
一般来说,Spring Integration 可以创建集成流,通过这些集成流,应用程序可以接收或发送数据到应用程序本身之外的某些资源。应用程序可能会与之集成的一种资源就是文件系统。因此,Spring Integration 的许多组件中包括用于读取和写入文件的通道适配器。
为了让你对 Spring Integration 有个初步了解,你将创建一个将数据写入文件系统的集成流。首先,你需要将 Spring Integration 添加到你的项目构建中。对于 Maven,所需的依赖关系如下:
XML
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
</dependency>1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
第一个依赖项是 Spring Integration 的 Spring Boot 启动器。这个依赖项对于开发 Spring Integration 流是必不可少的,无论流可能与什么集成。像所有的 Spring Boot 启动器依赖项一样。
第二个依赖项是 Spring Integration 的文件端点模块。这个模块是用于与外部系统集成的两打多个端点模块之一。我们将在端点模块小节中更详细地讨论端点模块。但是,现在,你需要知道的是,文件端点模块提供了从文件系统中摄取文件到集成流中和/或从流中将数据写入文件系统的能力。
接下来,你需要为应用程序创建一种方式,以便将数据发送到集成流中,以便它可以被写入到文件中。为此,你将创建一个网关接口,如下所示:
Java
@MessagingGateway(defaultRequestChannel = "textInChannel")
public interface FileWriterGateway {
void writeToFile(@Header(FileHeaders.FILENAME) String filename, String data);
}1
2
3
4
2
3
4
尽管它只是一个简单的 Java 接口,但是关于 FileWriterGateway 还有很多值得说的地方。你首先会注意到它被 @MessagingGateway 注解。这个注解告诉 Spring Integration 在运行时生成这个接口的实现,这与 Spring Data 自动生成仓库接口的实现类似。代码的其他部分在需要写入文件时会使用这个接口。
@MessagingGateway 的 defaultRequestChannel 属性表示,由接口方法调用产生的任何消息都应该发送到给定的消息通道。在这种情况下,你声明任何由 writeToFile() 调用产生的消息都应该发送到名为 textInChannel 的通道。
至于 writeToFile() 方法,它接受一个作为 String 的文件名,和另一个将要写入文件的文本的 String。这个方法签名的值得注意的地方是,文件名参数被 @Header 注解。在这种情况下,@Header 注解表示传递给文件名的值应该放在消息头(指定为 FileHeaders.FILENAME,这是 FileHeaders 类中等于 file_name 的值的常量)而不是在消息负载中。另一方面,数据参数值被载在消息负载中。
现在你已经创建了一个消息网关,你需要配置集成流。尽管你添加到构建中的 Spring Integration 启动器依赖项启用了 Spring Integration 的基本自动配置,但你仍然需要编写额外的配置来定义满足应用程序需求的流。声明集成流的三个配置选项如下:
- XML 配置
- Java 配置
- 使用 DSL 的 Java 配置
我们将查看 Spring Integration 的所有这三种配置风格,先从经典的 XML 配置开始。
1.1. 使用 XML 定义集成流
尽管在这本书中我避免使用 XML 配置,但 Spring Integration 有着用 XML 定义集成流的悠久历史。因此,我认为至少展示一个用 XML 定义的集成流的例子是值得的。下面的代码显示了如何用 XML 配置你的示例流:
XML
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
http://www.springframework.org/schema/integration/file/springintegration-file.xsd">
<int:channel id="textInChannel" />
<int:transformer id="upperCase" input-channel="textInChannel" output-channel="fileWriterChannel" expression="payload.toUpperCase()" />
<int:channel id="fileWriterChannel" />
<int-file:outbound-channel-adapter id="writer" channel="fileWriterChannel" directory="/tmp/sia6/files" mode="APPEND" append-new-line="true" />
</beans>1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
解析以上 XML 的内容,我们可以得出:
你配置了一个名为
textInChannel的通道。你会认出这就是设置为FileWriterGateway请求通道的同一通道。当在FileWriterGateway上调用writeToFile()方法时,生成的消息会发布到这个通道;你配置了一个从
textInChannel接收消息的转换器。它使用 SpEL(Spring Expression Language)表达式来调用消息有效载荷的toUpperCase()方法。将转为大写后的结果然后发布到fileWriterChannel;你配置了一个名为
fileWriterChannel的通道。这个通道充当连接转换器和出站通道适配器的管道;最后,你使用
int-file命名空间配置了一个出站通道适配器。这个 XML 命名空间由 Spring Integration 的文件模块提供以写入文件。按照你的配置,它从fileWriterChannel接收消息,并将消息有效载荷写入到在消息的file_name头中指定的文件名的文件中,该文件位于directory属性指定的目录中。如果文件已经存在,那么文件将以新行追加,而不是被覆盖;
这个流在图 1.1 中用图形元素进行了说明,这些图形元素的样式参照了企业集成模式(Enterprise Integration Patterns):

这个流包含五个组件:一个网关,两个通道,一个转换器和一个通道适配器。这只是可以组装成集成流的一部分组件。我们将在探索 Spring Integration 小节中探讨这些组件以及 Spring Integration 支持的其他组件。
如果你想在 Spring Boot 应用程序中使用 XML 配置,你需要将 XML 作为资源导入到 Spring 应用程序中。做到这一点最简单的方法是在你的应用程序的 Java 配置类中使用 Spring 的 @ImportResource 注解,如以下示例代码所示:
Java
@Configuration
@ImportResource("classpath:/filewriter-config.xml")
public class FileWriterIntegrationConfig { ... }1
2
3
2
3
尽管基于 XML 的配置对 Spring Integration 服务良好,但大多数开发者已经对使用 XML 感到疲倦。接下来让我们将注意力转向 Spring Integration 的 Java 配置风格。
1.2. 在 Java 中配置集成流
大多数现代的 Spring 应用程序已经放弃了 XML 配置,转而使用 Java 配置。事实上,在 Spring Boot 应用程序中,Java 配置是自动配置的自然补充。因此,如果你要向 Spring Boot 应用程序添加一个集成流,用 Java 定义流是非常合理的。
以下基于 Java 编写的配置集成流示例,与之前使用 XML 文件的配置效果相同:
Java
@Configuration
public class FileWriterIntegrationConfig {
@Bean
@Transformer(inputChannel = "textInChannel",
outputChannel = "fileWriterChannel")
public GenericTransformer<String, String> upperCaseTransformer() {
return text -> text.toUpperCase();
}
@Bean
@ServiceActivator(inputChannel = "fileWriterChannel")
public FileWritingMessageHandler fileWriter() {
FileWritingMessageHandler handler =
new FileWritingMessageHandler(new File("/tmp/sia6/files"));
handler.setExpectReply(false);
handler.setFileExistsMode(FileExistsMode.APPEND);
handler.setAppendNewLine(true);
return handler;
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
在 Java 配置中,你声明了两个 bean:一个转换器和一个文件写入消息处理器。转换器是一个 GenericTransformer。因为 GenericTransformer 是一个函数式接口,你可以提供其实现作为一个 lambda,该 lambda 在消息文本上调用 toUpperCase()。转换器 bean 使用 @Transformer 注解,将其指定为在集成流中接收来自名为 textInChannel 的通道的消息并将消息写入名为 fileWriterChannel 的通道的转换器。
至于文件写入 bean,它使用 @ServiceActivator 注解来指示它将接受来自 fileWriterChannel 的消息,并将这些消息交给由 FileWritingMessageHandler 实例定义的服务。FileWritingMessageHandler 是一个消息处理器,它将消息有效载荷写入到在消息的 file_name 头中指定的文件名的文件中,该文件位于指定的目录中。与 XML 示例一样,FileWritingMessageHandler 配置为以新行追加到文件。
FileWritingMessageHandler bean 的配置中有一个独特之处,那就是有一个调用 setExpectReply(false) 的地方,以指示服务激活器不应该期望一个回复通道(一个可以将值返回到流中上游组件的通道)。如果你不调用 setExpectReply(false),文件写入 bean 默认为 true,尽管管道仍然按预期工作,但你会看到有几个错误被记录,指出没有配置回复通道。
你还会注意到,你不需要显式声明通道。如果没有这些名称的 bean 存在,textInChannel 和 fileWriterChannel 将自动创建。但是,如果你想更多地控制通道的配置方式,你可以像这样显式地将它们构造为 bean:
Java
@Bean
public MessageChannel textInChannel() {
return new DirectChannel();
}
...
@Bean
public MessageChannel fileWriterChannel() {
return new DirectChannel();
}1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
Java 配置选项可以说更易于阅读,稍微简洁一些。但是,使用 Spring Integration 的 Java DSL(领域特定语言)配置风格,可以使其变得更加精简。
1.3. 使用 Spring Integration 的 DSL 配置
让我们再尝试一次定义文件写入的集成流。这一次,你仍然会使用 Java,但是你将使用 Spring Integration 的 Java DSL。与其为流中的每个组件声明单独的 bean,你将声明一个单一的 bean,用于定义整个流。
Java
@Configuration
public class FileWriterIntegrationConfig {
@Bean
public IntegrationFlow fileWriterFlow() {
return IntegrationFlows
.from(MessageChannels.direct("textInChannel"))
.<String, String>transform(t -> t.toUpperCase())
.handle(Files
.outboundAdapter(new File("/tmp/sia6/files"))
.fileExistsMode(FileExistsMode.APPEND)
.appendNewLine(true))
.get();
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
这个新配置尽可能地简洁,将整个流都捕获在一个单一的 bean 方法中。IntegrationFlows 类启动了构建器 API,你可以从中声明流。
在上述示例中,你首先从名为 textInChannel 的通道接收消息,然后将其传递给一个将消息载荷转换为大写的转换器。在转换器之后,消息被一个从 Spring Integration 的文件模块提供的 Files 类创建的出站通道适配器处理。最后,调用 get() 构建了要返回的 IntegrationFlow。简而言之,这个单一的 bean 方法定义了与 XML 和 Java 配置示例相同的集成流。
你会注意到,与 Java 配置示例一样,你不需要显式声明通道 bean。虽然你引用了 textInChannel,但是因为没有使用该名称的现有通道 bean,Spring Integration 会自动创建它。但是如果你愿意,你可以显式声明通道 bean。
至于连接转换器和出站通道适配器的通道,你甚至不需要按名称引用它。如果需要显式配置通道,你可以在流定义中通过调用 channel() 引用它的名称,如下所示:
Java
@Bean
public IntegrationFlow fileWriterFlow() {
return IntegrationFlows
.from(MessageChannels.direct("textInChannel"))
.<String, String>transform(t -> t.toUpperCase())
.channel(MessageChannels.direct("FileWriterChannel"))
.handle(Files
.outboundAdapter(new File("/tmp/sia6/files"))
.fileExistsMode(FileExistsMode.APPEND)
.appendNewLine(true))
.get();
}1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
在使用 Spring Integration 的 Java DSL(就像使用任何流式 API 一样),需要明智地使用空格来保持可读性。在这里给出的示例中,我已经小心地缩进了行,以表示相关代码块。对于更长、更复杂的流,你甚至可以考虑将流的部分提取到单独的方法或子流中,以提高可读性。
现在你已经看到了使用三种不同配置风格定义的简单流,让我们退后一步,看看 Spring Integration 的整体架构。
2. 探索 Spring Integration
Spring Integration 涵盖了大量的集成场景。试图在单一章节中包含所有内容就像试图把大象塞进信封一样困难。与其对 Spring Integration 进行全面的介绍,不如给你展示一张 Spring Integration 大象的照片,让你对它的工作方式有一些了解。然后,你将创建一个额外的集成流,为 Taco Cloud 应用程序增加功能。
一个集成流由以下一种或多种组件组成。在你编写更多代码之前,我们将简要介绍每个组件在集成流中所起的作用:
- Channel:将消息从一个元素传递到另一个元素;
- Filter:基于某些条件有选择地允许消息通过流;
- Transformer:更改消息值和/或将消息载荷从一种类型转换为另一种类型;
- Router:根据消息头将消息定向到多个通道中的一个;
- Splitter:将传入的消息拆分为两个或多个消息,每个消息发送到不同的通道;
- Aggregator:与 Splitter 相反,将来自不同通道的多个消息合并为单个消息;
- Service activator:将消息交给某个 Java 方法进行处理,然后在输出通道上发布返回值;
- Channel adapter:将通道连接到某个外部系统或传输,可以接受输入或将消息写入外部系统;
- Gateway:通过接口将数据传递到集成流中;
当你定义文件写入集成流时,已经看到了其中一些组件的作用。FileWriterGateway 接口是应用程序提交要写入文件的文本的网关。你还定义了一个转换器将给定的文本转换为大写;然后你声明了一个服务网关,用于执行将文本写入文件的任务。流中还有两个通道 textInChannel 和 fileWriterChannel,用于连接其他组件。现在,让我们快速浏览一下集成流组件。
2.1. 消息通道
消息通道是消息在集成管道中传递的手段,如图 2.1 所示,它们是连接 Spring Integration 所有其他部分的管道。

Spring Integration 提供了几种通道实现,包括以下几种:
PublishSubscribeChannel:发布到PublishSubscribeChannel的消息将传递给一个或多个消费者。如果存在多个消费者,则它们都会收到消息;QueueChannel:发布到QueueChannel的消息会存储在队列中,直到按照先进先出(FIFO)的顺序被消费者拉取。如果存在多个消费者,则只有一个消费者会收到消息;PriorityChannel:类似于QueueChannel,但是消息根据消息的优先级标头由消费者拉取,而不是采用 FIFO 行为;RendezvousChannel:类似于QueueChannel,不同之处在于发送者会阻塞通道直到消费者接收消息,有效地将发送者与消费者同步;DirectChannel:类似于PublishSubscribeChannel,但是通过在发送者的同一线程中调用消费者来将消息发送给单个消费者,这允许事务跨通道进行;ExecutorChannel:类似于DirectChannel,但是消息分发是通过TaskExecutor进行的,发生在与发送者不同的线程中,这种通道类型不支持跨通道的事务;FluxMessageChannel:基于 Project Reactor 的 Flux 的响应式流发布者消息通道(我们将在后续更详细地讨论响应式流、Reactor 和 Flux);
在 Java 配置和 Java DSL 风格中,输入通道会自动创建,默认情况下是 DirectChannel。但如果你想使用其他通道实现,你需要显式地将通道声明为一个 bean,并在集成流中引用它。例如,要声明一个 PublishSubscribeChannel,你可以声明以下 @Bean 方法:
Java
@Bean
public MessageChannel orderChannel() {
return new PublishSubscribeChannel();
}1
2
3
4
2
3
4
接下来,你会在集成流定义中通过名称引用这个通道。例如,如果这个通道被服务激活器 bean 所消费,你会在 @ServiceActivator 的 inputChannel 属性中引用它,像这样:
Java
@ServiceActivator(inputChannel="orderChannel")或者,如果你使用的是 Java DSL 配置风格,你可以通过调用 channel() 来引用它,如下所示:
Java
@Bean
public IntegrationFlow orderFlow() {
return IntegrationFlows
...
.channel("orderChannel")
...
.get();
}1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
需要注意的是,如果你使用 QueueChannel,消费者必须配置一个轮询器。例如,假设你声明了一个 QueueChannel bean,像这样:
Java
@Bean
public MessageChannel orderChannel() {
return new QueueChannel();
}1
2
3
4
2
3
4
你需要确保消费者被配置为轮询通道以获取消息。对于服务激活器来说,@ServiceActivator 注解可能如下所示:
Java
@ServiceActivator(inputChannel="orderChannel", poller=@Poller(fixedRate="1000"))在这个例子中,服务激活器每 1,000 毫秒从名为 orderChannel 的通道中轮询获取消息。
2.2. 过滤器
你可以将过滤器放置在集成管道中的任意位置,以允许或阻止消息继续流向流的下一步,就像图 2.2 中所示。

例如,假设包含整数值的消息通过名为 numberChannel 的通道发布,但你只希望偶数通过并传递到名为 evenNumberChannel 的通道。在这种情况下,你可以使用 @Filter 注解声明一个过滤器,如下所示:
Java
@Filter(inputChannel="numberChannel", outputChannel="evenNumberChannel")
public boolean evenNumberFilter(Integer number) {
return number % 2 == 0;
}1
2
3
4
2
3
4
另外,如果你使用 Java DSL 配置风格来定义你的集成流,你可以像这样调用 filter():
Java
@Bean
public IntegrationFlow evenNumberFlow(AtomicInteger integerSource) {
return IntegrationFlows
...
.<Integer>filter((p) -> p % 2 == 0)
...
.get();
}1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
在这种情况下,你使用一个 lambda 来实现过滤器。但事实上,filter() 方法接受一个 GenericSelector 作为参数。这意味着,如果你的过滤需求对于一个简单的 lambda 来说太复杂,你可以实现 GenericSelector 接口。
2.3. 转换器
转换器对消息执行一些操作,通常会产生一个不同的消息,可能还会有不同的载荷类型(参见图 2.3)。转换可以是一些简单的操作,比如对数字执行数学运算或操作字符串值。也可以是更复杂的操作,比如使用表示 ISBN 的字符串值查找并返回相应书籍的详细信息。

例如,假设整数值被发布到一个名为 numberChannel 的通道上,你想将这些数字转换为包含罗马数字等价物的字符串。在这种情况下,你可以声明一个类型为 GenericTransformer 的 bean,并使用 @Transformer 进行注解,如下所示:
Java
@Bean
@Transformer(inputChannel="numberChannel", outputChannel="romanNumberChannel")
public GenericTransformer<Integer, String> romanNumTransformer() {
return RomanNumbers::toRoman;
}1
2
3
4
5
2
3
4
5
@Transformer 注解将此 bean 标识为一个转换器 bean,它从名为 numberChannel 的通道接收整数值,并使用名为 toRoman() 的静态方法进行转换。结果被发布到名为 romanNumberChannel 的通道。
在 Java DSL 配置风格中,可以通过调用 transform() 并传递对 toRoman() 方法的方法引用来更加简便地实现:
Java
@Bean
public IntegrationFlow transformerFlow() {
return IntegrationFlows
...
.transform(RomanNumbers::toRoman)
...
.get();
}1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
尽管你在这两个转换器代码示例中都使用了方法引用,但要知道转换器也可以使用 lambda 表达式来指定。或者,如果转换器足够复杂,需要单独的 Java 类来实现,你可以将它作为一个 bean 注入到流配置中,并像下面这样将引用传递给 transform() 方法:
Java
@Bean
public RomanNumberTransformer romanNumberTransformer() {
return new RomanNumberTransformer();
}
@Bean
public IntegrationFlow transformerFlow(RomanNumberTransformer romanNumberTransformer) {
return IntegrationFlows
...
.transform(romanNumberTransformer)
...
.get();
}1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
在这里,你声明了一个类型为 RomanNumberTransformer 的 bean,它本身是 Spring Integration 的 Transformer 或 GenericTransformer 接口的实现。这个 bean 被注入到 transformerFlow() 方法中,并在定义集成流时传递给 transform() 方法。
2.4. 路由
路由器根据一些路由标准允许在集成流中进行分支,将消息定向到不同的通道(参见图 2.4)。

例如,假设你有一个名为 numberChannel 的通道,通过它传递整数值。假设你想将所有偶数的消息定向到名为 evenChannel 的通道,而奇数的消息则路由到名为 oddChannel 的通道。为了在你的集成流中创建这样的路由,你可以声明一个类型为 AbstractMessageRouter 的 bean,并使用 @Router 对其进行注解,如下所示:
Java
@Bean
@Router(inputChannel="numberChannel")
public AbstractMessageRouter evenOddRouter() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
Integer number = (Integer) message.getPayload();
if (number % 2 == 0) {
return Collections.singleton(evenChannel());
}
return Collections.singleton(oddChannel());
}
};
}
@Bean
public MessageChannel evenChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel oddChannel() {
return new DirectChannel();
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
这里声明的 AbstractMessageRouter bean 接受来自名为 numberChannel 的输入通道的消息。该实现定义为匿名内部类,检查消息载荷,如果是偶数,则返回名为 evenChannel 的通道(在路由器 bean 之后声明为一个 bean)。否则,通道载荷中的数字必须是奇数,此时返回名为 oddChannel 的通道(也在一个 bean 声明方法中声明)。
在 Java DSL 形式中,通过在流定义中调用 route() 来声明路由器,如下所示:
Java
@Bean
public IntegrationFlow numberRoutingFlow(AtomicInteger source) {
return IntegrationFlows
...
.<Integer, String>route(n -> n % 2 == 0 ? "EVEN" : "ODD", mapping -> mapping
.subFlowMapping("EVEN", sf -> sf
.<Integer, Integer>transform(n -> n * 10)
.handle((i, h) -> { ... })
)
.subFlowMapping("ODD", sf -> sf
.transform(RomanNumbers::toRoman)
.handle((i, h) -> { ... })
)
)
.get();
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
虽然仍然可以声明一个 AbstractMessageRouter 并将其传递给 route(),但这个例子使用了一个 lambda 来判断消息载荷是奇数还是偶数。如果是偶数,则返回字符串值 EVEN。如果是奇数,则返回 ODD。然后使用这些值来确定哪个子映射将处理消息。
2.5. 分割器
在集成流中,有时将消息拆分为多个独立处理的消息可能很有用。分割器会为你拆分和处理这些消息,如图 2.5 所示。

分割器在许多情况下都很有用,但你可能会为以下两个基本用例使用分割器:
消息载荷包含了一组相同类型的项目,你希望将其作为单独的消息载荷进行处理。例如,携带产品列表的消息可能会被拆分成多个携带单个产品的消息;
消息载荷携带了虽然相关但可以分成两个或更多不同类型消息的信息。例如,购买订单可能携带了交付、结算和商品信息。交付细节可能由一个子流处理,结算由另一个处理,而商品信息则由另一个处理。在这种情况下,分割器通常后面跟着一个路由器,根据载荷类型路由消息,以确保数据被正确处理;
当将消息载荷拆分为两个或更多不同类型的消息时,通常只需要定义一个 POJO,该 POJO 可以提取传入载荷的各个部分,并将它们作为集合的元素返回。
例如,假设你想将携带购买订单的消息拆分为两个消息:一个携带结算信息,另一个携带商品清单。下面的 OrderSplitter 就可以完成这项工作:
Java
public class OrderSplitter {
public Collection<Object> splitOrderIntoParts(PurchaseOrder po) {
ArrayList<Object> parts = new ArrayList<>();
parts.add(po.getBillingInfo());
parts.add(po.getLineItems());
return parts;
}
}1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
然后,你可以通过使用 @Splitter 注解来声明 OrderSplitter bean 作为集成流的一部分,如下所示:
Java
@Bean
@Splitter(inputChannel="poChannel", outputChannel="splitOrderChannel")
public OrderSplitter orderSplitter() {
return new OrderSplitter();
}1
2
3
4
5
2
3
4
5
在这里,购买订单到达名为 poChannel 的通道,并由 OrderSplitter 进行拆分。然后,返回集合中的每个项目都作为集成流中的单独消息发布到名为 splitOrderChannel 的通道中。在这种情况下,你可以声明一个 PayloadTypeRouter 将结算信息和商品信息路由到它们自己的子流中,如下所示:
Java
@Bean
@Router(inputChannel="splitOrderChannel")
public MessageRouter splitOrderRouter() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(BillingInfo.class.getName(), "billingInfoChannel");
router.setChannelMapping(List.class.getName(), "lineItemsChannel");
return router;
}1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
顾名思义,PayloadTypeRouter 根据消息的载荷类型将消息路由到不同的通道。在这里配置的是,载荷类型为 BillingInfo 的消息将被路由到名为 billingInfoChannel 的通道进行进一步处理。至于商品信息,它们在一个 java.util.List 集合中;因此,你可以将类型为 List 的载荷映射到名为 lineItemsChannel 的通道。
目前,该流分为两个子流:一个是处理 BillingInfo 对象的流,另一个是处理 List<LineItem> 的流。但如果你希望进一步细分,不是处理一个 LineItem 对象的列表,而是单独处理每个 LineItem 对象,你只需要编写一个(而不是一个 bean)使用 @Splitter 注解的方法,该方法返回一个 LineItem 对象的集合,可能类似于这样:
Java
@Splitter(inputChannel="lineItemsChannel", outputChannel="lineItemChannel")
public List<LineItem> lineItemSplitter(List<LineItem> lineItems) {
return lineItems;
}1
2
3
4
2
3
4
当携带着 List<LineItem> 载荷的消息到达名为 lineItemsChannel 的通道时,它会进入 lineItemSplitter() 方法。根据分割器的规则,方法必须返回一个要拆分的项目集合。在这种情况下,你已经有了一个 LineItem 对象的集合,所以你只需直接返回这个集合。因此,集合中的每个 LineItem 都会被单独发布为一条消息到名为 lineItemChannel 的通道。
如果你更喜欢使用 Java DSL 来声明相同的分割器/路由器配置,你可以通过调用 split() 和 route() 来实现,如下所示:
Java
return IntegrationFlows
...
.split(orderSplitter())
.<Object, String>route(
p -> {
if (p.getClass().isAssignableFrom(BillingInfo.class)) {
return "BILLING_INFO";
} else {
return "LINE_ITEMS";
}
}, mapping -> mapping
.subFlowMapping("BILLING_INFO", sf -> sf
.<BillingInfo>handle((billingInfo, h) -> { ... }))
.subFlowMapping("LINE_ITEMS", sf -> sf
.split()
.<LineItem>handle((lineItem, h) -> { ... }))
)
.get();1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
流定义的 DSL 形式确实更加简洁,尽管可能稍微难以理解一些。我们可以通过将 lambda 表达式提取到方法中来使代码更清晰。例如,我们可以使用以下三个方法来替换流定义中使用的 lambda 表达式:
Java
private String route(Object p) {
return p.getClass().isAssignableFrom(BillingInfo.class) ? "BILLING_INFO" : "LINE_ITEMS";
}
private BillingInfo handleBillingInfo(BillingInfo billingInfo, MessageHeaders h) {
// ...
}
private LineItem handleLineItems(LineItem lineItem, MessageHeaders h) {
// ...
}1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
然后,我们可以使用方法引用重写集成流,如下所示:
Java
return IntegrationFlows
...
.split()
.route(
this::route,
mapping -> mapping
.subFlowMapping("BILLING_INFO", sf -> sf
.<BillingInfo>handle(this::handleBillingInfo))
.subFlowMapping("LINE_ITEMS", sf -> sf
.split()
.<LineItem>handle(this::handleLineItems)));1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
无论哪种方式,这都使用了与 Java 配置示例相同的 OrderSplitter 来拆分订单。订单拆分后,根据其类型被路由到两个不同的子流中。
2.6. 服务激活器
服务激活器从输入通道接收消息,并将这些消息发送给 MessageHandler 的实现,如图 2.6 所示:

MessageHandler 调用某个服务Spring Integration 提供了几个开箱即用的 MessageHandler 实现(甚至 PayloadTypeRouter 也是 MessageHandler 的一种实现),但你通常需要提供一些自定义的实现来充当服务激活器。例如,以下代码展示了如何声明一个 MessageHandler bean,并配置为服务激活器:
Java
@Bean
@ServiceActivator(inputChannel="someChannel")
public MessageHandler sysoutHandler() {
return message -> {
System.out.println("Message payload: " + message.getPayload());
};
}1
2
3
4
5
6
7
2
3
4
5
6
7
该 bean 使用 @ServiceActivator 注解标记,表示它是一个服务激活器,用于处理来自名为 someChannel 的通道的消息。至于 MessageHandler 本身,它是通过 lambda 表达式实现的。虽然它是一个简单的 MessageHandler,但在接收到消息后,它会将其载荷输出到标准输出流中。另外,你也可以声明一个服务激活器,在返回新载荷之前处理传入消息中的数据,就像下面的代码片段所示。在这种情况下,该 bean 应该是 GenericHandler 而不是 MessageHandler:
Java
@Bean
@ServiceActivator(inputChannel="orderChannel", outputChannel="completeChannel")
public GenericHandler<EmailOrder> orderHandler(OrderRepository orderRepo) {
return (payload, headers) -> {
return orderRepo.save(payload);
};
}1
2
3
4
5
6
7
2
3
4
5
6
7
在这种情况下,服务激活器是一个 GenericHandler,它期望具有 EmailOrder 类型负载的消息。当订单到达时,它会通过一个仓库进行保存;保存后的 EmailOrder 将被返回,以便发送到名为 completeChannel 的输出通道。
你可能会注意到,GenericHandler 不仅接收载荷,还接收消息头(即使示例中没有以任何方式使用这些头)。如果你愿意的话,你也可以在 Java DSL 配置风格中使用服务激活器,方法是将 MessageHandler 或 GenericHandler 传递给流定义中的 handle(),如下所示:
Java
public IntegrationFlow someFlow() {
return IntegrationFlows
...
.handle(msg -> {
System.out.println("Message payload: " + msg.getPayload());
})
.get();
}1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
在这种情况下,MessageHandler 被提供为 lambda 表达式,但你也可以将其提供为方法引用,甚至是实现了 MessageHandler 接口的类的实例。如果你使用 lambda 表达式或方法引用,请注意它接受一个消息作为参数。
同样地,如果服务激活器不打算作为流的末端,则 handle() 可以编写为接受 GenericHandler。以前面保存订单的服务激活器为例,你可以像这样使用 Java DSL 配置流:
Java
public IntegrationFlow orderFlow(OrderRepository orderRepo) {
return IntegrationFlows
...
.<EmailOrder>handle((payload, headers) -> {
return orderRepo.save(payload);
})
...
.get();
}1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
当使用 GenericHandler 时,lambda 表达式或方法引用将接受消息的负载和头部作为参数。此外,如果你选择在流的末端使用 GenericHandler,你需要返回 null,否则会出现错误,指示未指定输出通道。
2.7. 网关
网关是应用程序可以将数据提交到集成流中,并可选择接收作为流结果的响应的方式。Spring Integration 实现了这些网关,它们被实现为应用程序可以调用的接口,用于将消息发送到集成流中(参见图 2.7)。

你已经看到了一个消息网关的例子,即 FileWriterGateway。FileWriterGateway 是一个单向网关,它有一个接受字符串并将其写入文件的方法,返回值为 void。编写双向网关同样简单。在编写网关接口时,确保方法返回某个值以发布到集成流中。
举个例子,想象一个网关,它代表一个简单的集成流,接受一个字符串并将给定的字符串转换为大写。网关接口可能如下所示:
Java
@Component
@MessagingGateway(defaultRequestChannel="inChannel", defaultReplyChannel="outChannel")
public interface UpperCaseGateway {
String uppercase(String in);
}1
2
3
4
5
2
3
4
5
这个接口的惊人之处在于不需要手动实现它。Spring Integration 在运行时会自动提供一个实现,通过指定的通道发送和接收数据。
当调用 uppercase() 方法时,给定的字符串将被发布到名为 inChannel 的集成流通道中。无论流如何定义或执行何种操作,一旦数据到达名为 outChannel 的通道,它就会从 uppercase() 方法中返回。
至于转换为大写的集成流,它是一个简单的集成流,只有一个步骤将字符串转换为大写。下面是它在 Java DSL 配置中的表达方式:
Java
@Bean
public IntegrationFlow uppercaseFlow() {
return IntegrationFlows
.from("inChannel")
.<String, String> transform(s -> s.toUpperCase())
.channel("outChannel")
.get();
}1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
根据这里的定义,流从数据进入名为 inChannel 的通道开始。然后,消息负载通过转换器进行转换,这里定义为一个 lambda 表达式,用于执行大写操作。然后将结果消息发布到名为 outChannel 的通道中,这就是你在 UpperCaseGateway 接口中声明的回复通道。
2.8. 通道适配器
通道适配器代表集成流的入口和出口点。数据通过入站通道适配器进入集成流,并通过出站通道适配器离开集成流。这在图 2.8 中有所说明。

入站通道适配器可以有许多形式,取决于它们引入流的数据来源。例如,你可以声明一个入站通道适配器,从 AtomicInteger 中引入递增的数字到流中。使用 Java 配置,它可能如下所示:
Java
@Bean
@InboundChannelAdapter(poller=@Poller(fixedRate="1000"), channel="numberChannel")
public MessageSource<Integer> numberSource(AtomicInteger source) {
return () -> {
return new GenericMessage<>(source.getAndIncrement());
};
}1
2
3
4
5
6
7
2
3
4
5
6
7
这个 @Bean 方法声明了一个入站通道适配器 bean,在 @InboundChannelAdapter 注解的作用下,它每 1 秒(或 1,000 毫秒)从注入的 AtomicInteger 中提交一个数字到名为 numberChannel 的通道中。
当使用 Java 配置时,@InboundChannelAdapter 表示一个入站通道适配器,而在使用 Java DSL 定义集成流时,from() 方法则是实现这一功能的方式。下面是一个流定义的片段,展示了在 Java DSL 中定义的类似入站通道适配器:
Java
@Bean
public IntegrationFlow someFlow(AtomicInteger integerSource) {
return IntegrationFlows
.from(integerSource, "getAndIncrement", c -> c.poller(Pollers.fixedRate(1000)))
...
.get();
}1
2
3
4
5
6
7
2
3
4
5
6
7
通常情况下,通道适配器是由 Spring Integration 的众多端点模块之一提供的。例如,假设你需要一个入站通道适配器来监视指定的目录,并将写入该目录的任何文件作为消息提交到名为 file-channel 的通道。以下是使用 Spring Integration 的文件端点模块中的 FileReadingMessageSource 实现这一目标的 Java 配置示例:
Java
@Bean
@InboundChannelAdapter(channel="file-channel", poller=@Poller(fixedDelay="1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource sourceReader = new FileReadingMessageSource();
sourceReader.setDirectory(new File(INPUT_DIR));
sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
return sourceReader;
}1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
在 Java DSL 中编写等效的文件读取入站通道适配器时,Files 类的 inboundAdapter() 方法可以实现相同的功能。如下所示,出站通道适配器是集成流的末端,将最终的消息交给应用程序或其他系统处理:
Java
@Bean
public IntegrationFlow fileReaderFlow() {
return IntegrationFlows
.from(Files.inboundAdapter(new File(INPUT_DIR))
.patternFilter(FILE_PATTERN))
.get();
}1
2
3
4
5
6
7
2
3
4
5
6
7
服务激活器作为消息处理器实现时,通常用于作为出站通道适配器,特别是当数据需要传递给应用程序本身时。我们已经讨论过服务激活器,所以没有必要重复讨论了。
值得注意的是,Spring Integration 端点模块为几种常见用例提供了有用的消息处理器。例如 FileWritingMessageHandler。说到 Spring Integration 端点模块,让我们快速看一下有哪些现成的集成端点模块可用。
2.9. 端点模块
很棒的是,Spring Integration 允许你创建自己的通道适配器。但更棒的是,Spring Integration 提供了两打以上的端点模块,其中包含用于与各种常见外部系统集成的通道适配器,包括表 2.1 中列出的内容:
| 模块 | 依赖的 Artifact ID (Group ID: org.springframework.integration) |
|---|---|
| AMQP | spring-integration-amqp |
| Application events | spring-integration-event |
| Atom and RSS | spring-integration-feed |
spring-integration-mail | |
| Filesystem | spring-integration-file |
| FTP/FTPS | spring-integration-ftp |
| GemFire | spring-integration-gemfire |
| HTTP | spring-integration-http |
| JDBC | spring-integration-jdbc |
| JMS | spring-integration-jms |
| JMX | spring-integration-jmx |
| JPA | spring-integration-jpa |
| Kafka | spring-integration-kafka |
| MongoDB | spring-integration-mongodb |
| MQTT | spring-integration-mqtt |
| R2DBC | spring-integration-r2dbc |
| Redis | spring-integration-redis |
| RMI | spring-integration-rmi |
| RSocket | spring-integration-rsocket |
| SFTP | spring-integration-sftp |
| STOMP | spring-integration-stomp |
| Stream | spring-integration-stream |
| Syslog | spring-integration-syslog |
| TCP/UDP | spring-integration-ip |
| WebFlux | spring-integration-webflux |
| Web Services | spring-integration-ws |
| WebSocket | spring-integration-websocket |
| XMPP | spring-integration-xmpp |
| ZeroMQ | spring-integration-zeromq |
| ZooKeeper | spring-integration-zookeeper |
从查看表格 2.1 可以明确一件事,那就是 Spring Integration 提供了丰富的组件来满足许多集成需求。大多数应用程序甚至不会用到 Spring Integration 提供的一小部分功能。但知道 Spring Integration 提供了这些组件是很好的,如果你需要其中任何组件,它都能应对。
更重要的是,在本章的篇幅内不可能详尽介绍表格 2.1 中列出的模块提供的所有通道适配器。你已经看到了使用文件系统模块向文件系统写入的示例,并且你很快就会使用电子邮件模块来读取电子邮件。
每个端点模块都提供了通道适配器,可以在使用 Java 配置时声明为 bean,也可以在使用 Java DSL 配置时通过静态方法引用。我鼓励你探索你最感兴趣的任何其他端点模块。你会发现它们在使用上相当一致。但现在,让我们把注意力转向电子邮件端点模块,看看你如何在 Taco Cloud 应用程序中使用它。
3. 创建 Email 集成流
你决定让 Taco Cloud 的客户可以通过电子邮件提交他们的 taco 设计并下订单。你发出传单,并在报纸上刊登外卖广告,邀请所有人通过电子邮件提交他们的 taco 订单。这一举措取得了巨大成功!不过,成功得有些过头了。收到了这么多电子邮件,你不得不雇佣临时工来做的事情就是阅读所有的电子邮件并将订单详情提交到订餐系统中。
在这一部分中,你将实现一个集成流,该流会定期轮询 Taco Cloud 的收件箱,解析电子邮件中的订单详情,并将订单提交到 Taco Cloud 进行处理。简而言之,你需要的集成流将使用电子邮件端点模块中的入站通道适配器,将 Taco Cloud 收件箱中的电子邮件导入到集成流中。
集成流中的下一步将会将电子邮件解析为订单对象,并将其交给另一个处理程序,以便将订单提交到 Taco Cloud 的 REST API 中进行处理,处理方式与其他订单相同。首先,让我们定义一个简单的配置属性类,以捕获如何处理 Taco Cloud 电子邮件的具体信息,如下所示:
Java
@Data
@ConfigurationProperties(prefix = "taco.email")
@Component
public class EmailProperties {
private String username;
private String password;
private String host;
private String mailbox;
private long pollRate = 30000;
public String getImapUrl() {
return String.format("imaps://%s:%s@%s/%s",
this.username, this.password, this.host, this.mailbox);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
正如你所看到的,EmailProperties 捕获了用于生成 IMAP URL 的属性。该流使用此 URL 连接到 Taco Cloud 的电子邮件服务器并轮询邮件。其中捕获的属性包括电子邮件用户的用户名和密码,以及 IMAP 服务器的主机名、轮询的邮箱以及轮询邮箱的频率(默认为每 30 秒一次)。
EmailProperties 类在类级别上用 @ConfigurationProperties 注解,并设置了 prefix 属性为 taco.email。这意味着你可以在 application.yml 文件中配置如下以消费电子邮件的详细信息:
YAML
taco:
email:
host: imap.tacocloud.com
mailbox: INBOX
username: taco-in-flow
password: 1L0v3T4c0s
poll-rate: 100001
2
3
4
5
6
7
2
3
4
5
6
7
当然,这里显示的电子邮件服务器配置是虚构的。你需要调整它以匹配你将要使用的电子邮件服务器的详细信息。
Note:此时,你的集成开发环境(IDE)可能会出现 “未知属性” 的警告。这是因为 IDE 正在寻找它需要了解这些属性意义的元数据。这些警告不会影响实际代码,你可以忽略它们,或者你可以通过声明配置属性元数据来消除该警告。
现在让我们使用 EmailProperties 来配置集成流。你打算创建的流将类似于图 3.1。
在定义此流时,你有以下两个选项:
在 Taco Cloud 应用程序内部定义它。在流的结尾,一个服务激活器将调用你定义的存储库来创建 taco 订单;
将其定义为一个单独的应用程序。在流的结尾,一个服务激活器将发送一个 POST 请求到 Taco Cloud API 来提交 taco 订单;

你选择的方式对整个流的影响并不大,除非涉及服务激活器的实现方式。但由于你需要一些表示 taco、订单和配料的类型,这些类型与你已经在主要的 Taco Cloud 应用程序中定义的类型略有不同,你会通过在单独的应用程序中定义集成流来避免与现有的领域类型混淆。
你也可以选择使用 XML 配置、Java 配置或者 Java DSL 来定义流。我比较喜欢 Java DSL 的优雅。如果你对其他配置风格感兴趣,可以尝试使用其中一种来编写流,这样可以增加一些额外的挑战。现在,让我们看一下下面展示的用于 taco 订单邮件流的 Java DSL 配置。
Java
@Configuration
public class TacoOrderEmailIntegrationConfig {
@Bean
public IntegrationFlow tacoOrderEmailFlow(
EmailProperties emailProps,
EmailToOrderTransformer emailToOrderTransformer,
OrderSubmitMessageHandler orderSubmitHandler) {
return IntegrationFlows
.from(Mail.imapInboundAdapter(emailProps.getImapUrl()),
e -> e.poller(
Pollers.fixedDelay(emailProps.getPollRate())))
.transform(emailToOrderTransformer)
.handle(orderSubmitHandler)
.get();
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
在 tacoOrderEmailFlow() 方法中定义的 taco 订单邮件流由以下三个不同的组件组成:
一个 IMAP 邮件入站通道适配器:该通道适配器使用
EmailProperties的getImapUrl()方法生成的 IMAP URL 创建,并在EmailProperties的pollRate属性设置的延迟上进行轮询。进来的邮件会被传递到一个通道,将其连接到转换器;一个转换器,将邮件转换为订单对象:该转换器由
EmailToOrderTransformer实现,在tacoOrderEmailFlow()方法中被注入。转换后得到的订单会通过另一个通道传递给最终的组件;一个处理器(充当出站通道适配器):该处理器接受一个订单对象,并将其提交给 Taco Cloud 的 REST API;
对 Mail.imapInboundAdapter() 的调用是通过将 Email endpoint 模块作为依赖项包含在项目构建中实现的。Maven 依赖项如下所示:
XML
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mail</artifactId>
</dependency>1
2
3
4
2
3
4
EmailToOrderTransformer 类是 Spring Integration 的 Transformer 接口的实现,通过扩展 AbstractMailMessageTransformer 实现的,如下所示:
Java
@Component
public class EmailToOrderTransformer
extends AbstractMailMessageTransformer<EmailOrder> {
private static Logger log =
LoggerFactory.getLogger(EmailToOrderTransformer.class);
private static final String SUBJECT_KEYWORDS = "TACO ORDER";
@Override
protected AbstractIntegrationMessageBuilder<EmailOrder>
doTransform(Message mailMessage) throws Exception {
EmailOrder tacoOrder = processPayload(mailMessage);
return MessageBuilder.withPayload(tacoOrder);
}
private EmailOrder processPayload(Message mailMessage) {
try {
String subject = mailMessage.getSubject();
if (subject.toUpperCase().contains(SUBJECT_KEYWORDS)) {
String email =
((InternetAddress) mailMessage.getFrom()[0]).getAddress();
String content = mailMessage.getContent().toString();
return parseEmailToOrder(email, content);
}
} catch (MessagingException e) {
log.error("MessagingException: {}", e);
} catch (IOException e) {
log.error("IOException: {}", e);
}
return null;
}
private EmailOrder parseEmailToOrder(String email, String content) {
EmailOrder order = new EmailOrder(email);
String[] lines = content.split("\\r?\\n");
for (String line : lines) {
if (line.trim().length() > 0 && line.contains(":")) {
String[] lineSplit = line.split(":");
String tacoName = lineSplit[0].trim();
String ingredients = lineSplit[1].trim();
String[] ingredientsSplit = ingredients.split(",");
List<String> ingredientCodes = new ArrayList<>();
for (String ingredientName : ingredientsSplit) {
String code = lookupIngredientCode(ingredientName.trim());
if (code != null) {
ingredientCodes.add(code);
}
}
Taco taco = new Taco(tacoName);
taco.setIngredients(ingredientCodes);
order.addTaco(taco);
}
}
return order;
}
private String lookupIngredientCode(String ingredientName) {
for (Ingredient ingredient : ALL_INGREDIENTS) {
String ucIngredientName = ingredientName.toUpperCase();
if (LevenshteinDistance.getDefaultInstance()
.apply(ucIngredientName, ingredient.getName()) < 3 ||
ucIngredientName.contains(ingredient.getName()) ||
ingredient.getName().contains(ucIngredientName)) {
return ingredient.getCode();
}
}
return null;
}
private static Ingredient[] ALL_INGREDIENTS = new Ingredient[]{
new Ingredient("FLTO", "FLOUR TORTILLA"),
new Ingredient("COTO", "CORN TORTILLA"),
new Ingredient("GRBF", "GROUND BEEF"),
new Ingredient("CARN", "CARNITAS"),
new Ingredient("TMTO", "TOMATOES"),
new Ingredient("LETC", "LETTUCE"),
new Ingredient("CHED", "CHEDDAR"),
new Ingredient("JACK", "MONTERREY JACK"),
new Ingredient("SLSA", "SALSA"),
new Ingredient("SRCR", "SOUR CREAM")
};
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
AbstractMailMessageTransformer 是一个方便的基类,用于处理负载为电子邮件的消息。它负责从传入的消息中提取电子邮件信息,并将其传递给 doTransform() 方法中的 Message 对象。在 doTransform() 方法中,你将 Message 对象传递给一个名为 processPayload() 的私有方法,以将邮件解析为 EmailOrder 对象。尽管相似,但这里的 EmailOrder 对象与主要的 Taco Cloud 应用程序中使用的 TacoOrder 对象不同;它更简单,如下所示:
Java
@Data
public class EmailOrder {
private final String email;
private List<Taco> tacos = new ArrayList<>();
public void addTaco(Taco taco) {
this.tacos.add(taco);
}
}1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
这个 EmailOrder 类并不携带顾客的整个交付和账单信息,而只包含从传入邮件中获取的顾客电子邮件地址。
将邮件解析为 TACO 订单是一项复杂的任务。事实上,即使是一个简单的实现也需要几十行代码。而这几十行代码并没有进一步讨论 Spring Integration 及其如何实现转换器。为了节省空间,我将省略 processPayload() 方法的细节。
EmailToOrderTransformer 的最后一步是返回一个包含 EmailOrder 对象的 MessageBuilder。MessageBuilder 产生的消息被发送到集成流中的最终组件:一个消息处理器,将订单提交给 TACO Cloud 的 API。下方代码中显示的 OrderSubmitMessageHandler 实现了 Spring Integration 的 GenericHandler,用于处理携带 EmailOrder 负载的消息。
Java
@Component
public class OrderSubmitMessageHandler
implements GenericHandler<EmailOrder> {
private RestTemplate rest;
private ApiProperties apiProps;
public OrderSubmitMessageHandler(ApiProperties apiProps, RestTemplate rest) {
this.apiProps = apiProps;
this.rest = rest;
}
@Override
public Object handle(EmailOrder order, MessageHeaders headers) {
rest.postForObject(apiProps.getUrl(), order, String.class);
return null;
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
为了满足 GenericHandler 接口的要求,OrderSubmitMessageHandler 重写了 handle() 方法。这个方法接收传入的 EmailOrder 对象,并使用一个注入的 RestTemplate 来通过 POST 请求将 EmailOrder 提交到一个注入的 ApiProperties 对象中捕获的 URL。最后,handle() 方法返回 null,表示这个处理器标志着流的结束。
ApiProperties 用于避免在调用 postForObject() 时将 URL 硬编码。它是一个配置属性文件,看起来像这样:
Java
@Data
@ConfigurationProperties(prefix = "taco.api")
@Component
public class ApiProperties {
private String url;
}1
2
3
4
5
6
2
3
4
5
6
在 application.yml 中,Taco Cloud API 的 URL 可以这样配置:
YAML
taco:
api:
url: http://localhost:8080/orders/fromEmail1
2
3
2
3
为了使 RestTemplate 在项目中可用,以便注入到 OrderSubmitMessageHandler 中,你需要像这样在项目构建中添加 Spring Boot web starter:
XML
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>1
2
3
4
2
3
4
虽然这样可以使 RestTemplate 在类路径中可用,但它也会触发 Spring MVC 的自动配置。作为一个独立的 Spring Integration 流,应用程序不需要 Spring MVC,甚至也不需要自动配置提供的内嵌 Tomcat。因此,你应该在 application.yml 中添加以下条目来禁用 Spring MVC 的自动配置:
YAML
spring:
main:
web-application-type: none1
2
3
2
3
spring.main.web-application-type 属性可以设置为 servlet、reactive 或 none。当 Spring MVC 在类路径中时,自动配置将其值设置为 servlet。但在这里,你将其覆盖为 none,这样 Spring MVC 和 Tomcat 就不会被自动配置。
4. 总结
Spring Integration 能够定义数据进入或离开应用程序时的流处理;
集成流可以使用 XML、Java 或简洁的 Java DSL 配置样式来定义;
消息网关和通道适配器充当集成流的入口和出口点;
消息可以在流中通过转换器、分割器、聚合器、路由器以及服务激活器进行转换、分割、聚合、路由和处理;
消息通道连接集成流的各个组件;