侧边栏壁纸
  • 累计撰写 69 篇文章
  • 累计创建 28 个标签
  • 累计收到 21 条评论

目 录CONTENT

文章目录

RabbitMQ 死信队列 + 延迟消息全攻略:Spring Boot 3.x 实战与踩坑记录

Administrator
2026-03-23 / 0 评论 / 0 点赞 / 0 阅读 / 0 字 / 正在检测是否收录...
温馨提示:
部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

前言

在博客之前的文章里,我们装好了 RabbitMQ(CentOS7 安装篇),也在 Spring Boot 3.x 系列里搞定了 Docker 多阶段构建。这次来解决一个生产中绕不开的问题:消息失败了怎么办?订单30分钟未支付怎么自动关闭?

答案就是两个核心机制:

  1. 死信队列(Dead Letter Exchange,DLX):消息"死了"之后去哪?
  2. 延迟消息(Delayed Message):消息怎么"定时发"?

这两个功能在电商、支付、物流系统里几乎是标配。本文基于 Spring Boot 3.2 + RabbitMQ 3.12 全程实战,踩过的坑都给你标出来。


一、死信队列是什么?

先搞清楚概念,再上代码。

1.1 什么是"死信"?

消息在以下三种情况下会变成死信(Dead Letter)

情况说明
消息被拒绝basicNackbasicReject,且 requeue=false
消息TTL过期消息在队列中存活时间超过了设定的 TTL
队列达到最大长度队列满了,最早的消息被挤出去

1.2 死信队列的工作原理

生产者 → 正常交换机 → 正常队列 → 消费者处理
                                    ↓ (失败/过期/满了)
                              死信交换机 → 死信队列 → 死信消费者

简单说就是:给正常队列配一个"后备队列",消息死了之后自动路由过去,而不是直接丢掉。


二、Spring Boot 3.x 实战:死信队列

2.1 引入依赖

<!-- pom.xml -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.2 配置文件

# application.yml
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual   # 手动确认,非常重要!
        prefetch: 1                # 每次只拉1条,防止消息堆积

⚠️ 坑1acknowledge-mode 必须设为 manual,否则消息消费失败也会被自动 ACK,死信机制完全失效!

2.3 声明交换机和队列

@Configuration
public class RabbitMQConfig {

    // ============ 交换机名称 ============
    public static final String NORMAL_EXCHANGE  = "normal.exchange";
    public static final String DLX_EXCHANGE     = "dlx.exchange";

    // ============ 队列名称 ============
    public static final String NORMAL_QUEUE = "normal.queue";
    public static final String DLX_QUEUE    = "dlx.queue";

    // ============ RoutingKey ============
    public static final String NORMAL_ROUTING_KEY = "normal.key";
    public static final String DLX_ROUTING_KEY    = "dlx.key";

    // ---------- 声明交换机 ----------
    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange(NORMAL_EXCHANGE, true, false);
    }

    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(DLX_EXCHANGE, true, false);
    }

    // ---------- 声明队列 ----------

    /**
     * 正常队列:绑定死信交换机,设置 TTL(30秒过期)
     */
    @Bean
    public Queue normalQueue() {
        Map<String, Object> args = new HashMap<>();
        // 绑定死信交换机
        args.put("x-dead-letter-exchange", DLX_EXCHANGE);
        // 死信路由 Key
        args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
        // 消息 TTL:30秒(毫秒单位)
        args.put("x-message-ttl", 30_000);
        // 队列最大长度(可选)
        // args.put("x-max-length", 100);
        return QueueBuilder.durable(NORMAL_QUEUE).withArguments(args).build();
    }

    /**
     * 死信队列:普通队列,用来接收死信消息
     */
    @Bean
    public Queue dlxQueue() {
        return QueueBuilder.durable(DLX_QUEUE).build();
    }

    // ---------- 绑定关系 ----------
    @Bean
    public Binding normalBinding(Queue normalQueue, DirectExchange normalExchange) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY);
    }

    @Bean
    public Binding dlxBinding(Queue dlxQueue, DirectExchange dlxExchange) {
        return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(DLX_ROUTING_KEY);
    }
}

2.4 生产者:发送消息

@Service
@RequiredArgsConstructor
public class OrderService {

    private final RabbitTemplate rabbitTemplate;

    /**
     * 下单时发送延迟关闭消息(30秒内未支付则进入死信队列处理)
     */
    public void createOrder(String orderId) {
        // 业务逻辑:创建订单...
        System.out.println("订单创建成功:" + orderId);

        // 发送到正常队列,30秒后若未被消费则成为死信
        rabbitTemplate.convertAndSend(
            RabbitMQConfig.NORMAL_EXCHANGE,
            RabbitMQConfig.NORMAL_ROUTING_KEY,
            orderId
        );
    }
}

2.5 消费者:处理死信

@Component
@Slf4j
public class DeadLetterConsumer {

    /**
     * 监听死信队列:订单超时未支付,自动关闭
     */
    @RabbitListener(queues = RabbitMQConfig.DLX_QUEUE)
    public void handleDeadLetter(String orderId, Channel channel,
                                  @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            log.info("收到死信消息,订单超时关闭:{}", orderId);
            // 查询订单状态,如果还是"待支付"则关闭
            closeOrderIfUnpaid(orderId);
            // 手动确认
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("处理死信消息失败:{}", e.getMessage(), e);
            try {
                // 处理失败,不重新入队(避免无限循环)
                channel.basicNack(deliveryTag, false, false);
            } catch (IOException ioException) {
                log.error("NACK失败", ioException);
            }
        }
    }

    private void closeOrderIfUnpaid(String orderId) {
        // 查库确认状态,执行关闭逻辑...
        log.info("订单 {} 已超时关闭", orderId);
    }
}

⚠️ 坑2:死信消费者里消费失败后,千万不能 requeue=true,否则消息会在死信队列里无限循环


三、延迟消息:更灵活的方案

上面用 TTL + 死信队列实现了"定时关闭",但有个硬伤:所有消息的超时时间是一样的(队列级别的 TTL)。

如果我想让 A 订单 15 分钟超时、B 订单 60 分钟超时,就需要用延迟消息插件

3.1 安装 rabbitmq-delayed-message-exchange 插件

# 进入 RabbitMQ 容器
docker exec -it rabbitmq bash

# 下载插件(版本要与 RabbitMQ 匹配)
cd /plugins
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez

# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# 退出并重启容器
exit
docker restart rabbitmq

或者在 docker-compose.yml 里一步到位:

# docker-compose.yml
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3.12-management
    container_name: rabbitmq
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - ./plugins:/plugins_extra
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
    # 启动时自动加载插件
    command: >
      bash -c "rabbitmq-plugins enable rabbitmq_delayed_message_exchange && rabbitmq-server"

⚠️ 坑3:插件版本必须和 RabbitMQ 版本严格匹配,否则启用失败但不报错,消息根本不会延迟!

3.2 声明延迟交换机

// 在 RabbitMQConfig 中追加
public static final String DELAY_EXCHANGE = "delay.exchange";
public static final String DELAY_QUEUE    = "delay.queue";
public static final String DELAY_KEY      = "delay.key";

/**
 * 延迟交换机:类型为 x-delayed-message
 */
@Bean
public CustomExchange delayExchange() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");  // 内部路由类型
    return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, args);
}

@Bean
public Queue delayQueue() {
    return QueueBuilder.durable(DELAY_QUEUE).build();
}

@Bean
public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
    return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_KEY).noargs();
}

3.3 发送延迟消息

/**
 * 发送延迟消息,延迟时间可以每条消息单独设置
 * @param orderId   订单ID
 * @param delayMs   延迟毫秒数(如 30分钟 = 30 * 60 * 1000)
 */
public void sendDelayMessage(String orderId, long delayMs) {
    rabbitTemplate.convertAndSend(
        RabbitMQConfig.DELAY_EXCHANGE,
        RabbitMQConfig.DELAY_KEY,
        orderId,
        message -> {
            // 设置消息级别的延迟时间(单位:毫秒)
            message.getMessageProperties().setDelay((int) delayMs);
            return message;
        }
    );
    log.info("延迟消息已发送,订单:{},延迟:{}ms", orderId, delayMs);
}

3.4 消费延迟消息

@RabbitListener(queues = RabbitMQConfig.DELAY_QUEUE)
public void handleDelayMessage(String orderId, Channel channel,
                                @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
    try {
        log.info("延迟消息触发,处理订单:{}", orderId);
        closeOrderIfUnpaid(orderId);
        channel.basicAck(deliveryTag, false);
    } catch (Exception e) {
        log.error("延迟消息处理失败", e);
        try {
            channel.basicNack(deliveryTag, false, false);
        } catch (IOException ioException) {
            log.error("NACK失败", ioException);
        }
    }
}

四、两种方案对比

维度TTL + 死信队列延迟消息插件
延迟精度队列统一TTL,不够灵活消息级别,每条可独立设置
依赖原生支持,无需插件需要安装 rabbitmq-delayed-message-exchange
内存占用延迟消息存在内存中,消息量大时有压力
适用场景所有消息超时时间一致每条消息超时时间不同
可靠性高(持久化到磁盘)插件重启后延迟消息不丢失(需持久化)
推荐指数⭐⭐⭐⭐⭐⭐⭐⭐

结论

  • 场景简单、超时时间统一 → 用 TTL + 死信队列,省事
  • 超时时间需要灵活配置 → 用延迟消息插件,更优雅

五、生产环境踩坑汇总

坑1:消息在正常队列里消费了,还是进了死信队列?

原因:消费者抛出异常后,框架自动 NACK 且 requeue=false,导致消息进死信。
解决:检查消费者代码是否有未捕获异常,或配置重试策略。

坑2:死信队列堆积大量消息,消费不过来

原因:死信消费者处理逻辑太重(如频繁查库)。
解决:死信消费者只做状态标记,异步处理后续逻辑;或增加消费者实例。

坑3:延迟消息的实际延迟比设置的要长

原因:RabbitMQ 延迟插件的定时精度约为 ±1秒,高负载时误差更大。
解决:不要在延迟消息里做精确到秒的业务(如整点秒杀),改用 Redis ZADD + 定时任务方案。

坑4:Spring Boot 3.x 报 ClassNotFoundException: org.springframework.amqp...

原因:Spring Boot 3.x 对 AMQP 依赖版本有要求。
解决:确保 spring-boot-starter-amqp 版本与 Spring Boot 3.x 对应,不要手动指定旧版本。

坑5:消息重复消费

原因:消费者处理成功但网络抖动导致 ACK 未送达,RabbitMQ 重新投递。
解决:消费者增加幂等性处理,用 Redis 记录已处理的消息 ID。

@RabbitListener(queues = RabbitMQConfig.DLX_QUEUE)
public void handleDeadLetter(String orderId, Channel channel,
                              @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
                              @Header(AmqpHeaders.MESSAGE_ID) String messageId) {
    // 幂等性检查
    if (redisTemplate.hasKey("processed:" + messageId)) {
        log.warn("消息已处理,跳过:{}", messageId);
        channel.basicAck(deliveryTag, false);
        return;
    }
    // 处理业务...
    redisTemplate.opsForValue().set("processed:" + messageId, "1",
        Duration.ofDays(3)); // 保存3天
    channel.basicAck(deliveryTag, false);
}

六、完整项目结构

src/main/java/com/example/
├── config/
│   └── RabbitMQConfig.java        # 交换机、队列、绑定配置
├── service/
│   └── OrderService.java          # 生产者:发送普通/延迟消息
├── consumer/
│   ├── DeadLetterConsumer.java    # 死信队列消费者
│   └── DelayMessageConsumer.java  # 延迟消息消费者
└── Application.java

七、总结

功能实现方式关键配置
消息失败兜底死信队列(DLX)x-dead-letter-exchange
定时统一过期队列级别 TTLx-message-ttl
灵活延迟投递延迟消息插件x-delayed-message 类型交换机
消息不丢失手动 ACKacknowledge-mode: manual
防止重复消费幂等性Redis 记录 MessageId

死信队列和延迟消息是 RabbitMQ 里最实用的两个"进阶功能",在订单超时、重试补偿、定时通知等场景里几乎必用。理解了本文的原理和坑点,生产环境基本上不会再踩雷。

如果你的项目里 RabbitMQ 消息量特别大,可以结合我之前写的 Docker 多阶段构建 来部署,镜像体积直接减一半,启动也更快。


如有问题,欢迎在评论区留言!转载请注明出处 92yangyi.top

0

评论区