当前位置: 首页> 健康> 科研 > 微信怎么推广最有效_建筑模板生产厂家有哪些_如何自己弄个免费网站_抖音热门搜索关键词

微信怎么推广最有效_建筑模板生产厂家有哪些_如何自己弄个免费网站_抖音热门搜索关键词

时间:2025/7/12 9:51:53来源:https://blog.csdn.net/dkl12/article/details/147614284 浏览次数:0次
微信怎么推广最有效_建筑模板生产厂家有哪些_如何自己弄个免费网站_抖音热门搜索关键词

前言

记录 Flink jdbc、mysql-cdc 连接 mysql8 碰到的小问题

版本

  • Flink 1.15.3
  • mysql-cdc 2.3.0
  • MySQL 8.0.27

cdc_mysql2mysql

MySQL5

之前主要用 MySQL5 ,下面是 MySQL5 的 sql ,具体见 Flink MySQL CDC 使用总结

set yarn.application.name=cdc_mysql2mysql;
set execution.target=yarn-per-job;
set parallelism.default=1;
set taskmanager.memory.process.size=3g;set execution.checkpointing.interval=10000; 
set state.checkpoints.dir=hdfs:///flink/checkpoints/cdc_mysql2mysql;
set execution.checkpointing.externalized-checkpoint-retention= RETAIN_ON_CANCELLATION;CREATE TABLE mysql_cdc_source (id int PRIMARY KEY NOT ENFORCED, --主键必填,且要求源表必须有主键,flink主键可以和mysql主键不一致name string,price double,ts bigint,dt string
) WITH ('connector' = 'mysql-cdc','hostname' = '19.168.44.128','port' = '3306','username' = 'root','password' = 'root-123','database-name' = 'cdc','table-name' = 'mysql_cdc_source'
);create table test_sink_mysql (id int PRIMARY KEY NOT ENFORCED,name string,price double,ts bigint,dt string
) with ('connector' = 'jdbc','url' = 'jdbc:mysql://19.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true','username' = 'root','password' = 'root-123','table-name' = 'test_sink_mysql','sink.buffer-flush.max-rows' = '1000000'
);insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;

MySQL8

同样的 SQL 会报错:

Caused by: java.lang.ClassCastException: java.math.BigInteger cannot be cast to java.lang.Longat com.mysql.jdbc.ConnectionImpl.buildCollationMapping(ConnectionImpl.java:1024)

最初怀疑是该版本的 cdc 不支持 MySQL8,后来发现只需要在 jdbc 添加 driver 参数解决:

'driver' = 'com.mysql.cj.jdbc.Driver'

完整的sql:

set yarn.application.name=cdc_mysql2mysql;
set execution.target=yarn-per-job;
set parallelism.default=1;
set taskmanager.memory.process.size=3g;set execution.checkpointing.interval=10000; 
set state.checkpoints.dir=hdfs:///flink/checkpoints/cdc_mysql2mysql;
set execution.checkpointing.externalized-checkpoint-retention= RETAIN_ON_CANCELLATION;CREATE TABLE mysql_cdc_source (id int PRIMARY KEY NOT ENFORCED, --主键必填,且要求源表必须有主键,flink主键可以和mysql主键不一致name string,price double,ts bigint,dt string
) WITH ('connector' = 'mysql-cdc','hostname' = '19.168.44.128','port' = '3306','username' = 'root','password' = 'root-123','database-name' = 'cdc','table-name' = 'mysql_cdc_source'
);create table test_sink_mysql (id int PRIMARY KEY NOT ENFORCED,name string,price double,ts bigint,dt string
) with ('connector' = 'jdbc','url' = 'jdbc:mysql://19.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true','driver' = 'com.mysql.cj.jdbc.Driver','username' = 'root','password' = 'root-123','table-name' = 'test_sink_mysql','sink.buffer-flush.max-rows' = '1000000'
);insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;

小结:

  • mysql-cdc: MySQL5和8的写法一样
  • jdbc: MySQL8 要添加 dirver 参数:‘driver’ = ‘com.mysql.cj.jdbc.Driver’

jdbc_mysql2mysql

MySQL8

根据上面 cdc_mysql2mysql 的经验,jdbc_mysql2mysql source 和 sink 应该都添加driver:

set yarn.application.name=jdbc_mysql2mysql;
set execution.target=yarn-per-job;create table mysql_cdc_source (id int PRIMARY KEY NOT ENFORCED,name string,price double,ts bigint,dt string
) with ('connector' = 'jdbc','url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true','driver' = 'com.mysql.cj.jdbc.Driver','username' = 'root','password' = 'root-123','table-name' = 'mysql_cdc_source'
);create table test_sink_mysql (id int PRIMARY KEY NOT ENFORCED,name string,price double,ts bigint,dt string
) with ('connector' = 'jdbc','url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true','driver' = 'com.mysql.cj.jdbc.Driver','username' = 'root','password' = 'root-123','table-name' = 'test_sink_mysql','sink.buffer-flush.max-rows' = '1000000'
);insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;

本来以为这样就没问题了,但是会报错:

Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long

经排查发现是字段类型不一致导致的问题,因为 mysql 建表时 ts的类型为 int ,那么在flink sql 中 ts也应该为 int 而不应该为 bigint,完整的正确 sql 为:

set yarn.application.name=jdbc_mysql2mysql;
set execution.target=yarn-per-job;create table mysql_cdc_source (id int PRIMARY KEY NOT ENFORCED,name string,price double,ts int,dt string
) with ('connector' = 'jdbc','url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true','driver' = 'com.mysql.cj.jdbc.Driver','username' = 'root','password' = 'root-123','table-name' = 'mysql_cdc_source'
);create table test_sink_mysql (id int PRIMARY KEY NOT ENFORCED,name string,price double,ts int,dt string
) with ('connector' = 'jdbc','url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true','driver' = 'com.mysql.cj.jdbc.Driver','username' = 'root','password' = 'root-123','table-name' = 'test_sink_mysql','sink.buffer-flush.max-rows' = '1000000'
);insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;

MySQL5

验证一下 MySQL5 是不是存在和 MySQL8 一样的问题,经验证问题一样,在 MySQL5 中 ts 的类型为bigint 也会报同样的错误,完整的正确 sql 为:

set yarn.application.name=jdbc_mysql2mysql;
set execution.target=yarn-per-job;create table mysql_cdc_source (id int PRIMARY KEY NOT ENFORCED,name string,price double,ts int,dt string
) with ('connector' = 'jdbc','url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true','username' = 'root','password' = 'root-123','table-name' = 'mysql_cdc_source'
);create table test_sink_mysql (id int PRIMARY KEY NOT ENFORCED,name string,price double,ts int,dt string
) with ('connector' = 'jdbc','url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true','username' = 'root','password' = 'root-123','table-name' = 'test_sink_mysql','sink.buffer-flush.max-rows' = '1000000'
);insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;

sink

在 cdc_mysql2mysql 中 ts bigint 就没问题, 尝试把 sink 表中 ts 的字段类型改为 bigint,最终发现 MySQL5 和 MySQL8 都没问题,也就是只有 jdbc 的 source 表对字段类型限制比较严格。

driver 参数

  • jdbc:MySQL5 添加 driver 参数也可以正常运行,但不是必须的,MySQL8 必须添加 driver 参数,所以无论是 5 还是 8 都加上 driver 参数,这样就不用区分 mysql的版本了。
  • cdc : 不支持 driver 参数

字段类型映射

官方文档:

  • jdbc: https://nightlies.apache.org/flink/flink-docs-release-2.0/zh/docs/connectors/table/jdbc/#%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B%E6%98%A0%E5%B0%84
  • cdc : https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/zh/docs/connectors/flink-sources/mysql-cdc/#%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B%E6%98%A0%E5%B0%84

MySQL8 适配

由此可见,Flink jdbc、mysql-cdc 均适配 MySQL5 和 MySQL8,对应 jar 如下:

  • jdbc: flink-connector-jdbc-1.15.3.jar
  • cdc : flink-sql-connector-mysql-cdc-2.3.0.jar

仅需要这两个包,不需要额外的 mysql-connector-java jar包

但在 cdc 3.1 版本以上,需要额外的 mysql-connector-java jar包,具体见官网:https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/zh/docs/connectors/flink-sources/mysql-cdc/

cdc 版本支持

官方文档:https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/zh/docs/connectors/flink-sources/overview/

关键字:微信怎么推广最有效_建筑模板生产厂家有哪些_如何自己弄个免费网站_抖音热门搜索关键词

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: