Spring Boot 3 整合 RabbitMQ 超详细教程
156
类别: 
开发交流

一、RabbitMQ 核心概念与作用

在整合前,先明确 RabbitMQ 的核心组件与价值,为后续实战打下基础:

1. 核心组件

  • Producer(生产者):发送消息的应用服务(如 Spring Boot 接口)。
  • Consumer(消费者):接收并处理消息的应用服务。
  • Broker:RabbitMQ 服务器实例,负责存储、路由消息。
  • Exchange(交换机):接收生产者消息,根据路由规则转发到队列,核心类型有 4 种:
    • Direct Exchange(直连交换机):精确匹配 Routing Key 与队列绑定的 Binding Key,适用于一对一通信(如订单状态通知)。
    • Topic Exchange(主题交换机):通过通配符(* 匹配单个词,# 匹配多个词)模糊匹配 Routing Key,适用于一对多通信(如用户通知:user.# 可匹配 user.login、user.register)。
    • Fanout Exchange(扇形交换机):无视 Routing Key,将消息广播到所有绑定的队列,适用于广播场景(如系统公告)。
    • Headers Exchange(头交换机):通过消息头(而非 Routing Key)匹配队列,较少用于常规业务。
  • Queue(队列):存储消息的缓冲区,消息在此等待消费者处理,支持持久化、限流等配置。
  • Binding(绑定):建立交换机与队列的关联,同时指定 Routing Key(或匹配规则)。
  • Virtual Host(虚拟主机):实现多租户隔离,不同虚拟主机的交换机、队列相互独立(默认虚拟主机为 /)。

2. 核心作用

  • 解耦服务:生产者与消费者无需直接通信,通过消息队列隔离(如订单服务无需直接调用库存服务,只需发送 “扣减库存” 消息)。
  • 削峰填谷:应对高并发场景(如秒杀),消息队列暂存请求,消费者按能力逐步处理,避免服务崩溃。
  • 异步通信:非核心流程异步化(如用户注册后,异步发送短信 / 邮件通知,无需等待通知完成再返回结果)。
  • 可靠投递:通过持久化、确认机制、死信队列等,确保消息不丢失、不重复消费。

二、环境准备

1. 安装 RabbitMQ

方式 1:Docker 安装(推荐,快速便捷)

# 1. 拉取 RabbitMQ 镜像(带管理界面,版本选择 3.12.x 适配 Spring Boot 3)
docker pull rabbitmq:3.12-management

# 2. 启动容器(映射端口 5672:消息通信端口;15672:管理界面端口)
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:3.12-management
  • 管理界面访问:浏览器打开 http://localhost:15672,输入账号 admin、密码 123456,即可进入控制台(可查看交换机、队列、消息等)。

方式 2:本地安装(以 Windows 为例)

  1. 先安装 Erlang(RabbitMQ 依赖 Erlang 运行环境),下载地址:Erlang 官网,选择与 RabbitMQ 兼容的版本(参考 RabbitMQ 官网兼容性表)。

  2. 安装 RabbitMQ,下载地址:RabbitMQ 官网,安装后启动服务:

  • 打开命令行,进入 RabbitMQ 安装目录的 **in 文件夹(如 C:\Program Files\RabbitMQ Server\rabbitmq_server-3.12.0\**in)。
  • 执行命令启用管理界面:rabbitmq-plugins enable rabbitmq_management
  • 启动服务:rabbitmq-server start(或通过 Windows 服务列表启动)。

2. Spring Boot 3 项目初始化

1. 新建 Maven 项目(或使用 Spring Initializr)

  • Spring Boot 版本:3.0+(如 3.2.0)。
  • 核心依赖:在 pom.xml 中添加 RabbitMQ starters 和 Web 依赖(用于测试接口):
<dependencies>
  <!-- Spring Boot RabbitMQ 整合依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Spring Boot Web 依赖(用于提供测试接口) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--  lombok 简化代码(可选) -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
</dependencies>

2. 配置 RabbitMQ 连接信息

application.yml(或 application.properties)中配置 RabbitMQ 地址、账号、密码等:

spring:
  rabbitmq:
    host:localhost# RabbitMQ 服务器地址(本地为 localhost,远程填服务器 IP)
    port:5672       # 消息通信端口(默认 5672)
    username:admin# 登录账号(Docker 启动时配置的 RABBITMQ_DEFAULT_USER)
    password:123456# 登录密码(Docker 启动时配置的 RABBITMQ_DEFAULT_PASS)
    virtual-host:/# 虚拟主机(默认 /)
    # 生产者确认配置(可选,确保消息成功投递到交换机)
    publisher-confirm-type:correlated
    # 生产者退回配置(可选,消息无法路由到队列时回调)
    publisher-returns:true
    # 消费者配置(可选,手动确认消息,避免自动确认导致消息丢失)
    listener:
      simple:
        acknowledge-mode:manual# 手动确认消息(Ack)
        concurrency:1            # 消费者最小线程数
        max-concurrency:5        # 消费者最大线程数
        prefetch:1     

三、基础整合实战(4 种交换机场景)

1. 1. Direct 直连交换机(一对一通信)

步骤 1:配置交换机、队列与绑定

创建 RabbitConfig 类,通过 @Bean 定义交换机、队列,并绑定两者(指定 Routing Key):

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
publicclass RabbitDirectConfig {
 // 1. 定义直连交换机(名称:direct_exchange,持久化:true,自动删除:false)
 @Bean
 public DirectExchange directExchange() {
     returnnew DirectExchange("direct_exchange", true, false);
 }

 // 2. 定义队列(名称:direct_queue,持久化:true,排他:false,自动删除:false)
 @Bean
 public Queue directQueue() {
     returnnew Queue("direct_queue", true, false, false);
 }

 // 3. 绑定交换机与队列(指定 Routing Key:direct_routing_key)
 @Bean
 public Binding directBinding(DirectExchange directExchange, Queue directQueue) {
     return BindingBuilder.bind(directQueue).to(directExchange).with("direct_routing_key");
 }
}
  • 参数说明:
    • durable=true:交换机 / 队列持久化(RabbitMQ 重启后不丢失)。
    • autoDelete=false:交换机 / 队列不会在没有绑定 / 消费者时自动删除。
步骤 2:实现生产者(发送消息)

创建 RabbitProducer 类,注入 RabbitTemplate(Spring 提供的 RabbitMQ 模板类),用于发送消息:

import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor// lombok 自动注入构造函数
publicclass RabbitProducer {
    privatefinal RabbitTemplate rabbitTemplate;

    // 发送 Direct 消息(参数:交换机名称、Routing Key、消息内容)
    public void sendDirectMessage(String message) {
        // 发送消息:交换机 -> 按 Routing Key 路由到队列
        rabbitTemplate.convertAndSend(
                "direct_exchange",  // 交换机名称
                "direct_routing_key",  // Routing Key(需与绑定的一致)
                message  // 消息内容(可传字符串、对象等,Spring 会自动序列化)
        );
        System.out.println("Direct 生产者发送消息:" + message);
    }
}
步骤 3:实现消费者(接收并处理消息)

创建 RabbitConsumer 类,通过 @RabbitListener 注解监听指定队列,处理消息:

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@Slf4j  // lombok 日志注解
publicclass RabbitConsumer {
    // 监听 Direct 队列(队列名称:direct_queue)
    @RabbitListener(queues = "direct_queue")
    public void receiveDirectMessage(String message, Channel channel, Message msg) throws IOException {
        try {
            // 1. 处理业务逻辑(如订单状态更新、库存扣减等)
            log.info("Direct 消费者接收消息:{}", message);

            // 2. 手动确认消息(Ack):告知 RabbitMQ 消息已处理完成,可删除
            // 第二个参数:multiple=false(仅确认当前消息)
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 3. 消息处理失败:拒绝消息并重回队列(或转发到死信队列,根据业务调整)
            // 第二个参数:multiple=false;第三个参数:requeue=true(重回队列)
            channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);
            log.error("Direct 消息处理失败", e);
        }
    }
}
步骤 4:测试接口(触发消息发送)

创建 TestController 类,提供 HTTP 接口,调用生产者发送消息:

import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/rabbit")
@RequiredArgsConstructor
publicclass TestController {
    privatefinal RabbitProducer rabbitProducer;

    // 测试 Direct 消息发送(访问:http://localhost:8080/rabbit/direct/hello)
    @GetMapping("/direct/{message}")
    public String sendDirectMessage(@PathVariable String message) {
        rabbitProducer.sendDirectMessage(message);
        return"Direct 消息发送成功!发送内容:" + message;
    }
}

测试流程

  1. 启动 Spring Boot 项目。
  2. 浏览器访问 http://localhost:8080/rabbit/direct/hello。
  3. 查看控制台日志:
  • 生产者日志:Direct 生产者发送消息:hello。
  • 消费者日志:Direct 消费者接收消息:hello。
  1. 访问 RabbitMQ 管理界面(http://localhost:15672):
  • 在 Exchanges 中查看 direct_exchange 状态(绑定数为 1)。
  • 在 Queues 中查看 direct_queue 状态(消息数为 0,已被消费)。

2. Topic 主题交换机(一对多通信)

步骤 1:配置交换机、队列与绑定

RabbitConfig 中添加 Topic 相关配置(多个队列绑定到同一交换机,使用不同 Routing Key 规则):

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
publicclass RabbitTopicConfig {
    // 1. 定义主题交换机
    @Bean
    public TopicExchange topicExchange() {
        returnnew TopicExchange("topic_exchange", true, false);
    }

    // 2. 定义 2 个队列(分别处理用户登录、用户注册消息)
    @Bean
    public Queue topicLoginQueue() {
        returnnew Queue("topic_login_queue", true, false, false);
    }

    @Bean
    public Queue topicRegisterQueue() {
        returnnew Queue("topic_register_queue", true, false, false);
    }

    // 3. 绑定队列到交换机(Routing Key 规则:user.login 匹配登录队列)
    @Bean
    public Binding topicLoginBinding(TopicExchange topicExchange, Queue topicLoginQueue) {
        return BindingBuilder.bind(topicLoginQueue).to(topicExchange).with("user.login");
    }

    // 绑定队列到交换机(Routing Key 规则:user.# 匹配所有用户相关消息,包括注册)
    @Bean
    public Binding topicRegisterBinding(TopicExchange topicExchange, Queue topicRegisterQueue) {
        return BindingBuilder.bind(topicRegisterQueue).to(topicExchange).with("user.#");
    }
}
步骤 2:生产者添加 Topic 消息发送方法

RabbitProducer 中添加:

// 发送 Topic 消息(参数:交换机名称、Routing Key、消息内容)
public void sendTopicMessage(String routingKey, String message) {
    rabbitTemplate.convertAndSend("topic_exchange", routingKey, message);
    System.out.println("Topic 生产者发送消息:" + message + ",Routing Key:" + routingKey);
}
步骤 3:消费者添加 Topic 队列监听

RabbitConsumer 中添加:

// 监听用户登录队列
@RabbitListener(queues = "topic_login_queue")
public void receiveTopicLoginMessage(String message, Channel channel, Message msg) throws IOException {
    try {
        log.info("Topic 登录消费者接收消息:{}", message);
        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);
        log.error("Topic 登录消息处理失败", e);
    }
}

// 监听用户注册队列
@RabbitListener(queues = "topic_register_queue")
public void receiveTopicRegisterMessage(String message, Channel channel, Message msg) throws IOException {
    try {
        log.info("Topic 注册消费者接收消息:{}", message);
        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);
        log.error("Topic 注册消息处理失败", e);
    }
}
步骤 4:测试接口
标签:
评论 0
/ 1000
0
0
收藏