数据自愈模块综合问题分析报告
数据自愈模块综合问题分析报告
Context
本分析旨在全面梳理 src/utils/data_healing/ 数据自愈模块存在的各类问题,结合三份参考文档(数据自愈BUG Cursor 1.md、数据自愈BUG Cursor 2.md、启动性能优化设计文档.md)及代码实地核查,形成统一的问题清单与修复优先级建议。
一、模块架构速览
DataHealingOrchestrator.heal_and_prepare()
Phase 1 : _load_zscore_history() ← SQL 时间窗口加载
Phase 2–3: 诊断-修复循环(最多3轮)
├─ _diagnose() ← 三项检查(连续性/新鲜度/数量)
│ ├─ ContinuityChecker.check_continuity()
│ └─ _check_freshness()
├─ RepairExecutor.repair() ← 补K线 + 重算zscore
└─ _load_zscore_history() ← 重载验证
Phase 4 : _final_assessment() → QualityAssessor.assess()
关键配置(config.py / src/config.py):
HEALING_MAX_ITERATIONS = 3FRESHNESS_MULTIPLIER = 2.5→ 新鲜度阈值 = 240min × 2.5 = 600minHEALING_FALLBACK_THRESHOLD = 0.80/HEALING_REJECT_THRESHOLD = 0.60TOLERANCE_RATIO = 0.2(连续性容差 ±20%)
二、问题清单(按严重程度分级)
🔴 严重 BUG(直接导致修复失效)
BUG-01:加载语义与修复语义不一致 → 修复永远无法收敛
位置:
orchestrator.pyL407–486(_load_zscore_history)orchestrator.pyL383–401(_generate_shortfall_targets)
根因:
加载层:WHERE kline_time >= NOW() - make_interval(hours => N)
↑ 固定时间窗,只看"过去N小时"
修复层:向 earliest_time 之前 补写更早的时间点
↑ 越补越往过去扩展
结果:补写 T0(更早)→ 重载仍用 NOW()-12h 窗 → T0 在窗外被过滤
→ 仍看到原来的条数 → 3轮循环全部无效 → 陷入 degraded/failed
完整因果链日志佐证:
[轮1] 数量不足 (2/3) → 生成 shortfall_targets=[T0] → repair成功=1
[轮2] 重载: 仍2条 → 数量不足 (2/3) → 重新生成 T0 → repair成功=1(重复写)
[轮3] 重载: 仍2条 → 数量不足 (2/3) → 3轮耗尽 → degraded
影响:所有数量不足的配对均受影响(SEI、XAI、IOTA 等数据较少的配对)
BUG-02:单条数据时无法生成任何修复目标
位置:
continuity_checker.pyL55–57orchestrator.pyL233–239
根因:
# continuity_checker.py L55-57(实际代码确认)
if len(records) < 2:
return False, [], completeness_pct # is_continuous 强制为 False
# orchestrator.py _diagnose() L233-238
if not is_fresh and is_continuous and len(records) >= required_count:
stale_targets = ... # 需 is_continuous=True AND count>=required
if is_continuous and not gap_times and len(records) < required_count:
shortfall_targets = ... # 需 is_continuous=True
# 结论:len(records)==1 时 is_continuous=False
# → stale 条件不满足(is_continuous=False)
# → shortfall 条件不满足(is_continuous=False)
# → 修复目标为空 → 终止修复
日志表现:
数量不足(1/3), 已过时1284min
无法确定修复目标,终止修复
影响:所有仅剩1条历史记录的配对(系统长期离线后常见)
BUG-03:过时 + 数量不足的组合场景无法修复
位置:orchestrator.py L233–239
根因:两类修复目标的生成条件相互排斥:
# stale 生成:需要 is_continuous=True AND len>=required(数量充足才允许)
# shortfall 生成:需要 is_continuous=True AND len<required(数量不足才允许)
# 当 len(records)==1:
# - is_continuous=False → 两者都不满足
# - 既无 stale 也无 shortfall → 修复目标为空
本质:BUG-02 的扩展场景,1条数据时"过时"条件加重了问题(stale条件额外要求count>=required,更难满足)
🟠 中等 BUG(功能受损、日志误导)
BUG-04:「尝试更大范围」日志具有误导性
位置:orchestrator.py L418–425(时间窗列表定义)
根因:
time_ranges_hours = [
math.ceil(needed_hours), # 例:12h
math.ceil(needed_hours * 1.3), # 例:16h
math.ceil(needed_hours * 2), # 例:24h ← 最大仅到此
]
# 日志:"数据不足: 1条(24h窗口), 尝试更大范围"
# 但实际上24h已是最后一档,后面直接进入诊断,并无48h/72h
影响:日志误导排查方向;实际未尝试更大窗口,问题根因(加载语义)被掩盖
BUG-05:修复目标在多轮中重复写入数据库
根因:BUG-01 的副作用。因重载看不到补写的条,每轮都重新生成相同的 shortfall_targets,RepairExecutor.repair() 对同一时间点重复执行 K线拉取 + zscore写入操作。
影响:
- 无效的 API 调用(消耗速率限制)
- 数据库重复写入(依赖DB层的幂等性保护)
- 3轮修复时间全部浪费
🟡 性能问题(启动缓慢)
PERF-01:完全串行处理,无预过滤
位置:src/services/realtime_kline_service_base.py L1766–1851(_run_data_healing)
现象:100个配对全部串行走完整 4 阶段自愈流程(含 SQL 查询、API 调用),健康配对与问题配对同等耗时。
量化:
| 场景 | 当前耗时 | 期望 |
|---|---|---|
| 100对全健康(正常重启) | ~100s | <5s |
| 80健康+20需修复 | ~260s | ~50s |
| 20需修复(首次/离线恢复) | ~160s | ~45s |
根因:
- 无批量预检(无法快速跳过已健康配对)
- 串行
for循环,无并行 HEALING_TIMEOUT_SECONDS=300(5分钟),串行仅能处理约30对
PERF-02:SIGALRM 超时机制在多线程下不可用
位置:realtime_kline_service_base.py(超时控制)
根因:Python signal.SIGALRM 只能在主线程使用,并行化时子线程无法使用此超时机制,需改为 threading.Timer。
🟢 潜在风险(低优先级)
RISK-01:WebSocket 重连竞态窗口
位置:enhanced_ws_manager.py L744–756
风险:stop() 与 _on_close 同时触发时,虽有二次检查,但 stop_event.set() 与状态更新之间仍存在微小竞态窗口,极端情况可能导致停止期间触发一次额外重连。
缓解:已有 _reconnect_lock 和 stop_event 二次检查,实际触发概率低。
RISK-02:continuity_checker 对0条数据和1条数据语义混淆
位置:continuity_checker.py L49–57
问题:0条(not records)和 1条(len < 2)都返回 is_continuous=False,但语义不同:
- 0条:「无数据,无法判断」
- 1条:「有基准点,只是无法测量间隔」
调用层(orchestrator)无法区分这两种情况,导致修复策略不够精准。
三、问题根因总结图
所有BUG的共同根源:
┌─────────────────────────────────────────────────────┐
│ 加载语义(时间窗)与修复语义(向前扩展)不一致 │ ← BUG-01,04,05
│ 连续性检查对 len<2 过于严格 │ ← BUG-02,03
│ 修复目标生成条件相互依赖,组合场景未充分考虑 │ ← BUG-03
│ 串行处理缺少预过滤 │ ← PERF-01
└─────────────────────────────────────────────────────┘
四、修复方案建议
修复 BUG-01,04,05(核心):改加载语义为「最近N条」
文件:src/utils/data_healing/orchestrator.py _load_zscore_history()
# 旧(时间窗):
WHERE kline_time >= NOW() - make_interval(hours => %s)
# 新(最近N条,可加7天安全边界):
WHERE kline_time >= NOW() - INTERVAL '7 days'
ORDER BY kline_time DESC
LIMIT %s # = required_count + buffer
收益:补写的条无论多早,重载时都能看到 → 修复在第1轮即可收敛
修复 BUG-02,03(核心):解耦连续性与shortfall生成
文件:orchestrator.py _diagnose() + continuity_checker.py
# 方向1:continuity_checker 区分「0条」和「1条」
if len(records) < 2:
return True, [], completeness_pct # 1条视为"连续"(无矛盾)
# 方向2:orchestrator _diagnose() 解耦条件
# shortfall 不依赖 is_continuous(数量不足就补):
if len(records) < required_count:
shortfall_targets = self._generate_shortfall_targets(records, required_count)
# stale 条件放宽(即使 count<required 也允许更新过时的已有条):
if not is_fresh:
stale_targets = self._generate_stale_targets(records)
修复 PERF-01(性能):批量预过滤 + 并行化
文件:realtime_kline_service_base.py _run_data_healing()
# Step 1:一次SQL查询全量配对的最新时间和数量
healthy_pairs, sick_pairs = _quick_health_check_batch(all_pairs)
# Step 2:直接跳过健康配对
# Step 3:并行处理需修复的配对
with ThreadPoolExecutor(max_workers=HEALING_WORKERS) as pool:
futures = [pool.submit(heal_one, pair) for pair in sick_pairs]
注意:并行化需要对 KlineDataFiller.fill_cooldown 字典加 threading.Lock,超时改用 threading.Timer。
五、修复优先级与实施顺序
| 优先级 | 问题 | 修改文件 | 难度 | 收益 |
|---|---|---|---|---|
| P0 | BUG-01:加载语义不一致 | orchestrator.py |
低 | 高(修复收敛) |
| P0 | BUG-02:1条不修复 | continuity_checker.py + orchestrator.py |
中 | 高(覆盖更多场景) |
| P1 | BUG-03:过时+不足组合 | orchestrator.py |
低 | 中 |
| P1 | PERF-01:预过滤 | realtime_kline_service_base.py |
中 | 极高(正常重启<5s) |
| P2 | PERF-01:并行化 | 同上 + repair_executor.py |
高 | 高(病态情况50%提速) |
| P3 | BUG-04:日志误导 | orchestrator.py |
低 | 低(可观测性) |
| P3 | RISK-01:WS竞态 | enhanced_ws_manager.py |
高 | 低(概率极低) |
六、涉及的关键文件
| 文件 | 关键改动点 |
|---|---|
src/utils/data_healing/orchestrator.py |
_load_zscore_history()(加载语义)、_diagnose()(条件解耦) |
src/utils/data_healing/continuity_checker.py |
L55-57(1条数据处理) |
src/utils/data_healing/repair_executor.py |
加 threading.Lock 保护 fill_cooldown |
src/services/realtime_kline_service_base.py |
_run_data_healing()(预过滤+并行) |
src/config.py |
新增 HEALING_WORKERS、调整 HEALING_TIMEOUT_SECONDS |
七、验证方案
- 单元测试:针对
continuity_checker的0条/1条/多条场景;orchestrator._diagnose()的组合条件 - 集成测试:模拟 DB 仅1条数据的配对,验证1轮修复后重载能看到3条
- 性能测试:正常重启场景,验证启动时间 <5s(100对全健康)
- 日志验证:观察「数量不足」配对的修复过程,确认「重载后条数增加」的日志出现
- 回归测试:已健康配对不受影响,仍正常启动