一、 Spring AMQP 快速入门
Spring AMQP 是基于 AMQP(高级消息队列协议)标准的 Java 实现。AMQP 是一种跨语言的消息通信协议,而 Spring 提供了对其的封装和起步依赖 spring-boot-starter-amqp,让消息收发变得极其简单。
配置连接:无需编写连接工厂代码,直接在
application.yml中配置 MQ 的host、port(通信端口5672)、virtual-host、username和password。发送消息:自动注入
RabbitTemplate,调用其convertAndSend(队列名或交换机名, 消息内容)方法即可发送消息。接收消息:在消费者的方法上添加
@RabbitListener(queues = "队列名")注解,Spring 会自动监听队列并将消息传递给方法参数。
二、 Work 队列模型 (工作队列)
核心场景:当消息发送速度太快,超过单个消费者的处理能力时,会导致消息在 MQ 内存中堆积,甚至引发阻塞。
工作原理:让多个消费者绑定到同一个队列,共同消费队列中的消息,从而加快处理速度。需要注意的是,一条消息只能被一个消费者处理一次。
能者多劳优化:默认情况下,队列会将消息**轮询(一人一个)**分发给所有消费者,不考虑消费者的实际处理性能。为了充分利用高性能机器,需要在消费者配置中添加
prefetch=1(预取数),让消费者每次只取1条消息,处理完后再取下一条,实现真正的“能者多劳”。
三、 交换机与路由模型实战
在实际业务中,一条消息往往需要被多个不同的微服务同时处理(例如支付成功后,订单服务和通知服务都需要收到消息),此时必须引入交换机(Exchange)。交换机只负责路由和转发消息,本身不能存储消息。
Fanout Exchange(广播交换机):
将接收到的消息广播给所有与它绑定的队列。
Direct Exchange(定向交换机):
允许根据规则将消息路由到指定的队列。
队列与交换机绑定时,需要设置一个或多个
BindingKey(暗号)。发送消息时必须携带
RoutingKey,只有两者的 Key 完全匹配,消息才会被路由过去。
Topic Exchange(主题交换机):
与 Direct 类似,但更加灵活,强烈推荐使用。
它的
RoutingKey必须是多个单词,以.分割(如china.news)。绑定时可以使用通配符:
#代表0个或多个单词,*代表恰好1个单词。例如#.news可以匹配china.news或japan.news。
四、 基于代码声明队列与交换机
在企业开发和线上环境中,绝对不能靠人工在控制台手动点击创建队列和交换机,因为极易出错且效率低下,必须通过 Java 代码自动声明。
方式一:基于 Bean 声明
通过
@Configuration类,使用QueueBuilder、ExchangeBuilder和BindingBuilder分别注册Queue、Exchange和Binding的 Bean。缺点:当需要绑定多个 RoutingKey 时,需要编写大量的 Bean 代码,非常繁琐。
方式二:基于注解声明(推荐)
直接在消费者的
@RabbitListener注解中,嵌套使用@QueueBinding、@Queue和@Exchange注解。在一个注解内部就能同时完成队列、交换机的声明以及路由 Key 的绑定,代码极为简洁直观。
五、 消息转换器 (MessageConverter)
默认缺陷:Spring AMQP 默认使用 JDK 的对象序列化机制。这会导致转换后的消息体积庞大、可读性极差(变成一堆乱码),并且存在安全漏洞。
解决方案:采用 JSON 格式进行序列化。
在项目中引入
jackson-databind依赖。在配置类中定义一个类型为
MessageConverter的 Bean,返回Jackson2JsonMessageConverter对象。配置完成后,发送对象类型(如 Map、User)时,会自动转为 JSON 字符串;接收时也会自动将 JSON 反序列化为 Java 对象,极大地提升了传输效率和可读性。