1. 背景与要解决的问题在使用 Apache SeaTunnel 进行批处理或同步任务时当source是非结构化或者半结构化的类型时Source 侧通常需要显式定义 schema字段名、类型、顺序。在真实生产环境中这会带来几个典型问题表结构字段多、类型复杂手工维护 schema 成本高且易出错上游表结构发生变更加字段、改类型时需要同步修改 SeaTunnel 作业对于已有存量表仅为了同步数据却需要重复描述元数据存在明显冗余因此核心诉求是能否让 SeaTunnel 直接复用已有元数据系统中的表结构定义而不是在作业中重复声明 schema本功能正是为了解决这一问题而引入。2. Gravitino 能力简介与本功能相关部分Gravitino 是一个统一的元数据管理与访问服务提供了标准化的 REST API用于管理和暴露以下对象Metalake逻辑隔离单元Catalog如 MySQL、Hive、Iceberg 等Schema / DatabaseTable 及其字段定义通过 Gravitino表结构可以被集中管理下游系统可以通过HTTP API动态获取表的 schema 定义不再需要在每个计算/同步任务中重复维护字段信息本次在 SeaTunnel 中引入的能力正是支持在 Source 的 schema 定义中通过 Gravitino 提供的 schema_url 自动拉取表结构3. 本地测试环境准备3.1 准备mysql环境3.1.1 创建目标表MySQL 中提前创建好目标表test.demo_user建表语句如下CREATE TABLE demo_user ( id bigint unsigned NOT NULL AUTO_INCREMENT, user_code varchar(32) NOT NULL, user_name varchar(64) DEFAULT NULL, password varchar(128) DEFAULT NULL, email varchar(128) DEFAULT NULL, phone varchar(20) DEFAULT NULL, gender tinyint DEFAULT NULL, age int DEFAULT NULL, status tinyint DEFAULT NULL, level int DEFAULT NULL, score decimal(10,2) DEFAULT NULL, balance decimal(12,2) DEFAULT NULL, is_deleted tinyint DEFAULT NULL, register_ip varchar(45) DEFAULT NULL, last_login_ip varchar(45) DEFAULT NULL, login_count int DEFAULT NULL, remark varchar(255) DEFAULT NULL, ext1 varchar(100) DEFAULT NULL, ext2 varchar(100) DEFAULT NULL, ext3 varchar(100) DEFAULT NULL, ext4 varchar(100) DEFAULT NULL, ext5 varchar(100) DEFAULT NULL, created_by varchar(64) DEFAULT NULL, updated_by varchar(64) DEFAULT NULL, created_time datetime DEFAULT NULL, updated_time datetime DEFAULT NULL, birthday date DEFAULT NULL, last_login_time datetime DEFAULT NULL, version int DEFAULT NULL, PRIMARY KEY (id), UNIQUE KEY uk_user_code (user_code) ) ENGINEInnoDB DEFAULT CHARSETutf8mb4;3.1.2 创建要同步的表结构在实际应用中我们把表结构统一管理起来可能管理在paimonhivehudi等元数据组件中但在这里为了方便测试用的表结构信息指向测试的目标表也就是上一个步骤创建的test.demo_user表3.2 注册该表结构到Gravitino中Gravitino支持直连数据库并会扫描库下所有表该表已经作为local-mysqlcatalog 下的一个 table 被 Gravitino 管理。Metalaketest_Metalake3.3 表结构访问关系说明Gravitino 中表结构可以通过如下 REST API 访问http://localhost:8090/api/metalakes/test_Metalake/catalogs/${catalog}/schemas/${schema}/tables/${table}在本次测试中实际使用的 schema_url 为http://localhost:8090/api/metalakes/test_Metalake/catalogs/local-mysql/schemas/test/tables/demo_user该接口返回的 JSON 中包含了demo_user表的完整字段定义。3.4 本地部署seatunnel由于该功能还并未发版需要手动编译最新的seatunnel的dev分支代码并部署到本地。3.5 准备数据文件本次测试用例是csv作为数据文件总共是2000条数据。4. SeaTunnel 作业配置说明4.1 核心配置示例env { parallelism 1 job.mode BATCH } source { LocalFile { path /Users/wangxuepeng/Desktop/seatunnel/apache-seatunnel-2.3.13-SNAPSHOT/test_data file_format_type csv schema { schema_url http://localhost:8090/api/metalakes/test_Metalake/catalogs/local-mysql/schemas/test/tables/demo_user } } } sink { jdbc { url jdbc:mysql://localhost:3306/test driver com.mysql.cj.jdbc.Driver username root password 123456 database test table demo_user generate_sink_sql true } }4.2 配置要点说明schema.schema_url指向 Gravitino 中的表元数据 REST 接口SeaTunnel 在任务启动时会自动拉取表结构无需在作业中手工声明字段列表generate_sink_sql trueSink 侧根据解析后的 schema 自动生成 INSERT SQL5. 数据与任务执行结果日志截图 数据库截图任务运行过程中Source 根据 schema_url 自动解析字段结构CSV 文件字段与表结构自动对齐数据成功写入 MySQLdemo_user表6. 问题解答6.1 功能支持的范围该功能在dev分支目前是已经支持文件类型的连接器包括localhdfss3等。6.2 使用schema_url是否支持多表改功能的引入并不影响多表的功能甚至可以混合使用比如source { LocalFile { tables_configs [ { path /seatunnel/read/metalake/table1 file_format_type csv field_delimiter , row_delimiter \n skip_header_row_number 1 schema { table db.table1 fields { c_string string c_int int c_boolean boolean c_double double } } }, { path /seatunnel/read/metalake/table2 file_format_type csv field_delimiter , row_delimiter \n skip_header_row_number 1 schema { table db.table2 schema_url http://gravitino:8090/api/metalakes/test_metalake/catalogs/test_catalog/schemas/test_schema/tables/table2 } } ] } }7. 功能总结通过引入基于 Gravitino schema_url 的 schema 自动解析能力SeaTunnel 在数据同步场景中具备了以下优势消除重复 schema 定义降低作业配置复杂度复用统一的元数据管理系统提升一致性表结构变更对作业更加友好维护成本显著降低该能力非常适合已有完善元数据平台的企业场景大表、多字段、频繁变更 schema 的同步任务希望提升 SeaTunnel 作业可维护性的用户