数据自愈模块提速2

数据自愈并发优化设计方案

一、现状分析

当前流程

_run_data_healingrealtime_kline_service_base.py:1807):

for (symbol, base_symbol) in heal_pairs:  # ~50+ 配对,串行
    DataHealingOrchestrator(...)           # → RepairExecutor.__init__
    │                                      #   → KlineDataFiller.__init__
    │                                      #   → Info(MAINNET_API_URL) HTTP 握手 ~0.85s
    │                                      #   ← 第 2-50 个握手完成后才被 shared_executor 覆盖,全部浪费!
    _load_zscore_history()                # DB 查询 ×1(0.08-0.56s)
    _get_db_now()                         # SELECT NOW() ×1
    _diagnose()                           # 内存计算(~0ms)
    check_continuity()                    # ← 第一次连续性检查
    _final_assessment()                   # ← 第二次连续性检查(冗余)
    QualityAssessor.assess()              # 质量评估

根因(精确代码路径)

# realtime_kline_service_base.py(精确执行顺序)
healer = DataHealingOrchestrator(...)   # ← HTTP 握手在此发生
#   DataHealingOrchestrator.__init__:
#     self.executor = RepairExecutor(db_client, kline_repo, exchange_id)
#       RepairExecutor.__init__:
#         self.kline_filler = KlineDataFiller(exchange_id, kline_repo)
#           KlineDataFiller.__init__:
#             self._info = self._init_info()
#               → Info(MAINNET_API_URL, skip_ws=True, timeout=30)  # ~0.85s 握手完成

# 握手已完成,才执行 shared_executor 赋值
if shared_executor is None:
    shared_executor = healer.executor
else:
    healer.executor = shared_executor   # 第 2-50 个握手已白白完成

瓶颈定量(日志实测,50 对)

每对的耗时结构(以 IO/BTC 对为例):

KlineDataFiller 初始化完成  ← Info(MAINNET_API_URL) HTTP 握手 ~0.85s  ← 49 次是废弃的!
自愈总耗时: 0.14s           ← DB 查询 + 诊断 + 评估
─────────────────────────────────────────────────────────────────────
每对实际耗时: ~1.0s
操作 单次耗时 次数 总计 占比
KlineDataFiller init(HTTP 握手,49次废弃) ~0.85s 50 ~43s 78%
_load_zscore_history DB 查询 ~0.15s avg ~50 ~7.5s 14%
ONDO/BTC DB 慢查询(特例,见附加优化 4d) 4.44s 1 4.44s 8%
_get_db_now SELECT NOW() ~0.01s ~50 ~0.5s ~1%
_final_assessment 冗余连续性检查 ~0.001s ~50 ~0.05s ~0%
总计 ~55s

基础设施

  • 连接池:psycopg 3.x ConnectionPoolmin_size=2, max_size=10,支持并发 DB 查询
  • 交易所 APIKlineDataFiller 内置 10 分钟冷却机制(threading.Lock() 保护)
  • 超时保护:全局 healing_timeout(300s),单配对 HEALING_PER_PAIR_TIMEOUT=60s
  • 覆盖索引idx_analysis_results_pair_kline_time ON analysis_results (symbol, base_symbol, kline_time DESC, analysis_time DESC) WHERE zscore_4h IS NOT NULL(与批量 SQL 完全对齐)

二、优化方案 A:批量 DB 查询

目标

50+ 次 DB 往返 → 1 次db_now 内嵌在同一查询中)

核心 SQL

将原来的单配对查询:

-- 原始:每个配对执行一次(共 ~100 次往返,含 SELECT NOW())
SELECT kline_time, zscore_4h, analysis_time
FROM (
    SELECT DISTINCT ON (kline_time)
        kline_time, zscore_4h, analysis_time
    FROM analysis_results
    WHERE symbol = %s AND base_symbol = %s
      AND zscore_4h IS NOT NULL
      AND kline_time >= NOW() - INTERVAL 'X days'
    ORDER BY kline_time DESC, analysis_time DESC
    LIMIT %s
) sub
ORDER BY kline_time ASC

改为批量查询(db_now 内嵌,真正 1 次往返):

-- 优化:所有配对一次查询,同时返回 db_now
SELECT
    NOW() AS db_now,                          -- db_now 内嵌,消除独立 SELECT NOW()
    symbol, base_symbol,
    kline_time, zscore_4h, analysis_time
FROM (
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY symbol, base_symbol
        ORDER BY kline_time DESC
    ) AS rn
    FROM (
        SELECT DISTINCT ON (symbol, base_symbol, kline_time)
            symbol, base_symbol, kline_time, zscore_4h, analysis_time
        FROM analysis_results
        WHERE (symbol, base_symbol) IN (VALUES (%s,%s), (%s,%s), ...)
          AND zscore_4h IS NOT NULL
          AND kline_time >= NOW() - INTERVAL '{boundary_days} days'
        ORDER BY symbol, base_symbol, kline_time DESC, analysis_time DESC
    ) deduped
) ranked
WHERE rn <= %s
ORDER BY symbol, base_symbol, kline_time ASC

SQL 逻辑说明

  1. 内层 DISTINCT ON:每个 (symbol, base_symbol, kline_time) 只保留最新 analysis_time 的记录,ORDER BY 列与覆盖索引 idx_analysis_results_pair_kline_time 完全对齐,走 Index Scan
  2. ROW_NUMBER() OVER (PARTITION BY ... ORDER BY kline_time DESC):给每个配对的记录按时间倒序编号
  3. WHERE rn <= required_count:每个配对只取最近 N 条
  4. NOW() AS db_now:与数据在同一事务快照内,消除时间漂移,同时替代 50 次独立 SELECT NOW()
  5. 外层 ORDER BY:按时间正序返回

Statement Timeout 适配

批量查询取代了 50 次独立查询,需适当放宽超时(原单条 SQL 使用 DB_STATEMENT_TIMEOUT_MS,默认 10s):

# 批量查询使用更宽松的超时
BATCH_STATEMENT_TIMEOUT_MS = max(DB_STATEMENT_TIMEOUT_MS, 15_000)
cur.execute(f"SET LOCAL statement_timeout = '{BATCH_STATEMENT_TIMEOUT_MS}'")

理由:单条慢查询(如 ONDO/BTC,4.44s)在批量模式下不会单独超时,但也不会拖慢其他配对(PostgreSQL 对每个 PARTITION 独立计算),整体批量耗时预期 < 1s。

失败回退机制

批量查询一旦失败,50 个配对的数据全部丢失,需降级到逐配对加载:

def _batch_load_with_fallback(
    heal_pairs: list[tuple[str, str]],
    required_count: int,
    db_client: TimescaleDBClient,
) -> tuple[dict[tuple[str, str], list[dict]], datetime]:
    """批量加载,失败时降级为逐配对加载。"""
    try:
        grouped, db_now = batch_load_zscore_history(
            heal_pairs, required_count, db_client
        )
        return grouped, db_now
    except Exception as e:
        logger.warning(
            f"批量查询失败,降级为逐配对加载: {e}",
            exc_info=True,
        )
        return _fallback_individual_load(heal_pairs, required_count, db_client)


def _fallback_individual_load(
    heal_pairs: list[tuple[str, str]],
    required_count: int,
    db_client: TimescaleDBClient,
) -> tuple[dict[tuple[str, str], list[dict]], datetime]:
    grouped: dict[tuple[str, str], list[dict]] = {}
    db_now = datetime.utcnow()
    for symbol, base_symbol in heal_pairs:
        try:
            records = _load_zscore_history_single(
                symbol, base_symbol, required_count, db_client
            )
            grouped[(symbol, base_symbol)] = records
        except Exception as e:
            logger.warning(f"单配对加载失败 [{symbol}/{base_symbol}]: {e}")
            grouped[(symbol, base_symbol)] = []
    return grouped, db_now

内存分组

from collections import defaultdict

grouped: dict[tuple[str, str], list[dict]] = defaultdict(list)
db_now: datetime | None = None

for row in rows:
    if db_now is None:
        db_now = row['db_now']                          # 取第一行的 db_now
    grouped[(row['symbol'], row['base_symbol'])].append({
        'kline_time':    row['kline_time'],
        'zscore_4h':     row['zscore_4h'],
        'analysis_time': row['analysis_time'],
    })

db_now = db_now or datetime.utcnow()                    # 空结果时降级

代码改动

  1. orchestrator.py — 新增 batch_load_zscore_history() 静态方法(含 db_now 返回)
  2. realtime_kline_service_base.py_run_data_healing() 调用 _batch_load_with_fallback() 替代循环内单独加载

预期收益

DB 往返从 ~100 次(加载 50 + NOW 50)降到 1 次,数据加载阶段 ~8s → ~0.3s


三、优化方案 B:并发修复

前提

批量查询后,数据已在内存中。诊断是纯 CPU 操作(~0ms/对),不需要并发。只有不健康的配对需要调用 RepairExecutor.repair()(涉及 DB 写入 + 交易所 API),才需要并发。

线程安全验证

RepairExecutor 的所有内部组件均支持多线程共享:

组件 线程安全方式
db_clientConnectionPool psycopg 3.x 连接池原生线程安全
kline_repo / analysis_repo 每次操作从连接池独立获取连接
kline_fillerKlineDataFiller 冷却字典由 threading.Lock() 保护
repair() 方法本身 无跨调用共享的可变状态

设计:共享单个 RepairExecutor

并发修复共享 1 个 RepairExecutor 实例,避免多次 HTTP 握手:

def _run_concurrent_repair(
    unhealthy_pairs: list[tuple],
    db_client: TimescaleDBClient,
    kline_repo: KlineRepository,
) -> dict[tuple[str, str], HealingResult]:
    """并发修复不健康配对,共享单个 RepairExecutor(线程安全)。"""
    if not unhealthy_pairs:
        return {}

    # 单次 HTTP 握手,所有并发线程共享
    shared_executor = RepairExecutor(db_client, kline_repo)

    results: dict[tuple[str, str], HealingResult] = {}

    def _heal_one(pair_data: tuple) -> tuple[tuple[str, str], HealingResult]:
        symbol, base_symbol, records, diagnosis = pair_data
        orchestrator = DataHealingOrchestrator(
            db_client=db_client,
            kline_repo=kline_repo,
            symbol=symbol,
            base_symbol=base_symbol,
        )
        orchestrator.executor = shared_executor   # 注入共享实例,跳过 HTTP 握手
        result = orchestrator.heal_and_prepare(
            required_count=len(records),
            preloaded_diagnosis=diagnosis,
        )
        return (symbol, base_symbol), result

    max_workers = min(5, len(unhealthy_pairs))
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(_heal_one, pair): pair for pair in unhealthy_pairs}
        for future in as_completed(futures):
            try:
                key, result = future.result()
                results[key] = result
            except Exception as e:
                pair = futures[future]
                logger.error(
                    f"并发修复异常 [{pair[0]}/{pair[1]}]: {e}",
                    exc_info=True,
                )
                results[(pair[0], pair[1])] = HealingResult(status='failed', ...)

    return results

并发度选择

max_workers=5(保守值):

RepairExecutor.repair() 内部
└─ _fill_kline_gaps_parallel()
   └─ ThreadPoolExecutor(max_workers=2)  # 双 symbol 并行补 K 线

并发占用连接数估算:
5 workers × 2 内部线程 = 10 并发 DB 操作
10 ≤ ConnectionPool.max_size=10  ✅

_run_data_healing 在服务 __init__ 中调用,
其他分析工作线程尚未启动,DB 连接无竞争。

代码改动

  1. realtime_kline_service_base.py_run_data_healing() 修复阶段改为 _run_concurrent_repair()
  2. orchestrator.pyheal_and_prepare() 新增可选参数 preloaded_diagnosis,支持跳过首轮诊断

四、附加优化

4a. db_now 内嵌批量查询(已整合至方案 A)

现状:每个配对调用 _get_db_now()SELECT NOW()(50+ 次)

优化db_now 作为列内嵌在批量 SQL 的 SELECT NOW() AS db_now 中,与数据同一事务快照,无需额外往返。见方案 A 核心 SQL。

4b. 健康配对跳过冗余评估

现状_final_assessment() 总是重新调用 check_continuity(),即使诊断已判定健康

冗余路径

heal_and_prepare()
├─ _diagnose()
│   └─ check_continuity()    ← 第 1 次
└─ _final_assessment()
    └─ check_continuity()    ← 第 2 次(参数完全相同,纯冗余)
        └─ assessor.assess()

优化:诊断结果为 is_healthy=True 时,直接复用诊断结论构建 HealingResult

# orchestrator.py — heal_and_prepare() 内
if diagnosis.is_healthy:
    # 复用诊断结果,跳过 _final_assessment 的重复 check_continuity
    missing_count = len(diagnosis.gap_targets)   # is_healthy=True 时必为 0,但语义更清晰
    quality = self.assessor.assess(
        records, required_count,
        is_continuous=True,
        missing_count=missing_count,
    )
    return HealingResult(
        status=self._determine_status(quality),
        data=self._extract_zscore_values(records),
        quality=quality,
        iterations_used=iteration,
    )

4c. 日志精简

现状:每个健康配对输出 5 行日志

  1. "连续性检查: 数据连续"(诊断阶段)
  2. "数据健康(第1轮)"(heal_and_prepare)
  3. "连续性检查: 数据连续"(_final_assessment 冗余)
  4. "质量评估: A级"(_final_assessment)
  5. "自愈结果: ready"(heal_and_prepare)

优化:健康配对只输出 1 行摘要日志,DEBUG 级别保留详细日志:

# 优化后(INFO 级别)
自愈完成 | healed=3, failed=0, healthy=47 | 耗时: 0.42s

4d. ONDO/BTC 慢查询根因排查

现状:ONDO/BTC 配对单次 _load_zscore_history 耗时 4.44s,是其他配对平均耗时(~0.15s)的 30 倍

排查步骤

-- Step 1:确认数据量是否异常
SELECT COUNT(*)
FROM analysis_results
WHERE symbol = 'ONDO/USDC:USDC'
  AND base_symbol = 'BTC/USDC:USDC'
  AND zscore_4h IS NOT NULL;

-- Step 2:检查 EXPLAIN ANALYZE(查看是否走索引)
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT DISTINCT ON (kline_time)
    kline_time, zscore_4h, analysis_time
FROM analysis_results
WHERE symbol = 'ONDO/USDC:USDC'
  AND base_symbol = 'BTC/USDC:USDC'
  AND zscore_4h IS NOT NULL
  AND kline_time >= NOW() - INTERVAL '30 days'
ORDER BY kline_time DESC, analysis_time DESC
LIMIT 130;

-- Step 3:若数据量过大,清理超出保留窗口的历史数据
DELETE FROM analysis_results
WHERE symbol = 'ONDO/USDC:USDC'
  AND base_symbol = 'BTC/USDC:USDC'
  AND kline_time < NOW() - INTERVAL '60 days';

预期根因:表膨胀(dead tuples 过多)或 ONDO/BTC 历史数据量远超其他配对。

批量查询后,ONDO/BTC 的慢查询被吸收在同一 SQL 中,只要不超过 15s 超时,不影响其他配对的加载结果。


五、重构后的流程

_run_data_healing():
│
├── 1. 查询 DB 中有数据的配对              # 已有逻辑,不变
├── 2. 构建 heal_pairs 列表               # 已有逻辑,不变
│
├── 3.【新】批量加载(含 db_now,1 次 DB 往返)
│      _batch_load_with_fallback()
│      ├── 正常路径: batch_load_zscore_history()
│      │   → grouped: {(sym, base): [records...]}
│      │   → db_now: datetime(内嵌于 SQL 结果)
│      └── 降级路径: _fallback_individual_load()(批量查询失败时)
│          → 逐配对加载,db_now = datetime.utcnow()
│
├── 4.【新】批量诊断(内存操作,~0ms)
│      for pair in heal_pairs:
│          records = grouped[pair]
│          diagnosis = _diagnose(records, required_count, db_now)
│          if is_healthy → 跳过 _final_assessment,直接构建结果
│          else          → 加入 unhealthy_pairs
│
├── 5.【新】并发修复不健康配对
│      shared_executor = RepairExecutor(...)   # 单次 HTTP 握手
│      ThreadPoolExecutor(max_workers=min(5, len(unhealthy_pairs))):
│          for pair in unhealthy_pairs:
│              orchestrator.executor = shared_executor  # 注入,跳过握手
│              orchestrator.heal_and_prepare(...)
│              → 内部重新加载 + 修复 + 评估
│
└── 6. 汇总日志(1 行)
       healed=X, failed=Y, healthy=Z | 耗时: T s

六、预期性能对比

新架构在批量诊断阶段不再创建 DataHealingOrchestrator 对象,KlineDataFiller 只在真正
需要修复时才初始化(共享 1 次),同时消除了 DB 查询、SELECT NOW() 和 HTTP 握手三大瓶颈。

指标 优化前 优化后 提升
KlineDataFiller init 次数(HTTP 握手) 50 次(49 次废弃) 0-1 次 ~50x
KlineDataFiller init 总耗时 ~43s 0s(全健康)/ ~1s(有修复) 消除
DB 查询次数(加载 + NOW) ~100 次 1 次(批量含 db_now) 100x
DB 查询总耗时 ~8s ~0.3s 25x
全健康场景总耗时 ~55s ~0.35s ~157x
有 5 对需修复 ~55s ~2s ~28x
有 15 对需修复 ~55s ~5s ~11x
日志行数 ~250 行 ~10 行 精简

全健康场景耗时拆解(优化后):

1 次批量 DB 查询(含 db_now):  ~0.3s
50 对内存诊断(纯 CPU 计算):   ~0.05s
KlineDataFiller init:          0s(无需修复,不创建)
─────────────────────────────────────────
总计:                          ~0.35s

七、改动文件清单

文件 改动类型 说明
orchestrator.py 新增方法 + 修改 新增 batch_load_zscore_history()(返回 grouped + db_now);heal_and_prepare() 新增可选参数 preloaded_diagnosis
realtime_kline_service_base.py 重构方法 _run_data_healing() 改为批量加载 + 批量诊断 + 并发修复;新增 _batch_load_with_fallback()_run_concurrent_repair()

不变的文件repair_executor.pycontinuity_checker.pyquality_assessor.pyconfig.py__init__.py


八、风险控制

风险 缓解措施
批量查询失败导致全部配对数据丢失 _batch_load_with_fallback() 捕获异常,降级为逐配对加载
批量查询 statement_timeout 超时 批量超时设为 max(DB_STATEMENT_TIMEOUT_MS, 15000),高于单配对阈值
ONDO/BTC 慢查询拖慢批量 SQL PostgreSQL PARTITION 独立计算,慢配对不影响其他;超时后降级回退
DB 连接池耗尽 并发修复 max_workers=5,内部 2 线程,理论最大 10 连接 = pool max_size
并发修复多次 HTTP 握手 所有并发线程注入同一 shared_executor,仅 1 次握手
KlineDataFiller 冷却状态并发竞争 _update_cooldown 已有 threading.Lock() 保护,天然线程安全
交易所 API 频率限制 KlineDataFiller 10 分钟冷却机制,5 并发不会重复请求同一 symbol
向后兼容 heal_and_prepare() 新增参数均为可选,原有调用方无需修改
超时保护 全局 healing_timeout(300s) 保持不变
并发异常传播 每个 future 独立 try/except,单个配对失败不影响其他配对

Read more

数据自愈模块提速3

数据自愈并发优化设计方案 一、现状分析 当前流程 _run_data_healing(realtime_kline_service_base.py:1807): for (symbol, base_symbol) in heal_pairs: # ~50+ 配对,串行 DataHealingOrchestrator(...) # → RepairExecutor.__init__ │ # → KlineDataFiller.__init__ │ # → Info(MAINNET_API_URL) HTTP 握手 ~0.85s │ # ← 第 2-50 个握手完成后才被 shared_executor 覆盖,全部浪费! _load_zscore_history()

By SHI XIAOLONG

数据自愈模块综合问题分析报告2

数据自愈模块优化设计文档 版本:v1.0 日期:2026-02-23 范围:src/utils/data_healing/ + src/services/realtime_kline_service_base.py(_run_data_healing 部分) 目录 1. 问题全景与优先级总表 2. BUG-01:加载语义与修复语义不一致 3. BUG-02/03:单条/少量记录无法生成修复目标 4. BUG-04:日志误导性 5. BUG-05:重复写入(BUG-01 副作用) 6. BUG-06:test_basic.py 与 Diagnosis 定义失同步 7.

By SHI XIAOLONG

数据自愈模块综合问题分析报告

数据自愈模块综合问题分析报告 Context 本分析旨在全面梳理 src/utils/data_healing/ 数据自愈模块存在的各类问题,结合三份参考文档(数据自愈BUG Cursor 1.md、数据自愈BUG Cursor 2.md、启动性能优化设计文档.md)及代码实地核查,形成统一的问题清单与修复优先级建议。 一、模块架构速览 DataHealingOrchestrator.heal_and_prepare() Phase 1 : _load_zscore_history() ← SQL 时间窗口加载 Phase 2–3: 诊断-修复循环(最多3轮) ├─ _diagnose() ← 三项检查(连续性/新鲜度/数量) │ ├─ ContinuityChecker.check_continuity() │ └─ _check_freshness() ├─ RepairExecutor.

By SHI XIAOLONG