当前位置: 首页> 健康> 美食 > 运维网页平台维护_广州网站建设费_seo职位要求_软文推广平台有哪些

运维网页平台维护_广州网站建设费_seo职位要求_软文推广平台有哪些

时间:2025/7/11 20:09:06来源:https://blog.csdn.net/qq_21451945/article/details/145772635 浏览次数:0次
运维网页平台维护_广州网站建设费_seo职位要求_软文推广平台有哪些

1. Flink-CDC的介绍

Flink-cdc主要是用来同步数据库中的数据,它的主要优势在于基于Flink框架直接用Flink Stream Api 或Flink SQL 直接编程,不需要引入第三方组件

2.Flink-CDC的使用

Flink-cdc在使用上需要注意的点

  • 注意Flink-cdc在2.1版本之前需要导入MySQL的连接包,之后版本不需要,如果环境中有MySQL的连接包需要去除掉
  • 在2.4版本之监控MySQL表需要它有主键,2.4版本开始只需要配置“scan.incremental.snapshot.chunk.key-column”参数即可
  • MySQL CDC Connector在监控多个表的时候,每个表需要指定库名,并用逗号隔开
  • Flink中必须要设置checkpoint,不设置无法正常监控binlog变更日志
    Flink-CDC基于DataStream的使用
MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("node2")      //设置MySQL hostname.port(3306)             //设置MySQL port.databaseList("db1")    //设置捕获的数据库.tableList("db1.tbl1,db1.tbl2") //设置捕获的数据表.username("root")       //设置登录MySQL用户名.password("123456")     //设置登录MySQL密码.deserializer(new JsonDebeziumDeserializationSchema()) //设置序列化将SourceRecord 转换成 Json 字符串.startupOptions(StartupOptions.initial()).build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//开启checkpoint
env.enableCheckpointing(5000);
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),"MySQL Source").setParallelism(4).print();
env.execute();

基于Flink Sql的使用

EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
//设置checkpoint
tableEnv.getConfig().getConfiguration().setLong("execution.checkpointing.interval", 5000L);
tableEnv.executeSql("" +"CREATE TABLE mysql_binlog (" +" id INT," +" name STRING," +" age INT," +" PRIMARY KEY(id) NOT ENFORCED" +") WITH (" +" 'connector' = 'mysql-cdc'," +" 'hostname' = 'node2'," +" 'port' = '3306'," +" 'username' = 'root'," +" 'password' = '123456'," +" 'database-name' = 'db1'," +" 'table-name' = 'tbl1'" +")");
tableEnv.executeSql("select * from mysql_binlog").print();

2.1 Flink-CDC对全量和增量数据的工作原理

并行读取表的全量快照,然后以单并行度方式读取表的binlog进行增量数据的同步

  • 全量同步过程中,它会根据主键把数据分为多个chunk分片,然后分配给多并行度去分别读取这些chunk上的数据,读取快照期间,Flink支持chunk级别的checkpoint,即使在同步的过程中发生故障,也可以做到exactly-once级别的恢复

2.2 Flink-CDC启动模式

启动模式是指程序启动的时候,以怎么的方式监控数据库中的数据,共有如下几种模式

  • initial(默认): 对受监控的库表进行初始快照,并继续读取最新的binlog
  • earliest-offset: 它会跳过快照直接读取最早的binlog日志,它与initial方式区别在于,initial只读取已经操作后(表中现有数据)的数据
  • latest-offset: 不执行快照,从binlog的最新处开始读取增量数据
  • specific-offset: 从指定的binlog位点开始读取,位点可以通过binlog文件名和位置指定
  • timestamp: 从指定的时间戳读取binlog事件
关键字:运维网页平台维护_广州网站建设费_seo职位要求_软文推广平台有哪些

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: