This commit is contained in:
@@ -0,0 +1,490 @@
|
||||
"""
|
||||
ResearchOwl Exhaustive Scraper
|
||||
Core engine: discovers, expands, and evaluates sources recursively
|
||||
"""
|
||||
import asyncio
|
||||
import re
|
||||
import time
|
||||
from typing import Optional
|
||||
from urllib.parse import urljoin, urlparse, quote_plus
|
||||
|
||||
import aiohttp
|
||||
import feedparser
|
||||
import structlog
|
||||
import trafilatura
|
||||
from bs4 import BeautifulSoup
|
||||
from duckduckgo_search import DDGS
|
||||
from youtube_transcript_api import YouTubeTranscriptApi, NoTranscriptFound
|
||||
from tenacity import retry, stop_after_attempt, wait_exponential
|
||||
|
||||
from src.config import settings
|
||||
from src.db.database import ResearchDB
|
||||
|
||||
logger = structlog.get_logger()
|
||||
|
||||
HEADERS = {
|
||||
"User-Agent": "Mozilla/5.0 (compatible; ResearchOwl/1.0; +https://chemavx.xyz)",
|
||||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
||||
"Accept-Language": "en-US,en;q=0.9,es;q=0.8",
|
||||
}
|
||||
|
||||
# Domains to skip — not useful for research
|
||||
BLACKLIST_DOMAINS = {
|
||||
"facebook.com", "twitter.com", "x.com", "instagram.com", "tiktok.com",
|
||||
"pinterest.com", "linkedin.com", "amazon.com", "ebay.com", "etsy.com",
|
||||
"ads.google.com", "doubleclick.net", "googleadservices.com",
|
||||
}
|
||||
|
||||
# Source type patterns
|
||||
YOUTUBE_RE = re.compile(r"(?:youtube\.com/watch\?v=|youtu\.be/)([a-zA-Z0-9_-]{11})")
|
||||
PDF_RE = re.compile(r"\.pdf(\?|$)", re.IGNORECASE)
|
||||
REDDIT_RE = re.compile(r"reddit\.com/(r/\w+/comments/\w+)")
|
||||
WIKIPEDIA_RE = re.compile(r"wikipedia\.org/wiki/(.+)")
|
||||
|
||||
|
||||
def detect_source_type(url: str) -> str:
|
||||
if YOUTUBE_RE.search(url):
|
||||
return "youtube"
|
||||
if PDF_RE.search(url):
|
||||
return "pdf"
|
||||
if REDDIT_RE.search(url):
|
||||
return "reddit"
|
||||
if WIKIPEDIA_RE.search(url):
|
||||
return "wikipedia"
|
||||
if "arxiv.org" in url:
|
||||
return "arxiv"
|
||||
if any(d in url for d in ["rss", "feed", "atom"]):
|
||||
return "rss"
|
||||
return "web"
|
||||
|
||||
|
||||
def is_blacklisted(url: str) -> bool:
|
||||
try:
|
||||
domain = urlparse(url).netloc.lower().replace("www.", "")
|
||||
return any(bl in domain for bl in BLACKLIST_DOMAINS)
|
||||
except Exception:
|
||||
return True
|
||||
|
||||
|
||||
def normalize_url(url: str) -> str:
|
||||
"""Remove fragments and tracking params"""
|
||||
parsed = urlparse(url)
|
||||
clean = parsed._replace(fragment="", query="")
|
||||
return clean.geturl().rstrip("/")
|
||||
|
||||
|
||||
class ExhaustiveScraper:
|
||||
"""
|
||||
Recursive source discoverer and content extractor.
|
||||
Keeps expanding until saturation or limits hit.
|
||||
"""
|
||||
|
||||
def __init__(self, db: ResearchDB, session_id: int, topic: str,
|
||||
progress_callback=None):
|
||||
self.db = db
|
||||
self.session_id = session_id
|
||||
self.topic = topic
|
||||
self.progress_callback = progress_callback
|
||||
self.iteration = 0
|
||||
self.total_sources = 0
|
||||
self._stop = False
|
||||
self._http: Optional[aiohttp.ClientSession] = None
|
||||
|
||||
async def stop(self):
|
||||
self._stop = True
|
||||
|
||||
async def _get_http(self) -> aiohttp.ClientSession:
|
||||
if not self._http or self._http.closed:
|
||||
timeout = aiohttp.ClientTimeout(total=settings.request_timeout)
|
||||
self._http = aiohttp.ClientSession(headers=HEADERS, timeout=timeout)
|
||||
return self._http
|
||||
|
||||
async def close(self):
|
||||
if self._http and not self._http.closed:
|
||||
await self._http.close()
|
||||
|
||||
# ─── Seed discovery ───────────────────────────────────────────────────────
|
||||
|
||||
async def seed(self):
|
||||
"""Initial broad search across multiple sources"""
|
||||
logger.info("Seeding research", topic=self.topic)
|
||||
tasks = [
|
||||
self._seed_duckduckgo(),
|
||||
self._seed_wikipedia(),
|
||||
self._seed_reddit(),
|
||||
self._seed_youtube(),
|
||||
]
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
async def _seed_duckduckgo(self):
|
||||
"""Multiple DDG queries for breadth"""
|
||||
queries = [
|
||||
self.topic,
|
||||
f"{self.topic} history facts",
|
||||
f"{self.topic} evidence analysis",
|
||||
f"{self.topic} official report",
|
||||
f"{self.topic} investigation",
|
||||
f"{self.topic} wikipedia",
|
||||
f"{self.topic} documentary",
|
||||
f"{self.topic} research study",
|
||||
]
|
||||
try:
|
||||
with DDGS() as ddgs:
|
||||
for query in queries:
|
||||
if self._stop:
|
||||
break
|
||||
try:
|
||||
results = list(ddgs.text(query, max_results=settings.max_pages_per_search))
|
||||
for r in results:
|
||||
url = normalize_url(r.get("href", ""))
|
||||
if url and not is_blacklisted(url):
|
||||
await self.db.add_source(
|
||||
self.session_id, url,
|
||||
detect_source_type(url),
|
||||
depth=0,
|
||||
title=r.get("title")
|
||||
)
|
||||
await asyncio.sleep(settings.request_delay)
|
||||
except Exception as e:
|
||||
logger.warning("DDG query failed", query=query, error=str(e))
|
||||
except Exception as e:
|
||||
logger.error("DDG seeding failed", error=str(e))
|
||||
|
||||
async def _seed_wikipedia(self):
|
||||
"""Fetch Wikipedia article + all internal links"""
|
||||
topic_encoded = quote_plus(self.topic.replace(" ", "_"))
|
||||
wiki_url = f"https://en.wikipedia.org/wiki/{topic_encoded}"
|
||||
await self.db.add_source(self.session_id, wiki_url, "wikipedia", depth=0)
|
||||
|
||||
# Also search Wikipedia API for related articles
|
||||
try:
|
||||
http = await self._get_http()
|
||||
api_url = (
|
||||
f"https://en.wikipedia.org/w/api.php?action=opensearch"
|
||||
f"&search={quote_plus(self.topic)}&limit=10&format=json"
|
||||
)
|
||||
async with http.get(api_url) as resp:
|
||||
data = await resp.json()
|
||||
urls = data[3] if len(data) > 3 else []
|
||||
for url in urls:
|
||||
if url:
|
||||
await self.db.add_source(self.session_id, url, "wikipedia", depth=0)
|
||||
except Exception as e:
|
||||
logger.warning("Wikipedia API seed failed", error=str(e))
|
||||
|
||||
async def _seed_reddit(self):
|
||||
"""Search Reddit via old.reddit.com JSON"""
|
||||
try:
|
||||
http = await self._get_http()
|
||||
url = f"https://www.reddit.com/search.json?q={quote_plus(self.topic)}&sort=top&limit=25"
|
||||
async with http.get(url, headers={**HEADERS, "User-Agent": "ResearchOwl/1.0"}) as resp:
|
||||
if resp.status == 200:
|
||||
data = await resp.json()
|
||||
posts = data.get("data", {}).get("children", [])
|
||||
for post in posts:
|
||||
post_data = post.get("data", {})
|
||||
permalink = post_data.get("permalink", "")
|
||||
if permalink:
|
||||
full_url = f"https://www.reddit.com{permalink}"
|
||||
await self.db.add_source(
|
||||
self.session_id, full_url, "reddit", depth=0,
|
||||
title=post_data.get("title")
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("Reddit seed failed", error=str(e))
|
||||
|
||||
async def _seed_youtube(self):
|
||||
"""Search YouTube via DDG for video transcripts"""
|
||||
try:
|
||||
with DDGS() as ddgs:
|
||||
results = list(ddgs.videos(
|
||||
f"{self.topic} documentary explanation",
|
||||
max_results=10
|
||||
))
|
||||
for r in results:
|
||||
url = r.get("content", "")
|
||||
if "youtube.com" in url or "youtu.be" in url:
|
||||
await self.db.add_source(
|
||||
self.session_id, url, "youtube", depth=0,
|
||||
title=r.get("title")
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("YouTube seed failed", error=str(e))
|
||||
|
||||
# ─── Main pipeline ────────────────────────────────────────────────────────
|
||||
|
||||
async def run(self) -> dict:
|
||||
"""
|
||||
Main exhaustive loop:
|
||||
1. Seed initial sources
|
||||
2. Process batch → extract content + new URLs
|
||||
3. Repeat until saturated or limits hit
|
||||
"""
|
||||
await self.seed()
|
||||
|
||||
while not self._stop:
|
||||
self.iteration += 1
|
||||
pending = await self.db.get_pending_sources(self.session_id, limit=20)
|
||||
|
||||
if not pending:
|
||||
logger.info("No more pending sources — saturated", iteration=self.iteration)
|
||||
break
|
||||
|
||||
if self.total_sources >= settings.max_sources:
|
||||
logger.info("Max sources reached", total=self.total_sources)
|
||||
break
|
||||
|
||||
logger.info("Processing batch", iteration=self.iteration, batch_size=len(pending))
|
||||
|
||||
# Process sources concurrently (but not too many at once)
|
||||
semaphore = asyncio.Semaphore(5)
|
||||
tasks = [self._process_source(s, semaphore) for s in pending]
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
new_sources = sum(1 for r in results if r and isinstance(r, int) and r > 0)
|
||||
self.total_sources += len(pending)
|
||||
|
||||
stats = await self.db.get_session_stats(self.session_id)
|
||||
await self.db.update_session(
|
||||
self.session_id,
|
||||
iterations=self.iteration,
|
||||
total_sources=self.total_sources
|
||||
)
|
||||
|
||||
if self.progress_callback:
|
||||
await self.progress_callback(
|
||||
iteration=self.iteration,
|
||||
total=self.total_sources,
|
||||
new_this_round=new_sources,
|
||||
stats=stats
|
||||
)
|
||||
|
||||
# Saturation check: if we found very few new URLs, we're done
|
||||
if new_sources < 3 and self.iteration > 2:
|
||||
logger.info("Saturation detected", new_sources=new_sources)
|
||||
break
|
||||
|
||||
await asyncio.sleep(settings.request_delay)
|
||||
|
||||
await self.close()
|
||||
final_stats = await self.db.get_session_stats(self.session_id)
|
||||
return final_stats
|
||||
|
||||
async def _process_source(self, source: dict, semaphore: asyncio.Semaphore) -> int:
|
||||
"""Extract content from a source and discover new URLs. Returns count of new URLs found."""
|
||||
async with semaphore:
|
||||
source_type = source["source_type"]
|
||||
url = source["url"]
|
||||
source_id = source["id"]
|
||||
|
||||
try:
|
||||
if source_type == "youtube":
|
||||
content, title = await self._extract_youtube(url)
|
||||
elif source_type == "wikipedia":
|
||||
content, title, new_urls = await self._extract_wikipedia(url)
|
||||
for new_url in (new_urls or []):
|
||||
await self.db.add_source(
|
||||
self.session_id, new_url, "wikipedia",
|
||||
depth=source["depth"] + 1
|
||||
)
|
||||
await self._mark_scraped(source_id, content, title, url)
|
||||
return len(new_urls or [])
|
||||
elif source_type == "reddit":
|
||||
content, title = await self._extract_reddit(url)
|
||||
elif source_type == "pdf":
|
||||
content, title = await self._extract_pdf(url)
|
||||
else:
|
||||
content, title, new_urls = await self._extract_web(url, source["depth"])
|
||||
for new_url in (new_urls or []):
|
||||
await self.db.add_source(
|
||||
self.session_id, new_url,
|
||||
detect_source_type(new_url),
|
||||
depth=source["depth"] + 1
|
||||
)
|
||||
await self._mark_scraped(source_id, content, title, url)
|
||||
return len(new_urls or [])
|
||||
|
||||
await self._mark_scraped(source_id, content, title, url)
|
||||
return 0
|
||||
|
||||
except Exception as e:
|
||||
logger.warning("Source extraction failed", url=url, error=str(e))
|
||||
await self.db.update_source(source_id, status="failed", error=str(e)[:200])
|
||||
return 0
|
||||
|
||||
async def _mark_scraped(self, source_id: int, content: Optional[str],
|
||||
title: Optional[str], url: str):
|
||||
if not content or len(content) < settings.min_content_length:
|
||||
await self.db.update_source(source_id, status="skipped",
|
||||
error="Content too short or empty")
|
||||
return
|
||||
|
||||
word_count = len(content.split())
|
||||
|
||||
await self.db.save_source_content(source_id, content)
|
||||
|
||||
await self.db.update_source(
|
||||
source_id,
|
||||
status="scraped",
|
||||
title=title or url,
|
||||
word_count=word_count,
|
||||
scraped_at=time.time(),
|
||||
quality_score=min(1.0, word_count / 1000)
|
||||
)
|
||||
|
||||
# ─── Extractors ───────────────────────────────────────────────────────────
|
||||
|
||||
async def _extract_web(self, url: str, depth: int) -> tuple[Optional[str], Optional[str], list[str]]:
|
||||
"""Extract text + discover internal/external links"""
|
||||
if is_blacklisted(url):
|
||||
return None, None, []
|
||||
|
||||
http = await self._get_http()
|
||||
async with http.get(url) as resp:
|
||||
if resp.status != 200:
|
||||
return None, None, []
|
||||
html = await resp.text(errors="replace")
|
||||
|
||||
# Extract main content with trafilatura (much better than BS4 for articles)
|
||||
content = trafilatura.extract(
|
||||
html,
|
||||
include_links=False,
|
||||
include_tables=True,
|
||||
favor_recall=True
|
||||
)
|
||||
|
||||
# Extract title and new URLs with BS4
|
||||
soup = BeautifulSoup(html, "lxml")
|
||||
title = soup.title.string.strip() if soup.title else url
|
||||
|
||||
new_urls = []
|
||||
if depth < settings.max_depth:
|
||||
base = f"{urlparse(url).scheme}://{urlparse(url).netloc}"
|
||||
for a in soup.find_all("a", href=True):
|
||||
href = a["href"]
|
||||
full_url = normalize_url(urljoin(base, href))
|
||||
if (full_url.startswith("http") and
|
||||
not is_blacklisted(full_url) and
|
||||
not await self.db.source_exists(self.session_id, full_url)):
|
||||
new_urls.append(full_url)
|
||||
|
||||
return content, title, new_urls[:30] # cap links per page
|
||||
|
||||
async def _extract_wikipedia(self, url: str) -> tuple[Optional[str], Optional[str], list[str]]:
|
||||
"""Wikipedia: extract content + follow internal wiki links"""
|
||||
http = await self._get_http()
|
||||
async with http.get(url) as resp:
|
||||
if resp.status != 200:
|
||||
return None, None, []
|
||||
html = await resp.text(errors="replace")
|
||||
|
||||
soup = BeautifulSoup(html, "lxml")
|
||||
title_tag = soup.find("h1", {"id": "firstHeading"})
|
||||
title = title_tag.text if title_tag else url
|
||||
|
||||
# Get clean content
|
||||
content_div = soup.find("div", {"id": "mw-content-text"})
|
||||
if not content_div:
|
||||
return None, title, []
|
||||
|
||||
# Remove navboxes, references, etc.
|
||||
for tag in content_div.find_all(["table", "sup", "style"]):
|
||||
tag.decompose()
|
||||
|
||||
content = content_div.get_text(separator="\n", strip=True)
|
||||
|
||||
# Extract Wikipedia internal links (only "See also" and body links)
|
||||
new_urls = []
|
||||
for a in content_div.find_all("a", href=True):
|
||||
href = a["href"]
|
||||
if href.startswith("/wiki/") and ":" not in href:
|
||||
full_url = f"https://en.wikipedia.org{href}"
|
||||
full_url = normalize_url(full_url)
|
||||
if not await self.db.source_exists(self.session_id, full_url):
|
||||
new_urls.append(full_url)
|
||||
|
||||
return content, title, new_urls[:20]
|
||||
|
||||
async def _extract_youtube(self, url: str) -> tuple[Optional[str], Optional[str]]:
|
||||
"""Extract YouTube transcript"""
|
||||
match = YOUTUBE_RE.search(url)
|
||||
if not match:
|
||||
return None, None
|
||||
|
||||
video_id = match.group(1)
|
||||
try:
|
||||
transcript_list = YouTubeTranscriptApi.get_transcript(
|
||||
video_id, languages=["en", "es", "en-US", "en-GB"]
|
||||
)
|
||||
text = " ".join(t["text"] for t in transcript_list)
|
||||
return text, f"YouTube: {video_id}"
|
||||
except NoTranscriptFound:
|
||||
return None, None
|
||||
except Exception as e:
|
||||
logger.warning("YouTube transcript failed", video_id=video_id, error=str(e))
|
||||
return None, None
|
||||
|
||||
async def _extract_reddit(self, url: str) -> tuple[Optional[str], Optional[str]]:
|
||||
"""Extract Reddit post + top comments via JSON API"""
|
||||
json_url = url.rstrip("/") + ".json?limit=100&sort=top"
|
||||
http = await self._get_http()
|
||||
try:
|
||||
async with http.get(
|
||||
json_url,
|
||||
headers={**HEADERS, "User-Agent": "ResearchOwl/1.0"}
|
||||
) as resp:
|
||||
if resp.status != 200:
|
||||
return None, None
|
||||
data = await resp.json()
|
||||
|
||||
post = data[0]["data"]["children"][0]["data"]
|
||||
title = post.get("title", "")
|
||||
selftext = post.get("selftext", "")
|
||||
|
||||
comments = []
|
||||
if len(data) > 1:
|
||||
for child in data[1]["data"]["children"][:50]:
|
||||
body = child.get("data", {}).get("body", "")
|
||||
if body and body != "[deleted]" and len(body) > 50:
|
||||
score = child.get("data", {}).get("score", 0)
|
||||
if score > 5: # only upvoted comments
|
||||
comments.append(body)
|
||||
|
||||
content = f"# {title}\n\n{selftext}\n\n## Top Comments\n\n" + "\n\n---\n\n".join(comments)
|
||||
return content, title
|
||||
|
||||
except Exception as e:
|
||||
logger.warning("Reddit extraction failed", url=url, error=str(e))
|
||||
return None, None
|
||||
|
||||
async def _extract_pdf(self, url: str) -> tuple[Optional[str], Optional[str]]:
|
||||
"""Download and extract PDF text"""
|
||||
import pdfplumber
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
http = await self._get_http()
|
||||
try:
|
||||
async with http.get(url) as resp:
|
||||
if resp.status != 200:
|
||||
return None, None
|
||||
content_length = int(resp.headers.get("content-length", 0))
|
||||
if content_length > 50 * 1024 * 1024: # skip PDFs > 50MB
|
||||
return None, None
|
||||
pdf_bytes = await resp.read()
|
||||
|
||||
with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as f:
|
||||
f.write(pdf_bytes)
|
||||
tmp_path = f.name
|
||||
|
||||
try:
|
||||
with pdfplumber.open(tmp_path) as pdf:
|
||||
pages = [p.extract_text() or "" for p in pdf.pages[:50]] # max 50 pages
|
||||
text = "\n\n".join(pages)
|
||||
return text, url.split("/")[-1]
|
||||
finally:
|
||||
os.unlink(tmp_path)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning("PDF extraction failed", url=url, error=str(e))
|
||||
return None, None
|
||||
Reference in New Issue
Block a user