"""One-shot and rate-limited Telegram checkpoint alerts. Called from the main trading loop at the end of each cycle. Errors are swallowed — checkpoint failures must never break the loop. """ import logging from datetime import datetime, timezone from typing import Optional from bot.notify import telegram log = logging.getLogger(__name__) _EXPOSURE_COOLDOWN_HOURS = 6 class CheckpointMonitor: async def check_all( self, db, exposure_pct: float, exposure_cap_pct: float, ) -> None: for name, coro in [ ("primer_match_accepted", self._check_primer_match_accepted(db)), ("primer_trade_phase6", self._check_primer_trade_phase6(db)), ("primer_resolved", self._check_primer_resolved(db)), ("exposure_cerca_cap", self._check_exposure_cerca_cap(db, exposure_pct, exposure_cap_pct)), ]: try: await coro except Exception as exc: log.warning("checkpoint %s failed: %s", name, exc) # ── helpers ────────────────────────────────────────────────────────────── async def _one_shot_fired(self, db, name: str) -> bool: async with db._pool.acquire() as conn: row = await conn.fetchrow( "SELECT 1 FROM checkpoint_alerts WHERE checkpoint_name = $1", name ) return row is not None async def _mark_one_shot(self, db, name: str) -> None: async with db._pool.acquire() as conn: await conn.execute( "INSERT INTO checkpoint_alerts (checkpoint_name, fired_at) VALUES ($1, NOW())", name, ) async def _last_fired_at(self, db, name: str) -> Optional[datetime]: async with db._pool.acquire() as conn: row = await conn.fetchrow( "SELECT last_fired_at FROM checkpoint_alerts WHERE checkpoint_name = $1", name, ) if row is None: return None return row["last_fired_at"] async def _upsert_repeatable(self, db, name: str) -> None: async with db._pool.acquire() as conn: await conn.execute( """ INSERT INTO checkpoint_alerts (checkpoint_name, fired_at, last_fired_at) VALUES ($1, NOW(), NOW()) ON CONFLICT (checkpoint_name) DO UPDATE SET last_fired_at = NOW() """, name, ) # ── checkpoints ────────────────────────────────────────────────────────── async def _check_primer_match_accepted(self, db) -> None: if await self._one_shot_fired(db, "primer_match_accepted"): return async with db._pool.acquire() as conn: row = await conn.fetchrow( """ SELECT match_score, poly_question, mfld_market_title FROM manifold_match_audit WHERE match_status = 'accepted' ORDER BY timestamp ASC LIMIT 1 """ ) if not row: return score = float(row["match_score"] or 0.0) poly_q = (row["poly_question"] or "")[:60] mfld_t = (row["mfld_market_title"] or "")[:60] await telegram._send( f"✅ Primer match Manifold accepted — score={score:.2f} " f"poly='{poly_q}' mfld='{mfld_t}'" ) await self._mark_one_shot(db, "primer_match_accepted") log.info("checkpoint primer_match_accepted fired") async def _check_primer_trade_phase6(self, db) -> None: if await self._one_shot_fired(db, "primer_trade_phase6"): return async with db._pool.acquire() as conn: row = await conn.fetchrow( """ SELECT question, mfld_match_score, edge_net FROM trades WHERE mfld_match_score IS NOT NULL AND (excluded_from_metrics IS NOT TRUE) ORDER BY timestamp ASC LIMIT 1 """ ) if not row: return question = (row["question"] or "")[:70] score = float(row["mfld_match_score"] or 0.0) edge = float(row["edge_net"] or 0.0) await telegram._send( f"🎯 Primer trade Phase-6 limpio — {question} " f"score={score:.2f} edge={edge:.3f}" ) await self._mark_one_shot(db, "primer_trade_phase6") log.info("checkpoint primer_trade_phase6 fired") async def _check_primer_resolved(self, db) -> None: if await self._one_shot_fired(db, "primer_resolved"): return async with db._pool.acquire() as conn: row = await conn.fetchrow( """ SELECT question, resolution, close_pnl FROM trades WHERE resolution IS NOT NULL AND (excluded_from_metrics IS NOT TRUE) ORDER BY closed_at ASC LIMIT 1 """ ) if not row: return question = (row["question"] or "")[:70] resolution = float(row["resolution"] or 0.0) pnl = float(row["close_pnl"] or 0.0) await telegram._send( f"🏁 Primer mercado resuelto — {question} " f"result={resolution} pnl={pnl:.2f}" ) await self._mark_one_shot(db, "primer_resolved") log.info("checkpoint primer_resolved fired") async def _check_exposure_cerca_cap( self, db, exposure_pct: float, exposure_cap_pct: float ) -> None: if exposure_pct < 0.80 * exposure_cap_pct: return last = await self._last_fired_at(db, "exposure_cerca_cap") if last is not None: now_utc = datetime.now(timezone.utc) if last.tzinfo is None: last = last.replace(tzinfo=timezone.utc) elapsed_hours = (now_utc - last).total_seconds() / 3600 if elapsed_hours < _EXPOSURE_COOLDOWN_HOURS: return await telegram._send( f"⚠️ Exposure al 80% del cap — revisar posiciones " f"({exposure_pct * 100:.1f}% / {exposure_cap_pct * 100:.1f}%)" ) await self._upsert_repeatable(db, "exposure_cerca_cap") log.info( "checkpoint exposure_cerca_cap fired (%.1f%% / %.1f%%)", exposure_pct * 100, exposure_cap_pct * 100, )