""" Polymarket CLOB API client. Docs: https://docs.polymarket.com """ import asyncio import logging import os from dataclasses import dataclass, field from datetime import datetime, timezone, timedelta from typing import Optional import httpx log = logging.getLogger(__name__) POLYMARKET_API = "https://clob.polymarket.com" GAMMA_API = "https://gamma-api.polymarket.com" @dataclass class Market: id: str condition_id: str question: str yes_token_id: str no_token_id: str yes_price: float # 0-1, current best ask for YES no_price: float volume_24h: float end_date: str active: bool category: str = "" @dataclass class OrderBook: market_id: str yes_bids: list[tuple[float, float]] = field(default_factory=list) # (price, size) yes_asks: list[tuple[float, float]] = field(default_factory=list) mid_price: float = 0.5 class PolymarketClient: """ Async Polymarket client. In paper mode, API key is not needed — only public data. API key required for placing real orders. """ def __init__(self) -> None: self.api_key = os.getenv("POLYMARKET_API_KEY", "") self.secret = os.getenv("POLYMARKET_SECRET", "") self.passphrase = os.getenv("POLYMARKET_PASSPHRASE", "") self._client = httpx.AsyncClient(timeout=30) # Keywords that identify crypto / finance markets. # Short tickers are padded with spaces to avoid false substring matches # (e.g. " eth " won't match "Hegseth"; " sol " won't match "solar"). _CRYPTO_FINANCE_KEYWORDS: list[str] = [ "bitcoin", "btc", " eth ", "ethereum", " sol ", "solana", "xrp", "ripple", "dogecoin", "doge", "litecoin", "ltc", "coinbase", "binance", "kraken", "bybit", "okx", "usdc", "usdt", "stablecoin", "defi", "nft", "blockchain", "crypto", " fdv", "airdrop", "token launch", "token listing", "microstrategy", "mstr", "saylor", "nasdaq", "sp500", "s&p 500", "s&p500", "federal reserve", "fed rate", "interest rate", "inflation", "tariff", "treasury yield", " ipo ", "sec ", "cftc", ] _POLITICS_KEYWORDS: list[str] = [ "election", "president", "congress", "senate", "vote", "war", "trump", "biden", "ukraine", "russia", "israel", "nato", ] _TECH_KEYWORDS: list[str] = [ " ai ", "openai", "apple", "google", "microsoft", "meta", "nvidia", "regulation", "antitrust", ] _EVENTS_KEYWORDS: list[str] = [ "super bowl", "world cup", "oscar", "nobel", "spacex", "nasa", ] @classmethod def _is_crypto_finance(cls, question: str) -> bool: q = f" {question.lower()} " # pad so edge keywords match cleanly return any(kw in q for kw in cls._CRYPTO_FINANCE_KEYWORDS) @classmethod def _is_politics(cls, question: str) -> bool: q = f" {question.lower()} " return any(kw in q for kw in cls._POLITICS_KEYWORDS) @classmethod def _is_tech(cls, question: str) -> bool: q = f" {question.lower()} " return any(kw in q for kw in cls._TECH_KEYWORDS) @classmethod def _is_events(cls, question: str) -> bool: q = f" {question.lower()} " return any(kw in q for kw in cls._EVENTS_KEYWORDS) @classmethod def _detect_category(cls, question: str) -> str: """Return the category label for a market question, or '' if unsupported.""" if cls._is_crypto_finance(question): return "crypto/finance" if cls._is_politics(question): return "politics" if cls._is_tech(question): return "tech" if cls._is_events(question): return "events" return "" async def get_active_markets( self, min_volume: float = 1000, pages: int = 3, page_size: int = 200, max_days_to_resolution: int = 30, ) -> list[Market]: """Fetch active markets from Gamma API (no auth needed). Fetches events without tag filtering (tag= param is unreliable), then keeps only markets whose question matches any supported category (crypto/finance, politics, tech, events) and that: - have NOT already expired (end_dt >= now) - resolve within max_days_to_resolution days """ seen: set[str] = set() markets: list[Market] = [] now = datetime.now(timezone.utc) cutoff = now + timedelta(days=max_days_to_resolution) for page in range(pages): try: resp = await self._client.get( f"{GAMMA_API}/events", params={ "active": True, "closed": False, "limit": page_size, "offset": page * page_size, }, ) resp.raise_for_status() events = resp.json() if not events: break # no more pages for event in events: event_title = event.get("title", "") for m in event.get("markets", []): try: if not m.get("active") or m.get("closed"): continue question = m.get("question", "") # Detect category from question or event title category = self._detect_category(question) if not category: category = self._detect_category(event_title) if not category: continue # Filter: skip already-expired and far-future markets # Gamma API may return endDate or end_date (snake_case) raw_end = m.get("endDate") or m.get("end_date") or m.get("endDateIso", "") if raw_end: try: end_dt = datetime.fromisoformat( raw_end.replace("Z", "+00:00") ) # Make naive datetimes UTC-aware before comparing if end_dt.tzinfo is None: end_dt = end_dt.replace(tzinfo=timezone.utc) if end_dt < now: log.debug("Skipping expired market: %s", question[:60]) continue if end_dt > cutoff: continue except (ValueError, TypeError): pass # keep market if date unparseable market_id = str(m["id"]) if market_id in seen: continue vol = float(m.get("volume24hr", 0)) if vol < min_volume: continue raw_prices = m.get("outcomePrices", ["0.5", "0.5"]) if isinstance(raw_prices, str): import json as _json raw_prices = _json.loads(raw_prices) yes_price = float(raw_prices[0]) raw_tokens = m.get("clobTokenIds", ["", ""]) if isinstance(raw_tokens, str): import json as _json raw_tokens = _json.loads(raw_tokens) seen.add(market_id) markets.append(Market( id=market_id, condition_id=m.get("conditionId", ""), question=question, yes_token_id=raw_tokens[0] if raw_tokens else "", no_token_id=raw_tokens[1] if len(raw_tokens) > 1 else "", yes_price=yes_price, no_price=1 - yes_price, volume_24h=vol, end_date=m.get("endDate", ""), active=True, category=category, )) except (KeyError, ValueError, IndexError) as e: log.debug("Skipping malformed market: %s", e) except httpx.HTTPError as e: log.error("Polymarket API error (page=%d): %s", page, e) break by_cat: dict[str, int] = {} for mkt in markets: by_cat[mkt.category] = by_cat.get(mkt.category, 0) + 1 log.info( "Loaded %d markets (min_vol=%.0f, resolving within %dd): %s", len(markets), min_volume, max_days_to_resolution, ", ".join(f"{k}={v}" for k, v in sorted(by_cat.items())), ) return markets async def get_order_book(self, token_id: str) -> Optional[OrderBook]: """Get order book for a specific token.""" try: resp = await self._client.get( f"{POLYMARKET_API}/book", params={"token_id": token_id}, ) resp.raise_for_status() data = resp.json() bids = [(float(b["price"]), float(b["size"])) for b in data.get("bids", [])] asks = [(float(a["price"]), float(a["size"])) for a in data.get("asks", [])] mid = 0.5 if bids and asks: mid = (bids[0][0] + asks[0][0]) / 2 return OrderBook( market_id=token_id, yes_bids=bids, yes_asks=asks, mid_price=mid, ) except Exception as e: log.warning("Order book fetch failed for %s: %s", token_id, e) return None async def close(self) -> None: await self._client.aclose()