当前位置: 首页> 财经> 创投人物 > 上海房价_网页设计与制作教程课后题答案_哈尔滨优化推广公司_宁波seo外包哪个品牌好

上海房价_网页设计与制作教程课后题答案_哈尔滨优化推广公司_宁波seo外包哪个品牌好

时间:2025/8/4 16:22:06来源:https://blog.csdn.net/weixin_47467016/article/details/147139115 浏览次数:2次
上海房价_网页设计与制作教程课后题答案_哈尔滨优化推广公司_宁波seo外包哪个品牌好

从数据格式转换的角度 flink cdc 如何同步数据,写入paimon?

从一个测试用例着手

org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java

public void testSinkWithDataChange(String metastore, boolean enableDeleteVector)throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,Catalog.DatabaseNotExistException, SchemaEvolveException {// 初始化paimon的元数据initialize(metastore);// 创建paimonSinkPaimonSink<Event> paimonSink =new PaimonSink<>(catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault()));PaimonWriter<Event> writer = paimonSink.createWriter(new MockInitContext());Committer<MultiTableCommittable> committer = paimonSink.createCommitter();// insertwriteAndCommit(writer, committer, createTestEvents(enableDeleteVector).toArray(new Event[0]));Assertions.assertThat(fetchResults(table1)).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "1", "1"), Row.ofKind(RowKind.INSERT, "2", "2"));// deletewriteAndCommit(writer,committer,generateDelete(table1, Arrays.asList(Tuple2.of(STRING(), "1"), Tuple2.of(STRING(), "1"))));Assertions.assertThat(fetchResults(table1)).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "2", "2"));// updatewriteAndCommit(writer,committer,generateUpdate(table1,Arrays.asList(Tuple2.of(STRING(), "2"), Tuple2.of(STRING(), "2")),Arrays.asList(Tuple2.of(STRING(), "2"), Tuple2.of(STRING(), "x"))));Assertions.assertThat(fetchResults(table1)).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "2", "x"));if (enableDeleteVector) {Assertions.assertThat(fetchMaxSequenceNumber(table1.getTableName())).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 1L), Row.ofKind(RowKind.INSERT, 3L));} else {Assertions.assertThat(fetchMaxSequenceNumber(table1.getTableName())).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 1L),Row.ofKind(RowKind.INSERT, 2L),Row.ofKind(RowKind.INSERT, 3L));}}

整体流程:

  1. 创建变更事件。

  2. 创建PaimonWriter

  3. 对于每一个事件event,都调用paimonWriter中的write。具体又包括以下几个步骤

    • 序列化event为paimon 中的数据格式genericRow
    • 创建paimon的write算子(StoreSinkWrite),将genericRow写入paimon表. 进入paimon内部的写入逻辑。
      • 从每一条row中提取rowkind(INSERT,UPDATE_BEFORE,UPDATE_AFTER,DELETE)
      • 提取data,MergeTreeWriter中执行两个操作,内存够,就把数据put到内存,内存不够就flush到磁盘

源码

Flink cdc 代码

  • 创建测试data change event

在这里插入图片描述

  • 遍历事件

在这里插入图片描述

  • 序列化

在这里插入图片描述

  • 核心转换逻辑

在这里插入图片描述

  • paimon的写入

在这里插入图片描述

以下是paimon的代码

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

总结

回到本文的标题:从数据格式转换的角度 flink cdc 如何把数据处理成paimon?

在flink cdc端 核心是 convertEventToGenericRow,处理dataChangeEvent 和 genericRow 对应。
为什么要这么对应?

关键字:上海房价_网页设计与制作教程课后题答案_哈尔滨优化推广公司_宁波seo外包哪个品牌好

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: