41 Commits

Author SHA1 Message Date
Renovate Bot 24ec6a3107 chore(deps): update dependency beautifulsoup4 to v4.14.3 2026-05-26 12:00:53 +00:00
Renovate Bot 2a9962a1bd chore(deps): update dependency pdfplumber to v0.11.9
Build & Deploy ResearchOwl / build-and-push (push) Successful in 46s
2026-05-25 18:00:36 +00:00
Renovate Bot 0e52b404bf chore(deps): update dependency feedparser to v6.0.12
Build & Deploy ResearchOwl / build-and-push (push) Successful in 53s
2026-05-21 12:00:29 +00:00
chemavx 3a9ab2848a Merge pull request 'chore: Configure Renovate' (#1) from renovate/configure into main
Build & Deploy ResearchOwl / build-and-push (push) Successful in 5s
Reviewed-on: #1
2026-05-20 14:11:30 +00:00
Renovate Bot d3d22a1605 Add renovate.json 2026-05-20 13:52:38 +00:00
ChemaVX 425639f423 test: redis fix
Build & Deploy ResearchOwl / build-and-push (push) Successful in 9s
2026-05-20 12:23:43 +00:00
ChemaVX 7f3c2d0b49 test: webhook
Build & Deploy ResearchOwl / build-and-push (push) Successful in 1m12s
2026-05-20 12:21:07 +00:00
ChemaVX f577ac4712 feat: Ghost EN — /generate blog en publica en inglés en theexclusionzone.com
Build & Deploy ResearchOwl / build-and-push (push) Successful in 1m19s
2026-05-18 16:49:09 +00:00
ChemaVX 747b9605c0 fix: cmd_publish usa _active_sessions para la sesión correcta
Build & Deploy ResearchOwl / build-and-push (push) Successful in 6s
Mismo patrón que cmd_generate: si hay sesión activa registrada
para el chat, consulta por id; si no, fallback a created_at DESC.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-08 15:40:52 +00:00
ChemaVX caf763c23e fix: cmd_generate usa _active_sessions para la sesión correcta
Build & Deploy ResearchOwl / build-and-push (push) Successful in 6s
Si hay una sesión activa registrada para el chat, se consulta
directamente por id en lugar de por created_at DESC, evitando
que /generate use la sesión más reciente en vez de la actual.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-08 15:38:06 +00:00
ChemaVX bdea12e6f2 fix: nombre de archivo .md usa topic del output, no del session pre-cacheado
Build & Deploy ResearchOwl / build-and-push (push) Successful in 6s
El session dict se obtiene antes de generator.generate() y puede
contener datos de la sesión anterior. Ahora se extrae el topic
directamente de la línea "Topic:" del header del output generado,
que siempre refleja la sesión actual usada en la generación.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-08 15:18:09 +00:00
ChemaVX a6a90d3598 fix: eliminar primer <h1> del HTML antes de publicar en Ghost
Build & Deploy ResearchOwl / build-and-push (push) Successful in 6s
Ghost añade el título del post automáticamente en el frontend,
por lo que el <h1> generado desde el markdown aparecía duplicado.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-08 10:51:52 +00:00
ChemaVX 36984657a8 fix: Ghost 5.x — usar mobiledoc+HTML card en lugar del campo html
Build & Deploy ResearchOwl / build-and-push (push) Successful in 6s
El campo "html" en Ghost Admin API v5 (Lexical editor) es de solo
lectura. El contenido se debe enviar via mobiledoc con HTML card,
que Ghost acepta en todas las versiones de v5 y renderiza sin
conversión. Añadidos logs de diagnóstico y validación de HTML vacío.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-08 10:44:38 +00:00
ChemaVX 83eb2359be feat: Ghost CMS integration — auto-publish blog + /publish command
Build & Deploy ResearchOwl / build-and-push (push) Successful in 6s
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-08 10:26:22 +00:00
ChemaVX 94d209dd8a test: webhook sync automático
Build & Deploy ResearchOwl / build-and-push (push) Successful in 6s
2026-05-06 11:35:14 +00:00
ChemaVX 7a156e2af1 fix: mover alerta de coste a /generate donde está el gasto real
Build & Deploy ResearchOwl / build-and-push (push) Successful in 5s
2026-05-06 07:49:29 +00:00
ChemaVX 279475a175 feat: alerta de coste — aviso si sesión supera COST_ALERT_THRESHOLD
Build & Deploy ResearchOwl / build-and-push (push) Successful in 5s
2026-05-06 07:23:11 +00:00
ChemaVX 82e614e285 feat: caché de contenido de fuentes — reutiliza URLs scrapeadas en últimos 7 días
Build & Deploy ResearchOwl / build-and-push (push) Successful in 6s
2026-05-06 07:05:41 +00:00
ChemaVX aa83cfacbd fix: truncar contextos en /compare a 3000 palabras para evitar límite de tokens
Build & Deploy ResearchOwl / build-and-push (push) Successful in 5s
2026-05-06 06:51:36 +00:00
ChemaVX e8034f3f37 feat: /compare — análisis comparativo de dos temas en paralelo
Build & Deploy ResearchOwl / build-and-push (push) Successful in 34s
2026-05-06 06:40:31 +00:00
ChemaVX c2bb301103 feat: dedup semántico antes del scoring — hash MD5 + similitud Jaccard
Build & Deploy ResearchOwl / build-and-push (push) Successful in 5s
2026-05-05 08:58:53 +00:00
ChemaVX 53cf7a04a8 feat: modo diff para /watch — notifica solo si hay novedades reales
Build & Deploy ResearchOwl / build-and-push (push) Successful in 7s
2026-05-05 07:43:41 +00:00
ChemaVX f4e167f3b6 feat: SearXNG como motor principal, DDG como fallback
Build & Deploy ResearchOwl / build-and-push (push) Successful in 6s
2026-05-04 20:00:24 +00:00
ChemaVX ba2b366534 fix: delay DDG 3-8s aleatorio, logging mejorado en query generation
Build & Deploy ResearchOwl / build-and-push (push) Successful in 6s
2026-05-04 13:28:54 +00:00
ChemaVX 4bef9d2d17 feat: queries DDG generadas por Claude en lugar de plantillas hardcodeadas
Build & Deploy ResearchOwl / build-and-push (push) Successful in 6s
2026-05-04 13:24:25 +00:00
ChemaVX 7a012c2c28 fix: _remove_duplicate_headings usa ventana de 5 líneas en lugar de break
Build & Deploy ResearchOwl / build-and-push (push) Successful in 6s
2026-05-04 13:19:08 +00:00
ChemaVX 6aaa85a1f8 fix: eliminar títulos h1 duplicados en export PDF
Build & Deploy ResearchOwl / build-and-push (push) Successful in 5s
2026-05-04 13:12:32 +00:00
ChemaVX e0a42f0b91 ci: retrigger PDF build
Build & Deploy ResearchOwl / build-and-push (push) Successful in 6s
2026-05-04 13:02:57 +00:00
ChemaVX 4c7f5b521b feat: fase 3 — export PDF con reportlab + /export command
Build & Deploy ResearchOwl / build-and-push (push) Successful in 1m2s
2026-05-04 12:57:21 +00:00
ChemaVX c33bb5337d fix: títulos de sección en español, sin encabezado duplicado en extended
Build & Deploy ResearchOwl / build-and-push (push) Successful in 6s
2026-05-04 11:40:07 +00:00
ChemaVX 566f685578 ci: retrigger tras fix DinD
Build & Deploy ResearchOwl / build-and-push (push) Successful in 1m42s
2026-05-04 11:13:10 +00:00
ChemaVX 8c259b2b2e ci: clean ci-builder before create to prevent stale BuildKit state
Build & Deploy ResearchOwl / build-and-push (push) Successful in 5s
2026-05-04 11:09:56 +00:00
ChemaVX a47d7b26ca feat: fase 2 — generación por secciones report_extended, blog_extended, podcast_extended
Build & Deploy ResearchOwl / build-and-push (push) Successful in 5s
2026-05-04 10:58:06 +00:00
ChemaVX e5b77ad72d fix: QUALITY_THRESHOLD 0.5→0.3, prompt scoring más generoso
Build & Deploy ResearchOwl / build-and-push (push) Successful in 5s
2026-05-04 10:35:08 +00:00
ChemaVX 0d8aee63be feat: fase 1 — top_k 30→80, pool 100→300, sin truncado, max_tokens 16000
Build & Deploy ResearchOwl / build-and-push (push) Successful in 5s
2026-05-04 10:23:19 +00:00
ChemaVX b5518ac95a feat: scheduler /watch — watched_topics + scheduler loop + /watch /unwatch /watches
Build & Deploy ResearchOwl / build-and-push (push) Successful in 5s
2026-05-04 07:48:05 +00:00
ChemaVX b33ae202b8 feat: trackeo de coste por llamada Claude — tabla api_usage + /costs
Build & Deploy ResearchOwl / build-and-push (push) Successful in 6s
2026-05-03 20:06:06 +00:00
ChemaVX 65917518ce ci: retrigger build for a681627
Build & Deploy ResearchOwl / build-and-push (push) Successful in 5s
2026-05-03 17:14:50 +00:00
ChemaVX a681627d2e feat: TTL purge — purge_old_sessions + /purge command + startup hook
Build & Deploy ResearchOwl / build-and-push (push) Successful in 5s
2026-05-03 16:56:37 +00:00
ChemaVX 7704f071d6 feat: retry+backoff en scraper, ProgressReporter en bot
Build & Deploy ResearchOwl / build-and-push (push) Successful in 6s
2026-05-03 16:40:37 +00:00
ChemaVX e66d728d68 fix: wrap YouTubeTranscriptApi in run_in_executor with 30s timeout
Build & Deploy ResearchOwl / build-and-push (push) Successful in 5s
The synchronous get_transcript() call was blocking the asyncio event
loop indefinitely, freezing the entire bot (including Telegram polling).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 12:59:40 +00:00
10 changed files with 1907 additions and 151 deletions
+3
View File
@@ -27,6 +27,9 @@ jobs:
- name: Log in to registry
run: echo "${{ secrets.CI_TOKEN }}" | docker login gitea.gitea.svc.cluster.local:3000 -u chemavx --password-stdin
- name: Clean previous buildx builder
run: docker buildx rm ci-builder 2>/dev/null || true
- name: Create buildx builder
run: |
cat > /tmp/buildkitd.toml << 'EOF'
+1
View File
@@ -6,6 +6,7 @@ WORKDIR /app
RUN apt-get update && apt-get install -y \
gcc g++ \
libxml2-dev libxslt-dev \
libfreetype6-dev \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
+3
View File
@@ -0,0 +1,3 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json"
}
+7 -3
View File
@@ -6,12 +6,12 @@ httpx==0.27.0
aiohttp==3.10.0
# Scraping
beautifulsoup4==4.12.3
beautifulsoup4==4.14.3
lxml==5.2.2
trafilatura==1.12.0
youtube-transcript-api==0.6.2
pdfplumber==0.11.3
feedparser==6.0.11
pdfplumber==0.11.9
feedparser==6.0.12
duckduckgo-search==6.2.6
# Storage & Embeddings
@@ -26,6 +26,10 @@ scikit-learn==1.5.1
# Claude API (scoring)
anthropic>=0.40.0
# PDF export
markdown==3.7
reportlab==4.2.5
# Utilities
pydantic==2.8.0
pydantic-settings==2.4.0
+759 -92
View File
@@ -4,6 +4,7 @@ Main user interface — all commands handled here
"""
import asyncio
import os
import time
from datetime import datetime
from typing import Optional
@@ -33,17 +34,29 @@ def is_authorized(user_id: int) -> bool:
return not allowed or user_id in allowed
def fmt_progress(iteration: int, total: int, new: int, stats: dict) -> str:
scraped = stats.get("scraped") or 0
failed = stats.get("failed") or 0
pending = stats.get("pending") or 0
skipped = stats.get("skipped") or 0
return (
f"🔄 *Iteration {iteration}*\n"
f"📚 Sources found: `{total}`\n"
f"✅ Scraped: `{scraped}` | ⏭️ Skipped: `{skipped}` | ❌ Failed: `{failed}` | ⏳ Pending: `{pending}`\n"
f"🆕 New URLs this round: `{new}`"
)
class ProgressReporter:
def __init__(self, reply_target: Message = None, *, bot=None, chat_id: int = None):
self._reply_target = reply_target
self._bot = bot
self._chat_id = chat_id
self._msg: Optional[Message] = None
async def start(self, text: str):
if self._reply_target is not None:
self._msg = await self._reply_target.reply_text(text, parse_mode=ParseMode.MARKDOWN)
elif self._bot is not None and self._chat_id is not None:
self._msg = await self._bot.send_message(self._chat_id, text, parse_mode=ParseMode.MARKDOWN)
async def update(self, text: str):
if not self._msg:
return
try:
await self._msg.edit_text(text, parse_mode=ParseMode.MARKDOWN)
except Exception:
pass
async def done(self, text: str):
await self.update(text)
async def send_chunked(message: Message, text: str, parse_mode=None):
@@ -56,6 +69,72 @@ async def send_chunked(message: Message, text: str, parse_mode=None):
await asyncio.sleep(0.5)
# ─── Shared research logic ────────────────────────────────────────────────────
async def run_scheduled_research(bot, chat_id: int, topic: str,
session_id: int, db: ResearchDB,
progress_message=None,
silent_completion: bool = False):
if progress_message is not None:
reporter = ProgressReporter(progress_message)
else:
reporter = ProgressReporter(bot=bot, chat_id=chat_id)
try:
await reporter.start(f"🔍 Iniciando scraping de `{topic}`…")
async def on_progress(iter_num, total_sources):
await reporter.update(
f"🔍 Scraping — iteración `{iter_num}` | `{total_sources}` fuentes encontradas"
)
scraper = ExhaustiveScraper(db, session_id, topic, on_progress)
final_stats = await scraper.run()
await db.update_session(session_id, status=ResearchStatus.SATURATED)
scraped = final_stats.get("scraped", 0)
await reporter.update(f"⚡ Procesando `{scraped}` fuentes…")
ollama = OllamaClient()
if await ollama.is_available():
processor = ContentProcessor(db, ollama)
async def proc_progress(total_chunks, total_words):
await reporter.update(
f"⚡ Scoring chunks… (`{total_chunks}` procesados)"
)
await processor.process_session(session_id, topic, proc_progress)
chunk_count = await db.get_chunks_count(session_id)
if silent_completion:
await reporter.done(
f"🔍 Investigación completada — analizando novedades…"
)
else:
await reporter.done(
f"✅ Listo — `{scraped}` fuentes · `{chunk_count}` chunks · usa /generate <tipo>"
)
else:
await reporter.done(
f"⚠️ Ollama no disponible — `{scraped}` fuentes scraped.\n"
f"Usa /generate para generar contenido."
)
except asyncio.CancelledError:
await db.update_session(session_id, status=ResearchStatus.FINISHED)
try:
await reporter.done("🛑 Investigación cancelada.")
except Exception:
pass
except Exception as e:
logger.error("Research task failed", error=str(e))
try:
await reporter.done(f"❌ Error: {str(e)[:200]}")
except Exception:
pass
# ─── Commands ─────────────────────────────────────────────────────────────────
async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
@@ -68,9 +147,18 @@ async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
"`/status` — Check current research progress\n"
"`/finish` — Stop research and proceed to generation\n"
"`/process` — Manually trigger chunk processing\n"
"`/generate <type>` — Generate output (podcast|blog|report|thread)\n"
"`/generate <type>` — Generate output\n"
" Tipos: podcast|blog|report|thread\n"
" Extended: podcast_extended|blog_extended|report_extended\n"
"`/sources` — List all sources found\n"
"`/outputs` — List generated outputs\n"
"`/export` — Exportar último output como PDF\n"
"`/publish` — Publicar último output en Ghost como borrador\n"
"`/compare <tema1> vs <tema2>` — Análisis comparativo\n"
"`/costs` — Show API usage costs\n"
"`/watch <topic> [h]` — Schedule periodic research\n"
"`/unwatch <topic>` — Remove a watch\n"
"`/watches` — List your watched topics\n"
"`/cancel` — Cancel current research\n"
"`/help` — Show this message",
parse_mode=ParseMode.MARKDOWN
@@ -91,88 +179,22 @@ async def cmd_research(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
)
return
# Check for existing active research
if chat_id in _active_tasks and not _active_tasks[chat_id].done():
await update.message.reply_text(
"⚠️ Research already in progress. Use /status or /finish first."
)
return
msg = await update.message.reply_text(
f"🦉 *ResearchOwl* starting research on:\n`{topic}`\n\n"
f"🌱 Seeding sources from:\n"
f"• DuckDuckGo (8 queries)\n"
f"• Wikipedia + internal links\n"
f"• Reddit top posts\n"
f"• YouTube transcripts\n\n"
f"This will run exhaustively until saturation. Use /finish to stop early.",
parse_mode=ParseMode.MARKDOWN
)
async def run_research():
db_conn = await get_db()
db = ResearchDB(db_conn)
try:
session_id = await db.create_session(topic, chat_id)
_active_sessions[chat_id] = session_id
progress_msg = msg
iteration_count = [0]
async def on_progress(iteration, total, new_this_round, stats):
iteration_count[0] = iteration
text = fmt_progress(iteration, total, new_this_round, stats)
try:
await progress_msg.edit_text(text, parse_mode=ParseMode.MARKDOWN)
except Exception:
pass
scraper = ExhaustiveScraper(db, session_id, topic, on_progress)
final_stats = await scraper.run()
await db.update_session(session_id, status=ResearchStatus.SATURATED)
scraped = final_stats.get("scraped", 0)
await update.message.reply_text(
f"✅ *Research complete!*\n\n"
f"📊 Results:\n"
f"• Sources found & scraped: `{scraped}`\n"
f"• Iterations: `{iteration_count[0]}`\n\n"
f"Now processing content with Ollama...\n"
f"Use `/generate podcast|blog|report|thread` when ready.",
parse_mode=ParseMode.MARKDOWN
await run_scheduled_research(
ctx.bot, chat_id, topic, session_id, db,
progress_message=update.message
)
# Auto-process after scraping
ollama = OllamaClient()
if await ollama.is_available():
processor = ContentProcessor(db, ollama)
async def proc_progress(total_chunks, total_words):
await update.message.reply_text(
f"🧠 *Processing complete!*\n"
f"• Chunks stored: `{total_chunks}`\n"
f"• Words researched: `{total_words:,}`\n\n"
f"Ready! Use `/generate podcast|blog|report|thread`",
parse_mode=ParseMode.MARKDOWN
)
await processor.process_session(session_id, topic, proc_progress)
else:
await update.message.reply_text(
"⚠️ Ollama not reachable — skipping processing.\n"
"You can still use `/generate` (will use raw content)."
)
except asyncio.CancelledError:
await db.update_session(
_active_sessions.get(chat_id, 0),
status=ResearchStatus.FINISHED
)
await update.message.reply_text("🛑 Research cancelled.")
except Exception as e:
logger.error("Research task failed", error=str(e))
await update.message.reply_text(f"❌ Research failed: {str(e)[:200]}")
finally:
await db_conn.close()
@@ -255,6 +277,7 @@ async def cmd_generate(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
chat_id = update.effective_chat.id
output_arg = ctx.args[0].lower() if ctx.args else ""
lang = "en" if len(ctx.args) > 1 and ctx.args[1].lower() == "en" else "es"
type_map = {
"podcast": OutputType.PODCAST,
@@ -263,6 +286,10 @@ async def cmd_generate(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
"thread": OutputType.THREAD,
"hilo": OutputType.THREAD,
"informe": OutputType.REPORT,
"report_extended": OutputType.REPORT_EXTENDED,
"blog_extended": OutputType.BLOG_EXTENDED,
"podcast_extended": OutputType.PODCAST_EXTENDED,
"informe_extended": OutputType.REPORT_EXTENDED,
}
if output_arg not in type_map:
@@ -279,12 +306,19 @@ async def cmd_generate(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
db = ResearchDB(db_conn)
try:
# Find last session for this chat
cursor = await db_conn.execute(
"""SELECT * FROM research_sessions WHERE telegram_chat_id = ?
ORDER BY created_at DESC LIMIT 1""",
(chat_id,)
)
# Usa la sesión activa si existe, si no la más reciente
session_id = _active_sessions.get(chat_id)
if session_id:
cursor = await db_conn.execute(
"SELECT * FROM research_sessions WHERE id = ?",
(session_id,)
)
else:
cursor = await db_conn.execute(
"""SELECT * FROM research_sessions WHERE telegram_chat_id = ?
ORDER BY created_at DESC LIMIT 1""",
(chat_id,)
)
row = await cursor.fetchone()
if not row:
await update.message.reply_text("No research sessions found. Start with /research <topic>")
@@ -293,9 +327,11 @@ async def cmd_generate(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
session = dict(row)
session_id = session["id"]
backend = "Claude Haiku" if settings.anthropic_api_key else f"Ollama ({settings.ollama_model})"
lang_label = " (EN)" if lang == "en" else ""
msg = await update.message.reply_text(
f"⚙️ Generating *{output_type}* for: `{session['topic']}`\n"
f"Using Ollama ({settings.ollama_model})...\n"
f"⚙️ Generating *{output_type}{lang_label}* for: `{session['topic']}`\n"
f"Using {backend}...\n"
f"This may take 2-5 minutes ☕",
parse_mode=ParseMode.MARKDOWN
)
@@ -310,18 +346,26 @@ async def cmd_generate(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
processor = ContentProcessor(db, ollama)
generator = OutputGenerator(db, ollama, processor)
output = await generator.generate(session_id, output_type, gen_progress)
output = await generator.generate(session_id, output_type, gen_progress, lang=lang)
# Send as file if very long
if len(output) > 8000:
import tempfile
import re as _re
ext_map = {
OutputType.PODCAST: "script.md",
OutputType.BLOG: "post.md",
OutputType.REPORT: "report.md",
OutputType.THREAD: "thread.txt",
OutputType.REPORT_EXTENDED: "report_extended.md",
OutputType.BLOG_EXTENDED: "blog_extended.md",
OutputType.PODCAST_EXTENDED: "script_extended.md",
}
filename = f"researchowl_{session['topic'][:30].replace(' ', '_')}_{ext_map[output_type]}"
# Use the topic from the output header (written at generation time)
# instead of the pre-fetched session dict which may be stale.
_m = _re.search(r'^Topic:\s*(.+)$', output[:500], _re.MULTILINE)
_topic = _m.group(1).strip() if _m else session["topic"]
filename = f"researchowl_{_topic[:30].replace(' ', '_')}_{ext_map[output_type]}"
with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as f:
f.write(output)
@@ -339,6 +383,18 @@ async def cmd_generate(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
else:
await send_chunked(update.message, output)
try:
stats = await db.get_usage_stats(session_id)
total_cost = sum(s.get("total_cost", 0) for s in stats)
if total_cost > settings.cost_alert_threshold:
await update.message.reply_text(
f"⚠️ Coste acumulado de esta sesión: `${total_cost:.4f}`"
f" (umbral: `${settings.cost_alert_threshold:.2f}`)",
parse_mode=ParseMode.MARKDOWN
)
except Exception:
pass
except Exception as e:
logger.error("Generate failed", error=str(e))
await update.message.reply_text(f"❌ Generation failed: {str(e)[:200]}")
@@ -429,6 +485,169 @@ async def cmd_outputs(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
await db_conn.close()
async def cmd_costs(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not is_authorized(update.effective_user.id):
return
chat_id = update.effective_chat.id
db_conn = await get_db()
db = ResearchDB(db_conn)
try:
cursor = await db_conn.execute(
"SELECT * FROM research_sessions WHERE telegram_chat_id = ? ORDER BY created_at DESC LIMIT 1",
(chat_id,)
)
row = await cursor.fetchone()
if not row:
await update.message.reply_text("No sessions found.")
return
session_id = row["id"]
topic = row["topic"]
by_type = {r["call_type"]: r for r in await db.get_usage_stats(session_id)}
totals = await db.get_total_usage_stats()
lines = [f"📊 *Costes ResearchOwl*\n"]
lines.append(f"Última sesión (`{topic}`):")
session_total = 0.0
for call_type, label in [("scoring", "Scoring"), ("generation", "Generación")]:
row_data = by_type.get(call_type)
if row_data:
calls = row_data["calls"]
tokens = row_data["total_tokens"]
cost = row_data["total_cost"]
session_total += cost
lines.append(f" {label}: {calls} llamadas · {tokens:,} tokens · ${cost:.4f}")
else:
lines.append(f" {label}: —")
lines.append(f" Total: ${session_total:.4f}")
lines.append("")
lines.append("Acumulado total:")
acc_cost = totals.get("total_cost") or 0.0
acc_sessions = totals.get("sessions") or 0
lines.append(f" ${acc_cost:.4f} ({acc_sessions} sesiones)")
await update.message.reply_text("\n".join(lines), parse_mode=ParseMode.MARKDOWN)
finally:
await db_conn.close()
async def cmd_watch(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not is_authorized(update.effective_user.id):
return
chat_id = update.effective_chat.id
args = ctx.args or []
if not args:
await update.message.reply_text(
"❌ Uso: `/watch <tema> [horas]`\nEjemplo: `/watch Incidente Roswell 24`",
parse_mode=ParseMode.MARKDOWN
)
return
interval_hours = 24
if args[-1].isdigit():
interval_hours = int(args[-1])
topic = " ".join(args[:-1]).strip()
else:
topic = " ".join(args).strip()
if not topic:
await update.message.reply_text("❌ Debes especificar un tema.")
return
if not (1 <= interval_hours <= 168):
await update.message.reply_text(
"❌ El intervalo debe estar entre 1 y 168 horas (1 semana)."
)
return
db_conn = await get_db()
db = ResearchDB(db_conn)
try:
try:
await db.add_watch(topic, chat_id, interval_hours)
await update.message.reply_text(
f"👁 Watching: `{topic}` — cada {interval_hours}h\n"
f"Primera ejecución en ~{interval_hours}h.\n"
f"Usa /watches para ver todos tus temas.",
parse_mode=ParseMode.MARKDOWN
)
except Exception as e:
if "UNIQUE" in str(e):
await update.message.reply_text(
f"Ya estás watching `{topic}`", parse_mode=ParseMode.MARKDOWN
)
else:
raise
finally:
await db_conn.close()
async def cmd_unwatch(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not is_authorized(update.effective_user.id):
return
chat_id = update.effective_chat.id
topic = " ".join(ctx.args).strip() if ctx.args else ""
if not topic:
await update.message.reply_text(
"❌ Uso: `/unwatch <tema>`", parse_mode=ParseMode.MARKDOWN
)
return
db_conn = await get_db()
db = ResearchDB(db_conn)
try:
removed = await db.remove_watch(topic, chat_id)
if removed:
await update.message.reply_text(
f"✅ Ya no vigilas `{topic}`.", parse_mode=ParseMode.MARKDOWN
)
else:
await update.message.reply_text(f"No estabas watching `{topic}`.")
finally:
await db_conn.close()
async def cmd_watches(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not is_authorized(update.effective_user.id):
return
chat_id = update.effective_chat.id
db_conn = await get_db()
db = ResearchDB(db_conn)
try:
watches = await db.list_watches(chat_id)
if not watches:
await update.message.reply_text(
"No tienes temas vigilados. Usa `/watch <tema>`",
parse_mode=ParseMode.MARKDOWN
)
return
now = time.time()
lines = ["👁 *Tus temas vigilados:*\n"]
for i, w in enumerate(watches, 1):
secs_remaining = max(0.0, w["next_run_at"] - now)
hours_remaining = secs_remaining / 3600
eta = f"{int(secs_remaining / 60)}min" if hours_remaining < 1 else f"{hours_remaining:.1f}h"
status = "" if w["enabled"] else ""
lines.append(
f"{i}. {status} `{w['topic']}` — cada {w['interval_hours']}h · próxima en {eta}"
)
await update.message.reply_text("\n".join(lines), parse_mode=ParseMode.MARKDOWN)
finally:
await db_conn.close()
async def cmd_process(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not is_authorized(update.effective_user.id):
return
@@ -513,8 +732,448 @@ async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
# ─── Bot setup ────────────────────────────────────────────────────────────────
async def _purge_on_startup(app: Application) -> None:
db_conn = await get_db()
try:
db = ResearchDB(db_conn)
result = await db.purge_old_sessions(30)
if result["sessions"] > 0:
logger.info("Startup purge done", **result)
except Exception as e:
logger.warning("Startup purge failed — bot continues", error=str(e))
finally:
await db_conn.close()
async def _scheduler_loop(app: Application):
while True:
db_conn = None
try:
db_conn = await get_db()
db = ResearchDB(db_conn)
due = await db.get_due_watches()
for watch in due:
chat_id = watch["chat_id"]
topic = watch["topic"]
if chat_id in _active_tasks and not _active_tasks[chat_id].done():
continue
session_id = await db.create_session(topic, chat_id)
_active_sessions[chat_id] = session_id
await db.update_watch_run(watch["id"])
async def _task(c=chat_id, t=topic, s=session_id, w_id=watch["id"]):
inner_db_conn = await get_db()
inner_db = ResearchDB(inner_db_conn)
try:
await run_scheduled_research(app.bot, c, t, s, inner_db,
silent_completion=True)
prev_session = await inner_db.get_previous_session(c, t, s)
new_urls = await inner_db.get_session_urls(s)
old_urls = await inner_db.get_session_urls(prev_session["id"]) \
if prev_session else set()
new_chunks = await inner_db.get_top_chunks(s, limit=30)
try:
from src.generator.generator import generate_diff_summary
summary = await generate_diff_summary(
t, new_urls, old_urls, new_chunks, s, inner_db
)
except Exception as e:
logger.warning("Diff summary failed", error=str(e))
summary = (
f"📊 *Actualización disponible — {t}*\n\n"
f"Usa /generate report para ver el análisis completo."
)
if summary:
await app.bot.send_message(
c, summary, parse_mode=ParseMode.MARKDOWN
)
else:
await app.bot.send_message(
c,
f"🔄 *{t}* — sin novedades significativas esta vez.",
parse_mode=ParseMode.MARKDOWN
)
finally:
await inner_db_conn.close()
task = asyncio.create_task(_task())
_active_tasks[chat_id] = task
await app.bot.send_message(
chat_id,
f"🔄 Investigación automática iniciada: `{topic}`",
parse_mode=ParseMode.MARKDOWN
)
except Exception as e:
logger.warning("Scheduler loop error", error=str(e))
finally:
if db_conn:
try:
await db_conn.close()
except Exception:
pass
await asyncio.sleep(60)
async def _start_scheduler(app: Application) -> None:
asyncio.create_task(_scheduler_loop(app))
async def _on_startup(app: Application) -> None:
await _purge_on_startup(app)
await _start_scheduler(app)
async def cmd_export(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not is_authorized(update.effective_user.id):
return
chat_id = update.effective_chat.id
db_conn = await get_db()
db = ResearchDB(db_conn)
try:
session = await db.get_latest_session(chat_id)
if not session:
await update.message.reply_text("No hay sesiones de investigación.")
return
session_id = session["id"]
topic = session["topic"]
outputs = await db.get_outputs(session_id)
if not outputs:
await update.message.reply_text(
"No hay outputs generados. Usa `/generate <tipo>` primero.",
parse_mode=ParseMode.MARKDOWN
)
return
priority = [
"report_extended", "blog_extended", "podcast_extended",
"report", "blog", "podcast", "thread",
]
chosen = None
for ptype in priority:
for o in outputs:
if o["output_type"] == ptype:
chosen = o
break
if chosen:
break
if not chosen:
chosen = outputs[0]
msg = await update.message.reply_text(
f"📄 Generando PDF para `{topic}`…",
parse_mode=ParseMode.MARKDOWN
)
try:
from src.generator.generator import generate_pdf
pdf_bytes = generate_pdf(chosen["content"], title=topic)
except ImportError:
await msg.edit_text("❌ reportlab no está instalado. Ejecuta: `pip install reportlab`")
return
except Exception as e:
await msg.edit_text(f"❌ Error generando PDF: {str(e)[:200]}")
return
safe_topic = topic[:40].replace(" ", "_").replace("/", "-")
filename = f"researchowl_{safe_topic}_{chosen['output_type']}.pdf"
import io
await update.message.reply_document(
document=io.BytesIO(pdf_bytes),
filename=filename,
caption=f"📄 *{chosen['output_type'].upper()}* — {topic}\nExportado por ResearchOwl 🦉",
parse_mode=ParseMode.MARKDOWN
)
try:
await msg.delete()
except Exception:
pass
except Exception as e:
logger.error("Export failed", error=str(e))
await update.message.reply_text(f"❌ Export failed: {str(e)[:200]}")
finally:
await db_conn.close()
async def cmd_purge(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not is_authorized(update.effective_user.id):
return
args = ctx.args or []
if not args:
days = 30
else:
try:
days = int(args[0])
except ValueError:
await update.message.reply_text(
"❌ Uso: `/purge [días]`", parse_mode=ParseMode.MARKDOWN
)
return
if days < 0:
await update.message.reply_text("❌ El número de días debe ser ≥ 0.")
return
if days == 0 and not (len(args) >= 2 and args[1] == "confirm"):
await update.message.reply_text(
"⚠️ Esto borrará *todas* las sesiones completadas.\n"
"Envía `/purge 0 confirm` para confirmar.",
parse_mode=ParseMode.MARKDOWN
)
return
db_conn = await get_db()
try:
db = ResearchDB(db_conn)
result = await db.purge_old_sessions(days)
await update.message.reply_text(
f"🗑️ Purged: {result['sessions']} sessions, "
f"{result['sources']} sources, "
f"{result['chunks']} chunks, "
f"{result['outputs']} outputs"
)
except Exception as e:
logger.error("Purge command failed", error=str(e))
await update.message.reply_text(f"❌ Purge failed: {str(e)[:200]}")
finally:
await db_conn.close()
async def cmd_publish(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not is_authorized(update.effective_user.id):
return
chat_id = update.effective_chat.id
db_conn = await get_db()
db = ResearchDB(db_conn)
try:
from src.generator.generator import GhostPublisher, _extract_title
ghost = GhostPublisher()
if not ghost.is_configured():
await update.message.reply_text(
"❌ Ghost no configurado. Asegúrate de que `GHOST_URL` y `GHOST_API_KEY` están definidos.",
parse_mode=ParseMode.MARKDOWN
)
return
session_id = _active_sessions.get(chat_id)
if session_id:
cursor = await db_conn.execute(
"SELECT * FROM research_sessions WHERE id = ?",
(session_id,)
)
else:
cursor = await db_conn.execute(
"SELECT * FROM research_sessions WHERE telegram_chat_id = ? ORDER BY created_at DESC LIMIT 1",
(chat_id,)
)
row = await cursor.fetchone()
if not row:
await update.message.reply_text("No hay sesiones. Usa /research primero.")
return
session = dict(row)
outputs = await db.get_outputs(session["id"])
if not outputs:
await update.message.reply_text(
"No hay outputs generados. Usa `/generate blog|report` primero.",
parse_mode=ParseMode.MARKDOWN
)
return
priority = ["blog_extended", "blog", "report_extended", "report",
"podcast_extended", "podcast", "thread"]
chosen = None
for ptype in priority:
for o in outputs:
if o["output_type"] == ptype:
chosen = o
break
if chosen:
break
if not chosen:
chosen = outputs[-1]
msg = await update.message.reply_text("📤 Publicando en Ghost como borrador…")
title = _extract_title(chosen["content"]) or session["topic"]
result = await ghost.publish_draft(title, chosen["content"])
post = result["posts"][0]
admin_url = f"{ghost.url}/ghost/#/editor/post/{post['id']}"
await msg.edit_text(
f"✅ *Publicado en Ghost como borrador*\n\n"
f"📝 Título: `{title}`\n"
f"🔗 Editar: {admin_url}",
parse_mode=ParseMode.MARKDOWN
)
except Exception as e:
logger.error("Publish to Ghost failed", error=str(e))
await update.message.reply_text(f"❌ Error publicando en Ghost: {str(e)[:200]}")
finally:
await db_conn.close()
async def cmd_compare(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not is_authorized(update.effective_user.id):
return
chat_id = update.effective_chat.id
text = " ".join(ctx.args).strip() if ctx.args else ""
import re
match = re.split(r'\s+vs\.?\s+|\s+versus\s+', text, maxsplit=1, flags=re.IGNORECASE)
if len(match) != 2 or not match[0].strip() or not match[1].strip():
await update.message.reply_text(
"❌ Uso: `/compare <tema1> vs <tema2>`\n"
"Ejemplo: `/compare energía solar vs energía nuclear`",
parse_mode=ParseMode.MARKDOWN
)
return
topic_a = match[0].strip()
topic_b = match[1].strip()
if chat_id in _active_tasks and not _active_tasks[chat_id].done():
await update.message.reply_text(
"⚠️ Ya hay una investigación en curso. Usa /cancel primero."
)
return
msg = await update.message.reply_text(
f"🔍 Comparando `{topic_a}` vs `{topic_b}`…\n"
f"Esto lanzará dos investigaciones en paralelo y tardará varios minutos.",
parse_mode=ParseMode.MARKDOWN
)
async def run_compare():
db_conn_a = await get_db()
db_conn_b = await get_db()
db_a = ResearchDB(db_conn_a)
db_b = ResearchDB(db_conn_b)
try:
session_id_a = await db_a.create_session(topic_a, chat_id)
session_id_b = await db_b.create_session(topic_b, chat_id)
_active_sessions[chat_id] = session_id_a
await msg.edit_text(
f"🔍 Investigando en paralelo:\n"
f"• `{topic_a}`\n"
f"• `{topic_b}`\n\n"
f"Esto puede tardar 10-20 minutos…",
parse_mode=ParseMode.MARKDOWN
)
async def research_topic(session_id, topic, db):
scraper = ExhaustiveScraper(db, session_id, topic)
await scraper.run()
await db.update_session(session_id, status=ResearchStatus.SATURATED)
ollama = OllamaClient()
if await ollama.is_available():
processor = ContentProcessor(db, ollama)
await processor.process_session(session_id, topic)
await msg.edit_text(
f"🔍 Scraping en paralelo:\n• `{topic_a}`\n• `{topic_b}`…",
parse_mode=ParseMode.MARKDOWN
)
await asyncio.gather(
research_topic(session_id_a, topic_a, db_a),
research_topic(session_id_b, topic_b, db_b),
)
await msg.edit_text(
"✍️ Generando análisis comparativo…",
parse_mode=ParseMode.MARKDOWN
)
ollama = OllamaClient()
processor_a = ContentProcessor(db_a, ollama)
processor_b = ContentProcessor(db_b, ollama)
context_a = await processor_a.rag_query(session_id_a, topic_a, top_k=40)
context_b = await processor_b.rag_query(session_id_b, topic_b, top_k=40)
if not context_a:
chunks = await db_a.get_top_chunks(session_id_a, limit=20)
context_a = "\n\n---\n\n".join(c["content"] for c in chunks)
if not context_b:
chunks = await db_b.get_top_chunks(session_id_b, limit=20)
context_b = "\n\n---\n\n".join(c["content"] for c in chunks)
from src.generator.generator import generate_comparison
comparison = await generate_comparison(
topic_a, topic_b, context_a, context_b, session_id_a, db_a
)
from datetime import datetime, timezone
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
header = (
f"---\n"
f"ResearchOwl | COMPARISON\n"
f"Topic A: {topic_a}\n"
f"Topic B: {topic_b}\n"
f"Generated: {now}\n"
f"---\n\n"
)
full_output = header + comparison
await db_a.save_output(session_id_a, OutputType.REPORT, full_output)
if len(full_output) > 8000:
import io
filename = (
f"compare_{topic_a[:20]}_{topic_b[:20]}.md"
.replace(" ", "_")
)
await update.message.reply_document(
document=io.BytesIO(full_output.encode()),
filename=filename,
caption=f"📊 Comparación: {topic_a} vs {topic_b}"
)
try:
await msg.delete()
except Exception:
pass
else:
await msg.edit_text(full_output, parse_mode=ParseMode.MARKDOWN)
except asyncio.CancelledError:
await msg.edit_text("🛑 Comparación cancelada.")
except Exception as e:
logger.error("Compare task failed", error=str(e))
try:
await msg.edit_text(f"❌ Error: {str(e)[:200]}")
except Exception:
pass
finally:
await db_conn_a.close()
await db_conn_b.close()
task = asyncio.create_task(run_compare())
_active_tasks[chat_id] = task
def create_bot() -> Application:
app = Application.builder().token(settings.telegram_bot_token).build()
app = (
Application.builder()
.token(settings.telegram_bot_token)
.post_init(_on_startup)
.build()
)
app.add_handler(CommandHandler("start", cmd_start))
app.add_handler(CommandHandler("help", cmd_help))
@@ -524,8 +1183,16 @@ def create_bot() -> Application:
app.add_handler(CommandHandler("generate", cmd_generate))
app.add_handler(CommandHandler("sources", cmd_sources))
app.add_handler(CommandHandler("outputs", cmd_outputs))
app.add_handler(CommandHandler("export", cmd_export))
app.add_handler(CommandHandler("costs", cmd_costs))
app.add_handler(CommandHandler("watch", cmd_watch))
app.add_handler(CommandHandler("unwatch", cmd_unwatch))
app.add_handler(CommandHandler("watches", cmd_watches))
app.add_handler(CommandHandler("process", cmd_process))
app.add_handler(CommandHandler("cancel", cmd_cancel))
app.add_handler(CommandHandler("purge", cmd_purge))
app.add_handler(CommandHandler("publish", cmd_publish))
app.add_handler(CommandHandler("compare", cmd_compare))
return app
+10 -1
View File
@@ -31,7 +31,16 @@ class Settings(BaseSettings):
# Processing
chunk_size: int = Field(800, env="CHUNK_SIZE") # tokens per chunk
chunk_overlap: int = Field(100, env="CHUNK_OVERLAP")
quality_threshold: float = Field(0.5, env="QUALITY_THRESHOLD") # 0-1, chunks below discarded
quality_threshold: float = Field(0.3, env="QUALITY_THRESHOLD") # 0-1, chunks below discarded
# Ghost CMS
ghost_url: Optional[str] = Field(None, env="GHOST_URL")
ghost_api_key: Optional[str] = Field(None, env="GHOST_API_KEY")
ghost_url_en: str = Field("", env="GHOST_URL_EN")
ghost_api_key_en: str = Field("", env="GHOST_API_KEY_EN")
# Alerts
cost_alert_threshold: float = Field(0.15, env="COST_ALERT_THRESHOLD")
# App
log_level: str = Field("INFO", env="LOG_LEVEL")
+193
View File
@@ -5,8 +5,12 @@ from pathlib import Path
from typing import Optional
from enum import Enum
import structlog
from src.config import settings
logger = structlog.get_logger()
class ResearchStatus(str, Enum):
RUNNING = "running"
@@ -20,6 +24,9 @@ class OutputType(str, Enum):
BLOG = "blog"
REPORT = "report"
THREAD = "thread"
REPORT_EXTENDED = "report_extended"
BLOG_EXTENDED = "blog_extended"
PODCAST_EXTENDED = "podcast_extended"
SCHEMA = """
@@ -84,6 +91,29 @@ CREATE INDEX IF NOT EXISTS idx_sources_session ON sources(session_id);
CREATE INDEX IF NOT EXISTS idx_chunks_session ON chunks(session_id);
CREATE INDEX IF NOT EXISTS idx_chunks_quality ON chunks(session_id, quality_score DESC);
CREATE INDEX IF NOT EXISTS idx_source_contents ON source_contents(source_id);
CREATE TABLE IF NOT EXISTS api_usage (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id INTEGER REFERENCES research_sessions(id),
call_type TEXT NOT NULL,
model TEXT NOT NULL,
input_tokens INTEGER NOT NULL,
output_tokens INTEGER NOT NULL,
cost_usd REAL NOT NULL,
created_at REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS watched_topics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
topic TEXT NOT NULL,
chat_id INTEGER NOT NULL,
interval_hours INTEGER NOT NULL DEFAULT 24,
next_run_at REAL NOT NULL,
last_run_at REAL,
enabled INTEGER NOT NULL DEFAULT 1,
created_at REAL NOT NULL,
UNIQUE(topic, chat_id)
);
"""
@@ -121,6 +151,35 @@ class ResearchDB:
row = await cursor.fetchone()
return dict(row) if row else None
async def get_latest_session(self, chat_id: int) -> Optional[dict]:
cursor = await self.db.execute(
"SELECT * FROM research_sessions WHERE telegram_chat_id = ? ORDER BY created_at DESC LIMIT 1",
(chat_id,)
)
row = await cursor.fetchone()
return dict(row) if row else None
async def get_session_urls(self, session_id: int) -> set:
async with self.db.execute(
"SELECT url FROM sources WHERE session_id = ?", (session_id,)
) as cur:
rows = await cur.fetchall()
return {r[0] for r in rows}
async def get_previous_session(self, chat_id: int, topic: str,
exclude_session_id: int) -> Optional[dict]:
async with self.db.execute(
"""SELECT id, topic, status, created_at FROM research_sessions
WHERE telegram_chat_id = ? AND topic = ? AND id != ?
ORDER BY created_at DESC LIMIT 1""",
(chat_id, topic, exclude_session_id)
) as cur:
row = await cur.fetchone()
if not row:
return None
return {"id": row[0], "topic": row[1],
"status": row[2], "created_at": row[3]}
async def get_active_session(self, chat_id: int) -> Optional[dict]:
cursor = await self.db.execute(
"""SELECT * FROM research_sessions
@@ -259,6 +318,19 @@ class ResearchDB:
row = await cursor.fetchone()
return row[0] if row else None
async def get_cached_content(self, url: str,
max_age_days: int = 7) -> Optional[str]:
threshold = time.time() - (max_age_days * 86400)
async with self.db.execute(
"""SELECT sc.content FROM source_contents sc
JOIN sources s ON s.id = sc.source_id
WHERE s.url = ? AND sc.created_at > ?
ORDER BY sc.created_at DESC LIMIT 1""",
(url, threshold)
) as cur:
row = await cur.fetchone()
return row[0] if row else None
async def get_outputs(self, session_id: int) -> list[dict]:
cursor = await self.db.execute(
"SELECT * FROM outputs WHERE session_id = ? ORDER BY created_at DESC",
@@ -266,3 +338,124 @@ class ResearchDB:
)
rows = await cursor.fetchall()
return [dict(r) for r in rows]
# --- API Usage ---
async def log_api_call(self, session_id, call_type: str, model: str,
input_tokens: int, output_tokens: int):
# Precios Claude Haiku (claude-haiku-4-5):
# input: $0.80 / 1M tokens output: $4.00 / 1M tokens
cost = (input_tokens * 0.80 + output_tokens * 4.00) / 1_000_000
await self.db.execute(
"""INSERT INTO api_usage
(session_id, call_type, model, input_tokens, output_tokens, cost_usd, created_at)
VALUES (?,?,?,?,?,?,?)""",
(session_id, call_type, model, input_tokens, output_tokens, cost, time.time())
)
await self.db.commit()
async def get_usage_stats(self, session_id: int) -> list[dict]:
cursor = await self.db.execute(
"""SELECT call_type,
COUNT(*) as calls,
SUM(input_tokens + output_tokens) as total_tokens,
SUM(cost_usd) as total_cost
FROM api_usage WHERE session_id = ?
GROUP BY call_type""",
(session_id,)
)
rows = await cursor.fetchall()
return [dict(r) for r in rows]
async def get_total_usage_stats(self) -> dict:
cursor = await self.db.execute(
"""SELECT COUNT(DISTINCT session_id) as sessions,
SUM(cost_usd) as total_cost
FROM api_usage"""
)
row = await cursor.fetchone()
return dict(row) if row else {"sessions": 0, "total_cost": 0}
# --- Watched Topics ---
async def add_watch(self, topic: str, chat_id: int, interval_hours: int) -> int:
now = time.time()
cursor = await self.db.execute(
"""INSERT OR REPLACE INTO watched_topics
(topic, chat_id, interval_hours, next_run_at, created_at)
VALUES (?, ?, ?, ?, ?)""",
(topic, chat_id, interval_hours, now + interval_hours * 3600, now)
)
await self.db.commit()
return cursor.lastrowid
async def remove_watch(self, topic: str, chat_id: int) -> bool:
cursor = await self.db.execute(
"DELETE FROM watched_topics WHERE topic = ? AND chat_id = ?",
(topic, chat_id)
)
await self.db.commit()
return cursor.rowcount > 0
async def list_watches(self, chat_id: int) -> list[dict]:
cursor = await self.db.execute(
"SELECT * FROM watched_topics WHERE chat_id = ? ORDER BY created_at ASC",
(chat_id,)
)
rows = await cursor.fetchall()
return [dict(r) for r in rows]
async def get_due_watches(self) -> list[dict]:
cursor = await self.db.execute(
"SELECT * FROM watched_topics WHERE enabled = 1 AND next_run_at <= ?",
(time.time(),)
)
rows = await cursor.fetchall()
return [dict(r) for r in rows]
async def update_watch_run(self, watch_id: int):
cursor = await self.db.execute(
"SELECT interval_hours FROM watched_topics WHERE id = ?", (watch_id,)
)
row = await cursor.fetchone()
if not row:
return
now = time.time()
await self.db.execute(
"UPDATE watched_topics SET last_run_at = ?, next_run_at = ? WHERE id = ?",
(now, now + row[0] * 3600, watch_id)
)
await self.db.commit()
# --- Maintenance ---
async def purge_old_sessions(self, max_age_days: int = 30) -> dict:
await self.db.execute("PRAGMA foreign_keys = ON")
threshold = time.time() - max_age_days * 86400
cursor = await self.db.execute(
"SELECT id FROM research_sessions WHERE created_at < ? AND status != 'running'",
(threshold,)
)
session_ids = [row[0] for row in await cursor.fetchall()]
counts = {"sessions": 0, "sources": 0, "chunks": 0, "outputs": 0}
for sid in session_ids:
await self.db.execute(
"DELETE FROM source_contents WHERE source_id IN (SELECT id FROM sources WHERE session_id = ?)",
(sid,)
)
cur = await self.db.execute("DELETE FROM chunks WHERE session_id = ?", (sid,))
counts["chunks"] += cur.rowcount
cur = await self.db.execute("DELETE FROM outputs WHERE session_id = ?", (sid,))
counts["outputs"] += cur.rowcount
cur = await self.db.execute("DELETE FROM sources WHERE session_id = ?", (sid,))
counts["sources"] += cur.rowcount
cur = await self.db.execute("DELETE FROM research_sessions WHERE id = ?", (sid,))
counts["sessions"] += cur.rowcount
await self.db.commit()
logger.info("Purged sessions older than days",
sessions=counts["sessions"], days=max_age_days)
return counts
+691 -14
View File
@@ -2,6 +2,13 @@
ResearchOwl Generators
Produces structured outputs from processed research using Claude or Ollama
"""
import base64
import hashlib
import hmac
import json
import re
import time
import structlog
from src.config import settings
@@ -26,6 +33,45 @@ BLOG_SYSTEM = (
"Cada sección debe añadir información nueva no cubierta en secciones anteriores."
)
BLOG_SYSTEM_EN = (
"You write ALWAYS in English. "
"You are a journalist writing a blog article. Use clear markdown headers. "
"NEVER repeat the same fact or phrase twice — if you said it, move on. "
"Each section must add new information not covered in other sections."
)
BLOG_PROMPT_EN = """\
Write a blog article about: "{topic}"
RULES — follow strictly:
- Each section under a heading must add NEW information not covered elsewhere
- Do NOT summarize previous sections at the start of each new section
- Do NOT repeat facts — if a fact appears once, do not mention it again
- Use concrete details, numbers, names — avoid vague generalities
- Target: 1000-1500 words
STRUCTURE:
# [Impactful headline]
[Hook paragraph — the most surprising fact]
## Background
[Context — what, when, who — only facts not covered elsewhere]
## Key Facts
[Most significant findings — each point must be distinct]
## Analysis / Significance
[What this means — without repeating the Key Facts section]
## Conclusion
[No more than 2 sentences summarizing, then a forward-looking statement]
RESEARCH MATERIAL:
{context}
Write the complete article in markdown:"""
REPORT_SYSTEM = (
"Escribe SIEMPRE en español. "
"Eres un analista de investigación. Escribe un informe estructurado y factual. "
@@ -135,6 +181,179 @@ Escribe el hilo (un tweet por línea, nada más):"""
}
OUTLINE_REPORT = """
IMPORTANTE: Los títulos y queries deben estar en español, independientemente del idioma del material de investigación.
Eres un editor de investigación. Dado el tema "{topic}" y el material
disponible, genera un outline detallado para un informe exhaustivo.
Devuelve SOLO una lista JSON de secciones, sin texto adicional, sin
markdown, sin explicaciones. Formato exacto:
[
{{"title": "Título de la sección", "query": "términos de búsqueda específicos para esta sección", "words": 800}},
...
]
Genera entre 6 y 10 secciones. Cada sección debe:
- Cubrir un ángulo distinto del tema
- Tener una query específica para recuperar chunks relevantes
- Indicar longitud objetivo en palabras (400-1200)
Material disponible (resumen):
{context_summary}
"""
OUTLINE_BLOG = """
IMPORTANTE: Los títulos y queries deben estar en español, independientemente del idioma del material de investigación.
Eres un editor de contenido. Dado el tema "{topic}" y el material
disponible, genera un outline para un artículo de blog exhaustivo.
Devuelve SOLO una lista JSON de secciones, sin texto adicional:
[
{{"title": "Título de sección", "query": "términos búsqueda", "words": 600}},
...
]
Genera entre 5 y 8 secciones. Primera sección = introducción gancho.
Última sección = conclusión con perspectiva original.
Material disponible (resumen):
{context_summary}
"""
OUTLINE_PODCAST = """
IMPORTANTE: Los títulos y queries deben estar en español, independientemente del idioma del material de investigación.
Eres un productor de podcast. Dado el tema "{topic}" y el material
disponible, genera un outline para un guion de podcast exhaustivo.
Devuelve SOLO una lista JSON de segmentos, sin texto adicional:
[
{{"title": "Nombre del segmento", "query": "términos búsqueda", "words": 700}},
...
]
Genera entre 5 y 7 segmentos. Flujo natural: gancho → contexto →
desarrollo → controversia → conclusión.
Material disponible (resumen):
{context_summary}
"""
# ─── Ghost CMS ────────────────────────────────────────────────────────────────
def _b64url(data: bytes | str) -> str:
if isinstance(data, str):
data = data.encode()
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
def _extract_title(content: str) -> str:
"""Return first H1 heading from markdown, skipping the ResearchOwl header block."""
in_header = False
for line in content.splitlines():
stripped = line.strip()
if stripped == "---":
in_header = not in_header
continue
if in_header:
continue
if stripped.startswith("# ") and not stripped.startswith("## "):
return stripped[2:].strip()
return ""
def _strip_researchowl_header(content: str) -> str:
"""Remove the ---...--- metadata block that ResearchOwl prepends to outputs."""
lines = content.splitlines(keepends=True)
dashes_seen = 0
for i, line in enumerate(lines):
if line.strip() == "---":
dashes_seen += 1
if dashes_seen == 2:
return "".join(lines[i + 1:]).lstrip("\n")
return content
class GhostPublisher:
def __init__(self, lang: str = "es"):
if lang == "en":
self.url = (settings.ghost_url_en or "").rstrip("/")
self.api_key = settings.ghost_api_key_en or ""
else:
self.url = (settings.ghost_url or "").rstrip("/")
self.api_key = settings.ghost_api_key or ""
def is_configured(self) -> bool:
return bool(self.url and self.api_key)
def _make_token(self) -> str:
key_id, secret = self.api_key.split(":", 1)
now = int(time.time())
header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT", "kid": key_id}))
payload = _b64url(json.dumps({"iat": now, "exp": now + 300, "aud": "/admin/"}))
signing = f"{header}.{payload}"
sig = _b64url(
hmac.new(bytes.fromhex(secret), signing.encode(), hashlib.sha256).digest()
)
return f"{signing}.{sig}"
async def publish_draft(self, title: str, markdown_content: str,
tags: list[str] | None = None) -> dict:
import aiohttp as _aio
import markdown as _md
clean = _strip_researchowl_header(markdown_content)
html = _md.markdown(clean, extensions=["extra"])
# Ghost añade el título automáticamente — eliminar el primer <h1> para evitar duplicado
html = re.sub(r"<h1[^>]*>.*?</h1>", "", html, count=1, flags=re.DOTALL).lstrip()
logger.info("Ghost publish_draft", html_length=len(html),
html_preview=html[:200])
if not html.strip():
raise ValueError("Ghost: HTML vacío tras conversión markdown — contenido no enviado")
# Ghost 5.x (Lexical editor): el campo "html" es solo de lectura en la API.
# La forma fiable de enviar HTML arbitrario es via mobiledoc con HTML card,
# que Ghost acepta en todas las versiones de v5 y renderiza sin conversión.
mobiledoc = json.dumps({
"version": "0.3.1",
"atoms": [],
"cards": [["html", {"html": html}]],
"markups": [],
"sections": [[10, 0]],
})
token = self._make_token()
body = {
"posts": [{
"title": title,
"mobiledoc": mobiledoc,
"status": "draft",
"tags": [{"name": t} for t in (tags or ["investigacion"])],
}]
}
async with _aio.ClientSession() as sess:
async with sess.post(
f"{self.url}/ghost/api/admin/posts/",
json=body,
headers={
"Authorization": f"Ghost {token}",
"Accept-Version": "v5.0",
},
) as resp:
if resp.status not in (200, 201):
text = await resp.text()
raise ValueError(f"Ghost API {resp.status}: {text[:300]}")
return await resp.json()
# ─── Output generation ────────────────────────────────────────────────────────
class OutputGenerator:
def __init__(self, db: ResearchDB, ollama: OllamaClient, processor: ContentProcessor):
self.db = db
@@ -142,8 +361,14 @@ class OutputGenerator:
self.processor = processor
async def generate(self, session_id: int, output_type: OutputType,
progress_callback=None) -> str:
progress_callback=None, lang: str = "es") -> str:
"""Generate an output for a research session"""
if output_type in (OutputType.REPORT_EXTENDED,
OutputType.BLOG_EXTENDED,
OutputType.PODCAST_EXTENDED):
return await self.generate_extended(session_id, output_type, progress_callback,
lang=lang)
session = await self.db.get_session(session_id)
if not session:
raise ValueError(f"Session {session_id} not found")
@@ -156,7 +381,7 @@ class OutputGenerator:
# RAG: get most relevant context for this output type
query = self._get_rag_query(output_type, topic)
context = await self.processor.rag_query(session_id, query, top_k=30)
context = await self.processor.rag_query(session_id, query, top_k=80)
if not context:
# Fallback: use raw top chunks
@@ -166,11 +391,6 @@ class OutputGenerator:
if not context:
raise ValueError("No processed content available. Run /process first.")
# Truncate context to avoid Ollama context limits
context_words = context.split()
if len(context_words) > 6000:
context = " ".join(context_words[:6000]) + "\n\n[... additional material truncated ...]"
backend = "Claude Haiku" if settings.anthropic_api_key else "Ollama"
if progress_callback:
await progress_callback(f"✍️ Generando {output_type} con {backend}... (2-5 min)")
@@ -179,7 +399,11 @@ class OutputGenerator:
system = self._get_system(output_type)
prompt = PROMPTS[output_type].format(topic=topic, context=context)
output = await self._generate(prompt, system, output_type)
if lang == "en" and output_type == OutputType.BLOG:
system = BLOG_SYSTEM_EN
prompt = BLOG_PROMPT_EN.format(topic=topic, context=context)
output = await self._generate(prompt, system, output_type, session_id)
# Add metadata header
stats = await self.db.get_session_stats(session_id)
@@ -189,17 +413,37 @@ class OutputGenerator:
# Save to DB
await self.db.save_output(session_id, output_type, full_output)
logger.info("Output generated", type=output_type, length=len(full_output))
return full_output
# Auto-publish to Ghost for blog outputs
ghost_notice = ""
if output_type in (OutputType.BLOG, OutputType.BLOG_EXTENDED):
ghost = GhostPublisher(lang=lang)
if ghost.is_configured():
try:
title = _extract_title(full_output) or topic
result = await ghost.publish_draft(title, full_output)
post = result["posts"][0]
ghost_notice = (
f"\n\n---\n"
f"📤 *Borrador publicado en Ghost*\n"
f"Editar: {ghost.url}/ghost/#/editor/post/{post['id']}"
)
logger.info("Auto-published blog to Ghost", post_id=post["id"])
except Exception as e:
logger.warning("Auto-publish to Ghost failed", error=str(e))
async def _generate(self, prompt: str, system: str, output_type: OutputType) -> str:
logger.info("Output generated", type=output_type, length=len(full_output))
return full_output + ghost_notice
async def _generate(self, prompt: str, system: str, output_type: OutputType,
session_id: int | None = None) -> str:
if settings.anthropic_api_key:
return await self._generate_with_claude(prompt, system, output_type)
return await self._generate_with_claude(prompt, system, output_type, session_id)
return await self._generate_with_ollama(prompt, system)
async def _generate_with_claude(self, prompt: str, system: str, output_type: OutputType) -> str:
async def _generate_with_claude(self, prompt: str, system: str, output_type: OutputType,
session_id: int | None = None) -> str:
import anthropic
max_tokens = 4096 if output_type == OutputType.THREAD else 8192
max_tokens = 4096 if output_type == OutputType.THREAD else 16000
try:
client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
msg = await client.messages.create(
@@ -208,6 +452,14 @@ class OutputGenerator:
system=system,
messages=[{"role": "user", "content": prompt}],
)
if session_id is not None:
try:
await self.db.log_api_call(
session_id, "generation", settings.claude_model,
msg.usage.input_tokens, msg.usage.output_tokens
)
except Exception as log_err:
logger.warning("Failed to log API usage", error=str(log_err))
return msg.content[0].text.strip()
except Exception as e:
logger.warning("Claude generation failed, falling back to Ollama", error=str(e))
@@ -234,6 +486,155 @@ class OutputGenerator:
}
return systems.get(output_type, "You are a helpful research assistant.")
async def generate_extended(self, session_id: int, output_type: OutputType,
progress_callback=None, lang: str = "es") -> str:
"""
Generación por secciones para outputs exhaustivos.
1. Recupera muestra de contexto para el outline
2. Genera outline con Claude (lista de secciones)
3. Para cada sección: RAG específico → genera sección
4. Concatena y guarda
"""
session = await self.db.get_session(session_id)
if not session:
raise ValueError(f"Session {session_id} not found")
topic = session["topic"]
# Paso 1: contexto resumen para el outline (top 10 chunks)
top_chunks = await self.db.get_top_chunks(session_id, limit=10)
if not top_chunks:
raise ValueError("No processed content available. Run /process first.")
context_summary = "\n\n".join(
f"- {c.get('title', '')}: {c['content'][:300]}"
for c in top_chunks
)
if progress_callback:
await progress_callback("🗂️ Generando estructura del documento…")
# Paso 2: outline
base_type = output_type.value.replace("_extended", "")
outline_prompts = {
"report": OUTLINE_REPORT,
"blog": OUTLINE_BLOG,
"podcast": OUTLINE_PODCAST,
}
outline_prompt = outline_prompts[base_type].format(
topic=topic, context_summary=context_summary
)
outline_json = await self._generate_raw(outline_prompt, session_id)
try:
import json as _json
clean = outline_json.strip()
if clean.startswith("```"):
clean = "\n".join(clean.split("\n")[1:])
if clean.endswith("```"):
clean = "\n".join(clean.split("\n")[:-1])
sections = _json.loads(clean.strip())
except Exception as e:
logger.error("Failed to parse outline", error=str(e), raw=outline_json[:200])
raise ValueError(f"No se pudo generar el outline: {e}")
if progress_callback:
await progress_callback(
f"✍️ Generando {len(sections)} secciones… (esto tardará varios minutos)"
)
# Paso 3: generar cada sección
base_output_type = OutputType(base_type)
system = self._get_system(base_output_type)
if lang == "en" and output_type == OutputType.BLOG_EXTENDED:
system = BLOG_SYSTEM_EN
sections_text = []
for i, section in enumerate(sections, 1):
title = section.get("title", f"Sección {i}")
query = section.get("query", topic)
target_words = section.get("words", 600)
if progress_callback:
await progress_callback(
f"✍️ Sección {i}/{len(sections)}: {title[:40]}"
)
section_context = await self.processor.rag_query(session_id, query, top_k=40)
if not section_context:
section_context = context_summary
lang_rule = "- Write in English\n" if lang == "en" else "- Escribe en español\n"
section_prompt = (
f"Escribe la sección '{title}' del {base_type} sobre: '{topic}'\n\n"
f"REGLAS:\n"
f"- NO incluyas ningún título o encabezado al inicio de tu respuesta — el título de la sección ya está incluido externamente\n"
f"- Esta es UNA sección de un documento más largo — no repitas introducción ni conclusión general\n"
f"- No incluyas encabezados del documento completo, solo el contenido de esta sección\n"
f"- Objetivo: aproximadamente {target_words} palabras\n"
f"- Usa SOLO información del material siguiente — no inventes datos\n"
f"{lang_rule}"
f"\nMATERIAL:\n{section_context}"
)
section_text = await self._generate(
section_prompt, system, base_output_type, session_id
)
sections_text.append(f"## {title}\n\n{section_text}")
# Paso 4: concatenar
full_content = "\n\n---\n\n".join(sections_text)
stats = await self.db.get_session_stats(session_id)
header = self._build_header(topic, output_type, session, stats)
full_output = header + "\n\n" + full_content
await self.db.save_output(session_id, output_type, full_output)
# Auto-publish to Ghost for extended blog outputs
ghost_notice = ""
if output_type == OutputType.BLOG_EXTENDED:
ghost = GhostPublisher(lang=lang)
if ghost.is_configured():
try:
title = _extract_title(full_output) or topic
result = await ghost.publish_draft(title, full_output)
post = result["posts"][0]
ghost_notice = (
f"\n\n---\n"
f"📤 *Borrador publicado en Ghost*\n"
f"Editar: {ghost.url}/ghost/#/editor/post/{post['id']}"
)
logger.info("Auto-published extended blog to Ghost", post_id=post["id"])
except Exception as e:
logger.warning("Auto-publish to Ghost failed (extended)", error=str(e))
logger.info("Extended output generated", type=output_type,
sections=len(sections), length=len(full_output))
return full_output + ghost_notice
async def _generate_raw(self, prompt: str,
session_id: int | None = None) -> str:
if settings.anthropic_api_key:
import anthropic
try:
client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
msg = await client.messages.create(
model=settings.claude_model,
max_tokens=2048,
messages=[{"role": "user", "content": prompt}],
)
if session_id is not None:
try:
await self.db.log_api_call(
session_id, "outline", settings.claude_model,
msg.usage.input_tokens, msg.usage.output_tokens
)
except Exception:
pass
return msg.content[0].text.strip()
except Exception as e:
logger.warning("Claude outline failed", error=str(e))
raise
raise ValueError("Claude API key required for extended generation")
def _build_header(self, topic: str, output_type: OutputType,
session: dict, stats: dict) -> str:
from datetime import datetime
@@ -247,3 +648,279 @@ Iterations: {session.get('iterations', 0)}
Total words researched: {session.get('total_words', 0):,}
---
"""
def _remove_duplicate_headings(text: str) -> str:
lines = text.split('\n')
result = []
i = 0
while i < len(lines):
line = lines[i].rstrip()
if line.startswith('# ') and not line.startswith('## '):
h1_text = line[2:].strip().lower()
window = result[-5:] if len(result) >= 5 else result[:]
recent_h2s = {
p.strip()[3:].strip().lower()
for p in window
if p.strip().startswith('## ')
}
if h1_text in recent_h2s:
i += 1
continue
result.append(lines[i])
i += 1
return '\n'.join(result)
def generate_pdf(content: str, title: str = "ResearchOwl Output") -> bytes:
content = _remove_duplicate_headings(content)
try:
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import cm
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, HRFlowable
from reportlab.lib.enums import TA_LEFT
from reportlab.lib import colors
import io
import re
except ImportError:
raise ImportError("reportlab is required for PDF export — pip install reportlab")
buf = io.BytesIO()
doc = SimpleDocTemplate(
buf,
pagesize=A4,
rightMargin=2 * cm,
leftMargin=2 * cm,
topMargin=2.5 * cm,
bottomMargin=2 * cm,
title=title,
)
base = getSampleStyleSheet()
normal = ParagraphStyle("RO_Normal", parent=base["Normal"],
fontSize=10, leading=14, spaceAfter=4)
h1 = ParagraphStyle("RO_H1", parent=base["Heading1"],
fontSize=18, spaceBefore=12, spaceAfter=6,
textColor=colors.HexColor("#1a1a2e"))
h2 = ParagraphStyle("RO_H2", parent=base["Heading2"],
fontSize=14, spaceBefore=10, spaceAfter=4,
textColor=colors.HexColor("#16213e"))
h3 = ParagraphStyle("RO_H3", parent=base["Heading3"],
fontSize=12, spaceBefore=8, spaceAfter=4)
code_style = ParagraphStyle("RO_Code", parent=base["Code"],
fontSize=9, leading=12, fontName="Courier",
backColor=colors.HexColor("#f4f4f4"), spaceAfter=4)
bullet_style = ParagraphStyle("RO_Bullet", parent=normal,
leftIndent=20, bulletIndent=10, spaceAfter=2)
def md_to_para(text: str) -> str:
text = text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
text = re.sub(r'\*\*(.+?)\*\*', r'<b>\1</b>', text)
text = re.sub(r'__(.+?)__', r'<b>\1</b>', text)
text = re.sub(r'\*(.+?)\*', r'<i>\1</i>', text)
text = re.sub(r'_(.+?)_', r'<i>\1</i>', text)
text = re.sub(r'`(.+?)`', r'<font name="Courier">\1</font>', text)
return text
story = []
lines = content.split("\n")
in_code = False
code_buf = []
for line in lines:
if line.startswith("```"):
if not in_code:
in_code = True
code_buf = []
else:
in_code = False
try:
story.append(Paragraph(
"<br/>".join(l.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
for l in code_buf),
code_style
))
except Exception:
pass
continue
if in_code:
code_buf.append(line)
continue
if re.match(r'^[-*_]{3,}$', line.strip()):
story.append(HRFlowable(width="100%", thickness=0.5,
color=colors.grey, spaceAfter=6))
continue
try:
if line.startswith("### "):
story.append(Paragraph(md_to_para(line[4:]), h3))
elif line.startswith("## "):
story.append(Paragraph(md_to_para(line[3:]), h2))
elif line.startswith("# "):
story.append(Paragraph(md_to_para(line[2:]), h1))
elif re.match(r'^[-*+] ', line):
story.append(Paragraph("" + md_to_para(line[2:]), bullet_style))
elif re.match(r'^\d+\. ', line):
story.append(Paragraph(md_to_para(line), bullet_style))
elif line.strip() == "":
story.append(Spacer(1, 6))
else:
story.append(Paragraph(md_to_para(line), normal))
except Exception:
try:
story.append(Paragraph(line[:300], normal))
except Exception:
pass
doc.build(story)
return buf.getvalue()
async def generate_diff_summary(
topic: str,
new_urls: set,
old_urls: set,
new_chunks: list,
session_id: int,
db,
) -> str | None:
from src.config import settings
import structlog
diff_logger = structlog.get_logger()
added_urls = new_urls - old_urls
pct_new = len(added_urls) / max(len(new_urls), 1)
diff_logger.info("Diff analysis", topic=topic,
new_urls=len(new_urls), old_urls=len(old_urls),
added=len(added_urls), pct_new=round(pct_new, 2))
if pct_new < 0.20 and len(added_urls) < 5:
diff_logger.info("Diff: no significant new sources", topic=topic)
return None
if not new_chunks:
return None
context = "\n\n---\n\n".join(
f"[{c.get('source_type', 'web').upper()}] {c.get('title', '')}\n{c['content'][:400]}"
for c in new_chunks[:20]
)
if not settings.anthropic_api_key:
return (
f"📊 *Novedades detectadas sobre {topic}*\n\n"
f"{len(added_urls)} fuentes nuevas encontradas\n"
f"{len(new_chunks)} chunks de contenido procesados\n\n"
f"Usa /generate report para ver el análisis completo."
)
try:
import anthropic
client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
prompt = (
f'Analiza el siguiente material de investigación sobre "{topic}" '
f'y genera un resumen BREVE (máximo 300 palabras) de las novedades '
f'más importantes encontradas. Escribe en español.\n\n'
f'Si el contenido es muy similar a investigaciones anteriores o no '
f'contiene información genuinamente nueva, responde SOLO con: '
f'"SIN_NOVEDADES"\n\n'
f'Material nuevo:\n{context}'
)
msg = await client.messages.create(
model=settings.claude_model,
max_tokens=500,
messages=[{"role": "user", "content": prompt}]
)
summary = msg.content[0].text.strip()
if summary == "SIN_NOVEDADES":
diff_logger.info("Diff: Claude found no new information", topic=topic)
return None
try:
await db.log_api_call(session_id, "diff", settings.claude_model,
msg.usage.input_tokens, msg.usage.output_tokens)
except Exception:
pass
return f"🔔 *Novedades — {topic}*\n\n{summary}\n\nUsa /generate para report completo."
except Exception as e:
diff_logger.warning("Diff summary generation failed", error=str(e))
return (
f"📊 *Actualización — {topic}*\n\n"
f"{len(added_urls)} fuentes nuevas\n"
f"Usa /generate report para ver el análisis completo."
)
async def generate_comparison(
topic_a: str,
topic_b: str,
context_a: str,
context_b: str,
session_id_a: int,
db,
) -> str:
from src.config import settings
import structlog
cmp_logger = structlog.get_logger()
if not settings.anthropic_api_key:
raise ValueError("Claude API key required for comparison")
def _truncate(text: str, max_words: int = 3000) -> str:
words = text.split()
if len(words) > max_words:
return " ".join(words[:max_words]) + "\n\n[... contenido adicional truncado ...]"
return text
context_a = _truncate(context_a)
context_b = _truncate(context_b)
prompt = (
f'Eres un analista experto. Compara en profundidad estos dos temas:\n'
f'TEMA A: "{topic_a}"\n'
f'TEMA B: "{topic_b}"\n\n'
f'Escribe el análisis en español con esta estructura:\n\n'
f'## Resumen comparativo\n'
f'(2-3 párrafos con las diferencias y similitudes más importantes)\n\n'
f'## {topic_a}\n'
f'(Puntos clave únicos de este tema)\n\n'
f'## {topic_b}\n'
f'(Puntos clave únicos de este tema)\n\n'
f'## Similitudes\n'
f'(Qué tienen en común)\n\n'
f'## Diferencias clave\n'
f'(Tabla markdown o lista de las diferencias más relevantes)\n\n'
f'## Conclusión\n'
f'(Cuál es mejor/más relevante según el contexto, o qué conclusión se extrae)\n\n'
f'---\n'
f'MATERIAL DE INVESTIGACIÓN — {topic_a}:\n{context_a}\n\n'
f'---\n'
f'MATERIAL DE INVESTIGACIÓN — {topic_b}:\n{context_b}\n'
)
try:
import anthropic
client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
msg = await client.messages.create(
model=settings.claude_model,
max_tokens=8192,
messages=[{"role": "user", "content": prompt}]
)
try:
await db.log_api_call(
session_id_a, "comparison", settings.claude_model,
msg.usage.input_tokens, msg.usage.output_tokens
)
except Exception:
pass
return msg.content[0].text.strip()
except Exception as e:
cmp_logger.error("Comparison generation failed", error=str(e))
raise
+70 -6
View File
@@ -129,6 +129,8 @@ class ContentProcessor:
scraped = [s for s in sources if s["status"] == "scraped"]
logger.info("Processing sources", total=len(scraped))
scraped = await self._dedup_sources(session_id, scraped)
logger.info("After dedup", unique=len(scraped))
total_chunks = 0
total_words = 0
@@ -161,6 +163,56 @@ class ContentProcessor:
return {"total_chunks": total_chunks, "total_words": total_words}
async def _dedup_sources(self, session_id: int,
scraped: list[dict]) -> list[dict]:
try:
import hashlib
seen_hashes: set = set()
seen_prefixes: list = []
unique: list = []
duplicates = 0
for source in scraped:
content = await self.db.get_source_content(source["id"])
if not content:
unique.append(source)
continue
content_hash = hashlib.md5(content[:2000].encode()).hexdigest()
if content_hash in seen_hashes:
duplicates += 1
await self.db.update_source(source["id"], status="skipped")
continue
seen_hashes.add(content_hash)
prefix = content[:300].strip().lower()
prefix_words = set(prefix.split())
is_dup = False
if len(prefix_words) >= 10:
for seen_prefix_words in seen_prefixes:
intersection = len(prefix_words & seen_prefix_words)
union = len(prefix_words | seen_prefix_words)
if intersection / max(union, 1) > 0.85:
is_dup = True
break
if is_dup:
duplicates += 1
await self.db.update_source(source["id"], status="skipped")
continue
seen_prefixes.append(prefix_words)
unique.append(source)
if duplicates > 0:
logger.info("Dedup complete", session_id=session_id,
original=len(scraped), duplicates=duplicates,
unique=len(unique))
return unique
except Exception as e:
logger.warning("Dedup failed, processing all sources", error=str(e))
return scraped
async def _process_source(self, session_id: int, topic: str, source: dict) -> int:
"""Chunk, score, embed and store a single source. Returns chunk count."""
source_id = source["id"]
@@ -182,7 +234,7 @@ class ContentProcessor:
if words < 30:
continue
quality = await self._score_quality(chunk, topic)
quality = await self._score_quality(chunk, topic, session_id)
if quality < settings.quality_threshold:
filtered_quality += 1
logger.debug("Chunk filtered by quality", source_id=source_id,
@@ -215,16 +267,20 @@ class ContentProcessor:
logger.info("Source processed", source_id=source_id, stored=stored)
return stored
async def _score_quality(self, chunk: str, topic: str) -> float:
async def _score_quality(self, chunk: str, topic: str,
session_id: int | None = None) -> float:
"""Score 0-1 relevance to topic. Uses Claude Haiku if API key set, else Ollama."""
if settings.anthropic_api_key:
return await self._score_with_claude(chunk, topic)
return await self._score_with_claude(chunk, topic, session_id)
return await self._score_with_ollama(chunk, topic)
async def _score_with_claude(self, chunk: str, topic: str) -> float:
async def _score_with_claude(self, chunk: str, topic: str,
session_id: int | None = None) -> float:
import anthropic
prompt = (
f'Rate 0-10 how relevant this text is to the topic "{topic}". '
f'Be generous — if the text is tangentially related, score 4+. '
f'Only score below 3 if completely unrelated. '
f'Reply with only a number.\n\nText:\n{chunk[:500]}'
)
try:
@@ -234,6 +290,14 @@ class ContentProcessor:
max_tokens=10,
messages=[{"role": "user", "content": prompt}]
)
if session_id is not None:
try:
await self.db.log_api_call(
session_id, "scoring", settings.claude_model,
msg.usage.input_tokens, msg.usage.output_tokens
)
except Exception as log_err:
logger.warning("Failed to log API usage", error=str(log_err))
response = msg.content[0].text.strip()
numbers = re.findall(r'\b(\d+(?:\.\d+)?)\b', response)
if numbers:
@@ -241,7 +305,7 @@ class ContentProcessor:
normalized = min(1.0, score / 10.0)
logger.debug("Claude relevance score", raw=score, normalized=round(normalized, 2))
return normalized
return 0.6
return 0.5
except Exception as e:
logger.warning("Claude scoring failed, falling back to Ollama", error=str(e))
return await self._score_with_ollama(chunk, topic)
@@ -275,7 +339,7 @@ class ContentProcessor:
query_embedding = await self.ollama.embed(query)
# Get top quality chunks
chunks = await self.db.get_top_chunks(session_id, limit=100)
chunks = await self.db.get_top_chunks(session_id, limit=300)
if query_embedding and chunks:
# Rank by embedding similarity
+170 -35
View File
@@ -3,6 +3,7 @@ ResearchOwl Exhaustive Scraper
Core engine: discovers, expands, and evaluates sources recursively
"""
import asyncio
import random
import re
import time
from typing import Optional
@@ -54,6 +55,22 @@ REDDIT_RE = re.compile(r"reddit\.com/(r/\w+/comments/\w+)")
WIKIPEDIA_RE = re.compile(r"wikipedia\.org/wiki/(.+)")
async def fetch_with_retry(fetch_fn, source_name: str, max_retries: int = 3):
last_exc = None
for attempt in range(max_retries):
try:
return await fetch_fn()
except Exception as e:
last_exc = e
if attempt < max_retries - 1:
wait = 2 ** attempt + random.random()
logger.debug("fetch_with_retry backoff", source=source_name[:60],
attempt=attempt + 1, wait=round(wait, 1), error=str(e))
await asyncio.sleep(wait)
logger.warning("fetch_with_retry exhausted", source=source_name[:60], error=str(last_exc))
raise last_exc
def detect_source_type(url: str) -> str:
if YOUTUBE_RE.search(url):
return "youtube"
@@ -140,16 +157,15 @@ class ExhaustiveScraper:
"""Initial broad search across multiple sources"""
logger.info("Seeding research", topic=self.topic)
tasks = [
self._seed_duckduckgo(),
self._seed_search(),
self._seed_wikipedia(),
self._seed_reddit(),
self._seed_youtube(),
]
await asyncio.gather(*tasks, return_exceptions=True)
async def _seed_duckduckgo(self):
"""Multiple DDG queries — fresh DDGS() per query to avoid cascading ratelimits"""
queries = [
async def _generate_ddg_queries(self) -> list[str]:
fallback = [
self.topic,
f"{self.topic} history facts",
f"{self.topic} evidence analysis",
@@ -159,26 +175,113 @@ class ExhaustiveScraper:
f"{self.topic} documentary",
f"{self.topic} research study",
]
if not settings.anthropic_api_key:
return fallback
try:
import anthropic
logger.info("Generating DDG queries with Claude", topic=self.topic)
client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
prompt = (
f'Generate exactly 8 DuckDuckGo search queries to research: "{self.topic}"\n\n'
f'Rules:\n'
f'- Each query must be specific and distinct — no generic templates\n'
f'- Cover different angles: facts, history, official sources, criticism, '
f'technical details, recent developments, expert opinions, primary sources\n'
f'- Use the most specific terminology for this topic\n'
f'- Include the topic language naturally (if topic is in Spanish, '
f'mix Spanish and English queries for broader coverage)\n'
f'- Output ONLY the 8 queries, one per line, no numbering, '
f'no explanations, no markdown\n'
)
msg = await client.messages.create(
model=settings.claude_model,
max_tokens=300,
messages=[{"role": "user", "content": prompt}]
)
raw = msg.content[0].text.strip()
queries = [q.strip() for q in raw.split('\n') if q.strip()]
if self.topic not in queries:
queries = [self.topic] + queries[:7]
queries = queries[:8]
logger.info("DDG queries generated by Claude", queries=queries)
return queries
except Exception as e:
logger.warning("Claude query generation failed, using fallback",
error=str(e), error_type=type(e).__name__)
return fallback
async def _search_searxng(self, query: str) -> list[dict]:
"""Busca en SearXNG y retorna lista de {href, title}. Retorna [] si no disponible."""
import aiohttp
searxng_url = "http://searxng-svc.researchowl.svc.cluster.local:8080/search"
params = {
"q": query,
"format": "json",
"engines": "duckduckgo,google,bing,brave",
"language": "all",
}
headers = {
"Accept": "application/json",
"X-Forwarded-For": "127.0.0.1",
"User-Agent": "ResearchOwl/1.0",
}
try:
async with aiohttp.ClientSession() as session:
async with session.get(
searxng_url,
params=params,
headers=headers,
timeout=aiohttp.ClientTimeout(total=15)
) as resp:
if resp.status == 200:
data = await resp.json()
results = data.get("results", [])
logger.info("SearXNG query ok", query=query, results=len(results))
return [
{"href": r.get("url", ""), "title": r.get("title", "")}
for r in results
if r.get("url")
]
else:
logger.warning("SearXNG non-200", status=resp.status, query=query)
return []
except Exception as e:
logger.warning("SearXNG failed", query=query, error=str(e))
return []
async def _seed_search(self):
"""SearXNG primary + DDG fallback per query"""
queries = await self._generate_ddg_queries()
for query in queries:
if self._stop:
break
try:
# Fresh instance per query — a ratelimit on one won't poison the rest
with DDGS() as ddgs:
results = list(ddgs.text(query, max_results=settings.max_pages_per_search))
for r in results:
url = normalize_url(r.get("href", ""))
if url and not is_blacklisted(url):
await self.db.add_source(
self.session_id, url,
detect_source_type(url),
depth=0,
title=r.get("title")
)
logger.info("DDG query ok", query=query, results=len(results))
except Exception as e:
logger.warning("DDG query failed", query=query, error=str(e))
await asyncio.sleep(settings.request_delay * 2)
results = await self._search_searxng(query)
if not results:
logger.info("SearXNG vacío, usando DDG", query=query)
try:
with DDGS() as ddgs:
ddg_results = list(ddgs.text(
query,
max_results=settings.max_pages_per_search
))
results = ddg_results
logger.info("DDG fallback ok", query=query, results=len(results))
except Exception as e:
logger.warning("DDG fallback failed", query=query, error=str(e))
results = []
for r in results:
url = normalize_url(r.get("href", ""))
if url and not is_blacklisted(url):
await self.db.add_source(
self.session_id, url,
detect_source_type(url),
depth=0,
title=r.get("title")
)
await asyncio.sleep(random.uniform(1, 3))
async def _seed_wikipedia(self):
"""Search Wikipedia API for correct article URLs.
@@ -290,12 +393,7 @@ class ExhaustiveScraper:
)
if self.progress_callback:
await self.progress_callback(
iteration=self.iteration,
total=self.total_sources,
new_this_round=new_sources,
stats=stats
)
await self.progress_callback(self.iteration, self.total_sources)
# Saturation check: if we found very few new URLs, we're done
if new_sources < 3 and self.iteration > 2:
@@ -316,10 +414,31 @@ class ExhaustiveScraper:
source_id = source["id"]
try:
try:
cached = await self.db.get_cached_content(url)
except Exception as cache_err:
logger.warning("Cache lookup failed", url=url, error=str(cache_err))
cached = None
if cached:
logger.debug("Cache hit", url=url)
await self.db.save_source_content(source_id, cached)
await self.db.update_source(
source_id,
status="scraped",
scraped_at=time.time(),
word_count=len(cached.split()),
)
return 0
if source_type == "youtube":
content, title = await self._extract_youtube(url)
content, title = await fetch_with_retry(
lambda: self._extract_youtube(url), url
)
elif source_type == "wikipedia":
content, title, new_urls = await self._extract_wikipedia(url)
content, title, new_urls = await fetch_with_retry(
lambda: self._extract_wikipedia(url), url
)
added = 0
for new_url in (new_urls or []):
if self._url_is_relevant(new_url):
@@ -331,13 +450,18 @@ class ExhaustiveScraper:
await self._mark_scraped(source_id, content, title, url)
return added
elif source_type == "reddit":
content, title = await self._extract_reddit(url)
# Small delay between Reddit requests to avoid rate limiting
content, title = await fetch_with_retry(
lambda: self._extract_reddit(url), url
)
await asyncio.sleep(settings.request_delay)
elif source_type == "pdf":
content, title = await self._extract_pdf(url)
content, title = await fetch_with_retry(
lambda: self._extract_pdf(url), url
)
else:
content, title, new_urls = await self._extract_web(url, source["depth"])
content, title, new_urls = await fetch_with_retry(
lambda: self._extract_web(url, source["depth"]), url
)
added = 0
for new_url in (new_urls or []):
if self._url_is_relevant(new_url):
@@ -469,12 +593,23 @@ class ExhaustiveScraper:
return None, None
video_id = match.group(1)
try:
transcript_list = YouTubeTranscriptApi.get_transcript(
loop = asyncio.get_event_loop()
def _fetch():
return YouTubeTranscriptApi.get_transcript(
video_id, languages=["en", "es", "en-US", "en-GB"]
)
try:
transcript_list = await asyncio.wait_for(
loop.run_in_executor(None, _fetch),
timeout=30.0
)
text = " ".join(t["text"] for t in transcript_list)
return text, f"YouTube: {video_id}"
except asyncio.TimeoutError:
logger.warning("YouTube transcript timed out", video_id=video_id)
return None, None
except NoTranscriptFound:
return None, None
except Exception as e: