diff --git a/bot/data/schema.sql b/bot/data/schema.sql index 9bc56e2..26c1df5 100644 --- a/bot/data/schema.sql +++ b/bot/data/schema.sql @@ -243,3 +243,15 @@ ALTER TABLE metrics_daily ADD COLUMN IF NOT EXISTS realized_pnl DOUBLE PRE ALTER TABLE metrics_daily ADD COLUMN IF NOT EXISTS open_count INTEGER; ALTER TABLE metrics_daily ADD COLUMN IF NOT EXISTS closed_count INTEGER; ALTER TABLE metrics_daily ADD COLUMN IF NOT EXISTS resolved_count INTEGER; + +-- ───────────────────────────────────────────────────────────────────────────── +-- Checkpoint alerts — one-shot and rate-limited Telegram observation alerts +-- +-- fired_at: timestamp of the first fire (immutable for one-shot checkpoints) +-- last_fired_at: updated on every fire (used for rate-limiting repeatable alerts) +-- ───────────────────────────────────────────────────────────────────────────── +CREATE TABLE IF NOT EXISTS checkpoint_alerts ( + checkpoint_name TEXT PRIMARY KEY, + fired_at TIMESTAMPTZ NOT NULL, + last_fired_at TIMESTAMPTZ +); diff --git a/bot/main.py b/bot/main.py index af3d1a8..147517f 100644 --- a/bot/main.py +++ b/bot/main.py @@ -16,6 +16,7 @@ from bot.risk.manager import RiskManager from bot.executor.paper import PaperExecutor from bot.metrics.tracker import MetricsTracker from bot.data.db import Database +from bot.notify.checkpoints import CheckpointMonitor logging.basicConfig( level=logging.INFO, @@ -38,6 +39,7 @@ async def run_trading_loop( ) -> None: """Main trading loop — runs every 60 seconds.""" log.info("Trading loop started. PAPER_MODE=%s", PAPER_MODE) + checkpoint_monitor = CheckpointMonitor() while True: try: @@ -201,6 +203,17 @@ async def run_trading_loop( # 9. Update daily metrics await metrics.update_daily_summary() + # 10. Checkpoint alerts — one-shot / rate-limited Telegram notifications + current_portfolio = executor.get_portfolio() + try: + await checkpoint_monitor.check_all( + db, + exposure_pct=current_portfolio.exposure_pct, + exposure_cap_pct=risk.max_exposure_pct, + ) + except Exception as exc: + log.warning("checkpoint_monitor.check_all failed: %s", exc) + except Exception as e: log.error("Error in trading loop: %s", e, exc_info=True) diff --git a/bot/notify/checkpoints.py b/bot/notify/checkpoints.py new file mode 100644 index 0000000..f472319 --- /dev/null +++ b/bot/notify/checkpoints.py @@ -0,0 +1,184 @@ +"""One-shot and rate-limited Telegram checkpoint alerts. + +Called from the main trading loop at the end of each cycle. +Errors are swallowed — checkpoint failures must never break the loop. +""" +import logging +from datetime import datetime, timezone +from typing import Optional + +from bot.notify import telegram + +log = logging.getLogger(__name__) + +_EXPOSURE_COOLDOWN_HOURS = 6 + + +class CheckpointMonitor: + + async def check_all( + self, + db, + exposure_pct: float, + exposure_cap_pct: float, + ) -> None: + for name, coro in [ + ("primer_match_accepted", self._check_primer_match_accepted(db)), + ("primer_trade_phase6", self._check_primer_trade_phase6(db)), + ("primer_resolved", self._check_primer_resolved(db)), + ("exposure_cerca_cap", self._check_exposure_cerca_cap(db, exposure_pct, exposure_cap_pct)), + ]: + try: + await coro + except Exception as exc: + log.warning("checkpoint %s failed: %s", name, exc) + + # ── helpers ────────────────────────────────────────────────────────────── + + async def _one_shot_fired(self, db, name: str) -> bool: + async with db._pool.acquire() as conn: + row = await conn.fetchrow( + "SELECT 1 FROM checkpoint_alerts WHERE checkpoint_name = $1", name + ) + return row is not None + + async def _mark_one_shot(self, db, name: str) -> None: + async with db._pool.acquire() as conn: + await conn.execute( + "INSERT INTO checkpoint_alerts (checkpoint_name, fired_at) VALUES ($1, NOW())", + name, + ) + + async def _last_fired_at(self, db, name: str) -> Optional[datetime]: + async with db._pool.acquire() as conn: + row = await conn.fetchrow( + "SELECT last_fired_at FROM checkpoint_alerts WHERE checkpoint_name = $1", + name, + ) + if row is None: + return None + return row["last_fired_at"] + + async def _upsert_repeatable(self, db, name: str) -> None: + async with db._pool.acquire() as conn: + await conn.execute( + """ + INSERT INTO checkpoint_alerts (checkpoint_name, fired_at, last_fired_at) + VALUES ($1, NOW(), NOW()) + ON CONFLICT (checkpoint_name) DO UPDATE SET last_fired_at = NOW() + """, + name, + ) + + # ── checkpoints ────────────────────────────────────────────────────────── + + async def _check_primer_match_accepted(self, db) -> None: + if await self._one_shot_fired(db, "primer_match_accepted"): + return + + async with db._pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT match_score, poly_question, mfld_market_title + FROM manifold_match_audit + WHERE match_status = 'accepted' + ORDER BY timestamp ASC + LIMIT 1 + """ + ) + if not row: + return + + score = float(row["match_score"] or 0.0) + poly_q = (row["poly_question"] or "")[:60] + mfld_t = (row["mfld_market_title"] or "")[:60] + + await telegram._send( + f"✅ Primer match Manifold accepted — score={score:.2f} " + f"poly='{poly_q}' mfld='{mfld_t}'" + ) + await self._mark_one_shot(db, "primer_match_accepted") + log.info("checkpoint primer_match_accepted fired") + + async def _check_primer_trade_phase6(self, db) -> None: + if await self._one_shot_fired(db, "primer_trade_phase6"): + return + + async with db._pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT question, mfld_match_score, edge_net + FROM trades + WHERE mfld_match_score IS NOT NULL + AND (excluded_from_metrics IS NOT TRUE) + ORDER BY timestamp ASC + LIMIT 1 + """ + ) + if not row: + return + + question = (row["question"] or "")[:70] + score = float(row["mfld_match_score"] or 0.0) + edge = float(row["edge_net"] or 0.0) + + await telegram._send( + f"🎯 Primer trade Phase-6 limpio — {question} " + f"score={score:.2f} edge={edge:.3f}" + ) + await self._mark_one_shot(db, "primer_trade_phase6") + log.info("checkpoint primer_trade_phase6 fired") + + async def _check_primer_resolved(self, db) -> None: + if await self._one_shot_fired(db, "primer_resolved"): + return + + async with db._pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT question, resolution, close_pnl + FROM trades + WHERE resolution IS NOT NULL + AND (excluded_from_metrics IS NOT TRUE) + ORDER BY closed_at ASC + LIMIT 1 + """ + ) + if not row: + return + + question = (row["question"] or "")[:70] + resolution = float(row["resolution"] or 0.0) + pnl = float(row["close_pnl"] or 0.0) + + await telegram._send( + f"🏁 Primer mercado resuelto — {question} " + f"result={resolution} pnl={pnl:.2f}" + ) + await self._mark_one_shot(db, "primer_resolved") + log.info("checkpoint primer_resolved fired") + + async def _check_exposure_cerca_cap( + self, db, exposure_pct: float, exposure_cap_pct: float + ) -> None: + if exposure_pct < 0.80 * exposure_cap_pct: + return + + last = await self._last_fired_at(db, "exposure_cerca_cap") + if last is not None: + now_utc = datetime.now(timezone.utc) + if last.tzinfo is None: + last = last.replace(tzinfo=timezone.utc) + elapsed_hours = (now_utc - last).total_seconds() / 3600 + if elapsed_hours < _EXPOSURE_COOLDOWN_HOURS: + return + + await telegram._send( + f"⚠️ Exposure al 80% del cap — revisar posiciones " + f"({exposure_pct * 100:.1f}% / {exposure_cap_pct * 100:.1f}%)" + ) + await self._upsert_repeatable(db, "exposure_cerca_cap") + log.info( + "checkpoint exposure_cerca_cap fired (%.1f%% / %.1f%%)", + exposure_pct * 100, exposure_cap_pct * 100, + )