数据自愈模块提速
数据自愈并发优化设计方案
一、现状分析
当前流程
_run_data_healing(realtime_kline_service_base.py:1807):
for (symbol, base_symbol) in heal_pairs: # ~50+ 配对,串行
DataHealingOrchestrator(...) # → RepairExecutor → KlineDataFiller
│ # Info(MAINNET_API_URL) HTTP 握手 ~0.85s
│ # 第 2-50 个立刻被 shared_executor 覆盖,全部浪费!
_load_zscore_history() # DB 查询 ×1(0.08-0.56s)
_get_db_now() # SELECT NOW() ×1
_diagnose() # 内存计算(~0ms)
check_continuity() # ← 第一次连续性检查
_final_assessment() # ← 第二次连续性检查(冗余)
QualityAssessor.assess() # 质量评估
瓶颈定量(日志实测,50 对)
每对的耗时结构(以 IO/BTC 对为例):
KlineDataFiller 初始化完成 ← Info(MAINNET_API_URL) HTTP 握手 ~0.85s ← 49 次是废弃的!
自愈总耗时: 0.14s ← DB 查询 + 诊断 + 评估
─────────────────────────────────────────────────────────────────────
每对实际耗时: ~1.0s
| 操作 | 单次耗时 | 次数 | 总计 | 占比 |
|---|---|---|---|---|
KlineDataFiller init(HTTP 握手,49次废弃) |
~0.85s | 50 | ~43s | 78% |
_load_zscore_history DB 查询 |
~0.15s avg | ~50 | ~7.5s | 14% |
| ONDO/BTC DB 慢查询(特例) | 4.44s | 1 | 4.44s | 8% |
_get_db_now SELECT NOW() |
~0.01s | ~50 | ~0.5s | ~1% |
_final_assessment 冗余连续性检查 |
~0.001s | ~50 | ~0.05s | ~0% |
| 总计 | ~55s |
根因:
DataHealingOrchestrator.__init__无条件调用RepairExecutor.__init__→
KlineDataFiller.__init__→Info(MAINNET_API_URL, skip_ws=True, timeout=30),
发起一次 HTTP 握手拉取资产元数据(kline_data_filler.py:82)。
服务层代码虽然通过shared_executor复用了第一个实例,但第 2-50 个KlineDataFiller
已经完成 HTTP 握手后才被覆盖,浪费约 43 秒。
基础设施
- 连接池:psycopg 3.x
ConnectionPool,min_size=2, max_size=10,支持并发 DB 查询 - 交易所 API:
KlineDataFiller内置 10 分钟冷却机制 - 超时保护:全局
healing_timeout(300s),单配对HEALING_PER_PAIR_TIMEOUT=60s
二、优化方案 A:批量 DB 查询
目标
50+ 次 DB 往返 → 1 次
核心 SQL
将原来的单配对查询:
-- 原始:每个配对执行一次
SELECT kline_time, zscore_4h, analysis_time
FROM (
SELECT DISTINCT ON (kline_time)
kline_time, zscore_4h, analysis_time
FROM analysis_results
WHERE symbol = %s AND base_symbol = %s
AND zscore_4h IS NOT NULL
AND kline_time >= NOW() - INTERVAL 'X days'
ORDER BY kline_time DESC, analysis_time DESC
LIMIT %s
) sub
ORDER BY kline_time ASC
改为批量查询:
-- 优化:所有配对一次查询
SELECT symbol, base_symbol, kline_time, zscore_4h, analysis_time
FROM (
SELECT *, ROW_NUMBER() OVER (
PARTITION BY symbol, base_symbol
ORDER BY kline_time DESC
) AS rn
FROM (
SELECT DISTINCT ON (symbol, base_symbol, kline_time)
symbol, base_symbol, kline_time, zscore_4h, analysis_time
FROM analysis_results
WHERE (symbol, base_symbol) IN (VALUES (%s,%s), (%s,%s), ...)
AND zscore_4h IS NOT NULL
AND kline_time >= NOW() - INTERVAL '{boundary_days} days'
ORDER BY symbol, base_symbol, kline_time DESC, analysis_time DESC
) deduped
) ranked
WHERE rn <= %s
ORDER BY symbol, base_symbol, kline_time ASC
SQL 逻辑说明
- 内层
DISTINCT ON:每个 (symbol, base_symbol, kline_time) 只保留最新analysis_time的记录 ROW_NUMBER() OVER (PARTITION BY ... ORDER BY kline_time DESC):给每个配对的记录按时间倒序编号WHERE rn <= required_count:每个配对只取最近 N 条- 外层
ORDER BY:按时间正序返回
内存分组
from collections import defaultdict
grouped: dict[tuple[str, str], list[dict]] = defaultdict(list)
for row in rows:
grouped[(row['symbol'], row['base_symbol'])].append(row)
代码改动
orchestrator.py— 新增batch_load_zscore_history()静态方法realtime_kline_service_base.py—_run_data_healing()调用批量加载替代循环内单独加载
预期收益
DB 往返从 ~50 次降到 1 次,数据加载阶段 ~7.5s → ~0.3s
三、优化方案 B:并发修复
前提
批量查询后,数据已在内存中。诊断是纯 CPU 操作(~0ms/对),不需要并发。只有不健康的配对需要调用 RepairExecutor.repair()(涉及 DB 写入 + 交易所 API),才需要并发。
并发约束
| 约束 | 说明 |
|---|---|
DB 连接池 max_size=10 |
并发 DB 操作不能超过 10 |
KlineDataFiller 冷却机制 |
10 分钟冷却期,同 symbol 不会重复请求 |
| 交易所 API 频率限制 | 需控制并发请求数 |
RepairExecutor._fill_kline_gaps_parallel |
内部已用 ThreadPoolExecutor(max_workers=2) |
设计
# 不健康的配对才需要修复
unhealthy_pairs = [(sym, base, records, diagnosis) for ...]
if unhealthy_pairs:
with ThreadPoolExecutor(max_workers=min(5, len(unhealthy_pairs))) as pool:
futures = {
pool.submit(_heal_single_pair, pair): pair
for pair in unhealthy_pairs
}
for future in as_completed(futures):
result = future.result()
...
并发度选择
max_workers=5(保守值):
- DB 池
max_size=10,每个 repair 最多占 2 连接(双 symbol 并行 K 线补充) - 5 × 2 = 10,刚好打满连接池
- 不会与主服务其他 DB 操作竞争过多
代码改动
realtime_kline_service_base.py—_run_data_healing()修复阶段改为并发orchestrator.py—heal_and_prepare()支持接收预加载数据,避免重复查询
四、附加优化
4a. db_now 共享
现状:每个配对调用 _get_db_now() → SELECT NOW()(50+ 次)
优化:在批量查询中一并获取 NOW(),传给所有配对
# 批量查询的同一事务中获取 NOW()
cur.execute("SELECT NOW() AS db_now")
db_now = cur.fetchone()['db_now']
4b. 健康配对跳过冗余评估
现状:_final_assessment() 总是重新调用 check_continuity(),即使诊断已判定健康
优化:诊断结果为 is_healthy=True 时,直接从诊断结果构建 HealingResult,跳过二次检查
# 在 heal_and_prepare() 中
if diagnosis.is_healthy:
# 直接构建结果,跳过 _final_assessment 的重复 check_continuity
quality = self.assessor.assess(records, required_count, True, 0)
return HealingResult(
status=self._determine_status(quality),
data=self._extract_zscore_values(records),
quality=quality,
iterations_used=iteration,
)
4c. 日志精简
现状:每个健康配对输出 5 行日志
- "连续性检查: 数据连续"(诊断阶段)
- "数据健康(第1轮)"(heal_and_prepare)
- "连续性检查: 数据连续"(_final_assessment 冗余)
- "质量评估: A级"(_final_assessment)
- "自愈结果: ready"(heal_and_prepare)
优化:健康配对只输出 1 行摘要日志,DEBUG 级别保留详细日志
五、重构后的流程
_run_data_healing():
│
├── 1. 查询 DB 中有数据的配对 # 已有逻辑,不变
├── 2. 构建 heal_pairs 列表 # 已有逻辑,不变
│
├── 3.【新】批量加载 + db_now # 1 次 DB 查询,替代 50+ 次
│ batch_load_zscore_history()
│ → grouped: {(sym, base): [records...]}
│ → db_now: datetime
│
├── 4.【新】批量诊断(内存操作,瞬间)
│ for pair in heal_pairs:
│ records = grouped[pair]
│ diagnosis = diagnose(records, required_count, db_now)
│ if healthy → 直接记录结果
│ else → 加入 unhealthy_pairs
│
├── 5.【新】并发修复不健康配对
│ ThreadPoolExecutor(max_workers=5):
│ for pair in unhealthy_pairs:
│ RepairExecutor.repair(...)
│ → 重新加载 + 评估
│
└── 6. 汇总日志
healed=X, failed=Y, healthy=Z (skipped)
六、预期性能对比
新架构在批量诊断阶段不再创建 DataHealingOrchestrator 对象,KlineDataFiller 只在真正
需要修复时才初始化(1 次),因此同时消除了 DB 查询和 HTTP 握手两大瓶颈。
| 指标 | 优化前 | 优化后 | 提升 |
|---|---|---|---|
KlineDataFiller init 次数(HTTP 握手) |
50 次(49 次废弃) | 0-1 次 | ~50x |
KlineDataFiller init 总耗时 |
~43s | ~0s(健康)/ ~1s(有修复) | 消除 |
| DB 查询次数 | ~100(加载50 + NOW50) | 2(批量加载1 + NOW1) | 50x |
| DB 查询总耗时 | ~7.5s | ~0.3s | 25x |
| 全健康场景总耗时 | ~55s | ~0.4s | ~140x |
| 有 5 对需修复 | ~55s | ~2s | ~28x |
| 有 15 对需修复 | ~55s | ~5s | ~11x |
| 日志行数 | ~250 行 | ~10 行 | 精简 |
全健康场景耗时拆解(优化后):
1 次批量 DB 查询: ~0.3s
50 对内存诊断(纯计算): ~0.05s
KlineDataFiller init: 0s(无需修复,不创建)
─────────────────────────────
总计: ~0.35s
七、改动文件清单
| 文件 | 改动类型 | 说明 |
|---|---|---|
orchestrator.py |
新增方法 + 修改 | 添加 batch_load_zscore_history();heal_and_prepare() 支持接收预加载数据 |
realtime_kline_service_base.py |
重构方法 | _run_data_healing() 改为批量加载 + 并发修复 |
不变的文件:repair_executor.py、continuity_checker.py、quality_assessor.py、config.py、__init__.py
八、风险控制
| 风险 | 缓解措施 |
|---|---|
| DB 连接池耗尽 | 并发修复 max_workers=5,远小于连接池 max_size=10 |
| 交易所 API 限流 | KlineDataFiller 已有 10 分钟冷却机制,不受影响 |
| 大批量 SQL 性能 | 50 个 VALUES 对 PostgreSQL 不是问题,有索引支持 |
| 向后兼容 | heal_and_prepare() 保留原有签名,新增可选参数 preloaded_records 和 db_now |
| 超时保护 | 全局 healing_timeout(300s) 保持不变 |
| 并发异常传播 | 每个 future 独立 try/except,单个配对失败不影响其他配对 |