Debezium + Kafka Connect in Practice: Building a MySQL CDC Data Pipeline from Scratch, with a Full Log of Pitfalls

📅 2026/7/1 1:28:48
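Some commands are not explained in detail; if anything is unclear, feel free to ask an AI.

I. Background and Goals

Build a real-time data synchronization pipeline (CDC) from MySQL to Kafka. It will feed a downstream real-time data warehouse (Flink, Doris, Paimon).

Technology choices:

- MySQL 5.7.28 (binlog enabled)
- Kafka 3.2.0 (message queue)
- Debezium 1.9.7 (CDC tool)
- Kafka Connect (runtime framework for Debezium)

II. Environment

| Component | Version | Install path / address |
|---|---|---|
| CentOS | 7 | N/A |
| Java | OpenJDK 1.8.0_412 | /opt/module/jdk1.8.0_212 |
| Kafka | 3.2.0 | /opt/module/kafka |
| Zookeeper | 3.5.7 | /opt/module/zookeeper-3.5.7 |
| MySQL | 5.7.28 | hadoop102:3306 |
| Debezium | 1.9.7.Final | /opt/module/kafka/plugins/debezium-connector-mysql |

III. Enable the MySQL binlog (preparing the data source)

Edit the configuration file /etc/my.cnf:

```ini
[mysqld]
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-row-image=FULL
# Note: if binlog-do-db is set, only changes to the listed databases are logged.
# To capture every database, leave this option commented out.
# binlog-do-db=retail_db
```

Restart MySQL:

```bash
systemctl restart mysqld
```

Verify that the binlog is enabled:

```bash
mysql -u root -p -e "SHOW VARIABLES LIKE 'log_bin';"
# Expected output: log_bin  ON
```

IV. Install the Debezium MySQL Connector

```bash
cd /opt/module/kafka
mkdir -p plugins
cd plugins
# Download the Debezium plugin archive (official builds are published on Maven Central)
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.7.Final/debezium-connector-mysql-1.9.7.Final-plugin.tar.gz
# Extract it
tar -xzvf debezium-connector-mysql-1.9.7.Final-plugin.tar.gz
# Confirm the extraction succeeded
ls -l debezium-connector-mysql/
```

V. Configure Kafka Connect

Configuration file: /opt/module/kafka/config/connect-distributed.properties

```properties
# Kafka Connect cluster identifier
group.id=connect-cluster
# Kafka cluster address (three brokers)
bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
# Debezium plugin path
plugin.path=/opt/module/kafka/plugins
# Offset storage topic
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
offset.storage.partitions=25
# Config storage topic
config.storage.topic=connect-configs
config.storage.replication.factor=1
# Status storage topic
status.storage.topic=connect-status
status.storage.replication.factor=1
# Serialization format
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# REST API address this worker advertises to the other workers
rest.advertised.host.name=hadoop102
rest.advertised.port=8083
```

VI. Pitfall Log (core section)

Pitfall 1: the connect-offsets topic must have cleanup.policy=compact

Symptom:

```log
org.apache.kafka.common.config.ConfigException: Topic 'connect-offsets' supplied via the 'offset.storage.topic' property is required to have 'cleanup.policy=compact' to guarantee consistency and durability of source connector offsets, but found the topic currently has 'cleanup.policy=delete'.
```

Root cause: Kafka Connect requires the connect-offsets topic to use cleanup.policy=compact. Otherwise it cannot persist source offsets reliably.

If the topic does not exist when the worker starts, Connect creates it with compact itself. The problem appears when the topic already exists with the broker default cleanup.policy=delete. That happens when it was created by hand without a --config option, or auto-created by the broker because some client touched it first. In that case the worker refuses to start.

Solution (stop the Connect worker first; deleting the topic discards all stored source offsets):

```bash
# 1. Delete the existing topic
kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic connect-offsets
# 2. Recreate it manually with cleanup.policy=compact
kafka-topics.sh --bootstrap-server hadoop102:9092 --create \
  --topic connect-offsets \
  --partitions 25 \
  --replication-factor 1 \
  --config cleanup.policy=compact \
  --config min.cleanable.dirty.ratio=0.01 \
  --config segment.ms=10000
# 3. Verify that the setting took effect
kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic connect-offsets | grep cleanup
# Expected output contains: cleanup.policy=compact
```

If the topic already holds offsets you want to keep, change the policy in place instead:

kafka-configs.sh --bootstrap-server hadoop102:9092 --entity-type topics --entity-name connect-offsets --alter --add-config cleanup.policy=compact

Pitfall 2: port 8083 already in use (Address already in use)

Symptom:

```log
ERROR Stopping due to error
org.apache.kafka.connect.errors.ConnectException: Unable to initialize REST server
Caused by: java.net.BindException: Address already in use
```

In addition, curl http://hadoop102:8083/connectors returns Connection refused.

Root cause: a previous Kafka Connect process did not exit cleanly and still holds port 8083. The new Connect process cannot bind the port, so it fails to start.

Solution:

```bash
# 1. Find the process holding port 8083
lsof -i:8083
# 2. Stop it (replace <PID> with the PID from step 1);
#    try a plain `kill <PID>` first and use -9 only if it does not exit
kill -9 <PID>
# 3. Restart Kafka Connect in the background
cd /opt/module/kafka
nohup bin/connect-distributed.sh config/connect-distributed.properties > /dev/null 2>&1 &
```

Pitfall 3: schema history topic missing (The db history topic is missing)

Symptom:

```log
WARN Database history was not found but was expected
ERROR io.debezium.DebeziumException: The db history topic is missing. You may attempt to recover it by reconfiguring the connector to SCHEMA_ONLY_RECOVERY
```

Root cause: Debezium stores the history of table-structure changes in the schema-changes.retail_db topic. It only creates this topic itself when the connector starts for the first time, with no stored offsets.

If offsets for the connector already exist in connect-offsets but the history topic is gone, Debezium cannot reconstruct the schema history on its own. It does not create an empty topic; it reports this error and stops.

Solution:

```bash
# 1. Create the schema history topic manually: a single partition keeps the
#    history in order, and retention.ms=-1 keeps it from ever being purged
kafka-topics.sh --bootstrap-server hadoop102:9092 --create \
  --topic schema-changes.retail_db \
  --partitions 1 \
  --replication-factor 1 \
  --config retention.ms=-1
# 2. Delete the connector, then re-register it in schema_only_recovery mode
curl -X DELETE http://hadoop102:8083/connectors/mysql-connector-retail
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "mysql-connector-retail",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "hadoop102",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "123456",
    "database.server.id": "184054",
    "database.server.name": "retail_db",
    "database.include.list": "retail_db",
    "database.history.kafka.bootstrap.servers": "hadoop102:9092,hadoop103:9092,hadoop104:9092",
    "database.history.kafka.topic": "schema-changes.retail_db",
    "include.schema.changes": "true",
    "snapshot.mode": "schema_only_recovery",
    "snapshot.locking.mode": "none",
    "decimal.handling.mode": "double"
  }
}' http://hadoop102:8083/connectors
```

Note: database.server.id is the replication client ID that Debezium presents to MySQL. The Debezium documentation requires it to be unique among all processes in the MySQL cluster, so it should not reuse the server's own server-id=1 from /etc/my.cnf. The value 184054 is just an arbitrary unused ID.

Pitfall 4: DECIMAL values come out garbled

Symptom: when the data is consumed with Flink or kafka-console-consumer, the price field shows unreadable strings such as AV8s or Cq38 instead of 6999.00.

Root cause: by default (decimal.handling.mode=precise), Debezium emits MySQL DECIMAL columns as the Kafka Connect Decimal type. That type is the unscaled value stored as a byte array, which JsonConverter writes Base64-encoded. Because schemas.enable=false, the consumer never sees the scale it would need to decode the value. Setting decimal.handling.mode=double makes Debezium output plain numbers instead.

Solution: add the following to the connector configuration:

```json
"decimal.handling.mode": "double"
```

Keep in mind that double can lose precision for values with many significant digits. If exact values matter (money, for example), decimal.handling.mode=string is the alternative.

Full connector registration command:

```bash
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "mysql-connector-retail",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "hadoop102",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "123456",
    "database.server.id": "184054",
    "database.server.name": "retail_db",
    "database.include.list": "retail_db",
    "database.history.kafka.bootstrap.servers": "hadoop102:9092,hadoop103:9092,hadoop104:9092",
    "database.history.kafka.topic": "schema-changes.retail_db",
    "include.schema.changes": "true",
    "snapshot.mode": "initial",
    "snapshot.locking.mode": "none",
    "decimal.handling.mode": "double"
  }
}' http://hadoop102:8083/connectors
```

Pitfall 5: binlog-do-db limits which databases can be synced

Symptom: the Debezium connector status shows RUNNING, but the expected topics never appear in Kafka and nothing can be consumed.

Root cause: MySQL is configured with binlog-do-db, so it only writes binlog events for the listed databases. If the target database is not on that whitelist, Debezium never sees any of its changes.

Solution: edit /etc/my.cnf:

```ini
# To capture all databases, comment these lines out
# binlog-do-db=gmall
# binlog-do-db=gmall2023_config
# Or add the target database
binlog-do-db=retail_db
```

Restart MySQL:

```bash
systemctl restart mysqld
```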
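For Pitfall 5, you can also ask the running server which databases it currently filters, instead of reading /etc/my.cnf. This is a minimal sketch. It assumes SHOW MASTER STATUS is fetched with `mysql -N -B` (tab-separated, no header row), so the third and fourth columns are Binlog_Do_DB and Binlog_Ignore_DB as comma-separated lists.

`binlog_covers_db` is a hypothetical helper name. Its do/ignore logic is a simplification of MySQL's full filtering rules.

```bash
#!/usr/bin/env bash
# binlog_covers_db DB  (hypothetical helper, not part of MySQL or Debezium)
# Reads one line of:  mysql -N -B -u root -p -e "SHOW MASTER STATUS"
# Columns (tab-separated): File, Position, Binlog_Do_DB, Binlog_Ignore_DB, Executed_Gtid_Set
# Returns 0 if DB's changes reach the binlog, 1 if they are filtered out,
# and 2 if there is no status row at all (binlog most likely disabled).
binlog_covers_db() {
  local db="$1" line do_db ignore_db
  IFS= read -r line
  [ -n "$line" ] || return 2
  # cut keeps empty fields; `read` with a tab IFS would collapse them
  do_db=$(printf '%s\n' "$line" | cut -f3)
  ignore_db=$(printf '%s\n' "$line" | cut -f4)
  # Simplified rule: a non-empty do-list acts as a whitelist
  if [ -n "$do_db" ]; then
    case ",$do_db," in
      *",$db,"*) return 0 ;;
      *) return 1 ;;
    esac
  fi
  # Otherwise only an explicit ignore entry excludes the database
  case ",$ignore_db," in
    *",$db,"*) return 1 ;;
  esac
  return 0
}

# Usage:
#   mysql -N -B -u root -p -e "SHOW MASTER STATUS" | binlog_covers_db retail_db \
#     && echo "retail_db changes are written to the binlog"
```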
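Section III only checks log_bin, but Debezium also depends on the row format and full row images configured there. This sketch checks all three settings in one pass. It assumes the variables are fetched with `mysql -N -B` so each line is `name<TAB>value`. `check_binlog_vars` is a hypothetical name, and it needs bash 4+ for the `${var^^}` upper-casing.

```bash
#!/usr/bin/env bash
# check_binlog_vars  (hypothetical helper) reads "name<TAB>value" lines from:
#   mysql -N -B -u root -p -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format','binlog_row_image')"
# It prints every mismatch and returns non-zero if any setting is wrong.
check_binlog_vars() {
  local name value status=0
  local log_bin="" binlog_format="" binlog_row_image=""
  while IFS=$'\t' read -r name value || [ -n "$name" ]; do
    # Compare case-insensitively, since my.cnf allows e.g. binlog_format=row
    case "$name" in
      log_bin) log_bin="${value^^}" ;;
      binlog_format) binlog_format="${value^^}" ;;
      binlog_row_image) binlog_row_image="${value^^}" ;;
    esac
  done
  if [ "$log_bin" != "ON" ]; then
    echo "log_bin is '${log_bin}', expected ON"; status=1
  fi
  if [ "$binlog_format" != "ROW" ]; then
    echo "binlog_format is '${binlog_format}', expected ROW"; status=1
  fi
  if [ "$binlog_row_image" != "FULL" ]; then
    echo "binlog_row_image is '${binlog_row_image}', expected FULL"; status=1
  fi
  return "$status"
}
```

Any setting missing from the input is reported as a mismatch, because an empty string never equals the expected value.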
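For Pitfall 2, a gentler variant of `kill -9` stops only the process that is actually listening on 8083. It sends SIGTERM first and escalates to SIGKILL only if the port is still taken after about ten seconds. This is a sketch under these assumptions:

- lsof is installed.
- bash was built with /dev/tcp support.
- The helper names `port_in_use` and `stop_stale_connect` are hypothetical.

```bash
#!/usr/bin/env bash
# port_in_use PORT [HOST]  (hypothetical helper)
# Succeeds if something accepts TCP connections on HOST:PORT.
port_in_use() {
  local port="$1" host="${2:-127.0.0.1}"
  # bash's /dev/tcp pseudo-device opens a TCP connection; success means a listener exists
  (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null
}

# stop_stale_connect PORT  (hypothetical helper)
# Only targets listeners (-sTCP:LISTEN), so clients such as a running curl are left alone.
stop_stale_connect() {
  local port="$1" pid i
  for pid in $(lsof -t -iTCP:"$port" -sTCP:LISTEN); do
    echo "sending SIGTERM to PID $pid: $(ps -p "$pid" -o args= | cut -c1-80)"
    kill "$pid"
  done
  # Give the old worker up to ~10 seconds to shut down cleanly
  for i in 1 2 3 4 5 6 7 8 9 10; do
    port_in_use "$port" || return 0
    sleep 1
  done
  # Still listening: fall back to SIGKILL, as in the original fix
  for pid in $(lsof -t -iTCP:"$port" -sTCP:LISTEN); do
    kill -9 "$pid"
  done
}

# Usage:
#   stop_stale_connect 8083
```

A clean SIGTERM gives the worker a chance to leave the group and flush offsets, which kill -9 skips.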
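The registration commands in Pitfalls 3 and 4 differ only in snapshot.mode. This sketch prints the shared config map once. It assumes the following:

- `connector_config_json` is a hypothetical helper name.
- 184054 is an arbitrary server ID that is unused in the MySQL cluster.
- The result is sent to Kafka Connect's `PUT /connectors/<name>/config` endpoint. That endpoint takes the bare config map and creates the connector or updates it in place, so the separate DELETE step should not be needed.

```bash
#!/usr/bin/env bash
# connector_config_json MODE  (hypothetical helper)
# Prints the connector's config map with snapshot.mode set to MODE.
connector_config_json() {
  local mode="$1"
  # Only the two modes used in this article are accepted
  case "$mode" in
    initial|schema_only_recovery) ;;
    *) echo "unsupported snapshot mode: $mode" >&2; return 1 ;;
  esac
  # database.server.id: replication client ID, must differ from MySQL's own server-id
  cat <<EOF
{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "database.hostname": "hadoop102",
  "database.port": "3306",
  "database.user": "root",
  "database.password": "123456",
  "database.server.id": "184054",
  "database.server.name": "retail_db",
  "database.include.list": "retail_db",
  "database.history.kafka.bootstrap.servers": "hadoop102:9092,hadoop103:9092,hadoop104:9092",
  "database.history.kafka.topic": "schema-changes.retail_db",
  "include.schema.changes": "true",
  "snapshot.mode": "${mode}",
  "snapshot.locking.mode": "none",
  "decimal.handling.mode": "double"
}
EOF
}

# Usage (--data-binary @- reads the body from stdin unchanged):
#   connector_config_json schema_only_recovery \
#     | curl -X PUT -H "Content-Type: application/json" --data-binary @- \
#         http://hadoop102:8083/connectors/mysql-connector-retail/config
```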
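The garbled values from Pitfall 4 are not random. Each is the Base64 encoding of the DECIMAL's unscaled value, stored as a big-endian two's-complement integer. This sketch decodes such a string by hand.

It assumes the scale is known out of band, for example a price column declared with scale 2. That scale is an assumption here, since the table definition is not shown. With it, Cq38 decodes to 6999.00 and AV8s to 899.00. `decode_debezium_decimal` is a hypothetical name.

```bash
#!/usr/bin/env bash
# decode_debezium_decimal B64 SCALE  (hypothetical helper)
# Decodes a Kafka Connect Decimal that JsonConverter wrote as Base64:
# the bytes are the unscaled value as a big-endian two's-complement integer.
decode_debezium_decimal() {
  local b64="$1" scale="$2" byte first=-1 nbytes=0 unscaled=0
  # od prints each decoded byte as an unsigned decimal number, in order
  for byte in $(printf '%s' "$b64" | base64 -d | od -An -tu1 -v); do
    if [ "$first" -lt 0 ]; then first=$byte; fi
    unscaled=$(( unscaled * 256 + byte ))
    nbytes=$(( nbytes + 1 ))
  done
  # Bash arithmetic is 64-bit signed, so this sketch stops at 7 bytes
  if [ "$nbytes" -gt 7 ]; then
    echo "value too large for this sketch" >&2
    return 1
  fi
  # Top bit of the first byte set: negative number in two's complement
  if [ "$first" -ge 128 ]; then
    unscaled=$(( unscaled - (1 << (8 * nbytes)) ))
  fi
  local sign="" abs=$unscaled pow
  if [ "$abs" -lt 0 ]; then sign="-"; abs=$(( -abs )); fi
  if [ "$scale" -eq 0 ]; then
    printf '%s%d\n' "$sign" "$abs"
    return 0
  fi
  # Insert the decimal point SCALE digits from the right, zero-padding the fraction
  pow=$(( 10 ** scale ))
  printf '%s%d.%0*d\n' "$sign" $(( abs / pow )) "$scale" $(( abs % pow ))
}

# Usage:
#   decode_debezium_decimal Cq38 2   # prints 6999.00
```

This is only a diagnostic aid; in the pipeline itself, decimal.handling.mode remains the proper fix.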
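VII. Final Verification

1. Check the connector status

```bash
curl http://hadoop102:8083/connectors/mysql-connector-retail/status
# Expected: "state":"RUNNING"
```

2. List the topics in Kafka

```bash
kafka-topics.sh --bootstrap-server hadoop102:9092 --list | grep retail
# Expected output:
# retail_db
# retail_db.retail_db.orders
# retail_db.retail_db.products
# schema-changes.retail_db
```

Data topics follow Debezium's `<database.server.name>.<database>.<table>` naming. The topic named just retail_db carries the schema change events enabled by include.schema.changes.

3. Consume data to verify

```bash
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 \
  --topic retail_db.retail_db.products \
  --from-beginning \
  --max-messages 3
```

Expected output (JSON, with price as a number):

```json
{"before":null,"after":{"product_id":1,"product_name":"iPhone 15","category":"Electronics","price":6999.0,"stock":50},"source":{...},"op":"r","ts_ms":...}
```

VIII. Pitfall Summary

| # | Keyword | Core fix |
|---|---|---|
| 1 | cleanup.policy=compact | Recreate connect-offsets manually with compact |
| 2 | Address already in use | lsof -i:8083, then kill -9 |
| 3 | The db history topic is missing | Create schema-changes.retail_db manually and use SCHEMA_ONLY_RECOVERY |
| 4 | Garbled DECIMAL values | Add decimal.handling.mode=double to the connector config |
| 5 | No data synced | Check whether binlog-do-db includes the target database |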
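The status check in Section VII looks for RUNNING. However, the status endpoint reports the connector and each of its tasks separately, and a task can be FAILED while the connector itself still shows RUNNING.

This sketch requires every reported state to be RUNNING and at least one task to exist. It assumes the compact JSON returned by `GET /connectors/<name>/status` and no jq on the host. `connector_all_running` is a hypothetical name.

```bash
#!/usr/bin/env bash
# connector_all_running  (hypothetical helper)
# Reads the status JSON on stdin; succeeds only if the connector and all of its
# tasks report RUNNING and there is at least one task.
connector_all_running() {
  local states total running
  # Pull out every "state":"XYZ" value (connector first, then one per task)
  states=$(grep -o '"state"[[:space:]]*:[[:space:]]*"[A-Z_]*"' | sed 's/.*"\([A-Z_]*\)"$/\1/')
  total=$(printf '%s\n' "$states" | grep -c . || true)
  running=$(printf '%s\n' "$states" | grep -c '^RUNNING$' || true)
  [ "$total" -ge 2 ] && [ "$total" -eq "$running" ]
}

# Usage:
#   curl -s http://hadoop102:8083/connectors/mysql-connector-retail/status \
#     | connector_all_running && echo "connector and tasks are RUNNING"
```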
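The op field in each consumed message identifies the event type:

- `r`: a row read during the initial snapshot
- `c`: an insert
- `u`: an update
- `d`: a delete

To see which kinds of events a topic has carried so far, you can count them. This sketch simply pattern-matches the `"op"` key in each JSON line, so it assumes no table column is itself named op. `count_ops` is a hypothetical name.

```bash
#!/usr/bin/env bash
# count_ops  (hypothetical helper)
# Reads Debezium JSON messages (one per line, e.g. from kafka-console-consumer.sh)
# and prints how many snapshot reads, inserts, updates and deletes it saw.
# Lines without an op field (such as "null" tombstones) are simply skipped.
count_ops() {
  grep -o '"op"[[:space:]]*:[[:space:]]*"[crud]"' \
    | sed 's/.*"\([crud]\)"$/\1/' \
    | awk '{ n[$1]++ } END { printf "r=%d c=%d u=%d d=%d\n", n["r"], n["c"], n["u"], n["d"] }'
}

# Usage:
#   kafka-console-consumer.sh --bootstrap-server hadoop102:9092 \
#     --topic retail_db.retail_db.products --from-beginning --max-messages 100 | count_ops
```

Right after the initial snapshot, you would expect only r events. Inserting, updating or deleting a row in MySQL should then show up as c, u or d.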