Keywords: Apache Flink 1.19, Spring Boot 3.x, DataStream API, Flink SQL, Kafka Source, real-time statistics, window functions, Exactly-Once, Checkpoint, JobManager, TaskManager, Flink CDC, MySQL CDC, real-time dashboards
1. Why Flink?
With microservice architectures now mature, real-time data processing needs are everywhere:
- real-time GMV statistics during e-commerce sales events
- second-level detection of anomalous behavior in risk-control systems
- real-time device-status alerts in IoT
- real-time personalized recommendations from user behavior
Traditional scheduled batch processing cannot keep up with these scenarios. Apache Flink, with its true stream processing, millisecond latency, Exactly-Once semantics, and strong SQL support, has become the industry's go-to framework for stream computing.
This article walks through integrating Apache Flink 1.19 into a Spring Boot 3.x project to build a practical, enterprise-grade real-time computing platform.
2. Core Concepts at a Glance
2.1 Flink Architecture
Client ──submit job──▶ JobManager (Master)
                            │
                 ┌──────────┼──────────┐
           TaskManager  TaskManager  TaskManager
                 │
           Task Slot (unit of parallelism)
| Component | Responsibility |
|---|---|
| JobManager | Coordinates job scheduling, triggers checkpoints, handles failure recovery |
| TaskManager | Executes operators; manages memory and network buffers |
| Task Slot | Unit of resource isolation; each slot runs one parallel subtask |
2.2 Time Semantics
| Type | Meaning | When to use |
|---|---|---|
| Event Time | When the event actually happened (timestamp carried in the data) | Out-of-order data, exact aggregation |
| Processing Time | Wall-clock time at the operator | Lowest latency, when out-of-order correctness does not matter |
| Ingestion Time | When the data enters Flink | Rarely used |
For enterprise workloads, Event Time is strongly recommended, combined with Watermarks to handle out-of-order data (see the full setup in section 4.3).
2.3 Window Types
Tumbling window: [0,10) [10,20) [20,30)
Sliding window:  [0,10) [5,15) [10,20)
Session window:  split automatically by gaps between events
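As a quick illustration, here is how each assigner is declared, a minimal sketch where `keyed` is assumed to be a `KeyedStream<OrderEvent, String>` like the one built in section 4.3:
// Tumbling: fixed, non-overlapping 10-second windows
keyed.window(TumblingEventTimeWindows.of(Time.seconds(10)));
// Sliding: 10-second windows, a new one starting every 5 seconds (overlapping)
keyed.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)));
// Session: a window closes once 30 seconds pass with no events for that key
keyed.window(EventTimeSessionWindows.withGap(Time.seconds(30)));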
3. Project Setup
3.1 Project Layout
flink-realtime/
├── flink-common/          # shared models and utilities
├── flink-job/             # Flink jobs (standalone JARs)
│   ├── OrderStatJob.java  # real-time order statistics
│   ├── AlarmJob.java      # anomaly alerts
│   └── CDCJob.java        # MySQL CDC synchronization
├── flink-api/             # Spring Boot 3.x API (submit/monitor jobs)
└── docker-compose.yml     # Flink + Kafka + MySQL environment
3.2 Maven Dependencies
flink-job/pom.xml
<properties>
    <flink.version>1.19.1</flink.version>
</properties>
<dependencies>
<!-- Flink Core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Kafka Connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.2.0-1.19</version>
</dependency>
<!-- Flink CDC (MySQL) -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>3.1.0</version>
</dependency>
<!-- JSON serialization -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Redis sink (custom, via Jedis) -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>5.1.0</version>
</dependency>
</dependencies>
flink-api/pom.xml (Spring Boot 3.x management API)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- WebClient for calling the Flink REST API -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
3.3 Local Environment (Docker Compose)
# docker-compose.yml
version: '3.8'
services:
jobmanager:
image: flink:1.19.1-java17
ports:
- "8081:8081"
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
state.backend: rocksdb
state.checkpoints.dir: file:///opt/flink/checkpoints
execution.checkpointing.interval: 30000
taskmanager:
image: flink:1.19.1-java17
depends_on:
- jobmanager
command: taskmanager
deploy:
replicas: 2
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.process.size: 2048m
kafka:
image: bitnami/kafka:3.7
ports:
- "9092:9092"
environment:
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
mysql:
image: mysql:8.0
ports:
- "3306:3306"
environment:
MYSQL_ROOT_PASSWORD: root123
MYSQL_DATABASE: shop
command: --log-bin=mysql-bin --binlog-format=ROW --server-id=1
redis:
image: redis:7-alpine
ports:
- "6379:6379"
4. Core Practice: Real-Time Order Statistics with the DataStream API
4.1 Data Model
// flink-common/OrderEvent.java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderEvent {
    private String orderId;
    private String userId;
    private String category;    // product category
    private BigDecimal amount;  // order amount
    private String status;      // CREATED / PAID / CANCELLED
    private long eventTime;     // event timestamp (milliseconds)
}
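For reference, a message on the order-events topic that maps onto this model might look like the following (the values are made up; Jackson matches JSON keys to the POJO fields one-to-one):
{
  "orderId": "O20240601-0001",
  "userId": "U1001",
  "category": "electronics",
  "amount": 299.00,
  "status": "PAID",
  "eventTime": 1717214400000
}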
4.2 Kafka Source Configuration
public class KafkaSourceFactory {
    public static KafkaSource<OrderEvent> createOrderSource(String brokers) {
        return KafkaSource.<OrderEvent>builder()
                .setBootstrapServers(brokers)
                .setTopics("order-events")
                .setGroupId("flink-order-consumer")
                .setStartingOffsets(OffsetsInitializer.earliest()) // read from the beginning on first start
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(
                        new JsonDeserializationSchema<>(OrderEvent.class) // provided by flink-json
                ))
                .build();
    }
}
4.3 Real-Time Order Stats Job (per-minute GMV per category + large-order alerts)
@Slf4j
public class OrderStatJob {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        // 2. Enable checkpointing (Exactly-Once + RocksDB state backend)
        env.enableCheckpointing(30_000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("hdfs://nn:8020/flink/checkpoints");
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10_000);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = incremental
        // 3. Source (event time + watermark)
        KafkaSource<OrderEvent> source = KafkaSourceFactory.createOrderSource("kafka:9092");
        DataStream<OrderEvent> orderStream = env
                .fromSource(source, WatermarkStrategy
                                .<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                                .withTimestampAssigner((event, ts) -> event.getEventTime()),
                        "Kafka-Order-Source"
                )
                .filter(e -> "PAID".equals(e.getStatus())) // only count paid orders
                .name("filter-paid-orders");
        // 4. One-minute tumbling window: GMV per category
        DataStream<CategoryStat> categoryStat = orderStream
                .keyBy(OrderEvent::getCategory)
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .aggregate(new GMVAggregateFunction(), new CategoryWindowResult())
                .name("category-gmv-1min");
        // 5. Side output: alert on large orders (amount > 10000)
        OutputTag<OrderEvent> largeOrderTag = new OutputTag<OrderEvent>("large-order") {};
        SingleOutputStreamOperator<OrderEvent> mainStream = orderStream
                .process(new ProcessFunction<OrderEvent, OrderEvent>() {
                    @Override
                    public void processElement(OrderEvent e,
                                               Context ctx, Collector<OrderEvent> out) {
                        if (e.getAmount().compareTo(new BigDecimal("10000")) > 0) {
                            ctx.output(largeOrderTag, e); // to the side output
                        } else {
                            out.collect(e);
                        }
                    }
                });
        DataStream<OrderEvent> largeOrders = mainStream.getSideOutput(largeOrderTag);
        // 6. Sink: write to Redis (real-time dashboard)
        categoryStat.addSink(new RedisCategorySink()).name("redis-category-sink");
        // 7. Large-order alerts → Kafka alarm topic
        largeOrders.sinkTo(
                KafkaSink.<OrderEvent>builder()
                        .setBootstrapServers("kafka:9092")
                        .setRecordSerializer(KafkaRecordSerializationSchema.<OrderEvent>builder()
                                .setTopic("order-alarm")
                                .setValueSerializationSchema(new JsonSerializationSchema<>())
                                .build())
                        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                        // EXACTLY_ONCE uses Kafka transactions and requires a transactional id prefix
                        .setTransactionalIdPrefix("order-alarm-")
                        .build()
        ).name("kafka-alarm-sink");
        env.execute("OrderRealTimeStat");
    }
}
4.4 Custom Aggregate Function
// GMV aggregation: running order count + total amount
public class GMVAggregateFunction
implements AggregateFunction<OrderEvent, GMVAccumulator, GMVAccumulator> {
@Override
public GMVAccumulator createAccumulator() {
return new GMVAccumulator();
}
@Override
public GMVAccumulator add(OrderEvent event, GMVAccumulator acc) {
acc.setCount(acc.getCount() + 1);
acc.setTotalAmount(acc.getTotalAmount().add(event.getAmount()));
return acc;
}
@Override
public GMVAccumulator getResult(GMVAccumulator acc) {
return acc;
}
@Override
public GMVAccumulator merge(GMVAccumulator a, GMVAccumulator b) {
return new GMVAccumulator(
a.getCount() + b.getCount(),
a.getTotalAmount().add(b.getTotalAmount())
);
}
}
// Wraps the per-window aggregate with window metadata
public class CategoryWindowResult
extends ProcessWindowFunction<GMVAccumulator, CategoryStat, String, TimeWindow> {
@Override
public void process(String category, Context ctx,
Iterable<GMVAccumulator> elements, Collector<CategoryStat> out) {
GMVAccumulator acc = elements.iterator().next();
out.collect(CategoryStat.builder()
.category(category)
.orderCount(acc.getCount())
.totalAmount(acc.getTotalAmount())
.windowStart(ctx.window().getStart())
.windowEnd(ctx.window().getEnd())
.build());
}
}
4.5 Redis Sink (Real-Time Dashboard Data)
@Slf4j
public class RedisCategorySink extends RichSinkFunction<CategoryStat> {
    private transient Jedis jedis;
    @Override
    public void open(Configuration parameters) {
        jedis = new Jedis("redis", 6379); // one connection per parallel subtask
    }
    @Override
    public void invoke(CategoryStat stat, Context context) {
        // Hash: realtime:gmv:category:{windowEnd} -> {category: amount}
        String key = "realtime:gmv:category:" + stat.getWindowEnd();
        jedis.hset(key, stat.getCategory(), stat.getTotalAmount().toPlainString());
        jedis.expire(key, 3600); // expire after 1 hour
        // ZSet: category leaderboard
        jedis.zadd("realtime:category:rank",
                stat.getTotalAmount().doubleValue(), stat.getCategory());
        log.info("Sink category stat: {} → ¥{} ({} orders)",
                stat.getCategory(), stat.getTotalAmount(), stat.getOrderCount());
    }
    @Override
    public void close() {
        if (jedis != null) jedis.close();
    }
}
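Since open() runs once per parallel subtask, each subtask holds exactly one connection. If that single connection becomes the bottleneck at higher parallelism, switching to JedisPool is straightforward. A sketch under the same host/port assumptions, replacing the corresponding methods above:
private transient JedisPool pool;

@Override
public void open(Configuration parameters) {
    // a small pool per subtask; size maxTotal to your workload
    GenericObjectPoolConfig<Jedis> poolConfig = new GenericObjectPoolConfig<>();
    poolConfig.setMaxTotal(8);
    pool = new JedisPool(poolConfig, "redis", 6379);
}

@Override
public void invoke(CategoryStat stat, Context context) {
    try (Jedis jedis = pool.getResource()) { // returned to the pool on close
        String key = "realtime:gmv:category:" + stat.getWindowEnd();
        jedis.hset(key, stat.getCategory(), stat.getTotalAmount().toPlainString());
        jedis.expire(key, 3600);
    }
}

@Override
public void close() {
    if (pool != null) pool.close();
}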
5. Flink SQL in Practice (Declarative and Concise)
5.1 Reading Kafka with Flink SQL → Real-Time Deduplicated UV Stats
public class UVStatSQLJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Register the Kafka source table
tEnv.executeSql("""
CREATE TABLE user_behavior (
user_id STRING,
page STRING,
action STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user-behavior',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'flink-uv-consumer',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
)
""");
// Register the sink table: here MySQL via the JDBC connector (needs flink-connector-jdbc + the MySQL driver); a custom Redis connector would also work
tEnv.executeSql("""
CREATE TABLE uv_result (
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
page STRING,
uv BIGINT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://mysql:3306/shop',
'table-name'= 'realtime_uv',
'username' = 'root',
'password' = 'root123'
)
""");
// Query: per-minute tumbling-window UV per page (APPROX_COUNT_DISTINCT deduplicates efficiently at the cost of a small error; use COUNT(DISTINCT user_id) if it is not supported in your setup)
tEnv.executeSql("""
INSERT INTO uv_result
SELECT
TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS window_end,
page,
APPROX_COUNT_DISTINCT(user_id) AS uv
FROM user_behavior
WHERE action = 'view'
GROUP BY
TUMBLE(event_time, INTERVAL '1' MINUTE),
page
""");
}
}
5.2 Flink SQL Top-N (Real-Time Top 5 Products per Category)
-- Stage 1: per-minute sales count per product
CREATE VIEW product_cnt AS
SELECT
product_id,
category,
COUNT(*) AS sale_cnt,
TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS window_end
FROM order_events
GROUP BY
TUMBLE(event_time, INTERVAL '1' MINUTE),
product_id,
category;
-- Stage 2: Top-N within each window
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY category, window_end
ORDER BY sale_cnt DESC
) AS rn
FROM product_cnt
)
WHERE rn <= 5;
6. Flink CDC: Real-Time MySQL Data Synchronization
6.1 What Is Flink CDC?
Flink CDC (Change Data Capture) tails the MySQL binlog and pushes database changes (insert/update/delete) to downstream systems in real time, with no polling and latency down to milliseconds.
6.2 Real-Time Sync: MySQL CDC → Elasticsearch
public class MysqlCDCJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// MySQL CDC Source
tEnv.executeSql("""
CREATE TABLE product_source (
id BIGINT,
name STRING,
price DECIMAL(10, 2),
stock INT,
category STRING,
update_time TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'mysql',
'port' = '3306',
'username' = 'root',
'password' = 'root123',
'database-name' = 'shop',
'table-name' = 'product',
'server-id' = '5401-5404',
'scan.startup.mode' = 'initial',
'jdbc.properties.useSSL' = 'false'
)
""");
// Elasticsearch Sink
tEnv.executeSql("""
CREATE TABLE product_es (
id BIGINT,
name STRING,
price DECIMAL(10, 2),
stock INT,
category STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://elasticsearch:9200',
'index' = 'product',
'format' = 'json'
)
""");
// The sync statement (CDC changelog rows +I / -U / +U / -D are handled automatically)
tEnv.executeSql("INSERT INTO product_es SELECT id, name, price, stock, category FROM product_source");
}
}
Key advantage: one SQL statement covers INSERT / UPDATE / DELETE end to end and writes idempotently to ES, with no extra code.
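If you want to inspect those changelog rows yourself, the CDC table can be converted to a changelog stream and printed. A small debugging sketch, assuming the product_source table registered above:
// Each printed row carries its RowKind: +I (insert), -U/+U (update before/after), -D (delete)
Table products = tEnv.from("product_source");
tEnv.toChangelogStream(products).print();
env.execute("cdc-changelog-debug");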
7. Integrating the Flink REST API with Spring Boot 3.x
Flink ships a complete REST API; Spring Boot serves as the control plane for submitting, monitoring, and stopping jobs.
7.1 A Flink REST Client Wrapper
@Service
@Slf4j
public class FlinkRestClient {
private final WebClient webClient;
public FlinkRestClient(@Value("${flink.jobmanager.url}") String jobManagerUrl) {
this.webClient = WebClient.builder()
.baseUrl(jobManagerUrl)
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.build();
}
    /**
     * List all jobs and their current status
     */
public Mono<FlinkJobsResponse> listJobs() {
return webClient.get()
.uri("/jobs/overview")
.retrieve()
.bodyToMono(FlinkJobsResponse.class);
}
    /**
     * Submit (run) a previously uploaded Flink JAR
     */
public Mono<String> submitJar(String jarId, String entryClass, int parallelism) {
return webClient.post()
.uri("/jars/{jarId}/run", jarId)
.bodyValue(Map.of(
"entryClass", entryClass,
"parallelism", parallelism,
"programArgsList", List.of()
))
.retrieve()
.bodyToMono(Map.class)
.map(resp -> (String) resp.get("jobid"));
}
    /**
     * Trigger a savepoint and stop the job (asynchronous on the Flink side)
     */
    public Mono<String> stopWithSavepoint(String jobId, String savepointDir) {
        return webClient.post()
                .uri("/jobs/{jobId}/stop", jobId)
                .bodyValue(Map.of(
                        "drain", false,
                        "targetDirectory", savepointDir
                ))
                .retrieve()
                .bodyToMono(Map.class)
                // the stop endpoint returns a trigger id, not the savepoint path;
                // poll /jobs/{jobId}/savepoints/{triggerId} for the final location
                .map(resp -> (String) resp.get("request-id"));
    }
    /**
     * Fetch job metrics (TPS, latency, and so on)
     */
public Mono<List<FlinkMetric>> getJobMetrics(String jobId, List<String> metricKeys) {
String keys = String.join(",", metricKeys);
return webClient.get()
.uri("/jobs/{jobId}/metrics?get={keys}", jobId, keys)
.retrieve()
.bodyToFlux(FlinkMetric.class)
.collectList();
}
}
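The response types used above are thin DTOs whose shape mirrors the Flink REST API's JSON. A minimal version: the field names follow /jobs/overview and the metrics endpoint, while the class names are this article's own.
// GET /jobs/overview → {"jobs": [{"jid": "...", "name": "...", "state": "RUNNING", "start-time": ...}]}
public record FlinkJobsResponse(List<JobOverview> jobs) {
    public record JobOverview(String jid, String name, String state,
                              @JsonProperty("start-time") long startTime) {}
}

// GET /jobs/{jobId}/metrics?get=... → [{"id": "numRestarts", "value": "0"}]
public record FlinkMetric(String id, String value) {}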
7.2 Management API Controller
@RestController
@RequestMapping("/api/flink")
@RequiredArgsConstructor
@Slf4j
public class FlinkJobController {
private final FlinkRestClient flinkClient;
@GetMapping("/jobs")
public Mono<FlinkJobsResponse> listJobs() {
return flinkClient.listJobs();
}
@PostMapping("/jobs/{jarId}/submit")
public Mono<ApiResult<String>> submit(
@PathVariable String jarId,
@RequestParam String entryClass,
@RequestParam(defaultValue = "4") int parallelism) {
return flinkClient.submitJar(jarId, entryClass, parallelism)
.map(jobId -> ApiResult.success("Job submitted: " + jobId))
.onErrorResume(e -> {
log.error("Submit job failed", e);
return Mono.just(ApiResult.fail("Submit failed: " + e.getMessage()));
});
}
@PostMapping("/jobs/{jobId}/stop")
public Mono<ApiResult<String>> stop(
@PathVariable String jobId,
@RequestParam(required = false) String savepointDir) {
String dir = savepointDir != null ? savepointDir : "hdfs://nn:8020/flink/savepoints";
        return flinkClient.stopWithSavepoint(jobId, dir)
                .map(triggerId -> ApiResult.success("Stop triggered, savepoint request-id: " + triggerId));
}
@GetMapping("/jobs/{jobId}/metrics")
public Mono<List<FlinkMetric>> metrics(@PathVariable String jobId) {
return flinkClient.getJobMetrics(jobId,
List.of("numRecordsInPerSecond", "numRecordsOutPerSecond", "latency.median"));
}
}
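The flink.jobmanager.url property injected into FlinkRestClient only needs to point at the JobManager's REST port. For the Docker Compose environment from section 3.3:
# application.yml (flink-api)
flink:
  jobmanager:
    url: http://localhost:8081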
8. Checkpoint and Savepoint: Production-Grade Fault Tolerance
8.1 Checkpoint vs. Savepoint
| Dimension | Checkpoint | Savepoint |
|---|---|---|
| Trigger | Automatic (periodic) | Manual |
| Storage | Configurable (HDFS/S3/local) | Manually specified |
| Purpose | Automatic failure recovery | Job upgrades, migration |
| Lifecycle | Cleaned up automatically when the job ends | Managed manually |
| Retained copies | Configurable (default 1) | Unlimited |
8.2 Production Checkpoint Configuration Best Practices
CheckpointConfig ckpConfig = env.getCheckpointConfig();
// checkpoint every 30s, time out after 60s
env.enableCheckpointing(30_000);
ckpConfig.setCheckpointTimeout(60_000);
// at least 10s between checkpoints (prevents checkpoint pile-up)
ckpConfig.setMinPauseBetweenCheckpoints(10_000);
// at most one checkpoint in flight at a time
ckpConfig.setMaxConcurrentCheckpoints(1);
// tolerate 3 checkpoint failures before failing the job
ckpConfig.setTolerableCheckpointFailureNumber(3);
// keep checkpoints after the job is cancelled
ckpConfig.setExternalizedCheckpointCleanup(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// incremental checkpoints (strongly recommended with the RocksDB state backend)
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = incremental
8.3 Going Live Again from a Savepoint
# specify the savepoint path when submitting the job
./bin/flink run \
-s hdfs://nn:8020/flink/savepoints/savepoint-abc123 \
-c com.example.OrderStatJob \
./flink-job.jar
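The same restore can be driven through the REST API from section 7: the run-jar endpoint accepts a savepoint path in its body. A sketch extending the FlinkRestClient above:
/**
 * Submit a previously uploaded JAR, restoring from a savepoint
 */
public Mono<String> submitFromSavepoint(String jarId, String entryClass, String savepointPath) {
    return webClient.post()
            .uri("/jars/{jarId}/run", jarId)
            .bodyValue(Map.of(
                    "entryClass", entryClass,
                    "savepointPath", savepointPath,
                    "allowNonRestoredState", false // fail if savepoint state can't be mapped
            ))
            .retrieve()
            .bodyToMono(Map.class)
            .map(resp -> (String) resp.get("jobid"));
}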
9. Backpressure: Troubleshooting and Tuning
Backpressure is the most common performance problem in production Flink: downstream operators cannot keep up with upstream, which slows down the entire pipeline.
9.1 How to Diagnose
Flink Web UI → Job → node colors:
🟢 green = healthy
🟡 yellow = mild backpressure
🔴 red = severe backpressure (act immediately)
9.2 Common Causes and Fixes
| Cause | Fix |
|---|---|
| Slow sink writes (DB/ES) | Raise sink parallelism; write in batches |
| CPU-heavy custom operator | Optimize the algorithm; raise parallelism |
| Slow RocksDB disk I/O | Use SSDs; enable incremental checkpoints |
| Network bottleneck | Increase taskmanager.memory.network.fraction |
| Data skew | Salt hot keys and pre-aggregate (two-phase aggregation; see the sketch below) |
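For the data-skew row, the standard remedy is two-phase (salted) aggregation: append a random suffix to the hot key, pre-aggregate per salted key, then strip the suffix and merge. A sketch reusing the OrderEvent stream from section 4 (the salt factor of 8 is arbitrary):
int salt = 8; // spread each hot category over 8 sub-keys
DataStream<Tuple2<String, BigDecimal>> preAggregated = orderStream
        .map(e -> Tuple2.of(
                e.getCategory() + "#" + ThreadLocalRandom.current().nextInt(salt),
                e.getAmount()))
        .returns(Types.TUPLE(Types.STRING, Types.BIG_DEC))
        .keyBy(t -> t.f0)
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1.add(b.f1)));

// Phase 2: drop the salt and merge the partial sums per real category
DataStream<Tuple2<String, BigDecimal>> merged = preAggregated
        .map(t -> Tuple2.of(t.f0.substring(0, t.f0.indexOf('#')), t.f1))
        .returns(Types.TUPLE(Types.STRING, Types.BIG_DEC))
        .keyBy(t -> t.f0)
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1.add(b.f1)));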
9.3 A Parallelism Sizing Rule of Thumb
Recommended parallelism = max(source TPS / per-core throughput, number of Kafka partitions)
Example:
Kafka TPS = 100,000 msg/s
per-core throughput = 25,000 msg/s
suggested parallelism = max(100,000 / 25,000, 8 partitions) = max(4, 8) = 8
10. Monitoring and Alerting
10.1 Flink Metrics → Prometheus → Grafana
Enable the Prometheus reporter in the Flink configuration:
# flink-conf.yaml
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249
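Prometheus then scrapes every JobManager and TaskManager on that port. A minimal scrape config, with target host names assuming the Docker Compose services from section 3.3:
# prometheus.yml (sketch)
scrape_configs:
  - job_name: 'flink'
    static_configs:
      - targets: ['jobmanager:9249', 'taskmanager:9249']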
Key metrics to watch:
| Metric | Meaning | Alert threshold |
|---|---|---|
| flink_jobmanager_job_uptime | Job uptime | = 0 → alert |
| flink_taskmanager_job_task_backPressuredTimeMsPerSecond | Time spent backpressured | > 500 ms/s |
| flink_taskmanager_job_task_numRecordsInPerSecond | Input TPS | drops by 50% |
| flink_jobmanager_job_numRestarts | Restart count | > 3 per hour |
| flink_taskmanager_Status_JVM_Memory_Heap_Used | Heap usage | > 80% |
11. Full Architecture
Data source layer
 ┌─────────────┐      ┌────────────────────┐
 │  MySQL CDC  │      │ user behavior logs │
 └──────┬──────┘      └─────────┬──────────┘
        │ binlog                │ HTTP/SDK
        ▼                       ▼
Message bus layer
 ┌────────────────────────────────┐
 │          Apache Kafka          │
 │  order-events / user-behavior  │
 └─────────────────┬──────────────┘
                   │
Real-time compute layer
 ┌─────────────────▼──────────────┐
 │          Apache Flink          │
 │  ┌──────────┐  ┌────────────┐  │
 │  │DataStream│  │ Flink SQL  │  │
 │  │   API    │  │ (Catalog)  │  │
 │  └──────────┘  └────────────┘  │
 └──────────────┬─────────────────┘
                │
Result storage layer
 ┌──────────────┼─────────────────┐
 ▼              ▼                 ▼
Redis        MySQL/TiDB      Elasticsearch
(dashboards) (business DB)   (search/analytics)
Presentation / consumer layer
Spring Boot API → real-time dashboards / Grafana / business systems
12. Production Pitfalls and Best Practices
✅ Best-Practice Checklist
- Use RocksDB as the state backend: a must for large state, prevents OOM
- Enable incremental checkpoints: with large state this can cut checkpoint time by about 90%
- Set Kafka source parallelism equal to the partition count: avoids wasted or insufficient resources
- Annotate SQL UDFs with `@FunctionHint`: speeds up Flink SQL type inference (see the sketch after this list)
- Manage operator chaining deliberately: chaining avoids unnecessary network transfer, so only break chains where it actually helps
- Never hardcode `env.setParallelism(1)` in production: keep at least 2 parallel instances for availability
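As referenced in the checklist, @FunctionHint declares a UDF's types up front so the planner can skip reflective type extraction. A minimal sketch (the masking function itself is just an example):
// Declares the output type explicitly instead of relying on reflection
@FunctionHint(output = @DataTypeHint("STRING"))
public class MaskPhone extends ScalarFunction {
    public String eval(String phone) {
        if (phone == null || phone.length() < 8) return phone;
        return phone.substring(0, 3) + "****" + phone.substring(phone.length() - 4);
    }
}
// Register it for use in SQL:
// tEnv.createTemporarySystemFunction("MASK_PHONE", MaskPhone.class);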
⚠️ Common Traps
// ❌ Wrong: a non-serializable field inside an operator
public class BadMapper extends RichMapFunction<String, String> {
private Connection conn; // a plain JDBC Connection is not serializable!
}
// ✅ Correct: initialize in open(), release in close()
public class GoodMapper extends RichMapFunction<String, String> {
private transient Connection conn;
@Override
public void open(Configuration params) throws Exception {
conn = DriverManager.getConnection(...);
}
@Override
public void close() throws Exception {
if (conn != null) conn.close();
}
}
// ❌ Wrong: mutating list state while iterating over it (ConcurrentModificationException)
listState.get().forEach(item -> listState.add(newItem)); // dangerous!
// ✅ Correct: collect first, then update in one batch
List<Item> items = new ArrayList<>();
listState.get().forEach(items::add);
items.add(newItem);
listState.update(items);
13. Summary
| Dimension | Flink's Core Strength |
|---|---|
| Latency | Millisecond end-to-end latency |
| Semantics | Exactly-Once |
| Scalability | Horizontal scaling; state up to hundreds of TB |
| Ease of use | Two programming models: DataStream API + Flink SQL |
| Ecosystem | First-class support for Kafka / MySQL CDC / Elasticsearch / Redis |
This article covered the full path (environment setup → DataStream API → Flink SQL → CDC sync → Spring Boot control plane → checkpoint-based fault tolerance → monitoring and alerting) of Flink's core practices for enterprise real-time data processing. Combined with this blog's existing articles on Kafka, Redis, ELK, and Prometheus, you can assemble a complete real-time data platform.
Where to go next:
- Flink + Hudi: a unified real-time lakehouse
- Flink on K8s (Native Kubernetes mode): elastic resource scheduling
- Flink CDC 3.x: a unified batch/stream data-integration framework
💡 Related Reading
- [RocketMQ 5.x in Practice: Spring Boot 3.x Integration for Message Sending, Ordered Messages, and Transactional Messages]
- [ELK Stack End-to-End Logging: Log Collection, Analysis, and Alerting for Spring Boot 3.x Microservices]
- [Full-Chain Monitoring of Spring Boot 3.x Microservices with Prometheus + Grafana]
- [Kubernetes in Practice: Deploying Spring Boot 3.x Microservices to a K8s Cluster]