基于 Flink CDC Pipeline 同步 MySQL 数据到 Doris 的完整部署指南

📅 2026/7/2 12:23:52
基于 Flink CDC Pipeline 同步 MySQL 数据到 Doris 的完整部署指南
1. 背景与目标在数据中台建设中将业务库MySQL的数据实时同步到 OLAP 引擎Doris是常见需求。本文档基于 Flink CDC 3.5.0 的 Pipeline 模式配合 Doris 4.1.0 和 Dolphinscheduler 3.4.1以下简称 DS详细记录从环境准备、配置文件编写、作业提交到监控验证的全过程并分享调优经验。2. 环境与版本数据源MySQL 8.0开启 binlog使用 CDC 账号目标库Apache Doris 4.1.0FE/BE 已部署Flink CDC3.5.0基于 Flink 1.20.3任务调度平台DataStudio 3.4.1用于托管 Shell 任务部署方式YARN Application 模式Flink on YARN3. 前置准备3.1 MySQL 侧配置开启 binloglog_binON, binlog_formatROW创建 CDC 专用账号如 cdc_doris授予 REPLICATION SLAVE, REPLICATION CLIENT, SELECT 权限确保网络连通性Flink 任务所在节点可访问 MySQL 和 Doris FE/BE3.2 Doris 侧准备创建目标数据库如 wms_ods确保 Doris 支持动态分区、轻量级 schema 变更light_schema_changetrue准备写入账号WMS-WriteOnly并授予 INSERT, CREATE, ALTER 等权限3.3 Flink CDC 安装下载 Flink CDC 3.5.0 二进制包解压至 /usr/bigtop/3.3.0/usr/lib/flink-cdc-3.5.0/将以下 JAR 包放入 flink/lib/ 目录flink-cdc-pipeline-connector-mysql-3.5.0.jarflink-cdc-pipeline-connector-doris-3.5.0.jarmysql-connector-j-9.7.0.jar或更高版本确保 Flink 安装版本 1.20.3可用并配置 HADOOP_CLASSPATH4. 编写 Pipeline 配置文件YAML文件路径tsp/cn_wms/mysql_cn_wms_to_tsp_ods_doris.yamlsource: type: mysql hostname: 192.168.222.242 port: 3306 username: cdc_doris password: xxxxxx scan.incremental.snapshot.chunk.size: 512 # 快照分块大小 scan.snapshot.fetch.size: 500 # 单次拉取行数 tables: cn_auto_wms.vmi_stock,cn_auto_wms.wms_attachment_info,... # 待同步表列表省略 server-id: 5420-5428 # 用于增量同步的 server-id 范围 sink: type: doris fenodes: 192.168.222.66:28030,192.168.222.67:28030,192.168.222.68:28030 username: WMS-WriteOnly password: xxxxxx jdbc-url: jdbc:mysql:loadbalance://192.168.222.66:29030,192.168.222.67:29030,192.168.222.68:29030/wms_ods sink.buffer-flush.max-rows: 10000 # 单次批量写入最大行数 sink.buffer-flush.max-bytes: 10485760 # 单次批量写入最大字节数10MB sink.buffer-flush.interval: 5s # 刷新间隔 table.create.properties.light_schema_change: true table.create.properties.replication_num: 3 route: - source-table: cn_auto_wms.vmi_stock,... # 与上表一致 sink-table: wms_ods.ods_cn_auto_wms_ # 目标表名模板被替换为源表名 replace-symbol: description: 为doris所有表添加ods_cn_auto_wms_前缀 pipeline: name: Sync cn_auto_wms to Doris with ods_cn_auto_wms_ prefix parallelism: 2 # Pipeline 全局并行度 schema.change.behavior: lenient # 容忍 DDL 变更如新增列关键参数解析scan.incremental.snapshot.chunk.size控制快照阶段的分块大小影响读取性能可根据表大小调整。server-id每个 MySQL 源需唯一范围区间应大于并行度避免冲突。sink.buffer-flush.*控制写入 Doris 的批次大小和频率需根据数据吞吐量优化。schema.change.behavior: lenient当源表发生 DDL如加列时作业不会失败自动适配需 Doris 支持 light_schema_change。route实现表名映射将 cn_auto_wms.xxx 自动重命名为 ods_cn_auto_wms_xxx。5. 编写提交脚本Shell文件路径mysql_cn_wms_to_tsp_ods_doris.sh#flinkc同步mysql cn_wms库到Doris-测试环境 sudo -u hdfs \ env JAVA_HOME/usr/java/jdk-11.0.30 \ env PATH$JAVA_HOME/bin:$PATH \ env HADOOP_CLASSPATH$(hadoop classpath) \ env FLINK_HOME/usr/bigtop/3.3.0/usr/lib/flink-1.20.3 \ /usr/bigtop/3.3.0/usr/lib/flink-cdc-3.5.0/bin/flink-cdc.sh \ --jar flink/lib/flink-cdc-pipeline-connector-doris-3.5.0.jar \ --jar flink/lib/flink-cdc-pipeline-connector-mysql-3.5.0.jar \ --jar flink/lib/mysql-connector-j-9.7.0.jar \ -t yarn-application \ -Dyarn.application.namemysql_cn_wms_to_tsp_ods_doris_sync_job \ -Dclassloader.resolve-orderparent-first \ -Dpipeline.namemysql_cn_wms_to_tsp_ods_doris_sync_job \ -Djobmanager.memory.process.size2048m \ -Djobmanager.cpu.cores1 \ -Dtaskmanager.memory.process.size10240m \ -Dtaskmanager.numberOfTaskSlots2 \ -Dtaskmanager.cpu.cores2 \ -Dtaskmanager.memory.managed.fraction0.25 \ -Dtaskmanager.memory.jvm-metaspace.size512m \ -Dtaskmanager.memory.jvm-overhead.min512m \ -Dtaskmanager.memory.jvm-overhead.max1g \ -Dpekko.ask.timeout60s \ -Dheartbeat.timeout60000 \ -Dpekko.tcp.timeout60s \ -Dtaskmanager.network.request-backoff.max60000 \ -Dstate.backendrocksdb \ -Dstate.backend.rocksdb.localdir/tmp/flink/rocksdb \ tsp/cn_wms/mysql_cn_wms_to_tsp_ods_doris.yaml \ -Denv.java.home/usr/java/jdk-11.0.30 \ -Dcontainerized.master.env.JAVA_HOME/usr/java/jdk-11.0.30 \ -Dcontainerized.taskmanager.env.JAVA_HOME/usr/java/jdk-11.0.30 \ -Dexecution.checkpointing.interval2min \ -Dexecution.checkpointing.unaligned.forcedtrue脚本详解用户切换使用 sudo -u hdfs 以 hdfs 用户执行需确保 hdfs 用户有权限访问 HDFS 和 Flink。环境变量显式指定 JAVA_HOME, PATH, HADOOP_CLASSPATH, FLINK_HOME避免依赖系统默认。flink-cdc.sh 启动通过 --jar 加载所需 connector 和 JDBC 驱动。部署目标-t yarn-application 表示以 YARN Application 模式运行JM 和 TM 均在 YARN 容器中。核心 JVM 参数JM 内存 2GB1 核TM 内存 10GB2 个 slot因此并行度建议 ≤22 核。taskmanager.memory.managed.fraction0.25用于 RocksDB 等状态后端的托管内存比例。taskmanager.memory.jvm-metaspace.size512m元数据空间避免频繁 Full GC。状态后端RocksDB生产推荐本地目录设为 /tmp/flink/rocksdb注意磁盘空间。Checkpoint间隔 2 分钟启用非对齐 checkpointunaligned.forcedtrue以降低反压下的 checkpoint 延迟。6. 在 Dolphinscheduler 中创建 Shell 任务DS 3.4.1 支持 Shell 节点步骤如下新建 Shell 节点进入项目或工作流拖入 Shell 节点。#进入ds项目管理如果项目已经存在则点击进入如果项目不存在则创建项目。#进入ds具体项目可以定义工作流#点击 工作流定义 菜单可以看到工作流列表以及创建工作流按钮#点击 创建工作流然后在通用组件的下方拖拽shell组件至画布空白处2.编写脚本内容将上述 Shell 脚本完整粘贴至节点编辑区。#填写节点名称、环境变量、任务优先级、重试次数、超时时间等参数信息#将上述 Shell 脚本完整粘贴至节点编辑区并在资源附件中勾选相应的jar与配置文件。3.保存并提交发布工作流后手动触发或配置定时调度。#依次点击确定、保存按钮在弹出对话框中填写工作流名称、描述、超时告警等信息#然后点击确定之后返回到工作流列表#工作流发表成功之后可以在手动触发或者配置定时调度7. 作业提交与运行1.手动执行 Shell 节点观察 DS 日志输出确认 flink-cdc.sh 成功提交 YARN 应用。注意执行任务时需要勾选租户选择环境变量等操作。2.登录 YARN ResourceManager Web UI查看 Application 状态RUNNING。查看 Flink Web UI通过 YARN 的 tracking URL进入 Job 页面确认 Source 和 Sink 各 Subtask 正常运行数据延迟在预期内。8. 数据验证在 Doris 中执行查询对比 MySQL 源表行数可抽样SELECT COUNT(*) FROM wms_ods.ods_cn_auto_wms_wms_material;9. 总结本文档完整记录了一个基于 Flink CDC Pipeline 的 MySQL → Doris 实时同步任务的搭建过程。通过 YAML 配置实现灵活的源和目标定义通过 Shell 脚本提交至 YARN并结合 Dolphinscheduler实现调度和运维。该方法具备以下优势低代码Pipeline 模式无需编写 Java/Scala 代码。全量增量自动完成历史数据同步和实时捕获。弹性扩展可通过 YARN 动态调整资源。易维护配置和脚本分离便于版本管理。在生产环境中建议配合监控告警如 Flink Metrics 接入 Prometheus保障同步稳定性。如有更多需求如多库合并、数据清洗可扩展 Pipeline 的 Transform 模块或结合 Flink SQL 实现更复杂的逻辑。