Spring Boot 3 整合 RabbitMQ 超详细教程
一、RabbitMQ 核心概念与作用
在整合前,先明确 RabbitMQ 的核心组件与价值,为后续实战打下基础:
1. 核心组件
Producer(生产者):发送消息的应用服务(如 Spring Boot 接口)。Consumer(消费者):接收并处理消息的应用服务。Broker:RabbitMQ 服务器实例,负责存储、路由消息。Exchange(交换机):接收生产者消息,根据路由规则转发到队列,核心类型有 4 种:-
Direct Exchange(直连交换机):精确匹配 Routing Key 与队列绑定的 Binding Key,适用于一对一通信(如订单状态通知)。
-
Topic Exchange(主题交换机):通过通配符(* 匹配单个词,# 匹配多个词)模糊匹配 Routing Key,适用于一对多通信(如用户通知:user.# 可匹配 user.login、user.register)。
-
Fanout Exchange(扇形交换机):无视 Routing Key,将消息广播到所有绑定的队列,适用于广播场景(如系统公告)。
-
Headers Exchange(头交换机):通过消息头(而非 Routing Key)匹配队列,较少用于常规业务。
Queue(队列):存储消息的缓冲区,消息在此等待消费者处理,支持持久化、限流等配置。Binding(绑定):建立交换机与队列的关联,同时指定 Routing Key(或匹配规则)。Virtual Host(虚拟主机):实现多租户隔离,不同虚拟主机的交换机、队列相互独立(默认虚拟主机为 /)。
2. 核心作用
- 解耦服务:生产者与消费者无需直接通信,通过消息队列隔离(如订单服务无需直接调用库存服务,只需发送 “扣减库存” 消息)。
- 削峰填谷:应对高并发场景(如秒杀),消息队列暂存请求,消费者按能力逐步处理,避免服务崩溃。
- 异步通信:非核心流程异步化(如用户注册后,异步发送短信 / 邮件通知,无需等待通知完成再返回结果)。
- 可靠投递:通过持久化、确认机制、死信队列等,确保消息不丢失、不重复消费。
二、环境准备
1. 安装 RabbitMQ
方式 1:Docker 安装(推荐,快速便捷)
# 1. 拉取 RabbitMQ 镜像(带管理界面,版本选择 3.12.x 适配 Spring Boot 3)
docker pull rabbitmq:3.12-management
# 2. 启动容器(映射端口 5672:消息通信端口;15672:管理界面端口)
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:3.12-management
管理界面访问:浏览器打开http://localhost:15672,输入账号admin、密码123456,即可进入控制台(可查看交换机、队列、消息等)。
方式 2:本地安装(以 Windows 为例)
-
先安装 Erlang(RabbitMQ 依赖 Erlang 运行环境),下载地址:Erlang 官网,选择与 RabbitMQ 兼容的版本(参考 RabbitMQ 官网兼容性表)。
-
安装 RabbitMQ,下载地址:RabbitMQ 官网,安装后启动服务:
- 打开命令行,进入 RabbitMQ 安装目录的
**in文件夹(如C:\Program Files\RabbitMQ Server\rabbitmq_server-3.12.0\**in)。 - 执行命令启用管理界面:
rabbitmq-plugins enable rabbitmq_management。 - 启动服务:
rabbitmq-server start(或通过 Windows 服务列表启动)。
2. Spring Boot 3 项目初始化
1. 新建 Maven 项目(或使用 Spring Initializr)
Spring Boot 版本:3.0+(如 3.2.0)。核心依赖:在pom.xml中添加 RabbitMQ starters 和 Web 依赖(用于测试接口):
<dependencies>
<!-- Spring Boot RabbitMQ 整合依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Spring Boot Web 依赖(用于提供测试接口) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- lombok 简化代码(可选) -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
2. 配置 RabbitMQ 连接信息
在application.yml(或 application.properties)中配置 RabbitMQ 地址、账号、密码等:
spring:
rabbitmq:
host:localhost# RabbitMQ 服务器地址(本地为 localhost,远程填服务器 IP)
port:5672 # 消息通信端口(默认 5672)
username:admin# 登录账号(Docker 启动时配置的 RABBITMQ_DEFAULT_USER)
password:123456# 登录密码(Docker 启动时配置的 RABBITMQ_DEFAULT_PASS)
virtual-host:/# 虚拟主机(默认 /)
# 生产者确认配置(可选,确保消息成功投递到交换机)
publisher-confirm-type:correlated
# 生产者退回配置(可选,消息无法路由到队列时回调)
publisher-returns:true
# 消费者配置(可选,手动确认消息,避免自动确认导致消息丢失)
listener:
simple:
acknowledge-mode:manual# 手动确认消息(Ack)
concurrency:1 # 消费者最小线程数
max-concurrency:5 # 消费者最大线程数
prefetch:1
三、基础整合实战(4 种交换机场景)
1. 1. Direct 直连交换机(一对一通信)
步骤 1:配置交换机、队列与绑定
创建 RabbitConfig 类,通过 @Bean 定义交换机、队列,并绑定两者(指定 Routing Key):
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
publicclass RabbitDirectConfig {
// 1. 定义直连交换机(名称:direct_exchange,持久化:true,自动删除:false)
@Bean
public DirectExchange directExchange() {
returnnew DirectExchange("direct_exchange", true, false);
}
// 2. 定义队列(名称:direct_queue,持久化:true,排他:false,自动删除:false)
@Bean
public Queue directQueue() {
returnnew Queue("direct_queue", true, false, false);
}
// 3. 绑定交换机与队列(指定 Routing Key:direct_routing_key)
@Bean
public Binding directBinding(DirectExchange directExchange, Queue directQueue) {
return BindingBuilder.bind(directQueue).to(directExchange).with("direct_routing_key");
}
}
- 参数说明:
-
- durable=true:交换机 / 队列持久化(RabbitMQ 重启后不丢失)。
-
- autoDelete=false:交换机 / 队列不会在没有绑定 / 消费者时自动删除。
步骤 2:实现生产者(发送消息)
创建 RabbitProducer 类,注入 RabbitTemplate(Spring 提供的 RabbitMQ 模板类),用于发送消息:
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor// lombok 自动注入构造函数
publicclass RabbitProducer {
privatefinal RabbitTemplate rabbitTemplate;
// 发送 Direct 消息(参数:交换机名称、Routing Key、消息内容)
public void sendDirectMessage(String message) {
// 发送消息:交换机 -> 按 Routing Key 路由到队列
rabbitTemplate.convertAndSend(
"direct_exchange", // 交换机名称
"direct_routing_key", // Routing Key(需与绑定的一致)
message // 消息内容(可传字符串、对象等,Spring 会自动序列化)
);
System.out.println("Direct 生产者发送消息:" + message);
}
}
步骤 3:实现消费者(接收并处理消息)
创建 RabbitConsumer 类,通过 @RabbitListener 注解监听指定队列,处理消息:
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@Slf4j // lombok 日志注解
publicclass RabbitConsumer {
// 监听 Direct 队列(队列名称:direct_queue)
@RabbitListener(queues = "direct_queue")
public void receiveDirectMessage(String message, Channel channel, Message msg) throws IOException {
try {
// 1. 处理业务逻辑(如订单状态更新、库存扣减等)
log.info("Direct 消费者接收消息:{}", message);
// 2. 手动确认消息(Ack):告知 RabbitMQ 消息已处理完成,可删除
// 第二个参数:multiple=false(仅确认当前消息)
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 3. 消息处理失败:拒绝消息并重回队列(或转发到死信队列,根据业务调整)
// 第二个参数:multiple=false;第三个参数:requeue=true(重回队列)
channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);
log.error("Direct 消息处理失败", e);
}
}
}
步骤 4:测试接口(触发消息发送)
创建 TestController 类,提供 HTTP 接口,调用生产者发送消息:
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/rabbit")
@RequiredArgsConstructor
publicclass TestController {
privatefinal RabbitProducer rabbitProducer;
// 测试 Direct 消息发送(访问:http://localhost:8080/rabbit/direct/hello)
@GetMapping("/direct/{message}")
public String sendDirectMessage(@PathVariable String message) {
rabbitProducer.sendDirectMessage(message);
return"Direct 消息发送成功!发送内容:" + message;
}
}
测试流程
- 启动 Spring Boot 项目。
- 浏览器访问 http://localhost:8080/rabbit/direct/hello。
- 查看控制台日志:
- 生产者日志:Direct 生产者发送消息:hello。
- 消费者日志:Direct 消费者接收消息:hello。
- 访问 RabbitMQ 管理界面(http://localhost:15672):
- 在 Exchanges 中查看 direct_exchange 状态(绑定数为 1)。
- 在 Queues 中查看 direct_queue 状态(消息数为 0,已被消费)。
2. Topic 主题交换机(一对多通信)
步骤 1:配置交换机、队列与绑定
在 RabbitConfig 中添加 Topic 相关配置(多个队列绑定到同一交换机,使用不同 Routing Key 规则):
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
publicclass RabbitTopicConfig {
// 1. 定义主题交换机
@Bean
public TopicExchange topicExchange() {
returnnew TopicExchange("topic_exchange", true, false);
}
// 2. 定义 2 个队列(分别处理用户登录、用户注册消息)
@Bean
public Queue topicLoginQueue() {
returnnew Queue("topic_login_queue", true, false, false);
}
@Bean
public Queue topicRegisterQueue() {
returnnew Queue("topic_register_queue", true, false, false);
}
// 3. 绑定队列到交换机(Routing Key 规则:user.login 匹配登录队列)
@Bean
public Binding topicLoginBinding(TopicExchange topicExchange, Queue topicLoginQueue) {
return BindingBuilder.bind(topicLoginQueue).to(topicExchange).with("user.login");
}
// 绑定队列到交换机(Routing Key 规则:user.# 匹配所有用户相关消息,包括注册)
@Bean
public Binding topicRegisterBinding(TopicExchange topicExchange, Queue topicRegisterQueue) {
return BindingBuilder.bind(topicRegisterQueue).to(topicExchange).with("user.#");
}
}
步骤 2:生产者添加 Topic 消息发送方法
在 RabbitProducer 中添加:
// 发送 Topic 消息(参数:交换机名称、Routing Key、消息内容)
public void sendTopicMessage(String routingKey, String message) {
rabbitTemplate.convertAndSend("topic_exchange", routingKey, message);
System.out.println("Topic 生产者发送消息:" + message + ",Routing Key:" + routingKey);
}
步骤 3:消费者添加 Topic 队列监听
在 RabbitConsumer 中添加:
// 监听用户登录队列
@RabbitListener(queues = "topic_login_queue")
public void receiveTopicLoginMessage(String message, Channel channel, Message msg) throws IOException {
try {
log.info("Topic 登录消费者接收消息:{}", message);
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);
log.error("Topic 登录消息处理失败", e);
}
}
// 监听用户注册队列
@RabbitListener(queues = "topic_register_queue")
public void receiveTopicRegisterMessage(String message, Channel channel, Message msg) throws IOException {
try {
log.info("Topic 注册消费者接收消息:{}", message);
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);
log.error("Topic 注册消息处理失败", e);
}
}
步骤 4:测试接口
- 访问 http://localhost:8080/rabbit/topic/user.login/``用户登录了:
-
- 登录队列消费者接收消息(user.login 匹配 user.login 规则)。
-
- 注册队列消费者也接收消息(user.login 匹配 user.# 规则)。
- 访问 http://localhost:8080/rabbit/topic/user.register/``用户注册了:
-
- 仅注册队列消费者接收消息(user.register 不匹配 user.login,但匹配 user.#)。





