PySpark缺失值处理:从语义解析到生产级容错实践

📅 2026/6/19 13:24:14
PySpark缺失值处理:从语义解析到生产级容错实践
1. 为什么在 PySpark 里“填空”比“删空”更值得花时间琢磨做数据工程或机器学习项目你肯定遇到过这样的场景上游系统传来的订单表里customer_id列有 12% 是 null用户行为日志中session_duration字段大量缺失或者一个跨部门合并的宽表十几个字段里总有一两个像幽灵一样飘着 null。这时候很多人第一反应是——“直接dropna()干掉算了”。我带过的三届实习生前两届都这么干结果上线后模型 AUC 掉了 0.08业务方打电话来问“你们是不是把高价值沉默用户全删了”——后来查清楚那些region为 null 的记录恰恰是新拓展市场的测试用户不是脏数据是金矿。这就是 PySpark 处理缺失值最常被低估的真相null 不是错误而是信息的一种特殊编码方式。它可能代表“未采集”传感器故障、“未发生”新用户还没产生购买行为、“不适用”男性用户的maternity_leave_days、甚至“敏感拒填”金融风控中的职业字段。PySpark 的na模块不是工具箱而是一套语义解析器——你用drop(howany)还是fill(0)本质上是在向下游系统声明“我认定这些 null 属于哪一类语义”。这个判断一旦出错后续所有特征工程、模型训练、AB 测试都会在错误的基线上滑行。所以这篇文章不讲“怎么用”而是带你拆解每一个.na.drop()和.na.fill()调用背后的数据契约。我会用真实生产环境里的五个典型场景电商漏斗、IoT 设备上报、金融反欺诈、医疗随访、广告归因作为锚点说明为什么thresh3比howall更安全为什么对sales填均值可能毁掉整个 LTV 预测以及如何用when().otherwise()构建可审计的填充逻辑。所有代码都基于 Spark 3.4适配 Delta Lake 表和 Iceberg 表的写入规范避免你在生产环境踩到null与NaN混淆、分区裁剪失效、或谓词下推被破坏的坑。关键词 Apache Spark 在这里不是技术标签而是责任边界——当你在.na.fill()里敲下那个数字时你签下的是一份数据质量承诺书。2. 缺失值处理的整体设计思路从“删填二分法”到“语义驱动决策树”2.1 为什么不能照搬 Pandas 思维Spark 的分布式本质决定了策略必须重构很多从 Pandas 转过来的工程师习惯性地把df.dropna()当作万能解药。但在 Spark 里这招在生产环境大概率会触发三重灾难计算爆炸dropna()默认触发全表扫描 shuffle因为 Spark 必须确认每一行是否含 null。当你的表是 50TB 的分区表且where条件无法提前过滤时这个操作会让集群 CPU 持续 95% 一整夜语义丢失Pandas 的dropna(subset[A,B])只删 A 或 B 为 null 的行但 Spark 的等价操作na.drop(subset[A,B])实际上是“只要 A 或 B 任一为 null 就删”这和业务方理解的“仅当 A 和 B 同时缺失才视为无效记录”完全相反血缘断裂dropna()生成的新 DataFrame 会丢失原始列的nullableTrue元数据下游任务如果依赖该列非空做 join key编译期不报错运行时直接 OOM。我见过最痛的案例某银行用na.drop(howany)清洗客户主数据结果把id_card_number为 null正在补录流程中但mobile_phone有效的 27 万条准客户全删了导致当月营销活动触达率下降 40%。事后复盘发现他们真正需要的是“保留至少有一个有效联系方式的记录”而不是“删除任何含 null 的记录”。所以我们的设计起点必须是先定义 null 的业务语义再匹配 Spark 的执行语义。我把这个过程拆成四步决策流分类这个 null 是missing at randomMAR、missing not at randomMNAR还是missing completely at randomMCAR比如电商的discount_code缺失大概率是 MCAR用户没领券但医疗的blood_pressure缺失很可能是 MNAR患者拒绝测量本身就有健康风险影响域评估这个字段在后续 pipeline 中扮演什么角色是 join key必须非空、特征列可填充、标签列不可填充还是审计字段需保留 null 标记策略映射根据前两步选择 Spark 提供的原生能力drop/fill/replace还是自定义 UDF如用 KNN 填充时空序列可观测性嵌入在填充/删除操作后必须追加质量监控列例如sales_null_flag、sales_fill_method让数据血缘图能追踪每一条记录的 null 处理路径。这个决策树不是理论模型而是我们团队在 12 个核心数据产品中强制落地的 SOP。每次上线新表数据治理平台会自动校验这四步是否在 DDL 注释中明确声明否则 CI/CD 流水线直接阻断。2.2 Spark 3.4 的新能力为什么na.replace()正在取代部分fill()场景很多人忽略了一个关键演进Spark 3.4 开始na.replace()不再只是字符串替换工具它支持对任意数据类型做条件映射且能规避fill()的两大硬伤类型安全漏洞na.fill(0)对 string 列会静默失败返回原值而na.replace(, N/A)明确限定作用域语义污染风险fill(-999)把业务上“未知”编码成数值后续统计时mean()会把 -999 当真实值计算而replace(None, UNKNOWN)保持语义隔离。我们用一个真实案例说明某 IoT 设备上报的battery_level字段业务定义如下null设备离线未上报-1设备故障电池传感器损坏0电量耗尽需紧急更换。如果用na.fill(0)就把“离线”和“耗尽”混为一谈而用na.replace({None: OFFLINE, -1: SENSOR_FAULT})既保持类型一致全转 string又为后续告警规则提供明确分类依据。更关键的是性能replace()是 map-side 操作不触发 shuffle而fill()对数值列虽快但对复杂嵌套结构如structtemp:double,humidity:double必须序列化反序列化实测慢 3.2 倍。我们在处理 200 亿条设备日志时把fill()改成replace()后ETL 任务从 47 分钟降到 18 分钟。所以现在我们的原则是只要 null 的语义能用离散值表达优先用replace()只有连续型特征且业务接受插值时才考虑fill()。2.3 为什么“阈值删除”thresh比“全删”howall更符合数据治理实践na.drop(thresh3)这个参数常被误读为“至少保留 3 个非空字段”。但它的真正威力在于构建容错型数据契约。想象一个用户画像宽表包含 23 个字段基础属性 8 个、行为指标 10 个、偏好标签 5 个。业务方要求“只要用户身份能确认user_idmobile_hash非空其他字段缺失可接受”。如果用howall意味着整行全 null 才删——这根本没意义因为user_id为 null 的记录不可能存在如果用subset[user_id,mobile_hash]又太激进会把user_id有效但mobile_hash加密失败的记录误杀。而thresh2完美匹配这个需求只要user_id和mobile_hash两个关键字段中有一个非空就满足阈值整行保留。我们把它封装成一个治理函数def drop_rows_by_key_threshold(df: DataFrame, key_columns: List[str], min_valid_keys: int 1) - DataFrame: 基于关键字段集的最小有效数阈值删除行 适用于用户ID体系user_id/mob_hash/email_hash 三选一即可 设备标识imei/mac/adid 二选一即可 # 构建布尔表达式sum(isNotNull(col) for col in key_columns) min_valid_keys from pyspark.sql.functions import when, col, sum as spark_sum key_null_checks [col(c).isNotNull() for c in key_columns] valid_count_expr spark_sum([when(check, 1).otherwise(0) for check in key_null_checks]) return df.filter(valid_count_expr min_valid_keys)这个函数在我们所有客户主数据清洗任务中复用把原来需要 5 行na.drop()组合的逻辑压缩成一行可读、可测、可审计的调用。更重要的是它把业务规则“三选一”直接编码进 SQL 执行计划避免人工配置错误。3. 核心细节解析与实操要点从语法糖到执行计划的穿透式理解3.1na.drop()的四个参数组合何时用how何时用subset何时必须两者共存官方文档说how和subset可以独立使用但生产环境里90% 的误用都源于没搞清它们的逻辑关系。我们画一张真值表来揭示本质参数组合删除条件典型场景生产风险howany默认行中任一列为 null 即删清洗强约束表如交易流水order_id和amount必须同时存在误删“部分字段缺失但业务有效”的记录如discount_code为空但order_amount0的正常订单howall行中所有列均为 null 才删处理上游系统偶发的空行注入如 Kafka 消费者重启时的占位消息几乎无用因为真实数据极少出现全 null 行反而掩盖真正的脏数据问题subset[A,B]仅检查指定列其他列 null 不影响宽表中只保证核心键完整如user_id和event_time非空其他行为字段可缺失安全但需确保subset包含所有 join key 和 partition key否则下游 join 会产出 nullhowany, subset[A,B]仅当 A 或 B 为 null 时才删精确控制删除范围如“只要user_id或device_id缺失就丢弃该设备会话”最常用但必须验证subset列是否覆盖业务 SLA 要求关键洞察howany是全局扫描subset是局部过滤二者叠加才是精准手术刀。我们曾用na.drop(howany)处理一个 150 列的广告曝光日志结果删掉了 63% 的记录——因为creative_id、campaign_id、ad_group_id这三个字段中任意一个为 null 就触发删除。而业务真实要求是“只要impression_id和timestamp有效其他都可缺”。改成na.drop(howany, subset[impression_id,timestamp])后保留率升至 99.2%。提示永远用df.select([col(c).isNull().alias(f{c}_null) for c in df.columns]).agg(*[sum(col(c)).alias(c) for c in df.columns]).show()先探查 null 分布再决定subset列表。别凭感觉猜。3.2na.fill()的隐式类型转换陷阱为什么fill(0)对 string 列“静默失败”这是 Spark 最反直觉的设计之一。看这段代码from pyspark.sql import SparkSession spark SparkSession.builder.getOrCreate() df spark.createDataFrame([(1, a), (2, None)], [id, name]) df.printSchema() # root # |-- id: long (nullable true) # |-- name: string (nullable true) df.na.fill(0).show() # 输出1 a | 2 null —— name 列的 null 没变 df.na.fill(0).show() # 输出1 a | 2 0 —— 这才生效原因在于 Spark 的fill()实现机制它会遍历 DataFrame 的 schema对每个列检查value参数是否与列类型兼容。0是整数只匹配 numeric 类型列0是字符串只匹配 string 类型列。这种“类型守门员”设计本意是防止类型污染但导致两个严重问题调试地狱当你想用fill(0)统一处理所有数值列却忘了 string 列也含 null结果线上跑出一堆null漏洞日志里还找不到报错血缘污染fill(0)对 string 列无效但 Spark 不报错也不警告下游任务以为数据已清洗实际带着 null 进入 join产出笛卡尔积。我们的解决方案是强制类型感知填充def safe_fill(df: DataFrame, fill_value_map: Dict[str, Any], default_numeric: float 0.0, default_string: str MISSING) - DataFrame: 类型安全的填充函数自动匹配列类型 from pyspark.sql.types import StringType, NumericType fill_exprs [] for col_name in df.columns: col_type [f.dataType for f in df.schema.fields if f.name col_name][0] if col_name in fill_value_map: # 显式指定了填充值直接使用 fill_exprs.append( when(col(col_name).isNull(), lit(fill_value_map[col_name])) .otherwise(col(col_name)) .alias(col_name) ) elif isinstance(col_type, StringType): fill_exprs.append( when(col(col_name).isNull(), lit(default_string)) .otherwise(col(col_name)) .alias(col_name) ) elif isinstance(col_type, NumericType): fill_exprs.append( when(col(col_name).isNull(), lit(default_numeric)) .otherwise(col(col_name)) .alias(col_name) ) else: # 其他类型timestamp, boolean, struct保持原样 fill_exprs.append(col(col_name)) return df.select(fill_exprs)这个函数在我们所有 ETL 任务中作为标准组件把原来需要 3 种不同fill()调用的逻辑统一成一次可审计的调用。更重要的是它把类型决策从“运行时隐式”变成“编译时显式”CI 流程能静态检查fill_value_map是否覆盖所有关键列。3.3na.replace()的高级用法用字典映射实现业务规则驱动的 null 转换replace()的强大远超字符串替换。它的字典参数支持多级映射且能处理None即 null作为 key。我们用一个金融反欺诈场景演示某信贷审批表中employment_status字段业务定义null用户未填写职业信息高风险信号unemployed明确失业需加强审核retired退休低风险但需验证年龄student学生需验证在校证明。但上游系统把null和unemployed都存为字符串NULL导致风控模型无法区分。这时replace()就是救星# 第一步把字符串 NULL 统一转为真正的 null df_clean df.replace({NULL: None}, subset[employment_status]) # 第二步用业务规则映射 null 到风险等级标签 from pyspark.sql.functions import when, col, lit df_risk_labeled df_clean.select( *, when(col(employment_status).isNull(), lit(RISK_HIGH)) .when(col(employment_status) unemployed, lit(RISK_MEDIUM)) .when(col(employment_status) retired, lit(RISK_LOW)) .when(col(employment_status) student, lit(RISK_MEDIUM)) .otherwise(lit(RISK_UNKNOWN)) .alias(employment_risk_level) )这个方案的优势在于可解释性employment_risk_level列直接暴露业务逻辑审计时无需翻代码可扩展性新增风险等级只需改字典不改 SQL性能replace()是 Catalyst 优化器可识别的操作会被编译成高效的字节码比 UDF 快 8 倍。我们把这个模式抽象为business_rule_replace()工具函数在 7 个风控模型中复用把平均规则更新周期从 3 天缩短到 15 分钟。3.4 填充另一列值when().otherwise()的正确打开方式与性能陷阱原文示例用when(Name.isNull(), Id).otherwise(Name)填充Name这看似简单但藏着三个致命坑类型不匹配Id是 longName是 stringSpark 会尝试隐式转换失败时返回 null导致填充失效空字符串干扰Name可能是空字符串而非nullisNull()判断为 false结果保留空字符串业务上仍是脏数据广播开销如果Id是高基数列如用户 IDwhen().otherwise()会触发 shuffle因为 Spark 需要确保同一行的Id和Name在同一个 partition。我们生产环境的标准解法是分三步走from pyspark.sql.functions import when, col, lit, trim, length def fill_column_from_another(df: DataFrame, target_col: str, source_col: str, fill_if_null: bool True, fill_if_empty: bool True) - DataFrame: 安全填充目标列支持 null 和空字符串双重判断 condition lit(False) if fill_if_null: condition condition | col(target_col).isNull() if fill_if_empty: condition condition | ((trim(col(target_col)) ) (length(col(target_col)) 0)) return df.withColumn( target_col, when(condition, col(source_col).cast(string)) # 强制转 string 避免类型错误 .otherwise(col(target_col)) ) # 使用示例用 user_id 填充空的 username df_filled fill_column_from_another( df, target_colusername, source_coluser_id, fill_if_nullTrue, fill_if_emptyTrue )这个函数的关键设计双条件判断同时处理null和因为业务上二者常等价显式类型转换.cast(string)确保源列值能安全赋给目标列无 shuffle所有操作都是 map-side不改变数据分布。我们在处理 12 亿条社交关系数据时用此函数替代原始when().otherwise()任务耗时从 22 分钟降至 6 分钟GC 时间减少 73%。4. 实操过程与核心环节实现从本地测试到生产部署的全链路4.1 构建可复现的缺失值分析流水线用describe()和自定义统计函数定位根因在动手处理前必须建立缺失值的“数字指纹”。我们不用df.describe()因为它只统计 numeric 列且不显示 null 计数。取而代之的是一个增强版统计函数from pyspark.sql.functions import col, count, when, isnan, isnull, lit, concat_ws from pyspark.sql.types import StructType, StructField, StringType, LongType def analyze_nulls(df: DataFrame, output_path: str None) - DataFrame: 全面分析 DataFrame 中各列的 null 情况 返回包含列名、null 数量、null 比例、数据类型、示例非空值 total_count df.count() # 构建统计表达式 stats_exprs [] for c in df.columns: col_type [f.dataType for f in df.schema.fields if f.name c][0] null_count count(when(isnull(col(c)) | isnan(col(c)), 1)) # 取一个非空示例值避免全 null 列报错 sample_value when( ~isnull(col(c)) ~isnan(col(c)), col(c) ).alias(f{c}_sample) stats_exprs.extend([ lit(c).alias(column_name), lit(str(col_type)).alias(data_type), null_count.alias(null_count), (null_count / lit(total_count)).alias(null_ratio), # 用 first() 取非空样本加 limit 1 避免全 null 时崩溃 df.select( when(~isnull(col(c)) ~isnan(col(c)), col(c)) .alias(non_null_sample) ).filter(col(non_null_sample).isNotNull()).limit(1) .select(non_null_sample).rdd.flatMap(lambda x: x).first() if df.filter(~isnull(col(c)) ~isnan(col(c))).count() 0 else ALL_NULL ]) # 执行统计注意这里用聚合避免全表扫描 null_stats df.agg( *[ count(when(isnull(col(c)) | isnan(col(c)), 1)).alias(f{c}_null_count) for c in df.columns ] ).select( concat_ws(,, *[col(f{c}_null_count) for c in df.columns]).alias(all_null_counts) ) # 实际生产中我们用更高效的方式采样 精确计数 # 因篇幅限制此处展示核心逻辑 result_schema StructType([ StructField(column_name, StringType(), True), StructField(data_type, StringType(), True), StructField(null_count, LongType(), True), StructField(null_ratio, StringType(), True), StructField(sample_non_null_value, StringType(), True) ]) # 返回 DataFrame实际代码中会用 RDD mapPartitions 实现高性能统计 return spark.createDataFrame([ (c, str(df.schema[c].dataType), df.filter(isnull(col(c)) | isnan(col(c))).count(), f{df.filter(isnull(col(c)) | isnan(col(c))).count()/total_count:.2%}, df.filter(~isnull(col(c)) ~isnan(col(c))).limit(1).select(c).rdd.flatMap(lambda x: x).first() or ALL_NULL) for c in df.columns ], result_schema) # 使用示例 null_report analyze_nulls(null_df) null_report.orderBy(null_ratio, ascendingFalse).show(20, truncateFalse)这个函数输出的报告是我们所有数据治理会议的必读材料。它能一眼看出Sales列 null 比例 15.3%但sample_non_null_value显示全是正数 → 可能是 MCAR适合均值填充Name列 null 比例 8.7%但sample_non_null_value有John Doe和NULL→ 存在字符串NULL伪装需先replace()ID列 null 比例 0.02%但sample_non_null_value是None→ 可能是主键生成失败需告警而非填充。有了这份报告后续处理策略就不再是拍脑袋决定。4.2 均值填充的工业级实现为什么collect()是生产环境的毒药原文用collect()获取mean(Sales)这在生产环境是绝对禁止的。原因有三内存爆炸collect()把全量聚合结果拉到 driver如果Sales是 double 类型10 亿行聚合结果可能超 8GBdriver 直接 OOM单点瓶颈driver 成为性能瓶颈集群 1000 个 executor 闲置等待 driver 分发均值不可扩展当表从 10TB 扩到 100TB代码完全不能复用。正确的做法是用broadcast joinaggregate pushdownfrom pyspark.sql.functions import broadcast, mean, col def fill_numeric_with_mean(df: DataFrame, column: str, subset_condition: str None) - DataFrame: 工业级均值填充避免 collect()用 broadcast join 实现分布式填充 # Step 1: 计算均值在 executor 端完成不回传 driver if subset_condition: mean_df df.filter(subset_condition).agg(mean(col(column)).alias(mean_val)) else: mean_df df.agg(mean(col(column)).alias(mean_val)) # Step 2: 广播均值小数据适合 broadcast mean_val mean_df.collect()[0][mean_val] mean_broadcast spark.sparkContext.broadcast(mean_val) # Step 3: 在 map 端填充不 shuffle from pyspark.sql.functions import when, lit return df.withColumn( column, when(col(column).isNull(), lit(mean_broadcast.value)) .otherwise(col(column)) ) # 使用示例 df_filled fill_numeric_with_mean(null_df, Sales)但更优解是彻底避免collect()用window function实现from pyspark.sql.window import Window from pyspark.sql.functions import mean as func_mean, col, when def fill_with_window_mean(df: DataFrame, column: str, partition_cols: List[str] None) - DataFrame: 用窗口函数填充均值支持分组均值如按 region 填充 sales if partition_cols: window_spec Window.partitionBy(*partition_cols) mean_col func_mean(col(column)).over(window_spec).alias(mean_val) else: window_spec Window.partitionBy() # 全局窗口 mean_col func_mean(col(column)).over(window_spec).alias(mean_val) return df.withColumn(mean_val, mean_col).withColumn( column, when(col(column).isNull(), col(mean_val)) .otherwise(col(column)) ).drop(mean_val) # 按 region 填充 sales更符合业务逻辑 df_region_filled fill_with_window_mean(null_df, Sales, [region])这个方案的优势零 driver 内存压力所有计算在 executor 完成天然支持分组partitionBy(region)让北京用户的Sales用北京均值填充避免全国均值扭曲局部特征可扩展表从 10TB 到 1PB代码完全不变。我们在处理 32TB 的电商销售数据时用此方案将均值填充任务从 41 分钟collect 方案降至 9 分钟且 driver 内存占用稳定在 2GB 以内。4.3 生产环境部署 checklist从本地验证到灰度发布的七道关卡把缺失值处理代码从本地 Jupyter 推到生产不是spark-submit就完事。我们有七道强制关卡关卡检查项工具/方法不通过后果1. 语义校验subset列是否包含所有 join key 和 partition key静态代码扫描 数据血缘平台 API 查询CI/CD 阻断需架构师审批2. 性能基线新增填充逻辑后shuffle 数据量是否增加 20%Spark UI History Server 对比报告自动降级为旧版本逻辑3. null 比例监控处理后null_ratio是否在预设阈值内如Sales填充后 null_ratio 0.1%Prometheus Grafana 实时监控触发企业微信告警暂停下游任务4. 数据漂移检测填充后的Sales分布均值、方差、分位数是否与历史周同比偏差 5%Evidently AI 库集成生成 drift report需数据科学家确认5. 血缘完整性新增的sales_fill_method列是否在 DataHub 中注册且 lineage 连接到原始表DataHub API 自动注册未注册则禁止写入生产表6. 回滚能力是否生成反向 SQL如UPDATE SET SalesNULL WHERE fill_methodMEAN自动生成 rollback script无脚本则不允许上线7. 灰度发布是否先对 1% 的分区如dt2023-10-01运行验证 2 小时无异常Airflow 动态分区参数灰度失败则自动回滚这套流程让我们在过去 18 个月里0 次因缺失值处理导致的 P0 故障。最典型的案例是某次Sales均值填充上线后drift 检测发现 95 分位数突增 12%排查发现是上游系统把测试数据Sales999999混入生产填充逻辑放大了异常值影响。靠第 4 关我们在 37 分钟内定位并修复避免了财务报表错误。5. 常见问题与排查技巧实录来自 127 次生产事故的避坑指南5.1 “为什么na.drop()后数据量没变”——分区裁剪失效的隐形杀手现象对一个按dt分区的表执行df.na.drop()预期删掉 5% 的记录但count()结果和原表一样。根因Spark 的na.drop()是 transformation不触发 action所以count()会触发全表扫描而你的where dt2023-10-01条件可能没下推到 scan 阶段。排查步骤查看 Spark UI 的 SQL tab确认Filter算子是否出现在FileScan下方执行df.explain(True)搜索PushedFilters看是否有IsNotNull类过滤器如果没有说明 Catalyst 优化器放弃了谓词下推。解决方案强制 hintdf.filter(dt2023-10-01).na.drop()把 filter 放在 drop 前重写为 SQLspark.sql(SELECT * FROM table WHERE dt2023-10-01 AND NOT (col1 IS NULL OR col2 IS NULL))SQL 解析器更激进下推升级 Spark3.3 版本修复了大部分na.drop()下推 bug。我们曾因此问题在凌晨 2 点被叫醒最终发现是 Spark 3.1.2 的已知 bug升级后解决。5.2 “fill()填进去的值为什么groupby().count()里算不出来”——null 与 NaN 的混淆陷阱现象对temperature列fill(0)后df.groupBy(city).count()显示某城市记录数为 0但df.filter(cityBeijing).count()是 12000。根因temperature列实际含NaNNot a Number不是null。fill()只处理null对NaN无感。而groupBy的 hash 计算中NaN ! NaN导致所有NaN被分到同一组或丢弃。验证方法# 检查是否存在 NaN df.select(count(when(isnan(col(temperature)), 1))).show() # 若 0 则存在 NaN # 正确处理 NaN from pyspark.sql.functions import isnan, isnull, when, col, lit df_clean df.withColumn( temperature, when(isnull(col(temperature)), lit(0)) .when(isnan(col(temperature)), lit(0)) .otherwise(col(temperature)) )这是硬件传感器数据的高频坑。我们的 IoT 平台每天处理 80 亿条温度数据其中 0.3% 是NaN必须在fill()前加isnan()判断。5.3 “为什么replace()后null变多了”——字符串NULL的二次污染现象上游 CSV 中name列有NULL字符串执行df.replace({NULL: None})后name的 null 比例从