Appearance
Spring In Action 6th:发送异步消息
1. 使用 JMS 发送消息
JMS 是一个 Java 标准,它定义了一个用于处理消息代理的通用 API。JMS 首次在 2001 年引入,对于 Java 中的异步消息传递,JMS 已经成为首选方法很长一段时间了。在 JMS 之前,每个消息代理都有一个专有的 API,这使得应用程序的消息代码在代理之间的可移植性较差。但是有了 JMS,所有符合规范的实现都可以通过一个通用接口进行操作,就像 JDBC 为关系数据库操作提供了一个通用接口一样。
Spring 通过一个基于模板的抽象,即 JmsTemplate 来支持 JMS。使用 JmsTemplate,从生产者端发送消息到队列和主题,以及在消费者端接收这些消息变得很容易。Spring 也支持消息驱动 POJOs 的概念:简单的 Java 对象以异步的方式对到达队列或主题的消息做出反应。
我们将探索 Spring 对 JMS 的支持,包括 JmsTemplate 和消息驱动的 POJOs。我们的重点将是 Spring 对 JMS 消息传递的支持,但是如果你想了解更多关于 JMS 的信息,那么可以看一下 Bruce Snyder、Dejan Bosanac 和 Rob Davies 的《ActiveMQ in Action》(Manning,2011)。
在你可以发送和接收消息之前,你需要一个准备好的消息代理,它可以在生产者和消费者之间传递这些消息。让我们通过在 Spring 中设置一个消息代理来开始我们对 Spring JMS 的探索。
1.1. 设置 JMS
在你可以使用 JMS 之前,你必须将一个 JMS 客户端添加到你的项目构建中。有了 Spring Boot,这个过程再简单不过了。你需要做的就是向构建中添加一个启动器依赖。但是首先,你必须决定你是要使用 Apache ActiveMQ,还是较新的 Apache ActiveMQ Artemis 代理。
如果你使用的是 ActiveMQ,你需要将以下依赖添加到你的项目的 pom.xml 文件中:
XML
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>1
2
3
4
2
3
4
如果选择的是 ActiveMQ Artemis,那么启动器依赖应该是这样的:
XML
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-artemis</artifactId>
</dependency>1
2
3
4
2
3
4
Artemis 是 ActiveMQ 的下一代重新实现,有效地使 ActiveMQ 成为遗留选项。因此,对于 Taco Cloud,你将选择 Artemis。但是,这个选择最终对你编写发送和接收消息的代码的方式影响不大。唯一的显著差异将在于你如何配置 Spring 来创建到 broker 的连接。
运行 Artemis broker:你需要运行一个 Artemis broker 才能运行本章中介绍的代码。如果你还没有运行 Artemis 实例,你可以按照 Artemis 文档中的说明进行操作。
默认情况下,Spring 假设你的 Artemis broker 在 localhost 的 61616 端口上监听。这对于开发目的来说是可以的,但是一旦你准备将你的应用程序发送到生产环境,你将需要设置一些属性,告诉 Spring 如何访问 broker。表 1.1 中列出了最有用的一些属性:
| 属性 | 描述 |
|---|---|
spring.artemis.host | broker 主机 |
spring.artemis.port | broker 端口 |
spring.artemis.user | 用于访问 broker 的用户(可选) |
spring.artemis.password | 用于访问 broker 的密码(可选) |
例如,考虑以下可能在非开发环境中使用的 application.yml 文件的条目:
YAML
spring:
artemis:
host: artemis.tacocloud.com
port: 61617
user: tacoweb
password: l3tm31n1
2
3
4
5
6
2
3
4
5
6
这将设置 Spring 创建到监听在 artemis.tacocloud.com,端口 61617 的 Artemis broker 的连接。它还设置了将与该 broker 交互的应用程序的凭据。凭据是可选的,但是建议在生产部署中使用。
如果你要使用 ActiveMQ 而不是 Artemis,则你需要使用表 1.2 中列出的特定于 ActiveMQ 的属性:
| 属性 | 描述 |
|---|---|
spring.activemq.broker-url | Broker 的 URL |
spring.activemq.user | 用于访问 Broker 的用户(可选) |
spring.activemq.password | 用于访问 Broker 的密码(可选) |
spring.activemq.in-memory | 是否启动内存 Broker(默认:true) |
注意,ActiveMQ broker 的地址是通过一个属性 spring.activemq.broker-url 指定的,而不是提供 broker 的主机名和端口的单独属性。URL 应该是 tcp:// URL,如下面的 YAML 片段所示:
YAML
spring:
activemq:
broker-url: tcp://activemq.tacocloud.com
user: tacoweb
password: l3tm31n1
2
3
4
5
2
3
4
5
如果你正在使用 ActiveMQ,你需要将 spring.activemq.inmemory 属性设置为 false,以防止 Spring 启动一个内存中的 broker。内存中的 broker 似乎很有用,但只有在你从发布它们的同一应用程序中消费消息时,它才有用(这有限的用途)。
你应该在继续之前安装并启动一个 Artemis(或 ActiveMQ)broker,而不是使用嵌入式 broker。我在这里不再重复安装说明,而是将你引导到 broker 文档以获取详细信息:
有了你的构建中的 JMS 启动器和一个等待从一个应用程序传送消息到另一个应用程序的 broker,你已经准备好开始发送消息了。
1.2. 使用 JmsTemplate 发送消息
在你的构建中有一个 JMS 启动器依赖(Artemis 或 ActiveMQ),Spring Boot 将自动配置一个 JmsTemplate(以及其他一些东西),你可以注入并使用它来发送和接收消息。
JmsTemplate 是 Spring 的 JMS 集成支持的核心。就像 Spring 的其他面向模板的组件一样,JmsTemplate 消除了大量与 JMS 一起工作时所需的样板代码。没有 JmsTemplate,你需要编写代码来创建与消息 broker 的连接和会话,并编写更多的代码来处理在发送消息过程中可能抛出的任何异常。JmsTemplate 关注你真正想做的事情:发送消息。
JmsTemplate 有几个对发送消息有用的方法,包括以下内容:
Java
// Send raw messages
void send(MessageCreator messageCreator) throws JmsException;
void send(Destination destination, MessageCreator messageCreator) throws JmsException;
void send(String destinationName, MessageCreator messageCreator) throws JmsException;
// Send messages converted from objects
void convertAndSend(Object message) throws JmsException;
void convertAndSend(Destination destination, Object message) throws JmsException;
void convertAndSend(String destinationName, Object message) throws JmsException;
// Send messages converted from objects with post-processing
void convertAndSend(Object message, MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(Destination destination, Object message, MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(String destinationName, Object message, MessagePostProcessor postProcessor) throws JmsException;1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
如你所见,实际上只有两个方法,send() 和 convertAndSend(),每个都被重写以支持不同的参数。如果你仔细看,你会注意到 convertAndSend() 的各种形式可以分为两个子类别。在试图理解这些方法都做什么的时候,考虑以下的分类:
- 三个
send()方法需要一个MessageCreator来制造一个Message对象; - 三个
convertAndSend()方法接受一个Object并自动将该Object在后台转换为Message; - 三个
convertAndSend()方法自动将Object转换为Message,但也接受一个MessagePostProcessor以允许在发送之前自定义Message;
此外,这三个方法类别中的每一个都由三个重写的方法组成,这些方法通过如何指定 JMS 目标(队列或主题)来区分,如下所示:
- 一个方法不接受目标参数,并将消息发送到默认目标;
- 一个方法接受一个
Destination对象,该对象指定消息的目标; - 一个方法接受一个
String,该String通过名称指定消息的目标;
将这些方法付诸实践,考虑下面代码中的 JmsOrderMessagingService,它使用了 send() 方法的最基本形式。
Java
@Service
public class JmsOrderMessagingService implements OrderMessagingService {
private final JmsTemplate jms;
@Autowired
public JmsOrderMessagingService(JmsTemplate jms) {
this.jms = jms;
}
@Override
public void sendOrder(TacoOrder order) {
jms.send(new MessageCreator() {
@Override
public Message createMessage(Session session)
throws JMSException {
return session.createObjectMessage(order);
}
}
);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
sendOrder() 方法调用了 jms.send(),并传入了一个 MessageCreator 的匿名内部类实现。该实现重写了 createMessage() 以从给定的 TacoOrder 对象创建一个新的对象消息。
由于特定于 JMS 的 JmsOrderMessagingService 实现了更通用的 OrderMessagingService 接口,我们可以通过将此服务注入到 OrderApiController 中并在创建订单时调用 sendOrder() 来使用此服务,如下所示:
Java
@RestController
@RequestMapping(path = "/api/orders",
produces = "application/json")
@CrossOrigin(origins = "http://localhost:8080")
public class OrderApiController {
private final OrderRepository repo;
private final OrderMessagingService messageService;
public OrderApiController(
OrderRepository repo,
OrderMessagingService messageService) {
this.repo = repo;
this.messageService = messageService;
}
@PostMapping(consumes = "application/json")
@ResponseStatus(HttpStatus.CREATED)
public TacoOrder postOrder(@RequestBody TacoOrder order) {
messageService.sendOrder(order);
return repo.save(order);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
现在,当你通过 Taco Cloud 网站创建一个订单时,应该会向代理发送一条消息,以便路由到将接收该订单的另一个应用程序。尽管我们还没有任何东西可以接收那条消息。即便如此,你仍然可以使用 Artemis 控制台查看队列的内容。关于如何做到这一点的详细信息,请参阅 Artemis 文档。
我不确定你怎么看,但我认为 JmsOrderMessagingService 中的代码虽然直截了当,但有点笨拙。声明匿名内部类所涉及的仪式使得本来简单的方法调用变得复杂。认识到 MessageCreator 是一个函数式接口,你可以用一个 lambda 表达式稍微整理一下 sendOrder() 方法,如下所示:
Java
@Override
public void sendOrder(TacoOrder order) {
jms.send(session -> session.createObjectMessage(order));
}1
2
3
4
2
3
4
但请注意,对 jms.send() 的调用并没有指定目的地。为了使其工作,你还必须使用 spring.jms.template.default-destination 属性指定一个默认的目的地名称。例如,你可以在你的 application.yml 文件中这样设置属性:
YAML
spring:
jms:
template:
default-destination: tacocloud.order.queue1
2
3
4
2
3
4
在许多情况下,使用默认目的地是最简单的选择。它让你一次性指定目的地名称,使得代码只需关心发送消息,而不用关心消息被发送到哪里。但是,如果你需要向除默认目的地之外的其他目的地发送消息,你需要将该目的地作为参数传递给 send()。
做到这一点的一种方式是将一个 Destination 对象作为 send() 的第一个参数传递。做到这一点的最简单的方法是声明一个 Destination bean,然后将其注入到执行消息传递的 bean 中。例如,以下 bean 声明了 Taco Cloud 订单队列 Destination:
Java
@Bean
public Destination orderQueue() {
return new ActiveMQQueue("tacocloud.order.queue");
}1
2
3
4
2
3
4
这个 bean 方法可以添加到应用程序中的任何配置类中,该应用程序将通过 JMS 发送或接收消息。为了组织起来,最好将其添加到为消息配置指定的配置类中,例如 MessagingConfig。
Note:值得注意的是,这里使用的
ActiveMQQueue实际上来自 Artemis(来自org.apache.activemq.artemis.jms.client包)。如果你正在使用 ActiveMQ(而不是 Artemis),也有一个名为ActiveMQQueue的类(来自org.apache.activemq.command包)。
如果这个 Destination bean 被注入到 JmsOrderMessagingService 中,你可以在调用 send() 时使用它来指定目的地,如下所示:
Java
@Service
public class JmsOrderMessagingService implements OrderMessagingService {
private final JmsTemplate jms;
private final Destination orderQueue;
@Autowired
public JmsOrderMessagingService(JmsTemplate jms,
Destination orderQueue) {
this.jms = jms;
this.orderQueue = orderQueue;
}
...
@Override
public void sendOrder(TacoOrder order) {
jms.send(orderQueue, session -> session.createObjectMessage(order));
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
虽然像这样使用 Destination 对象指定目的地,为你提供了更多的配置 Destination 的机会,而不仅仅是目的地名称。但是在实际使用中,你几乎不会用到除了目标名称之外的其他配置。通常情况下,直接将名称作为 send() 函数的第一个参数发送会更方便:
Java
@Override
public void sendOrder(TacoOrder order) {
jms.send("tacocloud.order.queue", session -> session.createObjectMessage(order));
}1
2
3
4
2
3
4
虽然 send() 方法的使用并不特别困难(尤其是当 MessageCreator 以 lambda 的形式给出时),但是需要你提供一个 MessageCreator 的要求增加了一点复杂性。如果你只需要指定要发送的对象(以及可选的目标),那不是更简单吗?这就是 convertAndSend() 工作方式的简洁描述。让我们来看一看。
1.2.1. 在发送前转换消息
JmsTemplate 的 convertAndSend() 方法通过消除提供 MessageCreator 的需要,简化了消息发布。相反,你直接将要发送的对象传递给 convertAndSend(),并且该对象将在发送之前被转换为 Message。
例如,以下 sendOrder() 的重新实现使用 convertAndSend() 将 TacoOrder 发送到一个指定的目标:
Java
@Override
public void sendOrder(TacoOrder order) {
jms.convertAndSend("tacocloud.order.queue", order);
}1
2
3
4
2
3
4
就像 send() 方法一样,convertAndSend() 会接受 Destination 或 String 值来指定目标,或者你可以完全省略目标,将消息发送到默认目标。
无论你选择哪种形式的 convertAndSend(),传入 convertAndSend() 的 TacoOrder 在发送之前都会被转换为 Message。在幕后,这是通过一个 MessageConverter 的实现来实现的,它做了将应用程序域对象转换为 Message 对象的繁重工作。
1.2.2. 配置消息转换器
MessageConverter 是一个由 Spring 定义的接口,只需要实现以下两个方法:
Java
public interface MessageConverter {
Message toMessage(Object object, Session session)
throws JMSException, MessageConversionException;
Object fromMessage(Message message);
}1
2
3
4
5
2
3
4
5
虽然这个接口足够简单,可以实现,但你通常不需要创建自定义实现。Spring 已经提供了一些实现,比如表 1.3 中描述的那些:
| 消息转换器 | 功能 |
|---|---|
MappingJackson2MessageConverter | 使用 Jackson 2 JSON 库对消息进行与 JSON 的转换 |
MarshallingMessageConverter | 使用 JAXB 对消息进行与 XML 的转换 |
MessagingMessageConverter | 使用底层的 MessageConverter 将消息从消息抽象转换为 Message,并使用 JmsHeaderMapper 将 JMS 头映射到标准消息头,反之亦然 |
SimpleMessageConverter | 将 String 转换为 TextMessage,将字节数组转换为 BytesMessage,将 Map 转换为 MapMessage,将 Serializable 转换为 ObjectMessage |
org.springframework.jms.support.converter 包中)SimpleMessageConverter 是默认的,但它要求被发送的对象实现 Serializable。这可能是个好主意,但你可能更喜欢使用其他的消息转换器,比如 MappingJackson2MessageConverter,以避免这个限制。
要应用不同的消息转换器,你只需要声明所选转换器的一个实例作为 bean。例如,以下的 bean 声明将启用 MappingJackson2MessageConverter 代替 SimpleMessageConverter:
Java
@Bean
public MappingJackson2MessageConverter messageConverter() {
MappingJackson2MessageConverter messageConverter =
new MappingJackson2MessageConverter();
messageConverter.setTypeIdPropertyName("_typeId");
return messageConverter;
}1
2
3
4
5
6
7
2
3
4
5
6
7
注意,你在返回 MappingJackson2MessageConverter 之前调用了 setTypeIdPropertyName()。这非常重要,因为它使接收者知道要将传入消息转换为何种类型。默认情况下,它将包含正在转换的类型的完全限定类名。但是这种方法有些不灵活,要求接收者也具有相同的类型,具有相同的完全限定类名。
为了提供更多的灵活性,你可以通过在消息转换器上调用 setTypeIdMappings() 来将合成类型名称映射到实际类型。例如,以下对消息转换器 bean 方法的更改将合成的 TacoOrder 类型 ID 映射到 TacoOrder 类:
Java
@Bean
public MappingJackson2MessageConverter messageConverter() {
MappingJackson2MessageConverter messageConverter =
new MappingJackson2MessageConverter();
messageConverter.setTypeIdPropertyName("_typeId");
Map<String, Class<?>> typeIdMappings = new HashMap<String, Class<?>>();
typeIdMappings.put("order", TacoOrder.class);
messageConverter.setTypeIdMappings(typeIdMappings);
return messageConverter;
}1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
在消息的 _typeId 属性中发送的不再是完全限定的类名,而是 TacoOrder 的值。在接收应用程序中,将配置一个类似的消息转换器,将 TacoOrder 映射到它自己对订单的理解。这个订单的实现可能在一个不同的包中,有一个不同的名称,甚至可能只有发送者的 TacoOrder 属性的一个子集。这样,即使发送和接收方的 TacoOrder 实现不同,也能保证消息的正确传递和处理。这就是 MessageConverter 的灵活性和强大之处。
1.2.3. 后期处理消息
假设 Taco Cloud 除了其盈利丰厚的网络业务外,还决定开设一些实体 taco 餐厅。鉴于他们的任何餐厅也可能是网络业务的配送中心,他们需要一种方式来向餐厅的厨房传达订单的来源。这将使厨房员工能够对店内订单和网络订单采用不同的处理流程。
在 TacoOrder 对象中添加一个新的 source 属性以携带这个信息是合理的,用 WEB 来填充在线下单的订单,用 STORE 来填充在店内下单的订单。但这将需要同时更改网站的 TacoOrder 类和厨房应用的 TacoOrder 类,而实际上,这些信息只是 taco 制备人员所需要的。
一个更简单的解决方案是在消息中添加一个自定义头部来携带订单的来源。如果你使用 send() 方法发送 taco 订单,这可以通过在 Message 对象上调用 setStringProperty() 方法来轻松实现,如下所示:
Java
jms.send("tacocloud.order.queue",
session -> {
Message message = session.createObjectMessage(order);
message.setStringProperty("X_ORDER_SOURCE", "WEB");
});1
2
3
4
5
2
3
4
5
这里的问题是你没有使用 send()。通过选择使用 convertAndSend(),Message 对象在底层被创建,你无法访问它。幸运的是,你有一种方法可以在发送前调整在底层创建的 Message。通过将 MessagePostProcessor 作为 convertAndSend() 的最后一个参数传入,你可以在 Message 被创建后对其进行任何操作。以下代码仍然使用 convertAndSend(),但它也使用了一个 MessagePostProcessor 在消息发送前添加 X_ORDER_SOURCE 头部:
Java
jms.convertAndSend("tacocloud.order.queue", order, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws JMSException {
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
}
});1
2
3
4
5
6
7
2
3
4
5
6
7
你可能已经注意到,MessagePostProcessor 是一个函数式接口。这意味着你可以通过用 lambda 表达式替换匿名内部类来简化它,如下所示:
Java
jms.convertAndSend("tacocloud.order.queue", order,
message -> {
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
});1
2
3
4
5
2
3
4
5
虽然你只需要在这一次调用 convertAndSend() 时使用这个特定的 MessagePostProcessor,但你可能会发现自己在多次不同的调用 convertAndSend() 时使用相同的 MessagePostProcessor。在这些情况下,也许方法引用是一个比 lambda 表达式更好的选择,如下所示,可以避免不必要的代码重复:
Java
@GetMapping("/convertAndSend/order")
public String convertAndSendOrder() {
TacoOrder order = buildOrder();
jms.convertAndSend("tacocloud.order.queue", order,
this::addOrderSource);
return "Convert and sent order";
}
private Message addOrderSource(Message message) throws JMSException {
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
}1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
你现在已经看到了发送消息的几种方式。但是,如果没有人接收消息,发送消息就没有任何意义。让我们看看如何使用 Spring JMS 接收消息。
1.3. 接收 JMS 消息
在消费消息时,你可以选择拉取模型,即你的代码请求一个消息并等待其到来,或者推送模型,即消息一旦可用就会传递给你的代码。
JmsTemplate 提供了几种接收消息的方法,但它们都使用拉取模型。你调用其中的一个方法来请求一个消息,线程会被阻塞直到一个消息可用(可能立即可用,也可能需要等待一段时间)。
另一方面,你也可以选择使用推送模型,即定义一个消息监听器,每当有消息可用时就会被调用。
这两种选项都适用于各种用例。通常认为推送模型是最好的选择,因为它不会阻塞线程。但在某些用例中,如果消息到达得太快,监听器可能会负担过重。拉取模型使消费者能够声明他们已经准备好处理新消息。
让我们看看接收消息的两种方式。我们将从 JmsTemplate 提供的拉取模型开始。
1.3.1. 使用 JmsTemplate 接收消息
JmsTemplate 提供了从代理拉取方法的几种方法,包括以下几种:
Java
Message receive() throws JmsException;
Message receive(Destination destination) throws JmsException;
Message receive(String destinationName) throws JmsException;
Object receiveAndConvert() throws JmsException;
Object receiveAndConvert(Destination destination) throws JmsException;
Object receiveAndConvert(String destinationName) throws JmsException;1
2
3
4
5
6
2
3
4
5
6
如你所见,这六种方法与 JmsTemplate 的 send() 和 convertAndSend() 方法相对应。receive() 方法接收一个原始的 Message,而 receiveAndConvert() 方法使用配置好的消息转换器将消息转换为领域类型。对于这些方法,你可以指定一个 Destination 或包含目标名称的 String,或者你可以从默认目标拉取一条消息。
为了看到这些方法的实际效果,你将编写一些代码,从 tacocloud.order.queue 目标拉取一个 TacoOrder。下面代码中的 OrderReceiver 使用 JmsTemplate.receive() 接收订单数据的服务组件。
Java
@Component
public class JmsOrderReceiver implements OrderReceiver {
private final JmsTemplate jms;
private final MessageConverter converter;
@Autowired
public JmsOrderReceiver(JmsTemplate jms, MessageConverter converter) {
this.jms = jms;
this.converter = converter;
}
@Override
public TacoOrder receiveOrder() {
Message message = jms.receive("tacocloud.order.queue");
return (TacoOrder) converter.fromMessage(message);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
你在这里使用了一个 String 来指定从哪个目标拉取订单。receive() 方法返回一个未转换的 Message。但你真正需要的是 Message 内部的 TacoOrder,所以接下来你使用一个注入的消息转换器来转换消息。消息中的类型 ID 属性将指导转换器将其转换为 TacoOrder,但它被返回为一个需要在返回之前进行类型转换的 Object。
在某些需要检查消息的属性和头部的情况下,接收一个原始的 Message 对象可能会有用。但通常你只需要有效载荷。将有效载荷转换为领域类型是一个两步过程,需要将消息转换器注入到组件中。当你只关心消息的有效载荷时,receiveAndConvert() 要简单得多。下面的代码显示了如何重新设计 JmsOrderReceiver,以便使用 receiveAndConvert() 而不是 receive()。
Java
@Component
public class JmsOrderReceiver implements OrderReceiver {
private final JmsTemplate jms;
@Autowired
public JmsOrderReceiver(JmsTemplate jms) {
this.jms = jms;
}
@Override
public TacoOrder receiveOrder() {
return (TacoOrder) jms.receiveAndConvert("tacocloud.order.queue");
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
这个新版本的 JmsOrderReceiver 的 receiveOrder() 方法已经被简化为只有一行。此外,你不再需要注入一个 MessageConverter,因为所有的消息转换都将在 receiveAndConvert() 的背后完成。
在继续之前,让我们考虑一下 receiveOrder() 可能如何在 Taco Cloud 厨房应用中使用。Taco Cloud 厨房的一位食品准备人员可能会按下一个按钮或采取某些行动来表示他们准备开始制作 tacos。在那个时候,将调用 receiveOrder(),并且对 receive() 或 receiveAndConvert() 的调用将被阻塞。直到有一个订单消息准备好,否则什么也不会发生。一旦订单到达,它将从 receiveOrder() 返回,并且其信息将被用于显示订单的详细信息,以便食品准备人员开始工作。这似乎是拉取模型的一个自然选择。
现在让我们看看通过声明一个 JMS 监听器,推送模型是如何工作的。
1.3.2. 声明消息监听器
与拉取模型不同,拉取模型需要明确调用 receive() 或 receiveAndConvert() 来接收消息,而消息监听器是一个被动的组件,它会处于空闲状态,直到消息到达。
要创建一个对 JMS 消息做出反应的消息监听器,你只需在组件的一个方法上使用 @JmsListener 注解。下面的代码显示了一个新的 OrderListener 组件,它被动地监听消息,而不是主动请求它们。
Java
@Profile("jms-listener")
@Component
public class OrderListener {
private final KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@JmsListener(destination = "tacocloud.order.queue")
public void receiveOrder(TacoOrder order) {
ui.displayOrder(order);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
receiveOrder() 方法使用 @JmsListener 注解来 “监听” tacocloud.order.queue 目标上的消息。它不处理 JmsTemplate,也不会被你的应用程序代码显式调用。相反,Spring 框架中的代码会等待消息到达指定的目标,当它们到达时,receiveOrder() 方法会自动被调用,消息的 TacoOrder 有效载荷作为参数。
在很多方面,@JmsListener 注解就像 Spring MVC 的请求映射注解,比如 @GetMapping 或 @PostMapping。在 Spring MVC 中,用请求映射方法注解的方法会对指定路径的请求做出反应。类似地,用 @JmsListener 注解的方法会对到达目标的消息做出反应。
消息监听器通常被誉为最好的选择,因为它们不会阻塞并且能够快速处理多个消息。然而,在 Taco Cloud 应用的上下文中,它们可能并不是最好的选择。食品准备人员是系统中的一个重要瓶颈,他们可能无法像订单进来那样快速地准备 tacos。当新的订单显示在屏幕上时,食品准备人员可能已经完成了一半的订单。厨房用户界面需要在订单到达时对它们进行缓冲,以避免厨房工作人员负担过重。
这并不是说消息监听器是坏的。相反,当消息可以被快速处理时,它们是完美的选择。但是,当消息处理器需要能够按照他们自己的时间来请求更多的消息时,JmsTemplate 提供的拉取模型似乎更合适。
因为 JMS 是由标准的 Java 规范定义的,并且得到了许多消息代理实现的支持,所以它是 Java 中消息传递的常见选择。但是 JMS 有一些缺点,其中最大的一个是,作为 Java 规范,它的使用仅限于 Java 应用。像 RabbitMQ 和 Kafka 这样的新的消息选项解决了这些缺点,并且可以用于 JVM 之外的其他语言和平台。让我们把 JMS 放在一边,看看你如何用 RabbitMQ 实现你的 taco 订单消息传递。
2. 使用 RabbitMQ 和 AMQP
作为 AMQP 最突出的实现之一,RabbitMQ 提供了比 JMS 更先进的消息路由策略。JMS 消息是用接收者将从中检索它们的目的地的名称来寻址的,而 AMQP 消息是用交换机和路由键的名称来寻址的,这些都与接收者正在监听的队列解耦。交换机和队列之间的这种关系在图 2.1 中有所说明。

当消息到达 RabbitMQ 代理时,它会被发送到它所寻址的交换机。交换机负责将其路由到一个或多个队列,这取决于交换机的类型、交换机和队列之间的绑定,以及消息的路由键的值。
有几种不同类型的交换机,包括以下几种:
- 默认:这是由代理自动创建的特殊交换机。它将消息路由到名称与消息的路由键相同的队列。所有队列将自动绑定到默认交换机;
- 直接:将消息路由到绑定键与消息的路由键相同的队列;
- 主题:将消息路由到一个或多个绑定键(可能包含通配符)与消息的路由键匹配的队列;
- 扇出:将消息路由到所有绑定的队列,而不考虑绑定键或路由键;
- 头部:类似于主题交换机,但路由是基于消息头部值而不是路由键;
- 死信:对于任何无法投递的消息(即它们不匹配任何定义的交换机到队列的绑定)的接收器;
最简单的交换机形式是默认和扇出,这些大致对应于 JMS 队列和主题。但是其他交换机允许你定义更灵活的路由方案。
最重要的一点是,消息是带有路由键发送到交换机的,它们从队列中被消费。它们如何从交换机到达队列取决于绑定定义和最适合你的用例。
你使用哪种交换机类型以及如何定义从交换机到队列的绑定对于在你的 Spring 应用程序中如何发送和接收消息影响不大。
因此,我们将重点关注如何编写发送和接收 Rabbit 消息的代码。
Note:关于如何最好地将队列绑定到交换机的更详细的讨论,请参阅 Gavin Roy 的《RabbitMQ in Depth》(Manning,2017)或 Alvaro Videla 和 Jason J. W. Williams 的《RabbitMQ in Action》(Manning,2012)。
2.1. 添加 RabbitMQ 到 Spring 中
在你开始使用 Spring 发送和接收 RabbitMQ 消息之前,你需要在你的构建中添加 Spring Boot 的 AMQP 启动器依赖,以替换你在上一节中添加的 Artemis 或 ActiveMQ 启动器,如下所示:
XML
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>1
2
3
4
2
3
4
将 AMQP 启动器添加到你的构建中将触发自动配置,这将创建 AMQP 连接工厂和 RabbitTemplate beans,以及其他支持组件。只需添加此依赖项,你就可以开始使用 Spring 从 RabbitMQ 代理发送和接收消息。但是,你可能会想了解一些有用的属性,这些属性列在表 2.1 中。
出于开发目的,你可能有一个不需要身份验证的 RabbitMQ 代理在你的本地机器上运行,监听 5672 端口。这些属性在你还在开发阶段时可能不会被大量使用,但是当你的应用程序进入生产环境时,它们无疑会变得非常有用。
| 属性 | 描述 |
|---|---|
spring.rabbitmq.addresses | 一个逗号分隔的 RabbitMQ Broker 地址列表 |
spring.rabbitmq.host | Broker 主机(默认为 localhost) |
spring.rabbitmq.port | Broker 端口(默认为 5672) |
spring.rabbitmq.username | 访问 Broker 的用户名(可选) |
spring.rabbitmq.password | 访问 Broker 的密码(可选) |
Note:如果你还没有 RabbitMQ 代理可以使用,请查看 RabbitMQ 的官方文档以获取运行 RabbitMQ 的最新指南。你可以直接搜索 “RabbitMQ 官方文档” 来找到相关信息。
例如,假设你正在进入生产阶段,你的 RabbitMQ 代理位于一个名为 rabbit.tacocloud.com 的服务器上,监听 5673 端口,并需要凭证。在这种情况下,当 prod 配置文件处于活动状态时,你的 application.yml 文件中的以下配置将设置这些属性:
YAML
spring:
profiles: prod
rabbitmq:
host: rabbit.tacocloud.com
port: 5673
username: tacoweb
password: l3tm31n1
2
3
4
5
6
7
2
3
4
5
6
7
现在 RabbitMQ 已经在你的应用中配置好了,是时候开始使用 RabbitTemplate 发送消息了。
2.2. 使用 RabbitTemplate 发送消息
Spring 对 RabbitMQ 消息的支持的核心是 RabbitTemplate。RabbitTemplate 类似于 JmsTemplate,并提供了一组类似的方法。然而,你会看到,一些微妙的差异与 RabbitMQ 的独特工作方式相符。
关于使用 RabbitTemplate 发送消息,send() 和 convertAndSend() 方法与 JmsTemplate 的同名方法相对应。但是,与只将消息路由到给定队列或主题的 JmsTemplate 方法不同,RabbitTemplate 方法根据交换机和路由键发送消息。以下是使用 RabbitTemplate 发送消息的一些最相关的方法:
Java
// Send raw messages
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
// Send messages converted from objects
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
// Send messages converted from objects with post-processing
void convertAndSend(Object message, MessagePostProcessor mPP) throws AmqpException;
void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
如你所见,这些方法遵循了与 JmsTemplate 中的对应方法类似的模式。前三个 send() 方法都发送一个原始的 Message 对象。接下来的三个 convertAndSend() 方法接受一个对象,该对象将在发送之前在后台被转换为 Message。最后的三个 convertAndSend() 方法与前面的三个方法类似,但它们接受一个 MessagePostProcessor,可以在消息发送到 broker 之前用来操作 Message 对象。
这些方法与它们的 JmsTemplate 对应方法的不同之处在于,它们接受 String 值来指定交换机和路由键,而不是目标名称(或 Destination 对象)。不指定交换机的方法将把它们的消息发送到默认交换机。同样,不接受路由键的方法将使用默认路由键路由它们的消息。
我们可以使用 RabbitTemplate 的 send() 方法来发送 taco 订单,但在你调用 send() 之前,你需要先将一个 TacoOrder 对象转换为 Message,而这可以通过 getMessageConverter() 方法来实现:
Java
@Service
public class RabbitOrderMessagingService
implements OrderMessagingService {
private final RabbitTemplate rabbit;
@Autowired
public RabbitOrderMessagingService(RabbitTemplate rabbit) {
this.rabbit = rabbit;
}
@Override
public void sendOrder(TacoOrder order) {
MessageConverter converter = rabbit.getMessageConverter();
MessageProperties props = new MessageProperties();
Message message = converter.toMessage(order, props);
rabbit.send("tacocloud.order", message);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
你会注意到,RabbitOrderMessagingService 实现了 OrderMessagingService,就像 JmsOrderMessagingService 一样。这意味着它可以以相同的方式注入到 OrderApiController 中,以在下订单时发送订单消息。然而,由于我们还没有任何东西来接收这些消息,你可以使用 RabbitMQ 的基于浏览器的管理控制台。请参阅 https://www.rabbitmq.com/management.html 了解如何启用和使用 RabbitMQ 控制台的详细信息。
一旦你有了一个 MessageConverter,将 TacoOrder 转换为 Message 就是一项简单的工作。你必须用 MessageProperties 提供所有消息属性,但是如果你不需要设置任何这样的属性,用 MessageProperties 的默认实例就可以了。然后,剩下的就是调用 send(),传入交换机和路由键(这两者都是可选的)以及消息。在这个例子中,你只指定了路由键 —— tacocloud.order 和消息,所以将使用默认的交换机。
说到默认交换机,默认的交换机名称是 ""(一个空字符串),它对应于 RabbitMQ broker 自动创建的默认交换机。同样,默认的路由键是 ""(其路由取决于所涉及的交换机和绑定)。你可以通过设置 spring.rabbitmq.template.exchange 和 spring.rabbitmq.template.routing-key 属性来覆盖这些默认值,如下所示:
YAML
spring:
rabbitmq:
template:
exchange: tacocloud.order
routing-key: kitchens.central1
2
3
4
5
2
3
4
5
在这种情况下,所有未指定交换机的发送的消息将自动发送到名为 tacocloud.order 的交换机。如果在调用 send() 或 convertAndSend() 时也未指定路由键,那么消息将具有 kitchens.central 的路由键。
从消息转换器创建一个 Message 对象已经足够简单了,但使用 convertAndSend() 让 RabbitTemplate 为你处理所有的转换工作会更简单,如下所示:
Java
public void sendOrder(TacoOrder order) {
rabbit.convertAndSend("tacocloud.order", order);
}1
2
3
2
3
2.2.1. 配置消息转换器
默认情况下,消息转换是由 SimpleMessageConverter 执行的,它能够将简单类型(如 String 和 Serializable 对象)转换为 Message 对象。但是,Spring 为 RabbitTemplate 提供了几个消息转换器,包括以下内容:
| 消息转换器 | 功能 |
|---|---|
Jackson2JsonMessageConverter | 使用 Jackson 2 JSON 处理器将对象转换为 JSON 并从 JSON 转换回来 |
MarshallingMessageConverter | 使用 Spring 的 Marshaller 和 Unmarshaller 进行转换 |
SerializerMessageConverter | 使用 Spring 的 Serializer 和 Deserializer 抽象来转换 String 和任何类型的原生对象 |
SimpleMessageConverter | 转换 String、字节数组和 Serializable 类型 |
ContentTypeDelegatingMessageConverter | 根据 contentType 头部信息将消息转换委托给另一个 MessageConverter |
MessagingMessageConverter | 将消息转换委托给底层的 MessageConverter,并将头部信息转换委托给 AmqpHeaderConverter |
RabbitTemplate 提供了几个消息转换器如果你需要更改消息转换器,只需配置一个 MessageConverter 类型的 bean。例如,对于基于 JSON 的消息转换,你可以像这样配置一个 Jackson2JsonMessageConverter:
Java
@Bean
public Jackson2JsonMessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}1
2
3
4
2
3
4
Spring Boot 自动配置将会发现这个 bean,并将其注入到 RabbitTemplate 中,以替代默认的消息转换器。
2.2.2. 设置消息属性
就像使用 JMS 一样,你可能需要在你发送的消息中设置一些头部信息。例如,假设你需要为通过 Taco Cloud 网站提交的所有订单发送一个 X_ORDER_SOURCE。当你创建自己的 Message 对象时,你可以通过你给消息转换器的 MessageProperties 实例来设置头部信息。如下所示:
Java
public void sendOrder(TacoOrder order) {
MessageConverter converter = rabbit.getMessageConverter();
MessageProperties props = new MessageProperties();
props.setHeader("X_ORDER_SOURCE", "WEB");
Message message = converter.toMessage(order, props);
rabbit.send("tacocloud.order", message);
}1
2
3
4
5
6
7
2
3
4
5
6
7
然而,当使用 convertAndSend() 时,你不能快速访问 MessageProperties 对象。不过,MessagePostProcessor 可以帮助你解决这个问题,如下所示:
Java
public void sendOrder(TacoOrder order) {
rabbit.convertAndSend("tacocloud.order", order,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message)
throws AmqpException {
MessageProperties props = message.getMessageProperties();
props.setHeader("X_ORDER_SOURCE", "WEB");
return message;
}
});
}1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
在这里,你为 convertAndSend() 提供了一个 MessagePostProcessor 的匿名内部类实现。在 postProcessMessage() 方法中,你从 Message 中提取出 MessageProperties,然后调用 setHeader() 来设置 X_ORDER_SOURCE 头部。
现在你已经看到了如何使用 RabbitTemplate 发送消息,让我们将注意力转向从 RabbitMQ 队列接收消息的代码。
2.3. 从 RabbitMQ 接收消息
你已经看到,使用 RabbitTemplate 发送消息与使用 JmsTemplate 发送消息没有太大的区别。事实证明,从 RabbitMQ 队列接收消息与从 JMS 接收消息也没有太大的区别。
与 JMS 一样,你有以下两个选择:
- 使用
RabbitTemplate从队列中拉取消息; - 让消息推送到一个带有
@RabbitListener注解的方法;
让我们开始看一下基于拉取的 RabbitTemplate.receive() 方法。
2.3.1. 使用 RabbitTemplate 接收消息
RabbitTemplate 提供了几种从队列中拉取消息的方法。以下是一些最有用的方法:
Java
// Receive messages
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
// Receive objects converted from messages
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
// Receive type-safe objects converted from messages
<T> T receiveAndConvert(ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(String queueName, long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
这些方法是前面描述的 send() 和 convertAndSend() 方法的镜像。send() 用于发送原始的 Message 对象,receive() 从队列中接收原始的 Message 对象。同样,receiveAndConvert() 接收消息,并使用消息转换器将它们转换为领域对象,然后再返回。
但是,在方法签名中有一些明显的差异。首先,这些方法都不接受交换或路由键作为参数。这是因为交换和路由键用于将消息路由到队列,但一旦消息在队列中,它们的下一个目的地就是从队列中拉取它们的消费者。消费应用程序不需要关心交换或路由键。队列是消费应用程序需要知道的唯一事情。
你还会注意到,许多方法接受一个 long 参数来指示接收消息的超时时间。默认情况下,接收超时时间为 0 毫秒。也就是说,调用 receive() 将立即返回,如果没有消息可用,可能会返回一个 null 值。这与 JmsTemplate 中的 receive() 方法的行为有明显的区别。通过传入一个超时值,你可以让 receive() 和 receiveAndConvert() 方法阻塞,直到消息到达或超时时间到期。但是,即使有非零的超时时间,你的代码也需要准备好处理 null 返回。
让我们看看你如何将这个付诸实践。下一个清单显示了一个新的基于 Rabbit 的 OrderReceiver 实现,它使用 RabbitTemplate 来接收订单。
Java
@Component
public class RabbitOrderReceiverService implements OrderReceiver {
private final RabbitTemplate rabbit;
private final MessageConverter converter;
@Autowired
public RabbitOrderReceiverService(RabbitTemplate rabbit) {
this.rabbit = rabbit;
this.converter = rabbit.getMessageConverter();
}
@Override
public TacoOrder receiveOrder() {
Message message = rabbit.receive("tacocloud.order");
return message != null
? (TacoOrder) converter.fromMessage(message)
: null;
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
receiveOrder() 方法是所有操作发生的地方。它调用注入的 RabbitTemplate 上的 receive() 方法,从 tacocloud.order 队列中拉取一个订单。它不提供超时值,所以你只能假设该调用立即返回一个 Message 或 null。如果返回了一个 Message,你使用 RabbitTemplate 的 MessageConverter 将 Message 转换为 TacoOrder。另一方面,如果 receive() 返回 null,你将返回 null。
根据使用情况,你可能能够容忍小延迟。例如,在 Taco Cloud 厨房的顶部显示器中,如果没有订单可用,你可能可以等待一段时间。假设你决定等待最多 30 秒钟再放弃。那么 receiveOrder() 方法可以更改为向 receive() 传递 30,000 毫秒的延迟,如下所示:
Java
public TacoOrder receiveOrder() {
Message message = rabbit.receive("tacocloud.order", 30000);
return message != null
? (TacoOrder) converter.fromMessage(message)
: null;
}1
2
3
4
5
6
2
3
4
5
6
如果你想通过配置来设置超时时间,只需在你的配置中使用 spring.rabbitmq.template.receive-timeout 属性来设置,如下所示:
YAML
spring:
rabbitmq:
template:
receive-timeout: 300001
2
3
4
2
3
4
在 receiveOrder() 方法中,你可能注意到你必须使用 RabbitTemplate 的消息转换器将传入的 Message 对象转换为 TacoOrder 对象。但是,如果 RabbitTemplate 已经有了一个消息转换器,为什么它不能为你做这个转换呢?这正是 receiveAndConvert() 方法的用途。使用 receiveAndConvert(),你可以像下面这样重写 receiveOrder() 方法:
Java
public TacoOrder receiveOrder() {
return (TacoOrder) rabbit.receiveAndConvert("tacocloud.order");
}1
2
3
2
3
这简单多了,不是吗?我唯一觉得麻烦的是从 Object 到 TacoOrder 的类型转换。不过,有替代强制类型转换的方法。你可以将 ParameterizedTypeReference 传递给 receiveAndConvert() 方法,直接接收一个 TacoOrder 对象,如下所示:
Java
public TacoOrder receiveOrder() {
return rabbit.receiveAndConvert("tacocloud.order",
new ParameterizedTypeReference<TacoOrder>() {
});
}1
2
3
4
5
2
3
4
5
关于这是否比类型转换更好,这是有争议的,但这比类型转换是一种更安全的类型方法。使用 ParameterizedTypeReference 和 receiveAndConvert() 的唯一要求是,消息转换器必须是 SmartMessageConverter 的实现(Jackson2JsonMessageConverter 是目前其唯一可以选择的开箱即用的实现)。
RabbitTemplate 提供的拉取模型适用于许多用例,但通常更好的做法是有一个监听消息的代码,当消息到达时被调用。让我们看看你如何编写响应 RabbitMQ 消息的驱动消息的 bean。
2.3.2. 使用监听器处理 RabbitMQ 消息
对于消息驱动的 RabbitMQ beans,Spring 提供了 RabbitListener,它是 JmsListener 的 RabbitMQ 对应物。要指定当消息到达 RabbitMQ 队列时应调用的方法,可以使用 @RabbitListener 注解 bean 的方法。例如,下面的代码显示了一个 RabbitMQ 实现的 OrderListener,它被注解为监听订单消息,而不是使用 RabbitTemplate 轮询它们。
Java
@Component
public class OrderListener {
private final KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@RabbitListener(queues = "tacocloud.order")
public void receiveOrder(TacoOrder order) {
ui.displayOrder(order);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
你无疑会注意到,这看起来非常像声明消息监听器一节中的代码。事实上,唯一改变的就是监听器注解——从 @JmsListener 变为 @RabbitListener。
3. 使用 Kafka 发送消息
乍一看,Kafka 就像 ActiveMQ、Artemis 或 Rabbit 一样是一个消息代理。但 Kafka 有一些独特的技巧。
Kafka 设计为在集群中运行,提供了极好的可扩展性。通过在集群的所有实例之间划分主题,它非常具有弹性。而 RabbitMQ 主要处理交换中的队列,Kafka 只利用主题来提供发布/订阅消息。
Kafka 的主题在集群的所有代理中都有复制。集群中的每个节点都作为一个或多个主题的领导者,负责该主题的数据,并将其复制到集群中的其他节点。
更进一步,每个主题可以被划分为多个分区。在这种情况下,集群中的每个节点是一个主题的一个或多个分区的领导者,但不是整个主题的领导者。主题的责任在所有节点之间分配。图 3.1 说明了这是如何工作的:

由于 Kafka 的独特架构,我鼓励你阅读 Dylan Scott、Viktor Gamov 和 Dave Klein 的《Kafka 实战》(Manning,2021)以了解更多关于它的信息。对于我们的目的,我们将关注如何使用 Spring 向 Kafka 发送消息并从 Kafka 接收消息。
3.1. 在 Spring 中设置 Kafka
要开始使用 Kafka 进行消息传递,你需要在你的构建中添加适当的依赖项。然而,与 JMS 和 RabbitMQ 选项不同,Spring Boot 并没有 Kafka 的启动器。但是,别担心,你只需要一个依赖项,如下所示:
XML
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>1
2
3
4
2
3
4
以上就是你项目中所有所需要的 kafka 依赖项。更重要的是,它的存在将触发 Spring Boot 为 Kafka 的自动配置,这将安排 Spring 应用程序上下文中的 KafkaTemplate 等等。你需要做的就是注入 KafkaTemplate 并开始发送和接收消息。
然而,在你开始发送和接收消息之前,你应该了解一些在使用 Kafka 时会派上用场的属性。具体来说,KafkaTemplate 默认与在 localhost 上运行的 Kafka 代理一起工作,监听 9092 端口。在开发应用程序时启动一个本地的 Kafka 代理是可以的,但是当需要进入生产环境时,你需要配置一个不同的主机和端口。
Note:如果你想运行本中介绍的示例,请参照 Kafka 官方文档搭建一个可用的 Kafka 集群。
spring.kafka.bootstrap-servers 属性设置了一个或多个用于建立初始连接到 Kafka 集群的 Kafka 服务器的位置。例如,如果集群中的一个 Kafka 服务器在 kafka.tacocloud.com 上运行并监听 9092 端口,你可以像这样在 YAML 中配置它的位置:
YAML
spring:
kafka:
bootstrap-servers:
- kafka.tacocloud.com:90921
2
3
4
2
3
4
但是请注意,spring.kafka.bootstrap-servers 是复数,它接受一个列表。因此,你可以向它提供集群中的多个 Kafka 服务器,如下所示:
YAML
spring:
kafka:
bootstrap-servers:
- kafka.tacocloud.com:9092
- kafka.tacocloud.com:9093
- kafka.tacocloud.com:90941
2
3
4
5
6
2
3
4
5
6
如果你在本地运行你的 Kafka 集群,那么你会想要使用 localhost,如下所示:
YAML
spring:
kafka:
bootstrap-servers:
- localhost:90921
2
3
4
2
3
4
在你的项目中设置 Kafka 后,你就可以开始发送和接收消息了。你将开始使用 KafkaTemplate 向 Kafka 发送 TacoOrder 对象。
3.2. 使用 KafkaTemplate 发送消息
在许多方面,KafkaTemplate 与其 JMS 和 RabbitMQ 的对应项相似。然而,同时,它也非常不同。例如它发送消息的方法:
Java
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
你可能首先注意到的是,没有 convertAndSend() 方法。这是因为 KafkaTemplate 是泛型类型的,可以在发送消息时直接处理领域类型。在某种程度上,所有的 send() 方法都在执行 convertAndSend() 的工作。
你可能也注意到,send() 和 sendDefault() 的几个参数与你在 JMS 和 Rabbit 中使用的参数有很大的不同。在 Kafka 中发送消息时,你可以指定以下参数来指导如何发送消息:
- 发送消息的主题(
send()所需) - 写入主题的分区(可选)
- 在记录上发送的键(可选)
- 时间戳(可选;默认为
System.currentTimeMillis()) - 有效载荷(必需)
主题和有效载荷是两个最重要的参数。分区和键对你如何使用 KafkaTemplate 的影响不大,除了作为 send() 和 sendDefault() 的参数提供额外的信息。对于我们的目的,我们将专注于向给定主题发送消息有效载荷,而不会担心分区和键。
对于 send() 方法,你还可以选择发送 ProducerRecord,它只不过是一个类型,可以在一个对象中捕获所有前面的参数。你也可以发送 Message 对象,但这样做需要你将你的领域对象转换为 Message。通常,使用其他方法比创建和发送 ProducerRecord 或 Message 对象更容易。
使用 KafkaTemplate 及其 send() 方法,你可以编写基于 Kafka 的 OrderMessagingService 实现,例如:
Java
@Service
public class KafkaOrderMessagingService
implements OrderMessagingService {
private final KafkaTemplate<String, TacoOrder> kafkaTemplate;
@Autowired
public KafkaOrderMessagingService(
KafkaTemplate<String, TacoOrder> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public void sendOrder(TacoOrder order) {
kafkaTemplate.send("tacocloud.orders.topic", order);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
在这个新的 OrderMessagingService 实现中,sendOrder() 方法使用注入的 KafkaTemplate 的 send() 方法将 TacoOrder 发送到名为 tacocloud.orders.topic 的主题。除了代码中散布的 “Kafka” 一词外,这与你为 JMS 和 Rabbit 编写的代码没有太大的不同。
就像 OrderMessagingService 的其他实现一样,它可以注入到 OrderApiController 中,并在通过 /api/orders 端点下订单时通过 Kafka 发送订单。
在我们创建消息接收器的 Kafka 实现之前,你需要一个控制台来查看发送的内容。有几个可用于 Kafka 的管理控制台,包括 Offset Explorer 和 Confluent 的 Apache Kafka UI。
如果你设置了一个默认主题,你可以稍微简化 sendOrder() 方法。首先,通过设置 spring.kafka.template.default-topic 属性,将你的默认主题设置为 tacocloud.orders.topic,如下所示:
YAML
spring:
kafka:
bootstrap-servers:
- localhost:9092
template:
default-topic: tacocloud.orders.topic1
2
3
4
5
6
2
3
4
5
6
然后,在 sendOrder() 方法中,你可以调用 sendDefault() 而不是 send(),并且不需要指定主题名称,如下所示:
Java
@Override
public void sendOrder(TacoOrder order) {
kafkaTemplate.sendDefault(order);
}1
2
3
4
2
3
4
好的,现在你的消息发送代码已经编写完毕,让我们把注意力转向编写将从 Kafka 接收这些消息的代码。
3.3. 编写 Kafka 监听器
除了 send() 和 sendDefault() 的独特方法签名外,KafkaTemplate 与 JmsTemplate 和 RabbitTemplate 的不同之处在于,它没有提供接收消息的任何方法。这意味着使用 Spring 从 Kafka 主题消费消息的唯一方式是编写一个消息监听器。
对于 Kafka,消息监听器被定义为用 @KafkaListener 注解的方法。@KafkaListener 注解大致类似于 @JmsListener 和 @RabbitListener,并且使用方式大致相同:
Java
@Component
public class OrderListener {
private final KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@KafkaListener(topics = "tacocloud.orders.topic")
public void receiveOrder(TacoOrder order) {
ui.displayOrder(order);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
receiveOrder() 方法被 @KafkaListener 注解,表示当一个消息到达名为 tacocloud.orders.topic 的主题时,它应该被调用。如上述代码,只有 TacoOrder(负载)被给予 receiveOrder()。但是,如果你需要从消息中获取额外的元数据,它也可以接受 ConsumerRecord 或 Message 对象。
例如,下面的 receiveOrder() 实现接受一个 ConsumerRecord,这样你就可以记录消息的分区和时间戳:
Java
@KafkaListener(topics = "tacocloud.orders.topic")
public void receiveOrder(
TacoOrder order, ConsumerRecord<String, TacoOrder> record) {
log.info("Received from partition {} with timestamp {}",
record.partition(), record.timestamp());
ui.displayOrder(order);
}1
2
3
4
5
6
7
2
3
4
5
6
7
同样,你可以使用 Message 而不是 ConsumerRecord,并达到相同的效果,如下所示:
Java
@KafkaListener(topics = "tacocloud.orders.topic")
public void receiveOrder(TacoOrder order, Message<TacoOrder> message) {
MessageHeaders headers = message.getHeaders();
log.info("Received from partition {} with timestamp {}",
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID),
headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
ui.displayOrder(order);
}1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
值得注意的是,消息负载也可以通过 ConsumerRecord.value() 或 Message.getPayload() 获取。这意味着你可以通过这些对象来请求 TacoOrder,而不是直接作为 receiveOrder() 的参数来请求。
4. 总结
异步消息在通信应用程序之间提供了一个间接层,这允许更松散的耦合和更大的可扩展性;
Spring 支持使用 JMS、RabbitMQ 或 Apache Kafka 进行异步消息传递;
应用程序可以使用基于模板的客户端(
JmsTemplate、RabbitTemplate或KafkaTemplate)通过消息代理发送消息;接收应用程序可以使用相同的基于模板的客户端以拉取模型消费消息;
通过将消息监听器注解(
@JmsListener、@RabbitListener或@KafkaListener)应用到 bean 方法,消息也可以被推送到消费者;