Compare commits
45 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ca8c1a2846 | |||
| 30509dab4a | |||
| f6ad57e3c7 | |||
| 2a9962a1bd | |||
| 0e52b404bf | |||
| 3a9ab2848a | |||
| d3d22a1605 | |||
| 425639f423 | |||
| 7f3c2d0b49 | |||
| f577ac4712 | |||
| 747b9605c0 | |||
| caf763c23e | |||
| bdea12e6f2 | |||
| a6a90d3598 | |||
| 36984657a8 | |||
| 83eb2359be | |||
| 94d209dd8a | |||
| 7a156e2af1 | |||
| 279475a175 | |||
| 82e614e285 | |||
| aa83cfacbd | |||
| e8034f3f37 | |||
| c2bb301103 | |||
| 53cf7a04a8 | |||
| f4e167f3b6 | |||
| ba2b366534 | |||
| 4bef9d2d17 | |||
| 7a012c2c28 | |||
| 6aaa85a1f8 | |||
| e0a42f0b91 | |||
| 4c7f5b521b | |||
| c33bb5337d | |||
| 566f685578 | |||
| 8c259b2b2e | |||
| a47d7b26ca | |||
| e5b77ad72d | |||
| 0d8aee63be | |||
| b5518ac95a | |||
| b33ae202b8 | |||
| 65917518ce | |||
| a681627d2e | |||
| 7704f071d6 | |||
| e66d728d68 | |||
| 65b1739943 | |||
| 54b3841d32 |
@@ -27,6 +27,9 @@ jobs:
|
|||||||
- name: Log in to registry
|
- name: Log in to registry
|
||||||
run: echo "${{ secrets.CI_TOKEN }}" | docker login gitea.gitea.svc.cluster.local:3000 -u chemavx --password-stdin
|
run: echo "${{ secrets.CI_TOKEN }}" | docker login gitea.gitea.svc.cluster.local:3000 -u chemavx --password-stdin
|
||||||
|
|
||||||
|
- name: Clean previous buildx builder
|
||||||
|
run: docker buildx rm ci-builder 2>/dev/null || true
|
||||||
|
|
||||||
- name: Create buildx builder
|
- name: Create buildx builder
|
||||||
run: |
|
run: |
|
||||||
cat > /tmp/buildkitd.toml << 'EOF'
|
cat > /tmp/buildkitd.toml << 'EOF'
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ WORKDIR /app
|
|||||||
RUN apt-get update && apt-get install -y \
|
RUN apt-get update && apt-get install -y \
|
||||||
gcc g++ \
|
gcc g++ \
|
||||||
libxml2-dev libxslt-dev \
|
libxml2-dev libxslt-dev \
|
||||||
|
libfreetype6-dev \
|
||||||
&& rm -rf /var/lib/apt/lists/*
|
&& rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
COPY requirements.txt .
|
COPY requirements.txt .
|
||||||
|
|||||||
@@ -0,0 +1,3 @@
|
|||||||
|
{
|
||||||
|
"$schema": "https://docs.renovatebot.com/renovate-schema.json"
|
||||||
|
}
|
||||||
+9
-5
@@ -8,15 +8,15 @@ aiohttp==3.10.0
|
|||||||
# Scraping
|
# Scraping
|
||||||
beautifulsoup4==4.12.3
|
beautifulsoup4==4.12.3
|
||||||
lxml==5.2.2
|
lxml==5.2.2
|
||||||
trafilatura==1.12.0
|
trafilatura==1.12.2
|
||||||
youtube-transcript-api==0.6.2
|
youtube-transcript-api==0.6.2
|
||||||
pdfplumber==0.11.3
|
pdfplumber==0.11.9
|
||||||
feedparser==6.0.11
|
feedparser==6.0.12
|
||||||
duckduckgo-search==6.2.6
|
duckduckgo-search==6.2.6
|
||||||
|
|
||||||
# Storage & Embeddings
|
# Storage & Embeddings
|
||||||
sqlite-vec==0.1.6
|
sqlite-vec==0.1.9
|
||||||
aiosqlite==0.20.0
|
aiosqlite==0.22.1
|
||||||
|
|
||||||
# Processing
|
# Processing
|
||||||
tiktoken==0.7.0
|
tiktoken==0.7.0
|
||||||
@@ -26,6 +26,10 @@ scikit-learn==1.5.1
|
|||||||
# Claude API (scoring)
|
# Claude API (scoring)
|
||||||
anthropic>=0.40.0
|
anthropic>=0.40.0
|
||||||
|
|
||||||
|
# PDF export
|
||||||
|
markdown==3.7
|
||||||
|
reportlab==4.2.5
|
||||||
|
|
||||||
# Utilities
|
# Utilities
|
||||||
pydantic==2.8.0
|
pydantic==2.8.0
|
||||||
pydantic-settings==2.4.0
|
pydantic-settings==2.4.0
|
||||||
|
|||||||
+754
-87
@@ -4,6 +4,7 @@ Main user interface — all commands handled here
|
|||||||
"""
|
"""
|
||||||
import asyncio
|
import asyncio
|
||||||
import os
|
import os
|
||||||
|
import time
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
@@ -33,17 +34,29 @@ def is_authorized(user_id: int) -> bool:
|
|||||||
return not allowed or user_id in allowed
|
return not allowed or user_id in allowed
|
||||||
|
|
||||||
|
|
||||||
def fmt_progress(iteration: int, total: int, new: int, stats: dict) -> str:
|
class ProgressReporter:
|
||||||
scraped = stats.get("scraped") or 0
|
def __init__(self, reply_target: Message = None, *, bot=None, chat_id: int = None):
|
||||||
failed = stats.get("failed") or 0
|
self._reply_target = reply_target
|
||||||
pending = stats.get("pending") or 0
|
self._bot = bot
|
||||||
skipped = stats.get("skipped") or 0
|
self._chat_id = chat_id
|
||||||
return (
|
self._msg: Optional[Message] = None
|
||||||
f"🔄 *Iteration {iteration}*\n"
|
|
||||||
f"📚 Sources found: `{total}`\n"
|
async def start(self, text: str):
|
||||||
f"✅ Scraped: `{scraped}` | ⏭️ Skipped: `{skipped}` | ❌ Failed: `{failed}` | ⏳ Pending: `{pending}`\n"
|
if self._reply_target is not None:
|
||||||
f"🆕 New URLs this round: `{new}`"
|
self._msg = await self._reply_target.reply_text(text, parse_mode=ParseMode.MARKDOWN)
|
||||||
)
|
elif self._bot is not None and self._chat_id is not None:
|
||||||
|
self._msg = await self._bot.send_message(self._chat_id, text, parse_mode=ParseMode.MARKDOWN)
|
||||||
|
|
||||||
|
async def update(self, text: str):
|
||||||
|
if not self._msg:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
await self._msg.edit_text(text, parse_mode=ParseMode.MARKDOWN)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def done(self, text: str):
|
||||||
|
await self.update(text)
|
||||||
|
|
||||||
|
|
||||||
async def send_chunked(message: Message, text: str, parse_mode=None):
|
async def send_chunked(message: Message, text: str, parse_mode=None):
|
||||||
@@ -56,6 +69,72 @@ async def send_chunked(message: Message, text: str, parse_mode=None):
|
|||||||
await asyncio.sleep(0.5)
|
await asyncio.sleep(0.5)
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Shared research logic ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
async def run_scheduled_research(bot, chat_id: int, topic: str,
                                 session_id: int, db: ResearchDB,
                                 progress_message=None,
                                 silent_completion: bool = False):
    """Scrape, then process, one research session while reporting progress.

    Progress is shown through a single ``ProgressReporter`` message: a reply
    to ``progress_message`` when one is given, otherwise a message sent via
    ``bot`` to ``chat_id``.

    Args:
        bot: Object used to send the progress message when no
            ``progress_message`` is supplied.
        chat_id: Destination chat for bot-sent progress.
        topic: Research topic handed to the scraper and the processor.
        session_id: Session row updated and processed by this run.
        db: Database wrapper used for session updates and chunk counts.
        progress_message: Optional message to reply to with progress.
        silent_completion: When true, the final text announces that news is
            being analysed instead of the "ready" summary.

    Failures are logged and reported to the user; they are not re-raised.
    """
    reporter = (
        ProgressReporter(progress_message)
        if progress_message is not None
        else ProgressReporter(bot=bot, chat_id=chat_id)
    )

    try:
        await reporter.start(f"🔍 Iniciando scraping de `{topic}`…")

        async def on_progress(iter_num, total_sources):
            # Scraper callback: refresh the single progress message.
            status_line = (
                f"🔍 Scraping — iteración `{iter_num}` | `{total_sources}` fuentes encontradas"
            )
            await reporter.update(status_line)

        final_stats = await ExhaustiveScraper(db, session_id, topic, on_progress).run()

        await db.update_session(session_id, status=ResearchStatus.SATURATED)
        scraped = final_stats.get("scraped", 0)

        await reporter.update(f"⚡ Procesando `{scraped}` fuentes…")

        ollama = OllamaClient()
        if not await ollama.is_available():
            # Without Ollama the session stays scraped but unprocessed.
            await reporter.done(
                f"⚠️ Ollama no disponible — `{scraped}` fuentes scraped.\n"
                "Usa /generate para generar contenido."
            )
            return

        processor = ContentProcessor(db, ollama)

        async def proc_progress(total_chunks, total_words):
            # Processor callback: only the chunk count is displayed.
            await reporter.update(
                f"⚡ Scoring chunks… (`{total_chunks}` procesados)"
            )

        await processor.process_session(session_id, topic, proc_progress)
        chunk_count = await db.get_chunks_count(session_id)

        if silent_completion:
            closing_text = "🔍 Investigación completada — analizando novedades…"
        else:
            closing_text = (
                f"✅ Listo — `{scraped}` fuentes · `{chunk_count}` chunks · usa /generate <tipo>"
            )
        await reporter.done(closing_text)

    except asyncio.CancelledError:
        # NOTE(review): the cancellation is absorbed here (not re-raised), so
        # the caller continues as if the run had returned normally — confirm
        # that this is intended.
        await db.update_session(session_id, status=ResearchStatus.FINISHED)
        try:
            await reporter.done("🛑 Investigación cancelada.")
        except Exception:
            pass
    except Exception as e:
        logger.error("Research task failed", error=str(e))
        try:
            await reporter.done(f"❌ Error: {str(e)[:200]}")
        except Exception:
            pass
|
||||||
|
|
||||||
|
|
||||||
# ─── Commands ─────────────────────────────────────────────────────────────────
|
# ─── Commands ─────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||||
@@ -68,9 +147,18 @@ async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|||||||
"`/status` — Check current research progress\n"
|
"`/status` — Check current research progress\n"
|
||||||
"`/finish` — Stop research and proceed to generation\n"
|
"`/finish` — Stop research and proceed to generation\n"
|
||||||
"`/process` — Manually trigger chunk processing\n"
|
"`/process` — Manually trigger chunk processing\n"
|
||||||
"`/generate <type>` — Generate output (podcast|blog|report|thread)\n"
|
"`/generate <type>` — Generate output\n"
|
||||||
|
" Tipos: podcast|blog|report|thread\n"
|
||||||
|
" Extended: podcast_extended|blog_extended|report_extended\n"
|
||||||
"`/sources` — List all sources found\n"
|
"`/sources` — List all sources found\n"
|
||||||
"`/outputs` — List generated outputs\n"
|
"`/outputs` — List generated outputs\n"
|
||||||
|
"`/export` — Exportar último output como PDF\n"
|
||||||
|
"`/publish` — Publicar último output en Ghost como borrador\n"
|
||||||
|
"`/compare <tema1> vs <tema2>` — Análisis comparativo\n"
|
||||||
|
"`/costs` — Show API usage costs\n"
|
||||||
|
"`/watch <topic> [h]` — Schedule periodic research\n"
|
||||||
|
"`/unwatch <topic>` — Remove a watch\n"
|
||||||
|
"`/watches` — List your watched topics\n"
|
||||||
"`/cancel` — Cancel current research\n"
|
"`/cancel` — Cancel current research\n"
|
||||||
"`/help` — Show this message",
|
"`/help` — Show this message",
|
||||||
parse_mode=ParseMode.MARKDOWN
|
parse_mode=ParseMode.MARKDOWN
|
||||||
@@ -91,88 +179,22 @@ async def cmd_research(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
# Check for existing active research
|
|
||||||
if chat_id in _active_tasks and not _active_tasks[chat_id].done():
|
if chat_id in _active_tasks and not _active_tasks[chat_id].done():
|
||||||
await update.message.reply_text(
|
await update.message.reply_text(
|
||||||
"⚠️ Research already in progress. Use /status or /finish first."
|
"⚠️ Research already in progress. Use /status or /finish first."
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
msg = await update.message.reply_text(
|
|
||||||
f"🦉 *ResearchOwl* starting research on:\n`{topic}`\n\n"
|
|
||||||
f"🌱 Seeding sources from:\n"
|
|
||||||
f"• DuckDuckGo (8 queries)\n"
|
|
||||||
f"• Wikipedia + internal links\n"
|
|
||||||
f"• Reddit top posts\n"
|
|
||||||
f"• YouTube transcripts\n\n"
|
|
||||||
f"This will run exhaustively until saturation. Use /finish to stop early.",
|
|
||||||
parse_mode=ParseMode.MARKDOWN
|
|
||||||
)
|
|
||||||
|
|
||||||
async def run_research():
|
async def run_research():
|
||||||
db_conn = await get_db()
|
db_conn = await get_db()
|
||||||
db = ResearchDB(db_conn)
|
db = ResearchDB(db_conn)
|
||||||
try:
|
try:
|
||||||
session_id = await db.create_session(topic, chat_id)
|
session_id = await db.create_session(topic, chat_id)
|
||||||
_active_sessions[chat_id] = session_id
|
_active_sessions[chat_id] = session_id
|
||||||
|
await run_scheduled_research(
|
||||||
progress_msg = msg
|
ctx.bot, chat_id, topic, session_id, db,
|
||||||
iteration_count = [0]
|
progress_message=update.message
|
||||||
|
|
||||||
async def on_progress(iteration, total, new_this_round, stats):
|
|
||||||
iteration_count[0] = iteration
|
|
||||||
text = fmt_progress(iteration, total, new_this_round, stats)
|
|
||||||
try:
|
|
||||||
await progress_msg.edit_text(text, parse_mode=ParseMode.MARKDOWN)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
scraper = ExhaustiveScraper(db, session_id, topic, on_progress)
|
|
||||||
final_stats = await scraper.run()
|
|
||||||
|
|
||||||
await db.update_session(session_id, status=ResearchStatus.SATURATED)
|
|
||||||
|
|
||||||
scraped = final_stats.get("scraped", 0)
|
|
||||||
await update.message.reply_text(
|
|
||||||
f"✅ *Research complete!*\n\n"
|
|
||||||
f"📊 Results:\n"
|
|
||||||
f"• Sources found & scraped: `{scraped}`\n"
|
|
||||||
f"• Iterations: `{iteration_count[0]}`\n\n"
|
|
||||||
f"Now processing content with Ollama...\n"
|
|
||||||
f"Use `/generate podcast|blog|report|thread` when ready.",
|
|
||||||
parse_mode=ParseMode.MARKDOWN
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Auto-process after scraping
|
|
||||||
ollama = OllamaClient()
|
|
||||||
if await ollama.is_available():
|
|
||||||
processor = ContentProcessor(db, ollama)
|
|
||||||
|
|
||||||
async def proc_progress(total_chunks, total_words):
|
|
||||||
await update.message.reply_text(
|
|
||||||
f"🧠 *Processing complete!*\n"
|
|
||||||
f"• Chunks stored: `{total_chunks}`\n"
|
|
||||||
f"• Words researched: `{total_words:,}`\n\n"
|
|
||||||
f"Ready! Use `/generate podcast|blog|report|thread`",
|
|
||||||
parse_mode=ParseMode.MARKDOWN
|
|
||||||
)
|
|
||||||
|
|
||||||
await processor.process_session(session_id, topic, proc_progress)
|
|
||||||
else:
|
|
||||||
await update.message.reply_text(
|
|
||||||
"⚠️ Ollama not reachable — skipping processing.\n"
|
|
||||||
"You can still use `/generate` (will use raw content)."
|
|
||||||
)
|
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
await db.update_session(
|
|
||||||
_active_sessions.get(chat_id, 0),
|
|
||||||
status=ResearchStatus.FINISHED
|
|
||||||
)
|
|
||||||
await update.message.reply_text("🛑 Research cancelled.")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error("Research task failed", error=str(e))
|
|
||||||
await update.message.reply_text(f"❌ Research failed: {str(e)[:200]}")
|
|
||||||
finally:
|
finally:
|
||||||
await db_conn.close()
|
await db_conn.close()
|
||||||
|
|
||||||
@@ -255,6 +277,7 @@ async def cmd_generate(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|||||||
|
|
||||||
chat_id = update.effective_chat.id
|
chat_id = update.effective_chat.id
|
||||||
output_arg = ctx.args[0].lower() if ctx.args else ""
|
output_arg = ctx.args[0].lower() if ctx.args else ""
|
||||||
|
lang = "en" if len(ctx.args) > 1 and ctx.args[1].lower() == "en" else "es"
|
||||||
|
|
||||||
type_map = {
|
type_map = {
|
||||||
"podcast": OutputType.PODCAST,
|
"podcast": OutputType.PODCAST,
|
||||||
@@ -263,6 +286,10 @@ async def cmd_generate(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|||||||
"thread": OutputType.THREAD,
|
"thread": OutputType.THREAD,
|
||||||
"hilo": OutputType.THREAD,
|
"hilo": OutputType.THREAD,
|
||||||
"informe": OutputType.REPORT,
|
"informe": OutputType.REPORT,
|
||||||
|
"report_extended": OutputType.REPORT_EXTENDED,
|
||||||
|
"blog_extended": OutputType.BLOG_EXTENDED,
|
||||||
|
"podcast_extended": OutputType.PODCAST_EXTENDED,
|
||||||
|
"informe_extended": OutputType.REPORT_EXTENDED,
|
||||||
}
|
}
|
||||||
|
|
||||||
if output_arg not in type_map:
|
if output_arg not in type_map:
|
||||||
@@ -279,7 +306,14 @@ async def cmd_generate(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|||||||
db = ResearchDB(db_conn)
|
db = ResearchDB(db_conn)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Find last session for this chat
|
# Usa la sesión activa si existe, si no la más reciente
|
||||||
|
session_id = _active_sessions.get(chat_id)
|
||||||
|
if session_id:
|
||||||
|
cursor = await db_conn.execute(
|
||||||
|
"SELECT * FROM research_sessions WHERE id = ?",
|
||||||
|
(session_id,)
|
||||||
|
)
|
||||||
|
else:
|
||||||
cursor = await db_conn.execute(
|
cursor = await db_conn.execute(
|
||||||
"""SELECT * FROM research_sessions WHERE telegram_chat_id = ?
|
"""SELECT * FROM research_sessions WHERE telegram_chat_id = ?
|
||||||
ORDER BY created_at DESC LIMIT 1""",
|
ORDER BY created_at DESC LIMIT 1""",
|
||||||
@@ -293,9 +327,11 @@ async def cmd_generate(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|||||||
session = dict(row)
|
session = dict(row)
|
||||||
session_id = session["id"]
|
session_id = session["id"]
|
||||||
|
|
||||||
|
backend = "Claude Haiku" if settings.anthropic_api_key else f"Ollama ({settings.ollama_model})"
|
||||||
|
lang_label = " (EN)" if lang == "en" else ""
|
||||||
msg = await update.message.reply_text(
|
msg = await update.message.reply_text(
|
||||||
f"⚙️ Generating *{output_type}* for: `{session['topic']}`\n"
|
f"⚙️ Generating *{output_type}{lang_label}* for: `{session['topic']}`\n"
|
||||||
f"Using Ollama ({settings.ollama_model})...\n"
|
f"Using {backend}...\n"
|
||||||
f"This may take 2-5 minutes ☕",
|
f"This may take 2-5 minutes ☕",
|
||||||
parse_mode=ParseMode.MARKDOWN
|
parse_mode=ParseMode.MARKDOWN
|
||||||
)
|
)
|
||||||
@@ -310,18 +346,26 @@ async def cmd_generate(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|||||||
processor = ContentProcessor(db, ollama)
|
processor = ContentProcessor(db, ollama)
|
||||||
generator = OutputGenerator(db, ollama, processor)
|
generator = OutputGenerator(db, ollama, processor)
|
||||||
|
|
||||||
output = await generator.generate(session_id, output_type, gen_progress)
|
output = await generator.generate(session_id, output_type, gen_progress, lang=lang)
|
||||||
|
|
||||||
# Send as file if very long
|
# Send as file if very long
|
||||||
if len(output) > 8000:
|
if len(output) > 8000:
|
||||||
import tempfile
|
import tempfile
|
||||||
|
import re as _re
|
||||||
ext_map = {
|
ext_map = {
|
||||||
OutputType.PODCAST: "script.md",
|
OutputType.PODCAST: "script.md",
|
||||||
OutputType.BLOG: "post.md",
|
OutputType.BLOG: "post.md",
|
||||||
OutputType.REPORT: "report.md",
|
OutputType.REPORT: "report.md",
|
||||||
OutputType.THREAD: "thread.txt",
|
OutputType.THREAD: "thread.txt",
|
||||||
|
OutputType.REPORT_EXTENDED: "report_extended.md",
|
||||||
|
OutputType.BLOG_EXTENDED: "blog_extended.md",
|
||||||
|
OutputType.PODCAST_EXTENDED: "script_extended.md",
|
||||||
}
|
}
|
||||||
filename = f"researchowl_{session['topic'][:30].replace(' ', '_')}_{ext_map[output_type]}"
|
# Use the topic from the output header (written at generation time)
|
||||||
|
# instead of the pre-fetched session dict which may be stale.
|
||||||
|
_m = _re.search(r'^Topic:\s*(.+)$', output[:500], _re.MULTILINE)
|
||||||
|
_topic = _m.group(1).strip() if _m else session["topic"]
|
||||||
|
filename = f"researchowl_{_topic[:30].replace(' ', '_')}_{ext_map[output_type]}"
|
||||||
|
|
||||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as f:
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as f:
|
||||||
f.write(output)
|
f.write(output)
|
||||||
@@ -339,6 +383,18 @@ async def cmd_generate(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|||||||
else:
|
else:
|
||||||
await send_chunked(update.message, output)
|
await send_chunked(update.message, output)
|
||||||
|
|
||||||
|
try:
|
||||||
|
stats = await db.get_usage_stats(session_id)
|
||||||
|
total_cost = sum(s.get("total_cost", 0) for s in stats)
|
||||||
|
if total_cost > settings.cost_alert_threshold:
|
||||||
|
await update.message.reply_text(
|
||||||
|
f"⚠️ Coste acumulado de esta sesión: `${total_cost:.4f}`"
|
||||||
|
f" (umbral: `${settings.cost_alert_threshold:.2f}`)",
|
||||||
|
parse_mode=ParseMode.MARKDOWN
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Generate failed", error=str(e))
|
logger.error("Generate failed", error=str(e))
|
||||||
await update.message.reply_text(f"❌ Generation failed: {str(e)[:200]}")
|
await update.message.reply_text(f"❌ Generation failed: {str(e)[:200]}")
|
||||||
@@ -429,6 +485,169 @@ async def cmd_outputs(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|||||||
await db_conn.close()
|
await db_conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
async def cmd_costs(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Reply with API usage costs for the chat's latest session and overall.

    Looks up the most recent ``research_sessions`` row for the chat, lists
    the per-call-type usage ("scoring" and "generation") for that session,
    and appends the accumulated totals returned by
    ``db.get_total_usage_stats()``.

    Unauthorized users get no reply. If the chat has no sessions, a short
    notice is sent instead.
    """
    if not is_authorized(update.effective_user.id):
        return

    chat_id = update.effective_chat.id
    db_conn = await get_db()
    db = ResearchDB(db_conn)

    try:
        cursor = await db_conn.execute(
            "SELECT * FROM research_sessions WHERE telegram_chat_id = ? ORDER BY created_at DESC LIMIT 1",
            (chat_id,)
        )
        row = await cursor.fetchone()
        if not row:
            await update.message.reply_text("No sessions found.")
            return

        session_id = row["id"]
        topic = row["topic"]

        by_type = {r["call_type"]: r for r in await db.get_usage_stats(session_id)}
        totals = await db.get_total_usage_stats()

        lines = [
            "📊 *Costes ResearchOwl*\n",
            f"Última sesión (`{topic}`):",
        ]

        session_total = 0.0
        for call_type, label in [("scoring", "Scoring"), ("generation", "Generación")]:
            row_data = by_type.get(call_type)
            if row_data:
                # Guard against missing (None) aggregates the same way the
                # accumulated totals below are guarded; a None here would
                # otherwise break the numeric formatting and the running sum.
                calls = row_data["calls"] or 0
                tokens = row_data["total_tokens"] or 0
                cost = row_data["total_cost"] or 0.0
                session_total += cost
                lines.append(f" {label}: {calls} llamadas · {tokens:,} tokens · ${cost:.4f}")
            else:
                lines.append(f" {label}: —")

        lines.append(f" Total: ${session_total:.4f}")
        lines.append("")
        lines.append("Acumulado total:")
        acc_cost = totals.get("total_cost") or 0.0
        acc_sessions = totals.get("sessions") or 0
        lines.append(f" ${acc_cost:.4f} ({acc_sessions} sesiones)")

        await update.message.reply_text("\n".join(lines), parse_mode=ParseMode.MARKDOWN)
    finally:
        await db_conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
async def cmd_watch(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Handle ``/watch <topic> [hours]``: register a periodic research watch.

    If the last argument is a decimal number it is taken as the interval in
    hours and removed from the topic; otherwise the interval defaults to 24.
    The interval must be within 1..168 hours. A duplicate watch (detected by
    "UNIQUE" appearing in the storage error text) produces a friendly notice
    instead of an error.

    Note that a topic whose last word is itself a number (for example
    "Windows 11") is read as topic "Windows" with an 11-hour interval.
    """
    if not is_authorized(update.effective_user.id):
        return

    chat_id = update.effective_chat.id
    args = ctx.args or []

    if not args:
        await update.message.reply_text(
            "❌ Uso: `/watch <tema> [horas]`\nEjemplo: `/watch Incidente Roswell 24`",
            parse_mode=ParseMode.MARKDOWN
        )
        return

    interval_hours = 24
    # isdecimal() rather than isdigit(): isdigit() is also true for characters
    # such as superscript digits ("²"), which int() refuses with ValueError.
    if args[-1].isdecimal():
        interval_hours = int(args[-1])
        topic = " ".join(args[:-1]).strip()
    else:
        topic = " ".join(args).strip()

    if not topic:
        await update.message.reply_text("❌ Debes especificar un tema.")
        return

    if not (1 <= interval_hours <= 168):
        await update.message.reply_text(
            "❌ El intervalo debe estar entre 1 y 168 horas (1 semana)."
        )
        return

    db_conn = await get_db()
    db = ResearchDB(db_conn)
    try:
        try:
            # Only the insert is inspected for a uniqueness violation, so an
            # unrelated failure while replying is never misreported as a
            # duplicate watch.
            await db.add_watch(topic, chat_id, interval_hours)
        except Exception as e:
            if "UNIQUE" not in str(e):
                raise
            await update.message.reply_text(
                f"Ya estás watching `{topic}`", parse_mode=ParseMode.MARKDOWN
            )
        else:
            await update.message.reply_text(
                f"👁 Watching: `{topic}` — cada {interval_hours}h\n"
                f"Primera ejecución en ~{interval_hours}h.\n"
                f"Usa /watches para ver todos tus temas.",
                parse_mode=ParseMode.MARKDOWN
            )
    finally:
        await db_conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
async def cmd_unwatch(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Handle ``/unwatch <topic>``: remove a watch for this chat.

    The topic is the space-joined command arguments. The reply depends on
    the truthiness of ``db.remove_watch(topic, chat_id)``. Unauthorized
    users get no reply.
    """
    if not is_authorized(update.effective_user.id):
        return

    chat_id = update.effective_chat.id
    topic = " ".join(ctx.args).strip() if ctx.args else ""

    if not topic:
        await update.message.reply_text(
            "❌ Uso: `/unwatch <tema>`", parse_mode=ParseMode.MARKDOWN
        )
        return

    db_conn = await get_db()
    db = ResearchDB(db_conn)
    try:
        removed = await db.remove_watch(topic, chat_id)
        if removed:
            await update.message.reply_text(
                f"✅ Ya no vigilas `{topic}`.", parse_mode=ParseMode.MARKDOWN
            )
        else:
            # The text uses backticks, so it needs Markdown parsing like the
            # other replies; without parse_mode they are shown literally.
            await update.message.reply_text(
                f"No estabas watching `{topic}`.", parse_mode=ParseMode.MARKDOWN
            )
    finally:
        await db_conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
async def cmd_watches(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Handle ``/watches``: list this chat's watched topics.

    Each entry shows an enabled/paused marker, the topic, its interval and
    the time left until ``next_run_at`` (minutes when under one hour,
    otherwise hours with one decimal). Unauthorized users get no reply.
    """
    if not is_authorized(update.effective_user.id):
        return

    chat_id = update.effective_chat.id
    db_conn = await get_db()
    db = ResearchDB(db_conn)
    try:
        watches = await db.list_watches(chat_id)
        if not watches:
            await update.message.reply_text(
                "No tienes temas vigilados. Usa `/watch <tema>`",
                parse_mode=ParseMode.MARKDOWN
            )
            return

        now = time.time()
        report = ["👁 *Tus temas vigilados:*\n"]
        for position, entry in enumerate(watches, 1):
            # Overdue watches are clamped to zero remaining time.
            seconds_left = max(0.0, entry["next_run_at"] - now)
            hours_left = seconds_left / 3600
            if hours_left < 1:
                eta = f"{int(seconds_left / 60)}min"
            else:
                eta = f"{hours_left:.1f}h"
            marker = "✅" if entry["enabled"] else "⏸"
            report.append(
                f"{position}. {marker} `{entry['topic']}` — cada {entry['interval_hours']}h · próxima en {eta}"
            )

        await update.message.reply_text("\n".join(report), parse_mode=ParseMode.MARKDOWN)
    finally:
        await db_conn.close()
|
||||||
|
|
||||||
|
|
||||||
async def cmd_process(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
async def cmd_process(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||||
if not is_authorized(update.effective_user.id):
|
if not is_authorized(update.effective_user.id):
|
||||||
return
|
return
|
||||||
@@ -513,8 +732,448 @@ async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|||||||
|
|
||||||
# ─── Bot setup ────────────────────────────────────────────────────────────────
|
# ─── Bot setup ────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
async def _purge_on_startup(app: Application) -> None:
    """Purge sessions older than 30 days at startup, on a best-effort basis.

    Any failure — including failing to open the database connection — is
    logged as a warning and never propagated, so startup is not aborted by
    the purge.
    """
    db_conn = None
    try:
        # Acquire the connection inside the try so that a connection failure
        # is handled like any other purge failure.
        db_conn = await get_db()
        db = ResearchDB(db_conn)
        result = await db.purge_old_sessions(30)
        if result["sessions"] > 0:
            logger.info("Startup purge done", **result)
    except Exception as e:
        logger.warning("Startup purge failed — bot continues", error=str(e))
    finally:
        if db_conn is not None:
            try:
                await db_conn.close()
            except Exception as e:
                logger.warning("Startup purge: closing DB failed", error=str(e))
|
||||||
|
|
||||||
|
|
||||||
|
async def _scheduler_loop(app: Application):
    """Poll for due watches once a minute and launch a research task for each.

    For every due watch whose chat has no unfinished task, a new session is
    created, the watch's run is recorded, and a background task is started
    that runs the research and then sends either a diff summary or a
    "no news" message. Errors in one polling pass are logged and the loop
    continues.
    """
    while True:
        db_conn = None
        try:
            db_conn = await get_db()
            db = ResearchDB(db_conn)
            due_watches = await db.get_due_watches()
            for watch in due_watches:
                chat_id = watch["chat_id"]
                topic = watch["topic"]

                # One research task per chat at a time.
                if chat_id in _active_tasks and not _active_tasks[chat_id].done():
                    continue

                session_id = await db.create_session(topic, chat_id)
                _active_sessions[chat_id] = session_id
                await db.update_watch_run(watch["id"])

                # Loop variables are bound as defaults so each task keeps the
                # values of its own iteration.
                async def _task(c=chat_id, t=topic, s=session_id, w_id=watch["id"]):
                    # The task uses its own connection; the polling
                    # connection is closed at the end of each pass.
                    inner_db_conn = await get_db()
                    inner_db = ResearchDB(inner_db_conn)
                    try:
                        await run_scheduled_research(
                            app.bot, c, t, s, inner_db, silent_completion=True
                        )

                        prev_session = await inner_db.get_previous_session(c, t, s)
                        new_urls = await inner_db.get_session_urls(s)
                        if prev_session:
                            old_urls = await inner_db.get_session_urls(prev_session["id"])
                        else:
                            old_urls = set()
                        new_chunks = await inner_db.get_top_chunks(s, limit=30)

                        try:
                            from src.generator.generator import generate_diff_summary
                            summary = await generate_diff_summary(
                                t, new_urls, old_urls, new_chunks, s, inner_db
                            )
                        except Exception as e:
                            # Fall back to a generic notice if the summary
                            # cannot be produced.
                            logger.warning("Diff summary failed", error=str(e))
                            summary = (
                                f"📊 *Actualización disponible — {t}*\n\n"
                                f"Usa /generate report para ver el análisis completo."
                            )

                        if summary:
                            outgoing = summary
                        else:
                            outgoing = f"🔄 *{t}* — sin novedades significativas esta vez."
                        await app.bot.send_message(
                            c, outgoing, parse_mode=ParseMode.MARKDOWN
                        )
                    finally:
                        await inner_db_conn.close()

                _active_tasks[chat_id] = asyncio.create_task(_task())
                await app.bot.send_message(
                    chat_id,
                    f"🔄 Investigación automática iniciada: `{topic}`",
                    parse_mode=ParseMode.MARKDOWN
                )
        except Exception as e:
            logger.warning("Scheduler loop error", error=str(e))
        finally:
            if db_conn:
                try:
                    await db_conn.close()
                except Exception:
                    pass
        await asyncio.sleep(60)
|
||||||
|
|
||||||
|
|
||||||
|
# Strong references to fire-and-forget tasks. The event loop keeps only weak
# references to tasks, so a task nobody else references may be
# garbage-collected before it finishes.
_background_tasks: set = set()


async def _start_scheduler(app: Application) -> None:
    """Start the watch scheduler loop as a background task.

    The task is kept in ``_background_tasks`` until it completes so that it
    cannot be garbage-collected while running.
    """
    task = asyncio.create_task(_scheduler_loop(app))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
|
||||||
|
|
||||||
|
|
||||||
|
async def _on_startup(app: Application) -> None:
    """Run the startup hooks in order: the purge first, then the scheduler."""
    for startup_hook in (_purge_on_startup, _start_scheduler):
        await startup_hook(app)
|
||||||
|
|
||||||
|
|
||||||
|
async def cmd_export(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Handle ``/export``: send the latest session's best output as a PDF.

    The output is chosen by type priority (extended report, blog and
    podcast first, then report, blog, podcast and thread); if no output has
    a listed type, the first output is used. PDF generation problems are
    reported by editing the status message; any other failure is logged and
    reported with a reply. Unauthorized users get no reply.
    """
    if not is_authorized(update.effective_user.id):
        return

    chat_id = update.effective_chat.id
    db_conn = await get_db()
    db = ResearchDB(db_conn)

    try:
        session = await db.get_latest_session(chat_id)
        if not session:
            await update.message.reply_text("No hay sesiones de investigación.")
            return

        session_id, topic = session["id"], session["topic"]

        outputs = await db.get_outputs(session_id)
        if not outputs:
            await update.message.reply_text(
                "No hay outputs generados. Usa `/generate <tipo>` primero.",
                parse_mode=ParseMode.MARKDOWN
            )
            return

        priority = [
            "report_extended", "blog_extended", "podcast_extended",
            "report", "blog", "podcast", "thread",
        ]
        # First output of the highest-priority type present, else the first
        # output overall.
        chosen = next(
            (
                candidate
                for wanted_type in priority
                for candidate in outputs
                if candidate["output_type"] == wanted_type
            ),
            None,
        ) or outputs[0]

        msg = await update.message.reply_text(
            f"📄 Generando PDF para `{topic}`…",
            parse_mode=ParseMode.MARKDOWN
        )

        try:
            # Imported lazily so a missing PDF dependency surfaces here as
            # ImportError rather than at module import time.
            from src.generator.generator import generate_pdf
            pdf_bytes = generate_pdf(chosen["content"], title=topic)
        except ImportError:
            await msg.edit_text("❌ reportlab no está instalado. Ejecuta: `pip install reportlab`")
            return
        except Exception as e:
            await msg.edit_text(f"❌ Error generando PDF: {str(e)[:200]}")
            return

        safe_topic = topic[:40].replace(" ", "_").replace("/", "-")
        output_kind = chosen["output_type"]
        filename = f"researchowl_{safe_topic}_{output_kind}.pdf"

        import io
        await update.message.reply_document(
            document=io.BytesIO(pdf_bytes),
            filename=filename,
            caption=f"📄 *{chosen['output_type'].upper()}* — {topic}\nExportado por ResearchOwl 🦉",
            parse_mode=ParseMode.MARKDOWN
        )

        # Removing the temporary status message is best-effort.
        try:
            await msg.delete()
        except Exception:
            pass

    except Exception as e:
        logger.error("Export failed", error=str(e))
        await update.message.reply_text(f"❌ Export failed: {str(e)[:200]}")
    finally:
        await db_conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
async def cmd_purge(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Handle ``/purge [days]``: delete old sessions and report the counts.

    With no argument the age limit is 30 days. A negative value is rejected,
    and ``0`` requires a second argument ``confirm`` before anything is
    purged. Errors from the purge itself are logged and reported in chat.
    """
    if not is_authorized(update.effective_user.id):
        return

    arguments = ctx.args or []
    days = 30

    if arguments:
        try:
            days = int(arguments[0])
        except ValueError:
            await update.message.reply_text(
                "❌ Uso: `/purge [días]`", parse_mode=ParseMode.MARKDOWN
            )
            return

        if days < 0:
            await update.message.reply_text("❌ El número de días debe ser ≥ 0.")
            return

        # Purging everything needs an explicit second token.
        confirmed = len(arguments) >= 2 and arguments[1] == "confirm"
        if days == 0 and not confirmed:
            await update.message.reply_text(
                "⚠️ Esto borrará *todas* las sesiones completadas.\n"
                "Envía `/purge 0 confirm` para confirmar.",
                parse_mode=ParseMode.MARKDOWN
            )
            return

    db_conn = await get_db()
    try:
        store = ResearchDB(db_conn)
        purged = await store.purge_old_sessions(days)
        summary = (
            f"🗑️ Purged: {purged['sessions']} sessions, "
            f"{purged['sources']} sources, "
            f"{purged['chunks']} chunks, "
            f"{purged['outputs']} outputs"
        )
        await update.message.reply_text(summary)
    except Exception as e:
        logger.error("Purge command failed", error=str(e))
        await update.message.reply_text(f"❌ Purge failed: {str(e)[:200]}")
    finally:
        await db_conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
async def cmd_publish(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Publish the best output of the current session to Ghost as a draft.

    The session is the one registered in ``_active_sessions`` for this chat
    when it still exists, otherwise the chat's latest session. One output is
    picked by a fixed type priority (blog variants first) and sent through
    ``GhostPublisher.publish_draft``; the reply contains the editor URL.

    Args:
        update: Incoming Telegram update that triggered ``/publish``.
        ctx: Handler context (unused).

    All failures are logged and reported to the user in chat.
    """
    if not is_authorized(update.effective_user.id):
        return

    chat_id = update.effective_chat.id
    db_conn = await get_db()
    db = ResearchDB(db_conn)

    try:
        from src.generator.generator import GhostPublisher, _extract_title

        ghost = GhostPublisher()
        if not ghost.is_configured():
            await update.message.reply_text(
                "❌ Ghost no configurado. Asegúrate de que `GHOST_URL` y `GHOST_API_KEY` están definidos.",
                parse_mode=ParseMode.MARKDOWN
            )
            return

        session = None
        session_id = _active_sessions.get(chat_id)
        if session_id:
            cursor = await db_conn.execute(
                "SELECT * FROM research_sessions WHERE id = ?",
                (session_id,)
            )
            row = await cursor.fetchone()
            session = dict(row) if row else None
        if not session:
            # No active id, or the remembered id no longer exists in the
            # database: fall back to the most recent session of the chat.
            session = await db.get_latest_session(chat_id)
        if not session:
            await update.message.reply_text("No hay sesiones. Usa /research primero.")
            return

        outputs = await db.get_outputs(session["id"])
        if not outputs:
            await update.message.reply_text(
                "No hay outputs generados. Usa `/generate blog|report` primero.",
                parse_mode=ParseMode.MARKDOWN
            )
            return

        priority = ["blog_extended", "blog", "report_extended", "report",
                    "podcast_extended", "podcast", "thread"]
        # get_outputs() returns newest first, so outputs[0] is the most recent
        # output; this matches the fallback used by cmd_export.
        chosen = next(
            (o for ptype in priority for o in outputs if o["output_type"] == ptype),
            outputs[0],
        )

        msg = await update.message.reply_text("📤 Publicando en Ghost como borrador…")

        title = _extract_title(chosen["content"]) or session["topic"]
        result = await ghost.publish_draft(title, chosen["content"])
        post = result["posts"][0]
        admin_url = f"{ghost.url}/ghost/#/editor/post/{post['id']}"

        await msg.edit_text(
            f"✅ *Publicado en Ghost como borrador*\n\n"
            f"📝 Título: `{title}`\n"
            f"🔗 Editar: {admin_url}",
            parse_mode=ParseMode.MARKDOWN
        )

    except Exception as e:
        logger.error("Publish to Ghost failed", error=str(e))
        await update.message.reply_text(f"❌ Error publicando en Ghost: {str(e)[:200]}")
    finally:
        await db_conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
async def cmd_compare(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Handle ``/compare <topic1> vs <topic2>``.

    Parses the two topics, refuses to start while another task for the chat
    is still running, then schedules a background task that researches both
    topics concurrently, generates a comparison, stores it as a report on the
    first session and delivers it to the chat.

    Args:
        update: Incoming Telegram update that triggered ``/compare``.
        ctx: Handler context; ``ctx.args`` holds the raw command arguments.
    """
    if not is_authorized(update.effective_user.id):
        return

    chat_id = update.effective_chat.id
    text = " ".join(ctx.args).strip() if ctx.args else ""

    import re
    match = re.split(r'\s+vs\.?\s+|\s+versus\s+', text, maxsplit=1, flags=re.IGNORECASE)
    if len(match) != 2 or not match[0].strip() or not match[1].strip():
        await update.message.reply_text(
            "❌ Uso: `/compare <tema1> vs <tema2>`\n"
            "Ejemplo: `/compare energía solar vs energía nuclear`",
            parse_mode=ParseMode.MARKDOWN
        )
        return

    topic_a = match[0].strip()
    topic_b = match[1].strip()

    if chat_id in _active_tasks and not _active_tasks[chat_id].done():
        await update.message.reply_text(
            "⚠️ Ya hay una investigación en curso. Usa /cancel primero."
        )
        return

    msg = await update.message.reply_text(
        f"🔍 Comparando `{topic_a}` vs `{topic_b}`…\n"
        f"Esto lanzará dos investigaciones en paralelo y tardará varios minutos.",
        parse_mode=ParseMode.MARKDOWN
    )

    # Telegram rejects message text longer than 4096 characters, so anything
    # above that must be delivered as a document instead of an inline edit.
    max_inline_chars = 4096

    async def run_compare():
        # Connections are opened inside the try block so that a failure while
        # opening the second one still closes the first one in `finally`.
        db_conn_a = None
        db_conn_b = None

        try:
            db_conn_a = await get_db()
            db_conn_b = await get_db()
            db_a = ResearchDB(db_conn_a)
            db_b = ResearchDB(db_conn_b)

            session_id_a = await db_a.create_session(topic_a, chat_id)
            session_id_b = await db_b.create_session(topic_b, chat_id)
            _active_sessions[chat_id] = session_id_a

            await msg.edit_text(
                f"🔍 Investigando en paralelo:\n"
                f"• `{topic_a}`\n"
                f"• `{topic_b}`\n\n"
                f"Esto puede tardar 10-20 minutos…",
                parse_mode=ParseMode.MARKDOWN
            )

            async def research_topic(session_id, topic, db):
                # Scrape, mark the session as saturated, then process the
                # scraped content only when Ollama reports itself available.
                scraper = ExhaustiveScraper(db, session_id, topic)
                await scraper.run()
                await db.update_session(session_id, status=ResearchStatus.SATURATED)
                ollama = OllamaClient()
                if await ollama.is_available():
                    processor = ContentProcessor(db, ollama)
                    await processor.process_session(session_id, topic)

            await msg.edit_text(
                f"🔍 Scraping en paralelo:\n• `{topic_a}`\n• `{topic_b}`…",
                parse_mode=ParseMode.MARKDOWN
            )

            await asyncio.gather(
                research_topic(session_id_a, topic_a, db_a),
                research_topic(session_id_b, topic_b, db_b),
            )

            await msg.edit_text(
                "✍️ Generando análisis comparativo…",
                parse_mode=ParseMode.MARKDOWN
            )

            ollama = OllamaClient()
            processor_a = ContentProcessor(db_a, ollama)
            processor_b = ContentProcessor(db_b, ollama)

            context_a = await processor_a.rag_query(session_id_a, topic_a, top_k=40)
            context_b = await processor_b.rag_query(session_id_b, topic_b, top_k=40)

            # Fall back to the top stored chunks when the RAG query is empty.
            if not context_a:
                chunks = await db_a.get_top_chunks(session_id_a, limit=20)
                context_a = "\n\n---\n\n".join(c["content"] for c in chunks)
            if not context_b:
                chunks = await db_b.get_top_chunks(session_id_b, limit=20)
                context_b = "\n\n---\n\n".join(c["content"] for c in chunks)

            from src.generator.generator import generate_comparison
            comparison = await generate_comparison(
                topic_a, topic_b, context_a, context_b, session_id_a, db_a
            )

            from datetime import datetime, timezone
            now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
            header = (
                f"---\n"
                f"ResearchOwl | COMPARISON\n"
                f"Topic A: {topic_a}\n"
                f"Topic B: {topic_b}\n"
                f"Generated: {now}\n"
                f"---\n\n"
            )
            full_output = header + comparison

            await db_a.save_output(session_id_a, OutputType.REPORT, full_output)

            if len(full_output) > max_inline_chars:
                import io
                raw_name = f"compare_{topic_a[:20]}_{topic_b[:20]}".replace(" ", "_")
                # Topics are free text; keep only word characters, dots and
                # dashes so the attachment name is safe to save.
                filename = re.sub(r"[^\w.-]", "_", raw_name) + ".md"
                await update.message.reply_document(
                    document=io.BytesIO(full_output.encode()),
                    filename=filename,
                    caption=f"📊 Comparación: {topic_a} vs {topic_b}"
                )
                try:
                    # Best effort: the progress message is only cosmetic.
                    await msg.delete()
                except Exception:
                    pass
            else:
                try:
                    await msg.edit_text(full_output, parse_mode=ParseMode.MARKDOWN)
                except Exception:
                    # Generated text is not guaranteed to be valid Telegram
                    # Markdown; retry as plain text rather than lose a result
                    # that has already been saved.
                    await msg.edit_text(full_output)

        except asyncio.CancelledError:
            await msg.edit_text("🛑 Comparación cancelada.")
        except Exception as e:
            logger.error("Compare task failed", error=str(e))
            try:
                await msg.edit_text(f"❌ Error: {str(e)[:200]}")
            except Exception:
                pass
        finally:
            # Close each connection independently so one failing close does
            # not leave the other connection open.
            for conn in (db_conn_a, db_conn_b):
                if conn is None:
                    continue
                try:
                    await conn.close()
                except Exception as close_err:
                    logger.error("Compare DB close failed", error=str(close_err))

    task = asyncio.create_task(run_compare())
    _active_tasks[chat_id] = task
|
||||||
|
|
||||||
|
|
||||||
def create_bot() -> Application:
|
def create_bot() -> Application:
|
||||||
app = Application.builder().token(settings.telegram_bot_token).build()
|
app = (
|
||||||
|
Application.builder()
|
||||||
|
.token(settings.telegram_bot_token)
|
||||||
|
.post_init(_on_startup)
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
|
||||||
app.add_handler(CommandHandler("start", cmd_start))
|
app.add_handler(CommandHandler("start", cmd_start))
|
||||||
app.add_handler(CommandHandler("help", cmd_help))
|
app.add_handler(CommandHandler("help", cmd_help))
|
||||||
@@ -524,8 +1183,16 @@ def create_bot() -> Application:
|
|||||||
app.add_handler(CommandHandler("generate", cmd_generate))
|
app.add_handler(CommandHandler("generate", cmd_generate))
|
||||||
app.add_handler(CommandHandler("sources", cmd_sources))
|
app.add_handler(CommandHandler("sources", cmd_sources))
|
||||||
app.add_handler(CommandHandler("outputs", cmd_outputs))
|
app.add_handler(CommandHandler("outputs", cmd_outputs))
|
||||||
|
app.add_handler(CommandHandler("export", cmd_export))
|
||||||
|
app.add_handler(CommandHandler("costs", cmd_costs))
|
||||||
|
app.add_handler(CommandHandler("watch", cmd_watch))
|
||||||
|
app.add_handler(CommandHandler("unwatch", cmd_unwatch))
|
||||||
|
app.add_handler(CommandHandler("watches", cmd_watches))
|
||||||
app.add_handler(CommandHandler("process", cmd_process))
|
app.add_handler(CommandHandler("process", cmd_process))
|
||||||
app.add_handler(CommandHandler("cancel", cmd_cancel))
|
app.add_handler(CommandHandler("cancel", cmd_cancel))
|
||||||
|
app.add_handler(CommandHandler("purge", cmd_purge))
|
||||||
|
app.add_handler(CommandHandler("publish", cmd_publish))
|
||||||
|
app.add_handler(CommandHandler("compare", cmd_compare))
|
||||||
|
|
||||||
return app
|
return app
|
||||||
|
|
||||||
|
|||||||
+10
-1
@@ -31,7 +31,16 @@ class Settings(BaseSettings):
|
|||||||
# Processing
|
# Processing
|
||||||
chunk_size: int = Field(800, env="CHUNK_SIZE") # tokens per chunk
|
chunk_size: int = Field(800, env="CHUNK_SIZE") # tokens per chunk
|
||||||
chunk_overlap: int = Field(100, env="CHUNK_OVERLAP")
|
chunk_overlap: int = Field(100, env="CHUNK_OVERLAP")
|
||||||
quality_threshold: float = Field(0.5, env="QUALITY_THRESHOLD") # 0-1, chunks below discarded
|
quality_threshold: float = Field(0.3, env="QUALITY_THRESHOLD") # 0-1, chunks below discarded
|
||||||
|
|
||||||
|
# Ghost CMS
|
||||||
|
ghost_url: Optional[str] = Field(None, env="GHOST_URL")
|
||||||
|
ghost_api_key: Optional[str] = Field(None, env="GHOST_API_KEY")
|
||||||
|
ghost_url_en: str = Field("", env="GHOST_URL_EN")
|
||||||
|
ghost_api_key_en: str = Field("", env="GHOST_API_KEY_EN")
|
||||||
|
|
||||||
|
# Alerts
|
||||||
|
cost_alert_threshold: float = Field(0.15, env="COST_ALERT_THRESHOLD")
|
||||||
|
|
||||||
# App
|
# App
|
||||||
log_level: str = Field("INFO", env="LOG_LEVEL")
|
log_level: str = Field("INFO", env="LOG_LEVEL")
|
||||||
|
|||||||
@@ -5,8 +5,12 @@ from pathlib import Path
|
|||||||
from typing import Optional
|
from typing import Optional
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
|
|
||||||
|
import structlog
|
||||||
|
|
||||||
from src.config import settings
|
from src.config import settings
|
||||||
|
|
||||||
|
logger = structlog.get_logger()
|
||||||
|
|
||||||
|
|
||||||
class ResearchStatus(str, Enum):
|
class ResearchStatus(str, Enum):
|
||||||
RUNNING = "running"
|
RUNNING = "running"
|
||||||
@@ -20,6 +24,9 @@ class OutputType(str, Enum):
|
|||||||
BLOG = "blog"
|
BLOG = "blog"
|
||||||
REPORT = "report"
|
REPORT = "report"
|
||||||
THREAD = "thread"
|
THREAD = "thread"
|
||||||
|
REPORT_EXTENDED = "report_extended"
|
||||||
|
BLOG_EXTENDED = "blog_extended"
|
||||||
|
PODCAST_EXTENDED = "podcast_extended"
|
||||||
|
|
||||||
|
|
||||||
SCHEMA = """
|
SCHEMA = """
|
||||||
@@ -84,6 +91,29 @@ CREATE INDEX IF NOT EXISTS idx_sources_session ON sources(session_id);
|
|||||||
CREATE INDEX IF NOT EXISTS idx_chunks_session ON chunks(session_id);
|
CREATE INDEX IF NOT EXISTS idx_chunks_session ON chunks(session_id);
|
||||||
CREATE INDEX IF NOT EXISTS idx_chunks_quality ON chunks(session_id, quality_score DESC);
|
CREATE INDEX IF NOT EXISTS idx_chunks_quality ON chunks(session_id, quality_score DESC);
|
||||||
CREATE INDEX IF NOT EXISTS idx_source_contents ON source_contents(source_id);
|
CREATE INDEX IF NOT EXISTS idx_source_contents ON source_contents(source_id);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS api_usage (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
session_id INTEGER REFERENCES research_sessions(id),
|
||||||
|
call_type TEXT NOT NULL,
|
||||||
|
model TEXT NOT NULL,
|
||||||
|
input_tokens INTEGER NOT NULL,
|
||||||
|
output_tokens INTEGER NOT NULL,
|
||||||
|
cost_usd REAL NOT NULL,
|
||||||
|
created_at REAL NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS watched_topics (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
topic TEXT NOT NULL,
|
||||||
|
chat_id INTEGER NOT NULL,
|
||||||
|
interval_hours INTEGER NOT NULL DEFAULT 24,
|
||||||
|
next_run_at REAL NOT NULL,
|
||||||
|
last_run_at REAL,
|
||||||
|
enabled INTEGER NOT NULL DEFAULT 1,
|
||||||
|
created_at REAL NOT NULL,
|
||||||
|
UNIQUE(topic, chat_id)
|
||||||
|
);
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
@@ -121,6 +151,35 @@ class ResearchDB:
|
|||||||
row = await cursor.fetchone()
|
row = await cursor.fetchone()
|
||||||
return dict(row) if row else None
|
return dict(row) if row else None
|
||||||
|
|
||||||
|
async def get_latest_session(self, chat_id: int) -> Optional[dict]:
    """Return the most recently created session of a chat as a dict, or None."""
    query = (
        "SELECT * FROM research_sessions WHERE telegram_chat_id = ? ORDER BY created_at DESC LIMIT 1"
    )
    cursor = await self.db.execute(query, (chat_id,))
    latest = await cursor.fetchone()
    if not latest:
        return None
    return dict(latest)
|
||||||
|
|
||||||
|
async def get_session_urls(self, session_id: int) -> set:
    """Return the set of source URLs recorded for the given session."""
    query = "SELECT url FROM sources WHERE session_id = ?"
    async with self.db.execute(query, (session_id,)) as cursor:
        records = await cursor.fetchall()
    urls = set()
    for record in records:
        urls.add(record[0])
    return urls
|
||||||
|
|
||||||
|
async def get_previous_session(self, chat_id: int, topic: str,
                               exclude_session_id: int) -> Optional[dict]:
    """Return the newest other session of this chat with the same topic.

    The session whose id equals ``exclude_session_id`` is ignored. The result
    holds the keys ``id``, ``topic``, ``status`` and ``created_at``; None is
    returned when no such session exists.
    """
    params = (chat_id, topic, exclude_session_id)
    async with self.db.execute(
        """SELECT id, topic, status, created_at FROM research_sessions
           WHERE telegram_chat_id = ? AND topic = ? AND id != ?
           ORDER BY created_at DESC LIMIT 1""",
        params
    ) as cursor:
        found = await cursor.fetchone()
    if not found:
        return None
    prev_id, prev_topic, prev_status, prev_created = found[0], found[1], found[2], found[3]
    return {
        "id": prev_id,
        "topic": prev_topic,
        "status": prev_status,
        "created_at": prev_created,
    }
|
||||||
|
|
||||||
async def get_active_session(self, chat_id: int) -> Optional[dict]:
|
async def get_active_session(self, chat_id: int) -> Optional[dict]:
|
||||||
cursor = await self.db.execute(
|
cursor = await self.db.execute(
|
||||||
"""SELECT * FROM research_sessions
|
"""SELECT * FROM research_sessions
|
||||||
@@ -259,6 +318,19 @@ class ResearchDB:
|
|||||||
row = await cursor.fetchone()
|
row = await cursor.fetchone()
|
||||||
return row[0] if row else None
|
return row[0] if row else None
|
||||||
|
|
||||||
|
async def get_cached_content(self, url: str,
                             max_age_days: int = 7) -> Optional[str]:
    """Return the newest stored content for ``url`` that is recent enough.

    Only ``source_contents`` rows created after ``now - max_age_days`` days
    are considered. Returns None when nothing matches.
    """
    max_age_seconds = max_age_days * 86400
    threshold = time.time() - max_age_seconds
    async with self.db.execute(
        """SELECT sc.content FROM source_contents sc
           JOIN sources s ON s.id = sc.source_id
           WHERE s.url = ? AND sc.created_at > ?
           ORDER BY sc.created_at DESC LIMIT 1""",
        (url, threshold)
    ) as cursor:
        hit = await cursor.fetchone()
    if not hit:
        return None
    return hit[0]
|
||||||
|
|
||||||
async def get_outputs(self, session_id: int) -> list[dict]:
|
async def get_outputs(self, session_id: int) -> list[dict]:
|
||||||
cursor = await self.db.execute(
|
cursor = await self.db.execute(
|
||||||
"SELECT * FROM outputs WHERE session_id = ? ORDER BY created_at DESC",
|
"SELECT * FROM outputs WHERE session_id = ? ORDER BY created_at DESC",
|
||||||
@@ -266,3 +338,124 @@ class ResearchDB:
|
|||||||
)
|
)
|
||||||
rows = await cursor.fetchall()
|
rows = await cursor.fetchall()
|
||||||
return [dict(r) for r in rows]
|
return [dict(r) for r in rows]
|
||||||
|
|
||||||
|
# --- API Usage ---
|
||||||
|
|
||||||
|
async def log_api_call(self, session_id, call_type: str, model: str,
                       input_tokens: int, output_tokens: int):
    """Insert one ``api_usage`` row for an API call and commit it.

    The stored cost is computed from the token counts with fixed per-million
    rates; the ``model`` argument is recorded but does not affect the rate.
    """
    # Claude Haiku prices (claude-haiku-4-5):
    #   input: $0.80 / 1M tokens   output: $4.00 / 1M tokens
    input_rate = 0.80
    output_rate = 4.00
    cost = (input_tokens * input_rate + output_tokens * output_rate) / 1_000_000

    record = (session_id, call_type, model, input_tokens, output_tokens, cost, time.time())
    await self.db.execute(
        """INSERT INTO api_usage
           (session_id, call_type, model, input_tokens, output_tokens, cost_usd, created_at)
           VALUES (?,?,?,?,?,?,?)""",
        record
    )
    await self.db.commit()
|
||||||
|
|
||||||
|
async def get_usage_stats(self, session_id: int) -> list[dict]:
    """Return per-call-type usage totals for one session.

    Each dict has the keys ``call_type``, ``calls``, ``total_tokens`` and
    ``total_cost``.
    """
    cursor = await self.db.execute(
        """SELECT call_type,
                  COUNT(*) as calls,
                  SUM(input_tokens + output_tokens) as total_tokens,
                  SUM(cost_usd) as total_cost
           FROM api_usage WHERE session_id = ?
           GROUP BY call_type""",
        (session_id,)
    )
    stats = []
    for record in await cursor.fetchall():
        stats.append(dict(record))
    return stats
|
||||||
|
|
||||||
|
async def get_total_usage_stats(self) -> dict:
    """Return overall API usage: distinct session count and total cost.

    Returns:
        A dict with the keys ``sessions`` and ``total_cost``. Both are 0 when
        no usage has been logged.
    """
    # An aggregate query without GROUP BY always yields exactly one row, and
    # SUM() over zero rows is NULL. COALESCE keeps total_cost numeric so the
    # empty-table case matches the documented default.
    cursor = await self.db.execute(
        """SELECT COUNT(DISTINCT session_id) as sessions,
                  COALESCE(SUM(cost_usd), 0) as total_cost
           FROM api_usage"""
    )
    row = await cursor.fetchone()
    return dict(row) if row else {"sessions": 0, "total_cost": 0}
|
||||||
|
|
||||||
|
# --- Watched Topics ---
|
||||||
|
|
||||||
|
async def add_watch(self, topic: str, chat_id: int, interval_hours: int) -> int:
    """Insert or replace a watched topic for a chat and return its row id.

    The first run is scheduled ``interval_hours`` after the current time.
    """
    created = time.time()
    first_run = created + interval_hours * 3600
    cursor = await self.db.execute(
        """INSERT OR REPLACE INTO watched_topics
           (topic, chat_id, interval_hours, next_run_at, created_at)
           VALUES (?, ?, ?, ?, ?)""",
        (topic, chat_id, interval_hours, first_run, created)
    )
    await self.db.commit()
    return cursor.lastrowid
|
||||||
|
|
||||||
|
async def remove_watch(self, topic: str, chat_id: int) -> bool:
    """Delete a watched topic of a chat; return True if a row was removed."""
    statement = "DELETE FROM watched_topics WHERE topic = ? AND chat_id = ?"
    cursor = await self.db.execute(statement, (topic, chat_id))
    await self.db.commit()
    removed = cursor.rowcount
    return removed > 0
|
||||||
|
|
||||||
|
async def list_watches(self, chat_id: int) -> list[dict]:
    """Return all watched topics of a chat, oldest first, as dicts."""
    query = "SELECT * FROM watched_topics WHERE chat_id = ? ORDER BY created_at ASC"
    cursor = await self.db.execute(query, (chat_id,))
    watches = []
    for record in await cursor.fetchall():
        watches.append(dict(record))
    return watches
|
||||||
|
|
||||||
|
async def get_due_watches(self) -> list[dict]:
    """Return enabled watched topics whose ``next_run_at`` is not in the future."""
    now = time.time()
    query = "SELECT * FROM watched_topics WHERE enabled = 1 AND next_run_at <= ?"
    cursor = await self.db.execute(query, (now,))
    due = []
    for record in await cursor.fetchall():
        due.append(dict(record))
    return due
|
||||||
|
|
||||||
|
async def update_watch_run(self, watch_id: int):
    """Record that a watch has just run and schedule its next run.

    Does nothing when no watch with ``watch_id`` exists.
    """
    lookup = await self.db.execute(
        "SELECT interval_hours FROM watched_topics WHERE id = ?", (watch_id,)
    )
    found = await lookup.fetchone()
    if found:
        ran_at = time.time()
        interval_hours = found[0]
        next_run = ran_at + interval_hours * 3600
        await self.db.execute(
            "UPDATE watched_topics SET last_run_at = ?, next_run_at = ? WHERE id = ?",
            (ran_at, next_run, watch_id)
        )
        await self.db.commit()
|
||||||
|
|
||||||
|
# --- Maintenance ---
|
||||||
|
|
||||||
|
async def purge_old_sessions(self, max_age_days: int = 30) -> dict:
    """Delete non-running sessions older than ``max_age_days`` and their data.

    For every matching session the stored source contents, chunks, outputs
    and sources are removed, then the session row itself. Logged API usage is
    kept but detached from the session so the session row can be deleted.

    Args:
        max_age_days: Minimum age in days a session must have to be purged.

    Returns:
        A dict with the number of deleted ``sessions``, ``sources``,
        ``chunks`` and ``outputs`` rows.

    Raises:
        Exception: Any database error is re-raised after rolling back the
            deletions performed so far.
    """
    await self.db.execute("PRAGMA foreign_keys = ON")

    threshold = time.time() - max_age_days * 86400
    cursor = await self.db.execute(
        "SELECT id FROM research_sessions WHERE created_at < ? AND status != 'running'",
        (threshold,)
    )
    session_ids = [row[0] for row in await cursor.fetchall()]

    counts = {"sessions": 0, "sources": 0, "chunks": 0, "outputs": 0}

    try:
        for sid in session_ids:
            await self.db.execute(
                "DELETE FROM source_contents WHERE source_id IN (SELECT id FROM sources WHERE session_id = ?)",
                (sid,)
            )
            # api_usage.session_id references research_sessions(id) without an
            # ON DELETE action, so with foreign keys enforced the session row
            # cannot be deleted while usage rows still point at it. Detach
            # them instead of deleting so the recorded cost is preserved.
            await self.db.execute(
                "UPDATE api_usage SET session_id = NULL WHERE session_id = ?",
                (sid,)
            )
            cur = await self.db.execute("DELETE FROM chunks WHERE session_id = ?", (sid,))
            counts["chunks"] += cur.rowcount
            cur = await self.db.execute("DELETE FROM outputs WHERE session_id = ?", (sid,))
            counts["outputs"] += cur.rowcount
            cur = await self.db.execute("DELETE FROM sources WHERE session_id = ?", (sid,))
            counts["sources"] += cur.rowcount
            cur = await self.db.execute("DELETE FROM research_sessions WHERE id = ?", (sid,))
            counts["sessions"] += cur.rowcount

        await self.db.commit()
    except Exception:
        # Do not leave a half-purged session pending in the open transaction.
        await self.db.rollback()
        raise

    logger.info("Purged sessions older than days",
                sessions=counts["sessions"], days=max_age_days)
    return counts
|
||||||
|
|||||||
+792
-86
@@ -1,76 +1,57 @@
|
|||||||
"""
|
"""
|
||||||
ResearchOwl Generators
|
ResearchOwl Generators
|
||||||
Produces structured outputs from processed research using Ollama
|
Produces structured outputs from processed research using Claude or Ollama
|
||||||
"""
|
"""
|
||||||
|
import base64
|
||||||
|
import hashlib
|
||||||
|
import hmac
|
||||||
|
import json
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
|
||||||
import structlog
|
import structlog
|
||||||
|
|
||||||
|
from src.config import settings
|
||||||
from src.processor.processor import OllamaClient, ContentProcessor
|
from src.processor.processor import OllamaClient, ContentProcessor
|
||||||
from src.db.database import ResearchDB, OutputType
|
from src.db.database import ResearchDB, OutputType
|
||||||
|
|
||||||
logger = structlog.get_logger()
|
logger = structlog.get_logger()
|
||||||
|
|
||||||
PODCAST_SYSTEM = (
|
PODCAST_SYSTEM = (
|
||||||
"You are a podcast scriptwriter. Write exactly as a host SPEAKS — contractions, "
|
"Escribe SIEMPRE en español. "
|
||||||
"incomplete sentences, natural pauses, rhetorical questions. "
|
"Eres un guionista de podcast. Escribe exactamente como un presentador HABLA — contracciones, "
|
||||||
"NEVER repeat a sentence, phrase, or idea you already wrote. "
|
"frases naturales, pausas, preguntas retóricas. "
|
||||||
"Each paragraph must introduce NEW information. "
|
"NUNCA repitas una frase o idea que ya escribiste. "
|
||||||
"Use [PAUSE], [EMPHASIS], [MUSIC CUE] markers sparingly."
|
"Cada párrafo debe introducir información NUEVA. "
|
||||||
|
"Usa marcadores [PAUSA], [ÉNFASIS], [MÚSICA] con moderación."
|
||||||
)
|
)
|
||||||
|
|
||||||
BLOG_SYSTEM = (
|
BLOG_SYSTEM = (
|
||||||
"You are a journalist writing a blog post. Use clear markdown headings. "
|
"Escribe SIEMPRE en español. "
|
||||||
"NEVER repeat the same fact or phrase twice — if you said something, move on. "
|
"Eres un periodista escribiendo un artículo de blog. Usa encabezados markdown claros. "
|
||||||
"Each section must add new information not covered in previous sections."
|
"NUNCA repitas el mismo dato o frase dos veces — si ya lo dijiste, avanza. "
|
||||||
|
"Cada sección debe añadir información nueva no cubierta en secciones anteriores."
|
||||||
)
|
)
|
||||||
|
|
||||||
REPORT_SYSTEM = (
|
BLOG_SYSTEM_EN = (
|
||||||
"You are a research analyst. Write a structured factual report. "
|
"You write ALWAYS in English. "
|
||||||
"Be concise — do NOT pad with redundant summaries. "
|
"You are a journalist writing a blog article. Use clear markdown headers. "
|
||||||
"NEVER restate a finding already listed. Each numbered finding must be distinct."
|
"NEVER repeat the same fact or phrase twice — if you said it, move on. "
|
||||||
|
"Each section must add new information not covered in other sections."
|
||||||
)
|
)
|
||||||
|
|
||||||
THREAD_SYSTEM = (
|
BLOG_PROMPT_EN = """\
|
||||||
"You write Twitter/X threads. Each tweet must be under 280 chars. "
|
Write a blog article about: "{topic}"
|
||||||
"NEVER repeat information from a previous tweet. "
|
|
||||||
"Each tweet must reveal something NEW. Number them 1/N, 2/N..."
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
PROMPTS = {
|
|
||||||
OutputType.PODCAST: """\
|
|
||||||
Write a podcast script about: "{topic}"
|
|
||||||
|
|
||||||
RULES — follow strictly:
|
|
||||||
- Write as SPOKEN WORD: contractions, natural rhythm, as if talking to a friend
|
|
||||||
- DO NOT use formal headings like "SEGMENT 1:" — just flow naturally
|
|
||||||
- Each paragraph must introduce a NEW fact or angle — never restate something already said
|
|
||||||
- If you find yourself repeating, stop and jump to the next new point
|
|
||||||
- Aim for 800-1200 words of actual spoken content
|
|
||||||
|
|
||||||
STRUCTURE (use natural transitions, not headers):
|
|
||||||
1. Hook: open with the most surprising or dramatic fact
|
|
||||||
2. Background: how did we get here?
|
|
||||||
3. The key evidence or events (pick the 3 most interesting)
|
|
||||||
4. Controversy or debate around the topic
|
|
||||||
5. What does this mean / what happened next
|
|
||||||
|
|
||||||
RESEARCH MATERIAL:
|
|
||||||
{context}
|
|
||||||
|
|
||||||
Write the script now (spoken word only, no stage directions except occasional [PAUSE]):""",
|
|
||||||
|
|
||||||
OutputType.BLOG: """\
|
|
||||||
Write a blog post about: "{topic}"
|
|
||||||
|
|
||||||
RULES — follow strictly:
|
RULES — follow strictly:
|
||||||
- Each section under a heading must add NEW information not covered elsewhere
|
- Each section under a heading must add NEW information not covered elsewhere
|
||||||
- Do NOT summarize previous sections at the start of each new section
|
- Do NOT summarize previous sections at the start of each new section
|
||||||
- Do NOT repeat facts — if a fact appears once, do not mention it again
|
- Do NOT repeat facts — if a fact appears once, do not mention it again
|
||||||
- Use concrete details, numbers, names — avoid vague generalities
|
- Use concrete details, numbers, names — avoid vague generalities
|
||||||
- Target 1000-1500 words
|
- Target: 1000-1500 words
|
||||||
|
|
||||||
STRUCTURE:
|
STRUCTURE:
|
||||||
# [Compelling headline]
|
# [Impactful headline]
|
||||||
|
|
||||||
[Hook paragraph — the most surprising fact]
|
[Hook paragraph — the most surprising fact]
|
||||||
|
|
||||||
@@ -78,58 +59,301 @@ STRUCTURE:
|
|||||||
[Context — what, when, who — only facts not covered elsewhere]
|
[Context — what, when, who — only facts not covered elsewhere]
|
||||||
|
|
||||||
## Key Facts
|
## Key Facts
|
||||||
[The most significant findings — each bullet must be distinct]
|
[Most significant findings — each point must be distinct]
|
||||||
|
|
||||||
## Analysis / Significance
|
## Analysis / Significance
|
||||||
[What this means — no repetition of Key Facts section]
|
[What this means — without repeating the Key Facts section]
|
||||||
|
|
||||||
## Conclusion
|
## Conclusion
|
||||||
[Takeaway — no more than 2 sentences summarizing, then a forward-looking statement]
|
[No more than 2 sentences summarizing, then a forward-looking statement]
|
||||||
|
|
||||||
RESEARCH MATERIAL:
|
RESEARCH MATERIAL:
|
||||||
{context}
|
{context}
|
||||||
|
|
||||||
Write the complete blog post in markdown:""",
|
Write the complete article in markdown:"""
|
||||||
|
|
||||||
|
REPORT_SYSTEM = (
|
||||||
|
"Escribe SIEMPRE en español. "
|
||||||
|
"Eres un analista de investigación. Escribe un informe estructurado y factual. "
|
||||||
|
"Sé conciso — NO rellenes con resúmenes redundantes. "
|
||||||
|
"NUNCA repitas un hallazgo ya listado. Cada hallazgo numerado debe ser distinto."
|
||||||
|
)
|
||||||
|
|
||||||
|
THREAD_SYSTEM = (
|
||||||
|
"Escribe SIEMPRE en español. "
|
||||||
|
"Escribes hilos de Twitter/X. Cada tweet debe tener menos de 280 caracteres. "
|
||||||
|
"NUNCA repitas información de un tweet anterior. "
|
||||||
|
"Cada tweet debe revelar algo NUEVO. Numéralos 1/N, 2/N..."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
PROMPTS = {
|
||||||
|
OutputType.PODCAST: """\
|
||||||
|
Escribe un guion de podcast sobre: "{topic}"
|
||||||
|
|
||||||
|
REGLAS — sigue estrictamente:
|
||||||
|
- Escribe como PALABRA HABLADA: contracciones, ritmo natural, como si hablaras con un amigo
|
||||||
|
- NO uses encabezados formales como "SEGMENTO 1:" — fluye de forma natural
|
||||||
|
- Cada párrafo debe introducir un NUEVO hecho o ángulo — nunca repitas algo ya dicho
|
||||||
|
- Si te encuentras repitiendo, para y salta al siguiente punto nuevo
|
||||||
|
- Objetivo: 800-1200 palabras de contenido hablado real
|
||||||
|
|
||||||
|
ESTRUCTURA (usa transiciones naturales, no encabezados):
|
||||||
|
1. Gancho: abre con el hecho más sorprendente o dramático
|
||||||
|
2. Contexto: ¿cómo llegamos aquí?
|
||||||
|
3. Las evidencias o eventos clave (elige los 3 más interesantes)
|
||||||
|
4. La controversia o debate sobre el tema
|
||||||
|
5. ¿Qué significa esto / qué pasó después?
|
||||||
|
|
||||||
|
MATERIAL DE INVESTIGACIÓN:
|
||||||
|
{context}
|
||||||
|
|
||||||
|
Escribe el guion ahora (solo palabra hablada, sin acotaciones excepto [PAUSA] ocasional):""",
|
||||||
|
|
||||||
|
OutputType.BLOG: """\
|
||||||
|
Escribe un artículo de blog sobre: "{topic}"
|
||||||
|
|
||||||
|
REGLAS — sigue estrictamente:
|
||||||
|
- Cada sección bajo un encabezado debe añadir información NUEVA no cubierta en otro lugar
|
||||||
|
- NO resumas secciones anteriores al inicio de cada nueva sección
|
||||||
|
- NO repitas hechos — si un hecho aparece una vez, no lo menciones de nuevo
|
||||||
|
- Usa detalles concretos, números, nombres — evita generalidades vagas
|
||||||
|
- Objetivo: 1000-1500 palabras
|
||||||
|
|
||||||
|
ESTRUCTURA:
|
||||||
|
# [Titular impactante]
|
||||||
|
|
||||||
|
[Párrafo gancho — el hecho más sorprendente]
|
||||||
|
|
||||||
|
## Contexto
|
||||||
|
[Contexto — qué, cuándo, quién — solo hechos no cubiertos en otro lugar]
|
||||||
|
|
||||||
|
## Hechos Clave
|
||||||
|
[Los hallazgos más significativos — cada punto debe ser distinto]
|
||||||
|
|
||||||
|
## Análisis / Importancia
|
||||||
|
[Qué significa esto — sin repetir la sección de Hechos Clave]
|
||||||
|
|
||||||
|
## Conclusión
|
||||||
|
[Conclusión — no más de 2 oraciones resumiendo, luego una declaración prospectiva]
|
||||||
|
|
||||||
|
MATERIAL DE INVESTIGACIÓN:
|
||||||
|
{context}
|
||||||
|
|
||||||
|
Escribe el artículo completo en markdown:""",
|
||||||
|
|
||||||
OutputType.REPORT: """\
|
OutputType.REPORT: """\
|
||||||
Write a research report about: "{topic}"
|
Escribe un informe de investigación sobre: "{topic}"
|
||||||
|
|
||||||
RULES — follow strictly:
|
REGLAS — sigue estrictamente:
|
||||||
- Each numbered finding must be DISTINCT — no overlapping content
|
- Cada hallazgo numerado debe ser DISTINTO — sin contenido que se superponga
|
||||||
- The Executive Summary must NOT repeat findings verbatim — only the 2-3 most critical points
|
- El Resumen Ejecutivo NO debe repetir los hallazgos literalmente — solo los 2-3 puntos más críticos
|
||||||
- Source quality and contradictions must reference specific claims, not generic statements
|
- La calidad de las fuentes y contradicciones deben referenciar afirmaciones específicas, no declaraciones genéricas
|
||||||
- Be precise and concise — no filler
|
- Sé preciso y conciso — sin relleno
|
||||||
|
|
||||||
STRUCTURE:
|
ESTRUCTURA:
|
||||||
1. Executive Summary (3-4 sentences, key takeaways only)
|
1. Resumen Ejecutivo (3-4 oraciones, solo puntos clave)
|
||||||
2. Key Findings (5-10 numbered, each completely distinct)
|
2. Hallazgos Clave (5-10 numerados, cada uno completamente distinto)
|
||||||
3. Evidence Analysis (what the sources show, with any contradictions)
|
3. Análisis de Evidencia (lo que muestran las fuentes, con cualquier contradicción)
|
||||||
4. Timeline (if applicable — specific dates/events)
|
4. Cronología (si aplica — fechas/eventos específicos)
|
||||||
5. Conclusions & Open Questions
|
5. Conclusiones y Preguntas Abiertas
|
||||||
|
|
||||||
RESEARCH MATERIAL:
|
MATERIAL DE INVESTIGACIÓN:
|
||||||
{context}
|
{context}
|
||||||
|
|
||||||
Write the complete report in markdown:""",
|
Escribe el informe completo en markdown:""",
|
||||||
|
|
||||||
OutputType.THREAD: """\
|
OutputType.THREAD: """\
|
||||||
Write a Twitter/X thread about: "{topic}"
|
Escribe un hilo de Twitter/X sobre: "{topic}"
|
||||||
|
|
||||||
RULES — follow strictly:
|
REGLAS — sigue estrictamente:
|
||||||
- Each tweet must reveal ONE new fact or idea — never restate a previous tweet
|
- Cada tweet debe revelar UN nuevo hecho o idea — nunca repetir un tweet anterior
|
||||||
- Max 280 characters per tweet (count carefully)
|
- Máximo 280 caracteres por tweet (cuenta cuidadosamente)
|
||||||
- Number format: 1/ 2/ 3/ ... N/
|
- Formato de numeración: 1/ 2/ 3/ ... N/
|
||||||
- Hook tweet must be the most surprising/provocative fact
|
- El tweet gancho debe ser el hecho más sorprendente/provocador
|
||||||
- Build toward a conclusion — do not repeat the hook at the end
|
- Avanza hacia una conclusión — no repitas el gancho al final
|
||||||
- 12-18 tweets total
|
- 12-18 tweets en total
|
||||||
|
|
||||||
RESEARCH MATERIAL:
|
MATERIAL DE INVESTIGACIÓN:
|
||||||
{context}
|
{context}
|
||||||
|
|
||||||
Write the thread (one tweet per line, nothing else):"""
|
Escribe el hilo (un tweet por línea, nada más):"""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
OUTLINE_REPORT = """
|
||||||
|
IMPORTANTE: Los títulos y queries deben estar en español, independientemente del idioma del material de investigación.
|
||||||
|
|
||||||
|
Eres un editor de investigación. Dado el tema "{topic}" y el material
|
||||||
|
disponible, genera un outline detallado para un informe exhaustivo.
|
||||||
|
|
||||||
|
Devuelve SOLO una lista JSON de secciones, sin texto adicional, sin
|
||||||
|
markdown, sin explicaciones. Formato exacto:
|
||||||
|
[
|
||||||
|
{{"title": "Título de la sección", "query": "términos de búsqueda específicos para esta sección", "words": 800}},
|
||||||
|
...
|
||||||
|
]
|
||||||
|
|
||||||
|
Genera entre 6 y 10 secciones. Cada sección debe:
|
||||||
|
- Cubrir un ángulo distinto del tema
|
||||||
|
- Tener una query específica para recuperar chunks relevantes
|
||||||
|
- Indicar longitud objetivo en palabras (400-1200)
|
||||||
|
|
||||||
|
Material disponible (resumen):
|
||||||
|
{context_summary}
|
||||||
|
"""
|
||||||
|
|
||||||
|
OUTLINE_BLOG = """
|
||||||
|
IMPORTANTE: Los títulos y queries deben estar en español, independientemente del idioma del material de investigación.
|
||||||
|
|
||||||
|
Eres un editor de contenido. Dado el tema "{topic}" y el material
|
||||||
|
disponible, genera un outline para un artículo de blog exhaustivo.
|
||||||
|
|
||||||
|
Devuelve SOLO una lista JSON de secciones, sin texto adicional:
|
||||||
|
[
|
||||||
|
{{"title": "Título de sección", "query": "términos búsqueda", "words": 600}},
|
||||||
|
...
|
||||||
|
]
|
||||||
|
|
||||||
|
Genera entre 5 y 8 secciones. Primera sección = introducción gancho.
|
||||||
|
Última sección = conclusión con perspectiva original.
|
||||||
|
|
||||||
|
Material disponible (resumen):
|
||||||
|
{context_summary}
|
||||||
|
"""
|
||||||
|
|
||||||
|
OUTLINE_PODCAST = """
|
||||||
|
IMPORTANTE: Los títulos y queries deben estar en español, independientemente del idioma del material de investigación.
|
||||||
|
|
||||||
|
Eres un productor de podcast. Dado el tema "{topic}" y el material
|
||||||
|
disponible, genera un outline para un guion de podcast exhaustivo.
|
||||||
|
|
||||||
|
Devuelve SOLO una lista JSON de segmentos, sin texto adicional:
|
||||||
|
[
|
||||||
|
{{"title": "Nombre del segmento", "query": "términos búsqueda", "words": 700}},
|
||||||
|
...
|
||||||
|
]
|
||||||
|
|
||||||
|
Genera entre 5 y 7 segmentos. Flujo natural: gancho → contexto →
|
||||||
|
desarrollo → controversia → conclusión.
|
||||||
|
|
||||||
|
Material disponible (resumen):
|
||||||
|
{context_summary}
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Ghost CMS ────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _b64url(data: bytes | str) -> str:
|
||||||
|
if isinstance(data, str):
|
||||||
|
data = data.encode()
|
||||||
|
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_title(content: str) -> str:
|
||||||
|
"""Return first H1 heading from markdown, skipping the ResearchOwl header block."""
|
||||||
|
in_header = False
|
||||||
|
for line in content.splitlines():
|
||||||
|
stripped = line.strip()
|
||||||
|
if stripped == "---":
|
||||||
|
in_header = not in_header
|
||||||
|
continue
|
||||||
|
if in_header:
|
||||||
|
continue
|
||||||
|
if stripped.startswith("# ") and not stripped.startswith("## "):
|
||||||
|
return stripped[2:].strip()
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
def _strip_researchowl_header(content: str) -> str:
|
||||||
|
"""Remove the ---...--- metadata block that ResearchOwl prepends to outputs."""
|
||||||
|
lines = content.splitlines(keepends=True)
|
||||||
|
dashes_seen = 0
|
||||||
|
for i, line in enumerate(lines):
|
||||||
|
if line.strip() == "---":
|
||||||
|
dashes_seen += 1
|
||||||
|
if dashes_seen == 2:
|
||||||
|
return "".join(lines[i + 1:]).lstrip("\n")
|
||||||
|
return content
|
||||||
|
|
||||||
|
|
||||||
|
class GhostPublisher:
|
||||||
|
def __init__(self, lang: str = "es"):
|
||||||
|
if lang == "en":
|
||||||
|
self.url = (settings.ghost_url_en or "").rstrip("/")
|
||||||
|
self.api_key = settings.ghost_api_key_en or ""
|
||||||
|
else:
|
||||||
|
self.url = (settings.ghost_url or "").rstrip("/")
|
||||||
|
self.api_key = settings.ghost_api_key or ""
|
||||||
|
|
||||||
|
def is_configured(self) -> bool:
|
||||||
|
return bool(self.url and self.api_key)
|
||||||
|
|
||||||
|
def _make_token(self) -> str:
|
||||||
|
key_id, secret = self.api_key.split(":", 1)
|
||||||
|
now = int(time.time())
|
||||||
|
header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT", "kid": key_id}))
|
||||||
|
payload = _b64url(json.dumps({"iat": now, "exp": now + 300, "aud": "/admin/"}))
|
||||||
|
signing = f"{header}.{payload}"
|
||||||
|
sig = _b64url(
|
||||||
|
hmac.new(bytes.fromhex(secret), signing.encode(), hashlib.sha256).digest()
|
||||||
|
)
|
||||||
|
return f"{signing}.{sig}"
|
||||||
|
|
||||||
|
async def publish_draft(self, title: str, markdown_content: str,
|
||||||
|
tags: list[str] | None = None) -> dict:
|
||||||
|
import aiohttp as _aio
|
||||||
|
import markdown as _md
|
||||||
|
|
||||||
|
clean = _strip_researchowl_header(markdown_content)
|
||||||
|
html = _md.markdown(clean, extensions=["extra"])
|
||||||
|
|
||||||
|
# Ghost añade el título automáticamente — eliminar el primer <h1> para evitar duplicado
|
||||||
|
html = re.sub(r"<h1[^>]*>.*?</h1>", "", html, count=1, flags=re.DOTALL).lstrip()
|
||||||
|
|
||||||
|
logger.info("Ghost publish_draft", html_length=len(html),
|
||||||
|
html_preview=html[:200])
|
||||||
|
|
||||||
|
if not html.strip():
|
||||||
|
raise ValueError("Ghost: HTML vacío tras conversión markdown — contenido no enviado")
|
||||||
|
|
||||||
|
# Ghost 5.x (Lexical editor): el campo "html" es solo de lectura en la API.
|
||||||
|
# La forma fiable de enviar HTML arbitrario es via mobiledoc con HTML card,
|
||||||
|
# que Ghost acepta en todas las versiones de v5 y renderiza sin conversión.
|
||||||
|
mobiledoc = json.dumps({
|
||||||
|
"version": "0.3.1",
|
||||||
|
"atoms": [],
|
||||||
|
"cards": [["html", {"html": html}]],
|
||||||
|
"markups": [],
|
||||||
|
"sections": [[10, 0]],
|
||||||
|
})
|
||||||
|
|
||||||
|
token = self._make_token()
|
||||||
|
body = {
|
||||||
|
"posts": [{
|
||||||
|
"title": title,
|
||||||
|
"mobiledoc": mobiledoc,
|
||||||
|
"status": "draft",
|
||||||
|
"tags": [{"name": t} for t in (tags or ["investigacion"])],
|
||||||
|
}]
|
||||||
|
}
|
||||||
|
async with _aio.ClientSession() as sess:
|
||||||
|
async with sess.post(
|
||||||
|
f"{self.url}/ghost/api/admin/posts/",
|
||||||
|
json=body,
|
||||||
|
headers={
|
||||||
|
"Authorization": f"Ghost {token}",
|
||||||
|
"Accept-Version": "v5.0",
|
||||||
|
},
|
||||||
|
) as resp:
|
||||||
|
if resp.status not in (200, 201):
|
||||||
|
text = await resp.text()
|
||||||
|
raise ValueError(f"Ghost API {resp.status}: {text[:300]}")
|
||||||
|
return await resp.json()
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Output generation ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
class OutputGenerator:
|
class OutputGenerator:
|
||||||
def __init__(self, db: ResearchDB, ollama: OllamaClient, processor: ContentProcessor):
|
def __init__(self, db: ResearchDB, ollama: OllamaClient, processor: ContentProcessor):
|
||||||
self.db = db
|
self.db = db
|
||||||
@@ -137,8 +361,14 @@ class OutputGenerator:
|
|||||||
self.processor = processor
|
self.processor = processor
|
||||||
|
|
||||||
async def generate(self, session_id: int, output_type: OutputType,
|
async def generate(self, session_id: int, output_type: OutputType,
|
||||||
progress_callback=None) -> str:
|
progress_callback=None, lang: str = "es") -> str:
|
||||||
"""Generate an output for a research session"""
|
"""Generate an output for a research session"""
|
||||||
|
if output_type in (OutputType.REPORT_EXTENDED,
|
||||||
|
OutputType.BLOG_EXTENDED,
|
||||||
|
OutputType.PODCAST_EXTENDED):
|
||||||
|
return await self.generate_extended(session_id, output_type, progress_callback,
|
||||||
|
lang=lang)
|
||||||
|
|
||||||
session = await self.db.get_session(session_id)
|
session = await self.db.get_session(session_id)
|
||||||
if not session:
|
if not session:
|
||||||
raise ValueError(f"Session {session_id} not found")
|
raise ValueError(f"Session {session_id} not found")
|
||||||
@@ -151,7 +381,7 @@ class OutputGenerator:
|
|||||||
|
|
||||||
# RAG: get most relevant context for this output type
|
# RAG: get most relevant context for this output type
|
||||||
query = self._get_rag_query(output_type, topic)
|
query = self._get_rag_query(output_type, topic)
|
||||||
context = await self.processor.rag_query(session_id, query, top_k=30)
|
context = await self.processor.rag_query(session_id, query, top_k=80)
|
||||||
|
|
||||||
if not context:
|
if not context:
|
||||||
# Fallback: use raw top chunks
|
# Fallback: use raw top chunks
|
||||||
@@ -161,20 +391,19 @@ class OutputGenerator:
|
|||||||
if not context:
|
if not context:
|
||||||
raise ValueError("No processed content available. Run /process first.")
|
raise ValueError("No processed content available. Run /process first.")
|
||||||
|
|
||||||
# Truncate context to avoid Ollama context limits
|
backend = "Claude Haiku" if settings.anthropic_api_key else "Ollama"
|
||||||
context_words = context.split()
|
|
||||||
if len(context_words) > 6000:
|
|
||||||
context = " ".join(context_words[:6000]) + "\n\n[... additional material truncated ...]"
|
|
||||||
|
|
||||||
if progress_callback:
|
if progress_callback:
|
||||||
await progress_callback(f"✍️ Generating {output_type} with Ollama... (this takes 2-5 min)")
|
await progress_callback(f"✍️ Generando {output_type} con {backend}... (2-5 min)")
|
||||||
|
|
||||||
# Build prompt
|
# Build prompt
|
||||||
system = self._get_system(output_type)
|
system = self._get_system(output_type)
|
||||||
prompt = PROMPTS[output_type].format(topic=topic, context=context)
|
prompt = PROMPTS[output_type].format(topic=topic, context=context)
|
||||||
|
|
||||||
# Generate — temperature=0.7 reduces repetition in small models
|
if lang == "en" and output_type == OutputType.BLOG:
|
||||||
output = await self.ollama.generate(prompt, system=system, timeout=300, temperature=0.7)
|
system = BLOG_SYSTEM_EN
|
||||||
|
prompt = BLOG_PROMPT_EN.format(topic=topic, context=context)
|
||||||
|
|
||||||
|
output = await self._generate(prompt, system, output_type, session_id)
|
||||||
|
|
||||||
# Add metadata header
|
# Add metadata header
|
||||||
stats = await self.db.get_session_stats(session_id)
|
stats = await self.db.get_session_stats(session_id)
|
||||||
@@ -184,8 +413,60 @@ class OutputGenerator:
|
|||||||
# Save to DB
|
# Save to DB
|
||||||
await self.db.save_output(session_id, output_type, full_output)
|
await self.db.save_output(session_id, output_type, full_output)
|
||||||
|
|
||||||
|
# Auto-publish to Ghost for blog outputs
|
||||||
|
ghost_notice = ""
|
||||||
|
if output_type in (OutputType.BLOG, OutputType.BLOG_EXTENDED):
|
||||||
|
ghost = GhostPublisher(lang=lang)
|
||||||
|
if ghost.is_configured():
|
||||||
|
try:
|
||||||
|
title = _extract_title(full_output) or topic
|
||||||
|
result = await ghost.publish_draft(title, full_output)
|
||||||
|
post = result["posts"][0]
|
||||||
|
ghost_notice = (
|
||||||
|
f"\n\n---\n"
|
||||||
|
f"📤 *Borrador publicado en Ghost*\n"
|
||||||
|
f"Editar: {ghost.url}/ghost/#/editor/post/{post['id']}"
|
||||||
|
)
|
||||||
|
logger.info("Auto-published blog to Ghost", post_id=post["id"])
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Auto-publish to Ghost failed", error=str(e))
|
||||||
|
|
||||||
logger.info("Output generated", type=output_type, length=len(full_output))
|
logger.info("Output generated", type=output_type, length=len(full_output))
|
||||||
return full_output
|
return full_output + ghost_notice
|
||||||
|
|
||||||
|
async def _generate(self, prompt: str, system: str, output_type: OutputType,
                    session_id: int | None = None) -> str:
    """Dispatch a generation request to the configured backend.

    Ollama is used when ``settings.anthropic_api_key`` is falsy; otherwise the
    request goes to ``_generate_with_claude``.
    """
    if not settings.anthropic_api_key:
        return await self._generate_with_ollama(prompt, system)
    return await self._generate_with_claude(prompt, system, output_type, session_id)
|
||||||
|
|
||||||
|
async def _generate_with_claude(self, prompt: str, system: str, output_type: OutputType,
                                session_id: int | None = None) -> str:
    """Generate text with the Anthropic API, falling back to Ollama on error.

    Threads get a 4096-token budget, every other output type 16000. When a
    session id is given, token usage is recorded via ``db.log_api_call``; a
    failure to record it is logged and does not affect the result. Any other
    exception triggers the Ollama fallback.
    """
    import anthropic

    if output_type == OutputType.THREAD:
        token_budget = 4096
    else:
        token_budget = 16000

    try:
        claude = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
        response = await claude.messages.create(
            model=settings.claude_model,
            max_tokens=token_budget,
            system=system,
            messages=[{"role": "user", "content": prompt}],
        )
        if session_id is not None:
            # Usage accounting is best-effort and must never lose the text.
            try:
                await self.db.log_api_call(
                    session_id, "generation", settings.claude_model,
                    response.usage.input_tokens, response.usage.output_tokens
                )
            except Exception as usage_err:
                logger.warning("Failed to log API usage", error=str(usage_err))
        return response.content[0].text.strip()
    except Exception as claude_err:
        logger.warning("Claude generation failed, falling back to Ollama", error=str(claude_err))
        return await self._generate_with_ollama(prompt, system)
|
||||||
|
|
||||||
|
async def _generate_with_ollama(self, prompt: str, system: str) -> str:
|
||||||
|
return await self.ollama.generate(prompt, system=system, timeout=300, temperature=0.7)
|
||||||
|
|
||||||
def _get_rag_query(self, output_type: OutputType, topic: str) -> str:
|
def _get_rag_query(self, output_type: OutputType, topic: str) -> str:
|
||||||
queries = {
|
queries = {
|
||||||
@@ -205,6 +486,155 @@ class OutputGenerator:
|
|||||||
}
|
}
|
||||||
return systems.get(output_type, "You are a helpful research assistant.")
|
return systems.get(output_type, "You are a helpful research assistant.")
|
||||||
|
|
||||||
|
async def generate_extended(self, session_id: int, output_type: OutputType,
                            progress_callback=None, lang: str = "es") -> str:
    """Generate a long-form output section by section.

    Steps:
      1. Build a short context summary from the session's top 10 chunks.
      2. Ask the model (via ``_generate_raw``) for a JSON outline.
      3. For each outline section, run a section-specific RAG query and
         generate that section.
      4. Join the sections under the metadata header, save the result and,
         for extended blogs, try to create a Ghost draft.

    Args:
        session_id: Research session to generate from.
        output_type: One of the ``*_extended`` output types.
        progress_callback: Optional async callable receiving status strings.
        lang: ``"en"`` for English output, anything else for Spanish.

    Returns:
        The full document, followed by a Ghost notice when a draft was created.

    Raises:
        ValueError: If the session does not exist, no processed chunks are
            available, or the outline is not a non-empty JSON list of
            section objects.
    """
    session = await self.db.get_session(session_id)
    if not session:
        raise ValueError(f"Session {session_id} not found")
    topic = session["topic"]

    # Step 1: summary context for the outline (top 10 chunks)
    top_chunks = await self.db.get_top_chunks(session_id, limit=10)
    if not top_chunks:
        raise ValueError("No processed content available. Run /process first.")
    context_summary = "\n\n".join(
        f"- {c.get('title', '')}: {c['content'][:300]}"
        for c in top_chunks
    )

    if progress_callback:
        await progress_callback("🗂️ Generando estructura del documento…")

    # Step 2: outline
    base_type = output_type.value.replace("_extended", "")
    outline_prompts = {
        "report": OUTLINE_REPORT,
        "blog": OUTLINE_BLOG,
        "podcast": OUTLINE_PODCAST,
    }
    outline_prompt = outline_prompts[base_type].format(
        topic=topic, context_summary=context_summary
    )

    outline_json = await self._generate_raw(outline_prompt, session_id)
    try:
        import json as _json
        clean = outline_json.strip()
        # Drop a surrounding markdown code fence, if any.
        if clean.startswith("```"):
            clean = "\n".join(clean.split("\n")[1:])
        if clean.endswith("```"):
            clean = "\n".join(clean.split("\n")[:-1])
        clean = clean.strip()
        # Tolerate prose or a wrapper object around the list by cutting to
        # the outermost [...] span; a bare list is left unchanged.
        start, end = clean.find("["), clean.rfind("]")
        if start != -1 and end > start:
            clean = clean[start:end + 1]
        sections = _json.loads(clean)
        if not isinstance(sections, list):
            raise ValueError("outline is not a JSON list")
        # Entries that are not objects cannot carry title/query/words.
        sections = [s for s in sections if isinstance(s, dict)]
        if not sections:
            raise ValueError("outline contains no sections")
    except Exception as e:
        logger.error("Failed to parse outline", error=str(e), raw=outline_json[:200])
        raise ValueError(f"No se pudo generar el outline: {e}") from e

    if progress_callback:
        await progress_callback(
            f"✍️ Generando {len(sections)} secciones… (esto tardará varios minutos)"
        )

    # Step 3: generate each section
    base_output_type = OutputType(base_type)
    system = self._get_system(base_output_type)
    if lang == "en" and output_type == OutputType.BLOG_EXTENDED:
        system = BLOG_SYSTEM_EN
    # Same for every section, so computed once.
    lang_rule = "- Write in English\n" if lang == "en" else "- Escribe en español\n"
    sections_text = []

    for i, section in enumerate(sections, 1):
        # Model output is untrusted: fall back on missing/empty fields and
        # coerce types so slicing and formatting below cannot fail.
        title = str(section.get("title") or f"Sección {i}")
        query = str(section.get("query") or topic)
        try:
            target_words = int(section.get("words", 600))
        except (TypeError, ValueError):
            target_words = 600

        if progress_callback:
            await progress_callback(
                f"✍️ Sección {i}/{len(sections)}: {title[:40]}…"
            )

        section_context = await self.processor.rag_query(session_id, query, top_k=40)
        if not section_context:
            section_context = context_summary

        section_prompt = (
            f"Escribe la sección '{title}' del {base_type} sobre: '{topic}'\n\n"
            f"REGLAS:\n"
            f"- NO incluyas ningún título o encabezado al inicio de tu respuesta — el título de la sección ya está incluido externamente\n"
            f"- Esta es UNA sección de un documento más largo — no repitas introducción ni conclusión general\n"
            f"- No incluyas encabezados del documento completo, solo el contenido de esta sección\n"
            f"- Objetivo: aproximadamente {target_words} palabras\n"
            f"- Usa SOLO información del material siguiente — no inventes datos\n"
            f"{lang_rule}"
            f"\nMATERIAL:\n{section_context}"
        )

        section_text = await self._generate(
            section_prompt, system, base_output_type, session_id
        )
        sections_text.append(f"## {title}\n\n{section_text}")

    # Step 4: concatenate
    full_content = "\n\n---\n\n".join(sections_text)
    stats = await self.db.get_session_stats(session_id)
    header = self._build_header(topic, output_type, session, stats)
    full_output = header + "\n\n" + full_content

    await self.db.save_output(session_id, output_type, full_output)

    # Auto-publish to Ghost for extended blog outputs. Publishing is
    # best-effort: the document is already saved, so failures are only logged.
    ghost_notice = ""
    if output_type == OutputType.BLOG_EXTENDED:
        ghost = GhostPublisher(lang=lang)
        if ghost.is_configured():
            try:
                title = _extract_title(full_output) or topic
                result = await ghost.publish_draft(title, full_output)
                post = result["posts"][0]
                ghost_notice = (
                    f"\n\n---\n"
                    f"📤 *Borrador publicado en Ghost*\n"
                    f"Editar: {ghost.url}/ghost/#/editor/post/{post['id']}"
                )
                logger.info("Auto-published extended blog to Ghost", post_id=post["id"])
            except Exception as e:
                logger.warning("Auto-publish to Ghost failed (extended)", error=str(e))

    logger.info("Extended output generated", type=output_type,
                sections=len(sections), length=len(full_output))
    return full_output + ghost_notice
|
||||||
|
|
||||||
|
async def _generate_raw(self, prompt: str,
                        session_id: int | None = None) -> str:
    """Send *prompt* to Claude without a system prompt and return the text.

    Used for outline generation. Unlike ``_generate_with_claude`` there is no
    Ollama fallback: API errors are logged and re-raised.

    Args:
        prompt: User message sent to the model.
        session_id: When given, token usage is recorded under the
            ``"outline"`` label via ``db.log_api_call``.

    Returns:
        The stripped text of the first content block of the response.

    Raises:
        ValueError: If ``settings.anthropic_api_key`` is not set.
        Exception: Whatever the Anthropic call raises.
    """
    if not settings.anthropic_api_key:
        raise ValueError("Claude API key required for extended generation")

    import anthropic
    try:
        client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
        msg = await client.messages.create(
            model=settings.claude_model,
            max_tokens=2048,
            messages=[{"role": "user", "content": prompt}],
        )
        if session_id is not None:
            # Usage accounting is best-effort, but a failure is reported
            # (as in _generate_with_claude) instead of silently dropped.
            try:
                await self.db.log_api_call(
                    session_id, "outline", settings.claude_model,
                    msg.usage.input_tokens, msg.usage.output_tokens
                )
            except Exception as log_err:
                logger.warning("Failed to log API usage", error=str(log_err))
        return msg.content[0].text.strip()
    except Exception as e:
        logger.warning("Claude outline failed", error=str(e))
        raise
|
||||||
|
|
||||||
def _build_header(self, topic: str, output_type: OutputType,
|
def _build_header(self, topic: str, output_type: OutputType,
|
||||||
session: dict, stats: dict) -> str:
|
session: dict, stats: dict) -> str:
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
@@ -218,3 +648,279 @@ Iterations: {session.get('iterations', 0)}
|
|||||||
Total words researched: {session.get('total_words', 0):,}
|
Total words researched: {session.get('total_words', 0):,}
|
||||||
---
|
---
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def _remove_duplicate_headings(text: str) -> str:
|
||||||
|
lines = text.split('\n')
|
||||||
|
result = []
|
||||||
|
i = 0
|
||||||
|
while i < len(lines):
|
||||||
|
line = lines[i].rstrip()
|
||||||
|
if line.startswith('# ') and not line.startswith('## '):
|
||||||
|
h1_text = line[2:].strip().lower()
|
||||||
|
window = result[-5:] if len(result) >= 5 else result[:]
|
||||||
|
recent_h2s = {
|
||||||
|
p.strip()[3:].strip().lower()
|
||||||
|
for p in window
|
||||||
|
if p.strip().startswith('## ')
|
||||||
|
}
|
||||||
|
if h1_text in recent_h2s:
|
||||||
|
i += 1
|
||||||
|
continue
|
||||||
|
result.append(lines[i])
|
||||||
|
i += 1
|
||||||
|
return '\n'.join(result)
|
||||||
|
|
||||||
|
|
||||||
|
def generate_pdf(content: str, title: str = "ResearchOwl Output") -> bytes:
    """Render Markdown-like text to an A4 PDF and return the PDF bytes.

    Recognised constructs: ``#``/``##``/``###`` headings, ``-``/``*``/``+``
    bullets, numbered items, horizontal rules (three or more of ``-*_``),
    fenced code blocks and inline bold / italic / code spans. Everything
    else is emitted as a normal paragraph; blank lines become small spacers.

    Args:
        content: Markdown-like source text.
        title: Passed to ``SimpleDocTemplate`` as the document title.

    Returns:
        The generated PDF document as bytes.

    Raises:
        ImportError: If reportlab is not installed.
    """
    import io
    import re
    from xml.sax.saxutils import escape

    content = _remove_duplicate_headings(content)

    try:
        from reportlab.lib.pagesizes import A4
        from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
        from reportlab.lib.units import cm
        from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, HRFlowable
        from reportlab.lib import colors
    except ImportError as exc:
        raise ImportError(
            "reportlab is required for PDF export — pip install reportlab"
        ) from exc

    buf = io.BytesIO()
    doc = SimpleDocTemplate(
        buf,
        pagesize=A4,
        rightMargin=2 * cm,
        leftMargin=2 * cm,
        topMargin=2.5 * cm,
        bottomMargin=2 * cm,
        title=title,
    )

    base = getSampleStyleSheet()
    normal = ParagraphStyle("RO_Normal", parent=base["Normal"],
                            fontSize=10, leading=14, spaceAfter=4)
    h1 = ParagraphStyle("RO_H1", parent=base["Heading1"],
                        fontSize=18, spaceBefore=12, spaceAfter=6,
                        textColor=colors.HexColor("#1a1a2e"))
    h2 = ParagraphStyle("RO_H2", parent=base["Heading2"],
                        fontSize=14, spaceBefore=10, spaceAfter=4,
                        textColor=colors.HexColor("#16213e"))
    h3 = ParagraphStyle("RO_H3", parent=base["Heading3"],
                        fontSize=12, spaceBefore=8, spaceAfter=4)
    code_style = ParagraphStyle("RO_Code", parent=base["Code"],
                                fontSize=9, leading=12, fontName="Courier",
                                backColor=colors.HexColor("#f4f4f4"), spaceAfter=4)
    bullet_style = ParagraphStyle("RO_Bullet", parent=normal,
                                  leftIndent=20, bulletIndent=10, spaceAfter=2)

    def md_to_para(text: str) -> str:
        """Convert inline Markdown to the tag markup accepted by Paragraph."""
        # Escape first: Paragraph parses its text as XML-like markup, so raw
        # '&', '<' and '>' from the content must become entities before we
        # insert our own <b>/<i>/<font> tags.
        text = escape(text)
        text = re.sub(r'\*\*(.+?)\*\*', r'<b>\1</b>', text)
        text = re.sub(r'__(.+?)__', r'<b>\1</b>', text)
        text = re.sub(r'\*(.+?)\*', r'<i>\1</i>', text)
        text = re.sub(r'_(.+?)_', r'<i>\1</i>', text)
        text = re.sub(r'`(.+?)`', r'<font name="Courier">\1</font>', text)
        return text

    story = []

    def append_code_block(code_lines: list) -> None:
        """Append the buffered fenced-code lines as one escaped paragraph."""
        try:
            story.append(Paragraph(
                "<br/>".join(escape(code_line) for code_line in code_lines),
                code_style,
            ))
        except Exception:
            # Deliberate best effort: one unrenderable code block must not
            # abort the whole export.
            pass

    lines = content.split("\n")
    in_code = False
    code_buf = []

    for line in lines:
        if line.startswith("```"):
            if not in_code:
                in_code = True
                code_buf = []
            else:
                in_code = False
                append_code_block(code_buf)
            continue

        if in_code:
            code_buf.append(line)
            continue

        if re.match(r'^[-*_]{3,}$', line.strip()):
            story.append(HRFlowable(width="100%", thickness=0.5,
                                    color=colors.grey, spaceAfter=6))
            continue

        try:
            if line.startswith("### "):
                story.append(Paragraph(md_to_para(line[4:]), h3))
            elif line.startswith("## "):
                story.append(Paragraph(md_to_para(line[3:]), h2))
            elif line.startswith("# "):
                story.append(Paragraph(md_to_para(line[2:]), h1))
            elif re.match(r'^[-*+] ', line):
                story.append(Paragraph("• " + md_to_para(line[2:]), bullet_style))
            elif re.match(r'^\d+\. ', line):
                story.append(Paragraph(md_to_para(line), bullet_style))
            elif line.strip() == "":
                story.append(Spacer(1, 6))
            else:
                story.append(Paragraph(md_to_para(line), normal))
        except Exception:
            # Inline markup produced something Paragraph rejected (e.g.
            # mis-nested tags): fall back to the truncated line as escaped
            # plain text so it cannot fail for markup reasons again.
            try:
                story.append(Paragraph(escape(line[:300]), normal))
            except Exception:
                pass

    # A fence left open at end of input would otherwise drop its contents.
    if in_code and code_buf:
        append_code_block(code_buf)

    doc.build(story)
    return buf.getvalue()
|
||||||
|
|
||||||
|
|
||||||
|
async def generate_diff_summary(
    topic: str,
    new_urls: set,
    old_urls: set,
    new_chunks: list,
    session_id: int,
    db,
) -> str | None:
    """Build a short "what's new" message for a re-run research topic.

    Args:
        topic: Research topic, interpolated into the prompt and the messages.
        new_urls: URLs found by the current run.
        old_urls: URLs known from the previous run.
        new_chunks: Chunk dicts; ``source_type``, ``title`` and ``content``
            are read from each of the first 20.
        session_id: Session id passed to ``db.log_api_call``.
        db: Object exposing an awaitable ``log_api_call``.

    Returns:
        A message string, or ``None`` when fewer than 20% of the URLs and
        fewer than 5 URLs are new, when there are no chunks, or when the
        model reports nothing new. If the API call fails, a generic
        count-only message is returned instead of raising.
    """
    from src.config import settings
    import structlog
    diff_logger = structlog.get_logger()

    added_urls = new_urls - old_urls
    pct_new = len(added_urls) / max(len(new_urls), 1)

    diff_logger.info("Diff analysis", topic=topic,
                     new_urls=len(new_urls), old_urls=len(old_urls),
                     added=len(added_urls), pct_new=round(pct_new, 2))

    if pct_new < 0.20 and len(added_urls) < 5:
        diff_logger.info("Diff: no significant new sources", topic=topic)
        return None

    if not new_chunks:
        return None

    # Missing or None fields must not raise here: this runs outside the
    # try block below, so a KeyError would escape to the caller.
    context = "\n\n---\n\n".join(
        f"[{str(c.get('source_type') or 'web').upper()}] {c.get('title') or ''}\n"
        f"{str(c.get('content') or '')[:400]}"
        for c in new_chunks[:20]
    )

    if not settings.anthropic_api_key:
        return (
            f"📊 *Novedades detectadas sobre {topic}*\n\n"
            f"• {len(added_urls)} fuentes nuevas encontradas\n"
            f"• {len(new_chunks)} chunks de contenido procesados\n\n"
            "Usa /generate report para ver el análisis completo."
        )

    try:
        import anthropic
        client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
        prompt = (
            f'Analiza el siguiente material de investigación sobre "{topic}" '
            f'y genera un resumen BREVE (máximo 300 palabras) de las novedades '
            f'más importantes encontradas. Escribe en español.\n\n'
            f'Si el contenido es muy similar a investigaciones anteriores o no '
            f'contiene información genuinamente nueva, responde SOLO con: '
            f'"SIN_NOVEDADES"\n\n'
            f'Material nuevo:\n{context}'
        )
        msg = await client.messages.create(
            model=settings.claude_model,
            max_tokens=500,
            messages=[{"role": "user", "content": prompt}]
        )

        # Record usage before inspecting the answer: the call consumed
        # tokens even when the model reports nothing new.
        try:
            await db.log_api_call(session_id, "diff", settings.claude_model,
                                  msg.usage.input_tokens, msg.usage.output_tokens)
        except Exception as log_err:
            diff_logger.warning("Failed to log API usage", error=str(log_err))

        summary = msg.content[0].text.strip()

        # Tolerate quoting, trailing punctuation or case changes around
        # the sentinel.
        if summary.strip(" \t\r\n\"'`*.").upper() == "SIN_NOVEDADES":
            diff_logger.info("Diff: Claude found no new information", topic=topic)
            return None

        return f"🔔 *Novedades — {topic}*\n\n{summary}\n\nUsa /generate para report completo."

    except Exception as e:
        diff_logger.warning("Diff summary generation failed", error=str(e))
        return (
            f"📊 *Actualización — {topic}*\n\n"
            f"• {len(added_urls)} fuentes nuevas\n"
            "Usa /generate report para ver el análisis completo."
        )
|
||||||
|
|
||||||
|
|
||||||
|
async def generate_comparison(
    topic_a: str,
    topic_b: str,
    context_a: str,
    context_b: str,
    session_id_a: int,
    db,
) -> str:
    """Ask Claude for a structured Spanish comparison of two research topics.

    Args:
        topic_a: Name of the first topic.
        topic_b: Name of the second topic.
        context_a: Research material for ``topic_a`` (cut to 3000 words).
        context_b: Research material for ``topic_b`` (cut to 3000 words).
        session_id_a: Session id the API usage is logged against.
        db: Object exposing an awaitable ``log_api_call``.

    Returns:
        The model's Markdown answer, stripped of surrounding whitespace.

    Raises:
        ValueError: If no Anthropic API key is configured.
        Exception: Any error from the API call is logged and re-raised.
    """
    from src.config import settings
    import structlog
    cmp_logger = structlog.get_logger()

    if not settings.anthropic_api_key:
        raise ValueError("Claude API key required for comparison")

    def _truncate(text: str, max_words: int = 3000) -> str:
        """Cut ``text`` to ``max_words`` words, appending a truncation marker."""
        words = text.split()
        if len(words) > max_words:
            return " ".join(words[:max_words]) + "\n\n[... contenido adicional truncado ...]"
        return text

    context_a = _truncate(context_a)
    context_b = _truncate(context_b)

    prompt = (
        f'Eres un analista experto. Compara en profundidad estos dos temas:\n'
        f'TEMA A: "{topic_a}"\n'
        f'TEMA B: "{topic_b}"\n\n'
        f'Escribe el análisis en español con esta estructura:\n\n'
        f'## Resumen comparativo\n'
        f'(2-3 párrafos con las diferencias y similitudes más importantes)\n\n'
        f'## {topic_a}\n'
        f'(Puntos clave únicos de este tema)\n\n'
        f'## {topic_b}\n'
        f'(Puntos clave únicos de este tema)\n\n'
        f'## Similitudes\n'
        f'(Qué tienen en común)\n\n'
        f'## Diferencias clave\n'
        f'(Tabla markdown o lista de las diferencias más relevantes)\n\n'
        f'## Conclusión\n'
        f'(Cuál es mejor/más relevante según el contexto, o qué conclusión se extrae)\n\n'
        f'---\n'
        f'MATERIAL DE INVESTIGACIÓN — {topic_a}:\n{context_a}\n\n'
        f'---\n'
        f'MATERIAL DE INVESTIGACIÓN — {topic_b}:\n{context_b}\n'
    )

    try:
        import anthropic
        client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
        msg = await client.messages.create(
            model=settings.claude_model,
            max_tokens=8192,
            messages=[{"role": "user", "content": prompt}]
        )
        try:
            await db.log_api_call(
                session_id_a, "comparison", settings.claude_model,
                msg.usage.input_tokens, msg.usage.output_tokens
            )
        except Exception as log_err:
            # Usage accounting is best effort and must not lose the answer,
            # but the failure should be visible in the logs.
            cmp_logger.warning("Failed to log API usage", error=str(log_err))
        return msg.content[0].text.strip()
    except Exception as e:
        cmp_logger.error("Comparison generation failed", error=str(e))
        raise
|
||||||
|
|||||||
@@ -129,6 +129,8 @@ class ContentProcessor:
|
|||||||
scraped = [s for s in sources if s["status"] == "scraped"]
|
scraped = [s for s in sources if s["status"] == "scraped"]
|
||||||
|
|
||||||
logger.info("Processing sources", total=len(scraped))
|
logger.info("Processing sources", total=len(scraped))
|
||||||
|
scraped = await self._dedup_sources(session_id, scraped)
|
||||||
|
logger.info("After dedup", unique=len(scraped))
|
||||||
total_chunks = 0
|
total_chunks = 0
|
||||||
total_words = 0
|
total_words = 0
|
||||||
|
|
||||||
@@ -161,6 +163,56 @@ class ContentProcessor:
|
|||||||
|
|
||||||
return {"total_chunks": total_chunks, "total_words": total_words}
|
return {"total_chunks": total_chunks, "total_words": total_words}
|
||||||
|
|
||||||
|
async def _dedup_sources(self, session_id: int,
                         scraped: list[dict]) -> list[dict]:
    """Drop sources whose stored content duplicates an earlier source.

    Two checks run per source, in order: an MD5 digest of the first 2000
    characters (exact match), then word-set overlap of the first 300
    characters (more than 0.85 intersection-over-union, applied only when
    the prefix has at least 10 distinct words). Duplicates are marked
    ``status="skipped"`` through ``self.db.update_source``. Sources with no
    content are kept unchanged. If anything raises, a warning is logged and
    the original list is returned.
    """
    try:
        import hashlib

        digests: set = set()
        word_sets: list = []
        kept: list = []
        dropped = 0

        for src in scraped:
            body = await self.db.get_source_content(src["id"])
            if not body:
                kept.append(src)
                continue

            # Exact-duplicate check on the leading 2000 characters.
            digest = hashlib.md5(body[:2000].encode()).hexdigest()
            if digest in digests:
                dropped += 1
                await self.db.update_source(src["id"], status="skipped")
                continue
            digests.add(digest)

            # Near-duplicate check on the word set of the leading 300 chars.
            words = set(body[:300].strip().lower().split())
            near_dup = len(words) >= 10 and any(
                len(words & earlier) / max(len(words | earlier), 1) > 0.85
                for earlier in word_sets
            )
            if near_dup:
                dropped += 1
                await self.db.update_source(src["id"], status="skipped")
                continue

            word_sets.append(words)
            kept.append(src)

        if dropped > 0:
            logger.info("Dedup complete", session_id=session_id,
                        original=len(scraped), duplicates=dropped,
                        unique=len(kept))
        return kept
    except Exception as e:
        logger.warning("Dedup failed, processing all sources", error=str(e))
        return scraped
|
||||||
|
|
||||||
async def _process_source(self, session_id: int, topic: str, source: dict) -> int:
|
async def _process_source(self, session_id: int, topic: str, source: dict) -> int:
|
||||||
"""Chunk, score, embed and store a single source. Returns chunk count."""
|
"""Chunk, score, embed and store a single source. Returns chunk count."""
|
||||||
source_id = source["id"]
|
source_id = source["id"]
|
||||||
@@ -182,7 +234,7 @@ class ContentProcessor:
|
|||||||
if words < 30:
|
if words < 30:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
quality = await self._score_quality(chunk, topic)
|
quality = await self._score_quality(chunk, topic, session_id)
|
||||||
if quality < settings.quality_threshold:
|
if quality < settings.quality_threshold:
|
||||||
filtered_quality += 1
|
filtered_quality += 1
|
||||||
logger.debug("Chunk filtered by quality", source_id=source_id,
|
logger.debug("Chunk filtered by quality", source_id=source_id,
|
||||||
@@ -215,16 +267,20 @@ class ContentProcessor:
|
|||||||
logger.info("Source processed", source_id=source_id, stored=stored)
|
logger.info("Source processed", source_id=source_id, stored=stored)
|
||||||
return stored
|
return stored
|
||||||
|
|
||||||
async def _score_quality(self, chunk: str, topic: str) -> float:
|
async def _score_quality(self, chunk: str, topic: str,
|
||||||
|
session_id: int | None = None) -> float:
|
||||||
"""Score 0-1 relevance to topic. Uses Claude Haiku if API key set, else Ollama."""
|
"""Score 0-1 relevance to topic. Uses Claude Haiku if API key set, else Ollama."""
|
||||||
if settings.anthropic_api_key:
|
if settings.anthropic_api_key:
|
||||||
return await self._score_with_claude(chunk, topic)
|
return await self._score_with_claude(chunk, topic, session_id)
|
||||||
return await self._score_with_ollama(chunk, topic)
|
return await self._score_with_ollama(chunk, topic)
|
||||||
|
|
||||||
async def _score_with_claude(self, chunk: str, topic: str) -> float:
|
async def _score_with_claude(self, chunk: str, topic: str,
|
||||||
|
session_id: int | None = None) -> float:
|
||||||
import anthropic
|
import anthropic
|
||||||
prompt = (
|
prompt = (
|
||||||
f'Rate 0-10 how relevant this text is to the topic "{topic}". '
|
f'Rate 0-10 how relevant this text is to the topic "{topic}". '
|
||||||
|
f'Be generous — if the text is tangentially related, score 4+. '
|
||||||
|
f'Only score below 3 if completely unrelated. '
|
||||||
f'Reply with only a number.\n\nText:\n{chunk[:500]}'
|
f'Reply with only a number.\n\nText:\n{chunk[:500]}'
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
@@ -234,6 +290,14 @@ class ContentProcessor:
|
|||||||
max_tokens=10,
|
max_tokens=10,
|
||||||
messages=[{"role": "user", "content": prompt}]
|
messages=[{"role": "user", "content": prompt}]
|
||||||
)
|
)
|
||||||
|
if session_id is not None:
|
||||||
|
try:
|
||||||
|
await self.db.log_api_call(
|
||||||
|
session_id, "scoring", settings.claude_model,
|
||||||
|
msg.usage.input_tokens, msg.usage.output_tokens
|
||||||
|
)
|
||||||
|
except Exception as log_err:
|
||||||
|
logger.warning("Failed to log API usage", error=str(log_err))
|
||||||
response = msg.content[0].text.strip()
|
response = msg.content[0].text.strip()
|
||||||
numbers = re.findall(r'\b(\d+(?:\.\d+)?)\b', response)
|
numbers = re.findall(r'\b(\d+(?:\.\d+)?)\b', response)
|
||||||
if numbers:
|
if numbers:
|
||||||
@@ -241,7 +305,7 @@ class ContentProcessor:
|
|||||||
normalized = min(1.0, score / 10.0)
|
normalized = min(1.0, score / 10.0)
|
||||||
logger.debug("Claude relevance score", raw=score, normalized=round(normalized, 2))
|
logger.debug("Claude relevance score", raw=score, normalized=round(normalized, 2))
|
||||||
return normalized
|
return normalized
|
||||||
return 0.6
|
return 0.5
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("Claude scoring failed, falling back to Ollama", error=str(e))
|
logger.warning("Claude scoring failed, falling back to Ollama", error=str(e))
|
||||||
return await self._score_with_ollama(chunk, topic)
|
return await self._score_with_ollama(chunk, topic)
|
||||||
@@ -275,7 +339,7 @@ class ContentProcessor:
|
|||||||
query_embedding = await self.ollama.embed(query)
|
query_embedding = await self.ollama.embed(query)
|
||||||
|
|
||||||
# Get top quality chunks
|
# Get top quality chunks
|
||||||
chunks = await self.db.get_top_chunks(session_id, limit=100)
|
chunks = await self.db.get_top_chunks(session_id, limit=300)
|
||||||
|
|
||||||
if query_embedding and chunks:
|
if query_embedding and chunks:
|
||||||
# Rank by embedding similarity
|
# Rank by embedding similarity
|
||||||
|
|||||||
+159
-24
@@ -3,6 +3,7 @@ ResearchOwl Exhaustive Scraper
|
|||||||
Core engine: discovers, expands, and evaluates sources recursively
|
Core engine: discovers, expands, and evaluates sources recursively
|
||||||
"""
|
"""
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import random
|
||||||
import re
|
import re
|
||||||
import time
|
import time
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
@@ -54,6 +55,22 @@ REDDIT_RE = re.compile(r"reddit\.com/(r/\w+/comments/\w+)")
|
|||||||
WIKIPEDIA_RE = re.compile(r"wikipedia\.org/wiki/(.+)")
|
WIKIPEDIA_RE = re.compile(r"wikipedia\.org/wiki/(.+)")
|
||||||
|
|
||||||
|
|
||||||
|
async def fetch_with_retry(fetch_fn, source_name: str, max_retries: int = 3):
    """Await ``fetch_fn()`` with exponential backoff and return its result.

    Args:
        fetch_fn: Zero-argument callable returning an awaitable.
        source_name: Label for log lines (truncated to 60 characters).
        max_retries: Maximum number of attempts. Values below 1 are treated
            as 1 so the function always tries once and never reaches the
            final ``raise`` without an exception to re-raise.

    Returns:
        Whatever the first successful ``fetch_fn()`` call returns.

    Raises:
        Exception: The last exception raised by ``fetch_fn`` once all
            attempts are exhausted.
    """
    attempts = max(1, max_retries)
    last_exc = None
    for attempt in range(attempts):
        try:
            return await fetch_fn()
        except Exception as e:
            last_exc = e
            if attempt < attempts - 1:
                # 1s, 2s, 4s, ... plus up to one second of jitter.
                wait = 2 ** attempt + random.random()
                logger.debug("fetch_with_retry backoff", source=source_name[:60],
                             attempt=attempt + 1, wait=round(wait, 1), error=str(e))
                await asyncio.sleep(wait)
    logger.warning("fetch_with_retry exhausted", source=source_name[:60], error=str(last_exc))
    raise last_exc
|
||||||
|
|
||||||
|
|
||||||
def detect_source_type(url: str) -> str:
|
def detect_source_type(url: str) -> str:
|
||||||
if YOUTUBE_RE.search(url):
|
if YOUTUBE_RE.search(url):
|
||||||
return "youtube"
|
return "youtube"
|
||||||
@@ -140,16 +157,15 @@ class ExhaustiveScraper:
|
|||||||
"""Initial broad search across multiple sources"""
|
"""Initial broad search across multiple sources"""
|
||||||
logger.info("Seeding research", topic=self.topic)
|
logger.info("Seeding research", topic=self.topic)
|
||||||
tasks = [
|
tasks = [
|
||||||
self._seed_duckduckgo(),
|
self._seed_search(),
|
||||||
self._seed_wikipedia(),
|
self._seed_wikipedia(),
|
||||||
self._seed_reddit(),
|
self._seed_reddit(),
|
||||||
self._seed_youtube(),
|
self._seed_youtube(),
|
||||||
]
|
]
|
||||||
await asyncio.gather(*tasks, return_exceptions=True)
|
await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
|
||||||
async def _seed_duckduckgo(self):
|
async def _generate_ddg_queries(self) -> list[str]:
|
||||||
"""Multiple DDG queries — fresh DDGS() per query to avoid cascading ratelimits"""
|
fallback = [
|
||||||
queries = [
|
|
||||||
self.topic,
|
self.topic,
|
||||||
f"{self.topic} history facts",
|
f"{self.topic} history facts",
|
||||||
f"{self.topic} evidence analysis",
|
f"{self.topic} evidence analysis",
|
||||||
@@ -159,13 +175,103 @@ class ExhaustiveScraper:
|
|||||||
f"{self.topic} documentary",
|
f"{self.topic} documentary",
|
||||||
f"{self.topic} research study",
|
f"{self.topic} research study",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
if not settings.anthropic_api_key:
|
||||||
|
return fallback
|
||||||
|
|
||||||
|
try:
|
||||||
|
import anthropic
|
||||||
|
logger.info("Generating DDG queries with Claude", topic=self.topic)
|
||||||
|
client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
|
||||||
|
prompt = (
|
||||||
|
f'Generate exactly 8 DuckDuckGo search queries to research: "{self.topic}"\n\n'
|
||||||
|
f'Rules:\n'
|
||||||
|
f'- Each query must be specific and distinct — no generic templates\n'
|
||||||
|
f'- Cover different angles: facts, history, official sources, criticism, '
|
||||||
|
f'technical details, recent developments, expert opinions, primary sources\n'
|
||||||
|
f'- Use the most specific terminology for this topic\n'
|
||||||
|
f'- Include the topic language naturally (if topic is in Spanish, '
|
||||||
|
f'mix Spanish and English queries for broader coverage)\n'
|
||||||
|
f'- Output ONLY the 8 queries, one per line, no numbering, '
|
||||||
|
f'no explanations, no markdown\n'
|
||||||
|
)
|
||||||
|
msg = await client.messages.create(
|
||||||
|
model=settings.claude_model,
|
||||||
|
max_tokens=300,
|
||||||
|
messages=[{"role": "user", "content": prompt}]
|
||||||
|
)
|
||||||
|
raw = msg.content[0].text.strip()
|
||||||
|
queries = [q.strip() for q in raw.split('\n') if q.strip()]
|
||||||
|
if self.topic not in queries:
|
||||||
|
queries = [self.topic] + queries[:7]
|
||||||
|
queries = queries[:8]
|
||||||
|
logger.info("DDG queries generated by Claude", queries=queries)
|
||||||
|
return queries
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Claude query generation failed, using fallback",
|
||||||
|
error=str(e), error_type=type(e).__name__)
|
||||||
|
return fallback
|
||||||
|
|
||||||
|
async def _search_searxng(self, query: str) -> list[dict]:
    """Search SearXNG and return a list of ``{href, title}`` dicts.

    Returns ``[]`` when the service answers with a non-200 status or the
    request fails for any reason; both cases are logged as warnings.
    """
    import aiohttp

    endpoint = "http://searxng-svc.researchowl.svc.cluster.local:8080/search"
    request_params = {
        "q": query,
        "format": "json",
        "engines": "duckduckgo,google,bing,brave",
        "language": "all",
    }
    request_headers = {
        "Accept": "application/json",
        "X-Forwarded-For": "127.0.0.1",
        "User-Agent": "ResearchOwl/1.0",
    }
    try:
        async with aiohttp.ClientSession() as http:
            async with http.get(
                endpoint,
                params=request_params,
                headers=request_headers,
                timeout=aiohttp.ClientTimeout(total=15)
            ) as resp:
                if resp.status != 200:
                    logger.warning("SearXNG non-200", status=resp.status, query=query)
                    return []

                payload = await resp.json()
                hits = payload.get("results", [])
                logger.info("SearXNG query ok", query=query, results=len(hits))

                # Keep only hits that carry a URL.
                links = []
                for hit in hits:
                    if hit.get("url"):
                        links.append({
                            "href": hit.get("url", ""),
                            "title": hit.get("title", ""),
                        })
                return links
    except Exception as e:
        logger.warning("SearXNG failed", query=query, error=str(e))
        return []
|
||||||
|
|
||||||
|
async def _seed_search(self):
|
||||||
|
"""SearXNG primary + DDG fallback per query"""
|
||||||
|
queries = await self._generate_ddg_queries()
|
||||||
for query in queries:
|
for query in queries:
|
||||||
if self._stop:
|
if self._stop:
|
||||||
break
|
break
|
||||||
|
results = await self._search_searxng(query)
|
||||||
|
if not results:
|
||||||
|
logger.info("SearXNG vacío, usando DDG", query=query)
|
||||||
try:
|
try:
|
||||||
# Fresh instance per query — a ratelimit on one won't poison the rest
|
|
||||||
with DDGS() as ddgs:
|
with DDGS() as ddgs:
|
||||||
results = list(ddgs.text(query, max_results=settings.max_pages_per_search))
|
ddg_results = list(ddgs.text(
|
||||||
|
query,
|
||||||
|
max_results=settings.max_pages_per_search
|
||||||
|
))
|
||||||
|
results = ddg_results
|
||||||
|
logger.info("DDG fallback ok", query=query, results=len(results))
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("DDG fallback failed", query=query, error=str(e))
|
||||||
|
results = []
|
||||||
|
|
||||||
for r in results:
|
for r in results:
|
||||||
url = normalize_url(r.get("href", ""))
|
url = normalize_url(r.get("href", ""))
|
||||||
if url and not is_blacklisted(url):
|
if url and not is_blacklisted(url):
|
||||||
@@ -175,10 +281,7 @@ class ExhaustiveScraper:
|
|||||||
depth=0,
|
depth=0,
|
||||||
title=r.get("title")
|
title=r.get("title")
|
||||||
)
|
)
|
||||||
logger.info("DDG query ok", query=query, results=len(results))
|
await asyncio.sleep(random.uniform(1, 3))
|
||||||
except Exception as e:
|
|
||||||
logger.warning("DDG query failed", query=query, error=str(e))
|
|
||||||
await asyncio.sleep(settings.request_delay * 2)
|
|
||||||
|
|
||||||
async def _seed_wikipedia(self):
|
async def _seed_wikipedia(self):
|
||||||
"""Search Wikipedia API for correct article URLs.
|
"""Search Wikipedia API for correct article URLs.
|
||||||
@@ -290,12 +393,7 @@ class ExhaustiveScraper:
|
|||||||
)
|
)
|
||||||
|
|
||||||
if self.progress_callback:
|
if self.progress_callback:
|
||||||
await self.progress_callback(
|
await self.progress_callback(self.iteration, self.total_sources)
|
||||||
iteration=self.iteration,
|
|
||||||
total=self.total_sources,
|
|
||||||
new_this_round=new_sources,
|
|
||||||
stats=stats
|
|
||||||
)
|
|
||||||
|
|
||||||
# Saturation check: if we found very few new URLs, we're done
|
# Saturation check: if we found very few new URLs, we're done
|
||||||
if new_sources < 3 and self.iteration > 2:
|
if new_sources < 3 and self.iteration > 2:
|
||||||
@@ -316,10 +414,31 @@ class ExhaustiveScraper:
|
|||||||
source_id = source["id"]
|
source_id = source["id"]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
try:
|
||||||
|
cached = await self.db.get_cached_content(url)
|
||||||
|
except Exception as cache_err:
|
||||||
|
logger.warning("Cache lookup failed", url=url, error=str(cache_err))
|
||||||
|
cached = None
|
||||||
|
|
||||||
|
if cached:
|
||||||
|
logger.debug("Cache hit", url=url)
|
||||||
|
await self.db.save_source_content(source_id, cached)
|
||||||
|
await self.db.update_source(
|
||||||
|
source_id,
|
||||||
|
status="scraped",
|
||||||
|
scraped_at=time.time(),
|
||||||
|
word_count=len(cached.split()),
|
||||||
|
)
|
||||||
|
return 0
|
||||||
|
|
||||||
if source_type == "youtube":
|
if source_type == "youtube":
|
||||||
content, title = await self._extract_youtube(url)
|
content, title = await fetch_with_retry(
|
||||||
|
lambda: self._extract_youtube(url), url
|
||||||
|
)
|
||||||
elif source_type == "wikipedia":
|
elif source_type == "wikipedia":
|
||||||
content, title, new_urls = await self._extract_wikipedia(url)
|
content, title, new_urls = await fetch_with_retry(
|
||||||
|
lambda: self._extract_wikipedia(url), url
|
||||||
|
)
|
||||||
added = 0
|
added = 0
|
||||||
for new_url in (new_urls or []):
|
for new_url in (new_urls or []):
|
||||||
if self._url_is_relevant(new_url):
|
if self._url_is_relevant(new_url):
|
||||||
@@ -331,13 +450,18 @@ class ExhaustiveScraper:
|
|||||||
await self._mark_scraped(source_id, content, title, url)
|
await self._mark_scraped(source_id, content, title, url)
|
||||||
return added
|
return added
|
||||||
elif source_type == "reddit":
|
elif source_type == "reddit":
|
||||||
content, title = await self._extract_reddit(url)
|
content, title = await fetch_with_retry(
|
||||||
# Small delay between Reddit requests to avoid rate limiting
|
lambda: self._extract_reddit(url), url
|
||||||
|
)
|
||||||
await asyncio.sleep(settings.request_delay)
|
await asyncio.sleep(settings.request_delay)
|
||||||
elif source_type == "pdf":
|
elif source_type == "pdf":
|
||||||
content, title = await self._extract_pdf(url)
|
content, title = await fetch_with_retry(
|
||||||
|
lambda: self._extract_pdf(url), url
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
content, title, new_urls = await self._extract_web(url, source["depth"])
|
content, title, new_urls = await fetch_with_retry(
|
||||||
|
lambda: self._extract_web(url, source["depth"]), url
|
||||||
|
)
|
||||||
added = 0
|
added = 0
|
||||||
for new_url in (new_urls or []):
|
for new_url in (new_urls or []):
|
||||||
if self._url_is_relevant(new_url):
|
if self._url_is_relevant(new_url):
|
||||||
@@ -469,12 +593,23 @@ class ExhaustiveScraper:
|
|||||||
return None, None
|
return None, None
|
||||||
|
|
||||||
video_id = match.group(1)
|
video_id = match.group(1)
|
||||||
try:
|
loop = asyncio.get_event_loop()
|
||||||
transcript_list = YouTubeTranscriptApi.get_transcript(
|
|
||||||
|
def _fetch():
|
||||||
|
return YouTubeTranscriptApi.get_transcript(
|
||||||
video_id, languages=["en", "es", "en-US", "en-GB"]
|
video_id, languages=["en", "es", "en-US", "en-GB"]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
transcript_list = await asyncio.wait_for(
|
||||||
|
loop.run_in_executor(None, _fetch),
|
||||||
|
timeout=30.0
|
||||||
|
)
|
||||||
text = " ".join(t["text"] for t in transcript_list)
|
text = " ".join(t["text"] for t in transcript_list)
|
||||||
return text, f"YouTube: {video_id}"
|
return text, f"YouTube: {video_id}"
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
logger.warning("YouTube transcript timed out", video_id=video_id)
|
||||||
|
return None, None
|
||||||
except NoTranscriptFound:
|
except NoTranscriptFound:
|
||||||
return None, None
|
return None, None
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
Reference in New Issue
Block a user