启动性能优化设计文档:数据自愈并行化 + 预过滤
启动性能优化设计文档:数据自愈并行化 + 预过滤
一、问题现象
每次启动时,日志中出现大量类似以下输出,耗时数分钟才能完成启动:
KlineDataFiller 初始化完成 | 交易所: hyperliquid
数据自愈编排器初始化 | VINE/USDC:USDC vs AAVE/USDC:USDC | timeframe: 4h
数据自愈启动 | 需要 3 条 | 间隔 240min | 理论跨度: 12h
数据不足: 2 条(12h 窗口),尝试更大范围
加载历史数据: 3 条 (16h 窗口,需要 12h)
...(每对重复一次)...
自愈失败: ADA/USDC:USDC | 完整度: 33.3%
自愈失败: TRB/USDC:USDC | 完整度: 33.3%
二、根因分析
2.1 调用入口
RealtimeKlineServiceBase.__init__()
└─ _run_data_healing() ← realtime_kline_service_base.py L1800+
└─ for symbol, base_symbol in heal_pairs: ← 串行循环
└─ DataHealingOrchestrator.heal_and_prepare()
├─ Phase 1: _load_zscore_history() [DB 查询]
├─ Phase 2: _diagnose()
├─ Phase 3: RepairExecutor.repair() [含 API 调用]
└─ Phase 4: _final_assessment() [DB 查询]
2.2 三层叠加原因
| 层级 | 原因 | 量化影响 |
|---|---|---|
| L1 架构层 | 完全串行:for 循环逐个处理所有配对,无并行 |
100对 × 1s = 100s(健康配对) |
| L2 逻辑层 | 无预过滤:健康配对(A级)也走完整四阶段流程 | 每对额外浪费约 0.5-1s |
| L3 外部依赖 | 系统离线后数据积累缺口:API 补充 K线耗时 1.5s/次 | 每对修复约 3-15s |
2.3 关键参数
# 修复所需记录数(极小!)
repair_count = ceil(144 × 5 / 240) = 3 # 只需 3 条 4h 记录
# 新鲜度阈值
FRESHNESS_MULTIPLIER = 2.5
threshold = 240min × 2.5 = 600min(10小时)
# 超时保护
HEALING_TIMEOUT_SECONDS = 300 # 5分钟,串行时仅能处理约30对
# API 间隔 & 冷却
KLINE_FILLER_API_INTERVAL = 1.5s
KLINE_FILLER_COOLDOWN_SECONDS = 600
2.4 失败配对根因(ADA/TRB/ZETA 等)
诊断结果: 有0个连续性缺口, 数量不足(1/3), 已过时1281min
WARNING - 无法确定修复目标,终止修复
- 数据过时 1281 分钟 ≈ 21 小时:系统离线约 21 小时,数据库中该配对最后一条记录距今超过 10 小时阈值
- 在 24h 扩展窗口内仅能找到 1 条数据:该配对本身在 Hyperliquid 上历史 K 线稀少(新上市或低流动性币种)
shortfall_targets计算需要连续时间点,数据点太少无法推断缺口位置 → 直接终止修复
三、优化方案
方案概览
_run_data_healing()
│
├─ [现有] 构建 heal_pairs(有历史数据的配对)
│
├─ [新增 Step A] 批量预过滤:1次SQL → 跳过健康配对
│ ↓ pairs_to_heal(只剩需要修复的配对)
│
└─ [新增 Step B] 并行修复:ThreadPoolExecutor(4 workers)
方案 A:批量预过滤(Quick Pre-check)
改动文件:src/services/realtime_kline_service_base.py
核心思路:在进入逐对循环前,用1次批量 SQL替代 N 次完整自愈流程,快速判断哪些配对已健康。
新增方法 _quick_health_check_batch()
def _quick_health_check_batch(self, heal_pairs: list, repair_count: int) -> set:
"""
批量轻量级健康检查:1次SQL替代N次完整自愈流程。
返回"已健康配对"集合,这些配对可跳过完整自愈。
健康标准(与 DataHealingOrchestrator._diagnose() 一致):
1. 最新 kline_time 距今 <= interval_minutes × FRESHNESS_MULTIPLIER(600分钟)
2. 过去窗口内记录数 >= repair_count(3条)
"""
from src.utils.data_healing.config import FRESHNESS_MULTIPLIER
INTERVAL_MIN_4H = 240
freshness_min = INTERVAL_MIN_4H * FRESHNESS_MULTIPLIER # 600分钟
needed_hours = math.ceil(repair_count * INTERVAL_MIN_4H / 60 * 1.5) # ~1小时
placeholders = ', '.join(['(%s, %s)'] * len(heal_pairs))
params = [v for pair in heal_pairs for v in pair] + [needed_hours]
query = f"""
SELECT
symbol,
base_symbol,
MAX(kline_time) AS latest_time,
COUNT(*) AS record_count
FROM analysis_results
WHERE (symbol, base_symbol) IN ({placeholders})
AND zscore_4h IS NOT NULL
AND kline_time >= NOW() - make_interval(hours => %s)
GROUP BY symbol, base_symbol
"""
try:
rows = self.db_client.execute_query(query, tuple(params))
now_rows = self.db_client.execute_query("SELECT NOW() AS now")
db_now = now_rows[0]['now']
except Exception as e:
self.logger.warning(f"批量健康预检查失败,回退到完整流程: {e}")
return set() # 失败时降级:所有配对走完整流程
healthy = set()
for row in (rows or []):
staleness_min = (db_now - row['latest_time']).total_seconds() / 60
is_fresh = staleness_min <= freshness_min
is_sufficient = row['record_count'] >= repair_count
if is_fresh and is_sufficient:
healthy.add((row['symbol'], row['base_symbol']))
self.logger.info(
f"批量预检查 | 总配对: {len(heal_pairs)} | "
f"健康跳过: {len(healthy)} | "
f"需修复: {len(heal_pairs) - len(healthy)} | "
f"新鲜度阈值: {freshness_min:.0f}min"
)
return healthy
修改 _run_data_healing() 主逻辑
在 healed = 0 之前插入预过滤:
# --- 新增:批量预过滤 ---
# 提前计算 repair_count(与循环内逻辑一致)
INTERVAL_MIN_4H = 240
repair_count = math.ceil(144 * 5 / INTERVAL_MIN_4H) # = 3
healthy_pairs = self._quick_health_check_batch(heal_pairs, repair_count)
healed = len(healthy_pairs) # 健康配对直接计入成功
failed = 0
pairs_to_heal = [(s, b) for s, b in heal_pairs if (s, b) not in healthy_pairs]
if not pairs_to_heal:
self.logger.info(f"🩺 数据自愈完成(全跳过)| 成功={healed} 失败=0")
return
# --- 后续只处理 pairs_to_heal ---
方案 B:并行化修复
改动文件:
src/services/realtime_kline_service_base.py— 引入ThreadPoolExecutor,替换 SIGALRMsrc/utils/data_healing/repair_executor.py— 添加threading.Lock串行化 API 调用
约束与决策
| 约束 | 说明 | 决策 |
|---|---|---|
| SIGALRM 不支持子线程 | signal.alarm() 在非主线程抛 ValueError |
改用 threading.Timer |
| DB 连接池上限 10 | TIMESCALEDB_POOL_MAX_SIZE = 10 |
限制 HEALING_WORKERS = 4 |
| KlineDataFiller 冷却非线程安全 | fill_cooldown 字典是实例级,竞争风险 |
共享 executor + 加锁 |
repair_executor.py 改动
# __init__ 新增
import threading
self._kline_fill_lock = threading.Lock()
# _fill_kline_gaps 中加锁(仅锁 API 调用,不锁 DB 查询)
def _fill_kline_gaps(self, kline_gaps, symbol, base_symbol, timeframe):
for sym in [symbol, base_symbol]:
with self._kline_fill_lock: # 串行化 API,冷却状态安全共享
filled = self.kline_filler.fill_missing_data_precise(
symbol=sym,
timeframe=timeframe,
missing_timestamps=kline_gaps,
)
realtime_kline_service_base.py 并行循环
替换原有串行 for 循环:
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
stop_event = threading.Event()
timeout_timer = threading.Timer(HEALING_TIMEOUT_SECONDS, stop_event.set)
timeout_timer.daemon = True
timeout_timer.start()
shared_executor = None # 共享 RepairExecutor(含冷却锁)
def heal_one(pair):
nonlocal shared_executor
symbol, base_symbol = pair
if stop_event.is_set():
return symbol, 'timeout_skipped'
try:
healer = DataHealingOrchestrator(
db_client=self.db_client,
kline_repo=self.kline_repo,
symbol=symbol,
base_symbol=base_symbol,
repair_timeframe='4h',
)
if shared_executor is None:
shared_executor = healer.executor # 首次:取出执行器
else:
healer.executor = shared_executor # 后续:注入共享执行器
result = healer.heal_and_prepare(required_count=repair_count)
return symbol, result.status
except Exception as e:
self.logger.warning(f"自愈异常: {symbol} | {e}")
return symbol, 'failed'
try:
with ThreadPoolExecutor(max_workers=4, thread_name_prefix='healer') as pool:
futures = {pool.submit(heal_one, p): p for p in pairs_to_heal}
for future in as_completed(futures):
if stop_event.is_set():
for f in futures:
f.cancel()
self.logger.warning(
f"数据自愈超时 ({HEALING_TIMEOUT_SECONDS}s),已完成 {healed} 个"
)
break
sym, status = future.result(timeout=30)
if status not in ('failed', 'timeout_skipped'):
healed += 1
elif status == 'failed':
failed += 1
self.logger.warning(f"自愈失败: {sym}")
finally:
timeout_timer.cancel()
四、预期收益
| 场景 | 当前耗时 | 方案A后 | A+B后 |
|---|---|---|---|
| 100对全健康(正常重启) | ~100s | ~1s(1次SQL) | ~1s |
| 80健康 + 20需修复 | ~260s | ~165s | ~45s |
| 20需修复(首次/长期离线) | ~160s | ~160s | ~45s |
最常见场景(正常重启,大部分配对健康):启动时间从 1-3 分钟降至 5秒以内。
五、实施顺序与风险
实施顺序
- 先做方案A(预过滤):改动集中在一个函数,无线程安全风险,收益最大
- 再做方案B(并行化):需要验证 DB 连接池和 API 锁行为,需测试
风险与缓解
| 风险 | 场景 | 缓解措施 |
|---|---|---|
| SQL 预检查与实际诊断标准不一致 | 预检查判为健康但实际有缺口 | 预检查失败降级;标准参数与 orchestrator 保持一致 |
| DB 连接池耗尽 | 4个并行 worker 同时查询 | HEALING_WORKERS = min(4, pool_max // 2) 动态限制 |
| API 限速(冷却机制失效) | 多线程绕过 600s 冷却 | _kline_fill_lock 串行化所有 API 调用 |
| 线程中 shared_executor 竞争初始化 | 首次赋值 race condition | 使用 threading.Lock 保护 shared_executor 赋值 |
六、关键文件索引
| 文件 | 关键位置 | 作用 |
|---|---|---|
src/services/realtime_kline_service_base.py |
L1800-1870 _run_data_healing() |
主改动:预过滤 + 并行循环 |
src/utils/data_healing/repair_executor.py |
__init__, _fill_kline_gaps() |
新增线程锁 |
src/utils/data_healing/orchestrator.py |
heal_and_prepare(), healing_timeout |
参考:诊断逻辑,可废弃 SIGALRM 超时 |
src/utils/data_healing/config.py |
FRESHNESS_MULTIPLIER, timeframe_to_minutes |
预检查参数来源 |
src/config.py |
HEALING_TIMEOUT_SECONDS, HEALING_MAX_ITERATIONS |
超时与迭代配置 |