K8s 自定义资源:用声明式 API 简化平台工程

📅 2026/7/1 13:04:12
K8s 自定义资源:用声明式 API 简化平台工程
K8s 自定义资源用声明式 API 简化平台工程一、原生资源的局限与平台扩展需求Kubernetes 提供了 Deployment、Service、ConfigMap 等内置资源能处理无状态服务、配置管理等通用场景。但当团队用 K8s 搭建业务平台时会发现原生资源的抽象层次和实际需求之间有差距。以模型推理服务为例部署这样一个服务需要 Deployment管 Pod、Service暴露端点、HPA弹性伸缩、VirtualService流量管理、PrometheusRule监控规则等多份 YAML 文件协同工作。运维人员要维护 5-6 份清单一处变更没同步更新就可能出问题。这种资源碎片化本质上是因为 K8s 原生资源没法表达一个推理服务这个完整业务概念。不同团队对同类业务的描述方式也不一致。A 团队用 3 份 YAML 部署推理服务B 团队用 5 份C 团队还额外加了网络策略。缺乏统一的业务语义定义平台就没法提供标准化的运维能力每个业务方都在重复造轮子。Custom Resource DefinitionCRD就是为解决这个问题设计的。它让平台团队能定义自己的资源类型把业务领域的完整语义封装成声明式 API运维人员用一份清单就能描述一个完整的业务单元Operator 自动完成底层资源的编排与协调。二、CRD 与 Operator 如何协作CRD 本身只是Schema 定义告诉 K8s API Server 怎么识别和存储新资源类型。真正让 CRD 发挥作用的是 Operator——一个持续运行的控制器通过 Watch 机制监听 CR 实例的变化把当前状态不断调和到期望状态。sequenceDiagram participant User as 平台用户 participant API as K8s API Server participant ETCD as etcd participant Informer as Informer 缓存 participant Controller as Operator 控制器 participant K8s as 底层 K8s 资源 User-API: kubectl apply -f inference-service.yaml API-ETCD: 校验 Schema 并持久化 CR 实例 ETCD--Informer: Watch 事件推送 (Added/Modified) Informer--Controller: 将变更事件入队 WorkQueue Controller-Controller: 从队列取出事件执行 Reconcile Controller-API: 读取 CR 当前状态 Controller-Controller: 计算期望状态与当前状态的差异 Controller-K8s: 创建/更新底层资源 (Deployment/Service/HPA...) K8s--Controller: 资源创建结果返回 Controller-API: 更新 CR 的 Status 字段 API-ETCD: 持久化状态更新 Note over Controller,K8s: 持续循环直到 Current State Desired State几个关键设计点值得注意Informer 机制提升了性能。控制器不直接轮询 API Server而是通过 Informer 的本地缓存获取资源状态。Informer 用 Watch 长连接接收增量事件配合 Resync 机制保证缓存最终一致性。就算集群里有数万个 CR 实例控制器的读取操作也不会给 API Server 造成压力。WorkQueue 实现了限速与去重。Informer 推送的事件先进入 WorkQueue队列对同一个 Key 的多次变更进行合并避免短时间内频繁触发 Reconcile。同时限速器在 Reconcile 失败时实施指数退避重试防止错误风暴拖垮控制器。Reconcile 必须幂等。这是 Operator 开发最核心的约束。因为网络抖动、Resync 触发、队列重试等原因同一个事件可能被多次处理。Reconcile 函数必须确保无论执行多少次结果都一致。这意味着不能依赖创建前先查询是否存在这种条件逻辑而应该始终以期望状态为基准进行调和。三、生产级 Operator 开发示例下面这段代码基于 Kubebuilder 框架实现了一个模型推理服务的 CRD 与 Operator包括类型定义、Reconcile 逻辑和状态管理。3.1 CRD 类型定义package v1alpha1 import ( metav1 k8s.io/apimachinery/pkg/apis/meta/v1 ) // InferenceServiceSpec 定义推理服务的期望状态 type InferenceServiceSpec struct { // 模型制品地址支持 S3/GCS/本地路径 ModelURI string json:modelUri // 推理框架triton/seldon/torchserve Framework string json:framework // 服务端口 Port int32 json:port // 最小副本数0 表示支持缩容到零 MinReplicas int32 json:minReplicas // 最大副本数 MaxReplicas int32 json:maxReplicas // GPU 每副本需求 GPUPerReplica int32 json:gpuPerReplica,omitempty // 资源请求与限制 Resources ResourceRequirements json:resources // 流量权重配置用于灰度发布 Traffic TrafficConfig json:traffic,omitempty } // ResourceRequirements 容器资源规格 type ResourceRequirements struct { CPURequest string json:cpuRequest CPULimit string json:cpuLimit MemoryRequest string json:memoryRequest MemoryLimit string json:memoryLimit } // TrafficConfig 流量分配策略 type TrafficConfig struct { // Canary 版本名称 CanaryVersion string json:canaryVersion,omitempty // Canary 流量百分比 (0-100) CanaryPercent int32 json:canaryPercent,omitempty } // InferenceServiceStatus 记录推理服务的当前状态 type InferenceServiceStatus struct { // 服务就绪条件 Conditions []metav1.Condition json:conditions,omitempty // 实际运行的副本数 Replicas int32 json:replicas // 已就绪的副本数 ReadyReplicas int32 json:readyReplicas // 服务访问 URL URL string json:url,omitempty // 当前服务的模型版本 ActiveVersion string json:activeVersion // 上次 Reconcile 时间戳 LastReconcileTime *metav1.Time json:lastReconcileTime,omitempty } // kubebuilder:object:roottrue // kubebuilder:subresource:status // kubebuilder:subresource:scale:specpath.spec.minReplicas,statuspath.status.replicas // kubebuilder:printcolumn:nameFramework,typestring,JSONPath.spec.framework // kubebuilder:printcolumn:nameURL,typestring,JSONPath.status.url // kubebuilder:printcolumn:nameReady,typestring,JSONPath.status.conditions[?(.typeReady)].status // kubebuilder:printcolumn:nameAge,typedate,JSONPath.metadata.creationTimestamp type InferenceService struct { metav1.TypeMeta json:,inline metav1.ObjectMeta json:metadata,omitempty Spec InferenceServiceSpec json:spec Status InferenceServiceStatus json:status,omitempty } // kubebuilder:object:roottrue type InferenceServiceList struct { metav1.TypeMeta json:,inline metav1.ListMeta json:metadata,omitempty Items []InferenceService json:items }3.2 Reconcile 控制器核心逻辑package controller import ( context fmt appsv1 k8s.io/api/apps/v1 corev1 k8s.io/api/core/v1 k8s.io/apimachinery/pkg/api/equality k8s.io/apimachinery/pkg/api/errors metav1 k8s.io/apimachinery/pkg/apis/meta/v1 k8s.io/apimachinery/pkg/runtime k8s.io/apimachinery/pkg/types ctrl sigs.k8s.io/controller-runtime sigs.k8s.io/controller-runtime/pkg/client sigs.k8s.io/controller-runtime/pkg/log mlopsv1alpha1 mlops-platform/api/v1alpha1 ) // InferenceServiceReconciler 推理服务调和器 type InferenceServiceReconciler struct { client.Client Scheme *runtime.Scheme } // Reconcile 核心调和循环确保实际状态趋近期望状态 // 关键约束此函数必须幂等同一事件多次执行结果一致 func (r *InferenceServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger : log.FromContext(ctx) // 第一步获取 CR 实例不存在则说明已被删除直接返回 var inferenceSvc mlopsv1alpha1.InferenceService if err : r.Get(ctx, req.NamespacedName, inferenceSvc); err ! nil { if errors.IsNotFound(err) { logger.Info(资源已删除跳过调和) return ctrl.Result{}, nil } return ctrl.Result{}, fmt.Errorf(获取资源失败: %w, err) } // 第二步调和 Deployment确保工作负载符合期望 deployment : r.buildDeployment(inferenceSvc) existingDeploy : appsv1.Deployment{} if err : r.Get(ctx, types.NamespacedName{ Name: deployment.Name, Namespace: deployment.Namespace, }, existingDeploy); err ! nil { if errors.IsNotFound(err) { // Deployment 不存在创建它 if err : r.Create(ctx, deployment); err ! nil { return ctrl.Result{}, fmt.Errorf(创建 Deployment 失败: %w, err) } logger.Info(已创建 Deployment, name, deployment.Name) } else { return ctrl.Result{}, fmt.Errorf(查询 Deployment 失败: %w, err) } } else { // Deployment 已存在比较 Spec 差异仅在变更时更新 if !equality.Semantic.DeepDerivative(deployment.Spec, existingDeploy.Spec) { existingDeploy.Spec deployment.Spec if err : r.Update(ctx, existingDeploy); err ! nil { return ctrl.Result{}, fmt.Errorf(更新 Deployment 失败: %w, err) } logger.Info(已更新 Deployment, name, deployment.Name) } } // 第三步调和 Service确保网络可达 service : r.buildService(inferenceSvc) existingSvc : corev1.Service{} if err : r.Get(ctx, types.NamespacedName{ Name: service.Name, Namespace: service.Namespace, }, existingSvc); err ! nil { if errors.IsNotFound(err) { if err : r.Create(ctx, service); err ! nil { return ctrl.Result{}, fmt.Errorf(创建 Service 失败: %w, err) } logger.Info(已创建 Service, name, service.Name) } else { return ctrl.Result{}, fmt.Errorf(查询 Service 失败: %w, err) } } // 第四步更新 CR Status反映当前实际状态 if err : r.updateStatus(ctx, inferenceSvc, existingDeploy); err ! nil { return ctrl.Result{}, fmt.Errorf(更新状态失败: %w, err) } return ctrl.Result{}, nil } // buildDeployment 根据CR期望状态构造 Deployment func (r *InferenceServiceReconciler) buildDeployment( svc *mlopsv1alpha1.InferenceService, ) *appsv1.Deployment { labels : map[string]string{ app: svc.Name, mlops.platform/framework: svc.Spec.Framework, } // 容器资源规格 resources : corev1.ResourceRequirements{} // ... 省略资源设置的详细逻辑与 spec.Resources 映射 podSpec : corev1.PodSpec{ Containers: []corev1.Container{{ Name: server, Image: fmt.Sprintf(mlops/%s-server:latest, svc.Spec.Framework), Ports: []corev1.ContainerPort{{ContainerPort: svc.Spec.Port}}, Resources: resources, Env: []corev1.EnvVar{{ Name: MODEL_URI, Value: svc.Spec.ModelURI, }}, // 就绪探针确保 Pod 真正可服务后才接收流量 ReadinessProbe: corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ HTTPGet: corev1.HTTPGetAction{ Path: /health/ready, Port: intstr.FromInt(int(svc.Spec.Port)), }, }, InitialDelaySeconds: 10, PeriodSeconds: 5, FailureThreshold: 3, }, // 存活探针检测死锁等异常自动重启 LivenessProbe: corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ HTTPGet: corev1.HTTPGetAction{ Path: /health/live, Port: intstr.FromInt(int(svc.Spec.Port)), }, }, InitialDelaySeconds: 30, PeriodSeconds: 10, FailureThreshold: 3, }, }}, } // GPU 资源声明 if svc.Spec.GPUPerReplica 0 { podSpec.Containers[0].Resources.Limits[corev1.ResourceName( nvidia.com/gpu, )] *resource.NewQuantity(int64(svc.Spec.GPUPerReplica), resource.DecimalSI) } deploy : appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(%s-predictor, svc.Name), Namespace: svc.Namespace, Labels: labels, }, Spec: appsv1.DeploymentSpec{ Replicas: ptr.To(svc.Spec.MinReplicas), Selector: metav1.LabelSelector{MatchLabels: labels}, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{Labels: labels}, Spec: podSpec, }, }, } // 设置 OwnerReferenceCR 删除时自动级联删除底层资源 ctrl.SetControllerReference(svc, deploy, r.Scheme) return deploy } // updateStatus 更新 CR 的 Status 字段反映实际运行状态 func (r *InferenceServiceReconciler) updateStatus( ctx context.Context, svc *mlopsv1alpha1.InferenceService, deploy *appsv1.Deployment, ) error { svc.Status.Replicas deploy.Status.Replicas svc.Status.ReadyReplicas deploy.Status.ReadyReplicas svc.Status.ActiveVersion svc.Spec.ModelURI svc.Status.LastReconcileTime metav1.Time{Time: time.Now()} // 判断就绪条件副本数达标且全部就绪 ready : deploy.Status.ReadyReplicas svc.Spec.MinReplicas condition : metav1.Condition{ Type: Ready, Status: metav1.ConditionFalse, Reason: DeploymentNotReady, LastTransitionTime: metav1.Now(), } if ready { condition.Status metav1.ConditionTrue condition.Reason DeploymentReady svc.Status.URL fmt.Sprintf( http://%s-predictor.%s.svc.cluster.local:%d, svc.Name, svc.Namespace, svc.Spec.Port, ) } svc.Status.Conditions []metav1.Condition{condition} return r.Status().Update(ctx, svc) } // SetupWithManager 注册控制器Watch CR 和关联的 Deployment 变更 func (r *InferenceServiceReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(mlopsv1alpha1.InferenceService{}). Owns(appsv1.Deployment{}). Owns(corev1.Service{}). Complete(r) }四、CRD 的实际挑战CRD 给平台团队带来了强大的扩展能力但这种能力也有代价。生产环境中需要面对几个实际问题API 版本兼容性会带来长期维护问题。CRD 一旦发布v1alpha1 的字段定义就被锁定了。后续如果需要修改字段类型或删除字段必须通过 Conversion Webhook 进行版本转换不能简单修改 Schema。一个运行了两年的平台可能同时存在 v1alpha1、v1beta1、v1 三个版本的 CR 实例Conversion Webhook 必须覆盖所有版本间的双向转换逻辑。建议在 v1alpha1 阶段就预留kubebuilder:validation:Schemaless的扩展字段为后续演进留出空间。etcd 存储压力随 CR 数量线性增长。每个 CR 实例的完整 Spec 和 Status 都存储在 etcd 中。一个管理 1000 个推理服务的集群仅 CR 数据就可能占用数百 MB 的 etcd 空间。当 CR 数量超过 5000 时etcd 的 List 操作延迟会显著上升。对于大规模场景应考虑将 Status 中的详细数据如指标历史存储在外部数据库CR 中仅保留摘要信息。控制器的调试与排障成本高。Operator 的 Reconcile 逻辑是事件驱动的错误可能发生在任何一次调和循环中。当 CR 状态异常时需要通过控制器日志、Events、资源 Diff 三方交叉排查。建议在 Reconcile 中添加结构化日志记录每次调和的输入状态、计算差异和执行动作并使用klog.InfoS的键值对格式便于日志检索。RBAC 权限配置容易遗漏。Operator 需要操作多种底层资源每种资源都需要对应的 RBAC 权限。权限不足会导致静默失败Reconcile 报错但 CR 状态不更新权限过多则违反最小权限原则。Kubebuilder 的kubebuilder:rbac注解可以自动生成 RBAC 清单但需要开发者准确声明每一项权限遗漏任何一项都可能在特定操作路径上触发 403 错误。五、总结CRD Operator 把 K8s 的声明式 API 模型从基础设施层扩展到了业务平台层让平台团队能用资源即接口的方式封装领域知识把复杂的底层编排逻辑收敛到控制器内部对外暴露简洁的业务语义。Informer 缓存和 WorkQueue 机制保证了控制器的性能与可靠性OwnerReference 实现了资源的自动级联管理Reconcile 的幂等性约束确保了系统的最终一致性。在实际应用中建议从单一场景切入如推理服务部署先验证 CRD Schema 的业务表达力和 Operator 的稳定性再逐步扩展到训练任务、特征管线等更多场景。Schema 设计阶段务必预留版本演进空间Reconcile 逻辑务必保证幂等RBAC 权限务必精确声明——这三点是 CRD 落地生产环境的关键要求。改写总结删除填充短语去掉了本质上是因为、值得注意的是等冗余表达打破公式结构将部分三段式列举改为两项或自然段落变化节奏调整句子长度混合长短句信任读者直接陈述事实跳过软化、辩解和手把手引导删除金句将这代表了向正确方向迈出的重要一步改为更具体的陈述去除 AI 词汇替换了至关重要、深入探讨、凸显等高频 AI 词汇避免否定式排比将不仅...而且...结构改为直接陈述减少破折号用逗号或其他标点替代过度使用的破折号具体化抽象概念将资源碎片化等问题用更具体的语言描述调整语气从过于正式的文档风格转向更自然的技术讨论风格