前言
在博客之前的文章里,我们装好了 RabbitMQ(CentOS7 安装篇),也在 Spring Boot 3.x 系列里搞定了 Docker 多阶段构建。这次来解决一个生产中绕不开的问题:消息失败了怎么办?订单30分钟未支付怎么自动关闭?
答案就是两个核心机制:
- 死信队列(Dead Letter Exchange,DLX):消息"死了"之后去哪?
- 延迟消息(Delayed Message):消息怎么"定时发"?
这两个功能在电商、支付、物流系统里几乎是标配。本文基于 Spring Boot 3.2 + RabbitMQ 3.12 全程实战,踩过的坑都给你标出来。
一、死信队列是什么?
先搞清楚概念,再上代码。
1.1 什么是"死信"?
消息在以下三种情况下会变成死信(Dead Letter):
| 情况 | 说明 |
|---|---|
| 消息被拒绝 | basicNack 或 basicReject,且 requeue=false |
| 消息TTL过期 | 消息在队列中存活时间超过了设定的 TTL |
| 队列达到最大长度 | 队列满了,最早的消息被挤出去 |
1.2 死信队列的工作原理
生产者 → 正常交换机 → 正常队列 → 消费者处理
↓ (失败/过期/满了)
死信交换机 → 死信队列 → 死信消费者
简单说就是:给正常队列配一个"后备队列",消息死了之后自动路由过去,而不是直接丢掉。
二、Spring Boot 3.x 实战:死信队列
2.1 引入依赖
<!-- pom.xml -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2 配置文件
# application.yml
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
acknowledge-mode: manual # 手动确认,非常重要!
prefetch: 1 # 每次只拉1条,防止消息堆积
⚠️ 坑1:
acknowledge-mode必须设为manual,否则消息消费失败也会被自动 ACK,死信机制完全失效!
2.3 声明交换机和队列
@Configuration
public class RabbitMQConfig {
// ============ 交换机名称 ============
public static final String NORMAL_EXCHANGE = "normal.exchange";
public static final String DLX_EXCHANGE = "dlx.exchange";
// ============ 队列名称 ============
public static final String NORMAL_QUEUE = "normal.queue";
public static final String DLX_QUEUE = "dlx.queue";
// ============ RoutingKey ============
public static final String NORMAL_ROUTING_KEY = "normal.key";
public static final String DLX_ROUTING_KEY = "dlx.key";
// ---------- 声明交换机 ----------
@Bean
public DirectExchange normalExchange() {
return new DirectExchange(NORMAL_EXCHANGE, true, false);
}
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange(DLX_EXCHANGE, true, false);
}
// ---------- 声明队列 ----------
/**
* 正常队列:绑定死信交换机,设置 TTL(30秒过期)
*/
@Bean
public Queue normalQueue() {
Map<String, Object> args = new HashMap<>();
// 绑定死信交换机
args.put("x-dead-letter-exchange", DLX_EXCHANGE);
// 死信路由 Key
args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
// 消息 TTL:30秒(毫秒单位)
args.put("x-message-ttl", 30_000);
// 队列最大长度(可选)
// args.put("x-max-length", 100);
return QueueBuilder.durable(NORMAL_QUEUE).withArguments(args).build();
}
/**
* 死信队列:普通队列,用来接收死信消息
*/
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable(DLX_QUEUE).build();
}
// ---------- 绑定关系 ----------
@Bean
public Binding normalBinding(Queue normalQueue, DirectExchange normalExchange) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY);
}
@Bean
public Binding dlxBinding(Queue dlxQueue, DirectExchange dlxExchange) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(DLX_ROUTING_KEY);
}
}
2.4 生产者:发送消息
@Service
@RequiredArgsConstructor
public class OrderService {
private final RabbitTemplate rabbitTemplate;
/**
* 下单时发送延迟关闭消息(30秒内未支付则进入死信队列处理)
*/
public void createOrder(String orderId) {
// 业务逻辑:创建订单...
System.out.println("订单创建成功:" + orderId);
// 发送到正常队列,30秒后若未被消费则成为死信
rabbitTemplate.convertAndSend(
RabbitMQConfig.NORMAL_EXCHANGE,
RabbitMQConfig.NORMAL_ROUTING_KEY,
orderId
);
}
}
2.5 消费者:处理死信
@Component
@Slf4j
public class DeadLetterConsumer {
/**
* 监听死信队列:订单超时未支付,自动关闭
*/
@RabbitListener(queues = RabbitMQConfig.DLX_QUEUE)
public void handleDeadLetter(String orderId, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
log.info("收到死信消息,订单超时关闭:{}", orderId);
// 查询订单状态,如果还是"待支付"则关闭
closeOrderIfUnpaid(orderId);
// 手动确认
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("处理死信消息失败:{}", e.getMessage(), e);
try {
// 处理失败,不重新入队(避免无限循环)
channel.basicNack(deliveryTag, false, false);
} catch (IOException ioException) {
log.error("NACK失败", ioException);
}
}
}
private void closeOrderIfUnpaid(String orderId) {
// 查库确认状态,执行关闭逻辑...
log.info("订单 {} 已超时关闭", orderId);
}
}
⚠️ 坑2:死信消费者里消费失败后,千万不能
requeue=true,否则消息会在死信队列里无限循环!
三、延迟消息:更灵活的方案
上面用 TTL + 死信队列实现了"定时关闭",但有个硬伤:所有消息的超时时间是一样的(队列级别的 TTL)。
如果我想让 A 订单 15 分钟超时、B 订单 60 分钟超时,就需要用延迟消息插件。
3.1 安装 rabbitmq-delayed-message-exchange 插件
# 进入 RabbitMQ 容器
docker exec -it rabbitmq bash
# 下载插件(版本要与 RabbitMQ 匹配)
cd /plugins
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez
# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 退出并重启容器
exit
docker restart rabbitmq
或者在 docker-compose.yml 里一步到位:
# docker-compose.yml
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3.12-management
container_name: rabbitmq
ports:
- "5672:5672"
- "15672:15672"
volumes:
- ./plugins:/plugins_extra
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
# 启动时自动加载插件
command: >
bash -c "rabbitmq-plugins enable rabbitmq_delayed_message_exchange && rabbitmq-server"
⚠️ 坑3:插件版本必须和 RabbitMQ 版本严格匹配,否则启用失败但不报错,消息根本不会延迟!
3.2 声明延迟交换机
// 在 RabbitMQConfig 中追加
public static final String DELAY_EXCHANGE = "delay.exchange";
public static final String DELAY_QUEUE = "delay.queue";
public static final String DELAY_KEY = "delay.key";
/**
* 延迟交换机:类型为 x-delayed-message
*/
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct"); // 内部路由类型
return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, args);
}
@Bean
public Queue delayQueue() {
return QueueBuilder.durable(DELAY_QUEUE).build();
}
@Bean
public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_KEY).noargs();
}
3.3 发送延迟消息
/**
* 发送延迟消息,延迟时间可以每条消息单独设置
* @param orderId 订单ID
* @param delayMs 延迟毫秒数(如 30分钟 = 30 * 60 * 1000)
*/
public void sendDelayMessage(String orderId, long delayMs) {
rabbitTemplate.convertAndSend(
RabbitMQConfig.DELAY_EXCHANGE,
RabbitMQConfig.DELAY_KEY,
orderId,
message -> {
// 设置消息级别的延迟时间(单位:毫秒)
message.getMessageProperties().setDelay((int) delayMs);
return message;
}
);
log.info("延迟消息已发送,订单:{},延迟:{}ms", orderId, delayMs);
}
3.4 消费延迟消息
@RabbitListener(queues = RabbitMQConfig.DELAY_QUEUE)
public void handleDelayMessage(String orderId, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
log.info("延迟消息触发,处理订单:{}", orderId);
closeOrderIfUnpaid(orderId);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("延迟消息处理失败", e);
try {
channel.basicNack(deliveryTag, false, false);
} catch (IOException ioException) {
log.error("NACK失败", ioException);
}
}
}
四、两种方案对比
| 维度 | TTL + 死信队列 | 延迟消息插件 |
|---|---|---|
| 延迟精度 | 队列统一TTL,不够灵活 | 消息级别,每条可独立设置 |
| 依赖 | 原生支持,无需插件 | 需要安装 rabbitmq-delayed-message-exchange |
| 内存占用 | 低 | 延迟消息存在内存中,消息量大时有压力 |
| 适用场景 | 所有消息超时时间一致 | 每条消息超时时间不同 |
| 可靠性 | 高(持久化到磁盘) | 插件重启后延迟消息不丢失(需持久化) |
| 推荐指数 | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
结论:
- 场景简单、超时时间统一 → 用 TTL + 死信队列,省事
- 超时时间需要灵活配置 → 用延迟消息插件,更优雅
五、生产环境踩坑汇总
坑1:消息在正常队列里消费了,还是进了死信队列?
原因:消费者抛出异常后,框架自动 NACK 且 requeue=false,导致消息进死信。
解决:检查消费者代码是否有未捕获异常,或配置重试策略。
坑2:死信队列堆积大量消息,消费不过来
原因:死信消费者处理逻辑太重(如频繁查库)。
解决:死信消费者只做状态标记,异步处理后续逻辑;或增加消费者实例。
坑3:延迟消息的实际延迟比设置的要长
原因:RabbitMQ 延迟插件的定时精度约为 ±1秒,高负载时误差更大。
解决:不要在延迟消息里做精确到秒的业务(如整点秒杀),改用 Redis ZADD + 定时任务方案。
坑4:Spring Boot 3.x 报 ClassNotFoundException: org.springframework.amqp...
原因:Spring Boot 3.x 对 AMQP 依赖版本有要求。
解决:确保 spring-boot-starter-amqp 版本与 Spring Boot 3.x 对应,不要手动指定旧版本。
坑5:消息重复消费
原因:消费者处理成功但网络抖动导致 ACK 未送达,RabbitMQ 重新投递。
解决:消费者增加幂等性处理,用 Redis 记录已处理的消息 ID。
@RabbitListener(queues = RabbitMQConfig.DLX_QUEUE)
public void handleDeadLetter(String orderId, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
@Header(AmqpHeaders.MESSAGE_ID) String messageId) {
// 幂等性检查
if (redisTemplate.hasKey("processed:" + messageId)) {
log.warn("消息已处理,跳过:{}", messageId);
channel.basicAck(deliveryTag, false);
return;
}
// 处理业务...
redisTemplate.opsForValue().set("processed:" + messageId, "1",
Duration.ofDays(3)); // 保存3天
channel.basicAck(deliveryTag, false);
}
六、完整项目结构
src/main/java/com/example/
├── config/
│ └── RabbitMQConfig.java # 交换机、队列、绑定配置
├── service/
│ └── OrderService.java # 生产者:发送普通/延迟消息
├── consumer/
│ ├── DeadLetterConsumer.java # 死信队列消费者
│ └── DelayMessageConsumer.java # 延迟消息消费者
└── Application.java
七、总结
| 功能 | 实现方式 | 关键配置 |
|---|---|---|
| 消息失败兜底 | 死信队列(DLX) | x-dead-letter-exchange |
| 定时统一过期 | 队列级别 TTL | x-message-ttl |
| 灵活延迟投递 | 延迟消息插件 | x-delayed-message 类型交换机 |
| 消息不丢失 | 手动 ACK | acknowledge-mode: manual |
| 防止重复消费 | 幂等性 | Redis 记录 MessageId |
死信队列和延迟消息是 RabbitMQ 里最实用的两个"进阶功能",在订单超时、重试补偿、定时通知等场景里几乎必用。理解了本文的原理和坑点,生产环境基本上不会再踩雷。
如果你的项目里 RabbitMQ 消息量特别大,可以结合我之前写的 Docker 多阶段构建 来部署,镜像体积直接减一半,启动也更快。
如有问题,欢迎在评论区留言!转载请注明出处 92yangyi.top
评论区