侧边栏壁纸
  • 累计撰写 70 篇文章
  • 累计创建 28 个标签
  • 累计收到 21 条评论

目 录CONTENT

文章目录

RocketMQ 5.x 实战全攻略:Spring Boot 3.x 集成消息发送、顺序消息与事务消息

Administrator
2026-03-24 / 0 评论 / 0 点赞 / 0 阅读 / 0 字 / 正在检测是否收录...
温馨提示:
部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

前言

在微服务架构中,消息队列是解耦服务、实现异步通信的核心组件。相比 RabbitMQ 基于 AMQP 协议的灵活路由,Apache RocketMQ 在大规模消息吞吐、顺序消息和事务消息方面有着显著优势,是阿里巴巴双十一亿级流量背后的核心中间件。

本文将带你从 RocketMQ 5.x 部署Spring Boot 3.x 集成,覆盖:

  • ✅ Docker 快速部署 RocketMQ 5.x
  • ✅ Spring Boot 3.x 集成 rocketmq-spring-boot-starter
  • ✅ 普通消息、延迟消息、顺序消息、事务消息实战
  • ✅ 消费者幂等性与消息重试机制
  • ✅ 生产环境踩坑记录

一、RocketMQ 核心概念速览

在动手之前,先理清几个关键概念:

概念说明
NameServer路由注册中心,无状态,可集群部署
Broker消息存储与转发节点,分 Master/Slave
Topic消息主题,生产者和消费者的逻辑分类
TagTopic 下的二级分类,用于消息过滤
MessageQueueTopic 的物理分区,顺序消息的最小单元
ConsumerGroup消费者分组,同一组内负载均衡消费
ProducerGroup生产者分组,事务消息必须指定

💡 RocketMQ 5.x 变化:引入了 Proxy 模式,支持 gRPC 协议,推荐使用新版 SDK rocketmq-client-java,但 Spring Boot Starter 目前仍基于旧 SDK,生产中两者均可用。


二、Docker 快速部署 RocketMQ 5.x

2.1 创建 docker-compose.yml

version: '3.8'

services:
  namesrv:
    image: apache/rocketmq:5.1.4
    container_name: rmq-namesrv
    ports:
      - "9876:9876"
    command: sh mqnamesrv
    environment:
      - JAVA_OPT_EXT=-Xms512m -Xmx512m
    volumes:
      - ./data/namesrv/logs:/home/rocketmq/logs
    networks:
      - rocketmq

  broker:
    image: apache/rocketmq:5.1.4
    container_name: rmq-broker
    ports:
      - "10909:10909"
      - "10911:10911"
      - "10912:10912"
    command: sh mqbroker -n namesrv:9876 --enable-proxy
    depends_on:
      - namesrv
    environment:
      - JAVA_OPT_EXT=-Xms1g -Xmx1g
      - NAMESRV_ADDR=namesrv:9876
    volumes:
      - ./data/broker/logs:/home/rocketmq/logs
      - ./data/broker/store:/home/rocketmq/store
      - ./conf/broker.conf:/home/rocketmq/conf/broker.conf
    networks:
      - rocketmq

  dashboard:
    image: apacherocketmq/rocketmq-dashboard:latest
    container_name: rmq-dashboard
    ports:
      - "8080:8080"
    environment:
      - JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876
    depends_on:
      - namesrv
    networks:
      - rocketmq

networks:
  rocketmq:
    driver: bridge

2.2 broker.conf 配置文件

# conf/broker.conf
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
# 允许自动创建 Topic(生产环境建议关闭)
autoCreateTopicEnable=true
# Broker 对外暴露的 IP(必须配置为宿主机 IP)
brokerIP1=192.168.1.100
namesrvAddr=namesrv:9876

⚠️ 踩坑提醒brokerIP1 必须填写宿主机的实际 IP 地址,不能用 localhost127.0.0.1,否则 Spring Boot 客户端无法连接 Broker。

2.3 启动

mkdir -p ./data/namesrv/logs ./data/broker/logs ./data/broker/store ./conf
# 写入 broker.conf 后
docker-compose up -d

# 验证
docker logs rmq-broker | grep "The broker boot success"

访问 http://localhost:8080 打开 RocketMQ Dashboard,能看到 Broker 在线即成功。


三、Spring Boot 3.x 集成

3.1 引入依赖

<!-- pom.xml -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.3.0</version>
</dependency>

3.2 application.yml 配置

rocketmq:
  name-server: 192.168.1.100:9876
  producer:
    group: my-producer-group
    send-message-timeout: 3000
    retry-times-when-send-failed: 3
    max-message-size: 4194304  # 4MB
  consumer:
    group: my-consumer-group
    topic: my-topic

spring:
  application:
    name: rocketmq-demo

四、普通消息实战

4.1 生产者

@RestController
@RequestMapping("/mq")
@RequiredArgsConstructor
public class MessageController {

    private final RocketMQTemplate rocketMQTemplate;

    /**
     * 同步发送(默认,有返回值,可靠)
     */
    @PostMapping("/send/sync")
    public String sendSync(@RequestParam String msg) {
        SendResult result = rocketMQTemplate.syncSend("my-topic:my-tag", msg);
        return "msgId: " + result.getMsgId() + ", status: " + result.getSendStatus();
    }

    /**
     * 异步发送(高吞吐,通过回调感知结果)
     */
    @PostMapping("/send/async")
    public String sendAsync(@RequestParam String msg) {
        rocketMQTemplate.asyncSend("my-topic:my-tag", msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("异步发送成功: {}", sendResult.getMsgId());
            }
            @Override
            public void onException(Throwable e) {
                log.error("异步发送失败", e);
            }
        });
        return "异步发送中...";
    }

    /**
     * 单向发送(最高吞吐,不关心结果,适用于日志场景)
     */
    @PostMapping("/send/oneway")
    public String sendOneWay(@RequestParam String msg) {
        rocketMQTemplate.sendOneWay("my-topic:my-tag", msg);
        return "单向发送完成";
    }
}

4.2 消费者

@Service
@RocketMQMessageListener(
    topic = "my-topic",
    selectorExpression = "my-tag",   // Tag 过滤,* 表示所有
    consumerGroup = "my-consumer-group",
    consumeMode = ConsumeMode.CONCURRENTLY,  // 并发消费
    messageModel = MessageModel.CLUSTERING    // 集群消费
)
public class MyConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("收到消息: {}", message);
        // 业务处理...
        // 正常返回 = 消费成功 ACK
        // 抛出异常 = 消费失败,触发重试
    }
}

五、延迟消息

RocketMQ 的延迟消息基于固定延迟等级(开源版),不支持任意时间延迟:

等级延迟时间
11s
25s
310s
430s
51m
62m
......
182h
/**
 * 发送延迟消息(延迟等级 4 = 30秒)
 */
@PostMapping("/send/delay")
public String sendDelay(@RequestParam String msg) {
    // delayLevel: 1-18,对应固定延迟时间
    SendResult result = rocketMQTemplate.syncSend(
        "my-topic:delay-tag",
        MessageBuilder.withPayload(msg).build(),
        3000,   // 发送超时
        4       // delayLevel = 30s
    );
    return "延迟消息发送成功: " + result.getMsgId();
}

💡 RocketMQ 5.x 商业版 支持任意时间精度的定时消息;如果开源版不满足需求,可参考 RabbitMQ 延迟插件 方案(详见博客前文)。


六、顺序消息

顺序消息要求同一业务的消息投递到同一 MessageQueue,消费者串行消费该队列。

6.1 应用场景

  • 订单状态流转(创建 → 支付 → 发货 → 完成)
  • 数据库 Binlog 同步
  • 账户流水按时序处理

6.2 顺序生产者

/**
 * 顺序消息:同一 orderId 的消息发到同一队列
 */
@PostMapping("/send/order")
public String sendOrder(@RequestParam String orderId, @RequestParam String status) {
    OrderMessage orderMsg = new OrderMessage(orderId, status, LocalDateTime.now());

    // hashKey 决定选哪个 MessageQueue,相同 hashKey 必然路由到同一队列
    SendResult result = rocketMQTemplate.syncSendOrderly(
        "order-topic",
        orderMsg,
        orderId  // hashKey = orderId,保证同一订单有序
    );
    log.info("顺序消息发送成功 orderId={}, queue={}", orderId, result.getMessageQueue().getQueueId());
    return result.getMsgId();
}

6.3 顺序消费者

@Service
@RocketMQMessageListener(
    topic = "order-topic",
    consumerGroup = "order-consumer-group",
    consumeMode = ConsumeMode.ORDERLY  // 关键:串行消费模式
)
public class OrderConsumer implements RocketMQListener<OrderMessage> {

    @Override
    public void onMessage(OrderMessage message) {
        log.info("顺序消费订单 orderId={}, status={}, time={}",
            message.getOrderId(), message.getStatus(), message.getTime());
        // 处理订单状态变更...
    }
}

⚠️ 注意ORDERLY 模式下,如果消费失败会阻塞当前队列,直到超过最大重试次数才跳过。务必做好异常处理,避免死消息阻塞队列。


七、事务消息

事务消息是 RocketMQ 最强大的特性之一,用于解决本地事务与消息发送的原子性问题,实现分布式事务的最终一致性。

7.1 执行流程

生产者                          Broker                    消费者
   |                               |                          |
   |--发送半事务消息(Half Msg)----->|                          |
   |<---------Half Msg 确认--------|                          |
   |                               |                          |
   |--执行本地事务(DB操作)          |                          |
   |                               |                          |
   |--提交/回滚(COMMIT/ROLLBACK)--->|                          |
   |        (或超时回查)          |                          |
   |                               |--COMMIT后投递----------->|
   |                               |  ROLLBACK则丢弃          |

7.2 事务消息实现

定义事务执行器:

@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
@RequiredArgsConstructor
public class OrderTransactionListener implements RocketMQLocalTransactionListener {

    private final OrderService orderService;

    /**
     * 执行本地事务
     * 在 Half Msg 发送成功后,Broker 回调此方法
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // arg 是发送时传入的业务参数
            CreateOrderRequest request = (CreateOrderRequest) arg;
            orderService.createOrder(request);  // 执行本地数据库事务
            log.info("本地事务执行成功, orderId={}", request.getOrderId());
            return RocketMQLocalTransactionState.COMMIT;  // 通知 Broker 投递消息
        } catch (Exception e) {
            log.error("本地事务执行失败", e);
            return RocketMQLocalTransactionState.ROLLBACK;  // 通知 Broker 丢弃消息
        }
    }

    /**
     * 事务回查
     * 当 Broker 超时未收到 COMMIT/ROLLBACK,主动回查本地事务状态
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String orderId = msg.getHeaders().get("orderId", String.class);
        boolean exists = orderService.existsOrder(orderId);
        log.info("事务回查: orderId={}, exists={}", orderId, exists);
        return exists ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
    }
}

发送事务消息:

@PostMapping("/send/transaction")
public String sendTransaction(@RequestBody CreateOrderRequest request) {
    // 构造消息头,回查时可获取业务 Key
    Message<CreateOrderRequest> message = MessageBuilder
        .withPayload(request)
        .setHeader("orderId", request.getOrderId())
        .build();

    // sendMessageInTransaction 发送半事务消息,并触发本地事务
    TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
        "order-topic",
        message,
        request  // 传给 executeLocalTransaction 的 arg 参数
    );

    log.info("事务消息状态: {}", result.getLocalTransactionState());
    return result.getMsgId();
}

八、消费者幂等性设计

RocketMQ 至少一次投递(At Least Once) 语义意味着消息可能被重复消费,必须在业务层保证幂等:

@Service
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "order-pay-group")
@RequiredArgsConstructor
public class OrderPayConsumer implements RocketMQListener<MessageExt> {

    private final RedisTemplate<String, String> redisTemplate;
    private final OrderService orderService;

    @Override
    public void onMessage(MessageExt message) {
        String msgId = message.getMsgId();
        String idempotentKey = "mq:consumed:" + msgId;

        // 利用 Redis SETNX 实现幂等
        Boolean isFirstConsume = redisTemplate.opsForValue()
            .setIfAbsent(idempotentKey, "1", Duration.ofDays(3));

        if (Boolean.FALSE.equals(isFirstConsume)) {
            log.warn("消息已消费,跳过 msgId={}", msgId);
            return;
        }

        try {
            String body = new String(message.getBody(), StandardCharsets.UTF_8);
            OrderPayMessage payload = JSON.parseObject(body, OrderPayMessage.class);
            orderService.processPayment(payload);
        } catch (Exception e) {
            // 失败时删除幂等 Key,允许重试
            redisTemplate.delete(idempotentKey);
            throw e;  // 重新抛出,触发 RocketMQ 重试机制
        }
    }
}

九、重试与死信队列

9.1 消费重试策略

重试次数重试间隔
110s
230s
31min
...逐渐增大
162h(最后一次重试)
@RocketMQMessageListener(
    topic = "my-topic",
    consumerGroup = "my-group",
    maxReconsumeTimes = 3  // 最大重试次数,-1 表示使用 Broker 默认值(16次)
)
public class MyConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        // 抛出异常 -> 触发重试
        // 正常返回 -> ACK 成功
    }
}

9.2 死信队列处理

超过最大重试次数的消息进入死信队列 %DLQ%{ConsumerGroup},需要单独消费处理:

@Service
@RocketMQMessageListener(
    topic = "%DLQ%my-group",  // 死信队列 Topic
    consumerGroup = "dlq-handler-group"
)
public class DlqConsumer implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        log.error("死信消息: msgId={}, body={}, reconsumeTimes={}",
            message.getMsgId(),
            new String(message.getBody()),
            message.getReconsumeTimes()
        );
        // 告警、人工干预、写入数据库等处理
        alertService.sendDlqAlert(message);
    }
}

十、生产环境踩坑记录

坑 1:消费者不消费消息

现象:生产者发送成功,Dashboard 显示消息堆积,消费者无任何日志。

原因consumerGroup 与 Broker 上已有的 Group 订阅关系冲突,新实例订阅了不同的 Tag。

解决:确保同一 consumerGroup 下所有实例的 Topic/Tag 订阅关系完全一致。


坑 2:顺序消息并发消费

现象:设置了 ORDERLY 模式,但消息还是乱序。

原因:同一 ConsumerGroup 部署了多个实例,且每个队列分配到了不同实例。顺序消息依赖同一队列单线程消费,多实例时需确保队列数 >= 实例数,或使用 MessageModel.BROADCASTING

解决:调整 Topic 的 MessageQueue 数量 >= 消费者实例数。


坑 3:事务回查超时

现象:本地事务已提交,但 Broker 日志显示持续回查。

原因executeLocalTransaction 执行耗时过长(超过 Broker 的 transactionTimeoutCheck 时间),触发了回查。

解决:本地事务保持轻量,耗时操作异步化;确保 checkLocalTransaction 能正确查询本地事务状态。


坑 4:消息体超过 4MB

现象SEND_REQUEST_TIMEOUTMESSAGE_ILLEGAL

原因:RocketMQ 默认最大消息体 4MB,超过则拒绝。

解决:大数据场景改用 OSS/MinIO 存储实体,消息体只传递引用 ID。


十一、RocketMQ vs RabbitMQ 选型参考

维度RocketMQRabbitMQ
吞吐量极高(百万级 TPS)较高(万级 TPS)
延迟毫秒级微秒级
顺序消息✅ 原生支持❌ 需额外设计
事务消息✅ 原生支持❌ 需要补偿机制
延迟消息✅(固定等级)✅(插件支持任意时间)
消息回溯✅ 支持按时间回溯❌ 不支持
协议私有协议 / gRPCAMQP 标准协议
适用场景电商、金融、大数据业务解耦、RPC、复杂路由

建议:新建微服务系统优先考虑 RocketMQ;对协议标准化要求高或已有 RabbitMQ 基础设施的场景继续使用 RabbitMQ。


总结

本文系统梳理了 RocketMQ 5.x 在 Spring Boot 3.x 中的完整实践:

  1. Docker 部署:通过 docker-compose 快速搭建单机环境,注意 brokerIP1 配置
  2. 普通消息:同步/异步/单向三种发送模式,按业务场景选择
  3. 延迟消息:基于固定等级实现定时任务,超细粒度需考虑商业版
  4. 顺序消息syncSendOrderly + ORDERLY 消费模式,保证同一业务消息有序
  5. 事务消息:Half Msg + 本地事务 + 回查机制,实现分布式最终一致性
  6. 幂等性:Redis SETNX 防重,配合死信队列兜底
  7. 踩坑记录:订阅关系冲突、顺序消费多实例、事务回查、消息体超限

结合上篇 RabbitMQ 死信队列与延迟消息实战,两篇文章覆盖了微服务消息中间件的主流选型,欢迎收藏对比参考。


如有疑问,欢迎留言交流 🙌

0

评论区