feat(notify): checkpoint alerts for first match, trade, resolution and exposure cap
CI/CD / build-and-push (push) Successful in 8s

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
chemavx
2026-05-28 08:47:51 +00:00
parent 8febd32136
commit d51d47c921
3 changed files with 209 additions and 0 deletions
+12
View File
@@ -243,3 +243,15 @@ ALTER TABLE metrics_daily ADD COLUMN IF NOT EXISTS realized_pnl DOUBLE PRE
ALTER TABLE metrics_daily ADD COLUMN IF NOT EXISTS open_count INTEGER; ALTER TABLE metrics_daily ADD COLUMN IF NOT EXISTS open_count INTEGER;
ALTER TABLE metrics_daily ADD COLUMN IF NOT EXISTS closed_count INTEGER; ALTER TABLE metrics_daily ADD COLUMN IF NOT EXISTS closed_count INTEGER;
ALTER TABLE metrics_daily ADD COLUMN IF NOT EXISTS resolved_count INTEGER; ALTER TABLE metrics_daily ADD COLUMN IF NOT EXISTS resolved_count INTEGER;
-- ─────────────────────────────────────────────────────────────────────────────
-- Checkpoint alerts — one-shot and rate-limited Telegram observation alerts
--
-- fired_at: timestamp of the first fire (immutable for one-shot checkpoints)
-- last_fired_at: updated on every fire (used for rate-limiting repeatable alerts)
-- ─────────────────────────────────────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS checkpoint_alerts (
checkpoint_name TEXT PRIMARY KEY,
fired_at TIMESTAMPTZ NOT NULL,
last_fired_at TIMESTAMPTZ
);
+13
View File
@@ -16,6 +16,7 @@ from bot.risk.manager import RiskManager
from bot.executor.paper import PaperExecutor from bot.executor.paper import PaperExecutor
from bot.metrics.tracker import MetricsTracker from bot.metrics.tracker import MetricsTracker
from bot.data.db import Database from bot.data.db import Database
from bot.notify.checkpoints import CheckpointMonitor
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
@@ -38,6 +39,7 @@ async def run_trading_loop(
) -> None: ) -> None:
"""Main trading loop — runs every 60 seconds.""" """Main trading loop — runs every 60 seconds."""
log.info("Trading loop started. PAPER_MODE=%s", PAPER_MODE) log.info("Trading loop started. PAPER_MODE=%s", PAPER_MODE)
checkpoint_monitor = CheckpointMonitor()
while True: while True:
try: try:
@@ -201,6 +203,17 @@ async def run_trading_loop(
# 9. Update daily metrics # 9. Update daily metrics
await metrics.update_daily_summary() await metrics.update_daily_summary()
# 10. Checkpoint alerts — one-shot / rate-limited Telegram notifications
current_portfolio = executor.get_portfolio()
try:
await checkpoint_monitor.check_all(
db,
exposure_pct=current_portfolio.exposure_pct,
exposure_cap_pct=risk.max_exposure_pct,
)
except Exception as exc:
log.warning("checkpoint_monitor.check_all failed: %s", exc)
except Exception as e: except Exception as e:
log.error("Error in trading loop: %s", e, exc_info=True) log.error("Error in trading loop: %s", e, exc_info=True)
+184
View File
@@ -0,0 +1,184 @@
"""One-shot and rate-limited Telegram checkpoint alerts.
Called from the main trading loop at the end of each cycle.
Errors are swallowed — checkpoint failures must never break the loop.
"""
import logging
from datetime import datetime, timezone
from typing import Optional
from bot.notify import telegram
log = logging.getLogger(__name__)
_EXPOSURE_COOLDOWN_HOURS = 6
class CheckpointMonitor:
async def check_all(
self,
db,
exposure_pct: float,
exposure_cap_pct: float,
) -> None:
for name, coro in [
("primer_match_accepted", self._check_primer_match_accepted(db)),
("primer_trade_phase6", self._check_primer_trade_phase6(db)),
("primer_resolved", self._check_primer_resolved(db)),
("exposure_cerca_cap", self._check_exposure_cerca_cap(db, exposure_pct, exposure_cap_pct)),
]:
try:
await coro
except Exception as exc:
log.warning("checkpoint %s failed: %s", name, exc)
# ── helpers ──────────────────────────────────────────────────────────────
async def _one_shot_fired(self, db, name: str) -> bool:
async with db._pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT 1 FROM checkpoint_alerts WHERE checkpoint_name = $1", name
)
return row is not None
async def _mark_one_shot(self, db, name: str) -> None:
async with db._pool.acquire() as conn:
await conn.execute(
"INSERT INTO checkpoint_alerts (checkpoint_name, fired_at) VALUES ($1, NOW())",
name,
)
async def _last_fired_at(self, db, name: str) -> Optional[datetime]:
async with db._pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT last_fired_at FROM checkpoint_alerts WHERE checkpoint_name = $1",
name,
)
if row is None:
return None
return row["last_fired_at"]
async def _upsert_repeatable(self, db, name: str) -> None:
async with db._pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO checkpoint_alerts (checkpoint_name, fired_at, last_fired_at)
VALUES ($1, NOW(), NOW())
ON CONFLICT (checkpoint_name) DO UPDATE SET last_fired_at = NOW()
""",
name,
)
# ── checkpoints ──────────────────────────────────────────────────────────
async def _check_primer_match_accepted(self, db) -> None:
if await self._one_shot_fired(db, "primer_match_accepted"):
return
async with db._pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT match_score, poly_question, mfld_market_title
FROM manifold_match_audit
WHERE match_status = 'accepted'
ORDER BY timestamp ASC
LIMIT 1
"""
)
if not row:
return
score = float(row["match_score"] or 0.0)
poly_q = (row["poly_question"] or "")[:60]
mfld_t = (row["mfld_market_title"] or "")[:60]
await telegram._send(
f"✅ Primer match Manifold accepted — score={score:.2f} "
f"poly='{poly_q}' mfld='{mfld_t}'"
)
await self._mark_one_shot(db, "primer_match_accepted")
log.info("checkpoint primer_match_accepted fired")
async def _check_primer_trade_phase6(self, db) -> None:
if await self._one_shot_fired(db, "primer_trade_phase6"):
return
async with db._pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT question, mfld_match_score, edge_net
FROM trades
WHERE mfld_match_score IS NOT NULL
AND (excluded_from_metrics IS NOT TRUE)
ORDER BY timestamp ASC
LIMIT 1
"""
)
if not row:
return
question = (row["question"] or "")[:70]
score = float(row["mfld_match_score"] or 0.0)
edge = float(row["edge_net"] or 0.0)
await telegram._send(
f"🎯 Primer trade Phase-6 limpio — {question} "
f"score={score:.2f} edge={edge:.3f}"
)
await self._mark_one_shot(db, "primer_trade_phase6")
log.info("checkpoint primer_trade_phase6 fired")
async def _check_primer_resolved(self, db) -> None:
if await self._one_shot_fired(db, "primer_resolved"):
return
async with db._pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT question, resolution, close_pnl
FROM trades
WHERE resolution IS NOT NULL
AND (excluded_from_metrics IS NOT TRUE)
ORDER BY closed_at ASC
LIMIT 1
"""
)
if not row:
return
question = (row["question"] or "")[:70]
resolution = float(row["resolution"] or 0.0)
pnl = float(row["close_pnl"] or 0.0)
await telegram._send(
f"🏁 Primer mercado resuelto — {question} "
f"result={resolution} pnl={pnl:.2f}"
)
await self._mark_one_shot(db, "primer_resolved")
log.info("checkpoint primer_resolved fired")
async def _check_exposure_cerca_cap(
self, db, exposure_pct: float, exposure_cap_pct: float
) -> None:
if exposure_pct < 0.80 * exposure_cap_pct:
return
last = await self._last_fired_at(db, "exposure_cerca_cap")
if last is not None:
now_utc = datetime.now(timezone.utc)
if last.tzinfo is None:
last = last.replace(tzinfo=timezone.utc)
elapsed_hours = (now_utc - last).total_seconds() / 3600
if elapsed_hours < _EXPOSURE_COOLDOWN_HOURS:
return
await telegram._send(
f"⚠️ Exposure al 80% del cap — revisar posiciones "
f"({exposure_pct * 100:.1f}% / {exposure_cap_pct * 100:.1f}%)"
)
await self._upsert_repeatable(db, "exposure_cerca_cap")
log.info(
"checkpoint exposure_cerca_cap fired (%.1f%% / %.1f%%)",
exposure_pct * 100, exposure_cap_pct * 100,
)