第三部分“Spring AMQP客户端与路由模型实战

2026-04-04
3
-
- 分钟
|

一、 Spring AMQP 快速入门

Spring AMQP 是基于 AMQP(高级消息队列协议)标准的 Java 实现。AMQP 是一种跨语言的消息通信协议,而 Spring 提供了对其的封装和起步依赖 spring-boot-starter-amqp,让消息收发变得极其简单。

  • 配置连接:无需编写连接工厂代码,直接在 application.yml 中配置 MQ 的 hostport (通信端口5672)、virtual-hostusernamepassword

  • 发送消息:自动注入 RabbitTemplate,调用其 convertAndSend(队列名或交换机名, 消息内容) 方法即可发送消息。

  • 接收消息:在消费者的方法上添加 @RabbitListener(queues = "队列名") 注解,Spring 会自动监听队列并将消息传递给方法参数。

二、 Work 队列模型 (工作队列)

核心场景:当消息发送速度太快,超过单个消费者的处理能力时,会导致消息在 MQ 内存中堆积,甚至引发阻塞。

  • 工作原理:让多个消费者绑定到同一个队列,共同消费队列中的消息,从而加快处理速度。需要注意的是,一条消息只能被一个消费者处理一次。

  • 能者多劳优化:默认情况下,队列会将消息**轮询(一人一个)**分发给所有消费者,不考虑消费者的实际处理性能。为了充分利用高性能机器,需要在消费者配置中添加 prefetch=1(预取数),让消费者每次只取1条消息,处理完后再取下一条,实现真正的“能者多劳”。

三、 交换机与路由模型实战

在实际业务中,一条消息往往需要被多个不同的微服务同时处理(例如支付成功后,订单服务和通知服务都需要收到消息),此时必须引入交换机(Exchange)交换机只负责路由和转发消息,本身不能存储消息

  • Fanout Exchange(广播交换机)

    • 将接收到的消息广播给所有与它绑定的队列。

  • Direct Exchange(定向交换机)

    • 允许根据规则将消息路由到指定的队列。

    • 队列与交换机绑定时,需要设置一个或多个 BindingKey(暗号)。

    • 发送消息时必须携带 RoutingKey,只有两者的 Key 完全匹配,消息才会被路由过去。

  • Topic Exchange(主题交换机)

    • 与 Direct 类似,但更加灵活,强烈推荐使用

    • 它的 RoutingKey 必须是多个单词,以 . 分割(如 china.news)。

    • 绑定时可以使用通配符:# 代表0个或多个单词,* 代表恰好1个单词。例如 #.news 可以匹配 china.newsjapan.news

四、 基于代码声明队列与交换机

在企业开发和线上环境中,绝对不能靠人工在控制台手动点击创建队列和交换机,因为极易出错且效率低下,必须通过 Java 代码自动声明。

  • 方式一:基于 Bean 声明

    • 通过 @Configuration 类,使用 QueueBuilderExchangeBuilderBindingBuilder 分别注册 QueueExchangeBinding 的 Bean。

    • 缺点:当需要绑定多个 RoutingKey 时,需要编写大量的 Bean 代码,非常繁琐。

  • 方式二:基于注解声明(推荐)

    • 直接在消费者的 @RabbitListener 注解中,嵌套使用 @QueueBinding@Queue@Exchange 注解。

    • 在一个注解内部就能同时完成队列、交换机的声明以及路由 Key 的绑定,代码极为简洁直观。

五、 消息转换器 (MessageConverter)

  • 默认缺陷:Spring AMQP 默认使用 JDK 的对象序列化机制。这会导致转换后的消息体积庞大、可读性极差(变成一堆乱码),并且存在安全漏洞。

  • 解决方案:采用 JSON 格式进行序列化。

    1. 在项目中引入 jackson-databind 依赖。

    2. 在配置类中定义一个类型为 MessageConverter 的 Bean,返回 Jackson2JsonMessageConverter 对象。

    3. 配置完成后,发送对象类型(如 Map、User)时,会自动转为 JSON 字符串;接收时也会自动将 JSON 反序列化为 Java 对象,极大地提升了传输效率和可读性。

评论交流

文章目录