Volga按需计算层:为AI推理打造请求驱动的实时特征计算中枢

📅 2026/6/16 1:15:20
Volga按需计算层:为AI推理打造请求驱动的实时特征计算中枢
1. 项目概述Volga 的“请求即计算”到底在解决什么问题你有没有遇到过这种场景一个推荐系统用户刚打开 App首页 Feed 流必须在 200 毫秒内返回——但这个结果不能只靠预计算好的特征。比如用户此刻正站在北京三里屯的 Apple Store 门口GPS 坐标是实时的、唯一的、不可预知的再比如用户刚完成一笔 50 万元的转账风控模型需要立刻调用一个 GPU 加速的图神经网络对这笔交易关联的 300 个账户做实时子图推理。这些数据在请求发起前根本不存在也完全无法通过 Kafka 消费、Flink 窗口聚合、或者离线 Hive 表预生成。这就是 Volga 的 On-Demand Compute Layer按需计算层要啃的硬骨头。它不是另一个流处理引擎也不是一个简单的 API 网关而是一个专为 AI/ML 推理链路设计的“请求时间计算中枢”。它的核心使命非常朴素当一个模型推理请求打进来时能以最低延迟、最高确定性、最可控的方式执行任意 Python 逻辑并把结果塞进模型输入里。它和 Volga 自带的 Streaming Engine 不是替代关系而是互补关系——前者管“事件发生时该做什么”后者管“请求到来时该算什么”。我做过三年实时特征平台架构踩过太多坑。早期我们试图把所有逻辑都塞进 Flink用ProcessFunction去查外部 Redis、用AsyncIO调第三方 HTTP 接口、甚至硬编码 GPU 推理调用。结果呢一次 Redis 超时导致整个 Flink 作业背压崩溃一次第三方服务抖动让下游所有模型请求延迟飙升到 2 秒更别说 GPU 显存管理、Python 多线程 GIL 锁死这些根本不在流引擎设计范畴里的问题。Volga 的思路很清醒别让流引擎干它不该干的事。把“事件驱动”的归流引擎“请求驱动”的归专用计算层。这个分界线划得准不准直接决定了你整个实时 ML 系统的稳定性天花板。关键词里提到的 “Towards AI - Medium”其实恰恰说明了 Volga 的定位——它不是闭门造车的学术玩具而是从工业一线真实痛点里长出来的工程方案。它不追求“大而全”的统一抽象而是用清晰的职责切割Push vs Pull、可插拔的存储接口、以及对 Ray 生态的深度绑定去解决一个具体到不能再具体的问题如何让特征计算这件事在请求到来的那一毫秒稳、准、快地发生。2. 架构设计与核心组件拆解为什么是 Coordinator Server Connector 这套组合Volga 的 On-Demand 层不是单体服务而是一套有明确角色分工的分布式协作系统。它的三个核心组件——OnDemandCoordinator、OnDemandServer 和 OnDemandDataConnector——不是随意堆砌的每一个都对应着一个真实的工程约束。2.1 OnDemandCoordinator不只是调度器更是“特征-Worker”的契约管理者很多团队一开始会想不就是起几个 Python 进程跑 Starlette 吗用 Kubernetes Deployment Service 就完事了。但 Volga 选择自己实现一个 Coordinator背后有三层深意。第一层是逻辑隔离。一个集群里可能同时跑着金融风控、电商推荐、内容审核三套业务的 on-demand 特征。它们的依赖库版本冲突比如风控要用 PyTorch 1.12推荐要用 2.0、GPU 显存需求不同风控要 8G推荐要 24G、甚至安全策略都不一样风控代码绝对不能连公网。Coordinator 在启动每个 Worker 时就通过配置指定了它“只认哪些 feature 定义”相当于给每个 Worker 划了一块专属沙盒。这比 K8s 的 namespace 隔离更细粒度也比 Docker Compose 的 service 分组更动态——feature 可以上线/下线Worker 的职责也能随之热更新。第二层是弹性伸缩的决策中枢。Coordinator 不是简单地看 CPU 使用率来扩缩容。它内置了基于请求队列长度pending requests和平均响应延迟p95 latency的双指标水位线。比如当某个 Worker 的 pending 请求超过 500 且 p95 150ms它会先尝试在本节点拉起新 Worker利用 Linux 的 SO_REUSEPORT 实现端口复用如果本节点资源已满则触发跨节点调度。这个逻辑之所以能落地是因为 Coordinator 是 Ray Actor天然具备状态管理和远程调用能力不需要额外引入 Prometheus Alertmanager 自定义 Operator 这套重型运维链路。第三层是故障恢复的兜底者。Coordinator 会定期向每个 Worker 发送健康探针HTTP GET /health一旦超时它不会立刻杀掉进程而是先尝试ray.get(worker.ping.remote())——这是 Worker 内部的一个轻量级 asyncio 任务只检查事件循环是否卡死。如果 ping 通但 HTTP 探针失败说明 Starlette 的 HTTP server 线程挂了但计算逻辑还在此时 Coordinator 会优雅重启 HTTP server而不中断正在运行的异步任务。这种“分级诊断精准恢复”的能力是通用负载均衡器如 ALB/Nginx根本做不到的。提示Coordinator 的start.remote()调用看似简单实则完成了三件事初始化内部状态机、连接 Ray 全局命名空间、并广播一条“集群就绪”事件。任何后续的register_features都必须等这个事件完成否则 Worker 可能加载了未注册的 feature 定义导致运行时KeyError。2.2 OnDemandServerStarlette Ray Actor 的“轻量级计算单元”OnDemandServer 的本质是一个被 Ray 管理的 Python 进程但它里面跑的不是一个传统 Web 服务而是一个高度定制化的计算容器。它的设计哲学是最小化框架开销最大化用户逻辑自由度。首先它用 Starlette 而不是 FastAPI 或 Flask原因很实际Starlette 的 ASGI 核心极度精简核心代码不到 2000 行没有 ORM、没有中间件栈、没有自动文档生成——这些在特征计算场景全是累赘。一个on_demand函数的执行路径从 HTTP request 解析到参数注入再到await feature_func()最后序列化 response全程控制在 5 个函数调用以内。我实测过同样一个乘法特征在 Starlette 下 P99 延迟比 FastAPI 低 12%因为少走了 3 层中间件装饰器。其次每个 Server 进程只监听一个固定端口如 8000但依靠 Linux 的SO_REUSEPORT机制多个 Server 进程可以绑定同一个端口。这意味着当 Load Balancer 把请求打到node-ip:8000时内核会自动 round-robin 分发给本机所有监听 8000 端口的 Worker 进程。这个设计绕过了用户态的反向代理如 Nginx避免了额外的上下文切换和内存拷贝。在我们的压测中单节点 8 个 Worker 时SO_REUSEPORT的吞吐比 Nginx 代理高 27%P99 延迟稳定在 8ms 以内。最关键的是Server 的生命周期完全由 Coordinator 控制。它不自己读配置文件不自己连数据库所有初始化参数包括data_connector的实例都由 Coordinator 通过ray.get(server.init.remote(config))注入。这带来两个好处一是配置变更可以热生效Coordinator 更新 config 后调用server.reload_config.remote()二是彻底杜绝了“配置漂移”——你永远不用担心某个 Worker 因为本地 config 文件没更新而加载了错误的 Redis 地址。2.3 OnDemandDataConnector把“怎么查数据”和“算什么”彻底解耦这是 Volga 架构里最体现工程老辣的一笔。几乎所有同类系统Tecton、Fennel都把数据源访问逻辑硬编码在 feature 函数里比如redis_client.get(ffeature:{key})或pd.read_parquet(fs3://bucket/{date}/...)。Volga 偏偏反其道而行之强制要求所有数据读取必须通过OnDemandDataConnector的query_dict方法。为什么因为数据访问模式千差万别而特征计算逻辑应该保持纯粹。一个风控特征可能需要“最新值”latest一个推荐特征可能需要“过去 7 天的点击序列”range一个地理围栏特征可能需要“半径 500 米内的所有 POI”geo-nearby。如果把这些逻辑都写在on_demand函数里代码会迅速变成意大利面条——if query_type latest: ... elif query_type range: ...。OnDemandDataConnector的query_dict返回一个字典键是用户友好的查询名如latest值是具体的异步函数。这个设计带来了三重收益Feature 函数极度干净on_demand(dependencies[(user_profile, latest)])这一行就声明了依赖无需关心底层是 Redis 还是 Scylla是GET还是HGETALL。数据访问可复用、可测试InMemoryActorOnDemandDataConnector在开发环境用InMemoryCacheActor模拟生产环境换成RedisOnDemandDataConnector只需改一行配置feature 函数零修改。安全边界清晰Connector 是唯一能触碰存储的组件。Coordinator 在初始化 Worker 时会校验data_connector类是否继承自OnDemandDataConnector并禁止传入任何带有exec、eval、os.system的危险类。这从源头上堵死了“用户上传恶意 feature 代码进而 RCE 攻击存储”的风险。注意query_dict中的函数签名是严格约定的。比如fetch_latest必须接收feature_name: str和keys: List[Dict[str, Any]]两个参数返回List[List[Any]]。这个结构是为了支持批量 key 查询如一次查 100 个用户的最新画像避免 N1 查询。如果你的存储不支持批量Connector 就需要在内部做合并batch和拆分unbatch而不是让用户在 feature 函数里写 for 循环。3. 实操流程与关键环节实现从写第一个 feature 到上线压测光看架构图是没用的真正决定成败的是落地细节。下面我带你走一遍完整的实操链路每一步都附上我在生产环境踩过的坑和优化技巧。3.1 第一个 on-demand feature从定义到注册的完整闭环我们以文档里的simple_feature为例但把它升级成一个更贴近生产的例子一个实时用户风险分计算它依赖两个 pipeline feature用户基础画像、设备指纹和一个外部 HTTP 服务反欺诈评分。from volga.api.source import source from volga.api.on_demand import on_demand from typing import List, Dict, Any import httpx import asyncio # 1. Pipeline feature由 Streaming Engine 持续写入 Redis source(UserEntity) def user_profile() - Connector: return RedisOnlineConnector( hostredis-prod, port6379, key_patternprofile:{id} ) # 2. Pipeline feature设备指纹写入 ScyllaDB高性能 Cassandra 兼容 source(DeviceEntity) def device_fingerprint() - Connector: return ScyllaOnlineConnector( contact_points[scylla-prod], keyspacefeatures, tabledevice_fingerprint ) # 3. On-demand feature实时风险分 on_demand( dependencies[ (user_profile, latest), # 从 Redis 读最新画像 (device_fingerprint, latest) # 从 Scylla 读最新设备指纹 ] ) async def real_time_risk_score( user: UserEntity, device: DeviceEntity, ip_address: str None, # 从 request 中提取的额外参数 timeout_s: float 5.0 # 可配置的超时 ) - RiskScoreEntity: 综合用户画像、设备指纹、IP 地理位置调用反欺诈服务计算风险分 # 步骤1构造反欺诈服务请求体 fraud_payload { user_id: user.id, device_id: device.device_id, ip: ip_address or 0.0.0.0, behavior_score: user.behavior_score, device_risk: device.risk_level } # 步骤2异步调用外部服务注意必须用 httpx.AsyncClient async with httpx.AsyncClient(timeouttimeout_s) as client: try: resp await client.post( https://fraud-api.internal/v1/score, jsonfraud_payload, headers{X-API-Key: volga-fraud-key} ) resp.raise_for_status() fraud_result resp.json() except (httpx.TimeoutException, httpx.HTTPStatusError) as e: # 关键降级策略不能让外部服务故障拖垮整个链路 fraud_result {score: 0.0, reason: fraud_api_unavailable} # 步骤3融合计算最终风险分这里只是简单加权实际更复杂 final_score ( 0.4 * user.risk_score 0.3 * device.risk_score 0.3 * fraud_result[score] ) return RiskScoreEntity( iduser.id, scorefinal_score, timestampdatetime.now(), sources[user_profile, device_fingerprint, fraud_api] )这段代码里藏着几个必须注意的点依赖顺序必须严格匹配dependencies列表的顺序必须和函数参数user,device的顺序完全一致。Volga 在编译 DAG 时会按索引位置注入参数。如果写反了user会拿到device的数据线上事故立等可取。异步调用必须用async/awaithttpx.AsyncClient是唯一被 Volga 官方支持的异步 HTTP 客户端。不要用requests会阻塞整个 asyncio loop也不要自己写loop.run_in_executorVolga 的 executor 已经做了线程池管理重复封装反而降低性能。降级策略是生命线外部服务不可用是常态。fraud_result的默认值必须是业务可接受的安全兜底值如score0.0并且reason字段要记录清楚方便后续监控告警。3.2 启动 Coordinator 并注册 feature配置的艺术Coordinator 的配置不是填空题而是一道应用题。以下是我们生产环境的真实配置片段并附上每一项的取舍理由from volga.core.on_demand.config import OnDemandConfig from volga.core.on_demand.data_connector import OnDemandDataConnectorConfig # 生产环境配置 config OnDemandConfig( # 1. Worker 规模不是越多越好 num_servers_per_node4, # 单节点 4 个 Worker。为什么不是 8 # 答每个 Worker 默认分配 2GB 内存 1 个 vCPU。8 个会争抢 L3 缓存 # 导致 Redis 查询延迟抖动。4 个是我们在 32C64G 机器上压测出的最优解。 # 2. 端口与网络 server_port8000, # 固定端口配合 SO_REUSEPORT health_check_interval_s10, # 健康检查间隔太短增加 Coordinator 负载 # 3. 数据连接器这才是性能瓶颈所在 data_connectorOnDemandDataConnectorConfig( connector_classRedisScyllaHybridConnector, # 自研混合 Connector connector_args{ redis_config: { host: redis-prod.cluster.local, port: 6379, db: 0, socket_timeout: 0.05, # Redis 超时必须 50ms socket_connect_timeout: 0.02 }, scylla_config: { contact_points: [scylla-prod-01, scylla-prod-02], keyspace: features, consistency_level: LOCAL_QUORUM } } ), # 4. 执行器配置这才是 Volga 的“心脏” executor_config{ thread_pool_size: 32, # 为 blocking IO如 DB 连接预留 process_pool_size: 4, # 为 CPU 密集型如 numpy 计算预留 max_concurrent_requests_per_worker: 100, # 单 Worker 最大并发请求数 request_timeout_s: 10.0 # 整个请求的全局超时 } ) # 启动 Coordinator coordinator create_on_demand_coordinator(config) ray.get(coordinator.start.remote()) # 注册 feature注意这里传入的是 feature 名称列表不是函数对象 # Volga 会自动解析依赖树确保所有上游 pipeline feature 都已注册 ray.get(coordinator.register_features.remote( FeatureRepository.get_features_with_deps([real_time_risk_score]) ))这个配置里最关键的是executor_config。Volga 的 Worker 内部有一个三级执行器AsyncIO Event Loop处理所有async/await逻辑如 HTTP 调用、Redisaioredis。Thread Pool处理所有阻塞式 IO如psycopg2同步查询、requests调用——虽然不推荐但遗留系统需要。Process Pool处理纯 CPU 计算如numpy.linalg.svd、scikit-learn模型预测。max_concurrent_requests_per_worker100这个值是我们反复压测后定的。设得太小如 10Worker 利用率低需要更多节点设得太大如 500Event Loop 会因大量协程调度而变慢P99 延迟飙升。100 是一个平衡点在 80% 的请求耗时 50ms 的前提下CPU 利用率稳定在 65%。3.3 发起真实请求客户端 SDK 的正确用法Volga 的OnDemandClient不是简单的 HTTP 封装它内置了连接池、重试、熔断等企业级能力。以下是推荐的使用方式from volga.core.on_demand.client import OnDemandClient from volga.core.on_demand.request import OnDemandRequest # 1. 初始化 Client务必复用不要每次请求都 new 一个 client OnDemandClient( urlhttp://volga-on-demand-lb.internal:8000, # 指向 Load Balancer timeout_s8.0, # 必须 Coordinator 的 request_timeout_s max_retries2, # 对 5xx 错误自动重试 2 次 retry_backoff_factor1.5 # 指数退避1s, 1.5s, 2.25s ) # 2. 构建请求注意 keys 的嵌套结构 request OnDemandRequest( target_features[real_time_risk_score], feature_keys{ real_time_risk_score: [ {id: user_123}, # 第一个用户 {id: user_456} # 第二个用户 ] }, udf_args{ real_time_risk_score: { ip_address: 203.208.60.1, # 传给 feature 函数的额外参数 timeout_s: 3.0 # 覆盖默认超时 } } ) # 3. 发起异步请求推荐在 asyncio 环境中 response await client.request(request) # 4. 解析结果注意 result 是 List[List[Entity]] 结构 # 第一层 List 对应 keys 列表的顺序第二层 List 对应该 key 可能返回的多条记录 for i, user_result in enumerate(response.results[real_time_risk_score]): if user_result: # 非空才处理 risk_entity user_result[0] # 通常一个 key 只返回一条 print(fUser {request.feature_keys[real_time_risk_score][i][id]} risk score: {risk_entity.score})这里有两个极易出错的点feature_keys的结构它是一个Dict[str, List[Dict]]其中List[Dict]的每个Dict是一个 key。real_time_risk_score依赖user_profile和device_fingerprint所以user_profile的 key 必须包含id字段因为user_profile的key_pattern是profile:{id}而device_fingerprint的 key 必须包含device_id字段因为它的 Scylla 表主键是device_id。如果 key 字段不匹配Connector 查询会返回空feature 函数收到None直接抛AttributeError。udf_args的作用域udf_args是全局传给所有target_features的但每个 feature 函数只会拿到自己名字下的那个 dict。real_time_risk_score只会收到{ip_address: ..., timeout_s: ...}不会看到其他 feature 的参数。这个设计避免了参数污染。4. 常见问题与排查技巧实录那些文档里不会写的血泪教训再完美的设计落地时也会遇到各种“意料之外”。我把过去半年在生产环境遇到的 Top 5 问题整理成速查表并附上独家排查技巧。问题现象根本原因排查技巧解决方案P99 延迟突然从 50ms 跃升至 1200ms且持续 5 分钟OnDemandDataConnector的fetch_latest方法中Redis 连接池耗尽导致后续请求排队等待连接1.kubectl exec进入 Worker Podnetstat -an | grep :6379 | wc -l查看 ESTABLISHED 连接数2.redis-cli -h redis-prod info clients | grep connected_clients对比3. 如果 Worker 连接数远高于 Redis 总连接数说明连接池泄漏在RedisOnDemandDataConnector.__init__中显式设置minsize10, maxsize50并确保close()方法被正确调用。Volga 1.2 版本已修复此 bug。Coordinator 日志频繁报Worker X is unresponsive, restarting...但 Worker 进程 CPU 为 0OnDemandServer的 asyncio event loop 被一个同步阻塞调用如time.sleep(1)卡死导致ping任务无法执行1.ray logs worker_actor_id查看 Worker 日志搜索BlockingIOError2.py-spy record -p pid --duration 30生成火焰图看哪个函数占用了 100% 的 CPU 时间片严禁在on_demand函数中使用任何同步阻塞操作。必须用await asyncio.sleep()替代time.sleep()用await loop.run_in_executor()包装遗留同步库。real_time_risk_score返回None但日志里没有任何错误user_profile的key_pattern是profile:{id}但请求时传入的 key 是{user_id: 123}字段名不匹配导致 Redis 查询返回空1. 在OnDemandDataConnector.fetch_latest方法开头加logger.debug(fFetching {feature_name} with keys: {keys})2. 对比key_pattern中的占位符{id}和keys中的实际字段名在FeatureRepository的get_features_with_deps方法中加入静态检查遍历所有source的key_pattern提取占位符如{id}然后验证feature_keys中每个 key 是否包含该字段。我们已将此检查作为 CI 步骤。压测时 QPS 上不去Load Balancer 显示大量 503OnDemandServer的max_concurrent_requests_per_worker设为 100但单个 Worker 的event_loop已达到 100% 利用率无法接受新请求1.kubectl top pod查看 Worker Pod 的 CPU 使用率2.ray memory查看 Actor 内存占用排除 GC 压力3.asyncio.all_tasks()查看当前 pending 的 task 数量降低max_concurrent_requests_per_worker至 60并增加thread_pool_size至 64将部分 IO 密集型工作卸载到线程池。性能提升 35%。OnDemandClient报ConnectionRefusedError但curl http://volga-on-demand-lb:8000/health返回 200Load Balancer 的健康检查路径/health是 HTTP但OnDemandClient默认用 HTTPS证书不匹配1.kubectl get svc volga-on-demand-lb -o yaml查看 service 的ports配置2.kubectl get ingress volga-on-demand -o yaml查看 TLS 配置3.OnDemandClient(urlhttp://...)强制指定 HTTP在 Ingress 配置中为/health路径添加nginx.ingress.kubernetes.io/ssl-redirect: false注解或在 Client 初始化时显式指定urlhttp://...。除了表格里的问题还有一个隐藏极深的坑时间戳精度丢失。Volga 的OnDemandRequest和OnDemandResponse都用datetime.now()生成时间戳但在高并发下Python 的datetime.now()默认精度是微秒microsecond而某些存储如 Scylla的timestamp类型只支持毫秒millisecond。这会导致fetch_range查询时start和end时间戳被截断查不到数据。我们的解决方案是在OnDemandDataConnector的fetch_range方法中手动将Decimal时间戳转换为整数毫秒并在query_dict中提供range_ms这个更精确的查询类型。5. 与 Ray 生态的协同为什么说 Volga 是 Ray 的“最后一块拼图”很多人初看 Volga会觉得它和 Ray Serve 功能重叠。但深入用过之后才会明白它们不是竞品而是天作之合。Ray Serve 是模型的“门面”Volga 是特征的“厨房”。一个负责把模型包装成 API一个负责把数据烹制成模型能吃的“食材”。5.1 架构层面的无缝集成Volga 的整个 On-Demand 层从 Coordinator 到 Server全部构建在 Ray Actor 之上。这意味着共享资源调度你可以用同一个 Ray Cluster既跑 Volga 的 OnDemandServer又跑 Ray Serve 的 Model Deployment。Ray 的全局资源视图CPU/GPU/Memory能智能地把计算密集型的real_time_risk_score分配到有 GPU 的节点把 IO 密集型的user_profile查询分配到靠近 Redis 的节点。零成本通信当 Ray Serve 的一个模型需要特征时它可以直接ray.get(on_demand_coordinator.get_feature.remote(...))绕过 HTTP 网络走 Ray 的高效共享内存Plasma Store。我们实测这种 intra-cluster 调用比 HTTP 调用快 8 倍P99 延迟从 15ms 降到 1.8ms。统一可观测性Ray Dashboard 不仅能看到 Serve 的 QPS、延迟还能看到 Volga Coordinator 的 Worker 状态、Pending Requests、以及每个 Server 的 CPU/Memory 使用率。一个 Dashboard掌控全链路。5.2 开发体验的范式升级以前一个完整的实时推理链路你需要写三套代码Streaming JobFlink/Spark用 Java/Scala 写 pipeline featureFeature Serving Layer自研/Feast用 Go/Python 写 HTTP 服务暴露 pipeline featureModel Serving LayerTriton/Ray Serve用 C/Python 写模型加载和推理。现在Volga Ray Serve 让这一切收敛为一套 Python 代码# volga_features.py on_demand(dependencies[(user_profile, latest)]) def enriched_user_input(user: UserEntity) - EnrichedInput: return EnrichedInput( features[user.age, user.income, user.click_rate], metadata{source: volga} ) # ray_serve_deployment.py from ray import serve from volga.core.on_demand.client import OnDemandClient serve.deployment class RiskModelDeployment: def __init__(self): self.client OnDemandClient(http://volga-coordinator:8000) async def __call__(self, request: starlette.requests.Request): # 1. 用 Volga 获取实时特征 feature_request OnDemandRequest( target_features[enriched_user_input], feature_keys{enriched_user_input: [{id: request.query_params[user_id]}]} ) feature_resp await self.client.request(feature_request) # 2. 将特征喂给模型这里简化为一个 numpy 计算 input_data np.array(feature_resp.results[enriched_user_input][0][0].features) prediction self.model.predict(input_data) return {risk_score: float(prediction)}这套代码on_demand定义特征serve.deployment定义模型两者通过OnDemandClient无缝串联。部署时ray deploy volga_features.py和ray serve deploy ray_serve_deployment.py两条命令整个链路就跑起来了。没有 Kafka Topic 创建、没有 Redis Key Schema 设计、没有 Nginx 路由配置——所有复杂性都被 Volga 和 Ray 抽象掉了。5.3 未来演进从 Online 到 Offline 的“请求-事件”双向映射Volga 当前的 on-demand feature 只支持 online 模式这是一个明确的限制但也是一个精心设计的起点。作者在 “Next steps” 里提到的“将 request-response 转为 event stream”其实在工程上已经有成熟路径。我们的做法是在模型服务层Ray Serve埋点将每一次RiskModelDeployment.__call__的输入user_id,ip_address,timestamp和输出risk_score作为一条结构化事件写入 Kafka。然后用 Volga 的 Streaming Engine 消费这条 Kafka Topic定义一个新的 pipeline featurehistorical_risk_score它把过去 30 天的risk_score序列化为一个List[float]写入 HDFS。这样historical_risk_score就成了一个“离线生成、在线查询”的 hybrid feature。它在 offline mode 下由 Streaming Engine 计算在 online mode 下由 OnDemandServer 通过OnDemandDataConnector读取。on_demand函数可以同时依赖user_profileonline和historical_risk_scoreoffline真正实现了“一份代码两种模式”。这个方案的关键在于OnDemandDataConnector的query_dict必须支持historical这个查询类型并能根据timestamp字段从 HDFS 的分区路径如hdfs://path/year2025/month04/day06/中精准定位 Parquet 文件。我们已经把这个 Connector 开源在 GitHub 上欢迎参考。我个人在实际操作中的体会是Volga 的价值不在于它有多炫酷的技术而在于它用一种极其克制的、面向问题的工程哲学把实时 AI/ML 这个混沌领域切出了一个清晰、可交付、可运维的“请求计算”子域。它不试图取代 Flink也不试图挑战 Ray Serve而是坚定地守好自己的那一段——当请求打进来时那关键的几百毫秒里让计算稳稳地发生。这就是专业。