数据自愈模块综合问题分析报告2
数据自愈模块优化设计文档
版本:v1.0
日期:2026-02-23
范围:src/utils/data_healing/ + src/services/realtime_kline_service_base.py(_run_data_healing 部分)
目录
- 问题全景与优先级总表
- BUG-01:加载语义与修复语义不一致
- BUG-02/03:单条/少量记录无法生成修复目标
- BUG-04:日志误导性
- BUG-05:重复写入(BUG-01 副作用)
- BUG-06:test_basic.py 与 Diagnosis 定义失同步
- PERF-01:heal_and_prepare 内部缺少超时保护
- PERF-02:healing 在 init 中同步阻塞服务启动
- PERF-03:每轮修复迭代全量重载历史数据
- PERF-04:kline 查询 limit=10000 硬编码过大
- PERF-05:_run_data_healing 串行无预过滤
- CODE-01:zscore_4h 字段赋值逻辑不一致
- CODE-02:_merge_repair_targets 注释优先级与实现不符
- CODE-03:_determine_grade 扩展性差
- DB-01:缺少 kline_time 维度的覆盖索引
- 实施路线图
1. 问题全景与优先级总表
| ID | 分类 | 文件 | 严重程度 | 实施难度 | 收益 |
|---|---|---|---|---|---|
| BUG-01 | 逻辑错误 | orchestrator.py |
P0 | 低 | 修复循环可以收敛 |
| BUG-02/03 | 逻辑错误 | orchestrator.py + continuity_checker.py |
P0 | 中 | 覆盖离线恢复场景 |
| BUG-04 | 可观测性 | orchestrator.py |
P3 | 低 | 排查日志准确 |
| BUG-05 | 副作用 | repair_executor.py |
P1(BUG-01修复后自动消除) | — | — |
| BUG-06 | 测试可信度 | test_basic.py |
P1 | 低 | 测试真实可运行 |
| PERF-01 | 超时缺失 | orchestrator.py |
P0 | 低 | 防止单对挂起全局 |
| PERF-02 | 启动阻塞 | realtime_kline_service_base.py |
P1 | 中 | 启动速度 ↑ 显著 |
| PERF-03 | 冗余查询 | orchestrator.py |
P2 | 低 | DB 查询 -60% |
| PERF-04 | 资源浪费 | repair_executor.py |
P2 | 低 | I/O -95% |
| PERF-05 | 串行性能 | realtime_kline_service_base.py |
P1 | 高 | 启动 -50%~80% |
| CODE-01 | 代码质量 | repair_executor.py |
P2 | 低 | 可维护性 ↑ |
| CODE-02 | 可读性 | orchestrator.py |
P3 | 低 | 调试日志更准确 |
| CODE-03 | 可扩展性 | quality_assessor.py |
P3 | 低 | 易添加新等级 |
| DB-01 | 数据库 | database/init_timescaledb.sql |
P1 | 低 | 查询速度 10-100x ↑ |
2. BUG-01:加载语义与修复语义不一致
2.1 问题根因
文件:orchestrator.py L407–486(_load_zscore_history)、L383–401(_generate_shortfall_targets)
加载层使用固定时间窗口(最大 needed_hours × 2),修复层向 earliest_time 之前扩展生成目标。当 earliest_time 已临近窗口边界时,补写的新时间点落在窗口之外,重载时不可见,导致 3 轮循环全部无效。
加载窗 [NOW()-24h, NOW()] ← 以 required_count=3, interval=240min 为例
records = [T-20h, T-16h] ← 窗口内仅2条
shortfall_target = T-24h ← 越过24h边界
重载窗仍是 [NOW()-24h, NOW()] ← T-24h 落点恰好在边界外(NOW()已推移几秒)
→ 仍看到2条 → 修复无效
2.2 设计要求(已通过 PostgreSQL 研究确认)
关键结论:原 SQL 使用
DISTINCT ON (kline_time)是必要的。
同一kline_time可能对应多条不同analysis_time的记录(重算写入)。
直接改为LIMIT N不能保证每个kline_time唯一,会破坏业务正确性。
正确方案是DISTINCT ON + LIMIT组合,同时将时间窗口语义改为 "7天安全边界 + 最近 N 条"。
2.3 修复方案
修改文件:orchestrator.py _load_zscore_history()
def _load_zscore_history(self, required_count: int) -> List[dict]:
"""
加载历史 zscore 数据。
修复 BUG-01:
- 原方案:time_window = required_count × interval × (1~2),修复目标可能落在窗口外
- 新方案:固定 7 天安全边界 + LIMIT N,保证补写的记录下轮可见
- 保留 DISTINCT ON (kline_time):同一 kline_time 可能有多条 analysis_time,
需保留最新的那条(by analysis_time DESC)
"""
# 7 天覆盖所有业务场景(4h×3=12h 所需,远小于 7d)
# 使用 DISTINCT ON 保证每个 kline_time 只保留最新 analysis_time 的一条
# LIMIT 精确控制返回数量,无需多档时间窗口尝试
query = """
SELECT DISTINCT ON (kline_time)
kline_time,
zscore_4h,
analysis_time
FROM analysis_results
WHERE symbol = %s
AND base_symbol = %s
AND zscore_4h IS NOT NULL
AND kline_time >= NOW() - INTERVAL '7 days'
ORDER BY kline_time DESC, analysis_time DESC
LIMIT %s
"""
try:
rows = self.db_client.execute_query(
query,
params=(self.symbol, self.base_symbol, required_count)
)
if not rows:
logger.warning(f"数据库中无 {self.symbol}/{self.base_symbol} 的历史 zscore 数据")
return []
# 按 kline_time 升序返回(调用方期望的顺序)
records = sorted(rows, key=lambda r: r['kline_time'])
logger.debug(
f"加载完成: {len(records)} 条 | "
f"{records[0]['kline_time']} ~ {records[-1]['kline_time']}"
)
return records
except (KeyError, TypeError, ValueError) as e:
logger.warning(f"加载历史数据失败 - 数据格式错误: {e}")
return []
except (TimeoutError, KeyboardInterrupt):
raise
except Exception as e:
logger.error(f"加载历史数据失败 - 数据库错误: {e}", exc_info=True)
raise RuntimeError(f"加载历史数据失败: {e}") from e
同步删除原有的三档时间窗口循环逻辑(L419–460),整个方法简化为单次查询。
2.4 收益
- BUG-05(重复写入)自动消除:补写后重载可见,第1轮即可收敛
- 代码行数从 ~80 行降至 ~40 行
- 消除"尝试更大范围"误导日志(BUG-04 一并解决)
3. BUG-02/03:单条/少量记录无法生成修复目标
3.1 问题根因
文件:continuity_checker.py L55–57、orchestrator.py L233–239
# continuity_checker.py L55-57
if len(records) < 2:
return False, [], completeness_pct # is_continuous 强制 False
# orchestrator.py _diagnose() — 两个分支都要求 is_continuous=True
if not is_fresh and is_continuous and len(records) >= required_count: # stale
stale_targets = ...
if is_continuous and not gap_times and len(records) < required_count: # shortfall
shortfall_targets = ...
len(records) == 1 时:is_continuous=False → 两个分支均跳过 → all_targets=[] → "无法确定修复目标,终止修复"
len(records) >= 2 但 < required_count 且 is_fresh=False 时:
shortfall分支满足(is_continuous=True AND len<required)→ 生成 shortfall_targets ✓stale分支不满足(len<required导致len>=required为 False)→ 不生成 stale_targets- 第一轮仅补数量,第二轮才修新鲜度,增加了不必要的循环轮次
3.2 修复方案
两个方向互补,同时实施:
方向 1:continuity_checker.py — 区分 0 条(无法判断)和 1 条(有基准,只是无法测量间隔)
# continuity_checker.py L49-57 — 修改前
if not records:
logger.warning("空记录列表,无法检查连续性")
return False, [], 0.0
completeness_pct = (len(records) / expected_count) * 100 if expected_count > 0 else 0.0
if len(records) < 2:
logger.debug(f"数据点不足({len(records)}条),无法判断连续性")
return False, [], completeness_pct
# continuity_checker.py L49-57 — 修改后
if not records:
logger.warning("空记录列表,无法检查连续性")
return False, [], 0.0
completeness_pct = (len(records) / expected_count) * 100 if expected_count > 0 else 0.0
if len(records) < 2:
# 1条:有数据基准但无法测量间隔
# 语义上视为"连续"(不存在矛盾),让 orchestrator 通过 shortfall 路径补足
logger.debug(
f"仅1条记录({completeness_pct:.1f}%),视为连续,由 shortfall 路径补足"
)
return True, [], completeness_pct
方向 2:orchestrator.py _diagnose() — 解耦生成条件,不依赖 is_continuous 作为前置门控
# orchestrator.py _diagnose() L230-239 — 修改后
# 2. 新鲜度检查
is_fresh, staleness_min = self._check_freshness(records)
# stale 修复:数据过时时更新到当前(无论数量是否充足)
stale_targets: List[datetime] = []
if not is_fresh:
stale_targets = self._generate_stale_targets(records)
# shortfall 修复:数量不足时向前补充(无需依赖连续性判断)
shortfall_targets: List[datetime] = []
if len(records) < required_count and not gap_times:
# gap_times 非空时优先修间隙,shortfall 等下轮再处理
shortfall_targets = self._generate_shortfall_targets(records, required_count)
3.3 边界场景分析
| 场景 | len | is_fresh | is_continuous | 修改前 | 修改后 |
|---|---|---|---|---|---|
| 0 条 | 0 | False | False | 走无数据分支,生成完整时间线 ✓ | 同左,不受影响 |
| 1 条,过时 | 1 | False | False→True | 两分支全跳 ❌ | stale+shortfall 均生成 ✓ |
| 2 条,过时不足 | 2 | False | True | shortfall ✓,stale ❌ | stale+shortfall 均生成 ✓ |
| 3 条,有间隙 | 3 | True | False | gap_targets 生成,shortfall 跳 ✓ | 同左,不受影响 |
| 3 条,健康 | 3 | True | True | is_healthy=True ✓ | 同左,不受影响 |
注意:stale_targets 和 shortfall_targets 可能在时间上重叠,_merge_repair_targets 内的 set 去重已覆盖此场景。
4. BUG-04:日志误导性
4.1 问题根因
文件:orchestrator.py L458
# 原代码(BUG-01 修复后此逻辑已删除,本节描述若保留原逻辑需修复的内容)
logger.debug(
f"数据不足: {len(rows)} 条({hours}h 窗口),尝试更大范围" # ← 最后一档时仍打印此文字
)
当 hours 已是 time_ranges_hours 最后一个元素(needed_hours × 2)时,循环结束后不会再尝试更大范围,但日志仍写"尝试更大范围"。
4.2 修复方案
BUG-01 修复后此逻辑整体删除,本问题自动消除。
若选择保留原加载逻辑(不推荐),则应修改为:
is_last_range = (hours == time_ranges_hours[-1])
suffix = "(已到最大范围)" if is_last_range else ",尝试更大范围"
logger.debug(f"数据不足: {len(rows)} 条({hours}h 窗口){suffix}")
5. BUG-05:重复写入(BUG-01 副作用)
5.1 根因
BUG-01 导致每轮 _load_zscore_history 返回相同条数 → _diagnose 生成相同 shortfall_targets → RepairExecutor.repair() 对同一时间点连续写入 3 次。
5.2 修复
BUG-01 修复后自动消除:第 1 轮补写后重载可见,修复收敛,后续轮不再生成相同目标。
RepairExecutor.repair() 本身不需要修改——数据库层的 batch_insert 应具备幂等性(UPSERT 或 ON CONFLICT 处理),作为防御性保障可独立验证。
6. BUG-06:test_basic.py 与 Diagnosis 定义失同步
6.1 问题根因
文件:test_basic.py L226–236
# 测试代码传入了 Diagnosis dataclass 不存在的字段
healthy = Diagnosis(
is_healthy=True,
completeness_pct=100.0,
is_continuous=True, # ← Diagnosis 无此字段,将触发 TypeError
record_count=144,
staleness_minutes=10.0, # ← Diagnosis 无此字段,将触发 TypeError
)
实际 Diagnosis(orchestrator.py L42–50)的字段为:
@dataclass
class Diagnosis:
is_healthy: bool
gap_targets: List[datetime] = field(default_factory=list)
stale_targets: List[datetime] = field(default_factory=list)
shortfall_targets: List[datetime] = field(default_factory=list)
completeness_pct: float = 0.0
record_count: int = 0
is_continuous 和 staleness_minutes 不存在,测试文件实际无法运行,掩盖了回归问题。
6.2 修复方案
修改文件:test_basic.py — test_diagnosis_dataclass() 函数(L218–271)
def test_diagnosis_dataclass():
"""测试 Diagnosis 数据类 — 修复与实际定义对齐"""
print("\n" + "=" * 60)
print("测试 5: Diagnosis 数据类")
print("=" * 60)
# 场景1: 健康状态(仅使用 Diagnosis 中存在的字段)
print("\n场景1: 健康状态")
healthy = Diagnosis(
is_healthy=True,
completeness_pct=100.0,
record_count=144,
# gap/stale/shortfall_targets 均使用默认值 []
)
assert healthy.is_healthy == True
assert healthy.gap_targets == []
assert healthy.stale_targets == []
assert healthy.shortfall_targets == []
assert healthy.record_count == 144
assert healthy.completeness_pct == 100.0
print(" PASS")
# 场景2: 有连续性缺口
print("\n场景2: 有连续性缺口")
base_time = datetime(2024, 1, 1, 0, 0)
gap_times = [base_time + timedelta(hours=i * 4) for i in range(3)]
unhealthy = Diagnosis(
is_healthy=False,
gap_targets=gap_times,
completeness_pct=85.0,
record_count=120,
)
assert unhealthy.is_healthy == False
assert len(unhealthy.gap_targets) == 3
assert unhealthy.stale_targets == []
assert unhealthy.shortfall_targets == []
print(" PASS")
# 场景3: 数量不足
print("\n场景3: 数量不足")
shortfall_times = [base_time - timedelta(hours=i * 4) for i in range(1, 3)]
insufficient = Diagnosis(
is_healthy=False,
shortfall_targets=shortfall_times,
completeness_pct=33.3,
record_count=1,
)
assert insufficient.is_healthy == False
assert insufficient.gap_targets == []
assert len(insufficient.shortfall_targets) == 2
print(" PASS")
# 场景4: 数据过时
print("\n场景4: 数据过时")
stale_times = [base_time + timedelta(hours=i * 4) for i in range(2)]
stale = Diagnosis(
is_healthy=False,
stale_targets=stale_times,
completeness_pct=100.0,
record_count=3,
)
assert stale.is_healthy == False
assert stale.gap_targets == []
assert len(stale.stale_targets) == 2
print(" PASS")
7. PERF-01:heal_and_prepare 内部缺少超时保护
7.1 问题根因
orchestrator.py 定义了 healing_timeout() 上下文管理器,但 heal_and_prepare() 方法体从未调用它。所有超时保护只在外层 _run_data_healing 的总超时中,单对配对若 DB 查询挂起则无限阻塞。
7.2 Python 多线程超时的确定性结论
研究结论(来源:Python 官方文档 + 实验验证):
signal.SIGALRM只能在主线程使用,在子线程调用会抛ValueError: signal only works in main threadconcurrent.futures.Future.result(timeout=N)可以让调用方超时,但无法中断正在执行的线程- Python 无法从外部强制终止线程(GIL 限制),唯一可靠的方式是协作式取消(Cooperative Cancellation)
- DB 层的
statement_timeout是最优雅的超时机制:PostgreSQL 主动终止查询并抛异常,Python 可捕获
7.3 修复方案:三层超时防御
层1(推荐首先实施):DB 层 statement_timeout
在执行 _load_zscore_history 的数据库查询前设置会话级超时:
# orchestrator.py _load_zscore_history() 修改
def _load_zscore_history(self, required_count: int) -> List[dict]:
try:
# DB 级超时:若单条 SQL 超过 10s,PostgreSQL 主动取消并抛异常
# 这是最可靠的超时机制,不依赖 Python 信号或线程中断
self.db_client.execute_query(
"SET LOCAL statement_timeout = '10000'", # 10秒,单位毫秒
params=()
)
query = """
SELECT DISTINCT ON (kline_time) ...
"""
rows = self.db_client.execute_query(query, params=(...))
...
except Exception as e:
# statement_timeout 超时时抛出 QueryCanceled 异常,
# psycopg2 将其包装为 QueryCanceledError(继承自 OperationalError)
if 'canceling statement due to statement timeout' in str(e):
raise TimeoutError(f"数据库查询超时(>10s)") from e
raise
层2:heal_and_prepare 使用现有 healing_timeout(主线程调用时)
# orchestrator.py heal_and_prepare() — 补充超时包装
def heal_and_prepare(self, required_count: int) -> HealingResult:
"""
主流程:自愈并准备数据
超时保护说明:
- 若在主线程调用(_run_data_healing 串行模式),healing_timeout 通过 SIGALRM 生效
- 若在子线程调用(并行模式),healing_timeout 自动降级为无-op(已在 contextmanager 中处理),
依靠 DB statement_timeout 保证不挂起
"""
from src.config import HEALING_TIMEOUT_SECONDS
per_pair_timeout = min(HEALING_TIMEOUT_SECONDS, 60) # 单对最多 60 秒
try:
with healing_timeout(per_pair_timeout):
# Phase 1: 加载
records = self._load_zscore_history(required_count)
...
except TimeoutError:
logger.warning(f"数据自愈超时 ({per_pair_timeout}s): {self.symbol}/{self.base_symbol}")
empty_quality = self.assessor.assess([], required_count, False, 0)
return HealingResult(
status='failed',
data=[],
quality=empty_quality,
iterations_used=0,
)
层3:并行模式下使用 threading.Event 协作式取消(PERF-05 实施后)
# 在 heal_and_prepare 签名中增加 cancel_event 参数
def heal_and_prepare(
self,
required_count: int,
cancel_event: Optional[threading.Event] = None
) -> HealingResult:
for iteration in range(1, self.max_iterations + 1):
# 每轮修复前检查取消信号
if cancel_event and cancel_event.is_set():
logger.info(f"收到取消信号,提前终止修复: {self.symbol}/{self.base_symbol}")
break
...
8. PERF-02:healing 在 __init__ 中同步阻塞服务启动
8.1 问题根因
realtime_kline_service_base.py L212:
# __init__ 调用链(阻塞序列)
self._load_pair_cache() # 1-2s(DB 查询)
self._run_data_healing() # 10-60s(全量自愈,阻塞在此)
self.market_subscriptions = self._build_market_subscriptions() # 等前者结束
服务必须等待自愈完成才能建立 WebSocket 连接,用户感知启动延迟 = 自愈时间。
8.2 权衡分析
| 方案 | 启动延迟 | 风险 | 适用场景 |
|---|---|---|---|
| 现状:同步阻塞 | 10-60s | 低 | 对启动速度不敏感 |
| 方案A:init 后台线程,start() 启动 | ~0s | 交易信号可能在自愈完成前到达,使用未恢复的旧数据 | 可接受,加保护 |
| 方案B:init 后台线程+完成事件 | ~0s | 首批信号等待 healing_done_event | 推荐 |
8.3 修复方案(方案 B)
修改文件:realtime_kline_service_base.py
# __init__ 中:改为创建后台线程但不立即启动
# 删除:self._run_data_healing()
# 新增:
self._healing_done_event = threading.Event()
self._healing_thread: Optional[threading.Thread] = None
# start() 方法中(在 WebSocket 连接建立之前):
def start(self):
# 先启动数据自愈(后台运行)
self._healing_thread = threading.Thread(
target=self._run_data_healing_with_signal,
daemon=True,
name="data-healing"
)
self._healing_thread.start()
# 启动 WebSocket(不等待自愈完成,自愈在后台进行)
self._connect_websocket()
...
def _run_data_healing_with_signal(self):
"""带完成信号的自愈包装"""
try:
self._run_data_healing()
finally:
self._healing_done_event.set()
self.logger.info("数据自愈后台线程完成")
def _is_healing_complete(self) -> bool:
"""供交易信号处理器检查自愈是否完成"""
return self._healing_done_event.is_set()
在交易信号处理前加保护(可选,根据业务需求决定):
def _process_trading_signal(self, signal):
# 如果自愈尚未完成,等待最多 30 秒
if not self._healing_done_event.wait(timeout=30):
self.logger.warning("数据自愈尚未完成,使用当前数据处理信号")
...
9. PERF-03:每轮修复迭代全量重载历史数据
9.1 问题根因
orchestrator.py L188:
for iteration in range(1, self.max_iterations + 1):
diagnosis = self._diagnose(records, required_count)
...
repaired_count = self.executor.repair(all_targets, ...)
# 无论修复了几条,都全量重载 required_count 条历史数据
records = self._load_zscore_history(required_count) # ← 每轮都是全量
修复通常只写入 1-3 条新记录,却要重载 3 条(required_count=3),对应 3 次 SQL 查询。这在 BUG-01 修复后虽然查询会成功,但仍是不必要的开销。
9.2 修复方案:只加载新增部分,与现有记录合并
# orchestrator.py heal_and_prepare() 修改
for iteration in range(1, self.max_iterations + 1):
logger.info(f"第 {iteration} 轮检查...")
diagnosis = self._diagnose(records, required_count)
if diagnosis.is_healthy:
break
all_targets = self._merge_repair_targets(diagnosis)
if not all_targets:
logger.warning("无法确定修复目标,终止修复")
break
repaired_count = self.executor.repair(all_targets, self.symbol, self.base_symbol, self.repair_timeframe)
if repaired_count == 0:
logger.warning(f"第{iteration}轮修复无进展,可能在冷却期或数据源不可用")
break
logger.info(f"第{iteration}轮修复完成: 修复 {repaired_count} 条记录")
# 优化:增量加载(只加载本轮 repair_targets 对应的新记录)
# 而非全量重载所有记录
new_records = self._load_new_records(all_targets)
if new_records:
# 合并并去重(以 kline_time 为 key)
existing_times = {r['kline_time'] for r in records}
added = [r for r in new_records if r['kline_time'] not in existing_times]
records = sorted(records + added, key=lambda r: r['kline_time'])
# 只保留最近 required_count 条
if len(records) > required_count:
records = records[-required_count:]
logger.debug(f"增量更新: +{len(added)} 条,总计 {len(records)} 条")
else:
# 增量加载失败则回退全量
records = self._load_zscore_history(required_count)
新增辅助方法:
def _load_new_records(self, targets: List[datetime]) -> List[dict]:
"""
加载指定时间点的 zscore 记录(仅加载刚修复的目标)
用于替代全量重载,减少 DB 查询开销。
"""
if not targets:
return []
# 使用 IN 参数批量查询(最多查 required_count 条)
# DISTINCT ON 保证每个 kline_time 只返回最新的一条
placeholders = ','.join(['%s'] * len(targets))
query = f"""
SELECT DISTINCT ON (kline_time)
kline_time,
zscore_4h,
analysis_time
FROM analysis_results
WHERE symbol = %s
AND base_symbol = %s
AND zscore_4h IS NOT NULL
AND kline_time IN ({placeholders})
ORDER BY kline_time DESC, analysis_time DESC
"""
try:
params = (self.symbol, self.base_symbol, *targets)
rows = self.db_client.execute_query(query, params=params)
return rows or []
except Exception as e:
logger.warning(f"增量加载失败,将回退全量: {e}")
return []
10. PERF-04:kline 查询 limit=10000 硬编码过大
10.1 问题根因
repair_executor.py L123、L131、L243、L248:
alt_klines = self.kline_repo.query_range(
symbol=symbol, timeframe=timeframe,
start_time=start_time, end_time=end_time,
limit=10000 # ← 硬编码,实际需要约 TOTAL_WINDOW=130 条
)
TOTAL_WINDOW = BETA_WINDOW + ZSCORE_WINDOW = 100 + 30 = 130。
请求 10000 条但只使用 ~130 条,浪费 98.7% 的 I/O。
10.2 修复方案
# repair_executor.py _repair_from_klines() 修改
def _repair_from_klines(self, missing_times, symbol, base_symbol, timeframe='5m'):
...
interval_min = timeframe_to_minutes(timeframe)
lookback_minutes = TOTAL_WINDOW * interval_min
start_time = min(missing_times) - timedelta(minutes=lookback_minutes)
end_time = max(missing_times) + timedelta(minutes=interval_min)
# 精确计算所需 K 线数量,加 20% 余量应对可能的空洞
# 实际最多需要: TOTAL_WINDOW + len(missing_times)
# 加倍作为安全缓冲
required_klines = (TOTAL_WINDOW + len(missing_times)) * 2
try:
alt_klines = self.kline_repo.query_range(
symbol=symbol,
timeframe=timeframe,
start_time=start_time,
end_time=end_time,
limit=required_klines # ← 从 10000 降至 ~270,减少 I/O 98%
)
base_klines = self.kline_repo.query_range(
symbol=base_symbol,
timeframe=timeframe,
start_time=start_time,
end_time=end_time,
limit=required_klines
)
...
同步修改 _find_kline_gaps() 中的两处 limit=10000(L243、L248),使用同样的动态计算值。
11. PERF-05:_run_data_healing 串行无预过滤
11.1 问题根因
realtime_kline_service_base.py L1806–1840:
with healing_timeout(HEALING_TIMEOUT_SECONDS):
for symbol, base_symbol in heal_pairs: # ← 串行循环
healer = DataHealingOrchestrator(...)
result = healer.heal_and_prepare(...) # ← 每对都走全流程
100 对配对时,即使 90 对数据完全健康,也要逐一走 Phase 1(SQL 查询)+ Phase 2(诊断)后才 break。
11.2 分步实施计划
阶段 1(低风险,建议先行):批量预过滤,跳过健康配对
def _run_data_healing(self):
...
# 阶段1增强:批量预检,一次 SQL 获取所有配对的健康状态
healthy_pairs, sick_pairs = self._batch_health_check(heal_pairs, repair_count)
self.logger.info(
f"预检完成: 健康={len(healthy_pairs)} 跳过 | 需修复={len(sick_pairs)} 待处理"
)
if not sick_pairs:
self.logger.info("所有配对数据健康,无需自愈")
return
# 仅对不健康的配对运行完整自愈流程
for symbol, base_symbol in sick_pairs:
...
def _batch_health_check(
self,
pairs: List[tuple],
required_count: int
) -> tuple[List[tuple], List[tuple]]:
"""
一次 SQL 批量检查所有配对的健康状态。
健康判定:最近记录数 >= required_count 且 最新记录时间 在新鲜度阈值内。
Returns:
(healthy_pairs, sick_pairs)
"""
if not pairs:
return [], []
# 构建 IN 参数:(symbol, base_symbol) 对列表
# 使用 VALUES 子句与主表 JOIN,支持批量查询
pair_values = ','.join(
f"('{sym}', '{base}')" for sym, base in pairs
)
# 新鲜度阈值(与 orchestrator 中保持一致)
from .config import FRESHNESS_MULTIPLIER
from src.utils.data_healing.config import timeframe_to_minutes
interval_minutes = timeframe_to_minutes('4h')
freshness_threshold_minutes = interval_minutes * FRESHNESS_MULTIPLIER # 600min
query = f"""
WITH pair_list(symbol, base_symbol) AS (
VALUES {pair_values}
),
pair_stats AS (
SELECT
a.symbol,
a.base_symbol,
COUNT(DISTINCT a.kline_time) AS kline_count,
MAX(a.kline_time) AS latest_kline_time
FROM analysis_results a
INNER JOIN pair_list p
ON a.symbol = p.symbol AND a.base_symbol = p.base_symbol
WHERE a.zscore_4h IS NOT NULL
AND a.kline_time >= NOW() - INTERVAL '7 days'
GROUP BY a.symbol, a.base_symbol
)
SELECT
symbol,
base_symbol,
kline_count,
latest_kline_time,
EXTRACT(EPOCH FROM (NOW() - latest_kline_time)) / 60 AS staleness_minutes,
CASE
WHEN kline_count >= %s
AND EXTRACT(EPOCH FROM (NOW() - latest_kline_time)) / 60 <= %s
THEN TRUE
ELSE FALSE
END AS is_healthy
FROM pair_stats
"""
try:
rows = self.db_client.execute_query(
query,
params=(required_count, freshness_threshold_minutes)
)
except Exception as e:
self.logger.warning(f"批量预检失败,所有配对将进入完整自愈: {e}")
return [], list(pairs)
healthy_set = set()
for row in (rows or []):
if row['is_healthy']:
healthy_set.add((row['symbol'], row['base_symbol']))
healthy = [(s, b) for s, b in pairs if (s, b) in healthy_set]
sick = [(s, b) for s, b in pairs if (s, b) not in healthy_set]
return healthy, sick
阶段 2(风险较高,需充分测试):对不健康配对并行处理
Python threading 超时的确定性结论:
threading.Timer无法中断正在运行的线程,只能作为标志位通知机制concurrent.futures.Future.result(timeout=N)从调用方超时,但线程继续运行- 最可靠的单线程超时:DB
statement_timeout(见 PERF-01)- 并行场景推荐:
ThreadPoolExecutor+threading.Event协作式取消 + DBstatement_timeout
# realtime_kline_service_base.py _run_data_healing() 阶段2扩展
import concurrent.futures
# 配置:从 src/config.py 读取
HEALING_WORKERS = getattr(config, 'HEALING_WORKERS', 5) # 并发数
PER_PAIR_TIMEOUT = getattr(config, 'HEALING_PER_PAIR_TIMEOUT', 60) # 单对超时秒数
def _run_data_healing(self):
...
healthy_pairs, sick_pairs = self._batch_health_check(heal_pairs, repair_count)
if not sick_pairs:
return
# 全局取消事件:若总超时触发,通知所有子线程退出
global_cancel = threading.Event()
def heal_one(pair: tuple) -> dict:
"""单对配对的自愈任务,运行在子线程"""
symbol, base_symbol = pair
result_info = {'pair': pair, 'status': 'unknown', 'error': None}
try:
healer = DataHealingOrchestrator(
db_client=self.db_client,
kline_repo=self.kline_repo,
symbol=symbol,
base_symbol=base_symbol,
repair_timeframe='4h',
)
if shared_executor:
healer.executor = shared_executor
# 传入取消事件(heal_and_prepare 内部轮询检查,见 PERF-01 层3)
result = healer.heal_and_prepare(
required_count=repair_count,
cancel_event=global_cancel
)
result_info['status'] = result.status
except (TimeoutError, KeyboardInterrupt):
result_info['status'] = 'timeout'
global_cancel.set() # 单对超时时通知其他对也取消
except Exception as e:
result_info['status'] = 'error'
result_info['error'] = str(e)
self.logger.warning(f"自愈异常: {symbol} | {e}")
return result_info
shared_executor = None
healed = failed = 0
# 全局超时:总体不超过 HEALING_TIMEOUT_SECONDS
deadline = time.time() + HEALING_TIMEOUT_SECONDS
with concurrent.futures.ThreadPoolExecutor(
max_workers=HEALING_WORKERS,
thread_name_prefix="data-healing"
) as pool:
future_to_pair = {
pool.submit(heal_one, pair): pair
for pair in sick_pairs
}
for future in concurrent.futures.as_completed(
future_to_pair,
timeout=max(0, deadline - time.time())
):
try:
info = future.result(timeout=PER_PAIR_TIMEOUT)
if info['status'] not in ('error', 'timeout', 'failed'):
healed += 1
else:
failed += 1
except concurrent.futures.TimeoutError:
failed += 1
pair = future_to_pair[future]
self.logger.warning(f"单对超时 ({PER_PAIR_TIMEOUT}s): {pair}")
except Exception as e:
failed += 1
self.logger.info(f"数据自愈完成 | 成功={healed} 失败={failed} | 跳过健康={len(healthy_pairs)}")
并发安全注意事项:
KlineDataFiller.fill_cooldown字典需加threading.Lock(多线程并发访问)AnalysisResultRepository.batch_insert需确认线程安全(通常 psycopg2 连接是非线程安全的,需每个线程独立连接)- 建议并发数
HEALING_WORKERS=5,避免 DB 连接耗尽(连接池限制)
新增配置(src/config.py):
HEALING_WORKERS: int = 5 # 并行修复线程数
HEALING_PER_PAIR_TIMEOUT: int = 60 # 单对配对最大修复时间(秒)
12. CODE-01:zscore_4h 字段赋值逻辑不一致
12.1 问题根因
repair_executor.py L183–188:
'zscore_5m': zscore if timeframe == '5m' else None, # ← 有条件
'zscore_1h': zscore if timeframe == '1h' else None, # ← 有条件
'zscore_4h': zscore, # ← 无条件赋值!不一致
'corr_5m_7d': corr if timeframe == '5m' else None,
'corr_1h_30d': corr if timeframe == '1h' else None,
'corr_4h_60d': corr if timeframe == '4h' else None, # ← 有条件
zscore_4h 无论 timeframe 是什么都会被赋值,如用 5m timeframe 修复时,zscore_4h 也会写入,语义错误。
12.2 修复方案
# repair_executor.py — 引入映射表,统一处理
# 文件顶部常量
ZSCORE_FIELD_MAP = {
'5m': 'zscore_5m',
'1h': 'zscore_1h',
'4h': 'zscore_4h',
}
CORR_FIELD_MAP = {
'5m': 'corr_5m_7d',
'1h': 'corr_1h_30d',
'4h': 'corr_4h_60d',
}
# _repair_from_klines() 中替换硬编码
zscore_field = ZSCORE_FIELD_MAP.get(timeframe)
corr_field = CORR_FIELD_MAP.get(timeframe)
if zscore_field is None:
logger.warning(f"未知 timeframe: {timeframe},跳过 zscore 写入")
fail_count += 1
continue
records.append({
'analysis_time': missing_time,
'kline_time': missing_time,
'symbol': symbol,
'base_symbol': base_symbol,
'zscore_5m': zscore if timeframe == '5m' else None,
'zscore_1h': zscore if timeframe == '1h' else None,
'zscore_4h': zscore if timeframe == '4h' else None, # ← 修正为有条件
'corr_5m_7d': corr if timeframe == '5m' else None,
'corr_1h_30d': corr if timeframe == '1h' else None,
'corr_4h_60d': corr if timeframe == '4h' else None,
'is_anomaly': False,
'trading_direction': 'none',
'signal_strength': 'weak',
'analysis_delay_seconds': 0.0,
'cointegration_passed': True,
})
13. CODE-02:_merge_repair_targets 注释优先级与实现不符
13.1 问题根因
orchestrator.py L271–281:
def _merge_repair_targets(self, diagnosis: Diagnosis) -> List[datetime]:
"""
合并三类修复目标,去重排序。
优先级:gap_targets > stale_targets > shortfall_targets # ← 注释声称有优先级
"""
all_targets = set()
all_targets.update(diagnosis.gap_targets) # ← 实际是无序 set 合并
all_targets.update(diagnosis.stale_targets)
all_targets.update(diagnosis.shortfall_targets)
return sorted(all_targets)
优先级信息丢失,调试时无法判断"修的是哪类问题"。
13.2 修复方案
选项 A(简单):删除误导性优先级注释,如实描述行为:
def _merge_repair_targets(self, diagnosis: Diagnosis) -> List[datetime]:
"""
合并三类修复目标,按时间排序(去重)。
三类目标均被修复,无优先级区分。
如需区分修复来源,查看 diagnosis.gap/stale/shortfall_targets 各字段。
"""
all_targets = set()
all_targets.update(diagnosis.gap_targets)
all_targets.update(diagnosis.stale_targets)
all_targets.update(diagnosis.shortfall_targets)
return sorted(all_targets)
选项 B(完整):记录来源类型,供日志输出:
def _merge_repair_targets(self, diagnosis: Diagnosis) -> List[datetime]:
"""合并三类修复目标,按时间排序(去重),记录来源供日志"""
target_sources = {} # kline_time → source_type
for t in diagnosis.gap_targets:
target_sources[t] = 'gap'
for t in diagnosis.stale_targets:
target_sources.setdefault(t, 'stale') # gap 优先
for t in diagnosis.shortfall_targets:
target_sources.setdefault(t, 'shortfall') # gap/stale 优先
if target_sources:
from collections import Counter
counts = Counter(target_sources.values())
logger.info(
f"修复目标汇总: gap={counts.get('gap',0)} | "
f"stale={counts.get('stale',0)} | "
f"shortfall={counts.get('shortfall',0)}"
)
return sorted(target_sources.keys())
14. CODE-03:_determine_grade 扩展性差
14.1 问题根因
quality_assessor.py L98–117:5 层 if-elif,新增等级时需修改代码而非配置。
14.2 修复方案
# quality_assessor.py _determine_grade() — 修改后
def _determine_grade(self, completeness_pct: float) -> str:
"""
根据完整度百分比确定质量等级。
等级阈值来自 QUALITY_GRADES 配置(config.py),
新增等级只需修改配置,无需改动此方法。
"""
# 按 min_pct 降序排列等级(最高阈值优先匹配)
sorted_grades = sorted(
self.grades.items(),
key=lambda item: item[1]['min_pct'],
reverse=True
)
for grade_name, grade_config in sorted_grades:
if completeness_pct >= grade_config['min_pct']:
return grade_name
# 理论上不会到达这里(F 级 min_pct=0.0 兜底),防御性返回
return 'F'
15. DB-01:缺少 kline_time 维度的覆盖索引
15.1 问题根因
文件:database/init_timescaledb.sql L186–191
现有索引 idx_analysis_results_pair_time 定义在 (symbol, base_symbol, analysis_time DESC),缺少 kline_time 列。而 _load_zscore_history 的新查询(BUG-01 修复后)需要按 kline_time 的 DISTINCT ON + ORDER BY,现有索引无法完全覆盖此查询,可能触发 Sort + Hash Distinct 操作。
PostgreSQL 研究结论(已验证):
DISTINCT ON (kline_time) ... ORDER BY kline_time DESC, analysis_time DESC要求:
- ORDER BY 必须以 DISTINCT ON 列(
kline_time)开头- 最优索引:
(symbol, base_symbol, kline_time DESC, analysis_time DESC)加WHERE zscore_4h IS NOT NULL- PostgreSQL 18+ 会自动使用 Skip Scan 优化,性能可提升 26~8500 倍
15.2 修复方案
在 database/init_timescaledb.sql 中新增索引:
-- 索引13: analysis_results kline_time 维度索引(自愈模块专用)
-- 用途: 优化 _load_zscore_history() 的 DISTINCT ON (kline_time) 查询
-- 查询模式: WHERE symbol=X AND base_symbol=Y AND zscore_4h IS NOT NULL
-- ORDER BY kline_time DESC, analysis_time DESC LIMIT N
CREATE INDEX IF NOT EXISTS idx_analysis_results_healing_query
ON analysis_results (symbol, base_symbol, kline_time DESC, analysis_time DESC)
WHERE zscore_4h IS NOT NULL;
\echo '✅ 索引 idx_analysis_results_healing_query 已创建(自愈模块专用)'
现有索引评估:
| 索引名 | 列 | 对 _load_zscore_history 的作用 |
|---|---|---|
idx_analysis_results_pair_time |
(symbol, base_symbol, analysis_time DESC) |
仅过滤 symbol/base_symbol,无法利用 kline_time ORDER BY,性能差 |
idx_analysis_results_healing_query(新增) |
(symbol, base_symbol, kline_time DESC, analysis_time DESC) WHERE zscore_4h NOT NULL |
完全覆盖查询,Index Only Scan,性能最优 |
在线添加索引(不影响写入):
-- 对已有数据库:使用 CONCURRENTLY 不阻塞写入
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_analysis_results_healing_query
ON analysis_results (symbol, base_symbol, kline_time DESC, analysis_time DESC)
WHERE zscore_4h IS NOT NULL;
16. 实施路线图
第一批(P0,1-2天,修 BUG 不引入风险)
优先级顺序:
1. DB-01 → 新增索引(无风险,CONCURRENTLY 在线加)
2. BUG-01 → 修改 _load_zscore_history SQL(简单,低风险)
3. BUG-02/03 → 修改 continuity_checker + _diagnose(中风险,有测试覆盖)
4. BUG-06 → 修复 test_basic.py(无风险,纯测试改动)
5. PERF-01 → 在 heal_and_prepare 内加 healing_timeout + DB statement_timeout(低风险)
6. CODE-01 → 修复 zscore_4h 赋值逻辑(低风险)
验证方式:运行 test_basic.py,观察 1 条数据的配对能否在 1 轮内完成修复。
第二批(P1,3-5天,性能优化)
7. PERF-04 → kline query limit=10000 → 动态计算(无风险)
8. PERF-03 → 增量加载历史数据(中风险,需单元测试)
9. CODE-02 → _merge_repair_targets 注释修正(无风险)
10. CODE-03 → _determine_grade 重构(无风险)
第三批(P2,1-2周,架构优化,需充分测试)
11. PERF-05 阶段1 → 批量预过滤(中风险,需集成测试)
12. PERF-02 → healing 移至后台线程(高风险,需完整测试启动流程)
13. PERF-05 阶段2 → 并行化(高风险,需并发安全审查)
修复后预期效果
| 场景 | 修改前 | 修改后(P0完成) | 修改后(P2完成) |
|---|---|---|---|
| 1条历史记录的配对 | 3轮无效,最终 degraded | 1轮修复成功 | 同左,更快 |
| 100对全健康(正常重启) | ~100s(阻塞启动) | ~15s(串行诊断) | <2s(预过滤+后台) |
| 80对健康+20对需修 | ~260s | ~40s | ~15s(并行修复) |
| 单条 SQL 挂起 | 无限阻塞 | 10s超时 | 同左 |
本文档描述的所有修改均基于实际代码核查,SQL 语义经 PostgreSQL 官方文档确认,threading 超时方案经 Python 官方文档验证。