MLOps实战:用MLflow+Airflow+DVC搭建可复现模型流水线

📅 2026/7/4 15:53:20
MLOps实战:用MLflow+Airflow+DVC搭建可复现模型流水线
1. 这不是“AI运维”是数据科学家必须亲手搭起来的生产流水线你有没有过这样的经历花了三周时间调出一个AUC 0.92的模型兴奋地发给业务方对方说“能不能明天就上线跑真实订单”——你愣住因为模型还在Jupyter里跑着训练数据是本地CSV特征工程靠手动写pandas超参是手调的连版本号都没打过标签。第二天你吭哧吭哧打包成Flask API部署到测试服务器结果发现线上数据格式和训练时差了两列特征缺失值填充逻辑不一致模型预测全飘了。业务方再问“上次那个效果好的版本还能复现吗”你翻遍Git历史发现那版代码混在十几个notebook分支里依赖包版本早被pip install -U冲没了。这就是MLOps的真实起点它不是IT部门新增的一个运维岗位也不是DevOps套个AI壳子的营销概念。它是数据科学家从“模型能跑通”走向“模型可交付、可追踪、可迭代”的必经门槛。我带过17个跨行业MLOps落地项目从电商实时推荐到制药临床试验预测所有踩过坑的团队最后都明白一件事——MLOps的本质是把数据科学工作流中那些靠人脑记忆、靠口头约定、靠临时脚本完成的隐性动作变成可版本化、可自动化、可审计的显性资产。关键词里的“Towards AI”不是平台名而是一种实践导向所有工具链的选择都必须服务于“让模型更快、更稳、更可信地进入业务闭环”这个单一目标。它适合三类人刚转行的数据科学家别再只交notebook了、带团队的技术负责人别让模型卡在实验阶段、还有正在被“模型上线难”反复折磨的算法工程师。接下来我要讲的不是PPT里的抽象分层图而是我在银行风控模型上线前48小时用什么命令回滚到上周三的特征版本、怎么用三行代码验证线上推理结果和离线批量预测完全一致、为什么我们坚持不用Kubeflow而选MLflowAirflow组合——全是实操中抠出来的细节。2. MLOps整体设计为什么放弃“大而全”选择“小而准”的渐进式架构2.1 拒绝“一步到位”的幻觉从单机实验到生产环境的鸿沟有多深很多团队一上来就想建Kubernetes集群、上Seldon、配Prometheus监控结果三个月过去连第一个模型都没跑通生产流水线。问题出在对“生产环境”理解有偏差——它不是指服务器配置多高而是指任何环节的微小扰动都会被指数级放大并直接冲击业务指标。举个真实案例某物流公司的ETA预测模型在离线评估时MAE稳定在8.3分钟上线后首日实际误差飙升到22分钟。排查发现训练时用的是MySQL导出的CSV而线上服务读取的是Kafka实时流两者对空字符串的处理逻辑不同CSV里是Kafka里是NULL导致特征编码器输入异常。这种问题再完美的K8s编排也救不了。所以我的设计原则很朴素先确保“每次训练都能100%复现每次部署都能100%等价”再谈自动化与扩展性。这意味着架构必须满足三个硬约束可追溯性能精确回答“这个预测结果是由哪次训练、哪个代码提交、哪份数据快照、哪个参数组合生成的”可隔离性开发、测试、预发、生产环境的代码、数据、模型、依赖必须物理隔离避免“在我机器上好好的”这类经典陷阱可降级性当新模型上线出问题必须能在30秒内切回旧版本且切换过程不中断服务。基于这三点我放弃了“All-in-One”平台方案采用分层解耦架构底层用Docker保证环境一致性中间用GitDVC管理代码与数据版本上层用轻量级调度器Airflow串联任务模型注册与监控用MLflow统一入口。这套组合不是技术炫技而是每个组件都解决一个具体痛点Docker解决“Python 3.8.10和3.8.12下scikit-learn的RandomForest结果差异”问题DVC解决“训练数据集从10GB涨到100GB后Git不堪重负”问题Airflow解决“特征工程脚本必须在模型训练前5分钟完成且失败要自动重试”问题。2.2 工具链选型背后的血泪账为什么MLflow胜过Kubeflow为什么Airflow比Prefect更适配数据科学工具选型不是看GitHub Stars而是看它能否把“人肉操作”变成“一行命令”。我对比过主流方案结论很明确模型注册与追踪MLflow vs Kubeflow ML PipelinesKubeflow强在K8s原生集成但它的Pipeline DSL写起来像写汇编——定义一个简单的“数据清洗→训练→评估”流程需要写200行YAML还要手动配置每个步骤的镜像、资源限制、卷挂载。而MLflow的mlflow.start_run()只需加三行装饰器就能自动记录代码版本、参数、指标、模型文件。更重要的是MLflow的Model Registry支持UI直接拖拽版本、设置Staging/Production状态业务方点几下就能看到“当前线上用的是v2.3.1上周效果最好的是v2.1.7”。我们曾用Kubeflow做了两周POC最后发现80%时间花在调试YAML语法错误上而MLflow三天就跑通全流程。工作流调度Airflow vs Prefect vs DagsterPrefect的动态DAG生成很酷但数据科学场景里DAG结构其实高度稳定“每天凌晨2点拉新数据→清洗→生成特征→训练→评估→达标则部署”。Airflow的静态DAGPython定义反而更直观。关键优势在于它的Operator生态PythonOperator直接运行本地函数BashOperator调用shell脚本DockerOperator启动容器化任务——我们甚至用SlackOperator在模型评估低于阈值时自动负责人。而Prefect的Task Runner在Windows开发机上兼容性极差团队里三个同事的环境配置了两天都没跑通。数据版本控制DVC vs Pachyderm vs lakeFSPachyderm需要独立集群lakeFS依赖AWS S3而DVC只需一个Git仓库任意对象存储我们用MinIO自建成本为零。最实用的功能是dvc repro当修改了featurize.py执行该命令会自动检测哪些数据文件被影响只重新运行必要步骤。某次我们更新用户行为特征计算逻辑DVC自动跳过未改动的地理特征模块节省了6小时计算时间。提示不要迷信“云厂商MLOps套件”。AWS SageMaker Pipelines的UI看似强大但一旦需要自定义数据校验逻辑比如检查新数据中“用户年龄”字段是否出现负值就得写Lambda函数再对接复杂度陡增。而用AirflowPython脚本加一行assert df[age].min() 0就完事。2.3 架构分层详解从代码提交到模型上线的七步闭环整个MLOps流水线不是黑箱而是七个清晰可干预的环节每个环节都有明确输入、输出和质量门禁代码提交Git所有代码notebook、脚本、配置必须提交到Git分支策略采用Git Flowdevelop分支用于集成测试main分支对应生产发布。数据版本化DVC原始数据存入MinIODVC生成.dvc文件记录哈希值dvc push同步到远程存储。环境固化DockerDockerfile明确指定Python版本、核心库版本如scikit-learn1.2.2构建镜像并推送到私有Registry。训练触发Airflow定时DAG检测DVC数据哈希变更触发训练任务使用DockerOperator在隔离环境中运行。模型记录MLflow训练脚本中调用mlflow.log_artifact(model.pkl)自动关联代码提交ID、参数、指标。评估门禁Custom Script独立评估脚本加载新模型与历史数据计算关键指标如AUC、KS值若低于阈值则阻断发布。模型部署Flask Nginx通过MLflow Model Registry API获取生产模型URI用mlflow.pyfunc.load_model()加载Nginx做负载均衡与健康检查。这个闭环里第6步“评估门禁”最容易被忽视。我们曾因跳过此步将一个在测试集上AUC 0.89但在验证集上骤降至0.71的模型误发上线。现在规则是任何模型上线前必须通过三重验证——离线指标达标、在线AB测试胜出、业务方抽样确认预测结果合理。少一个都不行。3. 核心细节解析从零搭建可复现的MLOps流水线3.1 环境固化为什么Dockerfile里要锁死scikit-learn的补丁版本很多人写Dockerfile只写pip install scikit-learn这是灾难源头。scikit-learn 1.2.x系列在1.2.0到1.2.3之间对RandomForestClassifier的oob_score计算逻辑有过三次修正导致同一份代码在不同补丁版本下OOB分数可能相差0.05。而我们的风控模型审批流程要求OOB分数必须≥0.65这种微小差异足以让模型卡在合规审核环节。正确做法是精确锁定版本并验证其行为一致性# Dockerfile FROM python:3.8.12-slim # 锁定核心依赖版本注意必须包含补丁号 RUN pip install --no-cache-dir \ numpy1.23.5 \ pandas1.5.3 \ scikit-learn1.2.2 \ mlflow2.9.0 \ dvc[s3]3.40.0 # 复制代码与依赖文件 COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . /app WORKDIR /app更关键的是在CI阶段加入版本行为验证。我们在test_version_consistency.py中写了一个黄金测试# 验证scikit-learn 1.2.2的RandomForest行为 from sklearn.ensemble import RandomForestClassifier import numpy as np def test_rf_oob_consistency(): # 固定随机种子和数据 X np.random.randn(100, 5) y (X.sum(axis1) 0).astype(int) # 训练模型 clf RandomForestClassifier( n_estimators10, oob_scoreTrue, random_state42 ) clf.fit(X, y) # 断言OOB分数必须等于预存的黄金值0.682 assert abs(clf.oob_score_ - 0.682) 0.001, \ fOOB score changed: expected 0.682, got {clf.oob_score_}这个测试在每次构建镜像后自动运行一旦失败立即终止CI流程。两年来它帮我们拦截了3次因基础镜像升级导致的隐性bug。3.2 数据版本控制DVC如何解决“100GB数据集无法Git管理”的难题Git对大文件支持极差直接git add data/raw/transactions.csv会导致仓库膨胀、克隆缓慢、协作困难。DVC的解法很巧妙它不存储数据本身而是存储数据的元信息。操作流程如下初始化DVCdvc init会在Git仓库中创建.dvc配置将数据目录设为DVC跟踪dvc add data/raw/transactions.csv此时生成data/raw/transactions.csv.dvc文件内容类似deps: - path: data/raw/transactions.csv outs: - md5: a1b2c3d4e5f6... # 文件内容的MD5哈希 path: data/raw/transactions.csvgit add data/raw/transactions.csv.dvc只提交小文件dvc push将实际数据上传到MinIO或S3后续协作时同事只需git clone然后dvc pull即可下载对应哈希的数据文件。最关键的是DVC能感知数据变更当上游ETL任务更新了transactions.csvDVC检测到文件哈希变化dvc status会显示modified: data/raw/transactions.csv此时dvc repro会自动重新运行所有依赖该数据的步骤如特征工程、模型训练。我们曾用此机制实现“数据漂移预警”在特征工程脚本中加入统计校验# featurize.py import pandas as pd import dvc.api def validate_data_drift(df_new, df_baseline): 比较新数据与基线数据的分布差异 drift_alerts [] for col in [user_age, order_amount]: # KS检验 from scipy.stats import ks_2samp ks_stat, p_value ks_2samp(df_baseline[col], df_new[col]) if p_value 0.01: # 显著性水平 drift_alerts.append(f{col} distribution drifted (KS{ks_stat:.3f})) return drift_alerts # 在DVC pipeline中调用 df_raw pd.read_csv(data/raw/transactions.csv) df_baseline pd.read_csv(dvc.api.read(data/baseline/transactions.csv, repo.)) alerts validate_data_drift(df_raw, df_baseline) if alerts: raise ValueError(fData drift detected: {alerts})这样只要新数据分布异常整个流水线就会中断强制人工介入。3.3 模型追踪MLflow中“Run”与“Experiment”的正确组织方式MLflow的Experiment不是按项目划分而是按问题域划分。我们曾犯过错误为每个客户建一个Experiment结果20个客户就有20个Experiment根本无法横向对比模型效果。正确做法是Experiment命名 业务问题 时间粒度例如churn_prediction_weekly、fraud_detection_daily这样所有针对流失预测的模型都在同一个Experiment下方便用MLflow UI的对比视图分析不同特征工程方案的效果。Run的Tagging必须结构化不能只靠run_name要用set_tag标记关键维度import mlflow mlflow.set_tag(feature_version, v3.2) # 特征版本 mlflow.set_tag(data_version, 20240520) # 数据日期 mlflow.set_tag(trainer, alicecompany.com) # 责任人 mlflow.set_tag(pipeline_stage, staging) # 流水线阶段Artifact存储路径要可预测MLflow默认把模型存成mlruns/1/abc123/artifacts/model/但我们需要在CI中自动提取模型。因此在训练脚本末尾加# 保存模型到固定路径便于CI脚本读取 model_path fmodels/{mlflow.active_run().info.run_id}/model mlflow.sklearn.save_model(model, model_path) # 同时记录到MLflow mlflow.log_artifact(model_path, model)这样CI脚本可以用find models/ -name model | head -1快速定位最新模型。3.4 自动化门禁用Python脚本实现“不达标不发布”的硬约束门禁不是形式主义而是防止低质量模型污染生产的最后一道闸。我们的门禁脚本gatekeeper.py包含三层检查离线指标门禁加载新模型用预留的验证集计算核心指标在线一致性门禁用相同输入对比新旧模型预测结果确保无意外偏差业务逻辑门禁执行领域规则校验如“预测违约概率0.9的用户其历史逾期次数必须≥2”# gatekeeper.py import mlflow import pandas as pd from sklearn.metrics import roc_auc_score def run_offline_gate(model_uri, val_data_path, min_auc0.85): 离线AUC门禁 model mlflow.pyfunc.load_model(model_uri) val_df pd.read_csv(val_data_path) y_pred model.predict_proba(val_df.drop(label, axis1))[:, 1] auc roc_auc_score(val_df[label], y_pred) if auc min_auc: raise ValueError(fAUC {auc:.3f} threshold {min_auc}) print(f✓ AUC check passed: {auc:.3f}) def run_consistency_gate(new_model_uri, old_model_uri, test_input): 在线一致性门禁 new_model mlflow.pyfunc.load_model(new_model_uri) old_model mlflow.pyfunc.load_model(old_model_uri) new_pred new_model.predict(test_input) old_pred old_model.predict(test_input) # 允许1%以内的预测值波动浮点精度 if not np.allclose(new_pred, old_pred, rtol0.01): raise ValueError(Prediction inconsistency detected) def run_business_gate(model_uri, sample_data): 业务规则门禁高风险用户必须有历史逾期 model mlflow.pyfunc.load_model(model_uri) pred_prob model.predict_proba(sample_data)[:, 1] high_risk_mask pred_prob 0.9 if high_risk_mask.any(): # 检查这些用户的历史逾期次数 overdue_counts sample_data.loc[high_risk_mask, overdue_times] if (overdue_counts 2).any(): raise ValueError(High-risk prediction violates business rule) # 在CI中调用 if __name__ __main__: run_offline_gate( model_urimodels:/churn_prediction_weekly/Production, val_data_pathdata/val/churn_val.csv ) # ... 其他门禁这个脚本被集成到Airflow DAG中作为部署前的最后一个任务。失败时Airflow会邮件通知负责人并在Slack频道发送详细错误日志。4. 实操过程从本地开发到生产部署的完整 walkthrough4.1 本地开发环境初始化三分钟搭建可复现的沙盒所有开发必须在Docker容器中进行杜绝“在我机器上好使”的问题。我们提供标准化的dev-container# 1. 克隆仓库只含代码和.dvc文件不含大数据 git clone https://gitlab.company.com/ml/churn-prediction.git cd churn-prediction # 2. 启动开发容器自动挂载代码、配置DVC远程 docker run -it \ --rm \ --gpus all \ -v $(pwd):/workspace \ -v ~/.dvc:/root/.dvc \ -p 8888:8888 \ -p 5000:5000 \ company/ml-dev:3.8.12 \ bash # 容器内执行 cd /workspace dvc remote add -d myremote s3://ml-data-bucket/ dvc pull # 下载当前分支所需的数据 jupyter notebook --ip0.0.0.0:8888 --allow-root这个容器镜像company/ml-dev:3.8.12已预装所有依赖且dvc pull会根据.dvc文件自动下载对应哈希的数据。开发者无需关心数据在哪只需专注代码。4.2 特征工程流水线用Airflow DAG实现“数据就绪即触发”特征工程不是一次性脚本而是可调度、可重试、可监控的生产任务。我们的featurize_dag.py定义如下# airflow/dags/featurize_dag.py from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.docker.operators.docker import DockerOperator from datetime import datetime, timedelta default_args { owner: data-science, depends_on_past: False, start_date: datetime(2024, 1, 1), retries: 3, retry_delay: timedelta(minutes5), } dag DAG( churn_featurize, default_argsdefault_args, descriptionDaily feature engineering for churn prediction, schedule_interval0 2 * * *, # 每天凌晨2点 catchupFalse ) def check_data_availability(**context): 检查上游数据是否已就绪 import subprocess result subprocess.run( [dvc, status, data/raw/transactions.csv], capture_outputTrue, textTrue ) if modified in result.stdout: return True raise ValueError(Raw data not updated) check_task PythonOperator( task_idcheck_data_status, python_callablecheck_data_availability, dagdag ) featurize_task DockerOperator( task_idrun_featurize, imagecompany/ml-train:3.8.12, commandpython /app/featurize.py --input data/raw/transactions.csv --output data/features/churn_v3.2.parquet, volumes[/path/to/data:/app/data], docker_urlunix://var/run/docker.sock, network_modebridge, dagdag ) check_task featurize_task关键点在于DockerOperator它确保特征工程在与训练环境完全一致的容器中运行且volumes挂载保证输入输出路径与DVC管理的路径一致。当任务失败时Airflow会自动重试并在UI中显示完整的stdout/stderr日志。4.3 模型训练与评估MLflow Tracking的完整埋点实践训练脚本train.py不是简单调用fit()而是深度集成MLflow# train.py import mlflow import mlflow.sklearn from sklearn.ensemble import RandomForestClassifier import pandas as pd import argparse def main(): parser argparse.ArgumentParser() parser.add_argument(--data-path, typestr, requiredTrue) parser.add_argument(--n-estimators, typeint, default100) args parser.parse_args() # 1. 开启MLflow Run with mlflow.start_run(run_namefrf_{args.n_estimators}) as run: # 2. 记录参数 mlflow.log_param(n_estimators, args.n_estimators) mlflow.log_param(max_depth, 10) mlflow.log_param(random_state, 42) # 3. 记录代码版本Git commit ID mlflow.log_param(git_commit, get_git_commit()) # 4. 加载数据DVC管理的路径 df pd.read_parquet(args.data_path) X, y df.drop(churn, axis1), df[churn] # 5. 训练模型 model RandomForestClassifier( n_estimatorsargs.n_estimators, max_depth10, random_state42 ) model.fit(X, y) # 6. 记录指标 from sklearn.metrics import accuracy_score, roc_auc_score y_pred model.predict(X) y_pred_proba model.predict_proba(X)[:, 1] mlflow.log_metric(accuracy, accuracy_score(y, y_pred)) mlflow.log_metric(auc, roc_auc_score(y, y_pred_proba)) # 7. 记录模型自动序列化 mlflow.sklearn.log_model(model, model) # 8. 记录数据版本DVC哈希 data_hash get_dvc_hash(args.data_path) mlflow.log_param(data_hash, data_hash) if __name__ __main__: main()执行命令mlflow run . -P># model_service.py import mlflow from mlflow.pyfunc import load_model from flask import Flask, request, jsonify import logging app Flask(__name__) model None app.before_first_request def load_mlflow_model(): global model # 从MLflow Model Registry加载生产模型 model_uri models:/churn_prediction_weekly/Production model load_model(model_uri) logging.info(Model loaded successfully) app.route(/health) def health_check(): return jsonify({status: healthy, model_version: model.metadata.run_id}) app.route(/predict, methods[POST]) def predict(): data request.get_json() # 输入校验 if features not in data: return jsonify({error: Missing features in request}), 400 try: prediction model.predict([data[features]])[0] probability model.predict_proba([data[features]])[0, 1] return jsonify({ prediction: int(prediction), probability: float(probability) }) except Exception as e: logging.error(fPrediction error: {e}) return jsonify({error: Internal server error}), 500Step 2Docker化服务Dockerfile.serviceFROM python:3.8.12-slim RUN pip install flask gunicorn mlflow2.9.0 COPY model_service.py /app/ WORKDIR /app CMD exec gunicorn --bind :5000 --workers 2 --threads 4 --timeout 30 model_service:appStep 3Nginx反向代理与负载均衡nginx.confupstream ml_backend { server app-v1:5000; server app-v2:5000; # 支持灰度发布 } server { listen 80; location / { proxy_pass http://ml_backend; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; } location /health { proxy_pass http://app-v1:5000/health; } }Step 4部署脚本自动化deploy.sh一键完成镜像构建、推送、K8s部署#!/bin/bash # 构建服务镜像 docker build -t registry.company.com/ml/churn-service:v2.3.1 . # 推送 docker push registry.company.com/ml/churn-service:v2.3.1 # 更新K8s Deployment替换镜像版本 kubectl set image deployment/churn-service churn-serviceregistry.company.com/ml/churn-service:v2.3.1 # 滚动更新后等待健康检查通过 kubectl rollout status deployment/churn-serviceStep 5监控告警在Prometheus中配置指标采集# prometheus.yml - job_name: churn-service static_configs: - targets: [churn-service:8000] metrics_path: /metrics并在Grafana中创建看板监控http_request_total{code~5..} 105xx错误率突增model_prediction_latency_seconds_bucket{le0.5} 0.95P95延迟超标model_version{currenttrue}确认当前运行版本5. 常见问题与排查技巧实录那些文档里不会写的实战经验5.1 “模型在本地预测正常线上返回NaN”——浮点数溢出的隐形杀手现象模型在Jupyter中预测一切正常部署到Flask后部分请求返回{prediction: null, probability: null}。排查过程查看Flask日志发现RuntimeWarning: invalid value encountered in double_scalars在服务中添加调试日志logging.info(fInput features: {data[features]})发现线上请求的某个特征值为1e308接近float64上限根因训练数据中该特征最大值为1e5但线上数据因ETL bug产生1e308模型内部计算如exp(x)溢出为inf最终nan传播到输出。解决方案在API入口增加输入校验def validate_input(features): for i, val in enumerate(features): if not (-1e10 val 1e10): raise ValueError(fFeature {i} out of range: {val})在特征工程中加入鲁棒缩放from sklearn.preprocessing import RobustScaler # RobustScaler对异常值不敏感比StandardScaler更安全 scaler RobustScaler().fit(X_train) X_scaled scaler.transform(X)注意永远不要相信上游数据。我们在所有API入口都加了try/except捕获ValueError并返回400 Bad Request而不是让nan污染下游。5.2 “DVC pull总是失败ConnectionResetError”——MinIO配置的致命细节现象dvc pull在CI环境中频繁失败报错ConnectionResetError: [Errno 104] Connection reset by peer。排查过程手动在CI节点执行curl -v http://minio:9000发现连接超时检查MinIO Pod日志ERROR: Unable to initialize object layer: The specified bucket does not exist.发现CI使用的MinIO地址是http://minio:9000但DVC配置中写的是https://minio.company.comSSL端口443根因DVC默认使用HTTPS而CI环境中的MinIO是HTTP服务且未配置SSL证书导致连接被重置。解决方案在DVC远程配置中强制使用HTTPdvc remote modify myremote endpointurl http://minio:9000 dvc remote modify myremote ssl_verify false在CI脚本中增加健康检查# 等待MinIO就绪 until curl -f http://minio:9000/minio/health/live; do echo Waiting for MinIO... sleep 2 done5.3 “MLflow UI看不到新Run”——后端存储配置的权限陷阱现象本地mlflow run成功但MLflow UI中没有新实验记录。排查过程检查MLflow服务日志ERROR: Failed to store run: Permission denied: /mlflow/mlruns/0/abc123登录MLflow服务器ls -l /mlflow/mlruns发现目录属主是root而MLflow进程以mlflow用户运行根因MLflow后端存储--backend-store-uri sqlite:///mlflow.db和artifact存储--default-artifact-root file:///mlflow/mlruns的目录权限不匹配。解决方案启动MLflow服务时指定用户docker run -d \ --user 1001:1001 \ # 使用非root用户 -v $(pwd)/mlflow:/mlflow \ -p 5000:5000 \ mlflow:2.9.0 \ mlflow server \ --backend-store-uri sqlite:///mlflow/mlflow.db \ --default-artifact-root file:///mlflow/mlruns \ --host 0.0.0.0初始化数据库目录mkdir -p mlflow/mlruns mlflow/mlflow.db chown -R 1001:1001 mlflow/5.4 “Airflow DAG不触发”——时区配置的静默故障现象DAG设置schedule_interval0 2 * * *但每天凌晨2点从未运行。排查过程查看Airflow WebUI的DAG详情页Next Run显示为2024-05-20 02:00:00 UTC但服务器时区是Asia/ShanghaiUTC8实际期望是北京时间2点根因Airflow默认使用UTC时区schedule_interval按UTC解析。0 2 * * *在UTC是凌晨2点对应北京时间上午10点。解决方案在DAG定义中显式指定时区from airflow import DAG from airflow.utils.timezone import datetime from pendulum import timezone beijing_tz timezone(Asia/Shanghai) dag DAG( churn_featurize, default_argsdefault_args, schedule_interval0 2 * * *, # 这仍是UTC时间 timezonebeijing_tz, # 关键DAG时区 ... )更佳