
Python tool: a DataX JSON generator for MySQL-to-Doris sync

Date: 2025/7/12 5:43:30 Source: https://blog.csdn.net/weixin_45399602/article/details/141258689

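1. Overview

I need to build a large number of DataX jobs to sync MySQL to Doris, which means writing a lot of DataX JSON files. To save effort I wrote a tool: give it the Doris CREATE TABLE statement and it generates the JSON.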

2. Files

Five files are involved in total.

2.1 conf.json

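This file holds the connection information for MySQL and Doris.
The table_prefix field exists because tables get a prefix when they go from MySQL to Doris. If your tables have the same name on both sides, leave it empty.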

{
    "mysql": {
        "host": "",
        "port": 3306,
        "user": "root",
        "password": "",
        "database": "",
        "table_prefix": ""
    },
    "doris": {
        "host": "",
        "port": 9030,
        "user": "",
        "password": "",
        "loadUrl": ["xxx:8030"],
        "preSql": []
    }
}
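To make the role of table_prefix concrete, here is a minimal sketch of how a Doris table name can be mapped back to its MySQL source table. The function name strip_table_prefix and the prefix ods_ are assumptions for illustration only; they do not come from the tool itself.

```python
def strip_table_prefix(doris_table, prefix):
    """Map a Doris table name back to the MySQL table name.

    Only a leading prefix is removed; an empty prefix means the
    tables share the same name on both sides.
    """
    if prefix and doris_table.startswith(prefix):
        return doris_table[len(prefix):]
    return doris_table


# 'ods_' is a hypothetical prefix, used here only as an example.
print(strip_table_prefix('ods_merchant', 'ods_'))
print(strip_table_prefix('merchant', ''))
```

Removing only a leading match avoids accidentally changing a table whose name happens to contain the prefix somewhere in the middle.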

2.2 datax_example.json

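This is a standard DataX job JSON file. The generator uses it as a template and fills it in to produce the JSON you need.
If you want more throughput, change channel here.
Any other changes to the template also go here.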


{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "",
                        "password": "",
                        "splitPk": "",
                        "connection": [
                            {
                                "querySql": [],
                                "jdbcUrl": []
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "doriswriter",
                    "parameter": {
                        "username": "",
                        "password": "",
                        "loadUrl": [],
                        "TwoPhaseCommit": "true",
                        "column": [],
                        "preSql": [],
                        "flushInterval": 30000,
                        "connection": [
                            {
                                "table": [""],
                                "jdbcUrl": "",
                                "selectedDatabase": ""
                            }
                        ],
                        "loadProps": {
                            "format": "json",
                            "strip_outer_array": "true",
                            "line_delimiter": "\\x02"
                        }
                    }
                }
            }
        ]
    }
}
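If you would rather not edit the template by hand for a one-off job, the channel value can also be overridden in code. The following is only a sketch: the helper with_channel is a hypothetical name, and the inline template is a cut-down stand-in for datax_example.json.

```python
import copy


def with_channel(template, channel):
    """Return a copy of a DataX job template with a different channel count."""
    if channel < 1:
        raise ValueError('channel must be at least 1')
    job = copy.deepcopy(template)  # leave the loaded template untouched
    job['job']['setting']['speed']['channel'] = channel
    return job


# Cut-down stand-in for the contents of datax_example.json.
template = {'job': {'setting': {'speed': {'channel': 1}}, 'content': []}}
faster = with_channel(template, 4)
print(faster['job']['setting']['speed']['channel'])
print(template['job']['setting']['speed']['channel'])
```

Working on a deep copy keeps the template reusable if several jobs are generated in one run.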

2.3 datax_output.json

This is the generated JSON file. It does not need to be created in advance.

2.4 dorisDDL.sql

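This file holds the Doris CREATE TABLE statement. Note that it must include the database name: the DataX JSON needs the database name, and it is parsed from here.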

CREATE TABLE `test`.`merchant` (
    `merchant_code` VARCHAR(60) NOT NULL COMMENT 'Merchant code',
    `merchant_name` VARCHAR(100) DEFAULT NULL COMMENT 'Merchant name',
    `merchant_type` VARCHAR(60) DEFAULT NULL COMMENT 'Merchant type'
) ENGINE = OLAP
UNIQUE KEY(`merchant_code`)
DISTRIBUTED BY HASH(`merchant_code`) BUCKETS 20
PROPERTIES (
    "replication_num" = "1",
    "storage_type" = "COLUMN"
);
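The reason the database name matters can be seen by running the table-name regular expression from the script on its own. The sketch below reuses that pattern; the wrapper function parse_qualified_name is a name made up for this example.

```python
import re

# Same pattern the script uses: optional IF NOT EXISTS, optional `db`. qualifier.
TABLE_PATTERN = re.compile(
    r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?(?:(`?)(\w+)\1\.)?(`?)(\w+)\3',
    re.IGNORECASE,
)


def parse_qualified_name(ddl):
    """Return (database, table) from a CREATE TABLE statement, or None."""
    match = TABLE_PATTERN.search(ddl)
    if match is None:
        return None
    return match.group(2), match.group(4)


print(parse_qualified_name('CREATE TABLE `test`.`merchant` (`id` INT)'))
print(parse_qualified_name('CREATE TABLE merchant (id INT)'))
```

When the statement has no database qualifier, the first element comes back as None, and there is nothing to put into the writer's jdbcUrl or selectedDatabase.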

2.5 Mysql2dorisDataxTools.py

This is the core code.

import json
import re

# Keywords that can start a non-column item inside the column list.
NON_COLUMN_KEYWORDS = {'INDEX', 'KEY', 'PRIMARY', 'UNIQUE', 'CONSTRAINT'}


def read_and_format_json(file_path):
    """Read a JSON file and return it as a Python dict."""
    with open(file_path, 'r', encoding='utf-8') as file:
        return json.load(file)


def split_column_section(sql, start):
    """Split the first balanced (...) after `start` on its top-level commas."""
    open_pos = sql.find('(', start)
    if open_pos == -1:
        return []
    items, current, depth, quote = [], [], 0, None
    for ch in sql[open_pos:]:
        if quote:
            # Inside a quoted string (e.g. a COMMENT): copy characters as-is.
            if ch == quote:
                quote = None
        elif ch in ("'", '"'):
            quote = ch
        elif ch == '(':
            depth += 1
            if depth == 1:
                continue  # drop the outer opening parenthesis
        elif ch == ')':
            depth -= 1
            if depth == 0:
                break  # end of the column list
        elif ch == ',' and depth == 1:
            items.append(''.join(current))
            current = []
            continue
        current.append(ch)
    items.append(''.join(current))
    return items


def parse_create_table_sql(file_path):
    """Parse a Doris CREATE TABLE statement into database, table, and column names."""
    with open(file_path, 'r', encoding='utf-8') as file:
        sql = file.read()
    # Strip SQL comments.
    sql = re.sub(r'--.*$', '', sql, flags=re.MULTILINE)
    sql = re.sub(r'/\*.*?\*/', '', sql, flags=re.DOTALL)
    # Extract the database name and the table name.
    table_pattern = r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?(?:(`?)(\w+)\1\.)?(`?)(\w+)\3'
    table_match = re.search(table_pattern, sql, re.IGNORECASE)
    if not table_match:
        return None
    # Extract column names: each item starts with a name followed by a type.
    columns = []
    for item in split_column_section(sql, table_match.end()):
        column_match = re.match(r'\s*(`?)(\w+)\1\s+\w+', item)
        if not column_match:
            continue
        quoted, name = column_match.groups()
        if not quoted and name.upper() in NON_COLUMN_KEYWORDS:
            continue  # e.g. an INDEX definition, not a column
        columns.append(name)
    if not columns:
        return None
    return {
        'database': table_match.group(2),
        'table': table_match.group(4),
        'columns': columns,
    }


def get_select(columns, table_name, tablename_prefix):
    """Build the MySQL query; the Doris prefix is stripped to get the source table."""
    if tablename_prefix and table_name.startswith(tablename_prefix):
        table_name = table_name[len(tablename_prefix):]
    return 'SELECT `' + '`,`'.join(columns) + '` FROM ' + table_name


def get_column(columns):
    """Wrap each column name in backticks for the writer's column list."""
    return ['`' + c + '`' for c in columns]


conf = read_and_format_json('conf.json')
js_demo = read_and_format_json('datax_example.json')
ddl_info = parse_create_table_sql('dorisDDL.sql')
if ddl_info is None or not ddl_info['database']:
    raise SystemExit('dorisDDL.sql must contain CREATE TABLE `db`.`table` (...) with at least one column')
select_sql = get_select(ddl_info['columns'], ddl_info['table'], conf['mysql']['table_prefix'])
column = get_column(ddl_info['columns'])

# Reader section
reader = js_demo['job']['content'][0]['reader']['parameter']
reader['username'] = conf['mysql']['user']
reader['password'] = conf['mysql']['password']
reader['connection'][0]['querySql'] = [select_sql]
jdbc_url_mysql = ('jdbc:mysql://' + conf['mysql']['host'] + ':' + str(conf['mysql']['port'])
                  + '/' + conf['mysql']['database'] + '?useSSL=false&serverTimezone=Asia/Shanghai')
reader['connection'][0]['jdbcUrl'] = [jdbc_url_mysql]

# Writer section
writer = js_demo['job']['content'][0]['writer']['parameter']
writer['username'] = conf['doris']['user']
writer['password'] = conf['doris']['password']
writer['loadUrl'] = conf['doris']['loadUrl']
writer['column'] = column
writer['preSql'] = conf['doris']['preSql']
writer['connection'][0]['table'] = [ddl_info['table']]
jdbc_url_doris = ('jdbc:mysql://' + conf['doris']['host'] + ':' + str(conf['doris']['port'])
                  + '/' + ddl_info['database'])
writer['connection'][0]['jdbcUrl'] = jdbc_url_doris
writer['connection'][0]['selectedDatabase'] = ddl_info['database']

print(json.dumps(js_demo))

# Write the generated job to a JSON file.
with open('datax_output.json', 'w', encoding='utf-8') as file:
    json.dump(js_demo, file, ensure_ascii=False, indent=4)

3. Usage

Put the files above in one directory and run Mysql2dorisDataxTools.py from that directory.

If you would rather not type them out, just use the archive below.

https://download.csdn.net/download/weixin_45399602/89644991?spm=1001.2014.3001.5503
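After running the script, it can be worth checking that no template field was left empty before handing the file to DataX. The sketch below is one possible check, not part of the original tool: find_unfilled_fields is a hypothetical helper, and the list of fields it inspects is my own choice based on the template in section 2.2. Passwords are deliberately not checked, since an empty password can be valid.

```python
def find_unfilled_fields(job):
    """Return the paths of template fields that are still empty."""
    content = job['job']['content'][0]
    reader = content['reader']['parameter']
    writer = content['writer']['parameter']
    reader_conn = reader['connection'][0]
    writer_conn = writer['connection'][0]
    checks = {
        'reader.username': reader['username'],
        'reader.connection[0].querySql': reader_conn['querySql'],
        'reader.connection[0].jdbcUrl': reader_conn['jdbcUrl'],
        'writer.username': writer['username'],
        'writer.loadUrl': writer['loadUrl'],
        'writer.column': writer['column'],
        # The template ships table as [""], so look inside the list.
        'writer.connection[0].table': any(writer_conn['table']),
        'writer.connection[0].jdbcUrl': writer_conn['jdbcUrl'],
        'writer.connection[0].selectedDatabase': writer_conn['selectedDatabase'],
    }
    return [path for path, value in checks.items() if not value]


# A cut-down copy of the untouched template: every checked field is empty.
untouched = {'job': {'content': [{
    'reader': {'parameter': {
        'username': '',
        'connection': [{'querySql': [], 'jdbcUrl': []}],
    }},
    'writer': {'parameter': {
        'username': '', 'loadUrl': [], 'column': [],
        'connection': [{'table': [''], 'jdbcUrl': '', 'selectedDatabase': ''}],
    }},
}]}}
for path in find_unfilled_fields(untouched):
    print('still empty:', path)
```

To check a real run, load datax_output.json with json.load and pass the resulting dict to the same function; an empty list means every checked field was filled in.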
