import pandas as pd
from sqlalchemy import create_engine, text
import cx_Oracle
from sqlalchemy.exc import DatabaseError
import traceback# SQL Server 配置
# Source database: SQL Server, reached through pyodbc.
# Placeholders: 用户名 = user, 密码 = password, 数据库地址 = host,
# 端口 = port, 库名 = database. The driver query parameter names the
# ODBC driver pyodbc should load.
sql_server_conn_str = (
    'mssql+pyodbc://用户名:密码@数据库地址:端口/库名'
    '?driver=ODBC+Driver+11+for+SQL+Server'
)
sql_server_engine = create_engine(sql_server_conn_str)

# Oracle configuration
# Target database: Oracle, reached through cx_Oracle (same placeholders as
# the SQL Server URL).
# NOTE(review): SQLAlchemy's cx_oracle dialect appears to treat the "/库名"
# path part as a SID; if the database is addressed by service name, use
# "...@host:port/?service_name=..." instead -- confirm for your environment.
oracle_conn_str = 'oracle+cx_oracle://用户名:密码@数据库地址:端口/库名'
oracle_engine = create_engine(
    oracle_conn_str,
    connect_args={"encoding": "UTF-8", "nencoding": "UTF-8"},
)


def get_sqlserver_columns(table_name):
    """Return the column definitions of a SQL Server table.

    Args:
        table_name: Table name as listed in INFORMATION_SCHEMA.

    Returns:
        Result rows exposing COLUMN_NAME, DATA_TYPE and
        CHARACTER_MAXIMUM_LENGTH, in the table's column order.
    """
    with sql_server_engine.connect() as conn:
        # Bound parameter instead of f-string interpolation: a name containing
        # a quote can no longer break (or inject into) the query.
        # ORDER BY: INFORMATION_SCHEMA guarantees no row order otherwise, and
        # the order here becomes the column order of the Oracle table.
        return conn.execute(
            text(
                "SELECT COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH "
                "FROM INFORMATION_SCHEMA.COLUMNS "
                "WHERE TABLE_NAME = :table_name "
                "ORDER BY ORDINAL_POSITION"
            ),
            {"table_name": table_name},
        ).fetchall()


def get_numeric_columns(table_name):
    """Return the names of columns whose values are bound as Python ints.

    Covers the SQL Server integer types plus ``bit`` (which is mapped to
    NUMBER(1), so its booleans are sent as 0/1).

    Args:
        table_name: Table name as listed in INFORMATION_SCHEMA.

    Returns:
        List of column names.
    """
    with sql_server_engine.connect() as conn:
        result = conn.execute(
            text(
                "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS "
                "WHERE TABLE_NAME = :table_name "
                "AND DATA_TYPE IN "
                "('int', 'smallint', 'tinyint', 'bigint', 'bit')"
            ),
            {"table_name": table_name},
        )
        return [row.COLUMN_NAME for row in result]


def map_oracle_type(sqlserver_type, max_length):
    """Map a SQL Server column type to an Oracle column type.

    Args:
        sqlserver_type: INFORMATION_SCHEMA DATA_TYPE value (e.g. 'nvarchar').
        max_length: CHARACTER_MAXIMUM_LENGTH; None for non-character types
            and -1 for the (n)varchar(max) types.

    Returns:
        Oracle type as a DDL fragment. Unknown types fall back to CLOB.
    """
    sqlserver_type = sqlserver_type.lower()
    # SQL Server reports (max) lengths as -1; treat any non-positive length
    # as "no declared limit".
    if max_length is not None and max_length < 1:
        max_length = None

    def varchar():
        # CHAR length semantics: SQL Server varchar(n) counts bytes in the
        # source code page, and multi-byte text (e.g. Chinese) grows when
        # re-encoded as UTF-8, so a byte-sized VARCHAR2(n) could overflow.
        # Above Oracle's 4000-byte VARCHAR2 limit, fall back to CLOB rather
        # than declaring a column that is too small for the data.
        if max_length is None or max_length > 4000:
            return 'CLOB'
        return f'VARCHAR2({max_length} CHAR)'

    def nvarchar():
        # 2000 chars = 4000 bytes in Oracle's default AL16UTF16 national set.
        if max_length is None or max_length > 2000:
            return 'NCLOB'
        return f'NVARCHAR2({max_length})'

    def char():
        if max_length is None:
            return 'CHAR(1)'
        # Oracle CHAR tops out at 2000 bytes (SQL Server allows 8000);
        # longer definitions would fail CREATE TABLE, so use a varying type.
        return f'CHAR({max_length} CHAR)' if max_length <= 2000 else varchar()

    def nchar():
        if max_length is None:
            return 'NCHAR(1)'
        # NCHAR is capped at 2000 bytes, i.e. 1000 chars in AL16UTF16.
        return f'NCHAR({max_length})' if max_length <= 1000 else nvarchar()

    sized_types = {
        'varchar': varchar,
        'nvarchar': nvarchar,
        'char': char,
        'nchar': nchar,
    }
    fixed_types = {
        'text': 'CLOB',
        'ntext': 'NCLOB',
        'int': 'NUMBER(10)',
        'bigint': 'NUMBER(19)',
        'smallint': 'NUMBER(5)',
        'tinyint': 'NUMBER(3)',
        'decimal': 'NUMBER',
        'numeric': 'NUMBER',
        'money': 'NUMBER(19,4)',
        'smallmoney': 'NUMBER(10,4)',
        'float': 'BINARY_DOUBLE',
        'real': 'BINARY_FLOAT',
        'datetime': 'TIMESTAMP(6)',
        'datetime2': 'TIMESTAMP(6)',
        'smalldatetime': 'DATE',
        'date': 'DATE',
        'time': 'TIMESTAMP(6)',
        'bit': 'NUMBER(1)',
        'uniqueidentifier': 'VARCHAR2(36)',
        # Binary data belongs in a BLOB, not a character LOB.
        # NOTE(review): very large values may need explicit LOB binding.
        'binary': 'BLOB',
        'varbinary': 'BLOB',
        'image': 'BLOB',
    }
    if sqlserver_type in sized_types:
        return sized_types[sqlserver_type]()
    return fixed_types.get(sqlserver_type, 'CLOB')


def _recreate_oracle_table(conn, oracle_table_name, column_ddl):
    """Drop ``oracle_table_name`` if it exists, then create it from ``column_ddl``."""
    exists = conn.execute(
        text("SELECT 1 FROM user_tables WHERE table_name = :name"),
        {'name': oracle_table_name},
    ).scalar()
    if exists:
        print(f"删除旧表 {oracle_table_name}...")
        conn.execute(text(f'DROP TABLE "{oracle_table_name}" PURGE'))
        conn.commit()
    create_sql = f'CREATE TABLE "{oracle_table_name}" ({column_ddl})'
    print(f"\n[DEBUG] 建表SQL:\n{create_sql}")
    conn.execute(text(create_sql))
    conn.commit()


def _chunk_to_params(chunk_df, numeric_cols):
    """Convert one DataFrame batch into executemany parameter dicts.

    Keys are generated bind names ``p0``, ``p1``, ... in the order of
    ``chunk_df.columns``.
    - NaN/NaT/None become None (SQL NULL).
    - Integer/bit columns are bound as Python ints.
    - tz-aware timestamps drop their zone and keep wall-clock time.
    - ``datetime.time`` values are anchored on 1900-01-01.
    """
    import datetime

    df = chunk_df.copy()
    for col in df.columns:
        if pd.api.types.is_datetime64_any_dtype(df[col]):
            df[col] = df[col].dt.tz_localize(None)

    int_cols = set(numeric_cols)
    as_int = [col in int_cols for col in df.columns]
    base_date = datetime.date(1900, 1, 1)

    params = []
    # itertuples yields native Python scalars (and Timestamps for datetimes),
    # avoiding the dtype coercion that turns None back into NaN/NaT.
    for row in df.itertuples(index=False, name=None):
        record = {}
        for i, value in enumerate(row):
            if pd.isna(value):
                # Keep NULL as NULL; the previous fillna(0) silently turned
                # NULL integers into 0.
                value = None
            elif as_int[i]:
                value = int(value)
            elif isinstance(value, datetime.time):
                # Oracle has no TIME type and cx_Oracle cannot bind
                # datetime.time; store it on SQL Server's base date.
                value = datetime.datetime.combine(base_date, value)
            record[f"p{i}"] = value
        params.append(record)
    return params


def migrate_table(table_name, chunksize=10000):
    """Recreate one SQL Server table in Oracle and copy its rows in batches.

    The Oracle table is named ``table_name.upper()``. An existing table with
    that name is dropped (PURGE) first. Rows are read with
    ``pd.read_sql_table`` in batches of ``chunksize``. Each batch is committed
    separately, so a failure leaves the earlier batches in place.

    Args:
        table_name: Source table name (pandas reads it from the default schema).
        chunksize: Rows per read/insert batch.

    Returns:
        True if every batch was inserted, False otherwise. Errors are printed,
        not raised.
    """
    try:
        oracle_table_name = table_name.upper()
        columns_info = get_sqlserver_columns(table_name)
        if not columns_info:
            # Bail out before touching Oracle: an empty column list would
            # drop the existing target table and then fail on CREATE TABLE.
            print(f"未找到表 {table_name} 的列定义,跳过")
            return False
        numeric_cols = get_numeric_columns(table_name)
        column_ddl = ", ".join(
            f'"{col.COLUMN_NAME}" '
            f"{map_oracle_type(col.DATA_TYPE, col.CHARACTER_MAXIMUM_LENGTH)}"
            for col in columns_info
        )

        with oracle_engine.connect() as conn:
            _recreate_oracle_table(conn, oracle_table_name, column_ddl)

            chunks = pd.read_sql_table(
                table_name, sql_server_engine, chunksize=chunksize
            )
            for chunk_idx, chunk_df in enumerate(chunks, start=1):
                if chunk_df.empty:
                    print(f"跳过空数据批次:{chunk_idx}")
                    continue

                # Quoted identifiers keep the source column names. The binds
                # use generated names because raw column names may contain
                # spaces or symbols that are not valid bind-parameter names.
                column_list = ", ".join(f'"{col}"' for col in chunk_df.columns)
                placeholders = ", ".join(
                    f":p{i}" for i in range(len(chunk_df.columns))
                )
                insert_sql = text(
                    f'INSERT INTO "{oracle_table_name}" ({column_list}) '
                    f"VALUES ({placeholders})"
                )
                try:
                    conn.execute(
                        insert_sql, _chunk_to_params(chunk_df, numeric_cols)
                    )
                    conn.commit()
                except DatabaseError as e:
                    conn.rollback()
                    print(f"插入失败: {str(e)}")
                    # First row of the failing batch, not necessarily the
                    # row that triggered the error.
                    print("问题数据样例:", chunk_df.iloc[0].to_dict())
                    return False
                print(f"批次 {chunk_idx}: 成功插入 {len(chunk_df)} 行")
        return True
    except Exception as e:
        # Top-level boundary for one table: report and let the caller move on.
        print(f"严重错误: {str(e)}")
        traceback.print_exc()
        return False


def migrate_database(exclude_tables=('表名',)):
    """Migrate every SQL Server base table to Oracle.

    Args:
        exclude_tables: Table names to skip, compared case-insensitively.

    Returns:
        List of table names whose migration failed.
    """
    with sql_server_engine.connect() as conn:
        tables = conn.execute(
            text(
                "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
                "WHERE TABLE_TYPE='BASE TABLE'"
            )
        ).fetchall()

    # Built once instead of re-upper-casing the list for every table.
    excluded = {name.upper() for name in exclude_tables}
    failed = []
    for (table_name,) in tables:
        if table_name.upper() in excluded:
            print(f"\n{'=' * 30} 跳过表 {table_name} {'=' * 30}")
            continue
        print(f"\n{'=' * 30} 迁移表 {table_name} {'=' * 30}")
        if not migrate_table(table_name):
            failed.append(table_name)

    if failed:
        print(f"\n以下表迁移失败: {', '.join(failed)}")
    return failed


if __name__ == '__main__':
    # migrate_table('表名')  # single-table test
    migrate_database()  # whole-database migration
    print("\n迁移完成")
上面的代码主要解决整库迁移中的三个问题:目标表的创建(涉及不同数据库之间的数据类型映射转换)、读取源数据后的类型转换,以及分批插入优化。
注意事项:
Oracle
1. 如果在 PyCharm 中搜索不到 cx_Oracle 库,可以通过 cmd 进入 Python 安装目录下的 Scripts 目录,执行 pip install cx_Oracle 进行安装。
2. 配置 Oracle Instant Client(cx_Oracle 需要借助 Oracle 客户端库才能连接数据库)
1) 根据 Oracle 数据库版本和操作系统下载对应的安装包。
官方地址:Oracle Instant Client Downloads
2) 配置环境变量
- Windows:将 Instant Client 的解压路径添加到系统 `Path` 环境变量中。
SQL Server
1. 根据 SQL Server 的版本下载并安装对应的 ODBC 驱动;连接字符串中 driver 参数指定的驱动名称(如 ODBC Driver 11 for SQL Server)必须与本机已安装的驱动一致。
官方地址:Download ODBC Driver for SQL Server - ODBC Driver for SQL Server | Microsoft Learn