commit ba0853633734b6f5ba23485eaea25a948eef71e3 Author: ChemaVX Date: Mon Apr 27 13:49:07 2026 +0000 feat: initial ResearchOwl diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..9013d98 --- /dev/null +++ b/.env.example @@ -0,0 +1,29 @@ +# ResearchOwl — Environment Variables +# Copy to .env and fill in values + +# Required +TELEGRAM_BOT_TOKEN=your_bot_token_here +TELEGRAM_ALLOWED_USERS=123456789 # your Telegram user ID + +# Ollama (default points to your existing instance) +OLLAMA_URL=http://ollama.chemavx.xyz +OLLAMA_MODEL=qwen2.5:3b + +# Claude fallback (optional, only for premium generation) +# ANTHROPIC_API_KEY=sk-ant-... +# CLAUDE_MODEL=claude-haiku-4-5 + +# Storage +DB_PATH=/data/researchowl.db + +# Scraping tuning +MAX_DEPTH=3 # how deep to follow links (1-5) +MAX_SOURCES=150 # hard cap on total sources +MAX_PAGES_PER_SEARCH=5 +REQUEST_DELAY=1.0 # seconds between requests (be polite) +MIN_CONTENT_LENGTH=200 + +# Processing +CHUNK_SIZE=800 +CHUNK_OVERLAP=100 +QUALITY_THRESHOLD=0.4 # 0-1, lower = more permissive diff --git a/.gitea/workflows/build.yml b/.gitea/workflows/build.yml new file mode 100644 index 0000000..20866f4 --- /dev/null +++ b/.gitea/workflows/build.yml @@ -0,0 +1,49 @@ +name: Build & Deploy ResearchOwl + +on: + push: + branches: [main] + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Validate manifests + run: | + for f in k8s/*.yaml; do + python3 -c "import yaml; list(yaml.safe_load_all(open('$f')))" && echo "✅ $f OK" + done + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Login to Gitea Registry + uses: docker/login-action@v3 + with: + registry: git.chemavx.xyz + username: ${{ secrets.REGISTRY_USER }} + password: ${{ secrets.REGISTRY_PASSWORD }} + + - name: Build and push + uses: docker/build-push-action@v5 + with: + context: . 
+ push: true + tags: git.chemavx.xyz/chemavx/researchowl:latest + cache-from: type=registry,ref=git.chemavx.xyz/chemavx/researchowl:cache + cache-to: type=registry,ref=git.chemavx.xyz/chemavx/researchowl:cache,mode=max + + - name: Notify Telegram + if: always() + env: + TOKEN: ${{ secrets.TELEGRAM_BOT_TOKEN }} + CHAT: ${{ secrets.TELEGRAM_CHAT_ID }} + run: | + STATUS="${{ job.status }}" + EMOJI="✅" + if [ "$STATUS" != "success" ]; then EMOJI="❌"; fi + MSG="${EMOJI} ResearchOwl build ${STATUS} — $(git log -1 --pretty='%s')" + curl -s -X POST "https://api.telegram.org/bot${TOKEN}/sendMessage" \ + -d chat_id="${CHAT}" -d text="${MSG}" diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..db4bea1 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,326 @@ +# ResearchOwl — Instrucciones para Claude Code + +## Contexto del proyecto + +Eres el agente de construcción e implementación de **ResearchOwl**, un bot de Telegram que realiza investigación exhaustiva sobre cualquier tema usando scraping recursivo y Ollama (qwen2.5:3b) para procesamiento y generación de contenido. + +El homelab donde se desplegará tiene: +- **k3s** con Traefik + cert-manager + Cloudflare DNS +- **ArgoCD** para GitOps (repo: `k8s-manifests` en Gitea) +- **Gitea** en `git.chemavx.xyz` + Container Registry +- **Ollama** en `http://ollama.chemavx.xyz` con modelo `qwen2.5:3b` +- **Telegram bot** ya existente en `@chemavx_bot` +- Dominio base: `chemavx.xyz` + +--- + +## Objetivo + +Construir el proyecto completo, corregir todos los bugs, y dejarlo listo para desplegar en k3s. + +--- + +## Tareas a realizar — en orden + +### 1. 
Crear estructura del proyecto + +``` +researchowl/ +├── src/ +│ ├── __init__.py +│ ├── config.py +│ ├── scraper/ +│ │ ├── __init__.py +│ │ └── exhaustive.py +│ ├── processor/ +│ │ ├── __init__.py +│ │ └── processor.py +│ ├── generator/ +│ │ ├── __init__.py +│ │ └── generator.py +│ ├── bot/ +│ │ ├── __init__.py +│ │ └── bot.py +│ └── db/ +│ ├── __init__.py +│ └── database.py +├── k8s/ +│ ├── deployment.yaml +│ └── argocd-app.yaml +├── .gitea/ +│ └── workflows/ +│ └── build.yml +├── tests/ +│ └── test_scraper.py +├── main.py +├── requirements.txt +├── Dockerfile +├── .env.example +└── README.md +``` + +### 2. Corregir bug crítico en database.py + +La tabla `source_contents` está referenciada en `processor.py` pero no existe en el schema. + +**Añadir al SCHEMA en `database.py`:** + +```sql +CREATE TABLE IF NOT EXISTS source_contents ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + source_id INTEGER NOT NULL UNIQUE REFERENCES sources(id), + content TEXT NOT NULL, + created_at REAL NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_source_contents ON source_contents(source_id); +``` + +**Añadir método en la clase `ResearchDB`:** + +```python +async def save_source_content(self, source_id: int, content: str): + await self.db.execute( + """INSERT OR REPLACE INTO source_contents (source_id, content, created_at) + VALUES (?, ?, ?)""", + (source_id, content, time.time()) + ) + await self.db.commit() + +async def get_source_content(self, source_id: int) -> Optional[str]: + cursor = await self.db.execute( + "SELECT content FROM source_contents WHERE source_id = ?", (source_id,) + ) + row = await cursor.fetchone() + return row[0] if row else None +``` + +### 3. Corregir bug en exhaustive.py — guardar contenido + +En el método `_mark_scraped` del `ExhaustiveScraper`, después de validar el contenido, hay que guardarlo en `source_contents`. 
Cambiar el método a: + +```python +async def _mark_scraped(self, source_id: int, content: Optional[str], + title: Optional[str], url: str): + if not content or len(content) < settings.min_content_length: + await self.db.update_source(source_id, status="skipped", + error="Content too short or empty") + return + + word_count = len(content.split()) + + # Guardar contenido raw + await self.db.save_source_content(source_id, content) + + await self.db.update_source( + source_id, + status="scraped", + title=title or url, + word_count=word_count, + scraped_at=time.time(), + quality_score=min(1.0, word_count / 1000) + ) +``` + +### 4. Corregir bug en processor.py — usar save/get content + +En `_process_source`, la consulta a `source_contents` usa `self.db.db.execute` directamente pero ahora debería usar el método del DB: + +```python +async def _process_source(self, session_id: int, topic: str, source: dict) -> int: + source_id = source["id"] + + # Usar el método correcto + content = await self.db.get_source_content(source_id) + if not content: + return 0 + + chunks = simple_chunk(content, settings.chunk_size, settings.chunk_overlap) + stored = 0 + + for i, chunk in enumerate(chunks): + if len(chunk.split()) < 30: + continue + + quality = await self._score_quality(chunk, topic) + if quality < settings.quality_threshold: + continue + + embedding = await self.ollama.embed(chunk[:1000]) + + await self.db.add_chunk( + session_id=session_id, + source_id=source_id, + content=chunk, + chunk_index=i, + token_count=len(chunk.split()), + quality_score=quality, + embedding=embedding + ) + stored += 1 + + return stored +``` + +### 5. 
Añadir comando /outputs al bot + +En `bot.py`, añadir este handler: + +```python +async def cmd_outputs(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + if not is_authorized(update.effective_user.id): + return + + chat_id = update.effective_chat.id + db_conn = await get_db() + db = ResearchDB(db_conn) + + try: + cursor = await db_conn.execute( + "SELECT * FROM research_sessions WHERE telegram_chat_id = ? ORDER BY created_at DESC LIMIT 1", + (chat_id,) + ) + row = await cursor.fetchone() + if not row: + await update.message.reply_text("No sessions found.") + return + + outputs = await db.get_outputs(row["id"]) + if not outputs: + await update.message.reply_text( + "No outputs generated yet. Use `/generate podcast|blog|report|thread`", + parse_mode=ParseMode.MARKDOWN + ) + return + + lines = [f"📄 *Outputs for: {row['topic']}*\n"] + for o in outputs: + from datetime import datetime + dt = datetime.utcfromtimestamp(o['created_at']).strftime("%Y-%m-%d %H:%M") + lines.append(f"• `{o['output_type']}` — {dt} ({len(o['content'])} chars)") + + await update.message.reply_text( + "\n".join(lines), + parse_mode=ParseMode.MARKDOWN + ) + finally: + await db_conn.close() +``` + +Y registrarlo en `create_bot()`: +```python +app.add_handler(CommandHandler("outputs", cmd_outputs)) +``` + +### 6. Instalar dependencias y verificar que importa correctamente + +```bash +pip install -r requirements.txt +python -c "from src.bot.bot import create_bot; print('OK')" +python -c "from src.scraper.exhaustive import ExhaustiveScraper; print('OK')" +python -c "from src.processor.processor import ContentProcessor; print('OK')" +python -c "from src.generator.generator import OutputGenerator; print('OK')" +``` + +Si hay errores de importación, corrígelos. + +### 7. 
Escribir test básico + +En `tests/test_scraper.py`: + +```python +import pytest +import asyncio +from src.scraper.exhaustive import ( + detect_source_type, is_blacklisted, normalize_url, simple_chunk +) + +def test_detect_source_type(): + assert detect_source_type("https://youtube.com/watch?v=abc123") == "youtube" + assert detect_source_type("https://reddit.com/r/test/comments/abc") == "reddit" + assert detect_source_type("https://en.wikipedia.org/wiki/Roswell") == "wikipedia" + assert detect_source_type("https://example.com/doc.pdf") == "pdf" + assert detect_source_type("https://example.com/article") == "web" + +def test_is_blacklisted(): + assert is_blacklisted("https://facebook.com/something") == True + assert is_blacklisted("https://en.wikipedia.org/wiki/Test") == False + +def test_normalize_url(): + assert normalize_url("https://example.com/page#section") == "https://example.com/page" + assert normalize_url("https://example.com/page/") == "https://example.com/page" +``` + +Nota: importar `simple_chunk` desde `processor.py`: + +```python +from src.processor.processor import simple_chunk + +def test_simple_chunk(): + text = "\n\n".join([f"Paragraph {i} with some content here." for i in range(50)]) + chunks = simple_chunk(text, chunk_size=100, overlap=20) + assert len(chunks) > 1 + assert all(isinstance(c, str) for c in chunks) +``` + +Ejecutar: `pytest tests/ -v` + +### 8. Build Docker y verificar + +```bash +docker build -t researchowl:test . +docker run --rm researchowl:test python -c "from src.bot.bot import create_bot; print('Docker OK')" +``` + +### 9. Preparar para despliegue + +Verificar que estos ficheros están correctos y completos: +- `k8s/deployment.yaml` — Deployment + PVC + Secret template +- `k8s/argocd-app.yaml` — ArgoCD Application apuntando a `k8s-manifests` +- `.gitea/workflows/build.yml` — CI con build → push → notificación Telegram + +### 10. 
Instrucciones finales para el usuario + +Al finalizar, mostrar: + +``` +✅ ResearchOwl listo para desplegar. + +Pasos para desplegar: +1. Crear secret en k3s: + kubectl create namespace researchowl + kubectl create secret generic researchowl-secrets \ + --from-literal=telegram-bot-token=TU_TOKEN \ + --from-literal=telegram-allowed-users=TU_USER_ID \ + -n researchowl + +2. Subir código a Gitea: + git init && git remote add origin https://git.chemavx.xyz/chemavx/researchowl + git add . && git commit -m "feat: initial ResearchOwl" + git push -u origin main + +3. Gitea Actions construirá la imagen automáticamente. + +4. Copiar manifests k8s/ a tu repo k8s-manifests/researchowl/ + y aplicar el ArgoCD app: + kubectl apply -f k8s/argocd-app.yaml + +5. ArgoCD desplegará automáticamente. + +Uso desde Telegram: + /research Incidente Roswell + /status + /finish + /generate podcast +``` + +--- + +## Notas importantes + +- **No crear un bot de Telegram nuevo** — el usuario ya tiene `@chemavx_bot`. Solo necesita configurar el token en el secret de k3s. +- **No modificar** los manifests de k8s para añadir Ingress — el bot usa polling de Telegram, no necesita exponer ningún puerto. +- **Ollama** ya está corriendo en el cluster. La URL `http://ollama.chemavx.xyz` es correcta. +- Si `qwen2.5:3b` es lento para scoring de calidad, se puede desactivar el scoring con `QUALITY_THRESHOLD=0` y todos los chunks pasan directamente. +- El proyecto usa **SQLite** (coherente con el resto del homelab). +- Respetar el `REQUEST_DELAY=1.0` para no hacer ban en las fuentes. diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..af116ec --- /dev/null +++ b/Dockerfile @@ -0,0 +1,22 @@ +FROM python:3.12-slim + +WORKDIR /app + +# System dependencies for lxml, pdfplumber +RUN apt-get update && apt-get install -y \ + gcc g++ \ + libxml2-dev libxslt-dev \ + && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . 
+ +# Data directory +RUN mkdir -p /data + +ENV PYTHONUNBUFFERED=1 +ENV PYTHONPATH=/app + +CMD ["python", "main.py"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..345e19f --- /dev/null +++ b/README.md @@ -0,0 +1,108 @@ +# 🦉 ResearchOwl + +**Exhaustive research engine with Telegram interface.** + +Recursively discovers, scrapes, and processes sources from across the web, +then generates podcast scripts, blog posts, reports, or social threads using Ollama. + +## Architecture + +``` +Telegram (/research <topic>) + ↓ +ExhaustiveScraper + ├── DuckDuckGo (8 queries × 5 results) + ├── Wikipedia + recursive internal links + ├── Reddit (top posts + top comments) + ├── YouTube (transcripts) + ├── PDFs (public documents) + └── Web scraping (trafilatura) + ↓ recursive expansion (depth 1-3) +ContentProcessor (Ollama qwen2.5:3b) + ├── Chunking (800 token chunks, 100 overlap) + ├── Quality scoring (0-10 per chunk) + ├── Embeddings (cosine similarity RAG) + └── Deduplication + ↓ +OutputGenerator (Ollama) + ├── 🎙️ Podcast script (20-30 min) + ├── 📝 Blog post (1500-2500 words) + ├── 📊 Research report (structured) + └── 🐦 Social thread (15-25 tweets) +``` + +## Telegram Commands + +| Command | Description | +|---------|-------------| +| `/research <topic>` | Start exhaustive research | +| `/status` | Check progress | +| `/finish` | Stop early, proceed to generation | +| `/generate podcast\|blog\|report\|thread` | Generate output | +| `/sources` | List all sources found | +| `/outputs` | List generated outputs | +| `/cancel` | Cancel current research | + +## Local Development + +```bash +# 1. Clone and setup +git clone https://git.chemavx.xyz/chemavx/researchowl +cd researchowl + +# 2. Create virtualenv +python3 -m venv venv && source venv/bin/activate +pip install -r requirements.txt + +# 3. Configure +cp .env.example .env +# Edit .env with your values + +# 4. Run +python main.py +``` + +## Deploy to k3s + +```bash +# 1. 
Create namespace and secrets +kubectl create namespace researchowl +kubectl create secret generic researchowl-secrets \ + --from-literal=telegram-bot-token=YOUR_TOKEN \ + --from-literal=telegram-allowed-users=YOUR_USER_ID \ + -n researchowl + +# 2. Copy manifests to your k8s-manifests repo +cp k8s/*.yaml /path/to/k8s-manifests/researchowl/ + +# 3. Apply ArgoCD app +kubectl apply -f k8s/argocd-app.yaml + +# 4. Push to Gitea → Gitea Actions builds → ArgoCD deploys +git add . && git commit -m "feat: add researchowl" && git push +``` + +## Tuning + +| Variable | Default | Description | +|----------|---------|-------------| +| `MAX_SOURCES` | 150 | Hard cap on sources | +| `MAX_DEPTH` | 3 | Link recursion depth | +| `QUALITY_THRESHOLD` | 0.4 | Min chunk quality (0-1) | +| `REQUEST_DELAY` | 1.0s | Delay between requests | + +**Want more thoroughness?** +- Increase `MAX_SOURCES` to 300+ +- Increase `MAX_DEPTH` to 4-5 +- Lower `QUALITY_THRESHOLD` to 0.3 + +**Want faster results?** +- Lower `MAX_SOURCES` to 50 +- Set `MAX_DEPTH` to 1-2 +- Higher `QUALITY_THRESHOLD` to 0.6 + +## Notes + +- Uses **qwen2.5:3b** (your existing Ollama) for all AI tasks — zero API cost +- Optionally add `ANTHROPIC_API_KEY` for Claude fallback on generation +- SQLite database stored in `/data/researchowl.db` +- All outputs saved to DB and available via `/outputs` diff --git a/k8s/argocd-app.yaml b/k8s/argocd-app.yaml new file mode 100644 index 0000000..42b9018 --- /dev/null +++ b/k8s/argocd-app.yaml @@ -0,0 +1,20 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Application +metadata: + name: researchowl + namespace: argocd +spec: + project: default + source: + repoURL: https://git.chemavx.xyz/chemavx/k8s-manifests + targetRevision: HEAD + path: researchowl + destination: + server: https://kubernetes.default.svc + namespace: researchowl + syncPolicy: + automated: + prune: true + selfHeal: true + syncOptions: + - CreateNamespace=true diff --git a/k8s/deployment.yaml b/k8s/deployment.yaml new file mode 
100644 index 0000000..eba91da --- /dev/null +++ b/k8s/deployment.yaml @@ -0,0 +1,96 @@ +--- +apiVersion: v1 +kind: Namespace +metadata: + name: researchowl + +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: researchowl-data + namespace: researchowl +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 10Gi + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: researchowl + namespace: researchowl + labels: + app: researchowl +spec: + replicas: 1 + selector: + matchLabels: + app: researchowl + template: + metadata: + labels: + app: researchowl + spec: + containers: + - name: researchowl + image: git.chemavx.xyz/chemavx/researchowl:latest + imagePullPolicy: Always + env: + - name: TELEGRAM_BOT_TOKEN + valueFrom: + secretKeyRef: + name: researchowl-secrets + key: telegram-bot-token + - name: TELEGRAM_ALLOWED_USERS + valueFrom: + secretKeyRef: + name: researchowl-secrets + key: telegram-allowed-users + - name: OLLAMA_URL + value: "http://ollama.chemavx.xyz" + - name: OLLAMA_MODEL + value: "qwen2.5:3b" + - name: DB_PATH + value: "/data/researchowl.db" + - name: MAX_SOURCES + value: "150" + - name: MAX_DEPTH + value: "3" + - name: QUALITY_THRESHOLD + value: "0.4" + volumeMounts: + - name: data + mountPath: /data + resources: + requests: + memory: "256Mi" + cpu: "100m" + limits: + memory: "1Gi" + cpu: "500m" + volumes: + - name: data + persistentVolumeClaim: + claimName: researchowl-data + imagePullSecrets: + - name: gitea-registry + +# Secret template — intentionally NOT a live manifest. +# This file is synced by Argo CD with automated sync + selfHeal, so a Secret +# declared here would overwrite the real credentials with the placeholders. +# Create the Secret manually instead: +# kubectl create secret generic researchowl-secrets \ +# --from-literal=telegram-bot-token=YOUR_TOKEN \ +# --from-literal=telegram-allowed-users=YOUR_USER_ID \ +# -n researchowl +# Equivalent manifest, kept for reference only: +# apiVersion: v1 +# kind: Secret +# metadata: +# name: researchowl-secrets +# namespace: researchowl +# type: Opaque +# stringData: +# telegram-bot-token: "REPLACE_ME" +# telegram-allowed-users: "REPLACE_ME" diff --git a/main.py b/main.py new file mode 
index 0000000..a609bcb --- /dev/null +++ b/main.py @@ -0,0 +1,12 @@ +import structlog +from src.bot.bot import run + +structlog.configure( + processors=[ + structlog.stdlib.add_log_level, + structlog.dev.ConsoleRenderer(), + ] +) + +if __name__ == "__main__": + run() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..7ceab3e --- /dev/null +++ b/requirements.txt @@ -0,0 +1,31 @@ +# Core +fastapi==0.115.0 +uvicorn==0.30.0 +python-telegram-bot==21.5 +httpx==0.27.0 +aiohttp==3.10.0 + +# Scraping +beautifulsoup4==4.12.3 +lxml==5.2.2 +trafilatura==1.12.0 +youtube-transcript-api==0.6.2 +pdfplumber==0.11.3 +feedparser==6.0.11 +duckduckgo-search==6.2.6 + +# Storage & Embeddings +sqlite-vec==0.1.6 +aiosqlite==0.20.0 + +# Processing +tiktoken==0.7.0 +numpy==1.26.4 +scikit-learn==1.5.1 + +# Utilities +pydantic==2.8.0 +pydantic-settings==2.4.0 +tenacity==9.0.0 +structlog==24.4.0 +python-dotenv==1.0.1 diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/__pycache__/__init__.cpython-310.pyc b/src/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000..9861eb5 Binary files /dev/null and b/src/__pycache__/__init__.cpython-310.pyc differ diff --git a/src/__pycache__/config.cpython-310.pyc b/src/__pycache__/config.cpython-310.pyc new file mode 100644 index 0000000..7172d6c Binary files /dev/null and b/src/__pycache__/config.cpython-310.pyc differ diff --git a/src/bot/__init__.py b/src/bot/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/bot/__pycache__/__init__.cpython-310.pyc b/src/bot/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000..09af951 Binary files /dev/null and b/src/bot/__pycache__/__init__.cpython-310.pyc differ diff --git a/src/bot/__pycache__/bot.cpython-310.pyc b/src/bot/__pycache__/bot.cpython-310.pyc new file mode 100644 index 0000000..106173d Binary files /dev/null and b/src/bot/__pycache__/bot.cpython-310.pyc differ 
diff --git a/src/bot/bot.py b/src/bot/bot.py new file mode 100644 index 0000000..56f8396 --- /dev/null +++ b/src/bot/bot.py @@ -0,0 +1,467 @@ +""" +ResearchOwl Telegram Bot +Main user interface — all commands handled here +""" +import asyncio +import os +from datetime import datetime +from typing import Optional + +import structlog +from telegram import Update, Message +from telegram.ext import ( + Application, CommandHandler, MessageHandler, + filters, ContextTypes +) +from telegram.constants import ParseMode + +from src.config import settings +from src.db.database import get_db, ResearchDB, ResearchStatus, OutputType +from src.scraper.exhaustive import ExhaustiveScraper +from src.processor.processor import OllamaClient, ContentProcessor +from src.generator.generator import OutputGenerator + +logger = structlog.get_logger() + +# Active research tasks per chat +_active_tasks: dict[int, asyncio.Task] = {} +_active_sessions: dict[int, int] = {} # chat_id -> session_id + + +def is_authorized(user_id: int) -> bool: + allowed = settings.allowed_user_ids + return not allowed or user_id in allowed + + +def fmt_progress(iteration: int, total: int, new: int, stats: dict) -> str: + scraped = stats.get("scraped", 0) + failed = stats.get("failed", 0) + pending = stats.get("pending", 0) + return ( + f"🔄 *Iteration {iteration}*\n" + f"📚 Sources found: `{total}`\n" + f"✅ Scraped: `{scraped}` | ❌ Failed: `{failed}` | ⏳ Pending: `{pending}`\n" + f"🆕 New this round: `{new}`" + ) + + +async def send_chunked(message: Message, text: str, parse_mode=None): + """Send long text in chunks of 4000 chars (Telegram limit)""" + max_len = 4000 + for i in range(0, len(text), max_len): + chunk = text[i:i + max_len] + await message.reply_text(chunk, parse_mode=parse_mode) + if len(text) > max_len: + await asyncio.sleep(0.5) + + +# ─── Commands ───────────────────────────────────────────────────────────────── + +async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + if not 
is_authorized(update.effective_user.id): + return + await update.message.reply_text( + "🦉 *ResearchOwl* — Exhaustive Research Engine\n\n" + "Commands:\n" + "`/research ` — Start exhaustive research\n" + "`/status` — Check current research progress\n" + "`/finish` — Stop research and proceed to generation\n" + "`/generate ` — Generate output (podcast|blog|report|thread)\n" + "`/sources` — List all sources found\n" + "`/outputs` — List generated outputs\n" + "`/cancel` — Cancel current research\n" + "`/help` — Show this message", + parse_mode=ParseMode.MARKDOWN + ) + + +async def cmd_research(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + if not is_authorized(update.effective_user.id): + return + + chat_id = update.effective_chat.id + topic = " ".join(ctx.args).strip() if ctx.args else "" + + if not topic: + await update.message.reply_text( + "❌ Please provide a topic.\nExample: `/research Roswell incident`", + parse_mode=ParseMode.MARKDOWN + ) + return + + # Check for existing active research + if chat_id in _active_tasks and not _active_tasks[chat_id].done(): + await update.message.reply_text( + "⚠️ Research already in progress. Use /status or /finish first." + ) + return + + msg = await update.message.reply_text( + f"🦉 *ResearchOwl* starting research on:\n`{topic}`\n\n" + f"🌱 Seeding sources from:\n" + f"• DuckDuckGo (8 queries)\n" + f"• Wikipedia + internal links\n" + f"• Reddit top posts\n" + f"• YouTube transcripts\n\n" + f"This will run exhaustively until saturation. 
Use /finish to stop early.", + parse_mode=ParseMode.MARKDOWN + ) + + async def run_research(): + db_conn = await get_db() + db = ResearchDB(db_conn) + try: + session_id = await db.create_session(topic, chat_id) + _active_sessions[chat_id] = session_id + + progress_msg = msg + iteration_count = [0] + + async def on_progress(iteration, total, new_this_round, stats): + iteration_count[0] = iteration + text = fmt_progress(iteration, total, new_this_round, stats) + try: + await progress_msg.edit_text(text, parse_mode=ParseMode.MARKDOWN) + except Exception: + pass + + scraper = ExhaustiveScraper(db, session_id, topic, on_progress) + final_stats = await scraper.run() + + await db.update_session(session_id, status=ResearchStatus.SATURATED) + + scraped = final_stats.get("scraped", 0) + await update.message.reply_text( + f"✅ *Research complete!*\n\n" + f"📊 Results:\n" + f"• Sources found & scraped: `{scraped}`\n" + f"• Iterations: `{iteration_count[0]}`\n\n" + f"Now processing content with Ollama...\n" + f"Use `/generate podcast|blog|report|thread` when ready.", + parse_mode=ParseMode.MARKDOWN + ) + + # Auto-process after scraping + ollama = OllamaClient() + if await ollama.is_available(): + processor = ContentProcessor(db, ollama) + + async def proc_progress(total_chunks, total_words): + await update.message.reply_text( + f"🧠 *Processing complete!*\n" + f"• Chunks stored: `{total_chunks}`\n" + f"• Words researched: `{total_words:,}`\n\n" + f"Ready! Use `/generate podcast|blog|report|thread`", + parse_mode=ParseMode.MARKDOWN + ) + + await processor.process_session(session_id, topic, proc_progress) + else: + await update.message.reply_text( + "⚠️ Ollama not reachable — skipping processing.\n" + "You can still use `/generate` (will use raw content)." 
+ ) + + except asyncio.CancelledError: + await db.update_session( + _active_sessions.get(chat_id, 0), + status=ResearchStatus.FINISHED + ) + await update.message.reply_text("🛑 Research cancelled.") + except Exception as e: + logger.error("Research task failed", error=str(e)) + await update.message.reply_text(f"❌ Research failed: {str(e)[:200]}") + finally: + await db_conn.close() + + task = asyncio.create_task(run_research()) + _active_tasks[chat_id] = task + + +async def cmd_status(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + if not is_authorized(update.effective_user.id): + return + + chat_id = update.effective_chat.id + db_conn = await get_db() + db = ResearchDB(db_conn) + + try: + session = await db.get_active_session(chat_id) + if not session: + # Try to find last session + cursor = await db_conn.execute( + "SELECT * FROM research_sessions WHERE telegram_chat_id = ? ORDER BY created_at DESC LIMIT 1", + (chat_id,) + ) + row = await cursor.fetchone() + session = dict(row) if row else None + + if not session: + await update.message.reply_text("No research sessions found. 
Start with /research ") + return + + stats = await db.get_session_stats(session["id"]) + is_active = chat_id in _active_tasks and not _active_tasks[chat_id].done() + + status_emoji = {"running": "🔄", "saturated": "✅", "finished": "🏁", "error": "❌"} + emoji = status_emoji.get(session["status"], "❓") + + await update.message.reply_text( + f"{emoji} *Research Status*\n\n" + f"📝 Topic: `{session['topic']}`\n" + f"🔁 Status: `{session['status']}`\n" + f"🔢 Iterations: `{session.get('iterations', 0)}`\n" + f"📚 Total sources: `{stats.get('total', 0)}`\n" + f"✅ Scraped: `{stats.get('scraped', 0)}`\n" + f"❌ Failed: `{stats.get('failed', 0)}`\n" + f"⏳ Pending: `{stats.get('pending', 0)}`\n" + f"💬 Chunks: `{session.get('total_chunks', 0)}`\n" + f"📖 Words: `{session.get('total_words', 0):,}`\n" + f"{'🟢 Active' if is_active else '⚫ Idle'}", + parse_mode=ParseMode.MARKDOWN + ) + finally: + await db_conn.close() + + +async def cmd_finish(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + if not is_authorized(update.effective_user.id): + return + + chat_id = update.effective_chat.id + task = _active_tasks.get(chat_id) + + if task and not task.done(): + task.cancel() + await update.message.reply_text( + "🛑 Stopping research...\n" + "Use `/generate podcast|blog|report|thread` to generate output.", + parse_mode=ParseMode.MARKDOWN + ) + else: + await update.message.reply_text( + "No active research. 
Use `/generate` to create output from last session.", + parse_mode=ParseMode.MARKDOWN + ) + + +async def cmd_generate(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + if not is_authorized(update.effective_user.id): + return + + chat_id = update.effective_chat.id + output_arg = ctx.args[0].lower() if ctx.args else "" + + type_map = { + "podcast": OutputType.PODCAST, + "blog": OutputType.BLOG, + "report": OutputType.REPORT, + "thread": OutputType.THREAD, + "hilo": OutputType.THREAD, + "informe": OutputType.REPORT, + } + + if output_arg not in type_map: + await update.message.reply_text( + "❌ Invalid output type.\n" + "Use: `/generate podcast|blog|report|thread`", + parse_mode=ParseMode.MARKDOWN + ) + return + + output_type = type_map[output_arg] + + db_conn = await get_db() + db = ResearchDB(db_conn) + + try: + # Find last session for this chat + cursor = await db_conn.execute( + """SELECT * FROM research_sessions WHERE telegram_chat_id = ? + ORDER BY created_at DESC LIMIT 1""", + (chat_id,) + ) + row = await cursor.fetchone() + if not row: + await update.message.reply_text("No research sessions found. 
Start with /research ") + return + + session = dict(row) + session_id = session["id"] + + msg = await update.message.reply_text( + f"⚙️ Generating *{output_type}* for: `{session['topic']}`\n" + f"Using Ollama ({settings.ollama_model})...\n" + f"This may take 2-5 minutes ☕", + parse_mode=ParseMode.MARKDOWN + ) + + async def gen_progress(text): + try: + await msg.edit_text(text) + except Exception: + pass + + ollama = OllamaClient() + processor = ContentProcessor(db, ollama) + generator = OutputGenerator(db, ollama, processor) + + output = await generator.generate(session_id, output_type, gen_progress) + + # Send as file if very long + if len(output) > 8000: + import tempfile + ext_map = { + OutputType.PODCAST: "script.md", + OutputType.BLOG: "post.md", + OutputType.REPORT: "report.md", + OutputType.THREAD: "thread.txt", + } + filename = f"researchowl_{session['topic'][:30].replace(' ', '_')}_{ext_map[output_type]}" + + with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as f: + f.write(output) + tmp_path = f.name + + with open(tmp_path, "rb") as f: + await update.message.reply_document( + document=f, + filename=filename, + caption=f"📄 *{output_type.upper()}* — {session['topic']}\n" + f"Generated by ResearchOwl 🦉", + parse_mode=ParseMode.MARKDOWN + ) + os.unlink(tmp_path) + else: + await send_chunked(update.message, output) + + except Exception as e: + logger.error("Generate failed", error=str(e)) + await update.message.reply_text(f"❌ Generation failed: {str(e)[:200]}") + finally: + await db_conn.close() + + +async def cmd_sources(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + if not is_authorized(update.effective_user.id): + return + + chat_id = update.effective_chat.id + db_conn = await get_db() + db = ResearchDB(db_conn) + + try: + cursor = await db_conn.execute( + "SELECT * FROM research_sessions WHERE telegram_chat_id = ? 
ORDER BY created_at DESC LIMIT 1", + (chat_id,) + ) + row = await cursor.fetchone() + if not row: + await update.message.reply_text("No sessions found.") + return + + session_id = row["id"] + sources = await db.get_all_sources(session_id) + + by_type: dict = {} + for s in sources: + t = s["source_type"] + by_type.setdefault(t, []).append(s) + + lines = [f"📚 *Sources for session #{session_id}*\n"] + for stype, srcs in by_type.items(): + scraped = sum(1 for s in srcs if s["status"] == "scraped") + lines.append(f"\n*{stype.upper()}* ({scraped}/{len(srcs)} scraped)") + for s in srcs[:5]: # show top 5 per type + quality = s.get("quality_score", 0) + status_icon = {"scraped": "✅", "failed": "❌", "pending": "⏳", "skipped": "⏭️"}.get(s["status"], "❓") + title = (s.get("title") or s["url"])[:50] + lines.append(f"{status_icon} {title} (q:{quality:.1f})") + if len(srcs) > 5: + lines.append(f" ... and {len(srcs)-5} more") + + await send_chunked(update.message, "\n".join(lines), parse_mode=ParseMode.MARKDOWN) + finally: + await db_conn.close() + + +async def cmd_outputs(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + if not is_authorized(update.effective_user.id): + return + + chat_id = update.effective_chat.id + db_conn = await get_db() + db = ResearchDB(db_conn) + + try: + cursor = await db_conn.execute( + "SELECT * FROM research_sessions WHERE telegram_chat_id = ? ORDER BY created_at DESC LIMIT 1", + (chat_id,) + ) + row = await cursor.fetchone() + if not row: + await update.message.reply_text("No sessions found.") + return + + outputs = await db.get_outputs(row["id"]) + if not outputs: + await update.message.reply_text( + "No outputs generated yet. 
def create_bot() -> Application:
    """Build the Telegram application and attach one handler per slash command."""
    application = Application.builder().token(settings.telegram_bot_token).build()

    # (command, coroutine) pairs, registered in this exact order.
    command_table = (
        ("start", cmd_start),
        ("help", cmd_help),
        ("research", cmd_research),
        ("status", cmd_status),
        ("finish", cmd_finish),
        ("generate", cmd_generate),
        ("sources", cmd_sources),
        ("outputs", cmd_outputs),
        ("cancel", cmd_cancel),
    )
    for command, callback in command_table:
        application.add_handler(CommandHandler(command, callback))

    return application
class Settings(BaseSettings):
    """Runtime configuration for ResearchOwl, read from the environment and `.env`.

    Every field is filled from the environment variable that carries the same
    name (pydantic-settings matches names case-insensitively by default), e.g.
    ``telegram_bot_token`` <- ``TELEGRAM_BOT_TOKEN``. The former
    ``Field(..., env="...")`` arguments are not honoured by pydantic-settings v2
    and only produced deprecation warnings, so plain defaults are used instead.
    """

    # pydantic-settings v2 style configuration (replaces the inner `Config` class).
    model_config = {"env_file": ".env"}

    # Telegram
    telegram_bot_token: str  # required, no default
    telegram_allowed_users: str = ""  # comma-separated user IDs

    # Ollama
    ollama_url: str = "http://ollama.chemavx.xyz"
    ollama_model: str = "qwen2.5:3b"
    ollama_embed_model: str = "qwen2.5:3b"

    # Claude fallback (optional)
    anthropic_api_key: Optional[str] = None
    claude_model: str = "claude-haiku-4-5"

    # Database
    db_path: str = "/data/researchowl.db"

    # Scraping
    max_depth: int = 3  # recursion depth
    max_sources: int = 150  # hard cap
    max_pages_per_search: int = 5
    request_timeout: int = 30
    request_delay: float = 1.0  # seconds between requests
    min_content_length: int = 200  # chars

    # Processing
    chunk_size: int = 800  # tokens per chunk
    chunk_overlap: int = 100
    quality_threshold: float = 0.5  # 0-1, chunks below discarded

    # App
    log_level: str = "INFO"

    @property
    def allowed_user_ids(self) -> list[int]:
        """Return ``telegram_allowed_users`` parsed into a list of integer IDs.

        Blank entries are skipped; an empty setting yields an empty list.
        Raises ``ValueError`` if an entry is not an integer.
        """
        if not self.telegram_allowed_users:
            return []
        return [int(uid.strip()) for uid in self.telegram_allowed_users.split(",") if uid.strip()]
class OutputType(str, Enum):
    """Kinds of document that can be generated from a research session.

    Members are real ``str`` instances, so they compare equal to their value
    and can be used wherever a plain string is expected.
    """

    PODCAST = "podcast"
    BLOG = "blog"
    REPORT = "report"
    THREAD = "thread"

    def __str__(self) -> str:
        # Since Python 3.11, format()/f-strings on a (str, Enum) mix-in use
        # Enum.__str__ and would render "OutputType.PODCAST" in user-facing
        # messages. Returning the value keeps the text stable on every version.
        return self.value
async def get_db() -> aiosqlite.Connection:
    """Open the SQLite database and make sure the schema exists.

    Creates the parent directory of ``settings.db_path`` when missing, connects,
    switches the row factory to ``aiosqlite.Row`` and runs the idempotent
    ``SCHEMA`` script.

    Returns:
        An open connection; the caller is responsible for closing it.

    Raises:
        Whatever the connect or schema step raises. If the failure happens
        after the connection was opened, the connection is closed first so it
        does not leak.
    """
    Path(settings.db_path).parent.mkdir(parents=True, exist_ok=True)
    db = await aiosqlite.connect(settings.db_path)
    try:
        db.row_factory = aiosqlite.Row
        await db.executescript(SCHEMA)
        await db.commit()
    except BaseException:
        # Includes cancellation: never hand back (or abandon) a half-set-up handle.
        await db.close()
        raise
    return db
async def get_session_stats(self, session_id: int) -> dict:
    """Count a session's sources overall and per status.

    Returns:
        A dict with integer keys ``total``, ``scraped``, ``failed`` and
        ``pending``. All counts are 0 for a session without sources.
    """
    # SUM() over zero rows is NULL in SQLite; COALESCE keeps every counter an
    # integer so callers can format the values without None checks.
    cursor = await self.db.execute(
        """SELECT
            COUNT(*) as total,
            COALESCE(SUM(CASE WHEN status='scraped' THEN 1 ELSE 0 END), 0) as scraped,
            COALESCE(SUM(CASE WHEN status='failed' THEN 1 ELSE 0 END), 0) as failed,
            COALESCE(SUM(CASE WHEN status='pending' THEN 1 ELSE 0 END), 0) as pending
        FROM sources WHERE session_id = ?""",
        (session_id,)
    )
    row = await cursor.fetchone()
    return dict(row) if row else {}
async def source_exists(self, session_id: int, url: str) -> bool:
    """Tell whether ``url`` is already recorded as a source of the session."""
    lookup_sql = "SELECT 1 FROM sources WHERE session_id = ? AND url = ?"
    result = await self.db.execute(lookup_sql, (session_id, url))
    match = await result.fetchone()
    if match is None:
        return False
    return True
async def get_source_content(self, source_id: int) -> Optional[str]:
    """Fetch the stored raw text of a source, or ``None`` when nothing is stored."""
    result = await self.db.execute(
        "SELECT content FROM source_contents WHERE source_id = ?", (source_id,)
    )
    record = await result.fetchone()
    if not record:
        return None
    return record[0]
ORDER BY created_at DESC", + (session_id,) + ) + rows = await cursor.fetchall() + return [dict(r) for r in rows] diff --git a/src/generator/__init__.py b/src/generator/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/generator/__pycache__/__init__.cpython-310.pyc b/src/generator/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000..7ed6028 Binary files /dev/null and b/src/generator/__pycache__/__init__.cpython-310.pyc differ diff --git a/src/generator/__pycache__/generator.cpython-310.pyc b/src/generator/__pycache__/generator.cpython-310.pyc new file mode 100644 index 0000000..0e6e594 Binary files /dev/null and b/src/generator/__pycache__/generator.cpython-310.pyc differ diff --git a/src/generator/generator.py b/src/generator/generator.py new file mode 100644 index 0000000..bd11330 --- /dev/null +++ b/src/generator/generator.py @@ -0,0 +1,188 @@ +""" +ResearchOwl Generators +Produces structured outputs from processed research using Ollama +""" +import structlog + +from src.processor.processor import OllamaClient, ContentProcessor +from src.db.database import ResearchDB, OutputType + +logger = structlog.get_logger() + +PODCAST_SYSTEM = """You are an expert podcast scriptwriter. Create engaging, well-structured scripts +that feel natural when spoken aloud. Use conversational language, rhetorical questions, +clear transitions, and compelling storytelling. Include [PAUSE], [EMPHASIS], and [MUSIC CUE] markers.""" + +BLOG_SYSTEM = """You are an expert blog writer and journalist. Create SEO-optimized, +well-structured articles with clear headings, engaging prose, and proper citations. +Use markdown formatting. Write for an educated general audience.""" + +REPORT_SYSTEM = """You are an expert research analyst. Create comprehensive, objective reports +with executive summary, detailed findings, source analysis, contradictions found, +and conclusions. 
Use structured markdown with tables where appropriate.""" + +THREAD_SYSTEM = """You are a social media expert. Create engaging Twitter/X thread content. +Each tweet must be under 280 characters. Use numbers (1/N, 2/N...), hooks, cliffhangers. +Make it shareable and engaging. Include relevant hashtags at the end.""" + + +PROMPTS = { + OutputType.PODCAST: """Based on the research below about "{topic}", write a complete podcast script. + +Structure: +- INTRO (hook + topic intro, 2-3 min) +- SEGMENT 1: Background & Context +- SEGMENT 2: Key Facts & Evidence +- SEGMENT 3: Controversies & Different Perspectives +- SEGMENT 4: Deep Dive (most interesting finding) +- OUTRO + Call to Action + +Make it 20-30 minutes of content. Include host notes in [brackets]. + +RESEARCH MATERIAL: +{context} + +Write the complete script now:""", + + OutputType.BLOG: """Based on the research below about "{topic}", write a comprehensive blog post. + +Requirements: +- Compelling headline and meta description +- Engaging intro with hook +- Well-structured sections with H2/H3 headers +- Key facts highlighted +- Multiple perspectives presented +- Strong conclusion with takeaways +- Word count: 1500-2500 words +- Tone: Informative but engaging + +RESEARCH MATERIAL: +{context} + +Write the complete blog post in markdown:""", + + OutputType.REPORT: """Based on the research below about "{topic}", write a comprehensive research report. + +Structure: +1. Executive Summary (200 words) +2. Introduction & Scope +3. Key Findings (numbered) +4. Evidence Analysis +5. Source Quality Assessment +6. Contradictions & Disputed Claims +7. Timeline of Events (if applicable) +8. Conclusions +9. Further Research Suggestions + +RESEARCH MATERIAL: +{context} + +Write the complete report in markdown:""", + + OutputType.THREAD: """Based on the research below about "{topic}", write an engaging Twitter/X thread. 
+ +Requirements: +- Start with a KILLER hook tweet +- 15-25 tweets total +- Each tweet max 280 chars +- Number them (1/20, 2/20...) +- Include surprising facts +- Build suspense between tweets +- End with strong conclusion + CTA +- Add relevant hashtags to last tweet + +RESEARCH MATERIAL: +{context} + +Write the complete thread, one tweet per line:""" +} + + +class OutputGenerator: + def __init__(self, db: ResearchDB, ollama: OllamaClient, processor: ContentProcessor): + self.db = db + self.ollama = ollama + self.processor = processor + + async def generate(self, session_id: int, output_type: OutputType, + progress_callback=None) -> str: + """Generate an output for a research session""" + session = await self.db.get_session(session_id) + if not session: + raise ValueError(f"Session {session_id} not found") + + topic = session["topic"] + logger.info("Generating output", type=output_type, topic=topic) + + if progress_callback: + await progress_callback(f"🔍 Retrieving best research material for {output_type}...") + + # RAG: get most relevant context for this output type + query = self._get_rag_query(output_type, topic) + context = await self.processor.rag_query(session_id, query, top_k=30) + + if not context: + # Fallback: use raw top chunks + chunks = await self.db.get_top_chunks(session_id, limit=20) + context = "\n\n---\n\n".join(c["content"] for c in chunks) + + if not context: + raise ValueError("No processed content available. Run /process first.") + + # Truncate context to avoid Ollama context limits + context_words = context.split() + if len(context_words) > 6000: + context = " ".join(context_words[:6000]) + "\n\n[... additional material truncated ...]" + + if progress_callback: + await progress_callback(f"✍️ Generating {output_type} with Ollama... 
(this takes 2-5 min)") + + # Build prompt + system = self._get_system(output_type) + prompt = PROMPTS[output_type].format(topic=topic, context=context) + + # Generate — may take a while with local LLM + output = await self.ollama.generate(prompt, system=system, timeout=300) + + # Add metadata header + stats = await self.db.get_session_stats(session_id) + header = self._build_header(topic, output_type, session, stats) + full_output = header + "\n\n" + output + + # Save to DB + await self.db.save_output(session_id, output_type, full_output) + + logger.info("Output generated", type=output_type, length=len(full_output)) + return full_output + + def _get_rag_query(self, output_type: OutputType, topic: str) -> str: + queries = { + OutputType.PODCAST: f"{topic} story narrative facts interesting", + OutputType.BLOG: f"{topic} key facts evidence analysis", + OutputType.REPORT: f"{topic} evidence data official findings", + OutputType.THREAD: f"{topic} surprising facts shocking revelations", + } + return queries.get(output_type, topic) + + def _get_system(self, output_type: OutputType) -> str: + systems = { + OutputType.PODCAST: PODCAST_SYSTEM, + OutputType.BLOG: BLOG_SYSTEM, + OutputType.REPORT: REPORT_SYSTEM, + OutputType.THREAD: THREAD_SYSTEM, + } + return systems.get(output_type, "You are a helpful research assistant.") + + def _build_header(self, topic: str, output_type: OutputType, + session: dict, stats: dict) -> str: + from datetime import datetime + dt = datetime.utcnow().strftime("%Y-%m-%d %H:%M UTC") + return f"""--- +ResearchOwl | {output_type.upper()} OUTPUT +Topic: {topic} +Generated: {dt} +Sources: {stats.get('scraped', 0)} scraped | {stats.get('failed', 0)} failed +Iterations: {session.get('iterations', 0)} +Total words researched: {session.get('total_words', 0):,} +--- +""" diff --git a/src/processor/__init__.py b/src/processor/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/processor/__pycache__/__init__.cpython-310.pyc 
async def generate(self, prompt: str, system: str = None,
                   timeout: int = 120, *, temperature: float = 0.1,
                   num_predict: int = 512) -> str:
    """Run a single non-streaming completion against Ollama's /api/generate.

    Args:
        prompt: User prompt.
        system: Optional system prompt; omitted from the request when falsy.
        timeout: HTTP timeout in seconds for the whole request.
        temperature: Sampling temperature sent in ``options``.
        num_predict: Value sent as ``options.num_predict``. The default (512)
            keeps the previous behaviour; callers producing long documents
            can pass a larger value.

    Returns:
        The ``response`` field of the reply, stripped; "" if absent or null.

    Raises:
        httpx.HTTPStatusError: on a non-2xx reply (via ``raise_for_status``).
    """
    payload = {
        "model": self.model,
        "prompt": prompt,
        "stream": False,
        "options": {"temperature": temperature, "num_predict": num_predict},
    }
    if system:
        payload["system"] = system

    async with httpx.AsyncClient(timeout=timeout) as client:
        resp = await client.post(f"{self.base_url}/api/generate", json=payload)
        resp.raise_for_status()
        # `or ""` also covers an explicit JSON null in the reply.
        return (resp.json().get("response") or "").strip()
def simple_chunk(text: str, chunk_size: int = 800, overlap: int = 100) -> list[str]:
    """Split text into paragraph-aligned chunks of at most ``chunk_size`` words.

    Paragraphs (separated by blank lines) are packed greedily. When a chunk is
    full and ``overlap`` is positive, its last paragraph is repeated at the
    start of the next chunk, but only if it still fits there together with
    the incoming paragraph. A paragraph that alone exceeds ``chunk_size`` is
    cut into consecutive word windows (its inner line breaks become spaces),
    so no chunk ever exceeds the budget.

    Args:
        text: Raw text; blank or whitespace-only paragraphs are ignored.
        chunk_size: Maximum words per chunk (values below 1 are treated as 1).
        overlap: Any value > 0 enables the one-paragraph carry-over.

    Returns:
        The chunks in document order; ``[]`` for empty input.
    """
    window = max(1, chunk_size)

    # Normalise: strip paragraphs and break up the ones that cannot fit at all.
    paragraphs: list[str] = []
    for raw in text.split("\n\n"):
        para = raw.strip()
        if not para:
            continue
        words = para.split()
        if len(words) <= window:
            paragraphs.append(para)
        else:
            paragraphs.extend(
                " ".join(words[i:i + window]) for i in range(0, len(words), window)
            )

    chunks: list[str] = []
    current: list[str] = []
    current_words = 0

    for para in paragraphs:
        para_words = len(para.split())
        if current and current_words + para_words > window:
            chunks.append("\n\n".join(current))
            tail = current[-1]
            tail_words = len(tail.split())
            # Carry the last paragraph over only when the next chunk stays
            # within budget; otherwise the overlap would duplicate a large
            # paragraph and overflow the chunk.
            if overlap > 0 and tail_words + para_words <= window:
                current, current_words = [tail], tail_words
            else:
                current, current_words = [], 0
        current.append(para)
        current_words += para_words

    if current:
        chunks.append("\n\n".join(current))

    return chunks
async def process_session(self, session_id: int, topic: str,
                          progress_callback=None) -> dict:
    """Process every scraped source of a session and record the totals.

    Sources with status ``scraped`` are handed to ``_process_source`` with at
    most three running concurrently. A source whose processing raises is
    logged and skipped; it does not abort the others.

    Args:
        session_id: Session whose sources are processed.
        topic: Research topic, forwarded to ``_process_source``.
        progress_callback: Optional coroutine function, awaited once at the
            end with keyword arguments ``total_chunks`` and ``total_words``.

    Returns:
        ``{"total_chunks": int, "total_words": int}``; the same values are
        written to the session row.
    """
    sources = await self.db.get_all_sources(session_id)
    scraped = [s for s in sources if s["status"] == "scraped"]

    logger.info("Processing sources", total=len(scraped))

    semaphore = asyncio.Semaphore(3)  # process 3 sources at once

    async def process_one(source):
        async with semaphore:
            return await self._process_source(session_id, topic, source)

    results = await asyncio.gather(*(process_one(s) for s in scraped),
                                   return_exceptions=True)

    total_chunks = 0
    for source, result in zip(scraped, results):
        if isinstance(result, int):
            total_chunks += result
        elif isinstance(result, BaseException):
            # Previously dropped silently, which hid failing sources entirely.
            logger.warning("Source processing failed",
                           source_id=source.get("id"), error=str(result))

    # `or 0` guards against a NULL word_count column.
    total_words = sum((s.get("word_count") or 0) for s in scraped)
    await self.db.update_session(
        session_id,
        total_chunks=total_chunks,
        total_words=total_words
    )

    if progress_callback:
        await progress_callback(total_chunks=total_chunks, total_words=total_words)

    return {"total_chunks": total_chunks, "total_words": total_words}
Returns chunk count.""" + source_id = source["id"] + + content = await self.db.get_source_content(source_id) + if not content: + return 0 + + chunks = simple_chunk(content, settings.chunk_size, settings.chunk_overlap) + stored = 0 + + for i, chunk in enumerate(chunks): + if len(chunk.split()) < 30: + continue + + quality = await self._score_quality(chunk, topic) + if quality < settings.quality_threshold: + continue + + embedding = await self.ollama.embed(chunk[:1000]) + + await self.db.add_chunk( + session_id=session_id, + source_id=source_id, + content=chunk, + chunk_index=i, + token_count=len(chunk.split()), + quality_score=quality, + embedding=embedding + ) + stored += 1 + + return stored + + async def _score_quality(self, chunk: str, topic: str) -> float: + """ + Ask Ollama to score relevance and quality of a chunk. + Returns 0.0-1.0 + """ + prompt = f"""Rate this text chunk on a scale of 0-10 for: +1. Relevance to topic: "{topic}" +2. Information density (facts, data, insights) +3. Credibility (not speculation, not clickbait) + +Text: +{chunk[:500]} + +Respond with ONLY a single number 0-10. 
async def rag_query(self, session_id: int, query: str, top_k: int = 20) -> str:
    """Build a context string from the chunks most relevant to ``query``.

    Up to 100 chunks are loaded via ``get_top_chunks``. If the query can be
    embedded, each chunk is ranked by ``0.7 * cosine similarity +
    0.3 * quality_score`` (chunks without a usable embedding get a neutral
    similarity of 0.5). If it cannot, the chunks are taken in the order the
    database returned them.

    Args:
        session_id: Session to search.
        query: Free-text query.
        top_k: Maximum number of chunks in the result.

    Returns:
        The selected chunks, each prefixed with ``[TYPE] title:`` and joined
        with ``---`` separators; "" when the session has no chunks.
    """
    query_embedding = await self.ollama.embed(query)

    chunks = await self.db.get_top_chunks(session_id, limit=100)

    if query_embedding and chunks:
        scored = []
        for chunk in chunks:
            emb = chunk.get("embedding")
            if emb and isinstance(emb, str):
                # Embeddings are stored as JSON text.
                try:
                    emb = json.loads(emb)
                except (TypeError, ValueError):
                    emb = None
            if not isinstance(emb, list):
                emb = None
            sim = cosine_similarity(query_embedding, emb) if emb else 0.5
            quality = chunk.get("quality_score") or 0
            scored.append((sim * 0.7 + quality * 0.3, chunk))

        scored.sort(key=lambda x: x[0], reverse=True)
        top_chunks = [c for _, c in scored[:top_k]]
    else:
        # Fallback: keep the database ordering.
        top_chunks = chunks[:top_k]

    context_parts = []
    for chunk in top_chunks:
        # The joined columns are nullable: a key that is present with a None
        # value bypasses `.get(key, default)`, so coerce with `or`.
        source_type = (chunk.get("source_type") or "web").upper()
        title = chunk.get("title") or "Unknown"
        context_parts.append(f"[{source_type}] {title}:\n{chunk['content']}")

    return "\n\n---\n\n".join(context_parts)
b/src/scraper/__pycache__/exhaustive.cpython-310.pyc differ diff --git a/src/scraper/exhaustive.py b/src/scraper/exhaustive.py new file mode 100644 index 0000000..fa0d9ae --- /dev/null +++ b/src/scraper/exhaustive.py @@ -0,0 +1,490 @@ +""" +ResearchOwl Exhaustive Scraper +Core engine: discovers, expands, and evaluates sources recursively +""" +import asyncio +import re +import time +from typing import Optional +from urllib.parse import urljoin, urlparse, quote_plus + +import aiohttp +import feedparser +import structlog +import trafilatura +from bs4 import BeautifulSoup +from duckduckgo_search import DDGS +from youtube_transcript_api import YouTubeTranscriptApi, NoTranscriptFound +from tenacity import retry, stop_after_attempt, wait_exponential + +from src.config import settings +from src.db.database import ResearchDB + +logger = structlog.get_logger() + +HEADERS = { + "User-Agent": "Mozilla/5.0 (compatible; ResearchOwl/1.0; +https://chemavx.xyz)", + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Accept-Language": "en-US,en;q=0.9,es;q=0.8", +} + +# Domains to skip — not useful for research +BLACKLIST_DOMAINS = { + "facebook.com", "twitter.com", "x.com", "instagram.com", "tiktok.com", + "pinterest.com", "linkedin.com", "amazon.com", "ebay.com", "etsy.com", + "ads.google.com", "doubleclick.net", "googleadservices.com", +} + +# Source type patterns +YOUTUBE_RE = re.compile(r"(?:youtube\.com/watch\?v=|youtu\.be/)([a-zA-Z0-9_-]{11})") +PDF_RE = re.compile(r"\.pdf(\?|$)", re.IGNORECASE) +REDDIT_RE = re.compile(r"reddit\.com/(r/\w+/comments/\w+)") +WIKIPEDIA_RE = re.compile(r"wikipedia\.org/wiki/(.+)") + + +def detect_source_type(url: str) -> str: + if YOUTUBE_RE.search(url): + return "youtube" + if PDF_RE.search(url): + return "pdf" + if REDDIT_RE.search(url): + return "reddit" + if WIKIPEDIA_RE.search(url): + return "wikipedia" + if "arxiv.org" in url: + return "arxiv" + if any(d in url for d in ["rss", "feed", "atom"]): + return "rss" + 
def normalize_url(url: str) -> str:
    """Canonicalise a URL for de-duplication.

    Drops the fragment and well-known tracking parameters (``utm_*``,
    ``fbclid``, ``gclid``, ...) and strips trailing slashes. All other query
    parameters are kept, because they frequently identify the resource itself
    (``watch?v=<id>``, ``?id=7``, ``index.php?title=...``).
    """
    from urllib.parse import parse_qsl, urlencode

    tracking_names = {
        "fbclid", "gclid", "dclid", "msclkid", "yclid",
        "mc_cid", "mc_eid", "igshid", "ref_src",
    }

    def is_tracking(name: str) -> bool:
        lowered = name.lower()
        return lowered.startswith("utm_") or lowered in tracking_names

    parsed = urlparse(url)
    kept = [
        (key, value)
        for key, value in parse_qsl(parsed.query, keep_blank_values=True)
        if not is_tracking(key)
    ]
    clean = parsed._replace(fragment="", query=urlencode(kept))
    return clean.geturl().rstrip("/")
+ ] + try: + with DDGS() as ddgs: + for query in queries: + if self._stop: + break + try: + results = list(ddgs.text(query, max_results=settings.max_pages_per_search)) + for r in results: + url = normalize_url(r.get("href", "")) + if url and not is_blacklisted(url): + await self.db.add_source( + self.session_id, url, + detect_source_type(url), + depth=0, + title=r.get("title") + ) + await asyncio.sleep(settings.request_delay) + except Exception as e: + logger.warning("DDG query failed", query=query, error=str(e)) + except Exception as e: + logger.error("DDG seeding failed", error=str(e)) + + async def _seed_wikipedia(self): + """Fetch Wikipedia article + all internal links""" + topic_encoded = quote_plus(self.topic.replace(" ", "_")) + wiki_url = f"https://en.wikipedia.org/wiki/{topic_encoded}" + await self.db.add_source(self.session_id, wiki_url, "wikipedia", depth=0) + + # Also search Wikipedia API for related articles + try: + http = await self._get_http() + api_url = ( + f"https://en.wikipedia.org/w/api.php?action=opensearch" + f"&search={quote_plus(self.topic)}&limit=10&format=json" + ) + async with http.get(api_url) as resp: + data = await resp.json() + urls = data[3] if len(data) > 3 else [] + for url in urls: + if url: + await self.db.add_source(self.session_id, url, "wikipedia", depth=0) + except Exception as e: + logger.warning("Wikipedia API seed failed", error=str(e)) + + async def _seed_reddit(self): + """Search Reddit via old.reddit.com JSON""" + try: + http = await self._get_http() + url = f"https://www.reddit.com/search.json?q={quote_plus(self.topic)}&sort=top&limit=25" + async with http.get(url, headers={**HEADERS, "User-Agent": "ResearchOwl/1.0"}) as resp: + if resp.status == 200: + data = await resp.json() + posts = data.get("data", {}).get("children", []) + for post in posts: + post_data = post.get("data", {}) + permalink = post_data.get("permalink", "") + if permalink: + full_url = f"https://www.reddit.com{permalink}" + await 
self.db.add_source( + self.session_id, full_url, "reddit", depth=0, + title=post_data.get("title") + ) + except Exception as e: + logger.warning("Reddit seed failed", error=str(e)) + + async def _seed_youtube(self): + """Search YouTube via DDG for video transcripts""" + try: + with DDGS() as ddgs: + results = list(ddgs.videos( + f"{self.topic} documentary explanation", + max_results=10 + )) + for r in results: + url = r.get("content", "") + if "youtube.com" in url or "youtu.be" in url: + await self.db.add_source( + self.session_id, url, "youtube", depth=0, + title=r.get("title") + ) + except Exception as e: + logger.warning("YouTube seed failed", error=str(e)) + + # ─── Main pipeline ──────────────────────────────────────────────────────── + + async def run(self) -> dict: + """ + Main exhaustive loop: + 1. Seed initial sources + 2. Process batch → extract content + new URLs + 3. Repeat until saturated or limits hit + """ + await self.seed() + + while not self._stop: + self.iteration += 1 + pending = await self.db.get_pending_sources(self.session_id, limit=20) + + if not pending: + logger.info("No more pending sources — saturated", iteration=self.iteration) + break + + if self.total_sources >= settings.max_sources: + logger.info("Max sources reached", total=self.total_sources) + break + + logger.info("Processing batch", iteration=self.iteration, batch_size=len(pending)) + + # Process sources concurrently (but not too many at once) + semaphore = asyncio.Semaphore(5) + tasks = [self._process_source(s, semaphore) for s in pending] + results = await asyncio.gather(*tasks, return_exceptions=True) + + new_sources = sum(1 for r in results if r and isinstance(r, int) and r > 0) + self.total_sources += len(pending) + + stats = await self.db.get_session_stats(self.session_id) + await self.db.update_session( + self.session_id, + iterations=self.iteration, + total_sources=self.total_sources + ) + + if self.progress_callback: + await self.progress_callback( + 
iteration=self.iteration, + total=self.total_sources, + new_this_round=new_sources, + stats=stats + ) + + # Saturation check: if we found very few new URLs, we're done + if new_sources < 3 and self.iteration > 2: + logger.info("Saturation detected", new_sources=new_sources) + break + + await asyncio.sleep(settings.request_delay) + + await self.close() + final_stats = await self.db.get_session_stats(self.session_id) + return final_stats + + async def _process_source(self, source: dict, semaphore: asyncio.Semaphore) -> int: + """Extract content from a source and discover new URLs. Returns count of new URLs found.""" + async with semaphore: + source_type = source["source_type"] + url = source["url"] + source_id = source["id"] + + try: + if source_type == "youtube": + content, title = await self._extract_youtube(url) + elif source_type == "wikipedia": + content, title, new_urls = await self._extract_wikipedia(url) + for new_url in (new_urls or []): + await self.db.add_source( + self.session_id, new_url, "wikipedia", + depth=source["depth"] + 1 + ) + await self._mark_scraped(source_id, content, title, url) + return len(new_urls or []) + elif source_type == "reddit": + content, title = await self._extract_reddit(url) + elif source_type == "pdf": + content, title = await self._extract_pdf(url) + else: + content, title, new_urls = await self._extract_web(url, source["depth"]) + for new_url in (new_urls or []): + await self.db.add_source( + self.session_id, new_url, + detect_source_type(new_url), + depth=source["depth"] + 1 + ) + await self._mark_scraped(source_id, content, title, url) + return len(new_urls or []) + + await self._mark_scraped(source_id, content, title, url) + return 0 + + except Exception as e: + logger.warning("Source extraction failed", url=url, error=str(e)) + await self.db.update_source(source_id, status="failed", error=str(e)[:200]) + return 0 + + async def _mark_scraped(self, source_id: int, content: Optional[str], + title: Optional[str], url: str): + 
if not content or len(content) < settings.min_content_length: + await self.db.update_source(source_id, status="skipped", + error="Content too short or empty") + return + + word_count = len(content.split()) + + await self.db.save_source_content(source_id, content) + + await self.db.update_source( + source_id, + status="scraped", + title=title or url, + word_count=word_count, + scraped_at=time.time(), + quality_score=min(1.0, word_count / 1000) + ) + + # ─── Extractors ─────────────────────────────────────────────────────────── + + async def _extract_web(self, url: str, depth: int) -> tuple[Optional[str], Optional[str], list[str]]: + """Extract text + discover internal/external links""" + if is_blacklisted(url): + return None, None, [] + + http = await self._get_http() + async with http.get(url) as resp: + if resp.status != 200: + return None, None, [] + html = await resp.text(errors="replace") + + # Extract main content with trafilatura (much better than BS4 for articles) + content = trafilatura.extract( + html, + include_links=False, + include_tables=True, + favor_recall=True + ) + + # Extract title and new URLs with BS4 + soup = BeautifulSoup(html, "lxml") + title = soup.title.string.strip() if soup.title else url + + new_urls = [] + if depth < settings.max_depth: + base = f"{urlparse(url).scheme}://{urlparse(url).netloc}" + for a in soup.find_all("a", href=True): + href = a["href"] + full_url = normalize_url(urljoin(base, href)) + if (full_url.startswith("http") and + not is_blacklisted(full_url) and + not await self.db.source_exists(self.session_id, full_url)): + new_urls.append(full_url) + + return content, title, new_urls[:30] # cap links per page + + async def _extract_wikipedia(self, url: str) -> tuple[Optional[str], Optional[str], list[str]]: + """Wikipedia: extract content + follow internal wiki links""" + http = await self._get_http() + async with http.get(url) as resp: + if resp.status != 200: + return None, None, [] + html = await 
resp.text(errors="replace") + + soup = BeautifulSoup(html, "lxml") + title_tag = soup.find("h1", {"id": "firstHeading"}) + title = title_tag.text if title_tag else url + + # Get clean content + content_div = soup.find("div", {"id": "mw-content-text"}) + if not content_div: + return None, title, [] + + # Remove navboxes, references, etc. + for tag in content_div.find_all(["table", "sup", "style"]): + tag.decompose() + + content = content_div.get_text(separator="\n", strip=True) + + # Extract Wikipedia internal links (only "See also" and body links) + new_urls = [] + for a in content_div.find_all("a", href=True): + href = a["href"] + if href.startswith("/wiki/") and ":" not in href: + full_url = f"https://en.wikipedia.org{href}" + full_url = normalize_url(full_url) + if not await self.db.source_exists(self.session_id, full_url): + new_urls.append(full_url) + + return content, title, new_urls[:20] + + async def _extract_youtube(self, url: str) -> tuple[Optional[str], Optional[str]]: + """Extract YouTube transcript""" + match = YOUTUBE_RE.search(url) + if not match: + return None, None + + video_id = match.group(1) + try: + transcript_list = YouTubeTranscriptApi.get_transcript( + video_id, languages=["en", "es", "en-US", "en-GB"] + ) + text = " ".join(t["text"] for t in transcript_list) + return text, f"YouTube: {video_id}" + except NoTranscriptFound: + return None, None + except Exception as e: + logger.warning("YouTube transcript failed", video_id=video_id, error=str(e)) + return None, None + + async def _extract_reddit(self, url: str) -> tuple[Optional[str], Optional[str]]: + """Extract Reddit post + top comments via JSON API""" + json_url = url.rstrip("/") + ".json?limit=100&sort=top" + http = await self._get_http() + try: + async with http.get( + json_url, + headers={**HEADERS, "User-Agent": "ResearchOwl/1.0"} + ) as resp: + if resp.status != 200: + return None, None + data = await resp.json() + + post = data[0]["data"]["children"][0]["data"] + title = 
post.get("title", "") + selftext = post.get("selftext", "") + + comments = [] + if len(data) > 1: + for child in data[1]["data"]["children"][:50]: + body = child.get("data", {}).get("body", "") + if body and body != "[deleted]" and len(body) > 50: + score = child.get("data", {}).get("score", 0) + if score > 5: # only upvoted comments + comments.append(body) + + content = f"# {title}\n\n{selftext}\n\n## Top Comments\n\n" + "\n\n---\n\n".join(comments) + return content, title + + except Exception as e: + logger.warning("Reddit extraction failed", url=url, error=str(e)) + return None, None + + async def _extract_pdf(self, url: str) -> tuple[Optional[str], Optional[str]]: + """Download and extract PDF text""" + import pdfplumber + import tempfile + import os + + http = await self._get_http() + try: + async with http.get(url) as resp: + if resp.status != 200: + return None, None + content_length = int(resp.headers.get("content-length", 0)) + if content_length > 50 * 1024 * 1024: # skip PDFs > 50MB + return None, None + pdf_bytes = await resp.read() + + with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as f: + f.write(pdf_bytes) + tmp_path = f.name + + try: + with pdfplumber.open(tmp_path) as pdf: + pages = [p.extract_text() or "" for p in pdf.pages[:50]] # max 50 pages + text = "\n\n".join(pages) + return text, url.split("/")[-1] + finally: + os.unlink(tmp_path) + + except Exception as e: + logger.warning("PDF extraction failed", url=url, error=str(e)) + return None, None diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/__pycache__/__init__.cpython-310.pyc b/tests/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000..5029526 Binary files /dev/null and b/tests/__pycache__/__init__.cpython-310.pyc differ diff --git a/tests/__pycache__/test_scraper.cpython-310-pytest-9.0.3.pyc b/tests/__pycache__/test_scraper.cpython-310-pytest-9.0.3.pyc new file mode 100644 index 
# --- Patch metadata preceding this test module in the flattened diff (verbatim) ---
# 0000000..23d3049
# Binary files /dev/null and b/tests/__pycache__/test_scraper.cpython-310-pytest-9.0.3.pyc differ
# diff --git a/tests/test_scraper.py b/tests/test_scraper.py
# new file mode 100644
# index 0000000..8324198
# --- /dev/null
# +++ b/tests/test_scraper.py
# @@ -0,0 +1,28 @@
import pytest
from src.scraper.exhaustive import detect_source_type, is_blacklisted, normalize_url
from src.processor.processor import simple_chunk


@pytest.mark.parametrize(
    ("url", "expected"),
    [
        ("https://youtube.com/watch?v=dQw4w9WgXcY", "youtube"),
        ("https://reddit.com/r/test/comments/abc", "reddit"),
        ("https://en.wikipedia.org/wiki/Roswell", "wikipedia"),
        ("https://example.com/doc.pdf", "pdf"),
        ("https://example.com/article", "web"),
    ],
)
def test_detect_source_type(url, expected):
    """Each URL is classified into the source type its extractor expects."""
    assert detect_source_type(url) == expected


def test_is_blacklisted():
    """Blacklisted hosts are rejected while ordinary sources pass."""
    assert is_blacklisted("https://facebook.com/something")
    assert not is_blacklisted("https://en.wikipedia.org/wiki/Test")


@pytest.mark.parametrize(
    ("url", "expected"),
    [
        ("https://example.com/page#section", "https://example.com/page"),
        ("https://example.com/page/", "https://example.com/page"),
    ],
)
def test_normalize_url(url, expected):
    """Fragments and trailing slashes do not produce distinct URLs."""
    assert normalize_url(url) == expected


def test_simple_chunk():
    """A text longer than chunk_size is split into several string chunks."""
    text = "\n\n".join([f"Paragraph {i} with some content here." for i in range(50)])
    chunks = simple_chunk(text, chunk_size=100, overlap=20)
    assert len(chunks) > 1
    assert all(isinstance(c, str) for c in chunks)