基础及入门(2)-流数据处理机制

📅 2026/6/27 22:22:56
基础及入门(2)-流数据处理机制
一、整体数据流转架构二、源端数据读取Kafka → Flink1. 数据格式与反序列化Flink 通过 format json 配置项告诉 Source 算子使用 JSON 反序列化器。每一条 Kafka 消息一个 JSON 字节数组进入 Flink 后会经历如下过程Kafka 消息字节流 ↓ JsonRowDeserializationSchema内置 JSON 反序列化器 ↓ Flink 内部统一数据结构RowData 对象 ↓ 字段映射EVENT_NO / PATIENT_ID / CLINIC_DEPT_CODE / CLINIC_DOCTOR_CODE / REGISTER_TIME这里有几个关键参数需要你深刻理解json.ignore-parse-errors true当某条 JSON 消息格式异常字段缺失、类型不匹配时不抛异常、直接跳过这条消息返回 null 行。这是生产环境防止脏数据崩溃任务的重要保护。scan.startup.mode group-offsets从消费者组记录的上次提交偏移量继续消费任务重启后不会重复消费也不会丢失。所有字段定义为 STRING因为 Kafka 的 JSON 消息里 REGISTER_TIME 是字符串格式Flink 在 Source 阶段不做类型转换保留原始字符串转换推迟到计算层处理。2. 消费偏移量管理Flink 内部有一个 Checkpoint 机制每次做 Checkpoint 时会将 Kafka 当前消费的 offset 也存入状态后端。这意味着任务失败恢复后能精确从上次 Checkpoint 的 offset 继续保证 exactly-once 语义配合 Doris 的幂等写入。三、Flink SQL 计算引擎内部处理机制1. SQL 编译过程你看不见但必须懂你写的 SQL 在提交时会经历SQL 文本 → Parser语法解析→ AST 抽象语法树 → Validator语义校验→ 检查字段是否存在、类型是否合法 → Planner优化器基于 Calcite→ 逻辑执行计划 → 物理执行计划 → 生成算子 DAG 图JobGraph → 提交到 TaskManager 执行你 SQL 最终被翻译成的算子链就是架构图里展示的Source → Filter → Calc → GroupAggregate → Sink这条 DAG。2. 每个算子的具体行为Source 算子从 Kafka 拉取 JSON 消息反序列化成 RowDataFlink 内部行格式每条消息对应一个 RowData。Filter 算子对应你的 WHERE 子句WHERE REGISTER_TIME IS NOT NULL AND CLINIC_DEPT_CODE IS NOT NULL AND CLINIC_DOCTOR_CODE IS NOT NULL对每条流入的 RowData 做谓词判断不满足条件的直接丢弃不进入后续算子。这是一个无状态的轻量算子在真正的聚合之前先过滤能大幅减少进入 GroupAggregate 的数据量。Calc 算子对应 SELECT 里的表达式计算CAST(SUBSTRING(REGISTER_TIME, 1, 10) AS DATE) AS dim_date这一步调用了两个内置标量函数SUBSTRING(str, start, length)字符串截取从 2024-01-15 08:30:00 截取 2024-01-15。CAST(... AS DATE)显式类型转换将字符串转为 Flink 内部的 DateData实际存储为距离 epoch 的天数整数。是的这些都是 Flink SQL 内置函数不需要你自己实现Flink 的内置函数体系非常完整涵盖字符串、时间、数学、聚合、条件等各类操作。GroupAggregate 算子核心有状态算子对应你的GROUP BY CAST(SUBSTRING(REGISTER_TIME,1,10) AS DATE), CLINIC_DEPT_CODE, CLINIC_DOCTOR_CODE这是一个有状态的流式聚合它的工作机制如下每来一条新数据dim_date2024-01-15, dept001, doctorD01 → 计算分组 Key 的哈希值 → 查询 State Backend 中该 Key 的当前累加值比如当前是 5 → 执行 SUM(1)5 1 6 → 将新值 6 写回 State Backend → 向下游 Sink 发出两条消息 撤回消息Retract(-) [2024-01-15, 001, D01, 5] ← 旧值无效 插入消息Accumulate() [2024-01-15, 001, D01, 6] ← 新值生效这就是 Flink 流式聚合的Retract撤回机制每次聚合结果更新都会先撤回旧值、再插入新值。这也是为什么 Doris 端需要配置 sink.properties.partial_update true 的核心原因。SUM(1)的本质这里 SUM(1) 等价于 COUNT(*)每来一条记录加 1是对人次进行累计计数的标准写法。在 Flink 内部SUM 是内置聚合函数它维护一个累加器状态不需要存储所有历史记录只存储当前累计值状态极小。四、类型转换为什么要显式 CAST发生在哪个阶段1. 为什么必须显式 CAST源端所有字段都定义为 STRING。Doris 目标表的 dim_date 字段类型是 DATE。Flink SQL 的类型系统是强类型的STRING 和 DATE 是两种完全不同的内部类型不会自动隐式转换。如果你不写 CASTPlanner 在编译阶段SQL 提交时就会报类型不匹配错误任务根本无法启动。2. 类型转换发生在哪个阶段提交 SQL 时编译期 Validator 校验发现 REGISTER_TIME 是 STRING目标是 DATE → 检查是否有 CAST 表达式 → 有 → 校验通过 运行期每条数据处理时 Calc 算子对每条 RowData 执行 1. SUBSTRING(2024-01-15 08:30:00, 1, 10) → 2024-01-15 2. CAST(2024-01-15 AS DATE) ↓ 内部DateTimeUtils.parseDate(2024-01-15) ↓ 转为 int距离 1970-01-01 的天数如 19737 ↓ 存储为 Flink 内部 DateData 类型类型转换的时序是编译期做类型合法性校验运行期每条数据实际执行转换逻辑。3. 常见类型转换错误场景当 REGISTER_TIME 值不是标准 YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS 格式时CAST 会返回 NULL不是抛异常因为 Flink 默认对 CAST 失败返回 NULL。这条记录虽然不会让任务崩溃但 dim_date 会变成 NULL 被 WHERE 过滤掉或写入 NULL 值是你排查数据质量问题时需要重点关注的点。五、数据写入 Doris 的机制1. Sink 算子的工作方式Flink Doris Connector 的 Sink 算子接收来自上游的数据流采用批量缓冲 定时刷写的策略来自 GroupAggregate 的数据含 Retract 消息 → Sink Buffer内存缓冲区 → 满足以下任一条件触发 flush ① 缓冲行数 ≥ sink.buffer-flush.max-rows10000行 ② 距上次 flush ≥ sink.buffer-flush.interval5000ms → 序列化为 JSON 格式 → HTTP POST → Doris FE 的 Stream Load 接口2. Doris 收到数据后的处理Doris 接收到 Stream Load 请求后partial_update true开启部分列更新模式。Doris 会根据主键dim_date dim_dept name_employe_no 联合主键查找已有行如果存在则只更新 outp_cn 字段不影响其他列。这正好配合了 Flink 的 Retract 机制——旧值被新值覆盖实现了流式聚合结果的持续更新。Doris 内部会先将数据写入内存的 MemTable达到阈值后 flush 成不可变的 Segment 文件后台 Compaction 线程负责将多个小 Segment 合并成大 Segment这就是为什么你查询 Doris 时数据会有短暂延迟的原因。3. 为什么 outp_cn 要定义为 DECIMAL(20,2)这是 Doris 建表时的字段类型定义。虽然 SUM(1) 的结果是整数但 Flink 的 SUM 聚合函数在处理数值时内部类型推断结果是 DECIMAL防止溢出Doris 端用 DECIMAL(20,2) 兼容接收这个值精度上完全够用。六、排查问题的核心思路总结理解了上述机制后遇到问题可以按这个链路快速定位现象优先排查位置关键点任务提交即报错SQL 编译期类型不匹配、字段名拼写、连接器参数错误任务运行但无数据Source / Filter 算子Kafka offset 位置、JSON 解析失败、WHERE 过滤过严数据写入 Doris 延迟大Sink 缓冲配置调小buffer-flush.intervalDoris 数据不对 / 数值偏小GroupAggregate 状态State 丢失Checkpoint 配置、Retract 消息未被正确处理类型转换异常导致 NULLCalc 算子REGISTER_TIME 原始值格式不标准任务 OOMState Backend分组 Key 基数过大状态膨胀掌握了Source 反序列化 → Filter 过滤 → Calc 计算类型转换在此 → GroupAggregate 有状态聚合Retract 机制 → Sink 批量刷写 → Doris 部分列更新这条完整链路基本上 80% 的 Flink SQL 流处理问题都能快速定位到具体的算子层。