feat(attribution): dominant_feature per trade + /api/metrics/attribution
CI/CD / build-and-push (push) Successful in 1m52s

Adds alpha attribution by dominant signal feature — which feat_*_lo had
the largest absolute log-odds value on each trade.

Changes:
- _dominant_feature() helper in api/main.py: picks the winning feature
  from signal_components (threshold 0.0001, same as "triggered" in
  /api/metrics/features)
- _enrich_trade() refactored to single exit point; adds dominant_feature
  field to every open trade in /api/trades
- compute_attribution_from_db() in db.py: VALUES subquery finds dominant
  feature per trade in SQL, then aggregates trade_count/avg_edge_net/
  unrealized_pnl_est/realized_pnl/resolved_count/win_rate per group
- /api/metrics/attribution endpoint: returns attribution dict + total_attributed_trades

No schema changes, no strategy changes. Pure observability.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
chemavx
2026-04-22 16:35:24 +00:00
parent 6d23e8042b
commit adf2917cda
2 changed files with 139 additions and 32 deletions
+75 -32
View File
@@ -26,8 +26,26 @@ _FEAT_RE_RAW = re.compile(
)
def _dominant_feature(sc: dict | None) -> str | None:
"""Return the name of the signal_components key with the largest abs log-odds value.
Returns None if signal_components is absent or all features are below threshold.
Threshold 0.0001 matches the "triggered" definition used in /api/metrics/features.
"""
if not sc:
return None
candidates = {
k: abs(v)
for k, v in sc.items()
if k != "unit" and v is not None and abs(v) > 0.0001
}
if not candidates:
return None
return max(candidates, key=candidates.__getitem__)
def _enrich_trade(trade: dict) -> dict:
"""Add days_open and signal_components (all log-odds) to an open trade dict."""
"""Add days_open, signal_components (log-odds), and dominant_feature to a trade."""
ts = trade.get("timestamp")
if ts is not None:
now = datetime.now(timezone.utc)
@@ -39,7 +57,7 @@ def _enrich_trade(trade: dict) -> dict:
# Prefer DB columns (Phase 6+) — exact, no parsing required.
if trade.get("feat_fg_lo") is not None:
trade["signal_components"] = {
sc = {
"unit": "log_odds",
"fg": trade["feat_fg_lo"],
"mom": trade["feat_mom_lo"],
@@ -47,37 +65,35 @@ def _enrich_trade(trade: dict) -> dict:
"mfld": trade["feat_mfld_lo"],
"btc_dom": trade.get("feat_btc_dom_lo"),
}
return trade
else:
# Fallback: parse reasoning string (trades before Phase 6 DB columns exist).
reasoning = trade.get("reasoning") or ""
m_lo = _FEAT_RE_LO.search(reasoning)
m_raw = _FEAT_RE_RAW.search(reasoning)
if m_lo:
sc = {
"unit": "log_odds",
"fg": float(m_lo.group(1)),
"mom": float(m_lo.group(2)),
"news": float(m_lo.group(3)),
"mfld": float(m_lo.group(4)),
"btc_dom": float(m_lo.group(5)),
}
elif m_raw:
# Pre-Phase-6: fg/mom are raw probability-deltas → multiply ×2.
sc = {
"unit": "log_odds",
"fg": float(m_raw.group(1)) * 2,
"mom": float(m_raw.group(2)) * 2,
"news": float(m_raw.group(3)),
"mfld": float(m_raw.group(4)),
"btc_dom": None,
}
else:
sc = None
# Fallback: parse reasoning string (trades before Phase 6 DB columns exist).
reasoning = trade.get("reasoning") or ""
m_lo = _FEAT_RE_LO.search(reasoning)
if m_lo:
# Phase 6 reasoning format — values already in log-odds.
trade["signal_components"] = {
"unit": "log_odds",
"fg": float(m_lo.group(1)),
"mom": float(m_lo.group(2)),
"news": float(m_lo.group(3)),
"mfld": float(m_lo.group(4)),
"btc_dom": float(m_lo.group(5)),
}
return trade
m_raw = _FEAT_RE_RAW.search(reasoning)
if m_raw:
# Pre-Phase-6 reasoning: fg/mom are raw probability-deltas → multiply ×2.
trade["signal_components"] = {
"unit": "log_odds",
"fg": float(m_raw.group(1)) * 2,
"mom": float(m_raw.group(2)) * 2,
"news": float(m_raw.group(3)),
"mfld": float(m_raw.group(4)),
"btc_dom": None,
}
return trade
trade["signal_components"] = None
trade["signal_components"] = sc
trade["dominant_feature"] = _dominant_feature(sc)
return trade
db = Database()
@@ -166,6 +182,33 @@ async def get_legacy_trades():
return {"trades": trades, "count": len(trades)}
@app.get("/api/metrics/attribution")
async def get_attribution():
"""Alpha attribution by dominant signal feature.
Groups Phase 6 trades by their dominant feature — the feat_*_lo with the
largest absolute log-odds value — to reveal which signal is actually
generating opportunities.
Each key in "attribution" contains:
trade_count trades where this feature was dominant
avg_edge_net mean net edge for those trades
unrealized_pnl_est open-position PnL estimate (edge_net × net_cost fee)
realized_pnl sum close_pnl for resolved trades in this group
resolved_count closed trades with known outcome
win_rate null if resolved_count < 5
"none" appears for Phase 6 trades where all feat_*_lo values are < 0.0001
(signals fired but below trigger threshold).
Only Phase 6 trades (feat_fg_lo IS NOT NULL) are included.
Pre-Phase-6 trades appear in /api/trades/legacy instead.
"""
attribution = await db.compute_attribution_from_db()
total = sum(v["trade_count"] for v in attribution.values())
return {"attribution": attribution, "total_attributed_trades": total}
@app.get("/api/summary")
async def get_summary():
"""Dashboard summary card data.
+64
View File
@@ -429,6 +429,70 @@ class Database:
return result
async def compute_attribution_from_db(self) -> dict:
"""Alpha attribution grouped by dominant signal feature.
For each Phase 6 trade, the dominant feature is the feat_*_lo with the
largest absolute value (> 0.0001). Trades are then aggregated per group.
Returns {feature_name: {trade_count, avg_edge_net, unrealized_pnl_est,
realized_pnl, resolved_count, win_rate}}.
"none" group collects trades where all features are below threshold.
"""
async with self._pool.acquire() as conn:
rows = await conn.fetch("""
WITH dominant_per_trade AS (
SELECT
edge_net, net_cost, fee_usdc, closed_at, close_pnl,
(
SELECT key
FROM (VALUES
('fg', ABS(COALESCE(feat_fg_lo, 0))),
('mom', ABS(COALESCE(feat_mom_lo, 0))),
('news', ABS(COALESCE(feat_news_lo, 0))),
('mfld', ABS(COALESCE(feat_mfld_lo, 0))),
('btc_dom', ABS(COALESCE(feat_btc_dom_lo, 0)))
) AS t(key, val)
WHERE val > 0.0001
ORDER BY val DESC
LIMIT 1
) AS dominant
FROM trades
WHERE feat_fg_lo IS NOT NULL
)
SELECT
COALESCE(dominant, 'none') AS dominant_feature,
COUNT(*) AS trade_count,
AVG(edge_net) AS avg_edge_net,
COALESCE(SUM(edge_net * net_cost - fee_usdc)
FILTER (WHERE closed_at IS NULL
AND edge_net IS NOT NULL), 0) AS unrealized_pnl_est,
COALESCE(SUM(close_pnl)
FILTER (WHERE close_pnl IS NOT NULL), 0) AS realized_pnl,
COUNT(*) FILTER (WHERE close_pnl IS NOT NULL) AS resolved_count,
COUNT(*) FILTER (WHERE close_pnl IS NOT NULL AND close_pnl > 0) AS wins
FROM dominant_per_trade
GROUP BY dominant_feature
ORDER BY trade_count DESC
""")
result: dict[str, dict] = {}
for r in rows:
d = dict(r)
feature = d["dominant_feature"]
resolved = int(d.get("resolved_count") or 0)
wins = int(d.get("wins") or 0)
result[feature] = {
"trade_count": int(d["trade_count"]),
"avg_edge_net": _f(d.get("avg_edge_net")),
"unrealized_pnl_est": float(d.get("unrealized_pnl_est") or 0),
"realized_pnl": float(d.get("realized_pnl") or 0),
"resolved_count": resolved,
"win_rate": (wins / resolved) if resolved >= 5 else None,
}
return result
def _f(v) -> Optional[float]:
"""None-safe float cast for asyncpg Decimal/None values."""
return float(v) if v is not None else None