Apache Flink + Spring Boot 3.x Real-Time Data Processing in Practice: Building an Enterprise-Grade Stream Computing Platform

Keywords: Apache Flink 1.19, Spring Boot 3.x, DataStream API, Flink SQL, Kafka Source, real-time statistics, window functions, Exactly-Once, Checkpoint, JobManager, TaskManager, Flink CDC, MySQL CDC, real-time dashboards


In today's maturing microservice landscape, real-time data processing needs are everywhere:

  • real-time GMV statistics during e-commerce promotions
  • second-level detection of anomalous behavior in risk-control systems
  • real-time device-status alerts for IoT
  • real-time personalized recommendations driven by user behavior

For these scenarios, traditional scheduled batch processing falls short. Apache Flink, with its true stream processing, millisecond-level latency, Exactly-Once semantics, and strong SQL support, has become the industry's framework of choice for stream computing.

This article walks through integrating Apache Flink 1.19 into a Spring Boot 3.x project to build a deployable, enterprise-grade real-time computing platform.


2. Core Concepts at a Glance

2.1 Runtime Architecture

Client ──submit job──▶ JobManager (Master)
                         │
              ┌──────────┼──────────┐
         TaskManager  TaskManager  TaskManager
              │
         Task Slot (unit of parallelism)

Component   | Responsibility
JobManager  | Coordinates job scheduling, triggers Checkpoints, drives failure recovery
TaskManager | Runs the actual operators; manages memory and network buffers
Task Slot   | Resource-isolation unit; each slot runs one parallel subtask

2.2 Time Semantics

Type            | Meaning                                         | Typical use
Event Time      | When the event occurred (timestamp in the data) | Out-of-order data, accurate aggregation
Processing Time | When the operator processes the record          | Low-latency scenarios that tolerate inaccuracy
Ingestion Time  | When the record enters Flink                    | Rarely used

For enterprise workloads, Event Time is strongly recommended, combined with Watermarks to handle out-of-order data, as sketched below.
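A minimal sketch of such a strategy, using the OrderEvent model defined later in section 4.1; the 10-second disorder bound is illustrative, and withIdleness is an extra touch here so watermarks keep advancing when a Kafka partition goes quiet:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class WatermarkSketch {
    static WatermarkStrategy<OrderEvent> orderWatermarks() {
        return WatermarkStrategy
            .<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10)) // tolerate 10s of disorder
            .withTimestampAssigner((event, ts) -> event.getEventTime())   // event time from the payload
            .withIdleness(Duration.ofMinutes(1));                         // don't stall on idle partitions
    }
}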

2.3 Window Types

Tumbling Window: [0,10) [10,20) [20,30)
Sliding Window:  [0,10) [5,15)  [10,20)
Session Window:  split automatically by gaps in activity
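As a sketch, the three assigners look like this on a keyed stream (again using OrderEvent from section 4.1; the 10s/5s/30s sizes are illustrative):

import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowAssignerSketch {
    static void demo(KeyedStream<OrderEvent, String> keyed) {
        // Tumbling: fixed, non-overlapping windows: [0,10) [10,20) ...
        keyed.window(TumblingEventTimeWindows.of(Time.seconds(10)));
        // Sliding: 10s windows that advance every 5s: [0,10) [5,15) ...
        keyed.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)));
        // Session: a window closes after 30s with no events for that key
        keyed.window(EventTimeSessionWindows.withGap(Time.seconds(30)));
    }
}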

3. Project Setup

3.1 Project Layout

flink-realtime/
├── flink-common/          # shared models and utilities
├── flink-job/             # Flink jobs (standalone JARs)
│   ├── OrderStatJob.java  # real-time order statistics
│   ├── AlarmJob.java      # anomaly alerting
│   └── CDCJob.java        # MySQL CDC synchronization
├── flink-api/             # Spring Boot 3.x API (submit/monitor jobs)
└── docker-compose.yml     # Flink + Kafka + MySQL environment

3.2 Maven Dependencies

flink-job/pom.xml

<properties>
    <flink.version>1.19.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>

<dependencies>
    <!-- Flink Core -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Flink SQL -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-loader</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Kafka Connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.2.0-1.19</version>
    </dependency>

    <!-- Flink CDC (MySQL); since CDC 3.1 the connector is released under org.apache.flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>3.1.0</version>
    </dependency>

    <!-- JSON serialization -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Redis sink (custom) -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>5.1.0</version>
    </dependency>
</dependencies>

flink-api/pom.xml (Spring Boot 3.x management API)

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- WebClient, used to call the Flink REST API -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

3.3 Local Environment (Docker Compose)

# docker-compose.yml
version: '3.8'
services:
  jobmanager:
    image: flink:1.19.1-java17
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        state.backend: rocksdb
        state.checkpoints.dir: file:///opt/flink/checkpoints
        execution.checkpointing.interval: 30000

  taskmanager:
    image: flink:1.19.1-java17
    depends_on:
      - jobmanager
    command: taskmanager
    deploy:
      replicas: 2
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 4
        taskmanager.memory.process.size: 2048m

  kafka:
    image: bitnami/kafka:3.7
    ports:
      - "9092:9092"
    environment:
      KAFKA_CFG_NODE_ID: 0
      KAFKA_CFG_PROCESS_ROLES: controller,broker
      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER

  mysql:
    image: mysql:8.0
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: root123
      MYSQL_DATABASE: shop
    command: --log-bin=mysql-bin --binlog-format=ROW --server-id=1

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

4. Core Practice: Real-Time Order Statistics with the DataStream API

4.1 Data Model

// flink-common/OrderEvent.java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderEvent {
    private String orderId;
    private String userId;
    private String category;  // product category
    private BigDecimal amount; // order amount
    private String status;    // CREATED / PAID / CANCELLED
    private long eventTime;   // event timestamp in milliseconds
}

4.2 Kafka Source Configuration

public class KafkaSourceFactory {

    public static KafkaSource<OrderEvent> createOrderSource(String brokers) {
        return KafkaSource.<OrderEvent>builder()
            .setBootstrapServers(brokers)
            .setTopics("order-events")
            .setGroupId("flink-order-consumer")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(
                new JsonDeserializationSchema<>(OrderEvent.class)
            ))
            .build();
    }
}

4.3 Real-Time Order Statistics Job (per-minute category GMV, large-order alerts)

@Slf4j
public class OrderStatJob {

    public static void main(String[] args) throws Exception {

        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // 2. Enable checkpointing (Exactly-Once + RocksDB state backend)
        env.enableCheckpointing(30_000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("hdfs://nn:8020/flink/checkpoints");
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10_000);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        // 3. Source (event time + watermarks)
        KafkaSource<OrderEvent> source = KafkaSourceFactory.createOrderSource("kafka:9092");

        DataStream<OrderEvent> orderStream = env
            .fromSource(source, WatermarkStrategy
                .<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((event, ts) -> event.getEventTime()),
                "Kafka-Order-Source"
            )
            .filter(e -> "PAID".equals(e.getStatus())) // 只统计已支付订单
            .name("filter-paid-orders");

        // 4. One-minute tumbling windows: GMV per category
        DataStream<CategoryStat> categoryStat = orderStream
            .keyBy(OrderEvent::getCategory)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .aggregate(new GMVAggregateFunction(), new CategoryWindowResult())
            .name("category-gmv-1min");

        // 5. Side output: alert on large orders (amount > 10000)
        OutputTag<OrderEvent> largeOrderTag = new OutputTag<>("large-order") {};
        SingleOutputStreamOperator<OrderEvent> mainStream = orderStream
            .process(new ProcessFunction<OrderEvent, OrderEvent>() {
                @Override
                public void processElement(OrderEvent e,
                        Context ctx, Collector<OrderEvent> out) {
                    if (e.getAmount().compareTo(new BigDecimal("10000")) > 0) {
                        ctx.output(largeOrderTag, e); // route to the side output
                    } else {
                        out.collect(e);
                    }
                }
            });

        DataStream<OrderEvent> largeOrders = mainStream.getSideOutput(largeOrderTag);

        // 6. Sink: write to Redis (real-time dashboard)
        categoryStat.addSink(new RedisCategorySink()).name("redis-category-sink");

        // 7. Large-order alerts → Kafka alarm topic
        largeOrders.sinkTo(
            KafkaSink.<OrderEvent>builder()
                .setBootstrapServers("kafka:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic("order-alarm")
                    .setValueSerializationSchema(new JsonSerializationSchema<>())
                    .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // EXACTLY_ONCE delivery requires a transactional ID prefix
                .setTransactionalIdPrefix("order-alarm-")
                .build()
        ).name("kafka-alarm-sink");

        env.execute("OrderRealTimeStat");
    }
}

4.4 Custom Aggregate Functions

// GMV aggregation: running order count + total amount
public class GMVAggregateFunction
        implements AggregateFunction<OrderEvent, GMVAccumulator, GMVAccumulator> {

    @Override
    public GMVAccumulator createAccumulator() {
        return new GMVAccumulator();
    }

    @Override
    public GMVAccumulator add(OrderEvent event, GMVAccumulator acc) {
        acc.setCount(acc.getCount() + 1);
        acc.setTotalAmount(acc.getTotalAmount().add(event.getAmount()));
        return acc;
    }

    @Override
    public GMVAccumulator getResult(GMVAccumulator acc) {
        return acc;
    }

    @Override
    public GMVAccumulator merge(GMVAccumulator a, GMVAccumulator b) {
        return new GMVAccumulator(
            a.getCount() + b.getCount(),
            a.getTotalAmount().add(b.getTotalAmount())
        );
    }
}

// Wraps the aggregate result with its key and window bounds
public class CategoryWindowResult
        extends ProcessWindowFunction<GMVAccumulator, CategoryStat, String, TimeWindow> {

    @Override
    public void process(String category, Context ctx,
            Iterable<GMVAccumulator> elements, Collector<CategoryStat> out) {
        GMVAccumulator acc = elements.iterator().next();
        out.collect(CategoryStat.builder()
            .category(category)
            .orderCount(acc.getCount())
            .totalAmount(acc.getTotalAmount())
            .windowStart(ctx.window().getStart())
            .windowEnd(ctx.window().getEnd())
            .build());
    }
}
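GMVAccumulator and CategoryStat are referenced above but not shown in the post. A minimal sketch of their assumed shapes, inferred from usage (Lombok style follows OrderEvent):

@Data
@NoArgsConstructor
@AllArgsConstructor
public class GMVAccumulator {
    private long count = 0L;                          // number of paid orders in the window
    private BigDecimal totalAmount = BigDecimal.ZERO; // accumulated GMV
}

@Data
@Builder
public class CategoryStat {
    private String category;
    private long orderCount;
    private BigDecimal totalAmount;
    private long windowStart; // window start, epoch millis
    private long windowEnd;   // window end, epoch millis
}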

4.5 Redis Sink (Real-Time Dashboard Data)

@Slf4j
public class RedisCategorySink extends RichSinkFunction<CategoryStat> {

    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) {
        jedis = new Jedis("redis", 6379);
    }

    @Override
    public void invoke(CategoryStat stat, Context context) {
        // Hash: realtime:gmv:category:{windowEnd} -> {category: amount}
        String key = "realtime:gmv:category:" + stat.getWindowEnd();
        jedis.hset(key, stat.getCategory(), stat.getTotalAmount().toPlainString());
        jedis.expire(key, 3600); // expire after 1 hour

        // ZSet: category leaderboard
        jedis.zadd("realtime:category:rank", 
            stat.getTotalAmount().doubleValue(), stat.getCategory());
        
        log.info("Sink category stat: {} → ¥{} ({} orders)",
            stat.getCategory(), stat.getTotalAmount(), stat.getOrderCount());
    }

    @Override
    public void close() {
        if (jedis != null) jedis.close();
    }
}

5. Flink SQL in Action: Real-Time PV/UV Statistics

5.1 Per-Minute UV Statistics (SQL Job)

public class UVStatSQLJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Register the Kafka source table
        tEnv.executeSql("""
            CREATE TABLE user_behavior (
                user_id     STRING,
                page        STRING,
                action      STRING,
                event_time  TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
            ) WITH (
                'connector'         = 'kafka',
                'topic'             = 'user-behavior',
                'properties.bootstrap.servers' = 'kafka:9092',
                'properties.group.id' = 'flink-uv-consumer',
                'scan.startup.mode' = 'latest-offset',
                'format'            = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
            )
        """);

        // Register the result sink table (JDBC to MySQL here; a custom Redis connector would also work).
        // Requires flink-connector-jdbc and the MySQL driver on the classpath.
        tEnv.executeSql("""
            CREATE TABLE uv_result (
                window_start TIMESTAMP(3),
                window_end   TIMESTAMP(3),
                page         STRING,
                uv           BIGINT
            ) WITH (
                'connector' = 'jdbc',
                'url'       = 'jdbc:mysql://mysql:3306/shop',
                'table-name'= 'realtime_uv',
                'username'  = 'root',
                'password'  = 'root123'
            )
        """);

        // SQL query: per-minute tumbling-window UV statistics (APPROX_COUNT_DISTINCT for efficient deduplication)
        tEnv.executeSql("""
            INSERT INTO uv_result
            SELECT
                TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
                TUMBLE_END(event_time,   INTERVAL '1' MINUTE) AS window_end,
                page,
                APPROX_COUNT_DISTINCT(user_id) AS uv
            FROM user_behavior
            WHERE action = 'view'
            GROUP BY
                TUMBLE(event_time, INTERVAL '1' MINUTE),
                page
        """);
    }
}
5.2 Windowed TopN: Top 5 Products per Category per Minute

-- Phase 1: per-minute sales count per product
-- (assumes an order_events source table registered like user_behavior above)
CREATE VIEW product_cnt AS
SELECT
    product_id,
    category,
    COUNT(*) AS sale_cnt,
    TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS window_end
FROM order_events
GROUP BY
    TUMBLE(event_time, INTERVAL '1' MINUTE),
    product_id,
    category;

-- Phase 2: TopN within each window
SELECT *
FROM (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY category, window_end
            ORDER BY sale_cnt DESC
        ) AS rn
    FROM product_cnt
)
WHERE rn <= 5;

6. Flink CDC: Real-Time Database Synchronization

6.1 How It Works

Flink CDC (Change Data Capture) tails the MySQL binlog and streams database changes (insert/update/delete) to downstream systems in real time, without polling and with latency as low as milliseconds.

6.2 MySQL CDC → Elasticsearch Real-Time Sync

public class MysqlCDCJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // MySQL CDC Source
        tEnv.executeSql("""
            CREATE TABLE product_source (
                id          BIGINT,
                name        STRING,
                price       DECIMAL(10, 2),
                stock       INT,
                category    STRING,
                update_time TIMESTAMP(3),
                PRIMARY KEY (id) NOT ENFORCED
            ) WITH (
                'connector'          = 'mysql-cdc',
                'hostname'           = 'mysql',
                'port'               = '3306',
                'username'           = 'root',
                'password'           = 'root123',
                'database-name'      = 'shop',
                'table-name'         = 'product',
                'server-id'          = '5401-5404',
                'scan.startup.mode'  = 'initial',
                'jdbc.properties.useSSL' = 'false'
            )
        """);

        // Elasticsearch Sink
        tEnv.executeSql("""
            CREATE TABLE product_es (
                id       BIGINT,
                name     STRING,
                price    DECIMAL(10, 2),
                stock    INT,
                category STRING,
                PRIMARY KEY (id) NOT ENFORCED
            ) WITH (
                'connector'   = 'elasticsearch-7',
                'hosts'       = 'http://elasticsearch:9200',
                'index'       = 'product',
                'format'      = 'json'
            )
        """);

        // Sync statement (CDC changelog rows +I/-U/+U/-D are handled automatically)
        tEnv.executeSql("INSERT INTO product_es SELECT id, name, price, stock, category FROM product_source");
    }
}

Key advantage: one SQL statement handles INSERT / UPDATE / DELETE automatically and writes idempotently to ES, with no extra code.


7. Spring Boot Management Layer

Flink exposes a complete REST API; Spring Boot acts as the management layer for submitting, monitoring, and stopping jobs.

7.1 Flink REST Client

@Service
@Slf4j
public class FlinkRestClient {

    private final WebClient webClient;

    public FlinkRestClient(@Value("${flink.jobmanager.url}") String jobManagerUrl) {
        this.webClient = WebClient.builder()
            .baseUrl(jobManagerUrl)
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .build();
    }

    /**
     * List the status of all jobs
     */
    public Mono<FlinkJobsResponse> listJobs() {
        return webClient.get()
            .uri("/jobs/overview")
            .retrieve()
            .bodyToMono(FlinkJobsResponse.class);
    }

    /**
     * Run an uploaded Flink JAR
     */
    public Mono<String> submitJar(String jarId, String entryClass, int parallelism) {
        return webClient.post()
            .uri("/jars/{jarId}/run", jarId)
            .bodyValue(Map.of(
                "entryClass", entryClass,
                "parallelism", parallelism,
                "programArgsList", List.of()
            ))
            .retrieve()
            .bodyToMono(Map.class)
            .map(resp -> (String) resp.get("jobid"));
    }

    /**
     * Trigger a savepoint and stop the job.
     * Note: the stop call is asynchronous. It returns a trigger ID, and the
     * final savepoint location must be polled from
     * /jobs/{jobId}/savepoints/{triggerId}.
     */
    public Mono<String> stopWithSavepoint(String jobId, String savepointDir) {
        return webClient.post()
            .uri("/jobs/{jobId}/stop", jobId)
            .bodyValue(Map.of(
                "drain", false,
                "targetDirectory", savepointDir
            ))
            .retrieve()
            .bodyToMono(Map.class)
            .map(resp -> (String) resp.get("request-id"));
    }

    /**
     * Fetch job metrics (TPS, latency, and so on)
     */
    public Mono<List<FlinkMetric>> getJobMetrics(String jobId, List<String> metricKeys) {
        String keys = String.join(",", metricKeys);
        return webClient.get()
            .uri("/jobs/{jobId}/metrics?get={keys}", jobId, keys)
            .retrieve()
            .bodyToFlux(FlinkMetric.class)
            .collectList();
    }
}
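FlinkJobsResponse and FlinkMetric are not shown in the post either. A minimal sketch matching the fields Flink returns from /jobs/overview and the metrics endpoint (the type and field names here are assumptions):

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import java.util.List;

@JsonIgnoreProperties(ignoreUnknown = true)
public record FlinkJobsResponse(List<JobOverview> jobs) {
    @JsonIgnoreProperties(ignoreUnknown = true)
    public record JobOverview(String jid, String name, String state) {}
}

@JsonIgnoreProperties(ignoreUnknown = true)
public record FlinkMetric(String id, String value) {}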

7.2 Management API Controller

@RestController
@RequestMapping("/api/flink")
@RequiredArgsConstructor
@Slf4j
public class FlinkJobController {

    private final FlinkRestClient flinkClient;

    @GetMapping("/jobs")
    public Mono<FlinkJobsResponse> listJobs() {
        return flinkClient.listJobs();
    }

    @PostMapping("/jobs/{jarId}/submit")
    public Mono<ApiResult<String>> submit(
            @PathVariable String jarId,
            @RequestParam String entryClass,
            @RequestParam(defaultValue = "4") int parallelism) {
        return flinkClient.submitJar(jarId, entryClass, parallelism)
            .map(jobId -> ApiResult.success("Job 已提交: " + jobId))
            .onErrorResume(e -> {
                log.error("Submit job failed", e);
                return Mono.just(ApiResult.fail("提交失败: " + e.getMessage()));
            });
    }

    @PostMapping("/jobs/{jobId}/stop")
    public Mono<ApiResult<String>> stop(
            @PathVariable String jobId,
            @RequestParam(required = false) String savepointDir) {
        String dir = savepointDir != null ? savepointDir : "hdfs://nn:8020/flink/savepoints";
        return flinkClient.stopWithSavepoint(jobId, dir)
            .map(requestId -> ApiResult.success("Savepoint triggered, request id: " + requestId));
    }

    @GetMapping("/jobs/{jobId}/metrics")
    public Mono<List<FlinkMetric>> metrics(@PathVariable String jobId) {
        return flinkClient.getJobMetrics(jobId,
            List.of("numRecordsInPerSecond", "numRecordsOutPerSecond", "latency.median"));
    }
}

8. Checkpoint and Savepoint: Production-Grade Fault Tolerance

8.1 Checkpoint vs Savepoint

Dimension       | Checkpoint                                 | Savepoint
Trigger         | Automatic (periodic)                       | Manual
Storage         | Configurable (HDFS/S3/local)               | Manually specified
Purpose         | Automatic failure recovery                 | Job upgrades, migration
Lifecycle       | Cleaned up automatically when the job ends | Managed manually
Retained copies | Configurable (default 1)                   | Unlimited

8.2 Production Checkpoint Best Practices

CheckpointConfig ckpConfig = env.getCheckpointConfig();

// Checkpoint every 30s, with a 60s timeout
env.enableCheckpointing(30_000);
ckpConfig.setCheckpointTimeout(60_000);

// At least 10s between checkpoints (prevents checkpoint pile-up)
ckpConfig.setMinPauseBetweenCheckpoints(10_000);

// At most one checkpoint in flight at a time
ckpConfig.setMaxConcurrentCheckpoints(1);

// Tolerate 3 checkpoint failures before failing the job
ckpConfig.setTolerableCheckpointFailureNumber(3);

// Retain checkpoints after the job is cancelled
ckpConfig.setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// Incremental checkpoints (supported only by the RocksDB state backend)
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = incremental

8.3 Restoring from a Savepoint

# Point the new submission at the savepoint path
./bin/flink run \
  -s hdfs://nn:8020/flink/savepoints/savepoint-abc123 \
  -c com.example.OrderStatJob \
  ./flink-job.jar
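The same restore can go through the REST API from section 7: the /jars/{jarId}/run endpoint also accepts a savepointPath (and an optional allowNonRestoredState flag) in its JSON body. A hedged sketch of an extra method for the FlinkRestClient above:

    /**
     * Submit a JAR, restoring its state from the given savepoint.
     */
    public Mono<String> submitJarFromSavepoint(String jarId, String entryClass,
            int parallelism, String savepointPath) {
        return webClient.post()
            .uri("/jars/{jarId}/run", jarId)
            .bodyValue(Map.of(
                "entryClass", entryClass,
                "parallelism", parallelism,
                "savepointPath", savepointPath,
                "allowNonRestoredState", true // tolerate dropped state for removed operators
            ))
            .retrieve()
            .bodyToMono(Map.class)
            .map(resp -> (String) resp.get("jobid"));
    }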

9. Backpressure: Diagnosis and Tuning

Backpressure is the most common performance problem in Flink production: a downstream operator cannot keep up with its upstream, and the whole pipeline slows down.

9.1 Diagnosis

Flink Web UI → Job → vertex color
  🟢 green  = healthy
  🟡 yellow = mild backpressure
  🔴 red    = severe backpressure (act immediately)

9.2 Common Causes and Fixes

Cause                      | Fix
Slow sink writes (DB/ES)   | Increase sink parallelism; write in batches
CPU-heavy custom operators | Optimize the algorithm; increase parallelism
Slow RocksDB disk I/O      | Use SSDs; enable incremental checkpoints
Network bottleneck         | Raise taskmanager.network.memory.fraction
Data skew                  | Pre-aggregate with a salted (random) key or a local-keyBy pattern; see the sketch below
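For the data-skew row, a hedged sketch of salted two-phase aggregation (counting events per category; the salt width of 8 and the 10-second windows are illustrative):

import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SkewMitigationSketch {

    static DataStream<Tuple2<String, Long>> countPerCategory(DataStream<OrderEvent> events) {
        // Phase 1: append a random salt so one hot category spreads over 8 keys
        DataStream<Tuple2<String, Long>> partial = events
            .map(e -> Tuple2.of(e.getCategory() + "#" + ThreadLocalRandom.current().nextInt(8), 1L))
            .returns(Types.TUPLE(Types.STRING, Types.LONG))
            .keyBy(t -> t.f0)
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
            .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

        // Phase 2: strip the salt and merge the partial counts per category
        return partial
            .map(t -> Tuple2.of(t.f0.substring(0, t.f0.lastIndexOf('#')), t.f1))
            .returns(Types.TUPLE(Types.STRING, Types.LONG))
            .keyBy(t -> t.f0)
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
            .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));
    }
}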

9.3 Parallelism Sizing Rule of Thumb

Recommended parallelism = max(source TPS / per-core throughput, number of Kafka partitions)

Example:
  Kafka TPS           = 100,000 msg/s
  Per-core throughput = 25,000 msg/s
  Recommended parallelism = max(100000/25000, 8 partitions) = max(4, 8) = 8

10. Monitoring and Alerting

Enable the Prometheus reporter in the Flink configuration:

# flink-conf.yaml
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249

Key Metrics

Metric                                                  | Meaning            | Alert threshold
flink_jobmanager_job_uptime                             | Job uptime         | = 0 → alert
flink_taskmanager_job_task_backPressuredTimeMsPerSecond | Backpressured time | > 500 ms/s
flink_taskmanager_job_task_numRecordsInPerSecond        | Input TPS          | sudden 50% drop
flink_jobmanager_job_numRestarts                        | Restart count      | > 3 per hour
flink_taskmanager_Status_JVM_Memory_Heap_Used           | Heap memory used   | > 80%

11. End-to-End Architecture

Data source layer
  ┌─────────────┐    ┌────────────────────┐
  │  MySQL CDC  │    │ User behavior logs │
  └──────┬──────┘    └─────────┬──────────┘
         │ Binlog              │ HTTP/SDK
         ▼                     ▼
Message bus layer
  ┌────────────────────────────────┐
  │         Apache Kafka           │
  │  order-events / user-behavior  │
  └─────────────────┬──────────────┘
                    │
Real-time compute layer
  ┌─────────────────▼──────────────┐
  │         Apache Flink           │
  │  ┌──────────┐  ┌────────────┐  │
  │  │DataStream│  │ Flink SQL  │  │
  │  │   API    │  │  (Catalog) │  │
  │  └──────────┘  └────────────┘  │
  └──────────────┬─────────────────┘
                 │
Result storage layer
  ┌──────────────┼─────────────────┐
  ▼              ▼                 ▼
Redis          MySQL/TiDB      Elasticsearch
(dashboards)   (business DB)   (search/analytics)

Presentation / consumption layer
  Spring Boot API → real-time dashboards / Grafana / business systems

12. Production Pitfalls and Best Practices

✅ Best-Practice Checklist

  1. Use RocksDB as the state backend: a must for large state; prevents OOM
  2. Enable incremental checkpoints: can cut checkpoint time by ~90% for large state
  3. Kafka source parallelism = partition count: avoids wasted or starved slots
  4. Annotate UDFs with @FunctionHint: speeds up Flink SQL type inference (see the sketch after this list)
  5. Tune operator chaining deliberately: avoids unnecessary network transfer
  6. Never env.setParallelism(1) in production: keep at least 2 for availability
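For item 4, a minimal sketch of a hinted scalar UDF (the masking function itself is illustrative):

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.ScalarFunction;

// Declaring the result type up front lets the planner skip reflective type extraction
@FunctionHint(output = @DataTypeHint("STRING"))
public class MaskPhoneFunction extends ScalarFunction {
    public String eval(String phone) {
        // mask the middle digits of an 11-digit phone number
        if (phone == null || phone.length() != 11) return phone;
        return phone.substring(0, 3) + "****" + phone.substring(7);
    }
}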

⚠️ Common Traps

// ❌ Wrong: holding a non-serializable object in an operator
public class BadMapper extends RichMapFunction<String, String> {
    private Connection conn; // a plain Connection is not serializable!
}

// ✅ Right: initialize in open(), release in close()
public class GoodMapper extends RichMapFunction<String, String> {
    private transient Connection conn;

    @Override
    public void open(Configuration params) throws Exception {
        conn = DriverManager.getConnection(...);
    }

    @Override
    public void close() throws Exception {
        if (conn != null) conn.close();
    }
}
// ❌ Wrong: mutating a state collection while iterating over it (ConcurrentModificationException)
listState.get().forEach(item -> listState.add(newItem)); // dangerous!

// ✅ Right: collect first, then update in one batch
List<Item> items = new ArrayList<>();
listState.get().forEach(items::add);
items.add(newItem);
listState.update(items);

13. Summary

Dimension   | Flink's Core Strength
Latency     | Millisecond-level end-to-end latency
Semantics   | Exactly-Once processing
Scalability | Horizontal scaling; state up to hundreds of TB
Usability   | Dual mode: DataStream API + Flink SQL
Ecosystem   | First-class Kafka / MySQL CDC / Elasticsearch / Redis support

This article covered the full path of Flink in enterprise real-time data processing: environment setup → DataStream API → Flink SQL → CDC synchronization → Spring Boot management → Checkpoint fault tolerance → monitoring and alerting. Combined with this blog's existing articles on Kafka, Redis, ELK, and Prometheus, readers can assemble a complete real-time data platform.

Next steps to explore:

  • Flink + Hudi: a real-time lakehouse
  • Flink on K8s (Native Kubernetes mode): elastic resource scheduling
  • Flink CDC 3.x: a unified batch/stream data integration framework

💡 Related Reading

  • [RocketMQ 5.x in Practice: Spring Boot 3.x Integration for Message Sending, Ordered Messages, and Transactional Messages]
  • [ELK Stack End-to-End Logging: Log Collection, Analysis, and Alerting for Spring Boot 3.x Microservices]
  • [Prometheus + Grafana Full-Stack Monitoring for Spring Boot 3.x Microservices]
  • [Kubernetes in Practice: Deploying Spring Boot 3.x Microservices to a K8s Cluster]