数据自愈模块综合问题分析报告

数据自愈模块综合问题分析报告

Context

本分析旨在全面梳理 src/utils/data_healing/ 数据自愈模块存在的各类问题,结合三份参考文档(数据自愈BUG Cursor 1.md数据自愈BUG Cursor 2.md启动性能优化设计文档.md)及代码实地核查,形成统一的问题清单与修复优先级建议。


一、模块架构速览

DataHealingOrchestrator.heal_and_prepare()
  Phase 1 : _load_zscore_history()        ← SQL 时间窗口加载
  Phase 2–3: 诊断-修复循环(最多3轮)
    ├─ _diagnose()                         ← 三项检查(连续性/新鲜度/数量)
    │   ├─ ContinuityChecker.check_continuity()
    │   └─ _check_freshness()
    ├─ RepairExecutor.repair()             ← 补K线 + 重算zscore
    └─ _load_zscore_history()             ← 重载验证
  Phase 4 : _final_assessment() → QualityAssessor.assess()

关键配置config.py / src/config.py):

  • HEALING_MAX_ITERATIONS = 3
  • FRESHNESS_MULTIPLIER = 2.5 → 新鲜度阈值 = 240min × 2.5 = 600min
  • HEALING_FALLBACK_THRESHOLD = 0.80 / HEALING_REJECT_THRESHOLD = 0.60
  • TOLERANCE_RATIO = 0.2(连续性容差 ±20%)

二、问题清单(按严重程度分级)


🔴 严重 BUG(直接导致修复失效)


BUG-01:加载语义与修复语义不一致 → 修复永远无法收敛

位置

  • orchestrator.py L407–486(_load_zscore_history
  • orchestrator.py L383–401(_generate_shortfall_targets

根因

加载层:WHERE kline_time >= NOW() - make_interval(hours => N)
         ↑ 固定时间窗,只看"过去N小时"

修复层:向 earliest_time 之前 补写更早的时间点
         ↑ 越补越往过去扩展

结果:补写 T0(更早)→ 重载仍用 NOW()-12h 窗 → T0 在窗外被过滤
      → 仍看到原来的条数 → 3轮循环全部无效 → 陷入 degraded/failed

完整因果链日志佐证

[轮1] 数量不足 (2/3) → 生成 shortfall_targets=[T0] → repair成功=1
[轮2] 重载: 仍2条   → 数量不足 (2/3) → 重新生成 T0 → repair成功=1(重复写)
[轮3] 重载: 仍2条   → 数量不足 (2/3) → 3轮耗尽 → degraded

影响:所有数量不足的配对均受影响(SEI、XAI、IOTA 等数据较少的配对)


BUG-02:单条数据时无法生成任何修复目标

位置

  • continuity_checker.py L55–57
  • orchestrator.py L233–239

根因

# continuity_checker.py L55-57(实际代码确认)
if len(records) < 2:
    return False, [], completeness_pct  # is_continuous 强制为 False

# orchestrator.py _diagnose() L233-238
if not is_fresh and is_continuous and len(records) >= required_count:
    stale_targets = ...  # 需 is_continuous=True AND count>=required

if is_continuous and not gap_times and len(records) < required_count:
    shortfall_targets = ...  # 需 is_continuous=True

# 结论:len(records)==1 时 is_continuous=False
# → stale 条件不满足(is_continuous=False)
# → shortfall 条件不满足(is_continuous=False)
# → 修复目标为空 → 终止修复

日志表现

数量不足(1/3), 已过时1284min
无法确定修复目标,终止修复

影响:所有仅剩1条历史记录的配对(系统长期离线后常见)


BUG-03:过时 + 数量不足的组合场景无法修复

位置orchestrator.py L233–239

根因:两类修复目标的生成条件相互排斥:

# stale 生成:需要 is_continuous=True AND len>=required(数量充足才允许)
# shortfall 生成:需要 is_continuous=True AND len<required(数量不足才允许)

# 当 len(records)==1:
# - is_continuous=False → 两者都不满足
# - 既无 stale 也无 shortfall → 修复目标为空

本质:BUG-02 的扩展场景,1条数据时"过时"条件加重了问题(stale条件额外要求count>=required,更难满足)


🟠 中等 BUG(功能受损、日志误导)


BUG-04:「尝试更大范围」日志具有误导性

位置orchestrator.py L418–425(时间窗列表定义)

根因

time_ranges_hours = [
    math.ceil(needed_hours),          # 例:12h
    math.ceil(needed_hours * 1.3),    # 例:16h
    math.ceil(needed_hours * 2),      # 例:24h  ← 最大仅到此
]
# 日志:"数据不足: 1条(24h窗口), 尝试更大范围"
# 但实际上24h已是最后一档,后面直接进入诊断,并无48h/72h

影响:日志误导排查方向;实际未尝试更大窗口,问题根因(加载语义)被掩盖


BUG-05:修复目标在多轮中重复写入数据库

根因:BUG-01 的副作用。因重载看不到补写的条,每轮都重新生成相同的 shortfall_targets,RepairExecutor.repair() 对同一时间点重复执行 K线拉取 + zscore写入操作。

影响

  • 无效的 API 调用(消耗速率限制)
  • 数据库重复写入(依赖DB层的幂等性保护)
  • 3轮修复时间全部浪费

🟡 性能问题(启动缓慢)


PERF-01:完全串行处理,无预过滤

位置src/services/realtime_kline_service_base.py L1766–1851(_run_data_healing

现象:100个配对全部串行走完整 4 阶段自愈流程(含 SQL 查询、API 调用),健康配对与问题配对同等耗时。

量化

场景 当前耗时 期望
100对全健康(正常重启) ~100s <5s
80健康+20需修复 ~260s ~50s
20需修复(首次/离线恢复) ~160s ~45s

根因

  1. 无批量预检(无法快速跳过已健康配对)
  2. 串行 for 循环,无并行
  3. HEALING_TIMEOUT_SECONDS=300(5分钟),串行仅能处理约30对

PERF-02:SIGALRM 超时机制在多线程下不可用

位置realtime_kline_service_base.py(超时控制)

根因:Python signal.SIGALRM 只能在主线程使用,并行化时子线程无法使用此超时机制,需改为 threading.Timer


🟢 潜在风险(低优先级)


RISK-01:WebSocket 重连竞态窗口

位置enhanced_ws_manager.py L744–756

风险stop()_on_close 同时触发时,虽有二次检查,但 stop_event.set() 与状态更新之间仍存在微小竞态窗口,极端情况可能导致停止期间触发一次额外重连。

缓解:已有 _reconnect_lockstop_event 二次检查,实际触发概率低。


RISK-02:continuity_checker 对0条数据和1条数据语义混淆

位置continuity_checker.py L49–57

问题:0条(not records)和 1条(len < 2)都返回 is_continuous=False,但语义不同:

  • 0条:「无数据,无法判断」
  • 1条:「有基准点,只是无法测量间隔」

调用层(orchestrator)无法区分这两种情况,导致修复策略不够精准。


三、问题根因总结图

所有BUG的共同根源:
┌─────────────────────────────────────────────────────┐
│  加载语义(时间窗)与修复语义(向前扩展)不一致    │  ← BUG-01,04,05
│  连续性检查对 len<2 过于严格                        │  ← BUG-02,03
│  修复目标生成条件相互依赖,组合场景未充分考虑       │  ← BUG-03
│  串行处理缺少预过滤                                 │  ← PERF-01
└─────────────────────────────────────────────────────┘

四、修复方案建议

修复 BUG-01,04,05(核心):改加载语义为「最近N条」

文件src/utils/data_healing/orchestrator.py _load_zscore_history()

# 旧(时间窗):
WHERE kline_time >= NOW() - make_interval(hours => %s)

# 新(最近N条,可加7天安全边界):
WHERE kline_time >= NOW() - INTERVAL '7 days'
ORDER BY kline_time DESC
LIMIT %s  # = required_count + buffer

收益:补写的条无论多早,重载时都能看到 → 修复在第1轮即可收敛


修复 BUG-02,03(核心):解耦连续性与shortfall生成

文件orchestrator.py _diagnose() + continuity_checker.py

# 方向1:continuity_checker 区分「0条」和「1条」
if len(records) < 2:
    return True, [], completeness_pct  # 1条视为"连续"(无矛盾)

# 方向2:orchestrator _diagnose() 解耦条件
# shortfall 不依赖 is_continuous(数量不足就补):
if len(records) < required_count:
    shortfall_targets = self._generate_shortfall_targets(records, required_count)

# stale 条件放宽(即使 count<required 也允许更新过时的已有条):
if not is_fresh:
    stale_targets = self._generate_stale_targets(records)

修复 PERF-01(性能):批量预过滤 + 并行化

文件realtime_kline_service_base.py _run_data_healing()

# Step 1:一次SQL查询全量配对的最新时间和数量
healthy_pairs, sick_pairs = _quick_health_check_batch(all_pairs)

# Step 2:直接跳过健康配对
# Step 3:并行处理需修复的配对
with ThreadPoolExecutor(max_workers=HEALING_WORKERS) as pool:
    futures = [pool.submit(heal_one, pair) for pair in sick_pairs]

注意:并行化需要对 KlineDataFiller.fill_cooldown 字典加 threading.Lock,超时改用 threading.Timer


五、修复优先级与实施顺序

优先级 问题 修改文件 难度 收益
P0 BUG-01:加载语义不一致 orchestrator.py 高(修复收敛)
P0 BUG-02:1条不修复 continuity_checker.py + orchestrator.py 高(覆盖更多场景)
P1 BUG-03:过时+不足组合 orchestrator.py
P1 PERF-01:预过滤 realtime_kline_service_base.py 极高(正常重启<5s)
P2 PERF-01:并行化 同上 + repair_executor.py 高(病态情况50%提速)
P3 BUG-04:日志误导 orchestrator.py 低(可观测性)
P3 RISK-01:WS竞态 enhanced_ws_manager.py 低(概率极低)

六、涉及的关键文件

文件 关键改动点
src/utils/data_healing/orchestrator.py _load_zscore_history()(加载语义)、_diagnose()(条件解耦)
src/utils/data_healing/continuity_checker.py L55-57(1条数据处理)
src/utils/data_healing/repair_executor.py threading.Lock 保护 fill_cooldown
src/services/realtime_kline_service_base.py _run_data_healing()(预过滤+并行)
src/config.py 新增 HEALING_WORKERS、调整 HEALING_TIMEOUT_SECONDS

七、验证方案

  1. 单元测试:针对 continuity_checker 的0条/1条/多条场景;orchestrator._diagnose() 的组合条件
  2. 集成测试:模拟 DB 仅1条数据的配对,验证1轮修复后重载能看到3条
  3. 性能测试:正常重启场景,验证启动时间 <5s(100对全健康)
  4. 日志验证:观察「数量不足」配对的修复过程,确认「重载后条数增加」的日志出现
  5. 回归测试:已健康配对不受影响,仍正常启动

Read more

数据自愈模块提速3

数据自愈并发优化设计方案 一、现状分析 当前流程 _run_data_healing(realtime_kline_service_base.py:1807): for (symbol, base_symbol) in heal_pairs: # ~50+ 配对,串行 DataHealingOrchestrator(...) # → RepairExecutor.__init__ │ # → KlineDataFiller.__init__ │ # → Info(MAINNET_API_URL) HTTP 握手 ~0.85s │ # ← 第 2-50 个握手完成后才被 shared_executor 覆盖,全部浪费! _load_zscore_history()

By SHI XIAOLONG

数据自愈模块提速2

数据自愈并发优化设计方案 一、现状分析 当前流程 _run_data_healing(realtime_kline_service_base.py:1807): for (symbol, base_symbol) in heal_pairs: # ~50+ 配对,串行 DataHealingOrchestrator(...) # → RepairExecutor.__init__ │ # → KlineDataFiller.__init__ │ # → Info(MAINNET_API_URL) HTTP 握手 ~0.85s │ # ← 第 2-50 个握手完成后才被 shared_executor 覆盖,全部浪费! _load_zscore_history()

By SHI XIAOLONG

数据自愈模块综合问题分析报告2

数据自愈模块优化设计文档 版本:v1.0 日期:2026-02-23 范围:src/utils/data_healing/ + src/services/realtime_kline_service_base.py(_run_data_healing 部分) 目录 1. 问题全景与优先级总表 2. BUG-01:加载语义与修复语义不一致 3. BUG-02/03:单条/少量记录无法生成修复目标 4. BUG-04:日志误导性 5. BUG-05:重复写入(BUG-01 副作用) 6. BUG-06:test_basic.py 与 Diagnosis 定义失同步 7.

By SHI XIAOLONG