Abstract: This article is a deep dive into integrating Apache Kafka 3.x with Spring Boot 3.x, from basic configuration to production tuning. It covers high-throughput producing, consumer-group management, exactly-once semantics (EOS), transactional messages, Kafka Streams processing, and KRaft-mode deployment, with a complete demo environment built on Docker Compose, to help you build an enterprise-grade Kafka messaging platform.
Table of Contents
- Background and Technology Selection
- Environment Setup: KRaft Mode with Docker Compose
- Spring Boot 3.x Kafka Integration
- Producers in Depth
- Consumers in Depth
- Exactly-Once Semantics (EOS) in Practice
- Kafka Transactional Messages
- Real-Time Stream Processing with Kafka Streams
- Dead-Letter Topics (DLT) and Retries
- Production Tuning: High Throughput and Low Latency
- Monitoring: Prometheus + Grafana
- Summary and Best Practices
1. Background and Technology Selection
Why Kafka?
In a microservice architecture, the message queue is the core component for decoupling services and smoothing out traffic peaks. Compared with RabbitMQ and RocketMQ, Kafka has the following advantages:
| Dimension | Kafka 3.x | RocketMQ 5.x | RabbitMQ 3.x |
|---|---|---|---|
| Throughput | millions msg/s | hundreds of thousands msg/s | tens of thousands msg/s |
| Persistence | sequential disk writes | hybrid storage | memory + disk |
| Stream processing | Kafka Streams built in | requires Flink | none |
| Exactly-once | native support | requires Seata | not supported |
| Typical use | big data / logs / event streams | finance / orders | general messaging |
Key new features in Kafka 3.x
- KRaft mode: removes the ZooKeeper dependency entirely; metadata is self-managed; supported by default since Kafka 3.3
- Idempotent producer hardening: enable.idempotence=true by default
- Tiered Storage (3.6+): hot/cold data separation, enabling very long retention
- Kafka Streams DSL improvements: more robust windowed aggregations and table joins
2. Environment Setup: KRaft Mode with Docker Compose
Kafka 3.x is best run in KRaft mode, with no ZooKeeper required.
docker-compose.yml
version: '3.8'
services:
  kafka1:
    image: confluentinc/cp-kafka:7.6.0
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
      KAFKA_LISTENERS: 'PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka1:9092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/var/lib/kafka/data'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
    volumes:
      - kafka1-data:/var/lib/kafka/data
  kafka2:
    image: confluentinc/cp-kafka:7.6.0
    hostname: kafka2
    container_name: kafka2
    ports:
      - "9094:9092"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
      KAFKA_LISTENERS: 'PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka2:9092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/var/lib/kafka/data'
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
    volumes:
      - kafka2-data:/var/lib/kafka/data
  kafka3:
    image: confluentinc/cp-kafka:7.6.0
    hostname: kafka3
    container_name: kafka3
    ports:
      - "9095:9092"
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
      KAFKA_LISTENERS: 'PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka3:9092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/var/lib/kafka/data'
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
    volumes:
      - kafka3-data:/var/lib/kafka/data
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local-kraft
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:9092,kafka2:9092,kafka3:9092
    depends_on:
      - kafka1
      - kafka2
      - kafka3
volumes:
  kafka1-data:
  kafka2-data:
  kafka3-data:
Start the cluster:
docker compose up -d
# Create the topic (3 replicas, 12 partitions, suited to high concurrency)
docker exec -it kafka1 kafka-topics --bootstrap-server kafka1:9092 \
  --create --topic order-events \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000   # 7 days
# Inspect the topic
docker exec -it kafka1 kafka-topics --bootstrap-server kafka1:9092 --describe --topic order-events
3. Spring Boot 3.x Kafka Integration
3.1 Dependencies
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.4</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Avro Schema Registry (optional, for strongly typed messages) -->
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>7.6.0</version>
    </dependency>
    <!-- Kafka Streams -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
    <!-- Micrometer Prometheus registry for Kafka metrics -->
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-registry-prometheus</artifactId>
    </dependency>
</dependencies>
3.2 Core configuration: application.yml
spring:
  kafka:
    bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
    # Producer settings
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # acks=all: wait for all ISR replicas to confirm (strongest guarantee)
      acks: all
      properties:
        # Idempotent producer (enabled by default in Kafka 3.x)
        enable.idempotence: true
        # Batching
        batch.size: 65536            # 64 KB
        linger.ms: 5                 # small batching delay to raise throughput
        buffer.memory: 67108864      # 64 MB producer buffer
        compression.type: snappy     # compression reduces network traffic
        max.in.flight.requests.per.connection: 5
        retries: 3
        retry.backoff.ms: 100
    # Consumer settings
    consumer:
      group-id: order-service-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Manual offset commits for reliable consumption
      enable-auto-commit: false
      auto-offset-reset: earliest
      # Maximum records per poll
      max-poll-records: 500
      properties:
        spring.json.trusted.packages: "com.example.kafka.dto"
        # Consumer heartbeats
        heartbeat.interval.ms: 3000
        session.timeout.ms: 30000
        max.poll.interval.ms: 300000
        # Minimum bytes per fetch, to reduce idle fetches
        fetch.min.bytes: 1024
        fetch.max.wait.ms: 500
    # Listener container
    listener:
      # MANUAL_IMMEDIATE: ack as soon as processing finishes
      ack-mode: MANUAL_IMMEDIATE
      # Concurrent consumers (recommended: = partition count)
      concurrency: 12
      # Batch consumption
      type: batch
    # Kafka Streams
    streams:
      application-id: kafka-streams-app
      properties:
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        commit.interval.ms: 1000
        num.stream.threads: 4
3.3 Kafka configuration class
@Configuration
@EnableKafka
public class KafkaConfig {

    /**
     * Producer factory (transaction-capable)
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory(KafkaProperties kafkaProperties) {
        Map<String, Object> props = kafkaProperties.buildProducerProperties();
        DefaultKafkaProducerFactory<String, Object> factory =
                new DefaultKafkaProducerFactory<>(props);
        // Setting the prefix is sufficient: spring-kafka derives each producer's
        // transactional.id from it
        factory.setTransactionIdPrefix("tx-order-");
        return factory;
    }

    /**
     * KafkaTemplate (transaction-capable)
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(
            ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    /**
     * Consumer factory
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory(KafkaProperties kafkaProperties) {
        Map<String, Object> props = kafkaProperties.buildConsumerProperties();
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Batch-consumption container factory (manual ack + batching)
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> batchKafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory,
            DefaultErrorHandler kafkaErrorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        factory.setConcurrency(12);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // Default error handler (with retries)
        factory.setCommonErrorHandler(kafkaErrorHandler);
        return factory;
    }

    /**
     * Error handling: 3 retries, then the dead-letter topic
     */
    @Bean
    public DefaultErrorHandler kafkaErrorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
        // Exponential backoff: 1 s initial, 10 s cap, 3 retries
        ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(3);
        backOff.setInitialInterval(1000L);
        backOff.setMultiplier(2.0);
        backOff.setMaxInterval(10000L);
        DefaultErrorHandler handler = new DefaultErrorHandler(
                new DeadLetterPublishingRecoverer(kafkaTemplate), backOff);
        // Non-retryable exceptions go straight to the DLT
        handler.addNotRetryableExceptions(JsonParseException.class);
        return handler;
    }
}
4. Producers in Depth
4.1 Domain model
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderEvent {
    private String orderId;
    private String userId;
    private String productId;
    private BigDecimal amount;
    private String status;           // CREATED / PAID / SHIPPED / COMPLETED / CANCELLED
    private LocalDateTime eventTime;
    private String traceId;          // distributed-tracing ID
}
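For reference, with the JsonSerializer configured in section 3.2 the value of such an event travels as plain JSON, roughly like the following (illustrative only — the exact field formatting, in particular for LocalDateTime, depends on how the underlying ObjectMapper is configured):

```json
{
  "orderId": "O-20240501-0001",
  "userId": "U-1001",
  "productId": "P-2002",
  "amount": 199.00,
  "status": "CREATED",
  "eventTime": "2024-05-01T10:15:30",
  "traceId": "3f2a9c"
}
```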
4.2 Producer service
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderEventProducer {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    private static final String TOPIC_ORDER = "order-events";
    private static final String TOPIC_ORDER_PAID = "order-paid-events";

    /**
     * Synchronous send (waits for broker acknowledgement)
     */
    public void sendSync(OrderEvent event) {
        try {
            SendResult<String, Object> result = kafkaTemplate
                    .send(TOPIC_ORDER, event.getOrderId(), event)
                    .get(5, TimeUnit.SECONDS); // 5-second timeout
            log.info("[Kafka] sync send ok - topic={}, partition={}, offset={}, key={}",
                    result.getRecordMetadata().topic(),
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset(),
                    event.getOrderId());
        } catch (TimeoutException e) {
            log.error("[Kafka] send timed out, orderId={}", event.getOrderId(), e);
            throw new KafkaSendException("Kafka send timed out", e);
        } catch (Exception e) {
            log.error("[Kafka] send failed, orderId={}", event.getOrderId(), e);
            throw new KafkaSendException("Kafka send failed", e);
        }
    }

    /**
     * Asynchronous send (high-throughput scenarios)
     */
    public void sendAsync(OrderEvent event) {
        ProducerRecord<String, Object> record = new ProducerRecord<>(
                TOPIC_ORDER,
                null,                             // partition (null = assigned automatically)
                event.getOrderId(),               // key (keeps one order's events in order)
                event,
                buildHeaders(event.getTraceId())  // carries the traceId
        );
        kafkaTemplate.send(record)
                .whenComplete((result, ex) -> {
                    if (ex != null) {
                        log.error("[Kafka] async send failed, orderId={}", event.getOrderId(), ex);
                        // could be written to a local compensation table here
                    } else {
                        log.debug("[Kafka] async send ok, orderId={}, partition={}, offset={}",
                                event.getOrderId(),
                                result.getRecordMetadata().partition(),
                                result.getRecordMetadata().offset());
                    }
                });
    }

    /**
     * Send to a specific partition (keeps one user's messages in order)
     */
    public void sendToPartition(OrderEvent event, int partition) {
        ProducerRecord<String, Object> record =
                new ProducerRecord<>(TOPIC_ORDER, partition, event.getOrderId(), event);
        kafkaTemplate.send(record);
    }

    /**
     * Batched send
     */
    public void sendBatch(List<OrderEvent> events) {
        events.forEach(event -> kafkaTemplate.send(TOPIC_ORDER, event.getOrderId(), event));
        // flush forces the records out immediately instead of waiting for linger.ms
        kafkaTemplate.flush();
    }

    private Iterable<Header> buildHeaders(String traceId) {
        return List.of(new RecordHeader("X-Trace-Id", traceId.getBytes(StandardCharsets.UTF_8)));
    }
}
4.3 Custom partitioning strategy
/**
 * Partition by user ID so that one user's messages stay ordered
 */
public class UserIdPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (key instanceof String && keyBytes != null) {
            // Same murmur2 hash the default partitioner uses; the same key always
            // lands on the same partition (note: adding partitions changes the mapping)
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
        // Fallback for keyless messages: random partition
        return ThreadLocalRandom.current().nextInt(numPartitions);
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
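Defining the class alone does nothing: the producer has to be pointed at it through the standard partitioner.class client property, for example in application.yml (the package com.example.kafka is an assumption — use the class's real package):

```yaml
spring:
  kafka:
    producer:
      properties:
        partitioner.class: com.example.kafka.UserIdPartitioner
```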
5. Consumers in Depth
5.1 Batch consumption (high throughput)
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderEventConsumer {

    private final OrderService orderService;

    /**
     * Batch consumption of order-events
     * - concurrency = partition count = 12
     * - manual offset commits
     */
    @KafkaListener(
            topics = "order-events",
            groupId = "order-service-group",
            containerFactory = "batchKafkaListenerContainerFactory",
            concurrency = "12"
    )
    public void consumeOrderEvents(
            List<ConsumerRecord<String, OrderEvent>> records,
            Acknowledgment ack) {
        log.info("[Kafka] consuming a batch of {} records", records.size());
        try {
            // Group by orderId and process as a batch
            Map<String, List<OrderEvent>> grouped = records.stream()
                    .collect(Collectors.groupingBy(
                            ConsumerRecord::key,
                            Collectors.mapping(ConsumerRecord::value, Collectors.toList())
                    ));
            orderService.processBatch(grouped);
            // Success: commit the offsets
            ack.acknowledge();
        } catch (Exception e) {
            log.error("[Kafka] batch consumption failed, offsets not committed", e);
            // No ack; the DefaultErrorHandler takes over and retries
            throw e;
        }
    }

    /**
     * Per-record processing (ack after the whole batch, preserving order)
     */
    @KafkaListener(
            topics = "order-paid-events",
            groupId = "payment-service-group",
            containerFactory = "batchKafkaListenerContainerFactory"
    )
    public void consumePaymentEvents(
            List<ConsumerRecord<String, OrderEvent>> records,
            Acknowledgment ack,
            @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
            @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
        for (int i = 0; i < records.size(); i++) {
            ConsumerRecord<String, OrderEvent> record = records.get(i);
            log.debug("[Kafka] consuming partition={}, offset={}, key={}",
                    partitions.get(i), offsets.get(i), record.key());
            try {
                orderService.handlePayment(record.value());
            } catch (Exception e) {
                log.error("[Kafka] processing failed, orderId={}", record.key(), e);
                throw e;
            }
        }
        ack.acknowledge();
    }
}
5.2 Listening for consumer rebalances
@Component
@Slf4j
public class KafkaRebalanceListener implements ConsumerRebalanceListener {

    private final Set<TopicPartition> currentAssignment = new HashSet<>();

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log.warn("[Kafka Rebalance] partitions revoked: {}", partitions);
        // This is the hook to commit in-flight offsets before losing the
        // partitions, which avoids duplicate consumption after the rebalance
        currentAssignment.removeAll(partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        log.info("[Kafka Rebalance] partitions assigned: {}", partitions);
        currentAssignment.addAll(partitions);
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        log.error("[Kafka Rebalance] partitions lost (abnormal): {}", partitions);
    }
}
6. Exactly-Once Semantics (EOS) in Practice
Exactly-once in Kafka = idempotent producer + transactions + idempotent consumption.
6.1 Idempotent producer (enabled by default)
In Kafka 3.x, enable.idempotence=true is the default. Under the hood:
- every message carries <PID, Partition, SequenceNumber>
- the broker tracks the latest sequence number per PID
- duplicate messages (same sequence number) are deduplicated automatically
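In client-configuration terms, idempotence only works together with a small set of compatible settings, which the client validates at startup. A minimal sketch of the relevant producer properties:

```properties
enable.idempotence=true                   # default for Kafka 3.x clients
acks=all                                  # required: idempotence needs full ISR acks
max.in.flight.requests.per.connection=5   # required: must be <= 5
retries=2147483647                        # required: must be > 0 (this is the default)
```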
6.2 Idempotent consumption (business-level deduplication)
@Slf4j
@Service
@RequiredArgsConstructor
public class IdempotentOrderService {

    private final OrderRepository orderRepository;
    private final RedisTemplate<String, String> redisTemplate;

    /**
     * Idempotent handling of order events:
     * Redis records which message IDs have already been processed
     */
    public void processIdempotent(OrderEvent event) {
        String idempotentKey = "kafka:processed:" + event.getOrderId() + ":" + event.getStatus();
        // SET NX EX: atomic, prevents concurrent duplicate processing
        Boolean isNew = redisTemplate.opsForValue()
                .setIfAbsent(idempotentKey, "1", Duration.ofHours(24));
        if (Boolean.FALSE.equals(isNew)) {
            log.warn("[Idempotency] duplicate message skipped: orderId={}, status={}",
                    event.getOrderId(), event.getStatus());
            return;
        }
        // Business processing (note: calling doProcess through `this` bypasses the
        // @Transactional proxy; in production, put it in a separate bean)
        doProcess(event);
    }

    /**
     * Database unique index as a second line of defense
     */
    @Transactional
    public void doProcess(OrderEvent event) {
        // The order_idempotent table has a UNIQUE(order_id, status) constraint
        try {
            orderRepository.insertIdempotentRecord(event.getOrderId(), event.getStatus());
            // business logic
            orderRepository.updateStatus(event.getOrderId(), event.getStatus());
        } catch (DuplicateKeyException e) {
            log.warn("[Idempotency] duplicate blocked by unique constraint: {}", event.getOrderId());
        }
    }
}
7. Kafka Transactional Messages
Scenario: placing an order publishes messages to several topics, and either all of them succeed or none do.
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderTransactionService {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;

    /**
     * Transactional send: publish to several topics atomically after placing an order
     *
     * Note: a Kafka transaction makes the messages atomic with each other,
     * not with the database. For DB + Kafka atomicity, use the transactional
     * outbox pattern below.
     */
    @Transactional
    public void createOrderWithKafkaTransaction(OrderEvent orderEvent) {
        // 1. Persist the order
        orderRepository.save(orderEvent);
        // 2. Kafka transaction: the messages are sent atomically
        kafkaTemplate.executeInTransaction(ops -> {
            // order-created event
            ops.send("order-events", orderEvent.getOrderId(), orderEvent);
            // inventory-deduction event
            InventoryEvent inventoryEvent = buildInventoryEvent(orderEvent);
            ops.send("inventory-events", orderEvent.getOrderId(), inventoryEvent);
            // loyalty-points event
            PointsEvent pointsEvent = buildPointsEvent(orderEvent);
            ops.send("points-events", orderEvent.getOrderId(), pointsEvent);
            return true;
        });
        log.info("[Kafka TX] order-creation transaction committed: {}", orderEvent.getOrderId());
    }

    /**
     * Transactional Outbox pattern
     * Solves the DB + Kafka atomicity problem
     *
     * Steps:
     * 1. DB transaction: write the business table + the outbox table together
     * 2. Polling/CDC: read the outbox table and publish to Kafka
     * 3. On success: mark the outbox record as sent
     */
    @Transactional
    public void createOrderWithOutbox(OrderEvent orderEvent) {
        // Business write
        orderRepository.save(orderEvent);
        // Outbox row written in the same transaction (message not yet sent)
        OutboxEvent outbox = OutboxEvent.builder()
                .aggregateId(orderEvent.getOrderId())
                .aggregateType("ORDER")
                .eventType("ORDER_CREATED")
                .payload(JsonUtils.toJson(orderEvent))
                .status("PENDING")
                .createdAt(LocalDateTime.now())
                .build();
        outboxRepository.save(outbox);
        // Debezium CDC or an @Scheduled poller publishes the outbox rows to Kafka
    }
}
7.1 Outbox poller
@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxPoller {

    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Scheduled(fixedDelay = 1000) // poll every second
    @Transactional
    public void pollAndPublish() {
        List<OutboxEvent> pending = outboxRepository.findPendingEvents(100);
        for (OutboxEvent event : pending) {
            try {
                kafkaTemplate.send(
                        resolveTopicName(event.getEventType()),
                        event.getAggregateId(),
                        event.getPayload()
                ).get(5, TimeUnit.SECONDS);
                // Mark as sent
                outboxRepository.markSent(event.getId());
            } catch (Exception e) {
                log.error("[Outbox] send failed: id={}", event.getId(), e);
                outboxRepository.incrementRetry(event.getId());
            }
        }
    }

    private String resolveTopicName(String eventType) {
        return switch (eventType) {
            case "ORDER_CREATED" -> "order-events";
            case "ORDER_PAID" -> "order-paid-events";
            default -> "default-events";
        };
    }
}
8. Real-Time Stream Processing with Kafka Streams
This complements the earlier Flink article: use Kafka Streams for lightweight stream processing and Flink for large-scale jobs.
8.1 Real-time order statistics (tumbling window)
@Slf4j
@Configuration
@EnableKafkaStreams
public class OrderStreamConfig {

    /**
     * Per-minute order amount per user, computed in real time
     * Input: order-events topic
     * Output: order-stats-per-minute topic
     */
    @Bean
    public KStream<String, OrderEvent> orderStatsStream(StreamsBuilder builder) {
        // 1. Read the source topic (custom Serde)
        KStream<String, OrderEvent> orderStream = builder.stream(
                "order-events",
                Consumed.with(Serdes.String(), orderEventSerde())
        );
        // 2. Filter: only paid orders count
        KStream<String, OrderEvent> paidOrders = orderStream
                .filter((key, value) -> "PAID".equals(value.getStatus()));
        // 3. Group by userId
        KGroupedStream<String, OrderEvent> groupedByUser = paidOrders
                .groupBy((key, value) -> value.getUserId(),
                        Grouped.with(Serdes.String(), orderEventSerde()));
        // 4. 1-minute tumbling-window aggregation (running total)
        KTable<Windowed<String>, Double> minuteStats = groupedByUser
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
                .aggregate(
                        () -> 0.0,
                        (userId, event, total) -> total + event.getAmount().doubleValue(),
                        Materialized.<String, Double, WindowStore<Bytes, byte[]>>as("order-minute-stats")
                                .withValueSerde(Serdes.Double())
                );
        // 5. Write to the statistics topic
        minuteStats.toStream()
                .map((windowedKey, total) -> KeyValue.pair(
                        windowedKey.key() + "@" + windowedKey.window().startTime(),
                        total.toString()
                ))
                .to("order-stats-per-minute", Produced.with(Serdes.String(), Serdes.String()));
        return orderStream;
    }

    /**
     * Real-time alert: single order amount above 10,000
     */
    @Bean
    public KStream<String, OrderEvent> largeOrderAlertStream(StreamsBuilder builder) {
        KStream<String, OrderEvent> stream = builder.stream(
                "order-events",
                Consumed.with(Serdes.String(), orderEventSerde())
        );
        stream.filter((key, order) ->
                        order.getAmount().compareTo(new BigDecimal("10000")) > 0)
                .peek((key, order) ->
                        log.warn("[Large-order alert] orderId={}, amount={}", order.getOrderId(), order.getAmount()))
                .to("large-order-alerts", Produced.with(Serdes.String(), orderEventSerde()));
        return stream;
    }

    /**
     * Stream-stream join: merge the order and payment streams
     */
    @Bean
    public KStream<String, String> orderPaymentJoinStream(StreamsBuilder builder) {
        KStream<String, OrderEvent> orderStream = builder.stream("order-events",
                Consumed.with(Serdes.String(), orderEventSerde()));
        KStream<String, PaymentEvent> paymentStream = builder.stream("payment-events",
                Consumed.with(Serdes.String(), paymentEventSerde()));
        // Join within a 5-minute window
        KStream<String, String> joined = orderStream.join(
                paymentStream,
                (order, payment) -> String.format("orderId=%s,amount=%s,payMethod=%s",
                        order.getOrderId(), order.getAmount(), payment.getPayMethod()),
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
                StreamJoined.with(Serdes.String(), orderEventSerde(), paymentEventSerde())
        );
        joined.to("order-payment-joined");
        return joined;
    }

    private Serde<OrderEvent> orderEventSerde() {
        return new JsonSerde<>(OrderEvent.class);
    }

    private Serde<PaymentEvent> paymentEventSerde() {
        return new JsonSerde<>(PaymentEvent.class);
    }
}
9. Dead-Letter Topics (DLT) and Retries
9.1 Automatic DLT setup
Spring Kafka's DeadLetterPublishingRecoverer automatically publishes messages whose retries are exhausted to {original-topic}.DLT:
@Bean
public DefaultErrorHandler kafkaErrorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
    // Publishes to the DLT topic
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
            (record, ex) -> {
                // Custom DLT topic name per failure type
                if (ex.getCause() instanceof DeserializationException) {
                    return new TopicPartition(record.topic() + ".DLT.PARSE_ERROR", -1);
                }
                return new TopicPartition(record.topic() + ".DLT", -1);
            });
    // Retry policy: exponential backoff, at most 3 attempts
    ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(3);
    backOff.setInitialInterval(1_000L);
    backOff.setMultiplier(2.0);
    DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, backOff);
    errorHandler.addNotRetryableExceptions(
            IllegalArgumentException.class,  // bad input: no point retrying
            DeserializationException.class   // deserialization error: no point retrying
    );
    return errorHandler;
}
9.2 DLT consumer (manual handling / alerting)
@Component
@Slf4j
@RequiredArgsConstructor
public class DeadLetterConsumer {

    private final DeadLetterRepository dltRepository;
    private final AlertService alertService;

    @KafkaListener(topicPattern = ".*\\.DLT", groupId = "dlt-group")
    public void consumeDlt(
            ConsumerRecord<String, Object> record,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
            @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
            @Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
            Acknowledgment ack) {
        log.error("[DLT] dead letter - originalTopic={}, originalOffset={}, key={}, error={}",
                originalTopic, originalOffset, record.key(), exceptionMessage);
        // 1. Persist to the DB for manual inspection
        dltRepository.save(DeadLetterRecord.builder()
                .originalTopic(originalTopic)
                .originalOffset(originalOffset)
                .messageKey(record.key().toString())
                .payload(record.value().toString())
                .errorMessage(exceptionMessage)
                .occurredAt(LocalDateTime.now())
                .build());
        // 2. Send an alert notification (DingTalk / WeCom)
        alertService.sendAlert(String.format("Kafka dead letter: topic=%s, key=%s",
                originalTopic, record.key()));
        ack.acknowledge();
    }
}
10. Production Tuning: High Throughput and Low Latency
10.1 Producer tuning
# High-throughput mode (trades some latency)
batch.size=131072                 # 128 KB batches
linger.ms=20                      # wait up to 20 ms to fill a batch
compression.type=lz4              # lz4 is faster than snappy
buffer.memory=134217728           # 128 MB buffer
max.in.flight.requests.per.connection=5
# Low-latency mode (trades throughput)
batch.size=1                      # effectively one record per batch
linger.ms=0
compression.type=none
10.2 Consumer tuning
# High-throughput consumption
fetch.min.bytes=102400            # 100 KB, fewer idle fetches
fetch.max.wait.ms=500
max.poll.records=1000             # up to 1000 records per poll
# Avoid rebalance storms
max.poll.interval.ms=600000       # 10-minute processing ceiling
session.timeout.ms=45000
heartbeat.interval.ms=15000
10.3 Broker tuning
# Larger socket buffers
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
# Log flushing (asynchronous, performance first)
log.flush.interval.messages=100000
log.flush.interval.ms=1000
# Replica synchronization
replica.fetch.max.bytes=10485760
replica.lag.time.max.ms=30000
# Log retention / compaction (saves disk space)
log.cleanup.policy=delete,compact
log.retention.hours=168           # 7 days
log.segment.bytes=1073741824      # 1 GB per segment
10.4 Benchmark reference
# Producer benchmark (1M records, 1 KB each)
kafka-producer-perf-test.sh \
  --topic order-events \
  --num-records 1000000 \
  --record-size 1024 \
  --throughput -1 \
  --producer-props bootstrap.servers=kafka1:9092 \
    batch.size=65536 linger.ms=5 compression.type=snappy
# Consumer benchmark
kafka-consumer-perf-test.sh \
  --bootstrap-server kafka1:9092 \
  --topic order-events \
  --messages 1000000 \
  --group perf-group
Typical benchmark results (3-node cluster, 12 partitions, 3 replicas):
| Scenario | Throughput | P99 latency |
|---|---|---|
| High throughput (linger=20ms) | ~800K msg/s | 25ms |
| Low latency (linger=0) | ~200K msg/s | 3ms |
| With compression (snappy) | ~600K msg/s | 18ms |
11. Monitoring: Prometheus + Grafana
11.1 Exposing metrics via Actuator + Micrometer
management:
  endpoints:
    web:
      exposure:
        include: health,info,prometheus,metrics
  metrics:
    tags:
      application: ${spring.application.name}
  prometheus:
    metrics:
      export:
        enabled: true
11.2 Key Kafka monitoring metrics
# Prometheus scrape config (ties in with the earlier monitoring article)
- job_name: 'kafka-app'
  metrics_path: '/actuator/prometheus'
  static_configs:
    - targets: ['order-service:8080']
Key Grafana panel metrics:
| Metric | PromQL | Alert threshold |
|---|---|---|
| Consumer lag | kafka_consumer_fetch_manager_records_lag | > 10000 |
| Producer send rate | rate(kafka_producer_record_send_total[1m]) | - |
| Consumer throughput | rate(kafka_consumer_fetch_manager_records_consumed_total[1m]) | - |
| GC pause time | jvm_gc_pause_seconds | > 0.5s |
| Listener processing time | kafka_listener_seconds_max | > 1s |
11.3 Consumer lag alerting
# Prometheus alerting rules (routed via Alertmanager)
groups:
  - name: kafka-alerts
    rules:
      - alert: KafkaConsumerLagHigh
        expr: kafka_consumer_fetch_manager_records_lag > 50000
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Severe consumer backlog"
          description: "Consumer group {{ $labels.group }} on {{ $labels.topic }} lag={{ $value }}"
      - alert: KafkaProducerError
        expr: rate(kafka_producer_record_error_total[1m]) > 0
        for: 30s
        labels:
          severity: warning
        annotations:
          summary: "Kafka producer send errors"
12. Summary and Best Practices
Core principles
| Principle | Explanation |
|---|---|
| Idempotency first | Consumers must be idempotent; Redis + DB as dual protection |
| Commit offsets manually | Disable enable.auto.commit; ack only after the business logic succeeds |
| Size partitions sensibly | Partitions ≥ consumer concurrency; often CPU cores × 2 |
| Avoid large messages | Keep single messages < 1 MB; for larger payloads, store the blob externally and send a URL over Kafka |
| Watch consumer lag | Consumer lag is the single most important health metric |
| Always have a DLT | Every consumer needs a dead-letter topic so messages can never block forever |
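The "avoid large messages" rule is usually implemented as a claim check: payloads above the threshold are uploaded to object storage and only a reference travels through Kafka. A minimal, purely illustrative sketch of the decision logic (the class, the threshold constant, and the blob:// reference scheme are all assumptions, and the actual upload is omitted):

```java
import java.util.Base64;
import java.util.UUID;

public class ClaimCheck {
    // 1 MB guideline from the table above
    static final int MAX_INLINE_BYTES = 1024 * 1024;

    /**
     * Returns the value to publish: the payload itself (Base64) when it is
     * small enough, otherwise a reference the consumer resolves back to bytes.
     */
    static String toKafkaValue(byte[] payload) {
        if (payload.length <= MAX_INLINE_BYTES) {
            return Base64.getEncoder().encodeToString(payload);
        }
        // A real system would upload the payload to object storage here
        return "blob://order-payloads/" + UUID.randomUUID();
    }
}
```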
Technology selection guide
High-throughput logs / tracking events ─────────► Kafka (the clear choice)
E-commerce orders / payments (financial grade) ─► RocketMQ (more mature transactional messages)
General decoupling (low/medium volume) ─────────► RabbitMQ (simple to operate)
Complex real-time stream processing (CEP) ──────► Kafka + Flink (see the previous article)
Lightweight stream processing (stats/alerts) ───► Kafka Streams (no extra components)
Full-stack overview
┌─────────────────────────────────────────────────────────┐
│             Producers (Spring Boot 3.x)                 │
│  sync send / async send / transactional send / outbox   │
└──────────────────────┬──────────────────────────────────┘
                       │
        ┌──────────────▼──────────────┐
        │    Kafka Cluster (KRaft)    │
        │   3 brokers / 12 partitions │
        │   replication=3, min.isr=2  │
        └──────────────┬──────────────┘
                       │
        ┌──────────────┼──────────────┐
        │              │              │
  ┌─────▼─────┐ ┌──────▼──────┐ ┌────▼──────┐
  │Consumer   │ │Kafka Streams│ │Consumer   │
  │group 1    │ │stats/alerts │ │group N    │
  │batch +    │ │windowed join│ │per-record,│
  │manual ack │ │             │ │idempotent │
  └─────┬─────┘ └──────┬──────┘ └────┬──────┘
        │              │             │
  ┌─────▼──────────────▼─────────────▼──────┐
  │    Prometheus + Grafana monitoring      │
  │   consumer lag / throughput / errors    │
  └─────────────────────────────────────────┘
The complete source code for this article follows the examples above and can be dropped into a Spring Boot 3.x project as-is. Questions and feedback are welcome in the comments!
References:
- Apache Kafka 3.6 official documentation
- Spring for Apache Kafka 3.1.x Reference
- Confluent Platform 7.6 Documentation
- Kafka: The Definitive Guide, 2nd Edition