数据自愈模块提速3
数据自愈并发优化设计方案
一、现状分析
当前流程
_run_data_healing(realtime_kline_service_base.py:1807):
for (symbol, base_symbol) in heal_pairs: # ~50+ 配对,串行
DataHealingOrchestrator(...) # → RepairExecutor.__init__
│ # → KlineDataFiller.__init__
│ # → Info(MAINNET_API_URL) HTTP 握手 ~0.85s
│ # ← 第 2-50 个握手完成后才被 shared_executor 覆盖,全部浪费!
_load_zscore_history() # DB 查询 ×1(0.08-0.56s)
_get_db_now() # SELECT NOW() ×1
_diagnose() # 内存计算(~0ms)
check_continuity() # ← 第一次连续性检查
_final_assessment() # ← 第二次连续性检查(冗余)
QualityAssessor.assess() # 质量评估
根因(精确代码路径):
# realtime_kline_service_base.py(精确执行顺序)
healer = DataHealingOrchestrator(...) # ← HTTP 握手在此发生
# DataHealingOrchestrator.__init__:
# self.executor = RepairExecutor(db_client, kline_repo, exchange_id)
# RepairExecutor.__init__:
# self.kline_filler = KlineDataFiller(exchange_id, kline_repo)
# KlineDataFiller.__init__:
# self._info = self._init_info()
# → Info(MAINNET_API_URL, skip_ws=True, timeout=30) # ~0.85s 握手完成
# 握手已完成,才执行 shared_executor 赋值
if shared_executor is None:
shared_executor = healer.executor
else:
healer.executor = shared_executor # 第 2-50 个握手已白白完成
瓶颈定量(日志实测,50 对)
每对的耗时结构(以 IO/BTC 对为例):
KlineDataFiller 初始化完成 ← Info(MAINNET_API_URL) HTTP 握手 ~0.85s ← 49 次是废弃的!
自愈总耗时: 0.14s ← DB 查询 + 诊断 + 评估
─────────────────────────────────────────────────────────────────────
每对实际耗时: ~1.0s
| 操作 | 单次耗时 | 次数 | 总计 | 占比 |
|---|---|---|---|---|
KlineDataFiller init(HTTP 握手,49次废弃) |
~0.85s | 50 | ~43s | 78% |
_load_zscore_history DB 查询 |
~0.15s avg | ~50 | ~7.5s | 14% |
| ONDO/BTC DB 慢查询(特例,见附加优化 4d) | 4.44s | 1 | 4.44s | 8% |
_get_db_now SELECT NOW() |
~0.01s | ~50 | ~0.5s | ~1% |
_final_assessment 冗余连续性检查 |
~0.001s | ~50 | ~0.05s | ~0% |
| 总计 | ~55s |
基础设施
- 连接池:psycopg 3.x
ConnectionPool,min_size=2, max_size=10,支持并发 DB 查询 - 交易所 API:
KlineDataFiller内置 10 分钟冷却机制(threading.Lock()保护) - 超时保护:全局
healing_timeout(300s) - 覆盖索引:
idx_analysis_results_pair_kline_time ON analysis_results (symbol, base_symbol, kline_time DESC, analysis_time DESC) WHERE zscore_4h IS NOT NULL(与批量 SQL 完全对齐)
二、优化方案 A:批量 DB 查询
目标
50+ 次 DB 往返 → 1 次(db_now 内嵌在同一查询中)
核心 SQL
将原来的单配对查询:
-- 原始:每个配对执行一次(共 ~100 次往返,含 SELECT NOW())
SELECT kline_time, zscore_4h, analysis_time
FROM (
SELECT DISTINCT ON (kline_time)
kline_time, zscore_4h, analysis_time
FROM analysis_results
WHERE symbol = %s AND base_symbol = %s
AND zscore_4h IS NOT NULL
AND kline_time >= NOW() - INTERVAL 'X days'
ORDER BY kline_time DESC, analysis_time DESC
LIMIT %s
) sub
ORDER BY kline_time ASC
改为批量查询(db_now 内嵌,真正 1 次往返):
-- 优化:所有配对一次查询,同时返回 db_now
-- boundary_days 由调用方计算:max(required_count × interval_minutes / 1440) + LOAD_SAFETY_MARGIN_DAYS
-- 所有配对同一 timeframe(4h),required_count 一致,boundary_days 统一取同一值
SELECT
NOW() AS db_now, -- db_now 内嵌,消除独立 SELECT NOW()
symbol, base_symbol,
kline_time, zscore_4h, analysis_time
FROM (
SELECT *, ROW_NUMBER() OVER (
PARTITION BY symbol, base_symbol
ORDER BY kline_time DESC
) AS rn
FROM (
SELECT DISTINCT ON (symbol, base_symbol, kline_time)
symbol, base_symbol, kline_time, zscore_4h, analysis_time
FROM analysis_results
WHERE (symbol, base_symbol) IN (VALUES (%s,%s), (%s,%s), ...)
AND zscore_4h IS NOT NULL
AND kline_time >= NOW() - INTERVAL '{boundary_days} days'
ORDER BY symbol, base_symbol, kline_time DESC, analysis_time DESC
) deduped
) ranked
WHERE rn <= %s
ORDER BY symbol, base_symbol, kline_time ASC
注意:
boundary_days为 Python 计算的整型安全字面量(math.ceil(required_count * interval_minutes / 1440) + LOAD_SAFETY_MARGIN_DAYS),不涉及用户输入,字符串插值安全。所有配对统一使用同一required_count(均为 4h timeframe),boundary_days取单一值即可,无需逐对计算。
SQL 逻辑说明
- 内层
DISTINCT ON:每个(symbol, base_symbol, kline_time)只保留最新analysis_time的记录,ORDER BY列与覆盖索引idx_analysis_results_pair_kline_time完全对齐,走 Index Scan ROW_NUMBER() OVER (PARTITION BY ... ORDER BY kline_time DESC):给每个配对的记录按时间倒序编号WHERE rn <= required_count:每个配对只取最近 N 条NOW() AS db_now:与数据在同一事务快照内,消除时间漂移,同时替代 50 次独立SELECT NOW()- 外层
ORDER BY:按时间正序返回
Statement Timeout 适配
批量查询取代了 50 次独立查询,需适当放宽超时(原单条 SQL 使用 DB_STATEMENT_TIMEOUT_MS,默认 10s):
# 批量查询使用更宽松的超时
BATCH_STATEMENT_TIMEOUT_MS = max(DB_STATEMENT_TIMEOUT_MS, 15_000)
cur.execute(f"SET LOCAL statement_timeout = '{BATCH_STATEMENT_TIMEOUT_MS}'")
理由:批量查询将所有配对的数据在同一次执行中返回。statement_timeout 作用于整个语句,不会因 PARTITION 独立分割。ONDO/BTC 等慢配对(4.44s)会拖慢整体执行时间,但在 15s 超时内仍可完成;即便超时也会触发降级回退,不影响其他配对的后续加载。
失败回退机制
批量查询一旦失败,50 个配对的数据全部丢失,需降级到逐配对加载:
from datetime import timezone
def _batch_load_with_fallback(
heal_pairs: list[tuple[str, str]],
required_count: int,
db_client: TimescaleDBClient,
) -> tuple[dict[tuple[str, str], list[dict]], datetime]:
"""批量加载,失败时降级为逐配对加载。"""
try:
grouped, db_now = batch_load_zscore_history(
heal_pairs, required_count, db_client
)
return grouped, db_now
except Exception as e:
logger.warning(
f"批量查询失败,降级为逐配对加载: {e}",
exc_info=True,
)
return _fallback_individual_load(heal_pairs, required_count, db_client)
def _fallback_individual_load(
heal_pairs: list[tuple[str, str]],
required_count: int,
db_client: TimescaleDBClient,
) -> tuple[dict[tuple[str, str], list[dict]], datetime]:
grouped: dict[tuple[str, str], list[dict]] = {}
# ✅ 使用 timezone-aware datetime,与 DB 返回的 NOW() 类型一致,避免减法时 TypeError
db_now = datetime.now(timezone.utc)
for symbol, base_symbol in heal_pairs:
try:
records = _load_zscore_history_single(
symbol, base_symbol, required_count, db_client
)
grouped[(symbol, base_symbol)] = records
except Exception as e:
logger.warning(f"单配对加载失败 [{symbol}/{base_symbol}]: {e}")
grouped[(symbol, base_symbol)] = []
return grouped, db_now
修复说明:原方案使用
datetime.utcnow()(naive datetime),而 DB 的SELECT NOW()返回 timezone-aware datetime。_check_freshness中db_now - latest_time会因类型不一致抛出TypeError。统一改为datetime.now(timezone.utc)。
内存分组
from collections import defaultdict
from datetime import timezone
grouped: dict[tuple[str, str], list[dict]] = defaultdict(list)
db_now: datetime | None = None
for row in rows:
if db_now is None:
db_now = row['db_now'] # 取第一行的 db_now(timezone-aware)
grouped[(row['symbol'], row['base_symbol'])].append({
'kline_time': row['kline_time'],
'zscore_4h': row['zscore_4h'],
'analysis_time': row['analysis_time'],
})
db_now = db_now or datetime.now(timezone.utc) # 空结果时降级(timezone-aware)
代码改动
orchestrator.py— 新增batch_load_zscore_history()静态方法(含 db_now 返回)realtime_kline_service_base.py—_run_data_healing()调用_batch_load_with_fallback()替代循环内单独加载
预期收益
DB 往返从 ~100 次(加载 50 + NOW 50)降到 1 次,数据加载阶段 ~8s → ~0.3s
三、优化方案 B:并发修复
前提
批量查询后,数据已在内存中。诊断是纯 CPU 操作(~0ms/对),不需要并发。只有不健康的配对需要调用 RepairExecutor.repair()(涉及 DB 写入 + 交易所 API),才需要并发。
线程安全验证
RepairExecutor 的所有内部组件均支持多线程共享:
| 组件 | 线程安全方式 |
|---|---|
db_client(ConnectionPool) |
psycopg 3.x 连接池原生线程安全 |
kline_repo / analysis_repo |
每次操作从连接池独立获取连接 |
kline_filler(KlineDataFiller) |
冷却字典由 threading.Lock() 保护 |
repair() 方法本身 |
无跨调用共享的可变状态 |
关键修复:DataHealingOrchestrator 支持外部注入 executor
问题根因:在旧版并发修复设计中,_heal_one 内部创建 DataHealingOrchestrator(...) 时,__init__ 会立即触发 RepairExecutor.__init__ → KlineDataFiller.__init__ → Info(MAINNET_API_URL) HTTP 握手。虽然随后将 orchestrator.executor 替换为 shared_executor,但握手已经发生——与 _run_data_healing 中的原始 Bug 完全对称。
5 个并发 worker 同时执行时,会产生 5 次额外握手(并行 ~0.85s)加上 shared_executor 自身的 1 次,实际握手时间 ~1.7s。
修复方案:在 DataHealingOrchestrator.__init__ 中新增可选参数 executor,若传入则直接使用,跳过内部 RepairExecutor 创建:
# orchestrator.py — DataHealingOrchestrator.__init__ 修改
def __init__(
self,
db_client: TimescaleDBClient,
kline_repo: KlineRepository,
symbol: str = 'PURR/USDC:USDC',
base_symbol: str = 'HYPE/USDC:USDC',
repair_timeframe: str = '4h',
exchange_id: str = 'hyperliquid',
executor: Optional['RepairExecutor'] = None, # ← 新增:支持外部注入
):
...
# 若传入 executor 则直接复用,跳过 RepairExecutor.__init__ → KlineDataFiller.__init__ → HTTP 握手
self.executor = executor if executor is not None else RepairExecutor(
db_client, kline_repo, exchange_id
)
...
设计:共享单个 RepairExecutor
并发修复共享 1 个 RepairExecutor 实例,仅 1 次 HTTP 握手:
def _run_concurrent_repair(
unhealthy_pairs: list[tuple],
required_count: int, # ← 原始 required_count,非 len(records)
db_client: TimescaleDBClient,
kline_repo: KlineRepository,
) -> dict[tuple[str, str], HealingResult]:
"""并发修复不健康配对,共享单个 RepairExecutor(线程安全)。"""
if not unhealthy_pairs:
return {}
# 单次 HTTP 握手,所有并发线程共享
shared_executor = RepairExecutor(db_client, kline_repo)
results: dict[tuple[str, str], HealingResult] = {}
def _heal_one(pair_data: tuple) -> tuple[tuple[str, str], HealingResult]:
symbol, base_symbol, records, diagnosis = pair_data
# ✅ 传入 executor=shared_executor,__init__ 跳过 RepairExecutor 创建,无额外 HTTP 握手
orchestrator = DataHealingOrchestrator(
db_client=db_client,
kline_repo=kline_repo,
symbol=symbol,
base_symbol=base_symbol,
executor=shared_executor,
)
result = orchestrator.heal_and_prepare(
required_count=required_count, # ✅ 传入原始 required_count,不能用 len(records)
preloaded_diagnosis=diagnosis,
)
return (symbol, base_symbol), result
max_workers = min(5, len(unhealthy_pairs))
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = {pool.submit(_heal_one, pair): pair for pair in unhealthy_pairs}
for future in as_completed(futures):
try:
key, result = future.result()
results[key] = result
except Exception as e:
pair = futures[future]
logger.error(
f"并发修复异常 [{pair[0]}/{pair[1]}]: {e}",
exc_info=True,
)
# 构造失败结果,all required fields 需显式填充
empty_quality = QualityAssessor().assess([], required_count, False, 0)
results[(pair[0], pair[1])] = HealingResult(
status='failed',
data=[],
quality=empty_quality,
iterations_used=0,
)
return results
required_count说明:传入len(records)是错误的,因为 unhealthy pair 正是因为数据不足才进入修复流程,len(records) < required_count。以较小值作为目标会导致修复提前结束,实际数据量仍不足。必须传入原始required_count(repair_count)。
并发写入保护:batch_insert 需加冲突处理
并发修复时,多个 worker 可能对相同 (analysis_time, symbol, base_symbol) 时间点计算 zscore 并同时写入,触发主键冲突异常。AnalysisResultRepository.batch_insert 当前使用裸 INSERT,无冲突处理。
修复方案:在 timescaledb.py 的 batch_insert 中追加冲突忽略:
# timescaledb.py — AnalysisResultRepository.batch_insert
cur.executemany(
"""
INSERT INTO analysis_results (
analysis_time, symbol, base_symbol,
kline_time, analysis_delay_seconds,
corr_5m_7d, corr_1h_30d, corr_4h_60d,
zscore_5m, zscore_1h, zscore_4h,
cointegration_passed, adf_pvalue,
is_anomaly, trading_direction, signal_strength
) VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
)
ON CONFLICT (analysis_time, symbol, base_symbol) DO NOTHING; -- ✅ 并发写入安全
""",
values
)
此改动对现有串行逻辑无影响(无冲突时行为一致),同时保护并发修复场景。
并发度选择
max_workers=5(保守值):
RepairExecutor.repair() 内部
└─ _fill_kline_gaps_parallel()
└─ ThreadPoolExecutor(max_workers=2) # 双 symbol 并行补 K 线
并发占用连接数估算:
5 workers × 2 内部线程 = 10 并发 DB 操作
10 ≤ ConnectionPool.max_size=10 ✅
_run_data_healing 在服务 __init__ 中调用,
其他分析工作线程尚未启动,DB 连接无竞争。
代码改动
realtime_kline_service_base.py—_run_data_healing()修复阶段改为_run_concurrent_repair()orchestrator.py—DataHealingOrchestrator.__init__新增可选参数executor;heal_and_prepare()新增可选参数preloaded_diagnosis,支持跳过首轮诊断timescaledb.py—AnalysisResultRepository.batch_insert追加ON CONFLICT (analysis_time, symbol, base_symbol) DO NOTHING
四、附加优化
4a. db_now 内嵌批量查询(已整合至方案 A)
现状:每个配对调用 _get_db_now() → SELECT NOW()(50+ 次)
优化:db_now 作为列内嵌在批量 SQL 的 SELECT NOW() AS db_now 中,与数据同一事务快照,无需额外往返。见方案 A 核心 SQL。
4b. 健康配对跳过冗余评估
现状:_final_assessment() 总是重新调用 check_continuity(),即使诊断已判定健康
冗余路径:
heal_and_prepare()
├─ _diagnose()
│ └─ check_continuity() ← 第 1 次
└─ _final_assessment()
└─ check_continuity() ← 第 2 次(参数完全相同,纯冗余)
└─ assessor.assess()
优化:诊断结果为 is_healthy=True 时,直接复用诊断结论构建 HealingResult:
# orchestrator.py — heal_and_prepare() 内
if diagnosis.is_healthy:
# 复用诊断结果,跳过 _final_assessment 的重复 check_continuity
# is_healthy=True 时:is_continuous=True, gap_targets=[], missing_count=0
quality = self.assessor.assess(
records, required_count,
is_continuous=True,
missing_count=0,
)
return HealingResult(
status=self._determine_status(quality),
data=self._extract_zscore_values(records),
quality=quality,
iterations_used=iteration,
)
4c. 日志精简
现状:每个健康配对输出 5 行日志
- "连续性检查: 数据连续"(诊断阶段)
- "数据健康(第1轮)"(heal_and_prepare)
- "连续性检查: 数据连续"(_final_assessment 冗余)
- "质量评估: A级"(_final_assessment)
- "自愈结果: ready"(heal_and_prepare)
优化:健康配对只输出 1 行摘要日志,DEBUG 级别保留详细日志:
# 优化后(INFO 级别)
自愈完成 | healed=3, failed=0, healthy=47 | 耗时: 0.42s
4d. ONDO/BTC 慢查询根因排查
现状:ONDO/BTC 配对单次 _load_zscore_history 耗时 4.44s,是其他配对平均耗时(~0.15s)的 30 倍。
排查步骤:
-- Step 1:确认数据量是否异常
SELECT COUNT(*)
FROM analysis_results
WHERE symbol = 'ONDO/USDC:USDC'
AND base_symbol = 'BTC/USDC:USDC'
AND zscore_4h IS NOT NULL;
-- Step 2:检查 EXPLAIN ANALYZE(查看是否走索引)
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT DISTINCT ON (kline_time)
kline_time, zscore_4h, analysis_time
FROM analysis_results
WHERE symbol = 'ONDO/USDC:USDC'
AND base_symbol = 'BTC/USDC:USDC'
AND zscore_4h IS NOT NULL
AND kline_time >= NOW() - INTERVAL '30 days'
ORDER BY kline_time DESC, analysis_time DESC
LIMIT 130;
-- Step 3:若数据量过大,清理超出保留窗口的历史数据
DELETE FROM analysis_results
WHERE symbol = 'ONDO/USDC:USDC'
AND base_symbol = 'BTC/USDC:USDC'
AND kline_time < NOW() - INTERVAL '60 days';
预期根因:表膨胀(dead tuples 过多)或 ONDO/BTC 历史数据量远超其他配对。
批量查询后,ONDO/BTC 的慢查询被吸收在同一 SQL 中,只要不超过 15s 超时,不影响其他配对的加载结果。
五、重构后的流程
_run_data_healing():
│
├── 1. 查询 DB 中有数据的配对 # 已有逻辑,不变
├── 2. 构建 heal_pairs 列表 # 已有逻辑,不变
│ └─ 同时计算统一 required_count(4h timeframe,所有配对一致)
│
├── 3.【新】批量加载(含 db_now,1 次 DB 往返)
│ _batch_load_with_fallback(heal_pairs, required_count, db_client)
│ ├── 正常路径: batch_load_zscore_history()
│ │ → grouped: {(sym, base): [records...]}
│ │ → db_now: datetime(timezone-aware,内嵌于 SQL 结果)
│ └── 降级路径: _fallback_individual_load()(批量查询失败时)
│ → 逐配对加载,db_now = datetime.now(timezone.utc)
│
├── 4.【新】批量诊断(内存操作,~0ms)
│ for pair in heal_pairs:
│ records = grouped[pair]
│ diagnosis = _diagnose(records, required_count, db_now)
│ if is_healthy → 跳过 _final_assessment,直接构建结果
│ else → 加入 unhealthy_pairs
│
├── 5.【新】并发修复不健康配对
│ shared_executor = RepairExecutor(...) # 单次 HTTP 握手
│ ThreadPoolExecutor(max_workers=min(5, len(unhealthy_pairs))):
│ for pair in unhealthy_pairs:
│ DataHealingOrchestrator(..., executor=shared_executor) # ← 注入,跳过握手
│ orchestrator.heal_and_prepare(required_count, preloaded_diagnosis)
│ → 内部重新加载 + 修复 + 评估
│
└── 6. 汇总日志(1 行)
healed=X, failed=Y, healthy=Z | 耗时: T s
六、预期性能对比
新架构在批量诊断阶段不再创建 DataHealingOrchestrator 对象,KlineDataFiller 只在真正
需要修复时才初始化(共享 1 次,通过构造函数注入完全跳过),同时消除了 DB 查询、SELECT NOW() 和 HTTP 握手三大瓶颈。
| 指标 | 优化前 | 优化后 | 提升 |
|---|---|---|---|
KlineDataFiller init 次数(HTTP 握手) |
50 次(49 次废弃) | 0-1 次 | ~50x |
KlineDataFiller init 总耗时 |
~43s | 0s(全健康)/ ~0.85s(有修复) | 消除 |
| DB 查询次数(加载 + NOW) | ~100 次 | 1 次(批量含 db_now) | 100x |
| DB 查询总耗时 | ~8s | ~0.3s | 25x |
| 全健康场景总耗时 | ~55s | ~0.35s | ~157x |
| 有 5 对需修复 | ~55s | ~2.5s | ~22x |
| 有 15 对需修复 | ~55s | ~6s | ~9x |
| 日志行数 | ~250 行 | ~10 行 | 精简 |
全健康场景耗时拆解(优化后):
1 次批量 DB 查询(含 db_now): ~0.3s
50 对内存诊断(纯 CPU 计算): ~0.05s
KlineDataFiller init: 0s(无需修复,不创建)
─────────────────────────────────────────
总计: ~0.35s
有 5 对需修复的场景耗时拆解(优化后):
1 次批量 DB 查询: ~0.3s
50 对内存诊断: ~0.05s
1 次 shared_executor HTTP 握手:~0.85s(串行,并发前创建)
5 对并发修复(含各自 DB 写入):~1.3s(并行,取最慢一对)
─────────────────────────────────────────
总计: ~2.5s
七、改动文件清单
| 文件 | 改动类型 | 说明 |
|---|---|---|
orchestrator.py |
新增方法 + 修改 | 新增 batch_load_zscore_history()(返回 grouped + db_now);DataHealingOrchestrator.__init__ 新增可选参数 executor;heal_and_prepare() 新增可选参数 preloaded_diagnosis |
realtime_kline_service_base.py |
重构方法 | _run_data_healing() 改为批量加载 + 批量诊断 + 并发修复;新增 _batch_load_with_fallback()、_run_concurrent_repair() |
timescaledb.py |
安全修复 | AnalysisResultRepository.batch_insert 追加 ON CONFLICT (analysis_time, symbol, base_symbol) DO NOTHING |
不变的文件:repair_executor.py、continuity_checker.py、quality_assessor.py、config.py、__init__.py
八、风险控制
| 风险 | 缓解措施 |
|---|---|
| 批量查询失败导致全部配对数据丢失 | _batch_load_with_fallback() 捕获异常,降级为逐配对加载 |
| 批量查询 statement_timeout 超时 | 批量超时设为 max(DB_STATEMENT_TIMEOUT_MS, 15000),超时后自动降级 |
| ONDO/BTC 慢查询拖慢批量 SQL | 被批量语句吸收,不影响其他配对结果;超时后统一降级回退 |
| DB 连接池耗尽 | 并发修复 max_workers=5,内部 2 线程,理论最大 10 连接 = pool max_size |
| 并发修复多次 HTTP 握手 | executor=shared_executor 注入构造函数,__init__ 跳过 RepairExecutor 创建,仅 1 次握手 |
KlineDataFiller 冷却状态并发竞争 |
_cooldown_lock 已有 threading.Lock() 保护,天然线程安全 |
| 交易所 API 频率限制 | KlineDataFiller 10 分钟冷却机制,5 并发不会重复请求同一 symbol |
| 并发写入主键冲突 | batch_insert 追加 ON CONFLICT DO NOTHING,串行场景行为不变 |
| fallback 中 datetime 类型不一致 | 统一使用 datetime.now(timezone.utc)(timezone-aware),与 DB NOW() 类型对齐 |
| 向后兼容 | DataHealingOrchestrator.__init__ 新增参数均为可选(默认 None),原有调用方无需修改 |
| 超时保护 | 全局 healing_timeout(300s) 保持不变 |
| 并发异常传播 | 每个 future 独立 try/except,单个配对失败不影响其他配对 |