启动性能优化设计文档:数据自愈并行化 + 预过滤

启动性能优化设计文档:数据自愈并行化 + 预过滤

一、问题现象

每次启动时,日志中出现大量类似以下输出,耗时数分钟才能完成启动:

KlineDataFiller 初始化完成 | 交易所: hyperliquid
数据自愈编排器初始化 | VINE/USDC:USDC vs AAVE/USDC:USDC | timeframe: 4h
数据自愈启动 | 需要 3 条 | 间隔 240min | 理论跨度: 12h
数据不足: 2 条(12h 窗口),尝试更大范围
加载历史数据: 3 条 (16h 窗口,需要 12h)
...(每对重复一次)...
自愈失败: ADA/USDC:USDC | 完整度: 33.3%
自愈失败: TRB/USDC:USDC | 完整度: 33.3%

二、根因分析

2.1 调用入口

RealtimeKlineServiceBase.__init__()
  └─ _run_data_healing()   ← realtime_kline_service_base.py L1800+
       └─ for symbol, base_symbol in heal_pairs:   ← 串行循环
            └─ DataHealingOrchestrator.heal_and_prepare()
                 ├─ Phase 1: _load_zscore_history()   [DB 查询]
                 ├─ Phase 2: _diagnose()
                 ├─ Phase 3: RepairExecutor.repair()  [含 API 调用]
                 └─ Phase 4: _final_assessment()      [DB 查询]

2.2 三层叠加原因

层级 原因 量化影响
L1 架构层 完全串行:for 循环逐个处理所有配对,无并行 100对 × 1s = 100s(健康配对)
L2 逻辑层 无预过滤:健康配对(A级)也走完整四阶段流程 每对额外浪费约 0.5-1s
L3 外部依赖 系统离线后数据积累缺口:API 补充 K线耗时 1.5s/次 每对修复约 3-15s

2.3 关键参数

# 修复所需记录数(极小!)
repair_count = ceil(144 × 5 / 240) = 3   # 只需 3 条 4h 记录

# 新鲜度阈值
FRESHNESS_MULTIPLIER = 2.5
threshold = 240min × 2.5 = 600min(10小时)

# 超时保护
HEALING_TIMEOUT_SECONDS = 300   # 5分钟,串行时仅能处理约30对

# API 间隔 & 冷却
KLINE_FILLER_API_INTERVAL = 1.5s
KLINE_FILLER_COOLDOWN_SECONDS = 600

2.4 失败配对根因(ADA/TRB/ZETA 等)

诊断结果: 有0个连续性缺口, 数量不足(1/3), 已过时1281min
WARNING - 无法确定修复目标,终止修复
  • 数据过时 1281 分钟 ≈ 21 小时:系统离线约 21 小时,数据库中该配对最后一条记录距今超过 10 小时阈值
  • 在 24h 扩展窗口内仅能找到 1 条数据:该配对本身在 Hyperliquid 上历史 K 线稀少(新上市或低流动性币种)
  • shortfall_targets 计算需要连续时间点,数据点太少无法推断缺口位置 → 直接终止修复

三、优化方案

方案概览

_run_data_healing()
  │
  ├─ [现有] 构建 heal_pairs(有历史数据的配对)
  │
  ├─ [新增 Step A] 批量预过滤:1次SQL → 跳过健康配对
  │           ↓ pairs_to_heal(只剩需要修复的配对)
  │
  └─ [新增 Step B] 并行修复:ThreadPoolExecutor(4 workers)

方案 A:批量预过滤(Quick Pre-check)

改动文件src/services/realtime_kline_service_base.py

核心思路:在进入逐对循环前,用1次批量 SQL替代 N 次完整自愈流程,快速判断哪些配对已健康。

新增方法 _quick_health_check_batch()

def _quick_health_check_batch(self, heal_pairs: list, repair_count: int) -> set:
    """
    批量轻量级健康检查:1次SQL替代N次完整自愈流程。
    返回"已健康配对"集合,这些配对可跳过完整自愈。

    健康标准(与 DataHealingOrchestrator._diagnose() 一致):
      1. 最新 kline_time 距今 <= interval_minutes × FRESHNESS_MULTIPLIER(600分钟)
      2. 过去窗口内记录数 >= repair_count(3条)
    """
    from src.utils.data_healing.config import FRESHNESS_MULTIPLIER

    INTERVAL_MIN_4H = 240
    freshness_min = INTERVAL_MIN_4H * FRESHNESS_MULTIPLIER     # 600分钟
    needed_hours = math.ceil(repair_count * INTERVAL_MIN_4H / 60 * 1.5)  # ~1小时

    placeholders = ', '.join(['(%s, %s)'] * len(heal_pairs))
    params = [v for pair in heal_pairs for v in pair] + [needed_hours]

    query = f"""
        SELECT
            symbol,
            base_symbol,
            MAX(kline_time)  AS latest_time,
            COUNT(*)         AS record_count
        FROM analysis_results
        WHERE (symbol, base_symbol) IN ({placeholders})
          AND zscore_4h IS NOT NULL
          AND kline_time >= NOW() - make_interval(hours => %s)
        GROUP BY symbol, base_symbol
    """

    try:
        rows = self.db_client.execute_query(query, tuple(params))
        now_rows = self.db_client.execute_query("SELECT NOW() AS now")
        db_now = now_rows[0]['now']
    except Exception as e:
        self.logger.warning(f"批量健康预检查失败,回退到完整流程: {e}")
        return set()   # 失败时降级:所有配对走完整流程

    healthy = set()
    for row in (rows or []):
        staleness_min = (db_now - row['latest_time']).total_seconds() / 60
        is_fresh      = staleness_min <= freshness_min
        is_sufficient = row['record_count'] >= repair_count
        if is_fresh and is_sufficient:
            healthy.add((row['symbol'], row['base_symbol']))

    self.logger.info(
        f"批量预检查 | 总配对: {len(heal_pairs)} | "
        f"健康跳过: {len(healthy)} | "
        f"需修复: {len(heal_pairs) - len(healthy)} | "
        f"新鲜度阈值: {freshness_min:.0f}min"
    )
    return healthy

修改 _run_data_healing() 主逻辑

healed = 0 之前插入预过滤:

# --- 新增:批量预过滤 ---
# 提前计算 repair_count(与循环内逻辑一致)
INTERVAL_MIN_4H = 240
repair_count = math.ceil(144 * 5 / INTERVAL_MIN_4H)   # = 3

healthy_pairs = self._quick_health_check_batch(heal_pairs, repair_count)
healed = len(healthy_pairs)   # 健康配对直接计入成功
failed = 0
pairs_to_heal = [(s, b) for s, b in heal_pairs if (s, b) not in healthy_pairs]

if not pairs_to_heal:
    self.logger.info(f"🩺 数据自愈完成(全跳过)| 成功={healed} 失败=0")
    return
# --- 后续只处理 pairs_to_heal ---

方案 B:并行化修复

改动文件

  • src/services/realtime_kline_service_base.py — 引入 ThreadPoolExecutor,替换 SIGALRM
  • src/utils/data_healing/repair_executor.py — 添加 threading.Lock 串行化 API 调用

约束与决策

约束 说明 决策
SIGALRM 不支持子线程 signal.alarm() 在非主线程抛 ValueError 改用 threading.Timer
DB 连接池上限 10 TIMESCALEDB_POOL_MAX_SIZE = 10 限制 HEALING_WORKERS = 4
KlineDataFiller 冷却非线程安全 fill_cooldown 字典是实例级,竞争风险 共享 executor + 加锁

repair_executor.py 改动

# __init__ 新增
import threading
self._kline_fill_lock = threading.Lock()

# _fill_kline_gaps 中加锁(仅锁 API 调用,不锁 DB 查询)
def _fill_kline_gaps(self, kline_gaps, symbol, base_symbol, timeframe):
    for sym in [symbol, base_symbol]:
        with self._kline_fill_lock:   # 串行化 API,冷却状态安全共享
            filled = self.kline_filler.fill_missing_data_precise(
                symbol=sym,
                timeframe=timeframe,
                missing_timestamps=kline_gaps,
            )

realtime_kline_service_base.py 并行循环

替换原有串行 for 循环:

from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

stop_event = threading.Event()
timeout_timer = threading.Timer(HEALING_TIMEOUT_SECONDS, stop_event.set)
timeout_timer.daemon = True
timeout_timer.start()

shared_executor = None   # 共享 RepairExecutor(含冷却锁)

def heal_one(pair):
    nonlocal shared_executor
    symbol, base_symbol = pair
    if stop_event.is_set():
        return symbol, 'timeout_skipped'
    try:
        healer = DataHealingOrchestrator(
            db_client=self.db_client,
            kline_repo=self.kline_repo,
            symbol=symbol,
            base_symbol=base_symbol,
            repair_timeframe='4h',
        )
        if shared_executor is None:
            shared_executor = healer.executor   # 首次:取出执行器
        else:
            healer.executor = shared_executor   # 后续:注入共享执行器
        result = healer.heal_and_prepare(required_count=repair_count)
        return symbol, result.status
    except Exception as e:
        self.logger.warning(f"自愈异常: {symbol} | {e}")
        return symbol, 'failed'

try:
    with ThreadPoolExecutor(max_workers=4, thread_name_prefix='healer') as pool:
        futures = {pool.submit(heal_one, p): p for p in pairs_to_heal}
        for future in as_completed(futures):
            if stop_event.is_set():
                for f in futures:
                    f.cancel()
                self.logger.warning(
                    f"数据自愈超时 ({HEALING_TIMEOUT_SECONDS}s),已完成 {healed} 个"
                )
                break
            sym, status = future.result(timeout=30)
            if status not in ('failed', 'timeout_skipped'):
                healed += 1
            elif status == 'failed':
                failed += 1
                self.logger.warning(f"自愈失败: {sym}")
finally:
    timeout_timer.cancel()

四、预期收益

场景 当前耗时 方案A后 A+B后
100对全健康(正常重启) ~100s ~1s(1次SQL) ~1s
80健康 + 20需修复 ~260s ~165s ~45s
20需修复(首次/长期离线) ~160s ~160s ~45s

最常见场景(正常重启,大部分配对健康):启动时间从 1-3 分钟降至 5秒以内


五、实施顺序与风险

实施顺序

  1. 先做方案A(预过滤):改动集中在一个函数,无线程安全风险,收益最大
  2. 再做方案B(并行化):需要验证 DB 连接池和 API 锁行为,需测试

风险与缓解

风险 场景 缓解措施
SQL 预检查与实际诊断标准不一致 预检查判为健康但实际有缺口 预检查失败降级;标准参数与 orchestrator 保持一致
DB 连接池耗尽 4个并行 worker 同时查询 HEALING_WORKERS = min(4, pool_max // 2) 动态限制
API 限速(冷却机制失效) 多线程绕过 600s 冷却 _kline_fill_lock 串行化所有 API 调用
线程中 shared_executor 竞争初始化 首次赋值 race condition 使用 threading.Lock 保护 shared_executor 赋值

六、关键文件索引

文件 关键位置 作用
src/services/realtime_kline_service_base.py L1800-1870 _run_data_healing() 主改动:预过滤 + 并行循环
src/utils/data_healing/repair_executor.py __init__, _fill_kline_gaps() 新增线程锁
src/utils/data_healing/orchestrator.py heal_and_prepare(), healing_timeout 参考:诊断逻辑,可废弃 SIGALRM 超时
src/utils/data_healing/config.py FRESHNESS_MULTIPLIER, timeframe_to_minutes 预检查参数来源
src/config.py HEALING_TIMEOUT_SECONDS, HEALING_MAX_ITERATIONS 超时与迭代配置

Read more

数据自愈模块提速3

数据自愈并发优化设计方案 一、现状分析 当前流程 _run_data_healing(realtime_kline_service_base.py:1807): for (symbol, base_symbol) in heal_pairs: # ~50+ 配对,串行 DataHealingOrchestrator(...) # → RepairExecutor.__init__ │ # → KlineDataFiller.__init__ │ # → Info(MAINNET_API_URL) HTTP 握手 ~0.85s │ # ← 第 2-50 个握手完成后才被 shared_executor 覆盖,全部浪费! _load_zscore_history()

By SHI XIAOLONG

数据自愈模块提速2

数据自愈并发优化设计方案 一、现状分析 当前流程 _run_data_healing(realtime_kline_service_base.py:1807): for (symbol, base_symbol) in heal_pairs: # ~50+ 配对,串行 DataHealingOrchestrator(...) # → RepairExecutor.__init__ │ # → KlineDataFiller.__init__ │ # → Info(MAINNET_API_URL) HTTP 握手 ~0.85s │ # ← 第 2-50 个握手完成后才被 shared_executor 覆盖,全部浪费! _load_zscore_history()

By SHI XIAOLONG

数据自愈模块综合问题分析报告2

数据自愈模块优化设计文档 版本:v1.0 日期:2026-02-23 范围:src/utils/data_healing/ + src/services/realtime_kline_service_base.py(_run_data_healing 部分) 目录 1. 问题全景与优先级总表 2. BUG-01:加载语义与修复语义不一致 3. BUG-02/03:单条/少量记录无法生成修复目标 4. BUG-04:日志误导性 5. BUG-05:重复写入(BUG-01 副作用) 6. BUG-06:test_basic.py 与 Diagnosis 定义失同步 7.

By SHI XIAOLONG