Compare commits
44 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 7060193cdc | |||
| 4bfd27db2c | |||
| 30509dab4a | |||
| f6ad57e3c7 | |||
| 2a9962a1bd | |||
| 0e52b404bf | |||
| 3a9ab2848a | |||
| d3d22a1605 | |||
| 425639f423 | |||
| 7f3c2d0b49 | |||
| f577ac4712 | |||
| 747b9605c0 | |||
| caf763c23e | |||
| bdea12e6f2 | |||
| a6a90d3598 | |||
| 36984657a8 | |||
| 83eb2359be | |||
| 94d209dd8a | |||
| 7a156e2af1 | |||
| 279475a175 | |||
| 82e614e285 | |||
| aa83cfacbd | |||
| e8034f3f37 | |||
| c2bb301103 | |||
| 53cf7a04a8 | |||
| f4e167f3b6 | |||
| ba2b366534 | |||
| 4bef9d2d17 | |||
| 7a012c2c28 | |||
| 6aaa85a1f8 | |||
| e0a42f0b91 | |||
| 4c7f5b521b | |||
| c33bb5337d | |||
| 566f685578 | |||
| 8c259b2b2e | |||
| a47d7b26ca | |||
| e5b77ad72d | |||
| 0d8aee63be | |||
| b5518ac95a | |||
| b33ae202b8 | |||
| 65917518ce | |||
| a681627d2e | |||
| 7704f071d6 | |||
| e66d728d68 |
@@ -27,6 +27,9 @@ jobs:
|
||||
- name: Log in to registry
|
||||
run: echo "${{ secrets.CI_TOKEN }}" | docker login gitea.gitea.svc.cluster.local:3000 -u chemavx --password-stdin
|
||||
|
||||
- name: Clean previous buildx builder
|
||||
run: docker buildx rm ci-builder 2>/dev/null || true
|
||||
|
||||
- name: Create buildx builder
|
||||
run: |
|
||||
cat > /tmp/buildkitd.toml << 'EOF'
|
||||
|
||||
@@ -6,6 +6,7 @@ WORKDIR /app
|
||||
RUN apt-get update && apt-get install -y \
|
||||
gcc g++ \
|
||||
libxml2-dev libxslt-dev \
|
||||
libfreetype6-dev \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
COPY requirements.txt .
|
||||
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"$schema": "https://docs.renovatebot.com/renovate-schema.json"
|
||||
}
|
||||
+10
-6
@@ -8,14 +8,14 @@ aiohttp==3.10.0
|
||||
# Scraping
|
||||
beautifulsoup4==4.12.3
|
||||
lxml==5.2.2
|
||||
trafilatura==1.12.0
|
||||
youtube-transcript-api==0.6.2
|
||||
pdfplumber==0.11.3
|
||||
feedparser==6.0.11
|
||||
trafilatura==1.12.2
|
||||
youtube-transcript-api==0.6.3
|
||||
pdfplumber==0.11.9
|
||||
feedparser==6.0.12
|
||||
duckduckgo-search==6.2.6
|
||||
|
||||
# Storage & Embeddings
|
||||
sqlite-vec==0.1.6
|
||||
sqlite-vec==0.1.9
|
||||
aiosqlite==0.20.0
|
||||
|
||||
# Processing
|
||||
@@ -26,9 +26,13 @@ scikit-learn==1.5.1
|
||||
# Claude API (scoring)
|
||||
anthropic>=0.40.0
|
||||
|
||||
# PDF export
|
||||
markdown==3.7
|
||||
reportlab==4.2.5
|
||||
|
||||
# Utilities
|
||||
pydantic==2.8.0
|
||||
pydantic-settings==2.4.0
|
||||
pydantic-settings==2.14.1
|
||||
tenacity==9.0.0
|
||||
structlog==24.4.0
|
||||
python-dotenv==1.0.1
|
||||
|
||||
+759
-92
@@ -4,6 +4,7 @@ Main user interface — all commands handled here
|
||||
"""
|
||||
import asyncio
|
||||
import os
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
@@ -33,17 +34,29 @@ def is_authorized(user_id: int) -> bool:
|
||||
return not allowed or user_id in allowed
|
||||
|
||||
|
||||
def fmt_progress(iteration: int, total: int, new: int, stats: dict) -> str:
|
||||
scraped = stats.get("scraped") or 0
|
||||
failed = stats.get("failed") or 0
|
||||
pending = stats.get("pending") or 0
|
||||
skipped = stats.get("skipped") or 0
|
||||
return (
|
||||
f"🔄 *Iteration {iteration}*\n"
|
||||
f"📚 Sources found: `{total}`\n"
|
||||
f"✅ Scraped: `{scraped}` | ⏭️ Skipped: `{skipped}` | ❌ Failed: `{failed}` | ⏳ Pending: `{pending}`\n"
|
||||
f"🆕 New URLs this round: `{new}`"
|
||||
)
|
||||
class ProgressReporter:
|
||||
def __init__(self, reply_target: Message = None, *, bot=None, chat_id: int = None):
|
||||
self._reply_target = reply_target
|
||||
self._bot = bot
|
||||
self._chat_id = chat_id
|
||||
self._msg: Optional[Message] = None
|
||||
|
||||
async def start(self, text: str):
|
||||
if self._reply_target is not None:
|
||||
self._msg = await self._reply_target.reply_text(text, parse_mode=ParseMode.MARKDOWN)
|
||||
elif self._bot is not None and self._chat_id is not None:
|
||||
self._msg = await self._bot.send_message(self._chat_id, text, parse_mode=ParseMode.MARKDOWN)
|
||||
|
||||
async def update(self, text: str):
|
||||
if not self._msg:
|
||||
return
|
||||
try:
|
||||
await self._msg.edit_text(text, parse_mode=ParseMode.MARKDOWN)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def done(self, text: str):
|
||||
await self.update(text)
|
||||
|
||||
|
||||
async def send_chunked(message: Message, text: str, parse_mode=None):
|
||||
@@ -56,6 +69,72 @@ async def send_chunked(message: Message, text: str, parse_mode=None):
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
|
||||
# ─── Shared research logic ────────────────────────────────────────────────────
|
||||
|
||||
async def run_scheduled_research(bot, chat_id: int, topic: str,
|
||||
session_id: int, db: ResearchDB,
|
||||
progress_message=None,
|
||||
silent_completion: bool = False):
|
||||
if progress_message is not None:
|
||||
reporter = ProgressReporter(progress_message)
|
||||
else:
|
||||
reporter = ProgressReporter(bot=bot, chat_id=chat_id)
|
||||
|
||||
try:
|
||||
await reporter.start(f"🔍 Iniciando scraping de `{topic}`…")
|
||||
|
||||
async def on_progress(iter_num, total_sources):
|
||||
await reporter.update(
|
||||
f"🔍 Scraping — iteración `{iter_num}` | `{total_sources}` fuentes encontradas"
|
||||
)
|
||||
|
||||
scraper = ExhaustiveScraper(db, session_id, topic, on_progress)
|
||||
final_stats = await scraper.run()
|
||||
|
||||
await db.update_session(session_id, status=ResearchStatus.SATURATED)
|
||||
scraped = final_stats.get("scraped", 0)
|
||||
|
||||
await reporter.update(f"⚡ Procesando `{scraped}` fuentes…")
|
||||
|
||||
ollama = OllamaClient()
|
||||
if await ollama.is_available():
|
||||
processor = ContentProcessor(db, ollama)
|
||||
|
||||
async def proc_progress(total_chunks, total_words):
|
||||
await reporter.update(
|
||||
f"⚡ Scoring chunks… (`{total_chunks}` procesados)"
|
||||
)
|
||||
|
||||
await processor.process_session(session_id, topic, proc_progress)
|
||||
chunk_count = await db.get_chunks_count(session_id)
|
||||
if silent_completion:
|
||||
await reporter.done(
|
||||
f"🔍 Investigación completada — analizando novedades…"
|
||||
)
|
||||
else:
|
||||
await reporter.done(
|
||||
f"✅ Listo — `{scraped}` fuentes · `{chunk_count}` chunks · usa /generate <tipo>"
|
||||
)
|
||||
else:
|
||||
await reporter.done(
|
||||
f"⚠️ Ollama no disponible — `{scraped}` fuentes scraped.\n"
|
||||
f"Usa /generate para generar contenido."
|
||||
)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
await db.update_session(session_id, status=ResearchStatus.FINISHED)
|
||||
try:
|
||||
await reporter.done("🛑 Investigación cancelada.")
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.error("Research task failed", error=str(e))
|
||||
try:
|
||||
await reporter.done(f"❌ Error: {str(e)[:200]}")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# ─── Commands ─────────────────────────────────────────────────────────────────
|
||||
|
||||
async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
@@ -68,9 +147,18 @@ async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
"`/status` — Check current research progress\n"
|
||||
"`/finish` — Stop research and proceed to generation\n"
|
||||
"`/process` — Manually trigger chunk processing\n"
|
||||
"`/generate <type>` — Generate output (podcast|blog|report|thread)\n"
|
||||
"`/generate <type>` — Generate output\n"
|
||||
" Tipos: podcast|blog|report|thread\n"
|
||||
" Extended: podcast_extended|blog_extended|report_extended\n"
|
||||
"`/sources` — List all sources found\n"
|
||||
"`/outputs` — List generated outputs\n"
|
||||
"`/export` — Exportar último output como PDF\n"
|
||||
"`/publish` — Publicar último output en Ghost como borrador\n"
|
||||
"`/compare <tema1> vs <tema2>` — Análisis comparativo\n"
|
||||
"`/costs` — Show API usage costs\n"
|
||||
"`/watch <topic> [h]` — Schedule periodic research\n"
|
||||
"`/unwatch <topic>` — Remove a watch\n"
|
||||
"`/watches` — List your watched topics\n"
|
||||
"`/cancel` — Cancel current research\n"
|
||||
"`/help` — Show this message",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
@@ -91,88 +179,22 @@ async def cmd_research(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
)
|
||||
return
|
||||
|
||||
# Check for existing active research
|
||||
if chat_id in _active_tasks and not _active_tasks[chat_id].done():
|
||||
await update.message.reply_text(
|
||||
"⚠️ Research already in progress. Use /status or /finish first."
|
||||
)
|
||||
return
|
||||
|
||||
msg = await update.message.reply_text(
|
||||
f"🦉 *ResearchOwl* starting research on:\n`{topic}`\n\n"
|
||||
f"🌱 Seeding sources from:\n"
|
||||
f"• DuckDuckGo (8 queries)\n"
|
||||
f"• Wikipedia + internal links\n"
|
||||
f"• Reddit top posts\n"
|
||||
f"• YouTube transcripts\n\n"
|
||||
f"This will run exhaustively until saturation. Use /finish to stop early.",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
|
||||
async def run_research():
|
||||
db_conn = await get_db()
|
||||
db = ResearchDB(db_conn)
|
||||
try:
|
||||
session_id = await db.create_session(topic, chat_id)
|
||||
_active_sessions[chat_id] = session_id
|
||||
|
||||
progress_msg = msg
|
||||
iteration_count = [0]
|
||||
|
||||
async def on_progress(iteration, total, new_this_round, stats):
|
||||
iteration_count[0] = iteration
|
||||
text = fmt_progress(iteration, total, new_this_round, stats)
|
||||
try:
|
||||
await progress_msg.edit_text(text, parse_mode=ParseMode.MARKDOWN)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
scraper = ExhaustiveScraper(db, session_id, topic, on_progress)
|
||||
final_stats = await scraper.run()
|
||||
|
||||
await db.update_session(session_id, status=ResearchStatus.SATURATED)
|
||||
|
||||
scraped = final_stats.get("scraped", 0)
|
||||
await update.message.reply_text(
|
||||
f"✅ *Research complete!*\n\n"
|
||||
f"📊 Results:\n"
|
||||
f"• Sources found & scraped: `{scraped}`\n"
|
||||
f"• Iterations: `{iteration_count[0]}`\n\n"
|
||||
f"Now processing content with Ollama...\n"
|
||||
f"Use `/generate podcast|blog|report|thread` when ready.",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
await run_scheduled_research(
|
||||
ctx.bot, chat_id, topic, session_id, db,
|
||||
progress_message=update.message
|
||||
)
|
||||
|
||||
# Auto-process after scraping
|
||||
ollama = OllamaClient()
|
||||
if await ollama.is_available():
|
||||
processor = ContentProcessor(db, ollama)
|
||||
|
||||
async def proc_progress(total_chunks, total_words):
|
||||
await update.message.reply_text(
|
||||
f"🧠 *Processing complete!*\n"
|
||||
f"• Chunks stored: `{total_chunks}`\n"
|
||||
f"• Words researched: `{total_words:,}`\n\n"
|
||||
f"Ready! Use `/generate podcast|blog|report|thread`",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
|
||||
await processor.process_session(session_id, topic, proc_progress)
|
||||
else:
|
||||
await update.message.reply_text(
|
||||
"⚠️ Ollama not reachable — skipping processing.\n"
|
||||
"You can still use `/generate` (will use raw content)."
|
||||
)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
await db.update_session(
|
||||
_active_sessions.get(chat_id, 0),
|
||||
status=ResearchStatus.FINISHED
|
||||
)
|
||||
await update.message.reply_text("🛑 Research cancelled.")
|
||||
except Exception as e:
|
||||
logger.error("Research task failed", error=str(e))
|
||||
await update.message.reply_text(f"❌ Research failed: {str(e)[:200]}")
|
||||
finally:
|
||||
await db_conn.close()
|
||||
|
||||
@@ -255,6 +277,7 @@ async def cmd_generate(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
|
||||
chat_id = update.effective_chat.id
|
||||
output_arg = ctx.args[0].lower() if ctx.args else ""
|
||||
lang = "en" if len(ctx.args) > 1 and ctx.args[1].lower() == "en" else "es"
|
||||
|
||||
type_map = {
|
||||
"podcast": OutputType.PODCAST,
|
||||
@@ -263,6 +286,10 @@ async def cmd_generate(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
"thread": OutputType.THREAD,
|
||||
"hilo": OutputType.THREAD,
|
||||
"informe": OutputType.REPORT,
|
||||
"report_extended": OutputType.REPORT_EXTENDED,
|
||||
"blog_extended": OutputType.BLOG_EXTENDED,
|
||||
"podcast_extended": OutputType.PODCAST_EXTENDED,
|
||||
"informe_extended": OutputType.REPORT_EXTENDED,
|
||||
}
|
||||
|
||||
if output_arg not in type_map:
|
||||
@@ -279,12 +306,19 @@ async def cmd_generate(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
db = ResearchDB(db_conn)
|
||||
|
||||
try:
|
||||
# Find last session for this chat
|
||||
cursor = await db_conn.execute(
|
||||
"""SELECT * FROM research_sessions WHERE telegram_chat_id = ?
|
||||
ORDER BY created_at DESC LIMIT 1""",
|
||||
(chat_id,)
|
||||
)
|
||||
# Usa la sesión activa si existe, si no la más reciente
|
||||
session_id = _active_sessions.get(chat_id)
|
||||
if session_id:
|
||||
cursor = await db_conn.execute(
|
||||
"SELECT * FROM research_sessions WHERE id = ?",
|
||||
(session_id,)
|
||||
)
|
||||
else:
|
||||
cursor = await db_conn.execute(
|
||||
"""SELECT * FROM research_sessions WHERE telegram_chat_id = ?
|
||||
ORDER BY created_at DESC LIMIT 1""",
|
||||
(chat_id,)
|
||||
)
|
||||
row = await cursor.fetchone()
|
||||
if not row:
|
||||
await update.message.reply_text("No research sessions found. Start with /research <topic>")
|
||||
@@ -293,9 +327,11 @@ async def cmd_generate(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
session = dict(row)
|
||||
session_id = session["id"]
|
||||
|
||||
backend = "Claude Haiku" if settings.anthropic_api_key else f"Ollama ({settings.ollama_model})"
|
||||
lang_label = " (EN)" if lang == "en" else ""
|
||||
msg = await update.message.reply_text(
|
||||
f"⚙️ Generating *{output_type}* for: `{session['topic']}`\n"
|
||||
f"Using Ollama ({settings.ollama_model})...\n"
|
||||
f"⚙️ Generating *{output_type}{lang_label}* for: `{session['topic']}`\n"
|
||||
f"Using {backend}...\n"
|
||||
f"This may take 2-5 minutes ☕",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
@@ -310,18 +346,26 @@ async def cmd_generate(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
processor = ContentProcessor(db, ollama)
|
||||
generator = OutputGenerator(db, ollama, processor)
|
||||
|
||||
output = await generator.generate(session_id, output_type, gen_progress)
|
||||
output = await generator.generate(session_id, output_type, gen_progress, lang=lang)
|
||||
|
||||
# Send as file if very long
|
||||
if len(output) > 8000:
|
||||
import tempfile
|
||||
import re as _re
|
||||
ext_map = {
|
||||
OutputType.PODCAST: "script.md",
|
||||
OutputType.BLOG: "post.md",
|
||||
OutputType.REPORT: "report.md",
|
||||
OutputType.THREAD: "thread.txt",
|
||||
OutputType.REPORT_EXTENDED: "report_extended.md",
|
||||
OutputType.BLOG_EXTENDED: "blog_extended.md",
|
||||
OutputType.PODCAST_EXTENDED: "script_extended.md",
|
||||
}
|
||||
filename = f"researchowl_{session['topic'][:30].replace(' ', '_')}_{ext_map[output_type]}"
|
||||
# Use the topic from the output header (written at generation time)
|
||||
# instead of the pre-fetched session dict which may be stale.
|
||||
_m = _re.search(r'^Topic:\s*(.+)$', output[:500], _re.MULTILINE)
|
||||
_topic = _m.group(1).strip() if _m else session["topic"]
|
||||
filename = f"researchowl_{_topic[:30].replace(' ', '_')}_{ext_map[output_type]}"
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as f:
|
||||
f.write(output)
|
||||
@@ -339,6 +383,18 @@ async def cmd_generate(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
else:
|
||||
await send_chunked(update.message, output)
|
||||
|
||||
try:
|
||||
stats = await db.get_usage_stats(session_id)
|
||||
total_cost = sum(s.get("total_cost", 0) for s in stats)
|
||||
if total_cost > settings.cost_alert_threshold:
|
||||
await update.message.reply_text(
|
||||
f"⚠️ Coste acumulado de esta sesión: `${total_cost:.4f}`"
|
||||
f" (umbral: `${settings.cost_alert_threshold:.2f}`)",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Generate failed", error=str(e))
|
||||
await update.message.reply_text(f"❌ Generation failed: {str(e)[:200]}")
|
||||
@@ -429,6 +485,169 @@ async def cmd_outputs(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
await db_conn.close()
|
||||
|
||||
|
||||
async def cmd_costs(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
if not is_authorized(update.effective_user.id):
|
||||
return
|
||||
|
||||
chat_id = update.effective_chat.id
|
||||
db_conn = await get_db()
|
||||
db = ResearchDB(db_conn)
|
||||
|
||||
try:
|
||||
cursor = await db_conn.execute(
|
||||
"SELECT * FROM research_sessions WHERE telegram_chat_id = ? ORDER BY created_at DESC LIMIT 1",
|
||||
(chat_id,)
|
||||
)
|
||||
row = await cursor.fetchone()
|
||||
if not row:
|
||||
await update.message.reply_text("No sessions found.")
|
||||
return
|
||||
|
||||
session_id = row["id"]
|
||||
topic = row["topic"]
|
||||
|
||||
by_type = {r["call_type"]: r for r in await db.get_usage_stats(session_id)}
|
||||
totals = await db.get_total_usage_stats()
|
||||
|
||||
lines = [f"📊 *Costes ResearchOwl*\n"]
|
||||
lines.append(f"Última sesión (`{topic}`):")
|
||||
|
||||
session_total = 0.0
|
||||
for call_type, label in [("scoring", "Scoring"), ("generation", "Generación")]:
|
||||
row_data = by_type.get(call_type)
|
||||
if row_data:
|
||||
calls = row_data["calls"]
|
||||
tokens = row_data["total_tokens"]
|
||||
cost = row_data["total_cost"]
|
||||
session_total += cost
|
||||
lines.append(f" {label}: {calls} llamadas · {tokens:,} tokens · ${cost:.4f}")
|
||||
else:
|
||||
lines.append(f" {label}: —")
|
||||
|
||||
lines.append(f" Total: ${session_total:.4f}")
|
||||
lines.append("")
|
||||
lines.append("Acumulado total:")
|
||||
acc_cost = totals.get("total_cost") or 0.0
|
||||
acc_sessions = totals.get("sessions") or 0
|
||||
lines.append(f" ${acc_cost:.4f} ({acc_sessions} sesiones)")
|
||||
|
||||
await update.message.reply_text("\n".join(lines), parse_mode=ParseMode.MARKDOWN)
|
||||
finally:
|
||||
await db_conn.close()
|
||||
|
||||
|
||||
async def cmd_watch(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
if not is_authorized(update.effective_user.id):
|
||||
return
|
||||
|
||||
chat_id = update.effective_chat.id
|
||||
args = ctx.args or []
|
||||
|
||||
if not args:
|
||||
await update.message.reply_text(
|
||||
"❌ Uso: `/watch <tema> [horas]`\nEjemplo: `/watch Incidente Roswell 24`",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
return
|
||||
|
||||
interval_hours = 24
|
||||
if args[-1].isdigit():
|
||||
interval_hours = int(args[-1])
|
||||
topic = " ".join(args[:-1]).strip()
|
||||
else:
|
||||
topic = " ".join(args).strip()
|
||||
|
||||
if not topic:
|
||||
await update.message.reply_text("❌ Debes especificar un tema.")
|
||||
return
|
||||
|
||||
if not (1 <= interval_hours <= 168):
|
||||
await update.message.reply_text(
|
||||
"❌ El intervalo debe estar entre 1 y 168 horas (1 semana)."
|
||||
)
|
||||
return
|
||||
|
||||
db_conn = await get_db()
|
||||
db = ResearchDB(db_conn)
|
||||
try:
|
||||
try:
|
||||
await db.add_watch(topic, chat_id, interval_hours)
|
||||
await update.message.reply_text(
|
||||
f"👁 Watching: `{topic}` — cada {interval_hours}h\n"
|
||||
f"Primera ejecución en ~{interval_hours}h.\n"
|
||||
f"Usa /watches para ver todos tus temas.",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
except Exception as e:
|
||||
if "UNIQUE" in str(e):
|
||||
await update.message.reply_text(
|
||||
f"Ya estás watching `{topic}`", parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
else:
|
||||
raise
|
||||
finally:
|
||||
await db_conn.close()
|
||||
|
||||
|
||||
async def cmd_unwatch(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
if not is_authorized(update.effective_user.id):
|
||||
return
|
||||
|
||||
chat_id = update.effective_chat.id
|
||||
topic = " ".join(ctx.args).strip() if ctx.args else ""
|
||||
|
||||
if not topic:
|
||||
await update.message.reply_text(
|
||||
"❌ Uso: `/unwatch <tema>`", parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
return
|
||||
|
||||
db_conn = await get_db()
|
||||
db = ResearchDB(db_conn)
|
||||
try:
|
||||
removed = await db.remove_watch(topic, chat_id)
|
||||
if removed:
|
||||
await update.message.reply_text(
|
||||
f"✅ Ya no vigilas `{topic}`.", parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
else:
|
||||
await update.message.reply_text(f"No estabas watching `{topic}`.")
|
||||
finally:
|
||||
await db_conn.close()
|
||||
|
||||
|
||||
async def cmd_watches(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
if not is_authorized(update.effective_user.id):
|
||||
return
|
||||
|
||||
chat_id = update.effective_chat.id
|
||||
db_conn = await get_db()
|
||||
db = ResearchDB(db_conn)
|
||||
try:
|
||||
watches = await db.list_watches(chat_id)
|
||||
if not watches:
|
||||
await update.message.reply_text(
|
||||
"No tienes temas vigilados. Usa `/watch <tema>`",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
return
|
||||
|
||||
now = time.time()
|
||||
lines = ["👁 *Tus temas vigilados:*\n"]
|
||||
for i, w in enumerate(watches, 1):
|
||||
secs_remaining = max(0.0, w["next_run_at"] - now)
|
||||
hours_remaining = secs_remaining / 3600
|
||||
eta = f"{int(secs_remaining / 60)}min" if hours_remaining < 1 else f"{hours_remaining:.1f}h"
|
||||
status = "✅" if w["enabled"] else "⏸"
|
||||
lines.append(
|
||||
f"{i}. {status} `{w['topic']}` — cada {w['interval_hours']}h · próxima en {eta}"
|
||||
)
|
||||
|
||||
await update.message.reply_text("\n".join(lines), parse_mode=ParseMode.MARKDOWN)
|
||||
finally:
|
||||
await db_conn.close()
|
||||
|
||||
|
||||
async def cmd_process(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
if not is_authorized(update.effective_user.id):
|
||||
return
|
||||
@@ -513,8 +732,448 @@ async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
|
||||
# ─── Bot setup ────────────────────────────────────────────────────────────────
|
||||
|
||||
async def _purge_on_startup(app: Application) -> None:
|
||||
db_conn = await get_db()
|
||||
try:
|
||||
db = ResearchDB(db_conn)
|
||||
result = await db.purge_old_sessions(30)
|
||||
if result["sessions"] > 0:
|
||||
logger.info("Startup purge done", **result)
|
||||
except Exception as e:
|
||||
logger.warning("Startup purge failed — bot continues", error=str(e))
|
||||
finally:
|
||||
await db_conn.close()
|
||||
|
||||
|
||||
async def _scheduler_loop(app: Application):
|
||||
while True:
|
||||
db_conn = None
|
||||
try:
|
||||
db_conn = await get_db()
|
||||
db = ResearchDB(db_conn)
|
||||
due = await db.get_due_watches()
|
||||
for watch in due:
|
||||
chat_id = watch["chat_id"]
|
||||
topic = watch["topic"]
|
||||
if chat_id in _active_tasks and not _active_tasks[chat_id].done():
|
||||
continue
|
||||
session_id = await db.create_session(topic, chat_id)
|
||||
_active_sessions[chat_id] = session_id
|
||||
await db.update_watch_run(watch["id"])
|
||||
|
||||
async def _task(c=chat_id, t=topic, s=session_id, w_id=watch["id"]):
|
||||
inner_db_conn = await get_db()
|
||||
inner_db = ResearchDB(inner_db_conn)
|
||||
try:
|
||||
await run_scheduled_research(app.bot, c, t, s, inner_db,
|
||||
silent_completion=True)
|
||||
|
||||
prev_session = await inner_db.get_previous_session(c, t, s)
|
||||
new_urls = await inner_db.get_session_urls(s)
|
||||
old_urls = await inner_db.get_session_urls(prev_session["id"]) \
|
||||
if prev_session else set()
|
||||
new_chunks = await inner_db.get_top_chunks(s, limit=30)
|
||||
|
||||
try:
|
||||
from src.generator.generator import generate_diff_summary
|
||||
summary = await generate_diff_summary(
|
||||
t, new_urls, old_urls, new_chunks, s, inner_db
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("Diff summary failed", error=str(e))
|
||||
summary = (
|
||||
f"📊 *Actualización disponible — {t}*\n\n"
|
||||
f"Usa /generate report para ver el análisis completo."
|
||||
)
|
||||
|
||||
if summary:
|
||||
await app.bot.send_message(
|
||||
c, summary, parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
else:
|
||||
await app.bot.send_message(
|
||||
c,
|
||||
f"🔄 *{t}* — sin novedades significativas esta vez.",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
finally:
|
||||
await inner_db_conn.close()
|
||||
|
||||
task = asyncio.create_task(_task())
|
||||
_active_tasks[chat_id] = task
|
||||
await app.bot.send_message(
|
||||
chat_id,
|
||||
f"🔄 Investigación automática iniciada: `{topic}`",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("Scheduler loop error", error=str(e))
|
||||
finally:
|
||||
if db_conn:
|
||||
try:
|
||||
await db_conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
await asyncio.sleep(60)
|
||||
|
||||
|
||||
async def _start_scheduler(app: Application) -> None:
|
||||
asyncio.create_task(_scheduler_loop(app))
|
||||
|
||||
|
||||
async def _on_startup(app: Application) -> None:
|
||||
await _purge_on_startup(app)
|
||||
await _start_scheduler(app)
|
||||
|
||||
|
||||
async def cmd_export(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
if not is_authorized(update.effective_user.id):
|
||||
return
|
||||
|
||||
chat_id = update.effective_chat.id
|
||||
db_conn = await get_db()
|
||||
db = ResearchDB(db_conn)
|
||||
|
||||
try:
|
||||
session = await db.get_latest_session(chat_id)
|
||||
if not session:
|
||||
await update.message.reply_text("No hay sesiones de investigación.")
|
||||
return
|
||||
|
||||
session_id = session["id"]
|
||||
topic = session["topic"]
|
||||
|
||||
outputs = await db.get_outputs(session_id)
|
||||
if not outputs:
|
||||
await update.message.reply_text(
|
||||
"No hay outputs generados. Usa `/generate <tipo>` primero.",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
return
|
||||
|
||||
priority = [
|
||||
"report_extended", "blog_extended", "podcast_extended",
|
||||
"report", "blog", "podcast", "thread",
|
||||
]
|
||||
chosen = None
|
||||
for ptype in priority:
|
||||
for o in outputs:
|
||||
if o["output_type"] == ptype:
|
||||
chosen = o
|
||||
break
|
||||
if chosen:
|
||||
break
|
||||
if not chosen:
|
||||
chosen = outputs[0]
|
||||
|
||||
msg = await update.message.reply_text(
|
||||
f"📄 Generando PDF para `{topic}`…",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
|
||||
try:
|
||||
from src.generator.generator import generate_pdf
|
||||
pdf_bytes = generate_pdf(chosen["content"], title=topic)
|
||||
except ImportError:
|
||||
await msg.edit_text("❌ reportlab no está instalado. Ejecuta: `pip install reportlab`")
|
||||
return
|
||||
except Exception as e:
|
||||
await msg.edit_text(f"❌ Error generando PDF: {str(e)[:200]}")
|
||||
return
|
||||
|
||||
safe_topic = topic[:40].replace(" ", "_").replace("/", "-")
|
||||
filename = f"researchowl_{safe_topic}_{chosen['output_type']}.pdf"
|
||||
|
||||
import io
|
||||
await update.message.reply_document(
|
||||
document=io.BytesIO(pdf_bytes),
|
||||
filename=filename,
|
||||
caption=f"📄 *{chosen['output_type'].upper()}* — {topic}\nExportado por ResearchOwl 🦉",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
try:
|
||||
await msg.delete()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Export failed", error=str(e))
|
||||
await update.message.reply_text(f"❌ Export failed: {str(e)[:200]}")
|
||||
finally:
|
||||
await db_conn.close()
|
||||
|
||||
|
||||
async def cmd_purge(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
if not is_authorized(update.effective_user.id):
|
||||
return
|
||||
|
||||
args = ctx.args or []
|
||||
|
||||
if not args:
|
||||
days = 30
|
||||
else:
|
||||
try:
|
||||
days = int(args[0])
|
||||
except ValueError:
|
||||
await update.message.reply_text(
|
||||
"❌ Uso: `/purge [días]`", parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
return
|
||||
if days < 0:
|
||||
await update.message.reply_text("❌ El número de días debe ser ≥ 0.")
|
||||
return
|
||||
if days == 0 and not (len(args) >= 2 and args[1] == "confirm"):
|
||||
await update.message.reply_text(
|
||||
"⚠️ Esto borrará *todas* las sesiones completadas.\n"
|
||||
"Envía `/purge 0 confirm` para confirmar.",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
return
|
||||
|
||||
db_conn = await get_db()
|
||||
try:
|
||||
db = ResearchDB(db_conn)
|
||||
result = await db.purge_old_sessions(days)
|
||||
await update.message.reply_text(
|
||||
f"🗑️ Purged: {result['sessions']} sessions, "
|
||||
f"{result['sources']} sources, "
|
||||
f"{result['chunks']} chunks, "
|
||||
f"{result['outputs']} outputs"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Purge command failed", error=str(e))
|
||||
await update.message.reply_text(f"❌ Purge failed: {str(e)[:200]}")
|
||||
finally:
|
||||
await db_conn.close()
|
||||
|
||||
|
||||
async def cmd_publish(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
if not is_authorized(update.effective_user.id):
|
||||
return
|
||||
|
||||
chat_id = update.effective_chat.id
|
||||
db_conn = await get_db()
|
||||
db = ResearchDB(db_conn)
|
||||
|
||||
try:
|
||||
from src.generator.generator import GhostPublisher, _extract_title
|
||||
|
||||
ghost = GhostPublisher()
|
||||
if not ghost.is_configured():
|
||||
await update.message.reply_text(
|
||||
"❌ Ghost no configurado. Asegúrate de que `GHOST_URL` y `GHOST_API_KEY` están definidos.",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
return
|
||||
|
||||
session_id = _active_sessions.get(chat_id)
|
||||
if session_id:
|
||||
cursor = await db_conn.execute(
|
||||
"SELECT * FROM research_sessions WHERE id = ?",
|
||||
(session_id,)
|
||||
)
|
||||
else:
|
||||
cursor = await db_conn.execute(
|
||||
"SELECT * FROM research_sessions WHERE telegram_chat_id = ? ORDER BY created_at DESC LIMIT 1",
|
||||
(chat_id,)
|
||||
)
|
||||
row = await cursor.fetchone()
|
||||
if not row:
|
||||
await update.message.reply_text("No hay sesiones. Usa /research primero.")
|
||||
return
|
||||
|
||||
session = dict(row)
|
||||
outputs = await db.get_outputs(session["id"])
|
||||
if not outputs:
|
||||
await update.message.reply_text(
|
||||
"No hay outputs generados. Usa `/generate blog|report` primero.",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
return
|
||||
|
||||
priority = ["blog_extended", "blog", "report_extended", "report",
|
||||
"podcast_extended", "podcast", "thread"]
|
||||
chosen = None
|
||||
for ptype in priority:
|
||||
for o in outputs:
|
||||
if o["output_type"] == ptype:
|
||||
chosen = o
|
||||
break
|
||||
if chosen:
|
||||
break
|
||||
if not chosen:
|
||||
chosen = outputs[-1]
|
||||
|
||||
msg = await update.message.reply_text("📤 Publicando en Ghost como borrador…")
|
||||
|
||||
title = _extract_title(chosen["content"]) or session["topic"]
|
||||
result = await ghost.publish_draft(title, chosen["content"])
|
||||
post = result["posts"][0]
|
||||
admin_url = f"{ghost.url}/ghost/#/editor/post/{post['id']}"
|
||||
|
||||
await msg.edit_text(
|
||||
f"✅ *Publicado en Ghost como borrador*\n\n"
|
||||
f"📝 Título: `{title}`\n"
|
||||
f"🔗 Editar: {admin_url}",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Publish to Ghost failed", error=str(e))
|
||||
await update.message.reply_text(f"❌ Error publicando en Ghost: {str(e)[:200]}")
|
||||
finally:
|
||||
await db_conn.close()
|
||||
|
||||
|
||||
async def cmd_compare(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
if not is_authorized(update.effective_user.id):
|
||||
return
|
||||
|
||||
chat_id = update.effective_chat.id
|
||||
text = " ".join(ctx.args).strip() if ctx.args else ""
|
||||
|
||||
import re
|
||||
match = re.split(r'\s+vs\.?\s+|\s+versus\s+', text, maxsplit=1, flags=re.IGNORECASE)
|
||||
if len(match) != 2 or not match[0].strip() or not match[1].strip():
|
||||
await update.message.reply_text(
|
||||
"❌ Uso: `/compare <tema1> vs <tema2>`\n"
|
||||
"Ejemplo: `/compare energía solar vs energía nuclear`",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
return
|
||||
|
||||
topic_a = match[0].strip()
|
||||
topic_b = match[1].strip()
|
||||
|
||||
if chat_id in _active_tasks and not _active_tasks[chat_id].done():
|
||||
await update.message.reply_text(
|
||||
"⚠️ Ya hay una investigación en curso. Usa /cancel primero."
|
||||
)
|
||||
return
|
||||
|
||||
msg = await update.message.reply_text(
|
||||
f"🔍 Comparando `{topic_a}` vs `{topic_b}`…\n"
|
||||
f"Esto lanzará dos investigaciones en paralelo y tardará varios minutos.",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
|
||||
async def run_compare():
|
||||
db_conn_a = await get_db()
|
||||
db_conn_b = await get_db()
|
||||
db_a = ResearchDB(db_conn_a)
|
||||
db_b = ResearchDB(db_conn_b)
|
||||
|
||||
try:
|
||||
session_id_a = await db_a.create_session(topic_a, chat_id)
|
||||
session_id_b = await db_b.create_session(topic_b, chat_id)
|
||||
_active_sessions[chat_id] = session_id_a
|
||||
|
||||
await msg.edit_text(
|
||||
f"🔍 Investigando en paralelo:\n"
|
||||
f"• `{topic_a}`\n"
|
||||
f"• `{topic_b}`\n\n"
|
||||
f"Esto puede tardar 10-20 minutos…",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
|
||||
async def research_topic(session_id, topic, db):
|
||||
scraper = ExhaustiveScraper(db, session_id, topic)
|
||||
await scraper.run()
|
||||
await db.update_session(session_id, status=ResearchStatus.SATURATED)
|
||||
ollama = OllamaClient()
|
||||
if await ollama.is_available():
|
||||
processor = ContentProcessor(db, ollama)
|
||||
await processor.process_session(session_id, topic)
|
||||
|
||||
await msg.edit_text(
|
||||
f"🔍 Scraping en paralelo:\n• `{topic_a}`\n• `{topic_b}`…",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
|
||||
await asyncio.gather(
|
||||
research_topic(session_id_a, topic_a, db_a),
|
||||
research_topic(session_id_b, topic_b, db_b),
|
||||
)
|
||||
|
||||
await msg.edit_text(
|
||||
"✍️ Generando análisis comparativo…",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
|
||||
ollama = OllamaClient()
|
||||
processor_a = ContentProcessor(db_a, ollama)
|
||||
processor_b = ContentProcessor(db_b, ollama)
|
||||
|
||||
context_a = await processor_a.rag_query(session_id_a, topic_a, top_k=40)
|
||||
context_b = await processor_b.rag_query(session_id_b, topic_b, top_k=40)
|
||||
|
||||
if not context_a:
|
||||
chunks = await db_a.get_top_chunks(session_id_a, limit=20)
|
||||
context_a = "\n\n---\n\n".join(c["content"] for c in chunks)
|
||||
if not context_b:
|
||||
chunks = await db_b.get_top_chunks(session_id_b, limit=20)
|
||||
context_b = "\n\n---\n\n".join(c["content"] for c in chunks)
|
||||
|
||||
from src.generator.generator import generate_comparison
|
||||
comparison = await generate_comparison(
|
||||
topic_a, topic_b, context_a, context_b, session_id_a, db_a
|
||||
)
|
||||
|
||||
from datetime import datetime, timezone
|
||||
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
|
||||
header = (
|
||||
f"---\n"
|
||||
f"ResearchOwl | COMPARISON\n"
|
||||
f"Topic A: {topic_a}\n"
|
||||
f"Topic B: {topic_b}\n"
|
||||
f"Generated: {now}\n"
|
||||
f"---\n\n"
|
||||
)
|
||||
full_output = header + comparison
|
||||
|
||||
await db_a.save_output(session_id_a, OutputType.REPORT, full_output)
|
||||
|
||||
if len(full_output) > 8000:
|
||||
import io
|
||||
filename = (
|
||||
f"compare_{topic_a[:20]}_{topic_b[:20]}.md"
|
||||
.replace(" ", "_")
|
||||
)
|
||||
await update.message.reply_document(
|
||||
document=io.BytesIO(full_output.encode()),
|
||||
filename=filename,
|
||||
caption=f"📊 Comparación: {topic_a} vs {topic_b}"
|
||||
)
|
||||
try:
|
||||
await msg.delete()
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
await msg.edit_text(full_output, parse_mode=ParseMode.MARKDOWN)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
await msg.edit_text("🛑 Comparación cancelada.")
|
||||
except Exception as e:
|
||||
logger.error("Compare task failed", error=str(e))
|
||||
try:
|
||||
await msg.edit_text(f"❌ Error: {str(e)[:200]}")
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
await db_conn_a.close()
|
||||
await db_conn_b.close()
|
||||
|
||||
task = asyncio.create_task(run_compare())
|
||||
_active_tasks[chat_id] = task
|
||||
|
||||
|
||||
def create_bot() -> Application:
|
||||
app = Application.builder().token(settings.telegram_bot_token).build()
|
||||
app = (
|
||||
Application.builder()
|
||||
.token(settings.telegram_bot_token)
|
||||
.post_init(_on_startup)
|
||||
.build()
|
||||
)
|
||||
|
||||
app.add_handler(CommandHandler("start", cmd_start))
|
||||
app.add_handler(CommandHandler("help", cmd_help))
|
||||
@@ -524,8 +1183,16 @@ def create_bot() -> Application:
|
||||
app.add_handler(CommandHandler("generate", cmd_generate))
|
||||
app.add_handler(CommandHandler("sources", cmd_sources))
|
||||
app.add_handler(CommandHandler("outputs", cmd_outputs))
|
||||
app.add_handler(CommandHandler("export", cmd_export))
|
||||
app.add_handler(CommandHandler("costs", cmd_costs))
|
||||
app.add_handler(CommandHandler("watch", cmd_watch))
|
||||
app.add_handler(CommandHandler("unwatch", cmd_unwatch))
|
||||
app.add_handler(CommandHandler("watches", cmd_watches))
|
||||
app.add_handler(CommandHandler("process", cmd_process))
|
||||
app.add_handler(CommandHandler("cancel", cmd_cancel))
|
||||
app.add_handler(CommandHandler("purge", cmd_purge))
|
||||
app.add_handler(CommandHandler("publish", cmd_publish))
|
||||
app.add_handler(CommandHandler("compare", cmd_compare))
|
||||
|
||||
return app
|
||||
|
||||
|
||||
+10
-1
@@ -31,7 +31,16 @@ class Settings(BaseSettings):
|
||||
# Processing
|
||||
chunk_size: int = Field(800, env="CHUNK_SIZE") # tokens per chunk
|
||||
chunk_overlap: int = Field(100, env="CHUNK_OVERLAP")
|
||||
quality_threshold: float = Field(0.5, env="QUALITY_THRESHOLD") # 0-1, chunks below discarded
|
||||
quality_threshold: float = Field(0.3, env="QUALITY_THRESHOLD") # 0-1, chunks below discarded
|
||||
|
||||
# Ghost CMS
|
||||
ghost_url: Optional[str] = Field(None, env="GHOST_URL")
|
||||
ghost_api_key: Optional[str] = Field(None, env="GHOST_API_KEY")
|
||||
ghost_url_en: str = Field("", env="GHOST_URL_EN")
|
||||
ghost_api_key_en: str = Field("", env="GHOST_API_KEY_EN")
|
||||
|
||||
# Alerts
|
||||
cost_alert_threshold: float = Field(0.15, env="COST_ALERT_THRESHOLD")
|
||||
|
||||
# App
|
||||
log_level: str = Field("INFO", env="LOG_LEVEL")
|
||||
|
||||
@@ -5,8 +5,12 @@ from pathlib import Path
|
||||
from typing import Optional
|
||||
from enum import Enum
|
||||
|
||||
import structlog
|
||||
|
||||
from src.config import settings
|
||||
|
||||
logger = structlog.get_logger()
|
||||
|
||||
|
||||
class ResearchStatus(str, Enum):
|
||||
RUNNING = "running"
|
||||
@@ -20,6 +24,9 @@ class OutputType(str, Enum):
|
||||
BLOG = "blog"
|
||||
REPORT = "report"
|
||||
THREAD = "thread"
|
||||
REPORT_EXTENDED = "report_extended"
|
||||
BLOG_EXTENDED = "blog_extended"
|
||||
PODCAST_EXTENDED = "podcast_extended"
|
||||
|
||||
|
||||
SCHEMA = """
|
||||
@@ -84,6 +91,29 @@ CREATE INDEX IF NOT EXISTS idx_sources_session ON sources(session_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_chunks_session ON chunks(session_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_chunks_quality ON chunks(session_id, quality_score DESC);
|
||||
CREATE INDEX IF NOT EXISTS idx_source_contents ON source_contents(source_id);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS api_usage (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
session_id INTEGER REFERENCES research_sessions(id),
|
||||
call_type TEXT NOT NULL,
|
||||
model TEXT NOT NULL,
|
||||
input_tokens INTEGER NOT NULL,
|
||||
output_tokens INTEGER NOT NULL,
|
||||
cost_usd REAL NOT NULL,
|
||||
created_at REAL NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS watched_topics (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
topic TEXT NOT NULL,
|
||||
chat_id INTEGER NOT NULL,
|
||||
interval_hours INTEGER NOT NULL DEFAULT 24,
|
||||
next_run_at REAL NOT NULL,
|
||||
last_run_at REAL,
|
||||
enabled INTEGER NOT NULL DEFAULT 1,
|
||||
created_at REAL NOT NULL,
|
||||
UNIQUE(topic, chat_id)
|
||||
);
|
||||
"""
|
||||
|
||||
|
||||
@@ -121,6 +151,35 @@ class ResearchDB:
|
||||
row = await cursor.fetchone()
|
||||
return dict(row) if row else None
|
||||
|
||||
async def get_latest_session(self, chat_id: int) -> Optional[dict]:
|
||||
cursor = await self.db.execute(
|
||||
"SELECT * FROM research_sessions WHERE telegram_chat_id = ? ORDER BY created_at DESC LIMIT 1",
|
||||
(chat_id,)
|
||||
)
|
||||
row = await cursor.fetchone()
|
||||
return dict(row) if row else None
|
||||
|
||||
async def get_session_urls(self, session_id: int) -> set:
|
||||
async with self.db.execute(
|
||||
"SELECT url FROM sources WHERE session_id = ?", (session_id,)
|
||||
) as cur:
|
||||
rows = await cur.fetchall()
|
||||
return {r[0] for r in rows}
|
||||
|
||||
async def get_previous_session(self, chat_id: int, topic: str,
|
||||
exclude_session_id: int) -> Optional[dict]:
|
||||
async with self.db.execute(
|
||||
"""SELECT id, topic, status, created_at FROM research_sessions
|
||||
WHERE telegram_chat_id = ? AND topic = ? AND id != ?
|
||||
ORDER BY created_at DESC LIMIT 1""",
|
||||
(chat_id, topic, exclude_session_id)
|
||||
) as cur:
|
||||
row = await cur.fetchone()
|
||||
if not row:
|
||||
return None
|
||||
return {"id": row[0], "topic": row[1],
|
||||
"status": row[2], "created_at": row[3]}
|
||||
|
||||
async def get_active_session(self, chat_id: int) -> Optional[dict]:
|
||||
cursor = await self.db.execute(
|
||||
"""SELECT * FROM research_sessions
|
||||
@@ -259,6 +318,19 @@ class ResearchDB:
|
||||
row = await cursor.fetchone()
|
||||
return row[0] if row else None
|
||||
|
||||
async def get_cached_content(self, url: str,
|
||||
max_age_days: int = 7) -> Optional[str]:
|
||||
threshold = time.time() - (max_age_days * 86400)
|
||||
async with self.db.execute(
|
||||
"""SELECT sc.content FROM source_contents sc
|
||||
JOIN sources s ON s.id = sc.source_id
|
||||
WHERE s.url = ? AND sc.created_at > ?
|
||||
ORDER BY sc.created_at DESC LIMIT 1""",
|
||||
(url, threshold)
|
||||
) as cur:
|
||||
row = await cur.fetchone()
|
||||
return row[0] if row else None
|
||||
|
||||
async def get_outputs(self, session_id: int) -> list[dict]:
|
||||
cursor = await self.db.execute(
|
||||
"SELECT * FROM outputs WHERE session_id = ? ORDER BY created_at DESC",
|
||||
@@ -266,3 +338,124 @@ class ResearchDB:
|
||||
)
|
||||
rows = await cursor.fetchall()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
# --- API Usage ---
|
||||
|
||||
async def log_api_call(self, session_id, call_type: str, model: str,
|
||||
input_tokens: int, output_tokens: int):
|
||||
# Precios Claude Haiku (claude-haiku-4-5):
|
||||
# input: $0.80 / 1M tokens output: $4.00 / 1M tokens
|
||||
cost = (input_tokens * 0.80 + output_tokens * 4.00) / 1_000_000
|
||||
await self.db.execute(
|
||||
"""INSERT INTO api_usage
|
||||
(session_id, call_type, model, input_tokens, output_tokens, cost_usd, created_at)
|
||||
VALUES (?,?,?,?,?,?,?)""",
|
||||
(session_id, call_type, model, input_tokens, output_tokens, cost, time.time())
|
||||
)
|
||||
await self.db.commit()
|
||||
|
||||
async def get_usage_stats(self, session_id: int) -> list[dict]:
|
||||
cursor = await self.db.execute(
|
||||
"""SELECT call_type,
|
||||
COUNT(*) as calls,
|
||||
SUM(input_tokens + output_tokens) as total_tokens,
|
||||
SUM(cost_usd) as total_cost
|
||||
FROM api_usage WHERE session_id = ?
|
||||
GROUP BY call_type""",
|
||||
(session_id,)
|
||||
)
|
||||
rows = await cursor.fetchall()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
async def get_total_usage_stats(self) -> dict:
|
||||
cursor = await self.db.execute(
|
||||
"""SELECT COUNT(DISTINCT session_id) as sessions,
|
||||
SUM(cost_usd) as total_cost
|
||||
FROM api_usage"""
|
||||
)
|
||||
row = await cursor.fetchone()
|
||||
return dict(row) if row else {"sessions": 0, "total_cost": 0}
|
||||
|
||||
# --- Watched Topics ---
|
||||
|
||||
async def add_watch(self, topic: str, chat_id: int, interval_hours: int) -> int:
|
||||
now = time.time()
|
||||
cursor = await self.db.execute(
|
||||
"""INSERT OR REPLACE INTO watched_topics
|
||||
(topic, chat_id, interval_hours, next_run_at, created_at)
|
||||
VALUES (?, ?, ?, ?, ?)""",
|
||||
(topic, chat_id, interval_hours, now + interval_hours * 3600, now)
|
||||
)
|
||||
await self.db.commit()
|
||||
return cursor.lastrowid
|
||||
|
||||
async def remove_watch(self, topic: str, chat_id: int) -> bool:
|
||||
cursor = await self.db.execute(
|
||||
"DELETE FROM watched_topics WHERE topic = ? AND chat_id = ?",
|
||||
(topic, chat_id)
|
||||
)
|
||||
await self.db.commit()
|
||||
return cursor.rowcount > 0
|
||||
|
||||
async def list_watches(self, chat_id: int) -> list[dict]:
|
||||
cursor = await self.db.execute(
|
||||
"SELECT * FROM watched_topics WHERE chat_id = ? ORDER BY created_at ASC",
|
||||
(chat_id,)
|
||||
)
|
||||
rows = await cursor.fetchall()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
async def get_due_watches(self) -> list[dict]:
|
||||
cursor = await self.db.execute(
|
||||
"SELECT * FROM watched_topics WHERE enabled = 1 AND next_run_at <= ?",
|
||||
(time.time(),)
|
||||
)
|
||||
rows = await cursor.fetchall()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
async def update_watch_run(self, watch_id: int):
|
||||
cursor = await self.db.execute(
|
||||
"SELECT interval_hours FROM watched_topics WHERE id = ?", (watch_id,)
|
||||
)
|
||||
row = await cursor.fetchone()
|
||||
if not row:
|
||||
return
|
||||
now = time.time()
|
||||
await self.db.execute(
|
||||
"UPDATE watched_topics SET last_run_at = ?, next_run_at = ? WHERE id = ?",
|
||||
(now, now + row[0] * 3600, watch_id)
|
||||
)
|
||||
await self.db.commit()
|
||||
|
||||
# --- Maintenance ---
|
||||
|
||||
async def purge_old_sessions(self, max_age_days: int = 30) -> dict:
|
||||
await self.db.execute("PRAGMA foreign_keys = ON")
|
||||
|
||||
threshold = time.time() - max_age_days * 86400
|
||||
cursor = await self.db.execute(
|
||||
"SELECT id FROM research_sessions WHERE created_at < ? AND status != 'running'",
|
||||
(threshold,)
|
||||
)
|
||||
session_ids = [row[0] for row in await cursor.fetchall()]
|
||||
|
||||
counts = {"sessions": 0, "sources": 0, "chunks": 0, "outputs": 0}
|
||||
|
||||
for sid in session_ids:
|
||||
await self.db.execute(
|
||||
"DELETE FROM source_contents WHERE source_id IN (SELECT id FROM sources WHERE session_id = ?)",
|
||||
(sid,)
|
||||
)
|
||||
cur = await self.db.execute("DELETE FROM chunks WHERE session_id = ?", (sid,))
|
||||
counts["chunks"] += cur.rowcount
|
||||
cur = await self.db.execute("DELETE FROM outputs WHERE session_id = ?", (sid,))
|
||||
counts["outputs"] += cur.rowcount
|
||||
cur = await self.db.execute("DELETE FROM sources WHERE session_id = ?", (sid,))
|
||||
counts["sources"] += cur.rowcount
|
||||
cur = await self.db.execute("DELETE FROM research_sessions WHERE id = ?", (sid,))
|
||||
counts["sessions"] += cur.rowcount
|
||||
|
||||
await self.db.commit()
|
||||
logger.info("Purged sessions older than days",
|
||||
sessions=counts["sessions"], days=max_age_days)
|
||||
return counts
|
||||
|
||||
+691
-14
@@ -2,6 +2,13 @@
|
||||
ResearchOwl Generators
|
||||
Produces structured outputs from processed research using Claude or Ollama
|
||||
"""
|
||||
import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
|
||||
import structlog
|
||||
|
||||
from src.config import settings
|
||||
@@ -26,6 +33,45 @@ BLOG_SYSTEM = (
|
||||
"Cada sección debe añadir información nueva no cubierta en secciones anteriores."
|
||||
)
|
||||
|
||||
BLOG_SYSTEM_EN = (
|
||||
"You write ALWAYS in English. "
|
||||
"You are a journalist writing a blog article. Use clear markdown headers. "
|
||||
"NEVER repeat the same fact or phrase twice — if you said it, move on. "
|
||||
"Each section must add new information not covered in other sections."
|
||||
)
|
||||
|
||||
BLOG_PROMPT_EN = """\
|
||||
Write a blog article about: "{topic}"
|
||||
|
||||
RULES — follow strictly:
|
||||
- Each section under a heading must add NEW information not covered elsewhere
|
||||
- Do NOT summarize previous sections at the start of each new section
|
||||
- Do NOT repeat facts — if a fact appears once, do not mention it again
|
||||
- Use concrete details, numbers, names — avoid vague generalities
|
||||
- Target: 1000-1500 words
|
||||
|
||||
STRUCTURE:
|
||||
# [Impactful headline]
|
||||
|
||||
[Hook paragraph — the most surprising fact]
|
||||
|
||||
## Background
|
||||
[Context — what, when, who — only facts not covered elsewhere]
|
||||
|
||||
## Key Facts
|
||||
[Most significant findings — each point must be distinct]
|
||||
|
||||
## Analysis / Significance
|
||||
[What this means — without repeating the Key Facts section]
|
||||
|
||||
## Conclusion
|
||||
[No more than 2 sentences summarizing, then a forward-looking statement]
|
||||
|
||||
RESEARCH MATERIAL:
|
||||
{context}
|
||||
|
||||
Write the complete article in markdown:"""
|
||||
|
||||
REPORT_SYSTEM = (
|
||||
"Escribe SIEMPRE en español. "
|
||||
"Eres un analista de investigación. Escribe un informe estructurado y factual. "
|
||||
@@ -135,6 +181,179 @@ Escribe el hilo (un tweet por línea, nada más):"""
|
||||
}
|
||||
|
||||
|
||||
OUTLINE_REPORT = """
|
||||
IMPORTANTE: Los títulos y queries deben estar en español, independientemente del idioma del material de investigación.
|
||||
|
||||
Eres un editor de investigación. Dado el tema "{topic}" y el material
|
||||
disponible, genera un outline detallado para un informe exhaustivo.
|
||||
|
||||
Devuelve SOLO una lista JSON de secciones, sin texto adicional, sin
|
||||
markdown, sin explicaciones. Formato exacto:
|
||||
[
|
||||
{{"title": "Título de la sección", "query": "términos de búsqueda específicos para esta sección", "words": 800}},
|
||||
...
|
||||
]
|
||||
|
||||
Genera entre 6 y 10 secciones. Cada sección debe:
|
||||
- Cubrir un ángulo distinto del tema
|
||||
- Tener una query específica para recuperar chunks relevantes
|
||||
- Indicar longitud objetivo en palabras (400-1200)
|
||||
|
||||
Material disponible (resumen):
|
||||
{context_summary}
|
||||
"""
|
||||
|
||||
OUTLINE_BLOG = """
|
||||
IMPORTANTE: Los títulos y queries deben estar en español, independientemente del idioma del material de investigación.
|
||||
|
||||
Eres un editor de contenido. Dado el tema "{topic}" y el material
|
||||
disponible, genera un outline para un artículo de blog exhaustivo.
|
||||
|
||||
Devuelve SOLO una lista JSON de secciones, sin texto adicional:
|
||||
[
|
||||
{{"title": "Título de sección", "query": "términos búsqueda", "words": 600}},
|
||||
...
|
||||
]
|
||||
|
||||
Genera entre 5 y 8 secciones. Primera sección = introducción gancho.
|
||||
Última sección = conclusión con perspectiva original.
|
||||
|
||||
Material disponible (resumen):
|
||||
{context_summary}
|
||||
"""
|
||||
|
||||
OUTLINE_PODCAST = """
|
||||
IMPORTANTE: Los títulos y queries deben estar en español, independientemente del idioma del material de investigación.
|
||||
|
||||
Eres un productor de podcast. Dado el tema "{topic}" y el material
|
||||
disponible, genera un outline para un guion de podcast exhaustivo.
|
||||
|
||||
Devuelve SOLO una lista JSON de segmentos, sin texto adicional:
|
||||
[
|
||||
{{"title": "Nombre del segmento", "query": "términos búsqueda", "words": 700}},
|
||||
...
|
||||
]
|
||||
|
||||
Genera entre 5 y 7 segmentos. Flujo natural: gancho → contexto →
|
||||
desarrollo → controversia → conclusión.
|
||||
|
||||
Material disponible (resumen):
|
||||
{context_summary}
|
||||
"""
|
||||
|
||||
|
||||
# ─── Ghost CMS ────────────────────────────────────────────────────────────────
|
||||
|
||||
def _b64url(data: bytes | str) -> str:
|
||||
if isinstance(data, str):
|
||||
data = data.encode()
|
||||
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
|
||||
|
||||
|
||||
def _extract_title(content: str) -> str:
|
||||
"""Return first H1 heading from markdown, skipping the ResearchOwl header block."""
|
||||
in_header = False
|
||||
for line in content.splitlines():
|
||||
stripped = line.strip()
|
||||
if stripped == "---":
|
||||
in_header = not in_header
|
||||
continue
|
||||
if in_header:
|
||||
continue
|
||||
if stripped.startswith("# ") and not stripped.startswith("## "):
|
||||
return stripped[2:].strip()
|
||||
return ""
|
||||
|
||||
|
||||
def _strip_researchowl_header(content: str) -> str:
|
||||
"""Remove the ---...--- metadata block that ResearchOwl prepends to outputs."""
|
||||
lines = content.splitlines(keepends=True)
|
||||
dashes_seen = 0
|
||||
for i, line in enumerate(lines):
|
||||
if line.strip() == "---":
|
||||
dashes_seen += 1
|
||||
if dashes_seen == 2:
|
||||
return "".join(lines[i + 1:]).lstrip("\n")
|
||||
return content
|
||||
|
||||
|
||||
class GhostPublisher:
|
||||
def __init__(self, lang: str = "es"):
|
||||
if lang == "en":
|
||||
self.url = (settings.ghost_url_en or "").rstrip("/")
|
||||
self.api_key = settings.ghost_api_key_en or ""
|
||||
else:
|
||||
self.url = (settings.ghost_url or "").rstrip("/")
|
||||
self.api_key = settings.ghost_api_key or ""
|
||||
|
||||
def is_configured(self) -> bool:
|
||||
return bool(self.url and self.api_key)
|
||||
|
||||
def _make_token(self) -> str:
|
||||
key_id, secret = self.api_key.split(":", 1)
|
||||
now = int(time.time())
|
||||
header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT", "kid": key_id}))
|
||||
payload = _b64url(json.dumps({"iat": now, "exp": now + 300, "aud": "/admin/"}))
|
||||
signing = f"{header}.{payload}"
|
||||
sig = _b64url(
|
||||
hmac.new(bytes.fromhex(secret), signing.encode(), hashlib.sha256).digest()
|
||||
)
|
||||
return f"{signing}.{sig}"
|
||||
|
||||
async def publish_draft(self, title: str, markdown_content: str,
|
||||
tags: list[str] | None = None) -> dict:
|
||||
import aiohttp as _aio
|
||||
import markdown as _md
|
||||
|
||||
clean = _strip_researchowl_header(markdown_content)
|
||||
html = _md.markdown(clean, extensions=["extra"])
|
||||
|
||||
# Ghost añade el título automáticamente — eliminar el primer <h1> para evitar duplicado
|
||||
html = re.sub(r"<h1[^>]*>.*?</h1>", "", html, count=1, flags=re.DOTALL).lstrip()
|
||||
|
||||
logger.info("Ghost publish_draft", html_length=len(html),
|
||||
html_preview=html[:200])
|
||||
|
||||
if not html.strip():
|
||||
raise ValueError("Ghost: HTML vacío tras conversión markdown — contenido no enviado")
|
||||
|
||||
# Ghost 5.x (Lexical editor): el campo "html" es solo de lectura en la API.
|
||||
# La forma fiable de enviar HTML arbitrario es via mobiledoc con HTML card,
|
||||
# que Ghost acepta en todas las versiones de v5 y renderiza sin conversión.
|
||||
mobiledoc = json.dumps({
|
||||
"version": "0.3.1",
|
||||
"atoms": [],
|
||||
"cards": [["html", {"html": html}]],
|
||||
"markups": [],
|
||||
"sections": [[10, 0]],
|
||||
})
|
||||
|
||||
token = self._make_token()
|
||||
body = {
|
||||
"posts": [{
|
||||
"title": title,
|
||||
"mobiledoc": mobiledoc,
|
||||
"status": "draft",
|
||||
"tags": [{"name": t} for t in (tags or ["investigacion"])],
|
||||
}]
|
||||
}
|
||||
async with _aio.ClientSession() as sess:
|
||||
async with sess.post(
|
||||
f"{self.url}/ghost/api/admin/posts/",
|
||||
json=body,
|
||||
headers={
|
||||
"Authorization": f"Ghost {token}",
|
||||
"Accept-Version": "v5.0",
|
||||
},
|
||||
) as resp:
|
||||
if resp.status not in (200, 201):
|
||||
text = await resp.text()
|
||||
raise ValueError(f"Ghost API {resp.status}: {text[:300]}")
|
||||
return await resp.json()
|
||||
|
||||
|
||||
# ─── Output generation ────────────────────────────────────────────────────────
|
||||
|
||||
class OutputGenerator:
|
||||
def __init__(self, db: ResearchDB, ollama: OllamaClient, processor: ContentProcessor):
|
||||
self.db = db
|
||||
@@ -142,8 +361,14 @@ class OutputGenerator:
|
||||
self.processor = processor
|
||||
|
||||
async def generate(self, session_id: int, output_type: OutputType,
|
||||
progress_callback=None) -> str:
|
||||
progress_callback=None, lang: str = "es") -> str:
|
||||
"""Generate an output for a research session"""
|
||||
if output_type in (OutputType.REPORT_EXTENDED,
|
||||
OutputType.BLOG_EXTENDED,
|
||||
OutputType.PODCAST_EXTENDED):
|
||||
return await self.generate_extended(session_id, output_type, progress_callback,
|
||||
lang=lang)
|
||||
|
||||
session = await self.db.get_session(session_id)
|
||||
if not session:
|
||||
raise ValueError(f"Session {session_id} not found")
|
||||
@@ -156,7 +381,7 @@ class OutputGenerator:
|
||||
|
||||
# RAG: get most relevant context for this output type
|
||||
query = self._get_rag_query(output_type, topic)
|
||||
context = await self.processor.rag_query(session_id, query, top_k=30)
|
||||
context = await self.processor.rag_query(session_id, query, top_k=80)
|
||||
|
||||
if not context:
|
||||
# Fallback: use raw top chunks
|
||||
@@ -166,11 +391,6 @@ class OutputGenerator:
|
||||
if not context:
|
||||
raise ValueError("No processed content available. Run /process first.")
|
||||
|
||||
# Truncate context to avoid Ollama context limits
|
||||
context_words = context.split()
|
||||
if len(context_words) > 6000:
|
||||
context = " ".join(context_words[:6000]) + "\n\n[... additional material truncated ...]"
|
||||
|
||||
backend = "Claude Haiku" if settings.anthropic_api_key else "Ollama"
|
||||
if progress_callback:
|
||||
await progress_callback(f"✍️ Generando {output_type} con {backend}... (2-5 min)")
|
||||
@@ -179,7 +399,11 @@ class OutputGenerator:
|
||||
system = self._get_system(output_type)
|
||||
prompt = PROMPTS[output_type].format(topic=topic, context=context)
|
||||
|
||||
output = await self._generate(prompt, system, output_type)
|
||||
if lang == "en" and output_type == OutputType.BLOG:
|
||||
system = BLOG_SYSTEM_EN
|
||||
prompt = BLOG_PROMPT_EN.format(topic=topic, context=context)
|
||||
|
||||
output = await self._generate(prompt, system, output_type, session_id)
|
||||
|
||||
# Add metadata header
|
||||
stats = await self.db.get_session_stats(session_id)
|
||||
@@ -189,17 +413,37 @@ class OutputGenerator:
|
||||
# Save to DB
|
||||
await self.db.save_output(session_id, output_type, full_output)
|
||||
|
||||
logger.info("Output generated", type=output_type, length=len(full_output))
|
||||
return full_output
|
||||
# Auto-publish to Ghost for blog outputs
|
||||
ghost_notice = ""
|
||||
if output_type in (OutputType.BLOG, OutputType.BLOG_EXTENDED):
|
||||
ghost = GhostPublisher(lang=lang)
|
||||
if ghost.is_configured():
|
||||
try:
|
||||
title = _extract_title(full_output) or topic
|
||||
result = await ghost.publish_draft(title, full_output)
|
||||
post = result["posts"][0]
|
||||
ghost_notice = (
|
||||
f"\n\n---\n"
|
||||
f"📤 *Borrador publicado en Ghost*\n"
|
||||
f"Editar: {ghost.url}/ghost/#/editor/post/{post['id']}"
|
||||
)
|
||||
logger.info("Auto-published blog to Ghost", post_id=post["id"])
|
||||
except Exception as e:
|
||||
logger.warning("Auto-publish to Ghost failed", error=str(e))
|
||||
|
||||
async def _generate(self, prompt: str, system: str, output_type: OutputType) -> str:
|
||||
logger.info("Output generated", type=output_type, length=len(full_output))
|
||||
return full_output + ghost_notice
|
||||
|
||||
async def _generate(self, prompt: str, system: str, output_type: OutputType,
|
||||
session_id: int | None = None) -> str:
|
||||
if settings.anthropic_api_key:
|
||||
return await self._generate_with_claude(prompt, system, output_type)
|
||||
return await self._generate_with_claude(prompt, system, output_type, session_id)
|
||||
return await self._generate_with_ollama(prompt, system)
|
||||
|
||||
async def _generate_with_claude(self, prompt: str, system: str, output_type: OutputType) -> str:
|
||||
async def _generate_with_claude(self, prompt: str, system: str, output_type: OutputType,
|
||||
session_id: int | None = None) -> str:
|
||||
import anthropic
|
||||
max_tokens = 4096 if output_type == OutputType.THREAD else 8192
|
||||
max_tokens = 4096 if output_type == OutputType.THREAD else 16000
|
||||
try:
|
||||
client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
|
||||
msg = await client.messages.create(
|
||||
@@ -208,6 +452,14 @@ class OutputGenerator:
|
||||
system=system,
|
||||
messages=[{"role": "user", "content": prompt}],
|
||||
)
|
||||
if session_id is not None:
|
||||
try:
|
||||
await self.db.log_api_call(
|
||||
session_id, "generation", settings.claude_model,
|
||||
msg.usage.input_tokens, msg.usage.output_tokens
|
||||
)
|
||||
except Exception as log_err:
|
||||
logger.warning("Failed to log API usage", error=str(log_err))
|
||||
return msg.content[0].text.strip()
|
||||
except Exception as e:
|
||||
logger.warning("Claude generation failed, falling back to Ollama", error=str(e))
|
||||
@@ -234,6 +486,155 @@ class OutputGenerator:
|
||||
}
|
||||
return systems.get(output_type, "You are a helpful research assistant.")
|
||||
|
||||
async def generate_extended(self, session_id: int, output_type: OutputType,
|
||||
progress_callback=None, lang: str = "es") -> str:
|
||||
"""
|
||||
Generación por secciones para outputs exhaustivos.
|
||||
1. Recupera muestra de contexto para el outline
|
||||
2. Genera outline con Claude (lista de secciones)
|
||||
3. Para cada sección: RAG específico → genera sección
|
||||
4. Concatena y guarda
|
||||
"""
|
||||
session = await self.db.get_session(session_id)
|
||||
if not session:
|
||||
raise ValueError(f"Session {session_id} not found")
|
||||
topic = session["topic"]
|
||||
|
||||
# Paso 1: contexto resumen para el outline (top 10 chunks)
|
||||
top_chunks = await self.db.get_top_chunks(session_id, limit=10)
|
||||
if not top_chunks:
|
||||
raise ValueError("No processed content available. Run /process first.")
|
||||
context_summary = "\n\n".join(
|
||||
f"- {c.get('title', '')}: {c['content'][:300]}"
|
||||
for c in top_chunks
|
||||
)
|
||||
|
||||
if progress_callback:
|
||||
await progress_callback("🗂️ Generando estructura del documento…")
|
||||
|
||||
# Paso 2: outline
|
||||
base_type = output_type.value.replace("_extended", "")
|
||||
outline_prompts = {
|
||||
"report": OUTLINE_REPORT,
|
||||
"blog": OUTLINE_BLOG,
|
||||
"podcast": OUTLINE_PODCAST,
|
||||
}
|
||||
outline_prompt = outline_prompts[base_type].format(
|
||||
topic=topic, context_summary=context_summary
|
||||
)
|
||||
|
||||
outline_json = await self._generate_raw(outline_prompt, session_id)
|
||||
try:
|
||||
import json as _json
|
||||
clean = outline_json.strip()
|
||||
if clean.startswith("```"):
|
||||
clean = "\n".join(clean.split("\n")[1:])
|
||||
if clean.endswith("```"):
|
||||
clean = "\n".join(clean.split("\n")[:-1])
|
||||
sections = _json.loads(clean.strip())
|
||||
except Exception as e:
|
||||
logger.error("Failed to parse outline", error=str(e), raw=outline_json[:200])
|
||||
raise ValueError(f"No se pudo generar el outline: {e}")
|
||||
|
||||
if progress_callback:
|
||||
await progress_callback(
|
||||
f"✍️ Generando {len(sections)} secciones… (esto tardará varios minutos)"
|
||||
)
|
||||
|
||||
# Paso 3: generar cada sección
|
||||
base_output_type = OutputType(base_type)
|
||||
system = self._get_system(base_output_type)
|
||||
if lang == "en" and output_type == OutputType.BLOG_EXTENDED:
|
||||
system = BLOG_SYSTEM_EN
|
||||
sections_text = []
|
||||
|
||||
for i, section in enumerate(sections, 1):
|
||||
title = section.get("title", f"Sección {i}")
|
||||
query = section.get("query", topic)
|
||||
target_words = section.get("words", 600)
|
||||
|
||||
if progress_callback:
|
||||
await progress_callback(
|
||||
f"✍️ Sección {i}/{len(sections)}: {title[:40]}…"
|
||||
)
|
||||
|
||||
section_context = await self.processor.rag_query(session_id, query, top_k=40)
|
||||
if not section_context:
|
||||
section_context = context_summary
|
||||
|
||||
lang_rule = "- Write in English\n" if lang == "en" else "- Escribe en español\n"
|
||||
section_prompt = (
|
||||
f"Escribe la sección '{title}' del {base_type} sobre: '{topic}'\n\n"
|
||||
f"REGLAS:\n"
|
||||
f"- NO incluyas ningún título o encabezado al inicio de tu respuesta — el título de la sección ya está incluido externamente\n"
|
||||
f"- Esta es UNA sección de un documento más largo — no repitas introducción ni conclusión general\n"
|
||||
f"- No incluyas encabezados del documento completo, solo el contenido de esta sección\n"
|
||||
f"- Objetivo: aproximadamente {target_words} palabras\n"
|
||||
f"- Usa SOLO información del material siguiente — no inventes datos\n"
|
||||
f"{lang_rule}"
|
||||
f"\nMATERIAL:\n{section_context}"
|
||||
)
|
||||
|
||||
section_text = await self._generate(
|
||||
section_prompt, system, base_output_type, session_id
|
||||
)
|
||||
sections_text.append(f"## {title}\n\n{section_text}")
|
||||
|
||||
# Paso 4: concatenar
|
||||
full_content = "\n\n---\n\n".join(sections_text)
|
||||
stats = await self.db.get_session_stats(session_id)
|
||||
header = self._build_header(topic, output_type, session, stats)
|
||||
full_output = header + "\n\n" + full_content
|
||||
|
||||
await self.db.save_output(session_id, output_type, full_output)
|
||||
|
||||
# Auto-publish to Ghost for extended blog outputs
|
||||
ghost_notice = ""
|
||||
if output_type == OutputType.BLOG_EXTENDED:
|
||||
ghost = GhostPublisher(lang=lang)
|
||||
if ghost.is_configured():
|
||||
try:
|
||||
title = _extract_title(full_output) or topic
|
||||
result = await ghost.publish_draft(title, full_output)
|
||||
post = result["posts"][0]
|
||||
ghost_notice = (
|
||||
f"\n\n---\n"
|
||||
f"📤 *Borrador publicado en Ghost*\n"
|
||||
f"Editar: {ghost.url}/ghost/#/editor/post/{post['id']}"
|
||||
)
|
||||
logger.info("Auto-published extended blog to Ghost", post_id=post["id"])
|
||||
except Exception as e:
|
||||
logger.warning("Auto-publish to Ghost failed (extended)", error=str(e))
|
||||
|
||||
logger.info("Extended output generated", type=output_type,
|
||||
sections=len(sections), length=len(full_output))
|
||||
return full_output + ghost_notice
|
||||
|
||||
async def _generate_raw(self, prompt: str,
|
||||
session_id: int | None = None) -> str:
|
||||
if settings.anthropic_api_key:
|
||||
import anthropic
|
||||
try:
|
||||
client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
|
||||
msg = await client.messages.create(
|
||||
model=settings.claude_model,
|
||||
max_tokens=2048,
|
||||
messages=[{"role": "user", "content": prompt}],
|
||||
)
|
||||
if session_id is not None:
|
||||
try:
|
||||
await self.db.log_api_call(
|
||||
session_id, "outline", settings.claude_model,
|
||||
msg.usage.input_tokens, msg.usage.output_tokens
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
return msg.content[0].text.strip()
|
||||
except Exception as e:
|
||||
logger.warning("Claude outline failed", error=str(e))
|
||||
raise
|
||||
raise ValueError("Claude API key required for extended generation")
|
||||
|
||||
def _build_header(self, topic: str, output_type: OutputType,
|
||||
session: dict, stats: dict) -> str:
|
||||
from datetime import datetime
|
||||
@@ -247,3 +648,279 @@ Iterations: {session.get('iterations', 0)}
|
||||
Total words researched: {session.get('total_words', 0):,}
|
||||
---
|
||||
"""
|
||||
|
||||
|
||||
def _remove_duplicate_headings(text: str) -> str:
|
||||
lines = text.split('\n')
|
||||
result = []
|
||||
i = 0
|
||||
while i < len(lines):
|
||||
line = lines[i].rstrip()
|
||||
if line.startswith('# ') and not line.startswith('## '):
|
||||
h1_text = line[2:].strip().lower()
|
||||
window = result[-5:] if len(result) >= 5 else result[:]
|
||||
recent_h2s = {
|
||||
p.strip()[3:].strip().lower()
|
||||
for p in window
|
||||
if p.strip().startswith('## ')
|
||||
}
|
||||
if h1_text in recent_h2s:
|
||||
i += 1
|
||||
continue
|
||||
result.append(lines[i])
|
||||
i += 1
|
||||
return '\n'.join(result)
|
||||
|
||||
|
||||
def generate_pdf(content: str, title: str = "ResearchOwl Output") -> bytes:
|
||||
content = _remove_duplicate_headings(content)
|
||||
try:
|
||||
from reportlab.lib.pagesizes import A4
|
||||
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
|
||||
from reportlab.lib.units import cm
|
||||
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, HRFlowable
|
||||
from reportlab.lib.enums import TA_LEFT
|
||||
from reportlab.lib import colors
|
||||
import io
|
||||
import re
|
||||
except ImportError:
|
||||
raise ImportError("reportlab is required for PDF export — pip install reportlab")
|
||||
|
||||
buf = io.BytesIO()
|
||||
doc = SimpleDocTemplate(
|
||||
buf,
|
||||
pagesize=A4,
|
||||
rightMargin=2 * cm,
|
||||
leftMargin=2 * cm,
|
||||
topMargin=2.5 * cm,
|
||||
bottomMargin=2 * cm,
|
||||
title=title,
|
||||
)
|
||||
|
||||
base = getSampleStyleSheet()
|
||||
normal = ParagraphStyle("RO_Normal", parent=base["Normal"],
|
||||
fontSize=10, leading=14, spaceAfter=4)
|
||||
h1 = ParagraphStyle("RO_H1", parent=base["Heading1"],
|
||||
fontSize=18, spaceBefore=12, spaceAfter=6,
|
||||
textColor=colors.HexColor("#1a1a2e"))
|
||||
h2 = ParagraphStyle("RO_H2", parent=base["Heading2"],
|
||||
fontSize=14, spaceBefore=10, spaceAfter=4,
|
||||
textColor=colors.HexColor("#16213e"))
|
||||
h3 = ParagraphStyle("RO_H3", parent=base["Heading3"],
|
||||
fontSize=12, spaceBefore=8, spaceAfter=4)
|
||||
code_style = ParagraphStyle("RO_Code", parent=base["Code"],
|
||||
fontSize=9, leading=12, fontName="Courier",
|
||||
backColor=colors.HexColor("#f4f4f4"), spaceAfter=4)
|
||||
bullet_style = ParagraphStyle("RO_Bullet", parent=normal,
|
||||
leftIndent=20, bulletIndent=10, spaceAfter=2)
|
||||
|
||||
def md_to_para(text: str) -> str:
|
||||
text = text.replace("&", "&").replace("<", "<").replace(">", ">")
|
||||
text = re.sub(r'\*\*(.+?)\*\*', r'<b>\1</b>', text)
|
||||
text = re.sub(r'__(.+?)__', r'<b>\1</b>', text)
|
||||
text = re.sub(r'\*(.+?)\*', r'<i>\1</i>', text)
|
||||
text = re.sub(r'_(.+?)_', r'<i>\1</i>', text)
|
||||
text = re.sub(r'`(.+?)`', r'<font name="Courier">\1</font>', text)
|
||||
return text
|
||||
|
||||
story = []
|
||||
lines = content.split("\n")
|
||||
in_code = False
|
||||
code_buf = []
|
||||
|
||||
for line in lines:
|
||||
if line.startswith("```"):
|
||||
if not in_code:
|
||||
in_code = True
|
||||
code_buf = []
|
||||
else:
|
||||
in_code = False
|
||||
try:
|
||||
story.append(Paragraph(
|
||||
"<br/>".join(l.replace("&", "&").replace("<", "<").replace(">", ">")
|
||||
for l in code_buf),
|
||||
code_style
|
||||
))
|
||||
except Exception:
|
||||
pass
|
||||
continue
|
||||
|
||||
if in_code:
|
||||
code_buf.append(line)
|
||||
continue
|
||||
|
||||
if re.match(r'^[-*_]{3,}$', line.strip()):
|
||||
story.append(HRFlowable(width="100%", thickness=0.5,
|
||||
color=colors.grey, spaceAfter=6))
|
||||
continue
|
||||
|
||||
try:
|
||||
if line.startswith("### "):
|
||||
story.append(Paragraph(md_to_para(line[4:]), h3))
|
||||
elif line.startswith("## "):
|
||||
story.append(Paragraph(md_to_para(line[3:]), h2))
|
||||
elif line.startswith("# "):
|
||||
story.append(Paragraph(md_to_para(line[2:]), h1))
|
||||
elif re.match(r'^[-*+] ', line):
|
||||
story.append(Paragraph("• " + md_to_para(line[2:]), bullet_style))
|
||||
elif re.match(r'^\d+\. ', line):
|
||||
story.append(Paragraph(md_to_para(line), bullet_style))
|
||||
elif line.strip() == "":
|
||||
story.append(Spacer(1, 6))
|
||||
else:
|
||||
story.append(Paragraph(md_to_para(line), normal))
|
||||
except Exception:
|
||||
try:
|
||||
story.append(Paragraph(line[:300], normal))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
doc.build(story)
|
||||
return buf.getvalue()
|
||||
|
||||
|
||||
async def generate_diff_summary(
|
||||
topic: str,
|
||||
new_urls: set,
|
||||
old_urls: set,
|
||||
new_chunks: list,
|
||||
session_id: int,
|
||||
db,
|
||||
) -> str | None:
|
||||
from src.config import settings
|
||||
import structlog
|
||||
diff_logger = structlog.get_logger()
|
||||
|
||||
added_urls = new_urls - old_urls
|
||||
pct_new = len(added_urls) / max(len(new_urls), 1)
|
||||
|
||||
diff_logger.info("Diff analysis", topic=topic,
|
||||
new_urls=len(new_urls), old_urls=len(old_urls),
|
||||
added=len(added_urls), pct_new=round(pct_new, 2))
|
||||
|
||||
if pct_new < 0.20 and len(added_urls) < 5:
|
||||
diff_logger.info("Diff: no significant new sources", topic=topic)
|
||||
return None
|
||||
|
||||
if not new_chunks:
|
||||
return None
|
||||
|
||||
context = "\n\n---\n\n".join(
|
||||
f"[{c.get('source_type', 'web').upper()}] {c.get('title', '')}\n{c['content'][:400]}"
|
||||
for c in new_chunks[:20]
|
||||
)
|
||||
|
||||
if not settings.anthropic_api_key:
|
||||
return (
|
||||
f"📊 *Novedades detectadas sobre {topic}*\n\n"
|
||||
f"• {len(added_urls)} fuentes nuevas encontradas\n"
|
||||
f"• {len(new_chunks)} chunks de contenido procesados\n\n"
|
||||
f"Usa /generate report para ver el análisis completo."
|
||||
)
|
||||
|
||||
try:
|
||||
import anthropic
|
||||
client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
|
||||
prompt = (
|
||||
f'Analiza el siguiente material de investigación sobre "{topic}" '
|
||||
f'y genera un resumen BREVE (máximo 300 palabras) de las novedades '
|
||||
f'más importantes encontradas. Escribe en español.\n\n'
|
||||
f'Si el contenido es muy similar a investigaciones anteriores o no '
|
||||
f'contiene información genuinamente nueva, responde SOLO con: '
|
||||
f'"SIN_NOVEDADES"\n\n'
|
||||
f'Material nuevo:\n{context}'
|
||||
)
|
||||
msg = await client.messages.create(
|
||||
model=settings.claude_model,
|
||||
max_tokens=500,
|
||||
messages=[{"role": "user", "content": prompt}]
|
||||
)
|
||||
summary = msg.content[0].text.strip()
|
||||
|
||||
if summary == "SIN_NOVEDADES":
|
||||
diff_logger.info("Diff: Claude found no new information", topic=topic)
|
||||
return None
|
||||
|
||||
try:
|
||||
await db.log_api_call(session_id, "diff", settings.claude_model,
|
||||
msg.usage.input_tokens, msg.usage.output_tokens)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return f"🔔 *Novedades — {topic}*\n\n{summary}\n\nUsa /generate para report completo."
|
||||
|
||||
except Exception as e:
|
||||
diff_logger.warning("Diff summary generation failed", error=str(e))
|
||||
return (
|
||||
f"📊 *Actualización — {topic}*\n\n"
|
||||
f"• {len(added_urls)} fuentes nuevas\n"
|
||||
f"Usa /generate report para ver el análisis completo."
|
||||
)
|
||||
|
||||
|
||||
async def generate_comparison(
|
||||
topic_a: str,
|
||||
topic_b: str,
|
||||
context_a: str,
|
||||
context_b: str,
|
||||
session_id_a: int,
|
||||
db,
|
||||
) -> str:
|
||||
from src.config import settings
|
||||
import structlog
|
||||
cmp_logger = structlog.get_logger()
|
||||
|
||||
if not settings.anthropic_api_key:
|
||||
raise ValueError("Claude API key required for comparison")
|
||||
|
||||
def _truncate(text: str, max_words: int = 3000) -> str:
|
||||
words = text.split()
|
||||
if len(words) > max_words:
|
||||
return " ".join(words[:max_words]) + "\n\n[... contenido adicional truncado ...]"
|
||||
return text
|
||||
|
||||
context_a = _truncate(context_a)
|
||||
context_b = _truncate(context_b)
|
||||
|
||||
prompt = (
|
||||
f'Eres un analista experto. Compara en profundidad estos dos temas:\n'
|
||||
f'TEMA A: "{topic_a}"\n'
|
||||
f'TEMA B: "{topic_b}"\n\n'
|
||||
f'Escribe el análisis en español con esta estructura:\n\n'
|
||||
f'## Resumen comparativo\n'
|
||||
f'(2-3 párrafos con las diferencias y similitudes más importantes)\n\n'
|
||||
f'## {topic_a}\n'
|
||||
f'(Puntos clave únicos de este tema)\n\n'
|
||||
f'## {topic_b}\n'
|
||||
f'(Puntos clave únicos de este tema)\n\n'
|
||||
f'## Similitudes\n'
|
||||
f'(Qué tienen en común)\n\n'
|
||||
f'## Diferencias clave\n'
|
||||
f'(Tabla markdown o lista de las diferencias más relevantes)\n\n'
|
||||
f'## Conclusión\n'
|
||||
f'(Cuál es mejor/más relevante según el contexto, o qué conclusión se extrae)\n\n'
|
||||
f'---\n'
|
||||
f'MATERIAL DE INVESTIGACIÓN — {topic_a}:\n{context_a}\n\n'
|
||||
f'---\n'
|
||||
f'MATERIAL DE INVESTIGACIÓN — {topic_b}:\n{context_b}\n'
|
||||
)
|
||||
|
||||
try:
|
||||
import anthropic
|
||||
client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
|
||||
msg = await client.messages.create(
|
||||
model=settings.claude_model,
|
||||
max_tokens=8192,
|
||||
messages=[{"role": "user", "content": prompt}]
|
||||
)
|
||||
try:
|
||||
await db.log_api_call(
|
||||
session_id_a, "comparison", settings.claude_model,
|
||||
msg.usage.input_tokens, msg.usage.output_tokens
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
return msg.content[0].text.strip()
|
||||
except Exception as e:
|
||||
cmp_logger.error("Comparison generation failed", error=str(e))
|
||||
raise
|
||||
|
||||
@@ -129,6 +129,8 @@ class ContentProcessor:
|
||||
scraped = [s for s in sources if s["status"] == "scraped"]
|
||||
|
||||
logger.info("Processing sources", total=len(scraped))
|
||||
scraped = await self._dedup_sources(session_id, scraped)
|
||||
logger.info("After dedup", unique=len(scraped))
|
||||
total_chunks = 0
|
||||
total_words = 0
|
||||
|
||||
@@ -161,6 +163,56 @@ class ContentProcessor:
|
||||
|
||||
return {"total_chunks": total_chunks, "total_words": total_words}
|
||||
|
||||
async def _dedup_sources(self, session_id: int,
|
||||
scraped: list[dict]) -> list[dict]:
|
||||
try:
|
||||
import hashlib
|
||||
seen_hashes: set = set()
|
||||
seen_prefixes: list = []
|
||||
unique: list = []
|
||||
duplicates = 0
|
||||
|
||||
for source in scraped:
|
||||
content = await self.db.get_source_content(source["id"])
|
||||
if not content:
|
||||
unique.append(source)
|
||||
continue
|
||||
|
||||
content_hash = hashlib.md5(content[:2000].encode()).hexdigest()
|
||||
if content_hash in seen_hashes:
|
||||
duplicates += 1
|
||||
await self.db.update_source(source["id"], status="skipped")
|
||||
continue
|
||||
seen_hashes.add(content_hash)
|
||||
|
||||
prefix = content[:300].strip().lower()
|
||||
prefix_words = set(prefix.split())
|
||||
is_dup = False
|
||||
if len(prefix_words) >= 10:
|
||||
for seen_prefix_words in seen_prefixes:
|
||||
intersection = len(prefix_words & seen_prefix_words)
|
||||
union = len(prefix_words | seen_prefix_words)
|
||||
if intersection / max(union, 1) > 0.85:
|
||||
is_dup = True
|
||||
break
|
||||
|
||||
if is_dup:
|
||||
duplicates += 1
|
||||
await self.db.update_source(source["id"], status="skipped")
|
||||
continue
|
||||
|
||||
seen_prefixes.append(prefix_words)
|
||||
unique.append(source)
|
||||
|
||||
if duplicates > 0:
|
||||
logger.info("Dedup complete", session_id=session_id,
|
||||
original=len(scraped), duplicates=duplicates,
|
||||
unique=len(unique))
|
||||
return unique
|
||||
except Exception as e:
|
||||
logger.warning("Dedup failed, processing all sources", error=str(e))
|
||||
return scraped
|
||||
|
||||
async def _process_source(self, session_id: int, topic: str, source: dict) -> int:
|
||||
"""Chunk, score, embed and store a single source. Returns chunk count."""
|
||||
source_id = source["id"]
|
||||
@@ -182,7 +234,7 @@ class ContentProcessor:
|
||||
if words < 30:
|
||||
continue
|
||||
|
||||
quality = await self._score_quality(chunk, topic)
|
||||
quality = await self._score_quality(chunk, topic, session_id)
|
||||
if quality < settings.quality_threshold:
|
||||
filtered_quality += 1
|
||||
logger.debug("Chunk filtered by quality", source_id=source_id,
|
||||
@@ -215,16 +267,20 @@ class ContentProcessor:
|
||||
logger.info("Source processed", source_id=source_id, stored=stored)
|
||||
return stored
|
||||
|
||||
async def _score_quality(self, chunk: str, topic: str) -> float:
|
||||
async def _score_quality(self, chunk: str, topic: str,
|
||||
session_id: int | None = None) -> float:
|
||||
"""Score 0-1 relevance to topic. Uses Claude Haiku if API key set, else Ollama."""
|
||||
if settings.anthropic_api_key:
|
||||
return await self._score_with_claude(chunk, topic)
|
||||
return await self._score_with_claude(chunk, topic, session_id)
|
||||
return await self._score_with_ollama(chunk, topic)
|
||||
|
||||
async def _score_with_claude(self, chunk: str, topic: str) -> float:
|
||||
async def _score_with_claude(self, chunk: str, topic: str,
|
||||
session_id: int | None = None) -> float:
|
||||
import anthropic
|
||||
prompt = (
|
||||
f'Rate 0-10 how relevant this text is to the topic "{topic}". '
|
||||
f'Be generous — if the text is tangentially related, score 4+. '
|
||||
f'Only score below 3 if completely unrelated. '
|
||||
f'Reply with only a number.\n\nText:\n{chunk[:500]}'
|
||||
)
|
||||
try:
|
||||
@@ -234,6 +290,14 @@ class ContentProcessor:
|
||||
max_tokens=10,
|
||||
messages=[{"role": "user", "content": prompt}]
|
||||
)
|
||||
if session_id is not None:
|
||||
try:
|
||||
await self.db.log_api_call(
|
||||
session_id, "scoring", settings.claude_model,
|
||||
msg.usage.input_tokens, msg.usage.output_tokens
|
||||
)
|
||||
except Exception as log_err:
|
||||
logger.warning("Failed to log API usage", error=str(log_err))
|
||||
response = msg.content[0].text.strip()
|
||||
numbers = re.findall(r'\b(\d+(?:\.\d+)?)\b', response)
|
||||
if numbers:
|
||||
@@ -241,7 +305,7 @@ class ContentProcessor:
|
||||
normalized = min(1.0, score / 10.0)
|
||||
logger.debug("Claude relevance score", raw=score, normalized=round(normalized, 2))
|
||||
return normalized
|
||||
return 0.6
|
||||
return 0.5
|
||||
except Exception as e:
|
||||
logger.warning("Claude scoring failed, falling back to Ollama", error=str(e))
|
||||
return await self._score_with_ollama(chunk, topic)
|
||||
@@ -275,7 +339,7 @@ class ContentProcessor:
|
||||
query_embedding = await self.ollama.embed(query)
|
||||
|
||||
# Get top quality chunks
|
||||
chunks = await self.db.get_top_chunks(session_id, limit=100)
|
||||
chunks = await self.db.get_top_chunks(session_id, limit=300)
|
||||
|
||||
if query_embedding and chunks:
|
||||
# Rank by embedding similarity
|
||||
|
||||
+170
-35
@@ -3,6 +3,7 @@ ResearchOwl Exhaustive Scraper
|
||||
Core engine: discovers, expands, and evaluates sources recursively
|
||||
"""
|
||||
import asyncio
|
||||
import random
|
||||
import re
|
||||
import time
|
||||
from typing import Optional
|
||||
@@ -54,6 +55,22 @@ REDDIT_RE = re.compile(r"reddit\.com/(r/\w+/comments/\w+)")
|
||||
WIKIPEDIA_RE = re.compile(r"wikipedia\.org/wiki/(.+)")
|
||||
|
||||
|
||||
async def fetch_with_retry(fetch_fn, source_name: str, max_retries: int = 3):
|
||||
last_exc = None
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
return await fetch_fn()
|
||||
except Exception as e:
|
||||
last_exc = e
|
||||
if attempt < max_retries - 1:
|
||||
wait = 2 ** attempt + random.random()
|
||||
logger.debug("fetch_with_retry backoff", source=source_name[:60],
|
||||
attempt=attempt + 1, wait=round(wait, 1), error=str(e))
|
||||
await asyncio.sleep(wait)
|
||||
logger.warning("fetch_with_retry exhausted", source=source_name[:60], error=str(last_exc))
|
||||
raise last_exc
|
||||
|
||||
|
||||
def detect_source_type(url: str) -> str:
|
||||
if YOUTUBE_RE.search(url):
|
||||
return "youtube"
|
||||
@@ -140,16 +157,15 @@ class ExhaustiveScraper:
|
||||
"""Initial broad search across multiple sources"""
|
||||
logger.info("Seeding research", topic=self.topic)
|
||||
tasks = [
|
||||
self._seed_duckduckgo(),
|
||||
self._seed_search(),
|
||||
self._seed_wikipedia(),
|
||||
self._seed_reddit(),
|
||||
self._seed_youtube(),
|
||||
]
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
async def _seed_duckduckgo(self):
|
||||
"""Multiple DDG queries — fresh DDGS() per query to avoid cascading ratelimits"""
|
||||
queries = [
|
||||
async def _generate_ddg_queries(self) -> list[str]:
|
||||
fallback = [
|
||||
self.topic,
|
||||
f"{self.topic} history facts",
|
||||
f"{self.topic} evidence analysis",
|
||||
@@ -159,26 +175,113 @@ class ExhaustiveScraper:
|
||||
f"{self.topic} documentary",
|
||||
f"{self.topic} research study",
|
||||
]
|
||||
|
||||
if not settings.anthropic_api_key:
|
||||
return fallback
|
||||
|
||||
try:
|
||||
import anthropic
|
||||
logger.info("Generating DDG queries with Claude", topic=self.topic)
|
||||
client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
|
||||
prompt = (
|
||||
f'Generate exactly 8 DuckDuckGo search queries to research: "{self.topic}"\n\n'
|
||||
f'Rules:\n'
|
||||
f'- Each query must be specific and distinct — no generic templates\n'
|
||||
f'- Cover different angles: facts, history, official sources, criticism, '
|
||||
f'technical details, recent developments, expert opinions, primary sources\n'
|
||||
f'- Use the most specific terminology for this topic\n'
|
||||
f'- Include the topic language naturally (if topic is in Spanish, '
|
||||
f'mix Spanish and English queries for broader coverage)\n'
|
||||
f'- Output ONLY the 8 queries, one per line, no numbering, '
|
||||
f'no explanations, no markdown\n'
|
||||
)
|
||||
msg = await client.messages.create(
|
||||
model=settings.claude_model,
|
||||
max_tokens=300,
|
||||
messages=[{"role": "user", "content": prompt}]
|
||||
)
|
||||
raw = msg.content[0].text.strip()
|
||||
queries = [q.strip() for q in raw.split('\n') if q.strip()]
|
||||
if self.topic not in queries:
|
||||
queries = [self.topic] + queries[:7]
|
||||
queries = queries[:8]
|
||||
logger.info("DDG queries generated by Claude", queries=queries)
|
||||
return queries
|
||||
except Exception as e:
|
||||
logger.warning("Claude query generation failed, using fallback",
|
||||
error=str(e), error_type=type(e).__name__)
|
||||
return fallback
|
||||
|
||||
async def _search_searxng(self, query: str) -> list[dict]:
|
||||
"""Busca en SearXNG y retorna lista de {href, title}. Retorna [] si no disponible."""
|
||||
import aiohttp
|
||||
searxng_url = "http://searxng-svc.researchowl.svc.cluster.local:8080/search"
|
||||
params = {
|
||||
"q": query,
|
||||
"format": "json",
|
||||
"engines": "duckduckgo,google,bing,brave",
|
||||
"language": "all",
|
||||
}
|
||||
headers = {
|
||||
"Accept": "application/json",
|
||||
"X-Forwarded-For": "127.0.0.1",
|
||||
"User-Agent": "ResearchOwl/1.0",
|
||||
}
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(
|
||||
searxng_url,
|
||||
params=params,
|
||||
headers=headers,
|
||||
timeout=aiohttp.ClientTimeout(total=15)
|
||||
) as resp:
|
||||
if resp.status == 200:
|
||||
data = await resp.json()
|
||||
results = data.get("results", [])
|
||||
logger.info("SearXNG query ok", query=query, results=len(results))
|
||||
return [
|
||||
{"href": r.get("url", ""), "title": r.get("title", "")}
|
||||
for r in results
|
||||
if r.get("url")
|
||||
]
|
||||
else:
|
||||
logger.warning("SearXNG non-200", status=resp.status, query=query)
|
||||
return []
|
||||
except Exception as e:
|
||||
logger.warning("SearXNG failed", query=query, error=str(e))
|
||||
return []
|
||||
|
||||
async def _seed_search(self):
|
||||
"""SearXNG primary + DDG fallback per query"""
|
||||
queries = await self._generate_ddg_queries()
|
||||
for query in queries:
|
||||
if self._stop:
|
||||
break
|
||||
try:
|
||||
# Fresh instance per query — a ratelimit on one won't poison the rest
|
||||
with DDGS() as ddgs:
|
||||
results = list(ddgs.text(query, max_results=settings.max_pages_per_search))
|
||||
for r in results:
|
||||
url = normalize_url(r.get("href", ""))
|
||||
if url and not is_blacklisted(url):
|
||||
await self.db.add_source(
|
||||
self.session_id, url,
|
||||
detect_source_type(url),
|
||||
depth=0,
|
||||
title=r.get("title")
|
||||
)
|
||||
logger.info("DDG query ok", query=query, results=len(results))
|
||||
except Exception as e:
|
||||
logger.warning("DDG query failed", query=query, error=str(e))
|
||||
await asyncio.sleep(settings.request_delay * 2)
|
||||
results = await self._search_searxng(query)
|
||||
if not results:
|
||||
logger.info("SearXNG vacío, usando DDG", query=query)
|
||||
try:
|
||||
with DDGS() as ddgs:
|
||||
ddg_results = list(ddgs.text(
|
||||
query,
|
||||
max_results=settings.max_pages_per_search
|
||||
))
|
||||
results = ddg_results
|
||||
logger.info("DDG fallback ok", query=query, results=len(results))
|
||||
except Exception as e:
|
||||
logger.warning("DDG fallback failed", query=query, error=str(e))
|
||||
results = []
|
||||
|
||||
for r in results:
|
||||
url = normalize_url(r.get("href", ""))
|
||||
if url and not is_blacklisted(url):
|
||||
await self.db.add_source(
|
||||
self.session_id, url,
|
||||
detect_source_type(url),
|
||||
depth=0,
|
||||
title=r.get("title")
|
||||
)
|
||||
await asyncio.sleep(random.uniform(1, 3))
|
||||
|
||||
async def _seed_wikipedia(self):
|
||||
"""Search Wikipedia API for correct article URLs.
|
||||
@@ -290,12 +393,7 @@ class ExhaustiveScraper:
|
||||
)
|
||||
|
||||
if self.progress_callback:
|
||||
await self.progress_callback(
|
||||
iteration=self.iteration,
|
||||
total=self.total_sources,
|
||||
new_this_round=new_sources,
|
||||
stats=stats
|
||||
)
|
||||
await self.progress_callback(self.iteration, self.total_sources)
|
||||
|
||||
# Saturation check: if we found very few new URLs, we're done
|
||||
if new_sources < 3 and self.iteration > 2:
|
||||
@@ -316,10 +414,31 @@ class ExhaustiveScraper:
|
||||
source_id = source["id"]
|
||||
|
||||
try:
|
||||
try:
|
||||
cached = await self.db.get_cached_content(url)
|
||||
except Exception as cache_err:
|
||||
logger.warning("Cache lookup failed", url=url, error=str(cache_err))
|
||||
cached = None
|
||||
|
||||
if cached:
|
||||
logger.debug("Cache hit", url=url)
|
||||
await self.db.save_source_content(source_id, cached)
|
||||
await self.db.update_source(
|
||||
source_id,
|
||||
status="scraped",
|
||||
scraped_at=time.time(),
|
||||
word_count=len(cached.split()),
|
||||
)
|
||||
return 0
|
||||
|
||||
if source_type == "youtube":
|
||||
content, title = await self._extract_youtube(url)
|
||||
content, title = await fetch_with_retry(
|
||||
lambda: self._extract_youtube(url), url
|
||||
)
|
||||
elif source_type == "wikipedia":
|
||||
content, title, new_urls = await self._extract_wikipedia(url)
|
||||
content, title, new_urls = await fetch_with_retry(
|
||||
lambda: self._extract_wikipedia(url), url
|
||||
)
|
||||
added = 0
|
||||
for new_url in (new_urls or []):
|
||||
if self._url_is_relevant(new_url):
|
||||
@@ -331,13 +450,18 @@ class ExhaustiveScraper:
|
||||
await self._mark_scraped(source_id, content, title, url)
|
||||
return added
|
||||
elif source_type == "reddit":
|
||||
content, title = await self._extract_reddit(url)
|
||||
# Small delay between Reddit requests to avoid rate limiting
|
||||
content, title = await fetch_with_retry(
|
||||
lambda: self._extract_reddit(url), url
|
||||
)
|
||||
await asyncio.sleep(settings.request_delay)
|
||||
elif source_type == "pdf":
|
||||
content, title = await self._extract_pdf(url)
|
||||
content, title = await fetch_with_retry(
|
||||
lambda: self._extract_pdf(url), url
|
||||
)
|
||||
else:
|
||||
content, title, new_urls = await self._extract_web(url, source["depth"])
|
||||
content, title, new_urls = await fetch_with_retry(
|
||||
lambda: self._extract_web(url, source["depth"]), url
|
||||
)
|
||||
added = 0
|
||||
for new_url in (new_urls or []):
|
||||
if self._url_is_relevant(new_url):
|
||||
@@ -469,12 +593,23 @@ class ExhaustiveScraper:
|
||||
return None, None
|
||||
|
||||
video_id = match.group(1)
|
||||
try:
|
||||
transcript_list = YouTubeTranscriptApi.get_transcript(
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
def _fetch():
|
||||
return YouTubeTranscriptApi.get_transcript(
|
||||
video_id, languages=["en", "es", "en-US", "en-GB"]
|
||||
)
|
||||
|
||||
try:
|
||||
transcript_list = await asyncio.wait_for(
|
||||
loop.run_in_executor(None, _fetch),
|
||||
timeout=30.0
|
||||
)
|
||||
text = " ".join(t["text"] for t in transcript_list)
|
||||
return text, f"YouTube: {video_id}"
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("YouTube transcript timed out", video_id=video_id)
|
||||
return None, None
|
||||
except NoTranscriptFound:
|
||||
return None, None
|
||||
except Exception as e:
|
||||
|
||||
Reference in New Issue
Block a user