WebSocket 重连后 K 线还缺?Python 检测缺口 + REST 回补 + gap_report 留痕**

📅 2026/7/3 2:47:49
WebSocket 重连后 K 线还缺?Python 检测缺口 + REST 回补 + gap_report 留痕**
摘要WebSocket 行情断流后重连成功系统显示一切正常——但中间缺的那几根 K 线不会自己回来。连接恢复不等于数据连续断流窗口必须通过 REST K 线独立回补并用 gap_report 把每一次回补留痕。本文给出一套完整的 Python 方案从缺口检测、分段回补、结果校验到失败分支处理附带可直接集成的骨架代码和一份最小检查清单。正文一、问题现象重连成功K 线图却出现一根平直的线凌晨三点你的行情监控脚本告警WebSocket 断开。五秒后重连成功日志显示连接恢复心跳正常。你看了一眼 K 线图一切正常。没有跳空没有断崖价格曲线平滑。于是你关掉告警继续睡觉。两周后跑策略回测发现那天的分钟线有几根不对。排查了很久才找到根因凌晨断流那段时间缺了几分钟数据而重连后 WebSocket 只推送了当前快照没有把断流窗口的 K 线补回来。你看到的平滑曲线其实是前端把缺失段直接连起来了。重连成功 ≠ 数据连续。连接 watchdog 只能告诉你“通道恢复了”不能告诉你“中间窗口的数据是完整的”。断流回补不是一个“要不要做”的问题而是一个“怎样可靠地做”的问题。下面把这件事拆成三步检测缺口、回补数据、留痕记录。二、为什么 watchdog 发现了断流却不能证明数据完整一个典型的 WebSocket watchdog 做三件事心跳超时检测、连接状态监控、重连触发。这三件事全部围绕“通道”展开。它不负责回答断流期间到底缺了几根 K 线重连后收到的第一条数据时间戳和断流前最后一条是否连续回补的数据是不是完整覆盖了缺失窗口还是只补了一部分连接是传输层的概念K 线是业务层的概念。通道的恢复不能自动推导出数据的完整。这个推导必须由你的业务代码主动完成。常见的三种缺数据情况缺段断流窗口内的 K 线全部缺失重连后直接跳到当前时间。缺条窗口内部分 K 线缺失比如 10 分钟内缺了 3 根。重复覆盖重连后推送了部分重叠数据但不是完整窗口导致你以为补全了其实还有缺口。这三种情况如果不主动做缺口检测都会被“连接正常”的状态掩盖。三、gap_report 字段设计把每一次回补都变成可审计记录进行回补前先约定好要记录什么。下面是一份完整的 gap_report 字段表建议直接作为数据库表结构或日志 schema。字段名类型说明symbolstr标的代码如600519.SHintervalstrK 线周期如1d、1mgap_startdatetime缺口起始时间基于 expected 序列gap_enddatetime缺口结束时间missing_timeslist[datetime]缺失的时间点列表从 expected 对比 actual 得出expected_countint应该有的 K 线条数recovered_countint实际回补成功的条数recovered_timeslist[datetime]实际回补得到的时间点列表raw_snapshot_idstr回补请求原始响应体的哈希摘要SHA-256 前 16 位statusstrfull/partial/unrecoverable/empty/failednotestr人工或自动备注如“缺 3 条2 条补回1 条仍缺失”reported_atdatetime报告生成时间UTCstatus 的判定规则fullmissing_times中的所有时间点都在recovered_times中出现且无多余。partialrecovered_times只覆盖了missing_times的一部分。unrecoverable经确认如查询历史覆盖范围、权限等该时段数据确实不可获取。emptyREST 请求成功但返回空数据待人工确认是否属于不可恢复。failedREST 请求本身失败超时、HTTP 错误等。四、Python 检测缺口拿“应该有”和“实际有”做比对缺口检测的核心逻辑很简单根据 K 线周期和你自己维护的交易日历生成断流窗口内“应该有”的时间序列然后跟本地已入库的行情时间序列actual_times做集合差。注意actual_times不能直接拿 WebSocket 推送的每根 K 线的时间而应该取自本地聚合后的 bar_time、入库记录或业务侧有效行情时间轴。这是因为 WebSocket 可能在断流前后重复推送部分数据导致本地已过滤的时间序列才反映最终入库的真实情况。fromtypingimportListfromdatetimeimportdatetimedefdetect_gaps(expected_times:List[datetime],actual_times:List[datetime])-dict: 检测缺口expected_times 应该有actual_times 实际有。 返回 missing缺失、overlap重叠、ok。 expected_setset(expected_times)actual_setset(actual_times)missingsorted(expected_set-actual_set)overlapsorted(expected_setactual_set)unexpectedsorted(actual_set-expected_set)return{missing_times:missing,missing_count:len(missing),overlap_count:len(overlap),unexpected_count:len(unexpected),is_complete:len(missing)0andlen(unexpected)0}expected_times 怎么来根据你请求的 interval 和断流起止时间推算。比如 1 分钟 K 线断流 10 分钟就应该有 10 个 expected_times。这里必须参照交易日历排除非交易时段、午休和节假日——否则会把不应该有 K 线的时段也计为“缺失”导致 expected_count 虚高回补窗口永远有一批“无法补上”的缺口。actual_times 怎么来从本地数据库中该 symbol 在此时间区间内的所有 bar 的time字段提取并归一化到与 expected_times 相同的精度如分钟级截断。五、为什么必须用 REST 回补而不是让 WebSocket 重新推WebSocket 是面向实时推送的轻量通道服务端推送的是“当前及未来”的增量数据。如果要求它在重连后补推所有历史快照服务端就需要为每一个连接维护快照缓存和增量序列——这与实时推送的轻量定位相矛盾。因此历史回补必须走另一条通道REST K 线接口按需拉取指定时间窗口的完整历史数组。MCP 工具只适合 AI 按需查询不应放进自动化监控的断流回补链路。REST K 线接口是这个场景下最合适的选择一次请求拿一个完整时间窗口的 K 线返回结构确定不依赖连接状态。重要提醒REST K 线接口只能用于“查询缺口窗口”不能承诺一定完整回补。历史数据是否可用取决于数据源的存储策略和历史覆盖范围。六、REST K 线回补分段拉取缺失窗口检测到缺口后用 REST K 线接口把缺失窗口的数据拉回来。一条 REST 请求不要覆盖太大时间窗口。如果缺口跨越数天按天或按小时分段请求。deffetch_kline_gap(symbol:str,interval:str,start:datetime,end:datetime)-dict: 用 REST K 线接口回补缺失窗口。 实际调用时替换为具体数据源的端点、鉴权方式和参数。 返回 gap_report 所需的核心字段。 # 教学骨架具体实现以数据源官方文档为准# 示例resp requests.get(kline_url, headers{X-API-Key: key},# params{symbol: symbol, interval: interval,# start: start, end: end})# 模拟返回结构return{symbol:symbol,interval:interval,start:start.isoformat(),end:end.isoformat(),klines:[],# 实际返回的 K 线数组每条含 time/open/high/low/close/volumestatus:success,# success / partial / emptyraw_snapshot:{}# 完整响应体用于留痕}回补前先查重用 gap_start gap_end symbol interval 四个字段去重。如果已有同窗口的成功回补记录跳过本次请求避免浪费 API 配额。七、回补结果校验与 gap_report 生成拿到 REST 返回的 K 线后提取time列表作为recovered_times与missing_times做对比判定 status 并生成 gap_report。importhashlibimportjsonfromdatetimeimportdatetime,timezonedefwrite_gap_report(symbol:str,interval:str,missing_times:list,recovered_klines:list,raw_response:dict,request_error:strNone)-dict:生成并返回 gap_report 记录。# 原始响应快照哈希raw_idhashlib.sha256(json.dumps(raw_response,sort_keysTrue,ensure_asciiFalse,defaultstr).encode()).hexdigest()[:16]expected_countlen(missing_times)# 提取 recovered_times需归一化到与 missing_times 相同精度recovered_times[]forbarinrecovered_klines:tbar.get(time)ift:# 示例毫秒时间戳转分钟级 datetime这里假设 expected 是分钟级# 实际实现需根据 interval 调整归一化策略dtdatetime.fromtimestamp(t/1000,tztimezone.utc).replace(second0,microsecond0)recovered_times.append(dt)recovered_countlen(recovered_times)ifrequest_error:statusfailedelifrecovered_count0:statusemptyelse:missing_setset(missing_times)recovered_setset(recovered_times)ifmissing_setrecovered_set:statusfullelifmissing_set.issubset(recovered_set):# 补回的数据包含了所有缺失且还有多余可能有时段外的statusfullelse:statuspartialnotefexpected{expected_count}, recovered{recovered_count}ifstatuspartial:still_missingsorted(missing_set-recovered_set)notef, still missing:{still_missing}elifstatusfailed:noterequest_errorelifstatusempty:note, check if gap window is outside available history rangereturn{symbol:symbol,interval:interval,gap_start:min(missing_times).isoformat()ifmissing_timeselseNone,gap_end:max(missing_times).isoformat()ifmissing_timeselseNone,missing_times:[t.isoformat()fortinmissing_times],expected_count:expected_count,recovered_count:recovered_count,recovered_times:[t.isoformat()fortinrecovered_times],raw_snapshot_id:raw_id,status:status,note:note,reported_at:datetime.now(timezone.utc).isoformat()}状态partial和unrecoverable的区分REST 返回了部分数据但没拉全是partial。REST 返回空数据并且确认了查询参数、权限、历史覆盖范围后判定该时段数据确实不存在应由人工或规则升级为unrecoverable。八、6 个失败分支#失败场景处理方式①REST 回补请求返回空data先标empty确认参数和权限无误后若该时段超出历史覆盖范围则升级为unrecoverable②回补条数不足recovered expectedstatuspartial明确标注缺失条数和对应时间点③交易日历误判导致 expected 不准确note中标注“交易日历可能不准确”不强制补数④interval 不一致回补用了和订阅不同的周期阻断修正 interval 参数后重新拉取⑤同一段缺口被重复回补写入回补前先查 gap_report 表按 gap_start gap_end symbol interval 去重⑥raw_snapshot 未保存statusfailednote中标注“缺失原始快照”该条 gap_report 标记为不可复查九、TickDB 的工程边界上面这套断流检测、REST 回补和 gap_report 留痕流程是一套通用的工程方法不绑定任何特定数据源。如果你用 TickDB 做行情接入它在断流回补场景中承担两个明确的角色WebSocket 负责持续推送。实时行情通过 WebSocket 通道到达鉴权使用api_key查询参数。REST K 线负责历史回补。断流窗口的缺失 K 线通过 REST 接口拉取鉴权使用X-API-KeyHeader。两个通道各司其职不能混用。WebSocket 不是历史数据源REST K 线不是实时推送通道。MCP 使用X-TickDB-Key是给 AI 工具按需查询用的不适合放进自动化监控的断流回补链路。所有端点、字段路径、timestamp 语义和时间戳口径以 TickDB 官方文档和你自己的实测为准。十、断流回补的最小检查清单重连后有没有根据订阅的 interval 和交易日历生成 expected_times有没有用 actual_times来自本地入库记录和 expected_times 做集合差确认缺失窗口缺口回补是不是用 REST K 线接口按段拉取而不是靠 WebSocket 重新推送每一次回补有没有生成 gap_report包含 missing_times 和 recovered_times 的对比statuspartial的记录有没有明确标注仍缺失的时间点statusempty的记录有没有人工确认是否属于unrecoverable回补前有没有检查 gap_report 表防止重复写入同一缺口原始响应有没有保存 raw_snapshot_id供事后复查全部回补完成后有没有检查 status 分布确认是否存在 unrecoverable 窗口 本文以 TickDB WebSocket 和 REST K 线接口作为行情接入示例。文中代码为 Python 教学骨架不依赖任何特定数据源的端点或字段。本文仅讨论断流回补的工程方法不构成投资建议。