8479a63174
CI/CD / build-and-push (push) Successful in 1m56s
Adds feat_fg_lo / feat_mom_lo / feat_news_lo / feat_mfld_lo / feat_btc_dom_lo to every trade, all normalized to log-odds contribution for direct comparability. - fg / mom / btc_dom: raw probability-delta × 2 → log-odds - news / mfld: already log-odds (LOGODDS_WEIGHT already applied), no scaling - btc_dom tracked separately in bayesian.py instead of bundled in total_adj - reasoning string updated to fg_lo= / mom_lo= notation for self-documentation Schema: 5 new DOUBLE PRECISION columns + 2 partial indexes Stack: TradingSignal → Order → Trade → save_trade all carry feat fields Startup: backfill_feature_columns() recovers fg/mom/news/mfld from old reasoning strings (×2 applied to fg/mom); btc_dom_lo stays NULL for legacy API: /api/metrics/features — triggered/material split per feature with two-level thresholds (0.05 for fg/mom/btc_dom, 0.10 for news/mfld) API: /api/trades/legacy — exposes pre-Phase-1 trades (edge_net IS NULL) API: _enrich_trade backward-compat: reads DB columns first, falls back to reasoning regex with unit conversion for pre-Phase-6 trades Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
410 lines
17 KiB
Python
410 lines
17 KiB
Python
"""
|
|
Polymarket Trading Bot — Main Entry Point
|
|
# ci-test: 2026-04-16
|
|
"""
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
from datetime import datetime, timezone
|
|
|
|
from bot.data.polymarket import PolymarketClient, Market, market_family_key
|
|
from bot.data.external import ExternalDataClient
|
|
from bot.data.news import NewsClient
|
|
from bot.data.manifold import ManifoldClient
|
|
from bot.strategy.bayesian import BayesianStrategy, gnews_priority, MAX_NEWS_QUERIES_PER_CYCLE
|
|
from bot.risk.manager import RiskManager
|
|
from bot.executor.paper import PaperExecutor
|
|
from bot.metrics.tracker import MetricsTracker
|
|
from bot.data.db import Database
|
|
|
|
# Root logging setup: INFO and above, one "<time> [<LEVEL>] <logger>: <message>"
# line per record.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)

# Logger shared by every function in this module.
log = logging.getLogger("bot.main")

# Paper mode is the default; it is only off when the PAPER_MODE env var is set
# to something other than "true" (compared case-insensitively).
PAPER_MODE = os.environ.get("PAPER_MODE", "true").lower() == "true"

# Simulated bankroll handed to the paper executor (env override: PAPER_BANKROLL).
PAPER_BANKROLL = float(os.environ.get("PAPER_BANKROLL", "10000"))
|
|
|
|
|
|
def _pct(n: int, denom: int) -> str:
    """Format ``n`` out of ``denom`` as ``"P% (n/denom)"`` (integer percent, floored)."""
    if denom == 0:
        return "0% (0/0)"
    return f"{n * 100 // denom}% ({n}/{denom})"


def _market_sort_key(market, strategy):
    """Build the per-cycle ordering key for one market.

    Politics markets go first (bucket 0), ordered by ``gnews_priority``
    descending so the highest-priority ones are evaluated first. Everything
    else (bucket 1) is ordered by end date ascending. The end date is the
    final tie-breaker in both buckets.
    """
    try:
        end = datetime.fromisoformat(market.end_date.replace("Z", "+00:00"))
    except Exception:
        # Missing or unparseable end date: push the market to the very end.
        end = datetime(9999, 12, 31, tzinfo=timezone.utc)
    if end.tzinfo is None:
        # A date-only / offset-less string parses to a naive datetime, which
        # cannot be compared with the aware values above (TypeError inside
        # sorted() would abort the whole cycle).
        # NOTE(review): naive end dates are assumed to be UTC — confirm.
        end = end.replace(tzinfo=timezone.utc)

    if market.category == "politics":
        priority = gnews_priority(market, strategy._news) if strategy._news else 0.0
        # Negated so that an ascending sort yields priority DESC.
        return (0, -priority, end)
    return (1, 0.0, end)


def _log_cycle_summary(
    stats,
    markets,
    cycle_trades: int,
    reentry_guard_count: int,
    legacy_incomplete_count: int,
) -> None:
    """Emit the single ``[CYCLE SUMMARY]`` block (stable format for grep/compare)."""
    n_total = len(markets)
    n_uncertainty = sum(1 for m in markets if 0.35 <= m.yes_price <= 0.65)

    log.info(
        "[CYCLE SUMMARY]\n"
        " markets_total: %d\n"
        " markets_uncertainty_zone: %d (prior 0.35-0.65)\n"
        " max_edge_gross: %+.3f\n"
        " max_edge_net: %+.3f\n"
        " pct_edge_gross_gt_002: %s\n"
        " pct_edge_gross_gt_004: %s\n"
        " blocked_by_family: %d\n"
        " blocked_by_prior_extreme: %d\n"
        " blocked_by_edge_net_nonpositive:%d\n"
        " blocked_by_edge_net_below_regime:%d\n"
        " trades_executed: %d\n"
        " gnews_queries_used: %d/%d\n"
        " reentry_guard_blocked: %d\n"
        " legacy_incomplete_seen: %d\n"
        " family_conflicts_prevented: %d\n"
        " manifold_matches_accepted: %d\n"
        " manifold_matches_rejected: %d",
        n_total,
        n_uncertainty,
        stats["max_edge_gross"],
        stats["max_edge_net"],
        _pct(stats["gross_gt_002"], n_total),
        _pct(stats["gross_gt_004"], n_total),
        stats["skip_family"],
        stats["skip_prior_extreme"],
        stats["skip_edge_net_nonpositive"],
        stats["skip_edge_net_below_regime"],
        cycle_trades,
        stats["gnews_queries_used"], MAX_NEWS_QUERIES_PER_CYCLE,
        reentry_guard_count,
        legacy_incomplete_count,
        # Same counter as blocked_by_family: both lines report skip_family.
        stats["skip_family"],
        stats["manifold_matches_accepted"],
        stats["manifold_matches_rejected"],
    )


async def run_trading_loop(
    poly: PolymarketClient,
    external: ExternalDataClient,
    strategy: BayesianStrategy,
    risk: RiskManager,
    executor: PaperExecutor,
    metrics: MetricsTracker,
    db: Database,
) -> None:
    """Main trading loop: run one evaluation cycle, then sleep 60 seconds, forever.

    Each cycle fetches the active markets and external signals, works out
    which market families are already occupied, orders the markets, evaluates
    each one through the strategy, sizes and executes resulting orders, logs a
    cycle summary and updates the daily metrics.

    Any exception raised inside a cycle is logged with its traceback and the
    loop continues with the next cycle after the sleep; the function itself
    never returns normally.
    """
    log.info("Trading loop started. PAPER_MODE=%s", PAPER_MODE)

    while True:
        try:
            # 1. Fetch active markets
            markets = await poly.get_active_markets()
            log.info("Found %d active markets", len(markets))

            # 2. Get external signals
            ext_data = await external.get_all_signals()

            # 3. Build occupied_families from the currently open positions so
            #    we never re-enter a family where a position is already held.
            portfolio = executor.get_portfolio()
            market_by_id: dict = {}
            for m in markets:
                # setdefault keeps the first market per id, like a linear search would.
                market_by_id.setdefault(m.id, m)

            occupied_families: set[str] = set()
            for market_id in portfolio.positions:
                mkt = market_by_id.get(market_id)
                if mkt:
                    occupied_families.add(market_family_key(mkt))

            # Also seed from the DB: covers positions whose market is no longer
            # in the current markets list.
            db_families = await db.get_open_families()
            occupied_families |= db_families
            if occupied_families:
                log.info("Occupied families (from portfolio): %s", sorted(occupied_families))

            # 4. Sort markets: politics by gnews priority DESC, the rest by
            #    end date ASC (see _market_sort_key).
            markets = sorted(markets, key=lambda m: _market_sort_key(m, strategy))

            for _m in markets:
                log.info(
                    " [market] %-55s | cat=%-12s | family=%-28s | ends=%s | yes=%.3f",
                    _m.question[:55], _m.category, market_family_key(_m),
                    _m.end_date[:10] if _m.end_date else "?", _m.yes_price,
                )

            # Reset per-cycle GNews counter
            strategy.reset_cycle()

            # 5. Evaluate each market.
            # Markets recently closed for the inversion bug are blocked from
            # re-entry for 24h.
            inverted_guard: set[str] = await db.get_recently_closed_inverted(hours=24)
            if inverted_guard:
                log.info(
                    "Reentry guard active for %d market(s) (inversion, 24h): %s",
                    len(inverted_guard), sorted(inverted_guard),
                )

            reentry_guard_count = 0
            cycle_trades = 0
            for market in markets:
                if market.id in inverted_guard:
                    log.info(
                        "reentry_guard_triggered market=%s | skipping — closed for inversion within 24h | %s",
                        market.id, market.question[:60],
                    )
                    reentry_guard_count += 1
                    continue

                # evaluate() returns None for all skips — reasons are logged internally
                signal = await strategy.evaluate(market, ext_data, occupied_families)
                if signal is None:
                    continue

                log.info(
                    "Signal generated: market=%-50s | edge_gross=%+.3f | edge_net=%+.3f | "
                    "regime_min=%.2f | family=%s | conf=%.2f",
                    market.question[:50],
                    signal.edge_gross,
                    signal.edge_net,
                    signal.regime_min_edge,
                    signal.family_key,
                    signal.confidence,
                )

                # 6. Risk check + position sizing
                order = risk.size_order(signal, portfolio)
                if order is None:
                    log.debug("Risk manager rejected order for %s", market.id)
                    continue

                # 7. Execute
                trade = await executor.execute(order)
                if trade:
                    await metrics.record_trade(trade)
                    log.info("Trade executed: %s", trade)
                    # Block this family for the rest of the cycle
                    occupied_families.add(signal.family_key)
                    cycle_trades += 1

            # 8. [CYCLE SUMMARY] — one block per cycle
            stats = strategy.get_cycle_stats()
            legacy_incomplete_count = await db.get_legacy_incomplete_count()
            _log_cycle_summary(
                stats, markets, cycle_trades, reentry_guard_count, legacy_incomplete_count,
            )

            # 9. Update daily metrics
            await metrics.update_daily_summary()

        except Exception as e:
            # Top-level boundary: a failed cycle must not kill the bot.
            log.error("Error in trading loop: %s", e, exc_info=True)

        await asyncio.sleep(60)
|
|
|
|
|
|
async def run_legacy_scan(
    db: Database,
    markets: list,
    manifold: ManifoldClient,
    executor: PaperExecutor,
    paper_mode: bool,
) -> None:
    """
    Startup audit of every open position stored in the DB.

    Each position gets its family key recomputed with the current
    market_family_key() logic. Positions that now share a family are ranked by
    edge_net (winner KEEP, the rest CLOSE_RECOMMENDED); a lone position whose
    key changed is marked REVIEW. Changed keys are written back to the DB, and
    live positions with a changed key are re-checked against Manifold: a
    probability that contradicts the trade direction is flagged as inverted.
    A full report is logged at WARNING level.

    When ``paper_mode`` is true and ``executor`` is a PaperExecutor, every
    CLOSE_RECOMMENDED position is closed automatically after the report;
    otherwise the operator is only told to close them manually.
    """
    open_positions = await db.get_open_position_details()
    if not open_positions:
        log.info("Legacy scan: no open positions — skipping.")
        return

    live_by_id: dict = {}
    for m in markets:
        live_by_id[str(m.id)] = m

    # Step 1: attach the recomputed family key (and default verdict) to each position
    enriched: list[dict] = []
    for pos in open_positions:
        mid = str(pos["market_id"])
        live_mkt = live_by_id.get(mid)
        old_fk = pos.get("family_key") or ""

        if not live_mkt:
            # Market is not in the active list: derive the key from the stored
            # question only, via a placeholder Market.
            placeholder = Market(
                id=mid, condition_id="", question=pos["question"],
                yes_token_id="", no_token_id="",
                yes_price=0.5, no_price=0.5,
                volume_24h=0, end_date="", active=False, category="",
            )
            computed = market_family_key(placeholder)
            # A key starting with "-" is a degenerate fallback (missing
            # category + end_date): keep the stored key instead.
            if computed.startswith("-"):
                new_fk = old_fk or "unknown"
            else:
                new_fk = computed
        else:
            new_fk = market_family_key(live_mkt)

        is_legacy_incomplete = (pos.get("edge_net") is None) and (not live_mkt)
        if is_legacy_incomplete:
            verdict = "legacy_incomplete"
            verdict_reason = "edge_net and live market unavailable"
        else:
            verdict = "OK"
            verdict_reason = "no family conflict"

        record = dict(pos)
        record.update({
            "market_id": mid,
            "live_market": live_mkt,
            "family_key_old": old_fk,
            "family_key_new": new_fk,
            "fk_changed": new_fk != old_fk,
            "manifold_prob_new": None,
            "manifold_inverted": False,
            "recommendation": verdict,
            "rec_reason": verdict_reason,
        })
        enriched.append(record)

    # Step 2: bucket by new family key and resolve sibling conflicts
    by_family: dict[str, list[dict]] = {}
    for entry in enriched:
        fk = entry["family_key_new"]
        if fk not in by_family:
            by_family[fk] = []
        by_family[fk].append(entry)

    for members in by_family.values():
        if len(members) == 1:
            only = members[0]
            if only["fk_changed"]:
                only["recommendation"] = "REVIEW"
                only["rec_reason"] = "family key changed but no sibling conflict"
            continue

        # Several positions in one family: the highest edge_net wins
        # (missing edge_net counts as 0.0).
        best = max(members, key=lambda x: (x.get("edge_net") or 0.0))
        best_id = best["market_id"]
        best_edge = best.get("edge_net") or 0.0
        for entry in members:
            if entry["market_id"] == best_id:
                own_edge = entry.get("edge_net") or 0.0
                entry["recommendation"] = "KEEP"
                entry["rec_reason"] = f"highest edge_net={own_edge:.3f} in family"
            else:
                entry["recommendation"] = "CLOSE_RECOMMENDED"
                entry["rec_reason"] = (
                    f"family conflict: sibling {best_id} has edge_net={best_edge:.3f}"
                )

    # Step 2.5: write corrected family keys back to the DB
    for entry in enriched:
        if not entry["fk_changed"]:
            continue
        if entry["family_key_new"] in ("unknown", ""):
            continue
        await db.update_family_key(entry["market_id"], entry["family_key_new"])
        log.info(
            "family_key updated in DB: market=%s | %s → %s",
            entry["market_id"], entry["family_key_old"] or "none", entry["family_key_new"],
        )

    # Step 3: re-query Manifold for live positions whose family key changed
    for entry in enriched:
        if not (entry["live_market"] and entry["fk_changed"]):
            continue

        prob = await manifold.get_probability(entry["question"])
        entry["manifold_prob_new"] = prob
        if prob is None:
            continue

        # Does the corrected Manifold signal contradict the original direction?
        if prob < 0.40 and entry["direction"] == "BUY_YES":
            contradicted = "BUY_YES"
        elif prob > 0.60 and entry["direction"] == "BUY_NO":
            contradicted = "BUY_NO"
        else:
            continue

        entry["manifold_inverted"] = True
        note = f"Manifold:{prob:.3f} contradicts {contradicted} (inversion bug confirmed)"
        if entry["recommendation"] in ("OK", "REVIEW"):
            entry["recommendation"] = "CLOSE_RECOMMENDED"
            entry["rec_reason"] = note
        else:
            # A family-conflict verdict is already set: keep it, append the note.
            entry["rec_reason"] += f" | {note}"

    # Step 4: log the full report (before any closures)
    tally: dict[str, int] = {}
    for entry in enriched:
        rec = entry["recommendation"]
        tally[rec] = tally.get(rec, 0) + 1
    n_close = tally.get("CLOSE_RECOMMENDED", 0)
    n_keep = tally.get("KEEP", 0)
    n_ok = tally.get("OK", 0)
    n_review = tally.get("REVIEW", 0)
    n_legacy = tally.get("legacy_incomplete", 0)

    log.warning(
        "━" * 70 + "\nLEGACY SCAN — %d position(s): OK=%d KEEP=%d REVIEW=%d CLOSE_RECOMMENDED=%d LEGACY_INCOMPLETE=%d",
        len(enriched), n_ok, n_keep, n_review, n_close, n_legacy,
    )
    for entry in enriched:
        if entry["manifold_prob_new"] is not None:
            manifold_txt = f"{entry['manifold_prob_new']:.3f}"
        else:
            manifold_txt = "n/a"
        changed_tag = " [CHANGED]" if entry["fk_changed"] else ""
        log.warning(
            " [%-18s] market=%-8s | dir=%-8s | edge_net=%+.3f\n"
            " stored_family: %s\n"
            " new_family: %s%s\n"
            " manifold_new: %s\n"
            " reason: %s",
            entry["recommendation"],
            entry["market_id"], entry["direction"],
            entry.get("edge_net") or 0.0,
            entry["family_key_old"] or "none",
            entry["family_key_new"],
            changed_tag,
            manifold_txt,
            entry["rec_reason"],
        )
    log.warning("━" * 70)

    # Step 5: auto-close in paper mode, otherwise just tell the operator
    if n_close <= 0:
        return

    if paper_mode and isinstance(executor, PaperExecutor):
        log.warning("PAPER MODE: auto-closing %d CLOSE_RECOMMENDED position(s)...", n_close)
        for entry in enriched:
            if entry["recommendation"] != "CLOSE_RECOMMENDED":
                continue
            recovered = await executor.close_legacy_position(entry["market_id"], entry["rec_reason"])
            log.warning(
                " AUTO_CLOSED market=%s | $%.2f returned to cash | %s",
                entry["market_id"], recovered, entry["question"][:60],
            )
        log.warning("Legacy scan closures complete.")
    else:
        log.warning("REAL MODE: %d position(s) marked CLOSE_RECOMMENDED — close manually.", n_close)
|
|
|
|
|
|
async def main() -> None:
    """Wire up all components, run the startup legacy scan, then trade forever.

    Startup order: connect to the DB, run migrations and the feature-column
    backfill, build the data clients / strategy / risk manager / executor
    (paper or real depending on PAPER_MODE), run the one-time legacy scan and
    finally enter the trading loop.

    The DB connection and the news / Manifold clients are released on every
    exit path after the DB has connected — including failures during
    migrations, executor initialisation or the legacy scan.
    """
    if PAPER_MODE:
        log.info("=" * 60)
        log.info(" PAPER TRADING MODE — No real money at risk")
        log.info(" Bankroll: $%.2f simulated", PAPER_BANKROLL)
        log.info("=" * 60)
    else:
        log.warning("REAL TRADING MODE ACTIVE — Real money at risk!")

    db = Database()
    await db.connect()

    # Created inside the try block; None means "nothing to close yet".
    news = None
    manifold = None
    try:
        await db.run_migrations()
        await db.backfill_feature_columns()

        poly = PolymarketClient()
        external = ExternalDataClient()
        news = NewsClient()
        manifold = ManifoldClient()
        strategy = BayesianStrategy(news=news, manifold=manifold)
        risk = RiskManager(max_position_pct=0.05, max_exposure_pct=0.30)
        executor = PaperExecutor(db=db, bankroll=PAPER_BANKROLL) if PAPER_MODE else None
        metrics = MetricsTracker(db=db)

        if executor is None:
            # Imported lazily so paper mode never needs the real executor module.
            from bot.executor.real import RealExecutor  # noqa
            executor = RealExecutor(db=db)

        if PAPER_MODE:
            await executor.initialize()

        # Legacy scan: re-key all open positions, detect contradictions and
        # auto-close CLOSE_RECOMMENDED ones in paper mode. Runs once at startup
        # on a fresh market snapshot; the trading loop re-fetches on its own.
        try:
            scan_markets = await poly.get_active_markets()
        except Exception as e:
            # Best effort: without a snapshot the scan runs against an empty list.
            log.warning("Could not fetch markets for legacy scan: %s — scan skipped", e)
            scan_markets = []
        await run_legacy_scan(db, scan_markets, manifold, executor, PAPER_MODE)

        await run_trading_loop(poly, external, strategy, risk, executor, metrics, db)
    finally:
        # Chained so that a failure in one closer cannot skip the next ones.
        try:
            await db.disconnect()
        finally:
            try:
                if news is not None:
                    await news.close()
            finally:
                if manifold is not None:
                    await manifold.close()


if __name__ == "__main__":
    asyncio.run(main())
|