Flink SQL 开发与优化指南 📅 2026/6/26 6:34:00 一、Flink SQL 概述Flink SQL 是 Apache Flink 提供的流批统一的计算引擎支持通过标准 SQL 语句进行数据处理。SQL 经过解析、优化后生成高效的物理执行计划运行在 Flink 集群上。二、Flink SQL 工作流程与内部优化2.1 SQL 执行流程textSQL Query → Abstract Syntax Tree (AST) → Optimized Physical Plan → Execution2.2 核心内部优化优化策略说明Expression Reduce常量表达式预计算如12t1.value优化为3t1.valueProjection Push Down只读取查询所需的字段减少 I/O 和网络传输Sub-Plan Reuse复用相同子查询的执行结果需开启配置三、Flink SQL 最佳实践3.1 子图复用 (Sub-Plan Reuse)通过复用相同的数据源子查询避免重复计算sql-- 优化前 INSERT INTO sink1 SELECT * FROM t1 WHERE a 10; INSERT INTO sink2 SELECT * FROM t1 WHERE a 10 AND b 100; -- 优化后复用 v1 CREATE TEMPORARY VIEW v1 AS SELECT * FROM t1 WHERE a 10; INSERT INTO sink1 SELECT * FROM v1; INSERT INTO sink2 SELECT * FROM v1 WHERE b 100;开启配置yamltable.optimizer.reuse-sub-plan-enabled: true table.optimizer.reuse-source-enabled: true3.2 MiniBatch 优化MiniBatch 将微批数据聚合后再处理显著提升吞吐量适用于聚合类操作。yamltable.exec.mini-batch.enabled: true table.exec.mini-batch.allow-latency: 5s table.exec.mini-batch.size: 103.3 两阶段聚合 (Two-Phase Aggregate)在 MiniBatch 基础上增加本地预聚合减少 Shuffle 数据量。yamltable.optimizer.agg-phase-strategy: TWO_PHASE # 或 AUTO3.4 Distinct Aggregation 优化拆分 distinct 聚合为 partial final 两个阶段yamltable.optimizer.distinct-agg.split.enabled: true table.optimizer.distinct-agg.split.bucket-num: 1024 table.optimizer.incremental-agg-enabled: true # 默认 true完整配置示例yamltable.exec.mini-batch.enabled: true table.exec.mini-batch.allow-latency: 5s table.optimizer.agg-phase-strategy: TWO_PHASE table.optimizer.distinct-agg.split.enabled: true table.optimizer.distinct-agg.split.bucket-num: 10243.5 CASE WHEN → FILTER 语法改写推荐使用FILTER替代CASE WHEN进行条件聚合优化器可生成更高效的计划。sql-- 推荐 SELECT SUM(amount) FILTER (WHERE type A) AS sum_a FROM orders; -- 不推荐 SELECT SUM(CASE WHEN type A THEN amount END) AS sum_a FROM orders;3.6 Regular Join 状态优化三种状态存储策略对比场景状态结构存储效率Join Key 包含主键MapPK, Tuple2Row, Int最高Join Input 有主键Tuple2Row, Int中等无主键MapRow, Tuple2Int, Int最低优化建议Join Key 尽量包含主键Join 前只保留必要字段3.7 Lookup Join 优化1.16异步模式sql-- 同步默认 SELECT /* LOOKUP(tablemy_table2, asyncfalse) */ * FROM t1 JOIN my_table2 FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.a t2.c; -- 异步提升吞吐 SELECT /* LOOKUP(tablemy_table2, asynctrue) */ * FROM t1 JOIN my_table2 FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.a t2.c;异步输出模式sql-- Ordered保序 /* LOOKUP(tablemy_table2, asynctrue, output-modeordered) */ -- Unordered更高吞吐 /* LOOKUP(tablemy_table2, asynctrue, output-modeallow_unordered) */缓存策略1.17缓存类型适用场景配置FULL小数据集lookup.cache FULLPARTIAL大数据集lookup.cache PARTIALNONE实时性要求高lookup.cache NONE3.8 TopN 优化策略策略适用条件状态存储AppendRank输入仅 insert 变更按 key 存储 TopN 记录UpdateFastRankUpsert key 含 partition keyorder-by 字段单调且方向相反MaporderKey, RecordRetractRank通用场景回撤流存储全部输入数据3.9 自定义 Connector 接口实现实现以下接口可提升 Connector 性能接口收益SupportsFilterPushDown减少扫描 I/OSupportsProjectionPushDown减少无效字段读取SupportsPartitionPushDown静态分区裁剪SupportsDynamicFiltering动态分区裁剪SupportsLimitPushDown减少扫描 I/OSupportsAggregatePushDown减少 I/O输出数据量更小SupportsStatisticReport生成更优执行计划四、Flink SQL 开发建议4.1 维表 Join 个数不超过 5 个过多维表会导致 TM Heap 压力大频繁 GC作业性能下降。4.2 减少嵌套层级嵌套层级越多回撤流数据量越大建议控制在合理范围内。4.3 提前过滤数据在 Aggregate、Join 前进行数据过滤可减少 Shuffle 数据量和网络 I/O。sql-- 优化前shuffle 后过滤 SELECT * FROM A JOIN B ON A.id B.id WHERE A.userid 10; -- 优化后shuffle 前过滤 SELECT * FROM (SELECT * FROM A WHERE userid 10) A JOIN B ON A.id B.id;4.4 优先使用 LIKE 替代正则表达式正则表达式REGEXP、REGEXP_EXTRACT、REGEXP_REPLACE性能开销约为加减乘除的百倍极端情况可能无限循环。4.5 UDF 嵌套不超过 6 个多层 UDF 嵌套可能导致生成的代码超过 64KB引发编译错误。4.6 合理设置并行度Source 并行度推荐由上游组件推断Kafka 分区数 / HDFS Block 数过大并行度资源浪费过小并行度处理缓慢4.7 非状态计算资源优化对于 Filter、Union All、Lookup Join 等无状态操作重点调优Heap Size和Network。4.8 状态计算资源优化对于 Join、Window、Group By 等有状态操作重点调优vCore适当增加 CPU 资源Managed Memory用于状态后端管理off-Heap / Overhead适当提高如 5G五、Flink 1.15 特性增强5.1 表级 TTLJoin 场景不同表可设置不同 TTL减少状态冗余sqlINSERT INTO print SELECT t.user_id, t.user_name, d.score FROM user_info AS t JOIN user_score /* OPTIONS(state.ttl.left60S, state.ttl.right120S) */ AS d ON t.user_id d.user_id;5.2 JTL (Join-To-Live)根据关联次数决定数据是否过期减少状态压力仅支持 Join/Inner Join不与 TTL/广播同时使用sql/* OPTIONS(eliminate-state.left.threshold10, eliminate-state.right.threshold10) */5.3 窗口优化Pane 存储结构传统方式数据分配到多个窗口如 102s 的数据分配到 4 个窗口存在冗余存储Pane 优化数据仅存储在一个 Pane 中触发计算时从 Pane 中提取对应窗口数据内存只存一份5.4 DISTRIBUTEBY Hint按指定字段分区解决数据仅需分区的场景sqlSELECT /* DISTRIBUTEBY(id) */ id, name FROM t1; SELECT /* DISTRIBUTEBY(id, name) */ id, name FROM t1;5.5 BROADCAST Hint大小表 Join 时将小表广播到所有 Join Task大表通过 Rebalance 打散sqlSELECT /* BROADCAST(A) */ a2, b2 FROM A JOIN B ON a1 b1;5.6 Kafka Source 限流sqlCREATE TABLE KafkaSource (...) WITH ( subtask.scan.records-per-second.limit 1000 );5.7 Source 独立并发设置sqlCREATE TABLE KafkaSource (...) WITH ( source.parallelism 2 );5.8 其他性能增强特性说明Lookup 算子复用多 Sink 场景复用同一 Lookup Join 算子UDF 重用多次执行仅复制第一次结果JSON_VALUE 优化解析同一 JSON item 的多个字段时复用解析结果六、总结速查表场景推荐配置/方案聚合类作业MiniBatch 两阶段聚合 Distinct 拆分多表 Join提前过滤 保留必要字段 表级 TTL维表关联≤5 张 Lookup Cache 异步模式TopN根据输入类型选择合适的 Rank 策略资源调优无状态→Heap/Network有状态→vCore/Managed Memory数据倾斜BROADCAST Hint DISTRIBUTEBY限流Kafkasubtask.scan.records-per-second.limit