从零构建机器学习管线:数据到部署的工程化实践

📅 2026/7/5 12:05:46
从零构建机器学习管线:数据到部署的工程化实践
在实际机器学习项目中从数据到模型再到服务从来不是一蹴而就的。很多开发者最初只关注模型算法本身但很快就会发现数据清洗、特征工程、模型训练、评估、部署和监控这些环节如果缺乏系统化的串联项目就会陷入混乱、难以复现和迭代。这正是“机器学习管线”要解决的核心问题。它不是一个具体的工具而是一套将机器学习工作流标准化、自动化和模块化的工程实践。无论你是数据科学家、机器学习工程师还是希望将AI能力集成到业务系统中的开发者理解并构建一条健壮的机器学习管线都是将模型从实验推向生产、从一次性成功变为可持续服务的必经之路。本文将带你从零开始理解机器学习管线的核心概念、标准阶段并动手搭建一个从数据处理到模型部署的完整、可复现的管线示例让你掌握构建和维护生产级机器学习应用的关键骨架。1. 理解机器学习管线的核心价值与标准阶段在深入代码之前我们必须先厘清概念。机器学习管线常被称为ML Pipeline或ML Workflow它指的是一系列自动化、可重复的步骤用于将原始数据转化为可部署的机器学习模型并持续管理其生命周期。它的核心价值在于将零散、手动的任务串联成一个有序、可控的流程。1.1 为什么需要管线化想象一个没有管线的场景数据科学家A用Jupyter Notebook清洗了数据保存为data_v1.csv。工程师B基于这个文件训练了模型调整了超参数得到了一个.pkl文件。几周后数据更新了A修改了清洗逻辑生成了data_v2.csv但B可能不知道或者用旧参数重新训练结果无法复现。更糟糕的是当需要将模型部署为API时发现训练环境和生产环境的依赖库版本不一致。这些问题都源于流程的割裂。管线化通过以下方式解决这些问题可复现性管线的每一步数据版本、预处理参数、模型参数都被明确记录和版本控制。任何时候都可以用相同的输入和配置得到完全一致的输出。自动化将手动执行的脚本串联起来实现一键触发从数据到部署的全流程减少人为错误提升效率。模块化将数据处理、特征工程、训练、评估等步骤封装为独立的模块。可以单独测试、优化或替换某个模块而不影响其他部分。协作与监控清晰的管线定义了各团队数据、算法、工程的交接边界。同时管线的每个步骤都可以被监控便于追踪性能瓶颈和故障点。1.2 端到端机器学习管线的标准阶段一个完整的端到端机器学习管线通常包含以下三个阶段每个阶段内部又由多个子步骤构成。阶段核心目标关键子步骤产出物示例数据处理将原始数据转化为可供模型学习的干净、有效的特征。数据收集、清洗、探索性分析(EDA)、特征工程、数据拆分。训练集、验证集、测试集通常是.csv、.parquet文件或数据库表。模型开发基于处理好的数据选择并训练出性能最优的模型。模型选择、超参数调优、模型训练、模型评估与验证。训练好的模型文件.pkl、.joblib、.onnx、评估报告准确率、AUC等指标。模型部署与运维将训练好的模型集成到生产环境中并确保其持续稳定运行。模型序列化/打包、服务化API、嵌入式、性能监控、模型再训练/更新。可调用的API服务、容器镜像、监控仪表盘。这三个阶段并非严格线性。在实践中它们往往形成一个循环监控阶段发现模型性能下降模型漂移会触发新一轮的数据处理或模型再训练。这个循环正是MLOps机器学习运维的核心。2. 环境准备与工具选型在开始构建管线之前我们需要搭建一个标准化的开发环境。这里我们选择Python生态因为它拥有最丰富的ML库和成熟的管线工具。2.1 基础环境配置首先使用conda或venv创建一个独立的Python环境以避免包冲突。# 使用 conda 创建环境推荐 conda create -n ml-pipeline-demo python3.9 conda activate ml-pipeline-demo # 或者使用 venv python -m venv ml-pipeline-demo # Windows ml-pipeline-demo\Scripts\activate # Linux/Mac source ml-pipeline-demo/bin/activate2.2 核心依赖库安装我们将使用scikit-learn作为机器学习库pandas和numpy处理数据mlflow管理实验和模型fastapi构建API服务。使用pip安装它们。pip install scikit-learn1.3.0 pandas2.0.3 numpy1.24.3 pip install mlflow2.9.2 pip install fastapi0.104.1 uvicorn0.24.0 pydantic2.5.0 pip install joblib1.3.2 # 用于模型序列化注意版本号是为了确保环境一致性。在实际项目中应使用requirements.txt或pyproject.toml来严格管理依赖。2.3 项目结构设计一个清晰的项目结构是管线可维护性的基础。建议按如下方式组织你的项目目录ml_pipeline_project/ ├── data/ # 数据目录 │ ├── raw/ # 原始数据不修改 │ ├── processed/ # 处理后的数据 │ └── external/ # 外部数据源 ├── notebooks/ # 用于探索性分析的Jupyter Notebook ├── src/ # 源代码 │ ├── data/ # 数据处理模块 │ │ ├── __init__.py │ │ ├── make_dataset.py # 数据加载、清洗、拆分 │ │ └── build_features.py # 特征工程 │ ├── models/ # 模型相关模块 │ │ ├── __init__.py │ │ ├── train.py # 模型训练脚本 │ │ └── predict.py # 模型预测函数 │ └── visualization/ # 可视化模块可选 ├── tests/ # 单元测试 ├── models/ # 保存训练好的模型文件 ├── app/ # 模型服务化应用如FastAPI ├── configs/ # 配置文件YAML/JSON ├── pipelines/ # 管线定义文件如Airflow DAGs ├── requirements.txt # 项目依赖 ├── pyproject.toml # 项目元数据和构建配置 └── README.md这个结构将数据处理、模型训练、服务化等逻辑分离符合管线的模块化思想。3. 构建一个可复现的机器学习管线我们将以一个经典的分类问题——鸢尾花Iris数据集预测为例构建一个包含数据处理、模型训练和评估的完整管线。虽然问题简单但管道的设计模式可以扩展到复杂场景。3.1 阶段一数据处理管线数据处理是管线的起点。目标是创建可复用的数据转换步骤。首先在src/data/make_dataset.py中编写数据加载和拆分逻辑import pandas as pd import numpy as np from sklearn.datasets import load_iris from sklearn.model_selection import train_test_split import joblib import os def load_raw_data(): 加载原始鸢尾花数据集。 iris load_iris() # 将数据转换为DataFrame便于处理 data pd.DataFrame(datairis.data, columnsiris.feature_names) data[target] iris.target # 通常我们会将原始数据保存下来确保可追溯 raw_data_path data/raw/iris_raw.csv os.makedirs(os.path.dirname(raw_data_path), exist_okTrue) data.to_csv(raw_data_path, indexFalse) print(f原始数据已保存至: {raw_data_path}) return data, iris.target_names def split_data(data, test_size0.2, val_size0.1, random_state42): 将数据拆分为训练集、验证集和测试集。 参数: data: 包含特征和标签的完整DataFrame。 test_size: 测试集占比。 val_size: 验证集占训练验证集的占比。 random_state: 随机种子确保可复现。 返回: X_train, X_val, X_test, y_train, y_val, y_test from sklearn.model_selection import train_test_split # 先分离特征和标签 X data.drop(target, axis1) y data[target] # 先分出测试集 X_temp, X_test, y_temp, y_test train_test_split( X, y, test_sizetest_size, random_staterandom_state, stratifyy ) # 从剩余数据中分出验证集 val_ratio_adjusted val_size / (1 - test_size) X_train, X_val, y_train, y_val train_test_split( X_temp, y_temp, test_sizeval_ratio_adjusted, random_staterandom_state, stratifyy_temp ) print(f数据拆分完成: 训练集 {X_train.shape}, 验证集 {X_val.shape}, 测试集 {X_test.shape}) return X_train, X_val, X_test, y_train, y_val, y_test if __name__ __main__: # 示例运行此脚本以生成拆分后的数据 data, target_names load_raw_data() X_train, X_val, X_test, y_train, y_val, y_test split_data(data) # 保存拆分后的数据供后续步骤使用 joblib.dump((X_train, X_val, X_test, y_train, y_val, y_test), data/processed/split_data.joblib) print(拆分后的数据已保存至 data/processed/split_data.joblib)接下来在src/data/build_features.py中定义特征工程步骤。这里我们使用scikit-learn的Pipeline和ColumnTransformer来创建可序列化的转换器。from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler from sklearn.compose import ColumnTransformer import joblib def create_feature_pipeline(feature_columns): 创建特征处理管线。 这里以标准化为例可以扩展为更复杂的操作如多项式特征、编码等。 # 定义数值型特征的预处理步骤 numeric_transformer Pipeline(steps[ (scaler, StandardScaler()) # 标准化减去均值除以标准差 ]) # 将所有特征这里全是数值型应用同一个转换器 preprocessor ColumnTransformer( transformers[ (num, numeric_transformer, feature_columns) ]) return preprocessor def fit_and_save_pipeline(preprocessor, X_train, save_pathmodels/preprocessor.joblib): 在训练集上拟合预处理管线并保存。 preprocessor.fit(X_train) joblib.dump(preprocessor, save_path) print(f预处理管线已保存至: {save_path}) return preprocessor def transform_features(preprocessor, X_data): 使用拟合好的管线转换特征。 return preprocessor.transform(X_data) if __name__ __main__: # 示例加载数据创建并保存预处理管线 X_train, X_val, X_test, y_train, y_val, y_test joblib.load(data/processed/split_data.joblib) feature_columns X_train.columns.tolist() preprocessor create_feature_pipeline(feature_columns) fit_and_save_pipeline(preprocessor, X_train)关键点解释数据拆分使用train_test_split并设置stratifyy可以保持训练集和测试集中各类别的比例与原数据集一致这对于类别不平衡的数据集尤为重要。随机种子random_state参数确保了每次运行拆分结果一致这是可复现性的基石。预处理管线使用Pipeline和ColumnTransformer将多个预处理步骤封装成一个对象。这个对象可以像模型一样被fit在训练集上和transform应用于任何数据。保存这个对象joblib.dump意味着我们保存了从训练数据中学到的“标准化参数”均值和标准差从而可以一致地处理新数据。3.2 阶段二模型训练与评估管线模型训练管线需要集成数据处理步骤并管理训练、超参数调优和评估。在src/models/train.py中import joblib import mlflow import mlflow.sklearn from sklearn.ensemble import RandomForestClassifier from sklearn.metrics import accuracy_score, classification_report, confusion_matrix import pandas as pd import numpy as np import os from datetime import datetime def train_model(X_train, y_train, X_val, y_val, config): 训练模型并使用验证集评估。 参数: config: 包含模型超参数字典。 返回: 训练好的模型以及在验证集上的评估指标。 # 初始化模型传入配置参数 model RandomForestClassifier( n_estimatorsconfig.get(n_estimators, 100), max_depthconfig.get(max_depth, None), random_stateconfig.get(random_state, 42), n_jobs-1 # 使用所有CPU核心 ) # 训练模型 model.fit(X_train, y_train) # 在验证集上预测和评估 y_val_pred model.predict(X_val) val_accuracy accuracy_score(y_val, y_val_pred) print(f验证集准确率: {val_accuracy:.4f}) print(\n分类报告:) print(classification_report(y_val, y_val_pred)) return model, val_accuracy, y_val_pred def evaluate_on_test(model, X_test, y_test): 在测试集上进行最终评估。 y_test_pred model.predict(X_test) test_accuracy accuracy_score(y_test, y_test_pred) print(*50) print(f测试集准确率: {test_accuracy:.4f}) print(\n测试集分类报告:) print(classification_report(y_test, y_test_pred)) print(\n混淆矩阵:) print(confusion_matrix(y_test, y_test_pred)) return test_accuracy, y_test_pred def run_training_pipeline(config_pathconfigs/train_config.yaml): 运行完整的训练管线。 1. 加载配置和数据。 2. 加载预处理管线并转换特征。 3. 训练模型并记录实验。 4. 评估并保存模型。 # 1. 加载配置这里简化直接使用字典 config { model_name: random_forest, n_estimators: 150, max_depth: 5, random_state: 42 } # 实际项目中应从YAML文件加载 # import yaml # with open(config_path, r) as f: # config yaml.safe_load(f) # 2. 加载数据 X_train, X_val, X_test, y_train, y_val, y_test joblib.load(data/processed/split_data.joblib) preprocessor joblib.load(models/preprocessor.joblib) # 3. 转换特征 X_train_processed preprocessor.transform(X_train) X_val_processed preprocessor.transform(X_val) X_test_processed preprocessor.transform(X_test) # 4. 使用MLflow记录实验 # 设置MLflow跟踪服务器URI本地文件存储 mlflow.set_tracking_uri(file:///./mlruns) # 存储在当前目录的mlruns文件夹 mlflow.set_experiment(Iris_Classification) with mlflow.start_run(run_namefrf_{datetime.now().strftime(%Y%m%d_%H%M%S)}): # 记录超参数 mlflow.log_params(config) # 训练模型 model, val_accuracy, _ train_model(X_train_processed, y_train, X_val_processed, y_val, config) # 记录验证集指标 mlflow.log_metric(val_accuracy, val_accuracy) # 在测试集上评估注意在真实项目中测试集应只在最终评估时使用一次 test_accuracy, _ evaluate_on_test(model, X_test_processed, y_test) mlflow.log_metric(test_accuracy, test_accuracy) # 记录预处理管线和模型 mlflow.sklearn.log_model(model, model) # 也可以记录预处理管线 mlflow.sklearn.log_model(preprocessor, preprocessor) # 5. 保存模型到本地可选MLflow已记录 model_save_path fmodels/iris_rf_model.joblib joblib.dump(model, model_save_path) mlflow.log_artifact(model_save_path) print(f模型已保存至: {model_save_path}) print(训练管线执行完毕。) if __name__ __main__: run_training_pipeline()关键点解释配置管理将超参数如n_estimators、max_depth从代码中分离到配置文件如YAML是管线可配置、可复现的关键。这里用字典示意实际项目应使用文件。实验跟踪使用MLflow自动记录每次运行的参数、指标、模型和代码版本。这解决了“上次最好的模型参数是什么”的问题。mlflow.set_tracking_uri可以指向本地目录或远程服务器。数据泄露预防预处理管线StandardScaler的fit操作只能在训练集上进行然后用transform方法应用于验证集和测试集。如果在完整数据集上fit会导致信息泄露使评估结果过于乐观。测试集隔离测试集只在最终评估模型性能时使用一次以模拟模型面对全新数据时的表现。在调参过程中应只使用验证集。3.3 阶段三模型服务化与简单监控训练好的模型需要被其他系统调用。我们使用FastAPI创建一个简单的REST API服务。在app/main.py中from fastapi import FastAPI, HTTPException from pydantic import BaseModel import joblib import numpy as np import pandas as pd import os # 定义输入数据的模型 class IrisFeatures(BaseModel): sepal_length: float sepal_width: float petal_length: float petal_width: float # 可选添加数据验证 class Config: schema_extra { example: { sepal_length: 5.1, sepal_width: 3.5, petal_length: 1.4, petal_width: 0.2 } } # 初始化FastAPI应用 app FastAPI(title鸢尾花分类模型API, version1.0.0) # 在启动时加载模型和预处理管线 MODEL_PATH models/iris_rf_model.joblib PREPROCESSOR_PATH models/preprocessor.joblib try: model joblib.load(MODEL_PATH) preprocessor joblib.load(PREPROCESSOR_PATH) print(模型和预处理管线加载成功。) except FileNotFoundError as e: print(f加载模型失败: {e}) # 在实际生产环境中这里应该触发告警或使用降级策略 model None preprocessor None app.get(/) def read_root(): return {message: 鸢尾花分类模型API已就绪, status: healthy} app.get(/health) def health_check(): 健康检查端点用于监控。 if model is not None and preprocessor is not None: return {status: healthy, model_loaded: True} else: return {status: unhealthy, model_loaded: False}, 503 app.post(/predict/) def predict(features: IrisFeatures): 预测端点。 if model is None or preprocessor is None: raise HTTPException(status_code503, detail模型服务暂不可用) # 将输入数据转换为DataFrame列顺序需与训练时一致 input_df pd.DataFrame([[ features.sepal_length, features.sepal_width, features.petal_length, features.petal_width ]], columns[sepal length (cm), sepal width (cm), petal length (cm), petal width (cm)]) try: # 1. 预处理 processed_features preprocessor.transform(input_df) # 2. 预测 prediction model.predict(processed_features) # 3. 获取预测概率如果模型支持 probabilities model.predict_proba(processed_features) # 鸢尾花类别映射 target_names [setosa, versicolor, virginica] predicted_class target_names[prediction[0]] # 格式化概率输出 prob_list probabilities[0].tolist() prob_dict {name: prob for name, prob in zip(target_names, prob_list)} return { predicted_class: predicted_class, probabilities: prob_dict, status: success } except Exception as e: raise HTTPException(status_code500, detailf预测过程中发生错误: {str(e)}) if __name__ __main__: import uvicorn uvicorn.run(app, host0.0.0.0, port8000)关键点解释输入验证使用Pydantic的BaseModel定义API的输入格式并自动进行类型验证防止非法输入导致服务崩溃。服务健康检查/health端点用于监控系统如Kubernetes的Liveness Probe检查服务状态是生产环境必备。错误处理在predict函数中使用try-except捕获处理和预测过程中的异常并返回明确的错误信息而不是让服务内部错误暴露给客户端。模型与预处理管线绑定预测时必须使用与训练时完全相同的预处理管线对输入数据进行转换否则模型接收到的特征分布会不一致导致预测失效。运行API服务cd ml_pipeline_project uvicorn app.main:app --reload --host 0.0.0.0 --port 8000使用curl或Postman测试curl -X POST http://127.0.0.1:8000/predict/ \ -H Content-Type: application/json \ -d {sepal_length:5.1, sepal_width:3.5, petal_length:1.4, petal_width:0.2}预期返回{ predicted_class: setosa, probabilities: { setosa: 0.99, versicolor: 0.01, virginica: 0.0 }, status: success }4. 管线自动化与编排进阶上述步骤仍需手动按顺序执行脚本。在生产环境中我们需要自动化调度整个管线。这里介绍两种常见方式4.1 使用Python脚本串联创建一个主脚本run_pipeline.py按顺序调用各个模块的函数# run_pipeline.py import sys import os sys.path.append(os.path.join(os.path.dirname(__file__), src)) from data.make_dataset import load_raw_data, split_data from data.build_features import create_feature_pipeline, fit_and_save_pipeline from models.train import run_training_pipeline import joblib def main(): print(步骤1: 加载和拆分数据...) data, target_names load_raw_data() X_train, X_val, X_test, y_train, y_val, y_test split_data(data) joblib.dump((X_train, X_val, X_test, y_train, y_val, y_test), data/processed/split_data.joblib) print(步骤2: 拟合并保存预处理管线...) feature_columns X_train.columns.tolist() preprocessor create_feature_pipeline(feature_columns) fit_and_save_pipeline(preprocessor, X_train) print(步骤3: 训练和评估模型...) run_training_pipeline() print(机器学习管线执行完成) if __name__ __main__: main()4.2 使用工作流编排工具如Apache Airflow对于更复杂、周期性的管线推荐使用Airflow、Kubeflow Pipelines或Prefect。以下是一个简化的Airflow DAG示例# pipelines/iris_ml_pipeline.py from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.dummy import DummyOperator default_args { owner: data_science, depends_on_past: False, start_date: datetime(2023, 10, 27), email_on_failure: True, email_on_retry: False, retries: 1, retry_delay: timedelta(minutes5), } dag DAG( iris_ml_pipeline, default_argsdefault_args, description一个端到端的鸢尾花分类模型训练管线, schedule_intervalweekly, # 每周运行一次 catchupFalse ) def data_processing_task(**kwargs): # 调用我们之前写的数据处理函数 from src.data.make_dataset import load_raw_data, split_data from src.data.build_features import create_feature_pipeline, fit_and_save_pipeline import joblib data, _ load_raw_data() X_train, X_val, X_test, y_train, y_val, y_test split_data(data) joblib.dump((X_train, X_val, X_test, y_train, y_val, y_test), data/processed/split_data.joblib) feature_columns X_train.columns.tolist() preprocessor create_feature_pipeline(feature_columns) fit_and_save_pipeline(preprocessor, X_train) def model_training_task(**kwargs): from src.models.train import run_training_pipeline run_training_pipeline() start DummyOperator(task_idstart, dagdag) process_data PythonOperator( task_idprocess_data, python_callabledata_processing_task, dagdag ) train_model PythonOperator( task_idtrain_model, python_callablemodel_training_task, dagdag ) end DummyOperator(task_idend, dagdag) start process_data train_model endAirflow提供了任务依赖管理、调度、重试、监控和告警等生产级功能是管理复杂ML管线的理想选择。5. 常见问题排查与最佳实践构建和运行ML管线时你会遇到各种问题。下面是一些典型场景的排查思路和预防措施。5.1 管线执行常见问题排查问题现象可能原因检查方式处理建议数据拆分结果不一致未设置random_state或种子值在不同运行中变化。检查数据拆分代码中的random_state参数。在需要可复现的步骤如数据拆分、模型初始化中固定random_state。预处理管线转换时报错特征数量不匹配训练和预测时输入数据的列顺序、列名或数量不一致。打印训练时X_train.columns和预测时输入数据的列信息进行对比。使用ColumnTransformer并明确指定列名。在预测API中确保将输入数据转换为与训练时列名、顺序完全一致的DataFrame。模型在测试集上表现远差于验证集数据泄露预处理时在全集上fit或测试集分布与训练/验证集差异过大。检查预处理管线是否只在训练集上fit。对数据集进行统计描述查看分布。严格隔离训练、验证、测试集。预处理管线的fit仅用于训练集。进行更细致的数据探索性分析(EDA)。MLflow无法记录实验跟踪服务器URI设置错误或目录权限问题。检查mlflow.set_tracking_uri指向的路径是否存在且可写。查看MLflow运行日志。使用绝对路径。确保运行脚本的用户对mlruns目录有写权限。对于生产环境考虑部署MLflow服务器。API服务预测结果异常模型或预处理管线未成功加载输入数据格式错误。调用/health端点检查状态。在API日志中查看predict函数的输入和转换后的数据。在服务启动时加入更健壮的加载检查和错误日志。在API中添加输入数据的详细日志注意隐私。5.2 生产环境最佳实践清单当管线从实验走向生产时需要额外关注以下几点配置外置化将所有配置数据库连接、文件路径、超参数、API密钥从代码中移出使用环境变量或配置中心如Consul、AWS Parameter Store管理。版本控制一切不仅代码要Git管理数据使用DVC、模型使用MLflow Model Registry、预处理管线、甚至运行环境使用Docker都需要版本化。完善的日志与监控应用日志记录管线的每个关键步骤开始、结束、错误、数据行数、模型指标等。模型监控监控预测延迟、吞吐量、输入数据分布漂移如PSI、预测结果分布变化。系统监控监控API服务的CPU、内存、网络和磁盘使用情况。自动化测试单元测试测试数据处理函数、特征工程逻辑、模型预测函数。集成测试测试从数据输入到模型输出的完整管线。一致性测试确保新训练的模型性能不低于基线模型。回滚与灾备模型部署应有回滚机制如通过MLflow Model Registry切换生产模型版本。定期备份关键数据和模型。资源与成本管理对于大规模训练使用云上Spot实例或自动伸缩组。设置预算告警避免意外成本。5.3 从项目到产品的管线演进最初的管线可能只是一个脚本。随着复杂度增加你需要考虑数据版本化使用DVCData Version Control或LakeFS来管理数据集的版本。特征存储使用Feast或Tecton等特征存储平台实现特征的定义、计算、存储和在线/离线服务解决训练/服务特征不一致问题。模型注册表使用MLflow Model Registry或类似的工具来管理模型的生命周期Staging, Production, Archived实现协作和审批流程。持续集成/持续部署将模型训练和评估管线集成到CI/CD流程中当代码或数据更新时自动触发训练并通过测试后自动部署到预发环境。机器学习管线是将机器学习项目工程化、产品化的核心框架。它始于对数据流和模型生命周期的系统性思考并通过自动化工具得以实现。本文通过一个从数据到服务的完整案例展示了构建一条基础但健壮的管线所需的关键组件模块化的代码、可复现的数据处理、实验跟踪、模型服务化以及简单的监控。掌握这些你就能为更复杂的机器学习项目打下坚实的工程基础。下一步你可以尝试将本例中的RandomForest替换为更复杂的模型引入特征选择或超参数自动优化如Optuna或者将整个管线部署到云上如使用AWS SageMaker Pipelines或Google Vertex AI Pipelines向真正的MLOps迈进。