前言
在微服务架构中,消息队列是解耦服务、实现异步通信的核心组件。相比 RabbitMQ 基于 AMQP 协议的灵活路由,Apache RocketMQ 在大规模消息吞吐、顺序消息和事务消息方面有着显著优势,是阿里巴巴双十一亿级流量背后的核心中间件。
本文将带你从 RocketMQ 5.x 部署 到 Spring Boot 3.x 集成,覆盖:
- ✅ Docker 快速部署 RocketMQ 5.x
- ✅ Spring Boot 3.x 集成 rocketmq-spring-boot-starter
- ✅ 普通消息、延迟消息、顺序消息、事务消息实战
- ✅ 消费者幂等性与消息重试机制
- ✅ 生产环境踩坑记录
一、RocketMQ 核心概念速览
在动手之前,先理清几个关键概念:
| 概念 | 说明 |
|---|---|
| NameServer | 路由注册中心,无状态,可集群部署 |
| Broker | 消息存储与转发节点,分 Master/Slave |
| Topic | 消息主题,生产者和消费者的逻辑分类 |
| Tag | Topic 下的二级分类,用于消息过滤 |
| MessageQueue | Topic 的物理分区,顺序消息的最小单元 |
| ConsumerGroup | 消费者分组,同一组内负载均衡消费 |
| ProducerGroup | 生产者分组,事务消息必须指定 |
💡 RocketMQ 5.x 变化:引入了 Proxy 模式,支持 gRPC 协议,推荐使用新版 SDK
rocketmq-client-java,但 Spring Boot Starter 目前仍基于旧 SDK,生产中两者均可用。
二、Docker 快速部署 RocketMQ 5.x
2.1 创建 docker-compose.yml
version: '3.8'
services:
namesrv:
image: apache/rocketmq:5.1.4
container_name: rmq-namesrv
ports:
- "9876:9876"
command: sh mqnamesrv
environment:
- JAVA_OPT_EXT=-Xms512m -Xmx512m
volumes:
- ./data/namesrv/logs:/home/rocketmq/logs
networks:
- rocketmq
broker:
image: apache/rocketmq:5.1.4
container_name: rmq-broker
ports:
- "10909:10909"
- "10911:10911"
- "10912:10912"
command: sh mqbroker -n namesrv:9876 --enable-proxy
depends_on:
- namesrv
environment:
- JAVA_OPT_EXT=-Xms1g -Xmx1g
- NAMESRV_ADDR=namesrv:9876
volumes:
- ./data/broker/logs:/home/rocketmq/logs
- ./data/broker/store:/home/rocketmq/store
- ./conf/broker.conf:/home/rocketmq/conf/broker.conf
networks:
- rocketmq
dashboard:
image: apacherocketmq/rocketmq-dashboard:latest
container_name: rmq-dashboard
ports:
- "8080:8080"
environment:
- JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876
depends_on:
- namesrv
networks:
- rocketmq
networks:
rocketmq:
driver: bridge
2.2 broker.conf 配置文件
# conf/broker.conf
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
# 允许自动创建 Topic(生产环境建议关闭)
autoCreateTopicEnable=true
# Broker 对外暴露的 IP(必须配置为宿主机 IP)
brokerIP1=192.168.1.100
namesrvAddr=namesrv:9876
⚠️ 踩坑提醒:
brokerIP1必须填写宿主机的实际 IP 地址,不能用localhost或127.0.0.1,否则 Spring Boot 客户端无法连接 Broker。
2.3 启动
mkdir -p ./data/namesrv/logs ./data/broker/logs ./data/broker/store ./conf
# 写入 broker.conf 后
docker-compose up -d
# 验证
docker logs rmq-broker | grep "The broker boot success"
访问 http://localhost:8080 打开 RocketMQ Dashboard,能看到 Broker 在线即成功。
三、Spring Boot 3.x 集成
3.1 引入依赖
<!-- pom.xml -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.0</version>
</dependency>
3.2 application.yml 配置
rocketmq:
name-server: 192.168.1.100:9876
producer:
group: my-producer-group
send-message-timeout: 3000
retry-times-when-send-failed: 3
max-message-size: 4194304 # 4MB
consumer:
group: my-consumer-group
topic: my-topic
spring:
application:
name: rocketmq-demo
四、普通消息实战
4.1 生产者
@RestController
@RequestMapping("/mq")
@RequiredArgsConstructor
public class MessageController {
private final RocketMQTemplate rocketMQTemplate;
/**
* 同步发送(默认,有返回值,可靠)
*/
@PostMapping("/send/sync")
public String sendSync(@RequestParam String msg) {
SendResult result = rocketMQTemplate.syncSend("my-topic:my-tag", msg);
return "msgId: " + result.getMsgId() + ", status: " + result.getSendStatus();
}
/**
* 异步发送(高吞吐,通过回调感知结果)
*/
@PostMapping("/send/async")
public String sendAsync(@RequestParam String msg) {
rocketMQTemplate.asyncSend("my-topic:my-tag", msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("异步发送成功: {}", sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
log.error("异步发送失败", e);
}
});
return "异步发送中...";
}
/**
* 单向发送(最高吞吐,不关心结果,适用于日志场景)
*/
@PostMapping("/send/oneway")
public String sendOneWay(@RequestParam String msg) {
rocketMQTemplate.sendOneWay("my-topic:my-tag", msg);
return "单向发送完成";
}
}
4.2 消费者
@Service
@RocketMQMessageListener(
topic = "my-topic",
selectorExpression = "my-tag", // Tag 过滤,* 表示所有
consumerGroup = "my-consumer-group",
consumeMode = ConsumeMode.CONCURRENTLY, // 并发消费
messageModel = MessageModel.CLUSTERING // 集群消费
)
public class MyConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("收到消息: {}", message);
// 业务处理...
// 正常返回 = 消费成功 ACK
// 抛出异常 = 消费失败,触发重试
}
}
五、延迟消息
RocketMQ 的延迟消息基于固定延迟等级(开源版),不支持任意时间延迟:
| 等级 | 延迟时间 |
|---|---|
| 1 | 1s |
| 2 | 5s |
| 3 | 10s |
| 4 | 30s |
| 5 | 1m |
| 6 | 2m |
| ... | ... |
| 18 | 2h |
/**
* 发送延迟消息(延迟等级 4 = 30秒)
*/
@PostMapping("/send/delay")
public String sendDelay(@RequestParam String msg) {
// delayLevel: 1-18,对应固定延迟时间
SendResult result = rocketMQTemplate.syncSend(
"my-topic:delay-tag",
MessageBuilder.withPayload(msg).build(),
3000, // 发送超时
4 // delayLevel = 30s
);
return "延迟消息发送成功: " + result.getMsgId();
}
💡 RocketMQ 5.x 商业版 支持任意时间精度的定时消息;如果开源版不满足需求,可参考 RabbitMQ 延迟插件 方案(详见博客前文)。
六、顺序消息
顺序消息要求同一业务的消息投递到同一 MessageQueue,消费者串行消费该队列。
6.1 应用场景
- 订单状态流转(创建 → 支付 → 发货 → 完成)
- 数据库 Binlog 同步
- 账户流水按时序处理
6.2 顺序生产者
/**
* 顺序消息:同一 orderId 的消息发到同一队列
*/
@PostMapping("/send/order")
public String sendOrder(@RequestParam String orderId, @RequestParam String status) {
OrderMessage orderMsg = new OrderMessage(orderId, status, LocalDateTime.now());
// hashKey 决定选哪个 MessageQueue,相同 hashKey 必然路由到同一队列
SendResult result = rocketMQTemplate.syncSendOrderly(
"order-topic",
orderMsg,
orderId // hashKey = orderId,保证同一订单有序
);
log.info("顺序消息发送成功 orderId={}, queue={}", orderId, result.getMessageQueue().getQueueId());
return result.getMsgId();
}
6.3 顺序消费者
@Service
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group",
consumeMode = ConsumeMode.ORDERLY // 关键:串行消费模式
)
public class OrderConsumer implements RocketMQListener<OrderMessage> {
@Override
public void onMessage(OrderMessage message) {
log.info("顺序消费订单 orderId={}, status={}, time={}",
message.getOrderId(), message.getStatus(), message.getTime());
// 处理订单状态变更...
}
}
⚠️ 注意:
ORDERLY模式下,如果消费失败会阻塞当前队列,直到超过最大重试次数才跳过。务必做好异常处理,避免死消息阻塞队列。
七、事务消息
事务消息是 RocketMQ 最强大的特性之一,用于解决本地事务与消息发送的原子性问题,实现分布式事务的最终一致性。
7.1 执行流程
生产者 Broker 消费者
| | |
|--发送半事务消息(Half Msg)----->| |
|<---------Half Msg 确认--------| |
| | |
|--执行本地事务(DB操作) | |
| | |
|--提交/回滚(COMMIT/ROLLBACK)--->| |
| (或超时回查) | |
| |--COMMIT后投递----------->|
| | ROLLBACK则丢弃 |
7.2 事务消息实现
定义事务执行器:
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
@RequiredArgsConstructor
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
private final OrderService orderService;
/**
* 执行本地事务
* 在 Half Msg 发送成功后,Broker 回调此方法
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// arg 是发送时传入的业务参数
CreateOrderRequest request = (CreateOrderRequest) arg;
orderService.createOrder(request); // 执行本地数据库事务
log.info("本地事务执行成功, orderId={}", request.getOrderId());
return RocketMQLocalTransactionState.COMMIT; // 通知 Broker 投递消息
} catch (Exception e) {
log.error("本地事务执行失败", e);
return RocketMQLocalTransactionState.ROLLBACK; // 通知 Broker 丢弃消息
}
}
/**
* 事务回查
* 当 Broker 超时未收到 COMMIT/ROLLBACK,主动回查本地事务状态
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String orderId = msg.getHeaders().get("orderId", String.class);
boolean exists = orderService.existsOrder(orderId);
log.info("事务回查: orderId={}, exists={}", orderId, exists);
return exists ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
}
}
发送事务消息:
@PostMapping("/send/transaction")
public String sendTransaction(@RequestBody CreateOrderRequest request) {
// 构造消息头,回查时可获取业务 Key
Message<CreateOrderRequest> message = MessageBuilder
.withPayload(request)
.setHeader("orderId", request.getOrderId())
.build();
// sendMessageInTransaction 发送半事务消息,并触发本地事务
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"order-topic",
message,
request // 传给 executeLocalTransaction 的 arg 参数
);
log.info("事务消息状态: {}", result.getLocalTransactionState());
return result.getMsgId();
}
八、消费者幂等性设计
RocketMQ 至少一次投递(At Least Once) 语义意味着消息可能被重复消费,必须在业务层保证幂等:
@Service
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "order-pay-group")
@RequiredArgsConstructor
public class OrderPayConsumer implements RocketMQListener<MessageExt> {
private final RedisTemplate<String, String> redisTemplate;
private final OrderService orderService;
@Override
public void onMessage(MessageExt message) {
String msgId = message.getMsgId();
String idempotentKey = "mq:consumed:" + msgId;
// 利用 Redis SETNX 实现幂等
Boolean isFirstConsume = redisTemplate.opsForValue()
.setIfAbsent(idempotentKey, "1", Duration.ofDays(3));
if (Boolean.FALSE.equals(isFirstConsume)) {
log.warn("消息已消费,跳过 msgId={}", msgId);
return;
}
try {
String body = new String(message.getBody(), StandardCharsets.UTF_8);
OrderPayMessage payload = JSON.parseObject(body, OrderPayMessage.class);
orderService.processPayment(payload);
} catch (Exception e) {
// 失败时删除幂等 Key,允许重试
redisTemplate.delete(idempotentKey);
throw e; // 重新抛出,触发 RocketMQ 重试机制
}
}
}
九、重试与死信队列
9.1 消费重试策略
| 重试次数 | 重试间隔 |
|---|---|
| 1 | 10s |
| 2 | 30s |
| 3 | 1min |
| ... | 逐渐增大 |
| 16 | 2h(最后一次重试) |
@RocketMQMessageListener(
topic = "my-topic",
consumerGroup = "my-group",
maxReconsumeTimes = 3 // 最大重试次数,-1 表示使用 Broker 默认值(16次)
)
public class MyConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
// 抛出异常 -> 触发重试
// 正常返回 -> ACK 成功
}
}
9.2 死信队列处理
超过最大重试次数的消息进入死信队列 %DLQ%{ConsumerGroup},需要单独消费处理:
@Service
@RocketMQMessageListener(
topic = "%DLQ%my-group", // 死信队列 Topic
consumerGroup = "dlq-handler-group"
)
public class DlqConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
log.error("死信消息: msgId={}, body={}, reconsumeTimes={}",
message.getMsgId(),
new String(message.getBody()),
message.getReconsumeTimes()
);
// 告警、人工干预、写入数据库等处理
alertService.sendDlqAlert(message);
}
}
十、生产环境踩坑记录
坑 1:消费者不消费消息
现象:生产者发送成功,Dashboard 显示消息堆积,消费者无任何日志。
原因:consumerGroup 与 Broker 上已有的 Group 订阅关系冲突,新实例订阅了不同的 Tag。
解决:确保同一 consumerGroup 下所有实例的 Topic/Tag 订阅关系完全一致。
坑 2:顺序消息并发消费
现象:设置了 ORDERLY 模式,但消息还是乱序。
原因:同一 ConsumerGroup 部署了多个实例,且每个队列分配到了不同实例。顺序消息依赖同一队列单线程消费,多实例时需确保队列数 >= 实例数,或使用 MessageModel.BROADCASTING。
解决:调整 Topic 的 MessageQueue 数量 >= 消费者实例数。
坑 3:事务回查超时
现象:本地事务已提交,但 Broker 日志显示持续回查。
原因:executeLocalTransaction 执行耗时过长(超过 Broker 的 transactionTimeoutCheck 时间),触发了回查。
解决:本地事务保持轻量,耗时操作异步化;确保 checkLocalTransaction 能正确查询本地事务状态。
坑 4:消息体超过 4MB
现象:SEND_REQUEST_TIMEOUT 或 MESSAGE_ILLEGAL。
原因:RocketMQ 默认最大消息体 4MB,超过则拒绝。
解决:大数据场景改用 OSS/MinIO 存储实体,消息体只传递引用 ID。
十一、RocketMQ vs RabbitMQ 选型参考
| 维度 | RocketMQ | RabbitMQ |
|---|---|---|
| 吞吐量 | 极高(百万级 TPS) | 较高(万级 TPS) |
| 延迟 | 毫秒级 | 微秒级 |
| 顺序消息 | ✅ 原生支持 | ❌ 需额外设计 |
| 事务消息 | ✅ 原生支持 | ❌ 需要补偿机制 |
| 延迟消息 | ✅(固定等级) | ✅(插件支持任意时间) |
| 消息回溯 | ✅ 支持按时间回溯 | ❌ 不支持 |
| 协议 | 私有协议 / gRPC | AMQP 标准协议 |
| 适用场景 | 电商、金融、大数据 | 业务解耦、RPC、复杂路由 |
建议:新建微服务系统优先考虑 RocketMQ;对协议标准化要求高或已有 RabbitMQ 基础设施的场景继续使用 RabbitMQ。
总结
本文系统梳理了 RocketMQ 5.x 在 Spring Boot 3.x 中的完整实践:
- Docker 部署:通过 docker-compose 快速搭建单机环境,注意
brokerIP1配置 - 普通消息:同步/异步/单向三种发送模式,按业务场景选择
- 延迟消息:基于固定等级实现定时任务,超细粒度需考虑商业版
- 顺序消息:
syncSendOrderly+ORDERLY消费模式,保证同一业务消息有序 - 事务消息:Half Msg + 本地事务 + 回查机制,实现分布式最终一致性
- 幂等性:Redis SETNX 防重,配合死信队列兜底
- 踩坑记录:订阅关系冲突、顺序消费多实例、事务回查、消息体超限
结合上篇 RabbitMQ 死信队列与延迟消息实战,两篇文章覆盖了微服务消息中间件的主流选型,欢迎收藏对比参考。
如有疑问,欢迎留言交流 🙌
评论区