diff --git a/src/scraper/exhaustive.py b/src/scraper/exhaustive.py index fa0d9ae..c849e5e 100644 --- a/src/scraper/exhaustive.py +++ b/src/scraper/exhaustive.py @@ -67,7 +67,6 @@ def is_blacklisted(url: str) -> bool: def normalize_url(url: str) -> str: - """Remove fragments and tracking params""" parsed = urlparse(url) clean = parsed._replace(fragment="", query="") return clean.geturl().rstrip("/") @@ -117,7 +116,7 @@ class ExhaustiveScraper: await asyncio.gather(*tasks, return_exceptions=True) async def _seed_duckduckgo(self): - """Multiple DDG queries for breadth""" + """Multiple DDG queries — fresh DDGS() per query to avoid cascading ratelimits""" queries = [ self.topic, f"{self.topic} history facts", @@ -128,55 +127,57 @@ class ExhaustiveScraper: f"{self.topic} documentary", f"{self.topic} research study", ] - try: - with DDGS() as ddgs: - for query in queries: - if self._stop: - break - try: - results = list(ddgs.text(query, max_results=settings.max_pages_per_search)) - for r in results: - url = normalize_url(r.get("href", "")) - if url and not is_blacklisted(url): - await self.db.add_source( - self.session_id, url, - detect_source_type(url), - depth=0, - title=r.get("title") - ) - await asyncio.sleep(settings.request_delay) - except Exception as e: - logger.warning("DDG query failed", query=query, error=str(e)) - except Exception as e: - logger.error("DDG seeding failed", error=str(e)) + for query in queries: + if self._stop: + break + try: + # Fresh instance per query — a ratelimit on one won't poison the rest + with DDGS() as ddgs: + results = list(ddgs.text(query, max_results=settings.max_pages_per_search)) + for r in results: + url = normalize_url(r.get("href", "")) + if url and not is_blacklisted(url): + await self.db.add_source( + self.session_id, url, + detect_source_type(url), + depth=0, + title=r.get("title") + ) + logger.info("DDG query ok", query=query, results=len(results)) + except Exception as e: + logger.warning("DDG query failed", query=query, error=str(e)) + await asyncio.sleep(settings.request_delay * 2) async def _seed_wikipedia(self): - """Fetch Wikipedia article + all internal links""" - topic_encoded = quote_plus(self.topic.replace(" ", "_")) - wiki_url = f"https://en.wikipedia.org/wiki/{topic_encoded}" - await self.db.add_source(self.session_id, wiki_url, "wikipedia", depth=0) + """Search Wikipedia API for correct article URLs. + Tries English first, falls back to Spanish if no results found.""" + http = await self._get_http() + added = 0 - # Also search Wikipedia API for related articles - try: - http = await self._get_http() - api_url = ( - f"https://en.wikipedia.org/w/api.php?action=opensearch" - f"&search={quote_plus(self.topic)}&limit=10&format=json" - ) - async with http.get(api_url) as resp: - data = await resp.json() - urls = data[3] if len(data) > 3 else [] - for url in urls: - if url: - await self.db.add_source(self.session_id, url, "wikipedia", depth=0) - except Exception as e: - logger.warning("Wikipedia API seed failed", error=str(e)) + for lang in ("en", "es"): + try: + api_url = ( + f"https://{lang}.wikipedia.org/w/api.php?action=opensearch" + f"&search={quote_plus(self.topic)}&limit=10&format=json" + ) + async with http.get(api_url) as resp: + data = await resp.json() + urls = data[3] if len(data) > 3 else [] + for url in urls: + if url: + await self.db.add_source(self.session_id, url, "wikipedia", depth=0) + added += 1 + logger.info("Wikipedia seed", lang=lang, found=len(urls)) + if added > 0: + break # English results found — no need to try Spanish + except Exception as e: + logger.warning("Wikipedia API seed failed", lang=lang, error=str(e)) async def _seed_reddit(self): - """Search Reddit via old.reddit.com JSON""" + """Search Reddit — sequential to avoid rate limiting""" try: http = await self._get_http() - url = f"https://www.reddit.com/search.json?q={quote_plus(self.topic)}&sort=top&limit=25" + url = f"https://www.reddit.com/search.json?q={quote_plus(self.topic)}&sort=top&limit=15" async with http.get(url, headers={**HEADERS, "User-Agent": "ResearchOwl/1.0"}) as resp: if resp.status == 200: data = await resp.json() @@ -190,6 +191,9 @@ class ExhaustiveScraper: self.session_id, full_url, "reddit", depth=0, title=post_data.get("title") ) + logger.info("Reddit seed", found=len(posts), status=resp.status) + else: + logger.warning("Reddit seed non-200", status=resp.status) except Exception as e: logger.warning("Reddit seed failed", error=str(e)) @@ -236,8 +240,8 @@ class ExhaustiveScraper: logger.info("Processing batch", iteration=self.iteration, batch_size=len(pending)) - # Process sources concurrently (but not too many at once) - semaphore = asyncio.Semaphore(5) + # Reduced concurrency to 3 — avoids triggering Reddit/web rate limits + semaphore = asyncio.Semaphore(3) tasks = [self._process_source(s, semaphore) for s in pending] results = await asyncio.gather(*tasks, return_exceptions=True) @@ -291,6 +295,8 @@ class ExhaustiveScraper: return len(new_urls or []) elif source_type == "reddit": content, title = await self._extract_reddit(url) + # Small delay between Reddit requests to avoid rate limiting + await asyncio.sleep(settings.request_delay) elif source_type == "pdf": content, title = await self._extract_pdf(url) else: @@ -314,7 +320,14 @@ class ExhaustiveScraper: async def _mark_scraped(self, source_id: int, content: Optional[str], title: Optional[str], url: str): - if not content or len(content) < settings.min_content_length: + if not content: + logger.debug("No content returned", source_id=source_id, url=url[:60]) + await self.db.update_source(source_id, status="skipped", + error="Content too short or empty") + return + if len(content) < settings.min_content_length: + logger.debug("Content too short", source_id=source_id, + length=len(content), url=url[:60]) await self.db.update_source(source_id, status="skipped", error="Content too short or empty") return @@ -331,6 +344,7 @@ class ExhaustiveScraper: scraped_at=time.time(), quality_score=min(1.0, word_count / 1000) ) + logger.info("Source scraped", source_id=source_id, words=word_count, url=url[:60]) # ─── Extractors ─────────────────────────────────────────────────────────── @@ -371,10 +385,12 @@ class ExhaustiveScraper: return content, title, new_urls[:30] # cap links per page async def _extract_wikipedia(self, url: str) -> tuple[Optional[str], Optional[str], list[str]]: - """Wikipedia: extract content + follow internal wiki links""" + """Wikipedia: extract content + follow internal wiki links. + Works for both en.wikipedia.org and es.wikipedia.org.""" http = await self._get_http() async with http.get(url) as resp: if resp.status != 200: + logger.debug("Wikipedia non-200", status=resp.status, url=url[:60]) return None, None, [] html = await resp.text(errors="replace") @@ -393,13 +409,14 @@ class ExhaustiveScraper: content = content_div.get_text(separator="\n", strip=True) - # Extract Wikipedia internal links (only "See also" and body links) + # Extract Wikipedia internal links using the URL's actual domain + parsed = urlparse(url) + wiki_base = f"{parsed.scheme}://{parsed.netloc}" new_urls = [] for a in content_div.find_all("a", href=True): href = a["href"] if href.startswith("/wiki/") and ":" not in href: - full_url = f"https://en.wikipedia.org{href}" - full_url = normalize_url(full_url) + full_url = normalize_url(f"{wiki_base}{href}") if not await self.db.source_exists(self.session_id, full_url): new_urls.append(full_url) @@ -434,6 +451,7 @@ class ExhaustiveScraper: headers={**HEADERS, "User-Agent": "ResearchOwl/1.0"} ) as resp: if resp.status != 200: + logger.debug("Reddit non-200", status=resp.status, url=url[:60]) return None, None data = await resp.json()