""" ResearchOwl Exhaustive Scraper Core engine: discovers, expands, and evaluates sources recursively """ import asyncio import re import time from typing import Optional from urllib.parse import urljoin, urlparse, quote_plus import aiohttp import feedparser import structlog import trafilatura from bs4 import BeautifulSoup from duckduckgo_search import DDGS from youtube_transcript_api import YouTubeTranscriptApi, NoTranscriptFound from tenacity import retry, stop_after_attempt, wait_exponential from src.config import settings from src.db.database import ResearchDB logger = structlog.get_logger() HEADERS = { "User-Agent": "Mozilla/5.0 (compatible; ResearchOwl/1.0; +https://chemavx.xyz)", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.9,es;q=0.8", } # Domains to skip — not useful for research BLACKLIST_DOMAINS = { "facebook.com", "twitter.com", "x.com", "instagram.com", "tiktok.com", "pinterest.com", "linkedin.com", "amazon.com", "ebay.com", "etsy.com", "ads.google.com", "doubleclick.net", "googleadservices.com", } # Source type patterns YOUTUBE_RE = re.compile(r"(?:youtube\.com/watch\?v=|youtu\.be/)([a-zA-Z0-9_-]{11})") PDF_RE = re.compile(r"\.pdf(\?|$)", re.IGNORECASE) REDDIT_RE = re.compile(r"reddit\.com/(r/\w+/comments/\w+)") WIKIPEDIA_RE = re.compile(r"wikipedia\.org/wiki/(.+)") def detect_source_type(url: str) -> str: if YOUTUBE_RE.search(url): return "youtube" if PDF_RE.search(url): return "pdf" if REDDIT_RE.search(url): return "reddit" if WIKIPEDIA_RE.search(url): return "wikipedia" if "arxiv.org" in url: return "arxiv" if any(d in url for d in ["rss", "feed", "atom"]): return "rss" return "web" def is_blacklisted(url: str) -> bool: try: domain = urlparse(url).netloc.lower().replace("www.", "") return any(bl in domain for bl in BLACKLIST_DOMAINS) except Exception: return True def normalize_url(url: str) -> str: parsed = urlparse(url) clean = parsed._replace(fragment="", query="") return clean.geturl().rstrip("/") class ExhaustiveScraper: """ Recursive source discoverer and content extractor. Keeps expanding until saturation or limits hit. """ def __init__(self, db: ResearchDB, session_id: int, topic: str, progress_callback=None): self.db = db self.session_id = session_id self.topic = topic self.progress_callback = progress_callback self.iteration = 0 self.total_sources = 0 self._stop = False self._http: Optional[aiohttp.ClientSession] = None async def stop(self): self._stop = True async def _get_http(self) -> aiohttp.ClientSession: if not self._http or self._http.closed: timeout = aiohttp.ClientTimeout(total=settings.request_timeout) self._http = aiohttp.ClientSession(headers=HEADERS, timeout=timeout) return self._http async def close(self): if self._http and not self._http.closed: await self._http.close() # ─── Seed discovery ─────────────────────────────────────────────────────── async def seed(self): """Initial broad search across multiple sources""" logger.info("Seeding research", topic=self.topic) tasks = [ self._seed_duckduckgo(), self._seed_wikipedia(), self._seed_reddit(), self._seed_youtube(), ] await asyncio.gather(*tasks, return_exceptions=True) async def _seed_duckduckgo(self): """Multiple DDG queries — fresh DDGS() per query to avoid cascading ratelimits""" queries = [ self.topic, f"{self.topic} history facts", f"{self.topic} evidence analysis", f"{self.topic} official report", f"{self.topic} investigation", f"{self.topic} wikipedia", f"{self.topic} documentary", f"{self.topic} research study", ] for query in queries: if self._stop: break try: # Fresh instance per query — a ratelimit on one won't poison the rest with DDGS() as ddgs: results = list(ddgs.text(query, max_results=settings.max_pages_per_search)) for r in results: url = normalize_url(r.get("href", "")) if url and not is_blacklisted(url): await self.db.add_source( self.session_id, url, detect_source_type(url), depth=0, title=r.get("title") ) logger.info("DDG query ok", query=query, results=len(results)) except Exception as e: logger.warning("DDG query failed", query=query, error=str(e)) await asyncio.sleep(settings.request_delay * 2) async def _seed_wikipedia(self): """Search Wikipedia API for correct article URLs. Tries English first, falls back to Spanish if no results found.""" http = await self._get_http() added = 0 for lang in ("en", "es"): try: api_url = ( f"https://{lang}.wikipedia.org/w/api.php?action=opensearch" f"&search={quote_plus(self.topic)}&limit=10&format=json" ) async with http.get(api_url) as resp: data = await resp.json() urls = data[3] if len(data) > 3 else [] for url in urls: if url: await self.db.add_source(self.session_id, url, "wikipedia", depth=0) added += 1 logger.info("Wikipedia seed", lang=lang, found=len(urls)) if added > 0: break # English results found — no need to try Spanish except Exception as e: logger.warning("Wikipedia API seed failed", lang=lang, error=str(e)) async def _seed_reddit(self): """Search Reddit — sequential to avoid rate limiting""" try: http = await self._get_http() url = f"https://www.reddit.com/search.json?q={quote_plus(self.topic)}&sort=top&limit=15" async with http.get(url, headers={**HEADERS, "User-Agent": "ResearchOwl/1.0"}) as resp: if resp.status == 200: data = await resp.json() posts = data.get("data", {}).get("children", []) for post in posts: post_data = post.get("data", {}) permalink = post_data.get("permalink", "") if permalink: full_url = f"https://www.reddit.com{permalink}" await self.db.add_source( self.session_id, full_url, "reddit", depth=0, title=post_data.get("title") ) logger.info("Reddit seed", found=len(posts), status=resp.status) else: logger.warning("Reddit seed non-200", status=resp.status) except Exception as e: logger.warning("Reddit seed failed", error=str(e)) async def _seed_youtube(self): """Search YouTube via DDG for video transcripts""" try: with DDGS() as ddgs: results = list(ddgs.videos( f"{self.topic} documentary explanation", max_results=10 )) for r in results: url = r.get("content", "") if "youtube.com" in url or "youtu.be" in url: await self.db.add_source( self.session_id, url, "youtube", depth=0, title=r.get("title") ) except Exception as e: logger.warning("YouTube seed failed", error=str(e)) # ─── Main pipeline ──────────────────────────────────────────────────────── async def run(self) -> dict: """ Main exhaustive loop: 1. Seed initial sources 2. Process batch → extract content + new URLs 3. Repeat until saturated or limits hit """ await self.seed() while not self._stop: self.iteration += 1 pending = await self.db.get_pending_sources(self.session_id, limit=20) if not pending: logger.info("No more pending sources — saturated", iteration=self.iteration) break if self.total_sources >= settings.max_sources: logger.info("Max sources reached", total=self.total_sources) break logger.info("Processing batch", iteration=self.iteration, batch_size=len(pending)) # Reduced concurrency to 3 — avoids triggering Reddit/web rate limits semaphore = asyncio.Semaphore(3) tasks = [self._process_source(s, semaphore) for s in pending] results = await asyncio.gather(*tasks, return_exceptions=True) new_sources = sum(1 for r in results if r and isinstance(r, int) and r > 0) self.total_sources += len(pending) stats = await self.db.get_session_stats(self.session_id) await self.db.update_session( self.session_id, iterations=self.iteration, total_sources=self.total_sources ) if self.progress_callback: await self.progress_callback( iteration=self.iteration, total=self.total_sources, new_this_round=new_sources, stats=stats ) # Saturation check: if we found very few new URLs, we're done if new_sources < 3 and self.iteration > 2: logger.info("Saturation detected", new_sources=new_sources) break await asyncio.sleep(settings.request_delay) await self.close() final_stats = await self.db.get_session_stats(self.session_id) return final_stats async def _process_source(self, source: dict, semaphore: asyncio.Semaphore) -> int: """Extract content from a source and discover new URLs. Returns count of new URLs found.""" async with semaphore: source_type = source["source_type"] url = source["url"] source_id = source["id"] try: if source_type == "youtube": content, title = await self._extract_youtube(url) elif source_type == "wikipedia": content, title, new_urls = await self._extract_wikipedia(url) for new_url in (new_urls or []): await self.db.add_source( self.session_id, new_url, "wikipedia", depth=source["depth"] + 1 ) await self._mark_scraped(source_id, content, title, url) return len(new_urls or []) elif source_type == "reddit": content, title = await self._extract_reddit(url) # Small delay between Reddit requests to avoid rate limiting await asyncio.sleep(settings.request_delay) elif source_type == "pdf": content, title = await self._extract_pdf(url) else: content, title, new_urls = await self._extract_web(url, source["depth"]) for new_url in (new_urls or []): await self.db.add_source( self.session_id, new_url, detect_source_type(new_url), depth=source["depth"] + 1 ) await self._mark_scraped(source_id, content, title, url) return len(new_urls or []) await self._mark_scraped(source_id, content, title, url) return 0 except Exception as e: logger.warning("Source extraction failed", url=url, error=str(e)) await self.db.update_source(source_id, status="failed", error=str(e)[:200]) return 0 async def _mark_scraped(self, source_id: int, content: Optional[str], title: Optional[str], url: str): if not content: logger.debug("No content returned", source_id=source_id, url=url[:60]) await self.db.update_source(source_id, status="skipped", error="Content too short or empty") return if len(content) < settings.min_content_length: logger.debug("Content too short", source_id=source_id, length=len(content), url=url[:60]) await self.db.update_source(source_id, status="skipped", error="Content too short or empty") return word_count = len(content.split()) await self.db.save_source_content(source_id, content) await self.db.update_source( source_id, status="scraped", title=title or url, word_count=word_count, scraped_at=time.time(), quality_score=min(1.0, word_count / 1000) ) logger.info("Source scraped", source_id=source_id, words=word_count, url=url[:60]) # ─── Extractors ─────────────────────────────────────────────────────────── async def _extract_web(self, url: str, depth: int) -> tuple[Optional[str], Optional[str], list[str]]: """Extract text + discover internal/external links""" if is_blacklisted(url): return None, None, [] http = await self._get_http() async with http.get(url) as resp: if resp.status != 200: return None, None, [] html = await resp.text(errors="replace") # Extract main content with trafilatura (much better than BS4 for articles) content = trafilatura.extract( html, include_links=False, include_tables=True, favor_recall=True ) # Extract title and new URLs with BS4 soup = BeautifulSoup(html, "lxml") title = soup.title.string.strip() if soup.title else url new_urls = [] if depth < settings.max_depth: base = f"{urlparse(url).scheme}://{urlparse(url).netloc}" for a in soup.find_all("a", href=True): href = a["href"] full_url = normalize_url(urljoin(base, href)) if (full_url.startswith("http") and not is_blacklisted(full_url) and not await self.db.source_exists(self.session_id, full_url)): new_urls.append(full_url) return content, title, new_urls[:30] # cap links per page async def _extract_wikipedia(self, url: str) -> tuple[Optional[str], Optional[str], list[str]]: """Wikipedia: extract content + follow internal wiki links. Works for both en.wikipedia.org and es.wikipedia.org.""" http = await self._get_http() async with http.get(url) as resp: if resp.status != 200: logger.debug("Wikipedia non-200", status=resp.status, url=url[:60]) return None, None, [] html = await resp.text(errors="replace") soup = BeautifulSoup(html, "lxml") title_tag = soup.find("h1", {"id": "firstHeading"}) title = title_tag.text if title_tag else url # Get clean content content_div = soup.find("div", {"id": "mw-content-text"}) if not content_div: return None, title, [] # Remove navboxes, references, etc. for tag in content_div.find_all(["table", "sup", "style"]): tag.decompose() content = content_div.get_text(separator="\n", strip=True) # Extract Wikipedia internal links using the URL's actual domain parsed = urlparse(url) wiki_base = f"{parsed.scheme}://{parsed.netloc}" new_urls = [] for a in content_div.find_all("a", href=True): href = a["href"] if href.startswith("/wiki/") and ":" not in href: full_url = normalize_url(f"{wiki_base}{href}") if not await self.db.source_exists(self.session_id, full_url): new_urls.append(full_url) return content, title, new_urls[:20] async def _extract_youtube(self, url: str) -> tuple[Optional[str], Optional[str]]: """Extract YouTube transcript""" match = YOUTUBE_RE.search(url) if not match: return None, None video_id = match.group(1) try: transcript_list = YouTubeTranscriptApi.get_transcript( video_id, languages=["en", "es", "en-US", "en-GB"] ) text = " ".join(t["text"] for t in transcript_list) return text, f"YouTube: {video_id}" except NoTranscriptFound: return None, None except Exception as e: logger.warning("YouTube transcript failed", video_id=video_id, error=str(e)) return None, None async def _extract_reddit(self, url: str) -> tuple[Optional[str], Optional[str]]: """Extract Reddit post + top comments via JSON API""" json_url = url.rstrip("/") + ".json?limit=100&sort=top" http = await self._get_http() try: async with http.get( json_url, headers={**HEADERS, "User-Agent": "ResearchOwl/1.0"} ) as resp: if resp.status != 200: logger.debug("Reddit non-200", status=resp.status, url=url[:60]) return None, None data = await resp.json() post = data[0]["data"]["children"][0]["data"] title = post.get("title", "") selftext = post.get("selftext", "") comments = [] if len(data) > 1: for child in data[1]["data"]["children"][:50]: body = child.get("data", {}).get("body", "") if body and body != "[deleted]" and len(body) > 50: score = child.get("data", {}).get("score", 0) if score > 5: # only upvoted comments comments.append(body) content = f"# {title}\n\n{selftext}\n\n## Top Comments\n\n" + "\n\n---\n\n".join(comments) return content, title except Exception as e: logger.warning("Reddit extraction failed", url=url, error=str(e)) return None, None async def _extract_pdf(self, url: str) -> tuple[Optional[str], Optional[str]]: """Download and extract PDF text""" import pdfplumber import tempfile import os http = await self._get_http() try: async with http.get(url) as resp: if resp.status != 200: return None, None content_length = int(resp.headers.get("content-length", 0)) if content_length > 50 * 1024 * 1024: # skip PDFs > 50MB return None, None pdf_bytes = await resp.read() with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as f: f.write(pdf_bytes) tmp_path = f.name try: with pdfplumber.open(tmp_path) as pdf: pages = [p.extract_text() or "" for p in pdf.pages[:50]] # max 50 pages text = "\n\n".join(pages) return text, url.split("/")[-1] finally: os.unlink(tmp_path) except Exception as e: logger.warning("PDF extraction failed", url=url, error=str(e)) return None, None