"""
Manifold Markets client — cross-platform prediction market probability signals.

For each Polymarket question, searches Manifold for a matching binary market by
keyword overlap and returns a ManifoldMatchResult with full audit metadata.

Match threshold: >= 0.40 Jaccard overlap (raised from 0.25 for stricter semantics).

Outcome compatibility guard (conservative):
  - Conditional Manifold markets ("If X, will Y?" / "Conditional on..." /
    "Assuming..." / "Given that..." / mid-sentence "...if X is nominated, will...")
    are rejected: a premise-gated question is not equivalent to a direct outcome
    question even when token overlap is high. reason='conditional_market'.
  - Each side is classified into an outcome_type
    (nomination | primary_win | general_win | conditional | other).
    Matches with differing outcome_type — or any conditional side — are rejected.
    reason='outcome_mismatch: poly=... manifold=...'.

Inversion guard (conservative):
  - If Polymarket question names a party (democrat/republican) AND the matched
    Manifold market names the OPPOSITE party → invert probability (1 - prob).
  - If Polymarket question names a party AND Manifold market has NO party keyword
    → reject with reason='ambiguous_inversion' (can't determine if inversion applies).
  - All other cases: no inversion, accept if score >= threshold.
  - When in doubt, reject.

Cache TTL: 30 minutes.
"""

import logging
import re
import time
from dataclasses import dataclass, field
from typing import Optional

import httpx

# Version tag for every audit record this matcher produces. Persisted to
# manifold_match_audit.matcher_version so metrics can isolate current-version
# stats from legacy/pre-versioning records. Do NOT change this value once set;
# bump to a new string only when matcher semantics change materially.
# Bumped from "v3_outcome_guard": match_score is now a true Jaccard index
# (intersection / union) instead of intersection / min(size), and mid-sentence
# conditionals are classified as "conditional" on both sides. Both changes alter
# which matches are accepted, so audit records must be distinguishable.
MANIFOLD_MATCHER_VERSION = "v4_true_jaccard"

MANIFOLD_API = "https://api.manifold.markets/v0"
CACHE_TTL_SEC = 1800  # 30 minutes

log = logging.getLogger(__name__)

_MATCH_THRESHOLD = 0.40  # raised from 0.25

# Words that carry no matching signal; dropped before scoring and query building.
_STOP_WORDS = frozenset([
    "will", "the", "a", "an", "is", "are", "was", "were", "be", "been",
    "by", "in", "on", "at", "to", "for", "of", "and", "or", "not",
    "this", "that", "with", "from", "have", "has", "had", "do", "does",
    "did", "can", "could", "would", "should", "may", "might", "shall",
    "win", "lose", "get", "become", "make", "take", "give", "see",
    "any", "who", "what", "when", "where", "which", "how",
    "over", "under", "than", "more", "most", "least",
    "its", "their", "they", "him", "her", "his", "she",
    "being", "into", "after", "before", "during", "until",
    "against", "between", "through",
])

_REPUBLICAN_WORDS = frozenset(["republican", "republicans", "gop"])
_DEMOCRAT_WORDS = frozenset(["democrat", "democrats", "democratic"])


@dataclass
class ManifoldMatchResult:
    """Outcome of matching one Polymarket question against Manifold, with audit fields."""

    status: str  # 'accepted' | 'rejected' | 'no_results'
    prob_final: Optional[float] = None  # probability after optional inversion
    prob_raw: Optional[float] = None  # probability exactly as Manifold reported it
    market_id: Optional[str] = None  # Manifold internal market ID
    market_title: Optional[str] = None
    market_url: Optional[str] = None
    match_score: Optional[float] = None  # 0-1 Jaccard
    match_reason: Optional[str] = None  # human-readable explanation
    inverted: bool = False
    search_query: str = ""
    poly_outcome_type: Optional[str] = None  # nomination|primary_win|general_win|conditional|other
    mfld_outcome_type: Optional[str] = None


def _significant_words(text: str) -> set[str]:
    """Return the lower-cased alphabetic tokens of *text* that are not stop words.

    Tokens shorter than three letters are dropped; digits never form tokens.
    """
    words = re.findall(r"[a-zA-Z]+", text.lower())
    return {w for w in words if w not in _STOP_WORDS and len(w) >= 3}


def _build_search_query(question: str, max_words: int = 6) -> str:
    """Build the Manifold search term: the first *max_words* significant tokens.

    Unlike ``_significant_words`` this keeps digits and original casing, and
    preserves token order. Returns "" when nothing significant remains.
    """
    words = re.findall(r"[a-zA-Z0-9]+", question)
    sig = [w for w in words if w.lower() not in _STOP_WORDS and len(w) >= 3]
    return " ".join(sig[:max_words])


def _detect_party(text: str) -> Optional[str]:
    """Return 'republican', 'democrat', or None if no party detected.

    Republican keywords are checked first, so a text naming both parties is
    reported as 'republican'.
    """
    words = set(re.findall(r"[a-zA-Z]+", text.lower()))
    if words & _REPUBLICAN_WORDS:
        return "republican"
    if words & _DEMOCRAT_WORDS:
        return "democrat"
    return None


# ── Conditional-market detection (Task 1) ──────────────────────────────────────
# A market is "conditional" when its resolution is gated on a premise rather than
# asking the outcome directly (e.g. "If X is the nominee, will he win?"). Such a
# market is NOT equivalent to a direct outcome question even with high token overlap.
_CONDITIONAL_PREFIXES = ("if ", "conditional on", "assuming ", "given that")
# " if <premise>," — a mid-sentence conditional clause closed by a comma.
_CONDITIONAL_CLAUSE_RE = re.compile(r"\sif\s[^,]*,")


def _is_conditional(text: str) -> bool:
    """True if the question is phrased conditionally (premise-gated)."""
    t = (text or "").strip().lower()
    if t.startswith(_CONDITIONAL_PREFIXES):
        return True
    return bool(_CONDITIONAL_CLAUSE_RE.search(t))


def _classify_outcome(text: str) -> str:
    """Coarsely classify what a question is *asking about*.

    Used to reject matches whose outcomes are not equivalent even when tokens
    overlap. Returns one of:
    nomination | primary_win | general_win | conditional | other.

    Order matters: conditional is checked first (premise-gated), then nomination
    (which subsumes "primary nominee"), then primary, then general election.
    """
    # Delegate to _is_conditional so prefix AND mid-sentence conditionals are
    # classified the same way everywhere in this module.
    if _is_conditional(text):
        return "conditional"
    t = (text or "").strip().lower()
    if any(k in t for k in ("nominee", "nominated", "nomination")):
        return "nomination"
    # "primary" already covers "win the primary".
    if any(k in t for k in ("primary", "first round")):
        return "primary_win"
    if any(k in t for k in ("win the election", "win the race", "win the seat",
                            "general election")):
        return "general_win"
    return "other"


def _find_best_candidate(poly_question: str,
                         results: list[dict]) -> tuple[Optional[dict], float]:
    """Find the highest-scoring open binary Manifold market by Jaccard overlap.

    Candidates are skipped when they are not dicts, are not BINARY, have a
    missing/non-numeric probability, have a probability outside (0.02, 0.98),
    or have no significant title words.

    Returns ``(market, score)``; ``(None, 0.0)`` when no candidate shares any
    significant word with *poly_question*.
    """
    poly_words = _significant_words(poly_question)
    if not poly_words:
        return None, 0.0

    best_score = 0.0
    best: Optional[dict] = None

    for result in results:
        if not isinstance(result, dict) or result.get("outcomeType") != "BINARY":
            continue
        try:
            prob = float(result.get("probability"))
        except (TypeError, ValueError):
            # Missing or malformed probability: unusable as a signal.
            continue
        if not (0.02 < prob < 0.98):
            continue
        m_words = _significant_words(result.get("question") or "")
        if not m_words:
            continue
        # True Jaccard index. Dividing by min(len) instead would give a
        # one-word title a perfect score against any question containing it.
        score = len(poly_words & m_words) / len(poly_words | m_words)
        if score > best_score:
            best_score = score
            best = result

    return best, best_score


def _market_url(match: dict) -> Optional[str]:
    """Build the public market URL, or None when slug or creator is missing."""
    slug = match.get("slug") or ""
    creator = match.get("creatorUsername") or ""
    if not slug or not creator:
        return None
    return f"https://manifold.markets/{creator}/{slug}"


def _market_id(match: dict) -> Optional[str]:
    """Return the market id as a string, or None when absent/empty."""
    raw = match.get("id")
    if raw is None or raw == "":
        return None
    return str(raw)


def _guard_verdict(mfld_title: str, poly_outcome: str, mfld_outcome: str,
                   poly_party: Optional[str],
                   manifold_party: Optional[str]) -> tuple[Optional[str], bool]:
    """Apply the outcome-compatibility and inversion guards.

    Returns ``(rejection_reason, inverted)``. ``rejection_reason`` is None when
    the match may be accepted; ``inverted`` is True only when both sides name a
    party and the parties differ.
    """
    # Task 1 — conditional Manifold market is never equivalent to a direct
    # outcome question, regardless of token overlap.
    if _is_conditional(mfld_title):
        return "conditional_market: manifold question is conditional", False
    # Task 2 — outcome types must match; any conditional side is rejected.
    if (poly_outcome == "conditional" or mfld_outcome == "conditional"
            or poly_outcome != mfld_outcome):
        return f"outcome_mismatch: poly={poly_outcome} manifold={mfld_outcome}", False
    if poly_party is not None:
        if manifold_party is None:
            # Poly specifies a party; Manifold does not → can't verify inversion safety
            return f"ambiguous_inversion: poly_party={poly_party}, mfld_party=none", False
        if manifold_party != poly_party:
            # Clear opposite parties — apply inversion
            return None, True
    # Same party on both sides, or no party on the Polymarket side.
    return None, False


class ManifoldClient:
    """Async Manifold Markets client for cross-platform probability signals."""

    def __init__(self) -> None:
        self._client = httpx.AsyncClient(timeout=15)
        # question → (fetched_at_monotonic, ManifoldMatchResult)
        self._cache: dict[str, tuple[float, ManifoldMatchResult]] = {}

    def _remember(self, question: str, fetched_at: float,
                  result: ManifoldMatchResult) -> ManifoldMatchResult:
        """Cache *result* for *question*, dropping expired entries, and return it."""
        # Without pruning, the cache would keep one entry per distinct question
        # forever, since expired entries were only ever overwritten.
        expired = [key for key, (stamp, _) in self._cache.items()
                   if fetched_at - stamp >= CACHE_TTL_SEC]
        for key in expired:
            del self._cache[key]
        self._cache[question] = (fetched_at, result)
        return result

    async def get_match(self, question: str) -> ManifoldMatchResult:
        """
        Return a ManifoldMatchResult for the given Polymarket question.

        status='accepted'   → prob_final is set and ready to use as signal
        status='rejected'   → match found but failed quality/inversion check
        status='no_results' → API returned no results or call failed

        Every outcome, including failures, is cached for CACHE_TTL_SEC.
        """
        now = time.monotonic()
        cached = self._cache.get(question)
        if cached and (now - cached[0]) < CACHE_TTL_SEC:
            return cached[1]

        poly_outcome = _classify_outcome(question)
        query = _build_search_query(question)

        def no_results() -> ManifoldMatchResult:
            return self._remember(question, now, ManifoldMatchResult(
                status="no_results",
                search_query=query,
                poly_outcome_type=poly_outcome,
            ))

        if not query:
            return no_results()

        try:
            resp = await self._client.get(
                f"{MANIFOLD_API}/search-markets",
                params={"term": query, "limit": 5, "filter": "open"},
            )
            resp.raise_for_status()
            results = resp.json()
        except Exception as exc:  # deliberate best-effort: never fail the caller
            log.warning("Manifold API error for %r: %s", question[:40], exc)
            return no_results()

        if not isinstance(results, list):
            # An error object or other non-list payload cannot be scored.
            log.warning(
                "Manifold API returned unexpected payload type %s for %r",
                type(results).__name__, question[:40],
            )
            return no_results()
        if not results:
            return no_results()

        best, score = _find_best_candidate(question, results)

        # ── Score threshold ───────────────────────────────────────────────────
        if best is None or score < _MATCH_THRESHOLD:
            reason = f"jaccard={score:.2f}<{_MATCH_THRESHOLD:.2f}"
            log.info(
                "Manifold REJECTED %-50s | score=%.2f < threshold=%.2f | query=%r",
                question[:50], score, _MATCH_THRESHOLD, query,
            )
            return self._remember(question, now, ManifoldMatchResult(
                status="rejected",
                market_title=best.get("question") if best else None,
                match_score=score if best else None,
                match_reason=reason,
                search_query=query,
                poly_outcome_type=poly_outcome,
                mfld_outcome_type=(
                    _classify_outcome(best.get("question", "")) if best else None
                ),
            ))

        # ── Outcome compatibility + inversion analysis (conservative) ─────────
        mfld_title = best.get("question", "")
        mfld_outcome = _classify_outcome(mfld_title)
        poly_party = _detect_party(question)
        manifold_party = _detect_party(mfld_title)
        shared = _significant_words(question) & _significant_words(mfld_title)
        matched_tokens = sorted(shared)[:6]  # capped to keep audit strings short

        rejection_reason, inverted = _guard_verdict(
            mfld_title, poly_outcome, mfld_outcome, poly_party, manifold_party,
        )
        url = _market_url(best)

        if rejection_reason is not None:
            log.info(
                "Manifold REJECTED %-50s | score=%.2f | reason=%s\n"
                " mfld_title: %s",
                question[:50], score, rejection_reason,
                best.get("question", "")[:70],
            )
            return self._remember(question, now, ManifoldMatchResult(
                status="rejected",
                market_id=_market_id(best),
                market_title=best.get("question"),
                market_url=url,
                match_score=score,
                match_reason=(
                    f"jaccard={score:.2f}, tokens={matched_tokens}, {rejection_reason}"
                ),
                search_query=query,
                poly_outcome_type=poly_outcome,
                mfld_outcome_type=mfld_outcome,
            ))

        # ── Accepted ──────────────────────────────────────────────────────────
        prob_raw = float(best["probability"])
        prob_final = (1.0 - prob_raw) if inverted else prob_raw

        match_reason = f"jaccard={score:.2f}, tokens={matched_tokens}"
        if inverted:
            match_reason += f", inverted=party({poly_party}≠{manifold_party})"

        log.info(
            "Manifold %s %-50s\n"
            " poly: %s\n"
            " mfld: %s\n"
            " url: %s\n"
            " score=%.2f | raw=%.3f | inverted=%s | final=%.3f",
            "ACCEPTED_INVERTED" if inverted else "ACCEPTED ",
            question[:50],
            question,
            best.get("question", ""),
            url or "n/a",
            score, prob_raw, inverted, prob_final,
        )

        return self._remember(question, now, ManifoldMatchResult(
            status="accepted",
            prob_final=prob_final,
            prob_raw=prob_raw,
            market_id=_market_id(best),
            market_title=best.get("question"),
            market_url=url,
            match_score=score,
            match_reason=match_reason,
            inverted=inverted,
            search_query=query,
            poly_outcome_type=poly_outcome,
            mfld_outcome_type=mfld_outcome,
        ))

    async def close(self) -> None:
        """Close the underlying HTTP client."""
        await self._client.aclose()