Files
polymarket-bot/bot/data/db.py
T
chemavx 9a5be27532
CI/CD / build-and-push (push) Successful in 1m47s
feat(metrics): Fix 3 — DB-computed metrics, stateless tracker, resolution tracking
schema.sql
  trades:        + close_pnl, resolution (market outcome storage)
  metrics_daily: + unrealized_pnl_est, realized_pnl, open/closed/resolved_count

db.py
  close_paper_position(): accepts resolution; computes close_pnl in SQL
    BUY_YES: (resolution − entry_price) × shares
    BUY_NO:  ((1 − resolution) − entry_price) × shares
  save_daily_metrics(): persists new columns
  compute_metrics_from_db(): single DB query for all metrics; no in-memory state

tracker.py — complete rewrite (stateless)
  Removed self._trades, self._daily_returns, compute_metrics(), _compute_sharpe(),
  check_promotion_thresholds(), _empty_metrics()
  update_daily_summary() now reads compute_metrics_from_db() every cycle
  Safe across pod restarts: always reflects full DB history

paper.py
  close_position(): passes resolution to close_paper_position()

api/main.py  /api/summary
  Added unrealized_pnl_est (estimated, open trades) and realized_pnl (exact,
  closed+resolved) as separate fields alongside total_pnl
  win_rate: null if < 5 resolved trades (was proxy on entry_price < 0.5)
  calibration_score: Brier-based, null if < 10 resolved trades
  resolved_count exposed as field
  Each field annotated with: exact/estimated, source, null conditions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 17:34:48 +00:00

267 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Database layer using asyncpg for PostgreSQL."""
import logging
import os
from typing import Optional
import asyncpg
log = logging.getLogger(__name__)
class Database:
    """Async PostgreSQL access layer for the ``trades`` and ``metrics_daily`` tables.

    Wraps an asyncpg connection pool. ``connect()`` must be awaited before any
    query method is used; ``disconnect()`` closes the pool again.
    """

    def __init__(self) -> None:
        # The connection string is read from DATABASE_URL; the fallback is the
        # hard-coded local default below.
        self._url = os.getenv("DATABASE_URL", "postgresql://bot:bot@localhost:5432/polymarket")
        self._pool: Optional[asyncpg.Pool] = None

    def _acquire(self):
        """Return the pool's connection-acquire context manager.

        Raises:
            RuntimeError: if no pool exists (``connect()`` was never awaited,
                or ``disconnect()`` has already run).
        """
        if self._pool is None:
            raise RuntimeError("Database is not connected; call connect() first")
        return self._pool.acquire()

    async def connect(self) -> None:
        """Create the asyncpg connection pool for the configured URL."""
        self._pool = await asyncpg.create_pool(self._url)
        log.info("Database connected")

    async def disconnect(self) -> None:
        """Close the connection pool, if one exists.

        The pool reference is cleared afterwards so that a second call is a
        no-op and later queries fail with a clear "not connected" error.
        """
        if self._pool is not None:
            pool, self._pool = self._pool, None
            await pool.close()

    async def run_migrations(self) -> None:
        """Execute ``schema.sql`` (located next to this module) on the database.

        The file contents are sent to the server in a single ``execute`` call.
        """
        schema_path = os.path.join(os.path.dirname(__file__), "schema.sql")
        # Explicit UTF-8: do not depend on the process locale to decode the schema.
        with open(schema_path, encoding="utf-8") as f:
            schema = f.read()
        async with self._acquire() as conn:
            await conn.execute(schema)
        log.info("Migrations applied")

    async def save_trade(self, trade) -> None:
        """Insert one trade row.

        ``trade`` must expose the attributes read below (id, market_id, ...,
        family_key). Because of ``ON CONFLICT (id) DO NOTHING``, saving a trade
        whose id already exists leaves the stored row untouched.
        """
        async with self._acquire() as conn:
            await conn.execute(
                """
                INSERT INTO trades (
                    id, market_id, question, direction, size_usdc,
                    entry_price, shares, fee_usdc, net_cost, timestamp, reasoning, paper,
                    edge_gross, edge_net, prior_prob, final_prob,
                    mid_price, spread_estimate, commission, family_key
                ) VALUES (
                    $1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,
                    $13,$14,$15,$16,$17,$18,$19,$20
                )
                ON CONFLICT (id) DO NOTHING
                """,
                trade.id, trade.market_id, trade.question, trade.direction,
                trade.size_usdc, trade.entry_price, trade.shares, trade.fee_usdc,
                trade.net_cost, trade.timestamp, trade.reasoning, trade.paper,
                # Phase 1 fields
                trade.edge_gross, trade.edge_net, trade.prior_prob, trade.final_prob,
                trade.mid_price, trade.spread_estimate, trade.commission, trade.family_key,
            )

    async def save_daily_metrics(self, metrics: dict) -> None:
        """Append one row to ``metrics_daily``.

        ``metrics`` must contain every key listed in ``columns`` below; a
        missing key raises ``KeyError`` before anything is sent to the server.
        """
        # Order must match the column list / $1..$15 placeholders in the SQL.
        columns = (
            "timestamp", "total_trades", "total_deployed", "total_fees",
            "unrealized_pnl_est", "realized_pnl", "total_pnl",
            "win_rate", "avg_edge", "sharpe_ratio", "calibration_score", "paper_mode",
            "open_count", "closed_count", "resolved_count",
        )
        async with self._acquire() as conn:
            values = [metrics[name] for name in columns]
            await conn.execute(
                """
                INSERT INTO metrics_daily (
                    timestamp, total_trades, total_deployed, total_fees,
                    unrealized_pnl_est, realized_pnl, total_pnl,
                    win_rate, avg_edge, sharpe_ratio, calibration_score, paper_mode,
                    open_count, closed_count, resolved_count
                ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15)
                """,
                *values,
            )

    async def get_open_positions(self) -> dict[str, float]:
        """Return {market_id: total_net_cost} for all open (not closed) trades in DB."""
        async with self._acquire() as conn:
            rows = await conn.fetch(
                "SELECT market_id, SUM(net_cost) AS total "
                "FROM trades WHERE closed_at IS NULL GROUP BY market_id"
            )
        return {r["market_id"]: float(r["total"]) for r in rows}

    async def get_open_families(self) -> set[str]:
        """Return the set of family_key values from all open positions.

        Used at startup to rebuild occupied_families from DB state so the
        family-deduplication logic survives pod restarts.
        """
        async with self._acquire() as conn:
            rows = await conn.fetch(
                "SELECT DISTINCT family_key FROM trades "
                "WHERE family_key IS NOT NULL AND closed_at IS NULL"
            )
        # The SQL already drops NULLs; the truthiness filter also drops empty strings.
        return {r["family_key"] for r in rows if r["family_key"]}

    async def get_open_position_details(self) -> list[dict]:
        """Return one row per open paper position with family_key and direction.

        Used at startup to detect positions that share a family_key (same
        underlying event), which indicates a contradictory paper trade entered
        before the general-election family fix was deployed.

        When a market has several open trades, ``DISTINCT ON (market_id)``
        together with ``ORDER BY market_id, timestamp DESC`` keeps the most
        recent one.
        """
        async with self._acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT DISTINCT ON (market_id)
                    market_id, question, direction, edge_net, family_key, timestamp
                FROM trades
                WHERE paper = TRUE AND closed_at IS NULL
                ORDER BY market_id, timestamp DESC
                """
            )
        return [dict(r) for r in rows]

    async def close_paper_position(
        self, market_id: str, reason: str = "", resolution: Optional[float] = None
    ) -> None:
        """Mark every open trade of ``market_id`` as closed.

        resolution: 1.0 if YES resolved, 0.0 if NO resolved, None if unknown
        (legacy closes, inversion fixes). When resolution is provided, close_pnl
        is computed in SQL so it matches the stored entry_price and shares exactly:

            BUY_YES: (resolution - entry_price) * shares
            BUY_NO:  ((1 - resolution) - entry_price) * shares

        With ``resolution=None`` (or any other direction) close_pnl is stored
        as NULL.
        """
        async with self._acquire() as conn:
            await conn.execute(
                """
                UPDATE trades
                SET closed_at = NOW(),
                    close_reason = $2,
                    resolution = $3,
                    close_pnl = CASE
                        WHEN $3 IS NOT NULL AND direction = 'BUY_YES'
                            THEN ($3::double precision - entry_price) * shares
                        WHEN $3 IS NOT NULL AND direction = 'BUY_NO'
                            THEN ((1.0 - $3::double precision) - entry_price) * shares
                        ELSE NULL
                    END
                WHERE market_id = $1 AND closed_at IS NULL
                """,
                market_id, reason, resolution,
            )

    async def update_family_key(self, market_id: str, new_key: str) -> None:
        """Persist a corrected family_key for all open trades of a market."""
        async with self._acquire() as conn:
            await conn.execute(
                "UPDATE trades SET family_key = $2 WHERE market_id = $1 AND closed_at IS NULL",
                market_id, new_key,
            )

    async def get_legacy_incomplete_count(self) -> int:
        """Return count of open trades with NULL edge_net (legacy data without signal values)."""
        async with self._acquire() as conn:
            row = await conn.fetchrow(
                "SELECT COUNT(*) FROM trades WHERE closed_at IS NULL AND edge_net IS NULL"
            )
        return int(row[0])

    async def get_recently_closed_inverted(self, hours: int = 24) -> set[str]:
        """Return market_ids closed for inversion bug within the last N hours.

        Used as a reentry guard: prevents re-entering a market that was just
        closed because the signal direction was inverted.
        """
        async with self._acquire() as conn:
            # ``hours`` is sent as text because the SQL concatenates it with
            # ' hours' before casting the result to an interval.
            rows = await conn.fetch(
                """
                SELECT DISTINCT market_id FROM trades
                WHERE closed_at > NOW() - ($1 || ' hours')::interval
                  AND close_reason ILIKE '%inversion bug%'
                """,
                str(hours),
            )
        return {r["market_id"] for r in rows}

    async def compute_metrics_from_db(self) -> dict:
        """Compute all trading metrics directly from the trades table.

        This is the single source of truth for MetricsTracker — no in-memory
        state required. Safe to call after pod restarts: always reflects the
        full DB history.

        Returns a dict with keys:
            total_trades, open_count, closed_count, resolved_count,
            total_deployed, total_fees,
            unrealized_pnl_est — estimated, open trades with edge_net
            realized_pnl — exact, closed trades with resolution
            wins_realized — closed trades where close_pnl > 0
            calibration_score — Brier-based (1 - MSE), null if resolved < 10
        """
        async with self._acquire() as conn:
            row = await conn.fetchrow(
                """
                SELECT
                    COUNT(*) AS total_trades,
                    COUNT(*) FILTER (WHERE closed_at IS NULL) AS open_count,
                    COUNT(*) FILTER (WHERE closed_at IS NOT NULL) AS closed_count,
                    COUNT(*) FILTER (WHERE resolution IS NOT NULL
                                       AND final_prob IS NOT NULL) AS resolved_count,
                    COALESCE(SUM(net_cost)
                        FILTER (WHERE closed_at IS NULL), 0) AS total_deployed,
                    COALESCE(SUM(fee_usdc), 0) AS total_fees,
                    -- Estimated unrealized PnL: open trades with known edge.
                    -- Formula: edge_net × net_cost - fee_usdc.
                    -- Trades with NULL edge_net (legacy data) are excluded.
                    COALESCE(SUM(edge_net * net_cost - fee_usdc)
                        FILTER (WHERE closed_at IS NULL
                                  AND edge_net IS NOT NULL), 0) AS unrealized_pnl_est,
                    -- Realized PnL: closed trades with a known resolution.
                    -- close_pnl is computed at close time from actual resolution.
                    COALESCE(SUM(close_pnl)
                        FILTER (WHERE closed_at IS NOT NULL
                                  AND close_pnl IS NOT NULL), 0) AS realized_pnl,
                    COUNT(*) FILTER (WHERE closed_at IS NOT NULL
                                       AND close_pnl IS NOT NULL
                                       AND close_pnl > 0) AS wins_realized,
                    -- Calibration (Brier score transformed to higher-is-better):
                    -- 1 - AVG((final_prob - resolution)²) on resolved trades.
                    -- final_prob is the model's estimated YES probability at entry.
                    -- resolution is 1.0 (YES won) or 0.0 (NO won).
                    -- Perfect calibration → 1.0 | Random → ~0.75 | Worst → 0.0
                    -- Returns NULL if fewer than 10 resolved trades with final_prob.
                    CASE
                        WHEN COUNT(*) FILTER (WHERE resolution IS NOT NULL
                                                AND final_prob IS NOT NULL) >= 10
                        THEN 1.0 - AVG((final_prob - resolution) * (final_prob - resolution))
                                 FILTER (WHERE resolution IS NOT NULL
                                           AND final_prob IS NOT NULL)
                        ELSE NULL
                    END AS calibration_score
                FROM trades
                """
            )
        return dict(row)

    async def get_recent_trades(self, limit: int = 100, status: Optional[str] = None) -> list[dict]:
        """Return trades ordered by timestamp DESC.

        status: None (all) | "open" (closed_at IS NULL) | "closed" (closed_at IS NOT NULL)
        Any other status value is treated like None (no filter).
        Each row includes a computed "status" field ("open" or "closed").
        """
        # Only these fixed fragments are ever interpolated into the SQL text;
        # the caller-supplied ``status`` string itself never reaches the query.
        if status == "open":
            where = "WHERE closed_at IS NULL"
        elif status == "closed":
            where = "WHERE closed_at IS NOT NULL"
        else:
            where = ""
        async with self._acquire() as conn:
            rows = await conn.fetch(
                f"SELECT * FROM trades {where} ORDER BY timestamp DESC LIMIT $1", limit
            )
        result = []
        for r in rows:
            d = dict(r)
            d["status"] = "closed" if d.get("closed_at") else "open"
            result.append(d)
        return result

    async def get_metrics_history(self, days: int = 42) -> list[dict]:
        """Return the newest ``days`` rows of ``metrics_daily``, most recent first.

        NOTE(review): ``days`` is applied as a plain row LIMIT, so it equals a
        number of days only if one row is stored per day — confirm against the
        caller of save_daily_metrics().
        """
        async with self._acquire() as conn:
            rows = await conn.fetch(
                "SELECT * FROM metrics_daily ORDER BY timestamp DESC LIMIT $1", days
            )
        return [dict(r) for r in rows]