Kafka 3.x + Spring Boot 3.x 生产级消息系统实战:高吞吐、精确一次语义与事务消息全攻略

Administrator
2026-04-07

摘要:本文深入讲解如何在 Spring Boot 3.x 项目中集成 Apache Kafka 3.x,从基础配置到生产调优,覆盖高吞吐发送、消费者组管理、精确一次语义(EOS)、事务消息、KStream 流处理、KRaft 模式部署,结合 Docker Compose 搭建完整演示环境,助你打造企业级 Kafka 消息平台。


目录

  1. 背景与选型
  2. 环境搭建:KRaft 模式 Docker Compose
  3. Spring Boot 3.x 集成 Kafka
  4. 生产者深度实战
  5. 消费者深度实战
  6. 精确一次语义(EOS)实战
  7. Kafka 事务消息
  8. Kafka Streams 实时流处理
  9. 死信队列(DLT)与重试机制
  10. 生产调优:高吞吐与低延迟
  11. 监控:Prometheus + Grafana
  12. 总结与最佳实践

1. 背景与选型

为什么选 Kafka?

在微服务架构中,消息队列是解耦、削峰填谷的核心组件。与 RabbitMQ、RocketMQ 相比,Kafka 有以下优势:

| 维度 | Kafka 3.x | RocketMQ 5.x | RabbitMQ 3.x |
| --- | --- | --- | --- |
| 吞吐量 | 百万级/秒 | 十万级/秒 | 万级/秒 |
| 持久化方式 | 顺序写磁盘 | 混合存储 | 内存+磁盘 |
| 流处理 | Kafka Streams 内置 | 需 Flink 集成 | 不支持 |
| 精确一次 | 原生支持 | 需 Seata | 不支持 |
| 适用场景 | 大数据/日志/事件流 | 金融/订单 | 通用消息 |

Kafka 3.x 核心新特性

  • KRaft 模式:彻底移除 ZooKeeper 依赖,自管理 metadata,Kafka 3.3+ 默认支持
  • 幂等生产者增强:默认 enable.idempotence=true
  • Tiered Storage(3.6+):冷热数据分离,超长时间保留
  • Kafka Streams DSL 优化:窗口聚合、表连接更稳定

2. 环境搭建:KRaft 模式 Docker Compose

Kafka 3.x 推荐使用 KRaft 模式,无需 ZooKeeper。

docker-compose.yml

version: '3.8'

services:
  kafka1:
    image: confluentinc/cp-kafka:7.6.0
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
      KAFKA_LISTENERS: 'PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka1:9092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/var/lib/kafka/data'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
    volumes:
      - kafka1-data:/var/lib/kafka/data

  kafka2:
    image: confluentinc/cp-kafka:7.6.0
    hostname: kafka2
    container_name: kafka2
    ports:
      - "9094:9092"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
      KAFKA_LISTENERS: 'PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka2:9092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/var/lib/kafka/data'
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
    volumes:
      - kafka2-data:/var/lib/kafka/data

  kafka3:
    image: confluentinc/cp-kafka:7.6.0
    hostname: kafka3
    container_name: kafka3
    ports:
      - "9095:9092"
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
      KAFKA_LISTENERS: 'PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka3:9092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/var/lib/kafka/data'
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
    volumes:
      - kafka3-data:/var/lib/kafka/data

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local-kraft
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:9092,kafka2:9092,kafka3:9092
    depends_on:
      - kafka1
      - kafka2
      - kafka3

volumes:
  kafka1-data:
  kafka2-data:
  kafka3-data:

启动:

docker compose up -d

# 创建 Topic(3副本,12分区,适合高并发)
docker exec -it kafka1 kafka-topics --bootstrap-server kafka1:9092 \
  --create --topic order-events \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000   # 7天

# 查看 Topic
docker exec -it kafka1 kafka-topics --bootstrap-server kafka1:9092 --describe --topic order-events

3. Spring Boot 3.x 集成 Kafka

3.1 依赖

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.4</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Avro Schema Registry(可选,用于强类型消息) -->
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>7.6.0</version>
    </dependency>
    <!-- Kafka Streams -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
    <!-- Micrometer Kafka 监控 -->
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-registry-prometheus</artifactId>
    </dependency>
</dependencies>

3.2 核心配置 application.yml

spring:
  kafka:
    bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
    
    # 生产者配置
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # 确认机制:all = 等待所有 ISR 副本确认(最强保证)
      acks: all
      # 幂等生产者(Kafka 3.x 默认开启)
      properties:
        enable.idempotence: true
        # 批量发送
        batch.size: 65536        # 64KB
        linger.ms: 5             # 批次延迟,提高吞吐
        buffer.memory: 67108864  # 生产者缓冲区 64MB
        compression.type: snappy # 压缩,降低网络传输
        max.in.flight.requests.per.connection: 5
        retries: 3
        retry.backoff.ms: 100
      
    # 消费者配置
    consumer:
      group-id: order-service-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # 手动提交 offset,保证消费可靠性
      enable-auto-commit: false
      auto-offset-reset: earliest
      # 每次拉取最大条数
      max-poll-records: 500
      properties:
        spring.json.trusted.packages: "com.example.kafka.dto"
        # 消费者心跳
        heartbeat.interval.ms: 3000
        session.timeout.ms: 30000
        max.poll.interval.ms: 300000
        # 每次 fetch 最小字节数,减少空转
        fetch.min.bytes: 1024
        fetch.max.wait.ms: 500
        
    # 监听器容器
    listener:
      # MANUAL_IMMEDIATE:处理完立即 ack
      ack-mode: MANUAL_IMMEDIATE
      # 并发消费者数(建议 = 分区数)
      concurrency: 12
      # 批量消费
      type: batch
      
    # Kafka Streams
    streams:
      application-id: kafka-streams-app
      properties:
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        commit.interval.ms: 1000
        num.stream.threads: 4

3.3 Kafka 配置类

@Configuration
@EnableKafka
public class KafkaConfig {

    /**
     * 生产者工厂(支持事务)
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory(KafkaProperties kafkaProperties) {
        Map<String, Object> props = kafkaProperties.buildProducerProperties();
        DefaultKafkaProducerFactory<String, Object> factory =
            new DefaultKafkaProducerFactory<>(props);
        // 设置事务 ID 前缀即可开启事务,Spring 会为每个物理生产者生成唯一 transactional.id
        // (不要同时在 props 中硬编码 TRANSACTIONAL_ID_CONFIG,否则多个生产者会冲突)
        factory.setTransactionIdPrefix("tx-order-");
        return factory;
    }

    /**
     * KafkaTemplate(支持事务)
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(
            ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    /**
     * 消费者工厂
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory(KafkaProperties kafkaProperties) {
        Map<String, Object> props = kafkaProperties.buildConsumerProperties();
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * 批量消费 ContainerFactory(支持手动 ack + 批量)
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> batchKafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory,
            DefaultErrorHandler kafkaErrorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        factory.setConcurrency(12);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // 默认错误处理器(含重试),通过参数注入而非直接方法调用
        factory.setCommonErrorHandler(kafkaErrorHandler);
        return factory;
    }

    /**
     * 错误处理:3次重试 + 死信队列
     */
    @Bean
    public DefaultErrorHandler kafkaErrorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
        // 指数退避重试:初始1秒,倍率2.0,上限10秒,最多重试3次
        ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(3);
        backOff.setInitialInterval(1000L);
        backOff.setMultiplier(2.0);
        backOff.setMaxInterval(10000L);

        DefaultErrorHandler handler = new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate), backOff);
        // 不可重试异常(直接进 DLT)
        handler.addNotRetryableExceptions(JsonParseException.class);
        return handler;
    }
}

4. 生产者深度实战

4.1 领域模型

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderEvent {
    private String orderId;
    private String userId;
    private String productId;
    private BigDecimal amount;
    private String status;        // CREATED / PAID / SHIPPED / COMPLETED / CANCELLED
    private LocalDateTime eventTime;
    private String traceId;       // 链路追踪 ID
}

4.2 生产者服务

@Slf4j
@Service
@RequiredArgsConstructor
public class OrderEventProducer {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    private static final String TOPIC_ORDER = "order-events";
    private static final String TOPIC_ORDER_PAID = "order-paid-events";

    /**
     * 同步发送(等待 Broker 确认)
     */
    public void sendSync(OrderEvent event) {
        try {
            SendResult<String, Object> result = kafkaTemplate
                .send(TOPIC_ORDER, event.getOrderId(), event)
                .get(5, TimeUnit.SECONDS);  // 超时5秒
            
            log.info("[Kafka] 同步发送成功 - topic={}, partition={}, offset={}, key={}",
                result.getRecordMetadata().topic(),
                result.getRecordMetadata().partition(),
                result.getRecordMetadata().offset(),
                event.getOrderId());
        } catch (TimeoutException e) {
            log.error("[Kafka] 发送超时 orderId={}", event.getOrderId(), e);
            throw new KafkaSendException("Kafka发送超时", e);
        } catch (Exception e) {
            log.error("[Kafka] 发送失败 orderId={}", event.getOrderId(), e);
            throw new KafkaSendException("Kafka发送失败", e);
        }
    }

    /**
     * 异步发送(高吞吐场景)
     */
    public void sendAsync(OrderEvent event) {
        ProducerRecord<String, Object> record = new ProducerRecord<>(
            TOPIC_ORDER, 
            null,                           // partition(null=自动分配)
            event.getOrderId(),             // key(用于保证同一订单有序)
            event,
            buildHeaders(event.getTraceId())// 携带 traceId
        );

        kafkaTemplate.send(record)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("[Kafka] 异步发送失败 orderId={}", event.getOrderId(), ex);
                    // 可写入本地补偿表
                } else {
                    log.debug("[Kafka] 异步发送成功 orderId={}, partition={}, offset={}",
                        event.getOrderId(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
                }
            });
    }

    /**
     * 指定分区发送(同一用户消息有序)
     */
    public void sendToPartition(OrderEvent event, int partition) {
        ProducerRecord<String, Object> record = 
            new ProducerRecord<>(TOPIC_ORDER, partition, event.getOrderId(), event);
        kafkaTemplate.send(record);
    }

    /**
     * 批量发送
     */
    public void sendBatch(List<OrderEvent> events) {
        events.forEach(event -> kafkaTemplate.send(TOPIC_ORDER, event.getOrderId(), event));
        // flush 强制立即发送,不等 linger.ms
        kafkaTemplate.flush();
    }

    private Iterable<Header> buildHeaders(String traceId) {
        return List.of(new RecordHeader("X-Trace-Id", traceId.getBytes(StandardCharsets.UTF_8)));
    }
}

4.3 自定义分区策略

/**
 * 按用户 ID 分区,保证同一用户消息有序
 */
public class UserIdPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();

        if (key instanceof String userId) {
            // 与 Kafka 默认分区器相同的 murmur2 哈希,保证同一 userId 恒定落同一分区
            return Utils.toPositive(Utils.murmur2(userId.getBytes(StandardCharsets.UTF_8)))
                % numPartitions;
        }
        // 无可用 key 时随机分配
        return ThreadLocalRandom.current().nextInt(numPartitions);
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
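同 key 恒定映射到同一分区,是"单分区内有序"的前提。下面用一段纯 Java 示意这一性质(为保持自包含,此处用 hashCode 取正后取模;Kafka 默认分区器实际使用 murmur2 哈希,思路相同):

```java
/**
 * 演示 key -> partition 的稳定映射:同一 key 总是得到同一分区,
 * 因此同一订单/用户的消息在该分区内天然有序。
 * 示意实现:用 hashCode 代替 Kafka 实际的 murmur2。
 */
public class KeyPartitionDemo {

    /** 哈希取正(与 Kafka Utils.toPositive 思路一致)后对分区数取模 */
    public static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = partitionFor("order-1001", 12);
        int p2 = partitionFor("order-1001", 12);
        System.out.println(p1 == p2); // 同 key 必然落同一分区
    }
}
```

这也解释了为什么分区扩容会破坏有序性:numPartitions 变化后,同一 key 的取模结果可能改变。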

5. 消费者深度实战

5.1 批量消费(高吞吐)

@Slf4j
@Component
@RequiredArgsConstructor
public class OrderEventConsumer {

    private final OrderService orderService;

    /**
     * 批量消费 order-events
     * - 并发数 = 分区数 = 12
     * - 手动提交 offset
     */
    @KafkaListener(
        topics = "order-events",
        groupId = "order-service-group",
        containerFactory = "batchKafkaListenerContainerFactory",
        concurrency = "12"
    )
    public void consumeOrderEvents(
            List<ConsumerRecord<String, OrderEvent>> records,
            Acknowledgment ack) {
        
        log.info("[Kafka] 批量消费 {} 条消息", records.size());
        
        try {
            // 按 orderId 分组,批量处理
            Map<String, List<OrderEvent>> grouped = records.stream()
                .collect(Collectors.groupingBy(
                    ConsumerRecord::key,
                    Collectors.mapping(ConsumerRecord::value, Collectors.toList())
                ));
            
            orderService.processBatch(grouped);
            
            // 消费成功,提交 offset
            ack.acknowledge();
            
        } catch (Exception e) {
            log.error("[Kafka] 批量消费失败,回滚 offset", e);
            // 不 ack,触发重试(DefaultErrorHandler 接管)
            throw e;
        }
    }

    /**
     * 逐条处理(批量拉取后循环逐条处理,全部成功再统一 ack,保证顺序处理)
     */
    @KafkaListener(
        topics = "order-paid-events",
        groupId = "payment-service-group",
        containerFactory = "batchKafkaListenerContainerFactory"
    )
    public void consumePaymentEvents(
            List<ConsumerRecord<String, OrderEvent>> records,
            Acknowledgment ack,
            @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
            @Header(KafkaHeaders.OFFSET) List<Long> offsets) {

        for (int i = 0; i < records.size(); i++) {
            ConsumerRecord<String, OrderEvent> record = records.get(i);
            log.debug("[Kafka] 消费 partition={}, offset={}, key={}",
                partitions.get(i), offsets.get(i), record.key());

            try {
                orderService.handlePayment(record.value());
            } catch (Exception e) {
                log.error("[Kafka] 处理失败,orderId={}", record.key(), e);
                throw e;
            }
        }
        ack.acknowledge();
    }
}

5.2 消费者 Rebalance 监听

@Component
@Slf4j
public class KafkaRebalanceListener implements ConsumerRebalanceListener {

    private final Set<TopicPartition> currentAssignment = new HashSet<>();

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log.warn("[Kafka Rebalance] 分区被撤销: {}", partitions);
        // 如需避免重复消费,可在此处同步提交已处理的 offset;这里仅维护本地分配视图
        currentAssignment.removeAll(partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        log.info("[Kafka Rebalance] 分区已分配: {}", partitions);
        currentAssignment.addAll(partitions);
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        log.error("[Kafka Rebalance] 分区丢失(不正常): {}", partitions);
    }
}

6. 精确一次语义(EOS)实战

Kafka 精确一次语义 = 幂等生产者 + 事务 + 幂等消费

6.1 幂等生产者(默认开启)

Kafka 3.x 中 enable.idempotence=true 默认开启,底层原理:

  • 每条消息携带 <PID, Partition, SequenceNumber>
  • Broker 维护每个 PID 的序列号
  • 重复消息(相同序列号)自动去重
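上述去重机制可以用一段纯 Java 代码示意(简化模型,非 Kafka 源码;Kafka 实际还会校验序列号连续性并处理乱序异常):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * 模拟 Broker 端幂等去重:按 <PID, partition> 维护最近接受的序列号,
 * 序列号不大于已接受值的写入视为生产者重试产生的重复,直接丢弃。
 * 示意实现,非 Kafka 源码。
 */
public class IdempotentDedupDemo {

    private final Map<String, Integer> lastSeq = new HashMap<>();

    /** 返回 true 表示消息被接受写入,false 表示重复消息被去重 */
    public boolean tryAppend(long pid, int partition, int sequence) {
        String key = pid + "-" + partition;
        int last = lastSeq.getOrDefault(key, -1);
        if (sequence <= last) {
            return false;            // 重复消息(重试导致),去重
        }
        lastSeq.put(key, sequence);  // 接受并推进序列号
        return true;
    }

    public static void main(String[] args) {
        IdempotentDedupDemo broker = new IdempotentDedupDemo();
        System.out.println(broker.tryAppend(1L, 0, 0)); // true:首次写入
        System.out.println(broker.tryAppend(1L, 0, 1)); // true:正常推进
        System.out.println(broker.tryAppend(1L, 0, 1)); // false:重试重复,被去重
    }
}
```

注意幂等生产者只覆盖"单生产者会话、单分区"的重复,跨会话/跨 Topic 的精确一次仍需事务与消费端幂等配合。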

6.2 幂等消费(业务层去重)

@Slf4j
@Service
@RequiredArgsConstructor

    private final OrderRepository orderRepository;
    private final RedisTemplate<String, String> redisTemplate;

    /**
     * 幂等处理订单事件
     * 利用 Redis 记录已处理的消息 ID
     */
    public void processIdempotent(OrderEvent event) {
        String idempotentKey = "kafka:processed:" + event.getOrderId() + ":" + event.getStatus();
        
        // SET NX EX:原子操作,防止并发重复处理
        Boolean isNew = redisTemplate.opsForValue()
            .setIfAbsent(idempotentKey, "1", Duration.ofHours(24));
        
        if (Boolean.FALSE.equals(isNew)) {
            log.warn("[幂等] 重复消息,跳过处理: orderId={}, status={}", 
                event.getOrderId(), event.getStatus());
            return;
        }
        
        // 业务处理
        doProcess(event);
    }

    /**
     * 数据库唯一索引兜底(双重保护)
     * 注意:@Transactional 依赖 Spring 代理,同类内部直接调用会绕过代理,
     * 实际项目中应将该方法拆到独立 Bean(或通过自注入)再调用
     */
    @Transactional
    public void doProcess(OrderEvent event) {
        // order_idempotent 表有 UNIQUE(order_id, status) 约束
        try {
            orderRepository.insertIdempotentRecord(event.getOrderId(), event.getStatus());
            // 执行业务逻辑
            orderRepository.updateStatus(event.getOrderId(), event.getStatus());
        } catch (DuplicateKeyException e) {
            log.warn("[幂等] 数据库唯一约束拦截重复消息: {}", event.getOrderId());
        }
    }
}

7. Kafka 事务消息

场景:下单后同时发送多个 Topic 消息,要么全部成功,要么全部失败

@Slf4j
@Service
@RequiredArgsConstructor
public class OrderTransactionService {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;

    /**
     * 事务发送:下单 + 发送多个 Topic 消息原子化
     *
     * 注意:Kafka 事务保证消息原子性,不保证消息与数据库的原子性
     * 如需 DB + Kafka 原子性,需配合"事务性发件箱"模式
     */
    @Transactional
    public void createOrderWithKafkaTransaction(OrderEvent orderEvent) {
        // 1. 保存订单到数据库
        orderRepository.save(orderEvent);
        
        // 2. Kafka 事务:多个消息原子发送
        kafkaTemplate.executeInTransaction(ops -> {
            // 发送订单创建事件
            ops.send("order-events", orderEvent.getOrderId(), orderEvent);
            
            // 发送库存扣减事件
            InventoryEvent inventoryEvent = buildInventoryEvent(orderEvent);
            ops.send("inventory-events", orderEvent.getOrderId(), inventoryEvent);
            
            // 发送积分事件
            PointsEvent pointsEvent = buildPointsEvent(orderEvent);
            ops.send("points-events", orderEvent.getOrderId(), pointsEvent);
            
            return true;
        });
        
        log.info("[Kafka事务] 订单创建事务提交成功: {}", orderEvent.getOrderId());
    }

    /**
     * 事务性发件箱模式(Transactional Outbox)
     * 解决 DB + Kafka 原子性问题
     *
     * 步骤:
     * 1. DB 事务:写业务表 + 写 outbox 表(同一事务)
     * 2. 轮询/CDC:读 outbox 表,发送 Kafka 消息
     * 3. 发送成功:标记 outbox 记录为 sent
     */
    @Transactional
    public void createOrderWithOutbox(OrderEvent orderEvent) {
        // 业务写入
        orderRepository.save(orderEvent);
        
        // 同事务写入 outbox 表(消息尚未发送)
        OutboxEvent outbox = OutboxEvent.builder()
            .aggregateId(orderEvent.getOrderId())
            .aggregateType("ORDER")
            .eventType("ORDER_CREATED")
            .payload(JsonUtils.toJson(orderEvent))
            .status("PENDING")
            .createdAt(LocalDateTime.now())
            .build();
        outboxRepository.save(outbox);
        
        // Debezium CDC 或 @Scheduled 轮询 outbox 表发送 Kafka
    }
}

7.1 发件箱轮询器

@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxPoller {

    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Scheduled(fixedDelay = 1000)  // 每秒轮询
    @Transactional
    public void pollAndPublish() {
        List<OutboxEvent> pending = outboxRepository.findPendingEvents(100);
        
        for (OutboxEvent event : pending) {
            try {
                kafkaTemplate.send(
                    resolveTopicName(event.getEventType()),
                    event.getAggregateId(),
                    event.getPayload()
                ).get(5, TimeUnit.SECONDS);
                
                // 标记已发送
                outboxRepository.markSent(event.getId());
                
            } catch (Exception e) {
                log.error("[Outbox] 发送失败: id={}", event.getId(), e);
                outboxRepository.incrementRetry(event.getId());
            }
        }
    }

    private String resolveTopicName(String eventType) {
        return switch (eventType) {
            case "ORDER_CREATED" -> "order-events";
            case "ORDER_PAID" -> "order-paid-events";
            default -> "default-events";
        };
    }
}

8. Kafka Streams 实时流处理

与上篇 Flink 文章形成互补:轻量流处理用 Kafka Streams,大规模流处理用 Flink

8.1 实时订单统计(滚动窗口)

@Slf4j
@Configuration
@EnableKafkaStreams
public class OrderStreamConfig {

    /**
     * 实时统计每分钟各用户下单金额
     * 输入:order-events Topic
     * 输出:order-stats-per-minute Topic
     */
    @Bean
    public KStream<String, OrderEvent> orderStatsStream(StreamsBuilder builder) {
        // 1. 读取源 Topic(使用自定义 Serde)
        KStream<String, OrderEvent> orderStream = builder.stream(
            "order-events",
            Consumed.with(Serdes.String(), orderEventSerde())
        );

        // 2. 过滤:只统计已支付订单
        KStream<String, OrderEvent> paidOrders = orderStream
            .filter((key, value) -> "PAID".equals(value.getStatus()));

        // 3. 按 userId 分组
        KGroupedStream<String, OrderEvent> groupedByUser = paidOrders
            .groupBy((key, value) -> value.getUserId(),
                Grouped.with(Serdes.String(), orderEventSerde()));

        // 4. 1分钟滚动窗口聚合(累计金额)
        KTable<Windowed<String>, Double> minuteStats = groupedByUser
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .aggregate(
                () -> 0.0,
                (userId, event, total) -> total + event.getAmount().doubleValue(),
                Materialized.<String, Double, WindowStore<Bytes, byte[]>>as("order-minute-stats")
                    .withValueSerde(Serdes.Double())
            );

        // 5. 输出到统计 Topic
        minuteStats.toStream()
            .map((windowedKey, total) -> KeyValue.pair(
                windowedKey.key() + "@" + windowedKey.window().startTime(),
                total.toString()
            ))
            .to("order-stats-per-minute", Produced.with(Serdes.String(), Serdes.String()));

        return orderStream;
    }

    /**
     * 实时告警:单笔金额超过 10000 元
     */
    @Bean
    public KStream<String, OrderEvent> largeOrderAlertStream(StreamsBuilder builder) {
        KStream<String, OrderEvent> stream = builder.stream(
            "order-events",
            Consumed.with(Serdes.String(), orderEventSerde())
        );

        stream.filter((key, order) -> 
                order.getAmount().compareTo(new BigDecimal("10000")) > 0)
            .peek((key, order) -> 
                log.warn("[大额告警] orderId={}, amount={}", order.getOrderId(), order.getAmount()))
            .to("large-order-alerts", Produced.with(Serdes.String(), orderEventSerde()));

        return stream;
    }

    /**
     * 双流 Join:订单流 + 支付流,合并订单信息
     */
    @Bean
    public KStream<String, String> orderPaymentJoinStream(StreamsBuilder builder) {
        KStream<String, OrderEvent> orderStream = builder.stream("order-events",
            Consumed.with(Serdes.String(), orderEventSerde()));

        KStream<String, PaymentEvent> paymentStream = builder.stream("payment-events",
            Consumed.with(Serdes.String(), paymentEventSerde()));

        // 双流 Join(5分钟窗口内)
        KStream<String, String> joined = orderStream.join(
            paymentStream,
            (order, payment) -> String.format("orderId=%s,amount=%s,payMethod=%s",
                order.getOrderId(), order.getAmount(), payment.getPayMethod()),
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
            StreamJoined.with(Serdes.String(), orderEventSerde(), paymentEventSerde())
        );

        joined.to("order-payment-joined");
        return joined;
    }

    private Serde<OrderEvent> orderEventSerde() {
        return new JsonSerde<>(OrderEvent.class);
    }

    private Serde<PaymentEvent> paymentEventSerde() {
        return new JsonSerde<>(PaymentEvent.class);
    }
}
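滚动窗口聚合的语义可以脱离 Kafka Streams 用纯 Java 模拟:窗口起点 = 事件时间对窗口大小向下取整,同一 (userId, 窗口) 内累加金额。下面是示意实现(不含乱序与 grace period 处理):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * 模拟"1 分钟滚动窗口按用户累计金额"的聚合语义,
 * 对应上文 windowedBy(TimeWindows) + aggregate 的核心计算。
 */
public class TumblingWindowDemo {

    private static final long WINDOW_MS = 60_000L;

    /** key 形如 "userId@windowStart",value 为该窗口内累计金额 */
    public static Map<String, Double> aggregate(long[] timestamps, String[] userIds, double[] amounts) {
        Map<String, Double> stats = new HashMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            long windowStart = (timestamps[i] / WINDOW_MS) * WINDOW_MS; // 对齐到窗口起点
            String key = userIds[i] + "@" + windowStart;
            stats.merge(key, amounts[i], Double::sum);                  // 窗口内累加
        }
        return stats;
    }

    public static void main(String[] args) {
        Map<String, Double> stats = aggregate(
            new long[]{0L, 30_000L, 70_000L},       // 前两条落在第 1 个窗口,第三条落在第 2 个窗口
            new String[]{"u1", "u1", "u1"},
            new double[]{100.0, 50.0, 20.0});
        System.out.println(stats.get("u1@0"));      // 150.0
        System.out.println(stats.get("u1@60000"));  // 20.0
    }
}
```

Kafka Streams 在此之上额外提供了状态存储(RocksDB)、容错(changelog topic)与乱序数据处理,这正是它相对手写聚合的价值。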

9. 死信队列(DLT)与重试机制

9.1 DLT 自动配置

Spring Kafka 的 DeadLetterPublishingRecoverer 会自动将重试耗尽的消息发到 {原topic}.DLT

@Bean
public DefaultErrorHandler kafkaErrorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
    // 发送到 DLT Topic
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
        (record, ex) -> {
            // 自定义 DLT Topic 名
            if (ex.getCause() instanceof DeserializationException) {
                return new TopicPartition(record.topic() + ".DLT.PARSE_ERROR", -1);
            }
            return new TopicPartition(record.topic() + ".DLT", -1);
        });

    // 重试策略:指数退避,最大3次
    ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(3);
    backOff.setInitialInterval(1_000L);
    backOff.setMultiplier(2.0);

    DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, backOff);
    errorHandler.addNotRetryableExceptions(
        IllegalArgumentException.class,    // 参数错误不重试
        DeserializationException.class     // 反序列化错误不重试
    );
    return errorHandler;
}
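上面配置的重试间隔序列可以手工核对:初始 1 秒、倍率 2.0、上限 10 秒、最多 3 次,即 1s → 2s → 4s。下面是一段示意计算(简化模型,便于估算重试总耗时与告警预算):

```java
/**
 * 计算指数退避的重试间隔序列:delay(i) = min(initial * multiplier^i, max)。
 * 对应上文 ExponentialBackOffWithMaxRetries(3) 的配置效果。
 */
public class BackoffDemo {

    public static long[] delays(long initialMs, double multiplier, long maxMs, int maxRetries) {
        long[] result = new long[maxRetries];
        double d = initialMs;
        for (int i = 0; i < maxRetries; i++) {
            result[i] = Math.min((long) d, maxMs); // 超过上限按上限截断
            d *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        long[] d = delays(1_000L, 2.0, 10_000L, 3);
        System.out.println(java.util.Arrays.toString(d)); // [1000, 2000, 4000]
    }
}
```

据此可知一条消息进入 DLT 前最多额外占用约 7 秒(1+2+4),设置 max.poll.interval.ms 时需要把这部分时间算进去。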

9.2 DLT 消费者(人工处理/告警)

@Slf4j
@Component
@RequiredArgsConstructor
public class DeadLetterConsumer {

    // 示例中的自定义组件:死信落库仓储与告警服务
    private final DltRepository dltRepository;
    private final AlertService alertService;

    @KafkaListener(topicPattern = ".*\\.DLT", groupId = "dlt-group")
    public void consumeDlt(
            ConsumerRecord<String, Object> record,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
            @Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
            Acknowledgment ack) {

        log.error("[DLT] 死信消息 - originalTopic={}, originalOffset={}, key={}, error={}",
            originalTopic, originalOffset, record.key(), exceptionMessage);

        // 1. 持久化到 DB(人工排查)
        dltRepository.save(DeadLetterRecord.builder()
            .originalTopic(originalTopic)
            .originalOffset(originalOffset)
            .messageKey(String.valueOf(record.key()))
            .payload(String.valueOf(record.value()))
            .errorMessage(exceptionMessage)
            .occurredAt(LocalDateTime.now())
            .build());

        // 2. 发送告警通知(钉钉/企业微信)
        alertService.sendAlert(String.format("Kafka死信消息: topic=%s, key=%s",
            originalTopic, record.key()));

        ack.acknowledge();
    }
}

10. 生产调优:高吞吐与低延迟

10.1 Producer 调优

# 高吞吐模式(牺牲部分延迟)
batch.size=131072           # 128KB 批次
linger.ms=20                # 等 20ms 凑批
compression.type=lz4        # lz4 比 snappy 更快
buffer.memory=134217728     # 128MB 缓冲区
max.in.flight.requests.per.connection=5

# 低延迟模式(牺牲吞吐)
batch.size=1
linger.ms=0
compression.type=none

10.2 Consumer 调优

# 高吞吐消费
fetch.min.bytes=102400     # 100KB,减少空转 fetch
fetch.max.wait.ms=500
max.poll.records=1000      # 每批最多 1000 条

# 避免 Rebalance 风暴
max.poll.interval.ms=600000  # 处理时间上限 10 分钟
session.timeout.ms=45000
heartbeat.interval.ms=15000

10.3 Broker 调优

# 增大 socket 缓冲
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576

# 日志刷盘(异步,性能优先)
log.flush.interval.messages=100000
log.flush.interval.ms=1000

# 副本同步
replica.fetch.max.bytes=10485760
replica.lag.time.max.ms=30000

# 日志清理策略(delete=按时间/大小删除;compact 仅适用于按 key 保留最新值的主题,不宜作全局默认)
log.cleanup.policy=delete
log.retention.hours=168      # 7天
log.segment.bytes=1073741824 # 1GB 一个 segment

10.4 性能压测参考

# 生产者压测(100万条,1KB 消息)
kafka-producer-perf-test.sh \
  --topic order-events \
  --num-records 1000000 \
  --record-size 1024 \
  --throughput -1 \
  --producer-props bootstrap.servers=kafka1:9092 \
    batch.size=65536 linger.ms=5 compression.type=snappy

# 消费者压测
kafka-consumer-perf-test.sh \
  --bootstrap-server kafka1:9092 \
  --topic order-events \
  --messages 1000000 \
  --group perf-group

典型压测结果(3节点集群,12分区,3副本):

| 场景 | 吞吐量 | P99 延迟 |
| --- | --- | --- |
| 高吞吐(linger=20ms) | ~800K msg/s | 25ms |
| 低延迟(linger=0) | ~200K msg/s | 3ms |
| 带压缩(snappy) | ~600K msg/s | 18ms |

11. 监控:Prometheus + Grafana

11.1 Actuator + Micrometer 暴露指标

management:
  endpoints:
    web:
      exposure:
        include: health,info,prometheus,metrics
  metrics:
    tags:
      application: ${spring.application.name}
  # Spring Boot 3.x 中 Prometheus 导出配置已迁移到 management.prometheus 下
  prometheus:
    metrics:
      export:
        enabled: true

11.2 关键 Kafka 监控指标

# Prometheus 采集规则(与上期监控文章衔接)
- job_name: 'kafka-app'
  metrics_path: '/actuator/prometheus'
  static_configs:
    - targets: ['order-service:8080']

关键 Grafana 面板指标:

| 指标 | PromQL | 告警阈值 |
| --- | --- | --- |
| 消费者 Lag | kafka_consumer_fetch_manager_records_lag | > 10000 |
| 生产者发送速率 | rate(kafka_producer_record_send_total[1m]) | - |
| 消费者吞吐 | rate(kafka_consumer_fetch_manager_records_consumed_total[1m]) | - |
| GC 暂停时间 | jvm_gc_pause_seconds | > 0.5s |
| 消费处理时间 | kafka_listener_seconds_max | > 1s |

11.3 Consumer Lag 告警

# AlertManager 规则
groups:
  - name: kafka-alerts
    rules:
      - alert: KafkaConsumerLagHigh
        expr: kafka_consumer_fetch_manager_records_lag > 50000
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "消费者积压严重"
          description: "{{ $labels.topic }} 消费者组 {{ $labels.group }} lag={{ $value }}"

      - alert: KafkaProducerError
        expr: rate(kafka_producer_record_error_total[1m]) > 0
        for: 30s
        labels:
          severity: warning
        annotations:
          summary: "Kafka 生产者发送错误"

12. 总结与最佳实践

核心原则

| 原则 | 说明 |
| --- | --- |
| 幂等优先 | 消费者必须幂等,Redis + DB 双重保护 |
| 手动提交 Offset | 禁用 enable.auto.commit,业务成功后再 ack |
| 合理设置分区数 | 分区数 ≥ 消费者并发数,通常 = CPU核数 × 2 |
| 避免大消息 | 单条消息 < 1MB,超大消息使用 BLOB 存储 + Kafka 传 URL |
| 监控 Lag | Consumer Lag 是最核心的健康指标 |
| 死信兜底 | 所有消费者必须配置 DLT,避免消息永久阻塞 |
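Consumer Lag 的定义很简单:每个分区 lag = LogEndOffset − CommittedOffset,消费者组的总 Lag 为各分区之和。下面用纯 Java 示意这一计算,便于理解告警指标的含义:

```java
import java.util.Map;

/**
 * 计算消费者组总积压:每分区 lag = logEndOffset - committedOffset(取非负),
 * 组级 Lag 为各分区 lag 之和。示意计算,实际数值由 Kafka 客户端指标上报。
 */
public class ConsumerLagDemo {

    public static long totalLag(Map<Integer, Long> logEndOffsets, Map<Integer, Long> committedOffsets) {
        long total = 0;
        for (Map.Entry<Integer, Long> e : logEndOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            total += Math.max(0, e.getValue() - committed); // 每分区积压取非负
        }
        return total;
    }

    public static void main(String[] args) {
        // 分区0:写到1000、消费到900;分区1:写到2000、消费到1500
        long lag = totalLag(Map.of(0, 1_000L, 1, 2_000L), Map.of(0, 900L, 1, 1_500L));
        System.out.println(lag); // 600
    }
}
```

Lag 持续增长意味着消费速度跟不上生产速度,应优先排查消费端处理耗时与 Rebalance,而不是盲目加分区。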

方案选型建议

高吞吐日志/埋点 ──────────────────► Kafka(唯一选择)
电商订单/支付(金融级)────────────► RocketMQ(事务消息更成熟)
通用业务解耦(中低频)────────────► RabbitMQ(运维简单)
实时流处理(复杂 CEP)────────────► Kafka + Flink(参考上期文章)
轻量流处理(统计/告警)───────────► Kafka Streams(无额外组件)

技术栈全景图

┌─────────────────────────────────────────────────────────┐
│                   生产者(Spring Boot 3.x)              │
│  同步发送 / 异步发送 / 事务发送 / Outbox 模式            │
└──────────────────────┬──────────────────────────────────┘
                       │
          ┌────────────▼────────────┐
          │   Kafka Cluster(KRaft)│
          │   3 Broker / 12 分区    │
          │   副本数=3, min.isr=2   │
          └────────────┬────────────┘
                       │
        ┌──────────────┼──────────────┐
        │              │              │
   ┌────▼────┐   ┌─────▼─────┐  ┌───▼────┐
   │消费者组1│   │Kafka Streams│ │消费者组N│
   │批量消费 │   │实时统计/告警│ │单条消费 │
   │手动 ack │   │双流 Join   │ │幂等处理 │
   └────┬────┘   └─────┬─────┘  └───┬────┘
        │              │             │
   ┌────▼──────────────▼─────────────▼────┐
   │         Prometheus + Grafana 监控      │
   │    Consumer Lag / 吞吐率 / 错误率      │
   └───────────────────────────────────────┘

本文完整代码已按以上示例整理,可直接集成到 Spring Boot 3.x 项目中。如有问题欢迎在评论区讨论!


参考资料:

  • Apache Kafka 3.6 官方文档
  • Spring for Apache Kafka 3.1.x Reference
  • Confluent Platform 7.6 Documentation
  • 《Kafka 权威指南(第2版)》