caf763c23e
Build & Deploy ResearchOwl / build-and-push (push) Successful in 6s
Si hay una sesión activa registrada para el chat, se consulta directamente por id en lugar de por created_at DESC, evitando que /generate use la sesión más reciente en vez de la actual. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1194 lines
43 KiB
Python
1194 lines
43 KiB
Python
"""
|
|
ResearchOwl Telegram Bot
|
|
Main user interface — all commands handled here
|
|
"""
|
|
import asyncio
|
|
import os
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
import structlog
|
|
from telegram import Update, Message
|
|
from telegram.ext import (
|
|
Application, CommandHandler, MessageHandler,
|
|
filters, ContextTypes
|
|
)
|
|
from telegram.constants import ParseMode
|
|
|
|
from src.config import settings
|
|
from src.db.database import get_db, ResearchDB, ResearchStatus, OutputType
|
|
from src.scraper.exhaustive import ExhaustiveScraper
|
|
from src.processor.processor import OllamaClient, ContentProcessor
|
|
from src.generator.generator import OutputGenerator
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
# Active research tasks per chat
|
|
_active_tasks: dict[int, asyncio.Task] = {}
|
|
_active_sessions: dict[int, int] = {} # chat_id -> session_id
|
|
|
|
|
|
def is_authorized(user_id: int) -> bool:
|
|
allowed = settings.allowed_user_ids
|
|
return not allowed or user_id in allowed
|
|
|
|
|
|
class ProgressReporter:
|
|
def __init__(self, reply_target: Message = None, *, bot=None, chat_id: int = None):
|
|
self._reply_target = reply_target
|
|
self._bot = bot
|
|
self._chat_id = chat_id
|
|
self._msg: Optional[Message] = None
|
|
|
|
async def start(self, text: str):
|
|
if self._reply_target is not None:
|
|
self._msg = await self._reply_target.reply_text(text, parse_mode=ParseMode.MARKDOWN)
|
|
elif self._bot is not None and self._chat_id is not None:
|
|
self._msg = await self._bot.send_message(self._chat_id, text, parse_mode=ParseMode.MARKDOWN)
|
|
|
|
async def update(self, text: str):
|
|
if not self._msg:
|
|
return
|
|
try:
|
|
await self._msg.edit_text(text, parse_mode=ParseMode.MARKDOWN)
|
|
except Exception:
|
|
pass
|
|
|
|
async def done(self, text: str):
|
|
await self.update(text)
|
|
|
|
|
|
async def send_chunked(message: Message, text: str, parse_mode=None):
|
|
"""Send long text in chunks of 4000 chars (Telegram limit)"""
|
|
max_len = 4000
|
|
for i in range(0, len(text), max_len):
|
|
chunk = text[i:i + max_len]
|
|
await message.reply_text(chunk, parse_mode=parse_mode)
|
|
if len(text) > max_len:
|
|
await asyncio.sleep(0.5)
|
|
|
|
|
|
# ─── Shared research logic ────────────────────────────────────────────────────
|
|
|
|
async def run_scheduled_research(bot, chat_id: int, topic: str,
|
|
session_id: int, db: ResearchDB,
|
|
progress_message=None,
|
|
silent_completion: bool = False):
|
|
if progress_message is not None:
|
|
reporter = ProgressReporter(progress_message)
|
|
else:
|
|
reporter = ProgressReporter(bot=bot, chat_id=chat_id)
|
|
|
|
try:
|
|
await reporter.start(f"🔍 Iniciando scraping de `{topic}`…")
|
|
|
|
async def on_progress(iter_num, total_sources):
|
|
await reporter.update(
|
|
f"🔍 Scraping — iteración `{iter_num}` | `{total_sources}` fuentes encontradas"
|
|
)
|
|
|
|
scraper = ExhaustiveScraper(db, session_id, topic, on_progress)
|
|
final_stats = await scraper.run()
|
|
|
|
await db.update_session(session_id, status=ResearchStatus.SATURATED)
|
|
scraped = final_stats.get("scraped", 0)
|
|
|
|
await reporter.update(f"⚡ Procesando `{scraped}` fuentes…")
|
|
|
|
ollama = OllamaClient()
|
|
if await ollama.is_available():
|
|
processor = ContentProcessor(db, ollama)
|
|
|
|
async def proc_progress(total_chunks, total_words):
|
|
await reporter.update(
|
|
f"⚡ Scoring chunks… (`{total_chunks}` procesados)"
|
|
)
|
|
|
|
await processor.process_session(session_id, topic, proc_progress)
|
|
chunk_count = await db.get_chunks_count(session_id)
|
|
if silent_completion:
|
|
await reporter.done(
|
|
f"🔍 Investigación completada — analizando novedades…"
|
|
)
|
|
else:
|
|
await reporter.done(
|
|
f"✅ Listo — `{scraped}` fuentes · `{chunk_count}` chunks · usa /generate <tipo>"
|
|
)
|
|
else:
|
|
await reporter.done(
|
|
f"⚠️ Ollama no disponible — `{scraped}` fuentes scraped.\n"
|
|
f"Usa /generate para generar contenido."
|
|
)
|
|
|
|
except asyncio.CancelledError:
|
|
await db.update_session(session_id, status=ResearchStatus.FINISHED)
|
|
try:
|
|
await reporter.done("🛑 Investigación cancelada.")
|
|
except Exception:
|
|
pass
|
|
except Exception as e:
|
|
logger.error("Research task failed", error=str(e))
|
|
try:
|
|
await reporter.done(f"❌ Error: {str(e)[:200]}")
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
# ─── Commands ─────────────────────────────────────────────────────────────────
|
|
|
|
async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
await update.message.reply_text(
|
|
"🦉 *ResearchOwl* — Exhaustive Research Engine\n\n"
|
|
"Commands:\n"
|
|
"`/research <topic>` — Start exhaustive research\n"
|
|
"`/status` — Check current research progress\n"
|
|
"`/finish` — Stop research and proceed to generation\n"
|
|
"`/process` — Manually trigger chunk processing\n"
|
|
"`/generate <type>` — Generate output\n"
|
|
" Tipos: podcast|blog|report|thread\n"
|
|
" Extended: podcast_extended|blog_extended|report_extended\n"
|
|
"`/sources` — List all sources found\n"
|
|
"`/outputs` — List generated outputs\n"
|
|
"`/export` — Exportar último output como PDF\n"
|
|
"`/publish` — Publicar último output en Ghost como borrador\n"
|
|
"`/compare <tema1> vs <tema2>` — Análisis comparativo\n"
|
|
"`/costs` — Show API usage costs\n"
|
|
"`/watch <topic> [h]` — Schedule periodic research\n"
|
|
"`/unwatch <topic>` — Remove a watch\n"
|
|
"`/watches` — List your watched topics\n"
|
|
"`/cancel` — Cancel current research\n"
|
|
"`/help` — Show this message",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
|
|
|
|
async def cmd_research(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
chat_id = update.effective_chat.id
|
|
topic = " ".join(ctx.args).strip() if ctx.args else ""
|
|
|
|
if not topic:
|
|
await update.message.reply_text(
|
|
"❌ Please provide a topic.\nExample: `/research Roswell incident`",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
return
|
|
|
|
if chat_id in _active_tasks and not _active_tasks[chat_id].done():
|
|
await update.message.reply_text(
|
|
"⚠️ Research already in progress. Use /status or /finish first."
|
|
)
|
|
return
|
|
|
|
async def run_research():
|
|
db_conn = await get_db()
|
|
db = ResearchDB(db_conn)
|
|
try:
|
|
session_id = await db.create_session(topic, chat_id)
|
|
_active_sessions[chat_id] = session_id
|
|
await run_scheduled_research(
|
|
ctx.bot, chat_id, topic, session_id, db,
|
|
progress_message=update.message
|
|
)
|
|
finally:
|
|
await db_conn.close()
|
|
|
|
task = asyncio.create_task(run_research())
|
|
_active_tasks[chat_id] = task
|
|
|
|
|
|
async def cmd_status(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
chat_id = update.effective_chat.id
|
|
db_conn = await get_db()
|
|
db = ResearchDB(db_conn)
|
|
|
|
try:
|
|
session = await db.get_active_session(chat_id)
|
|
if not session:
|
|
# Try to find last session
|
|
cursor = await db_conn.execute(
|
|
"SELECT * FROM research_sessions WHERE telegram_chat_id = ? ORDER BY created_at DESC LIMIT 1",
|
|
(chat_id,)
|
|
)
|
|
row = await cursor.fetchone()
|
|
session = dict(row) if row else None
|
|
|
|
if not session:
|
|
await update.message.reply_text("No research sessions found. Start with /research <topic>")
|
|
return
|
|
|
|
stats = await db.get_session_stats(session["id"])
|
|
is_active = chat_id in _active_tasks and not _active_tasks[chat_id].done()
|
|
|
|
status_emoji = {"running": "🔄", "saturated": "✅", "finished": "🏁", "error": "❌"}
|
|
emoji = status_emoji.get(session["status"], "❓")
|
|
|
|
await update.message.reply_text(
|
|
f"{emoji} *Research Status*\n\n"
|
|
f"📝 Topic: `{session['topic']}`\n"
|
|
f"🔁 Status: `{session['status']}`\n"
|
|
f"🔢 Iterations: `{session.get('iterations', 0)}`\n"
|
|
f"📚 Total sources: `{stats.get('total') or 0}`\n"
|
|
f"✅ Scraped: `{stats.get('scraped') or 0}`\n"
|
|
f"⏭️ Skipped: `{stats.get('skipped') or 0}`\n"
|
|
f"❌ Failed: `{stats.get('failed') or 0}`\n"
|
|
f"⏳ Pending: `{stats.get('pending') or 0}`\n"
|
|
f"💬 Chunks: `{session.get('total_chunks', 0)}`\n"
|
|
f"📖 Words: `{session.get('total_words', 0):,}`\n"
|
|
f"{'🟢 Active — stats update each iteration' if is_active else '⚫ Idle'}",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
finally:
|
|
await db_conn.close()
|
|
|
|
|
|
async def cmd_finish(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
chat_id = update.effective_chat.id
|
|
task = _active_tasks.get(chat_id)
|
|
|
|
if task and not task.done():
|
|
task.cancel()
|
|
await update.message.reply_text(
|
|
"🛑 Stopping research...\n"
|
|
"Use `/generate podcast|blog|report|thread` to generate output.",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
else:
|
|
await update.message.reply_text(
|
|
"No active research. Use `/generate` to create output from last session.",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
|
|
|
|
async def cmd_generate(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
chat_id = update.effective_chat.id
|
|
output_arg = ctx.args[0].lower() if ctx.args else ""
|
|
|
|
type_map = {
|
|
"podcast": OutputType.PODCAST,
|
|
"blog": OutputType.BLOG,
|
|
"report": OutputType.REPORT,
|
|
"thread": OutputType.THREAD,
|
|
"hilo": OutputType.THREAD,
|
|
"informe": OutputType.REPORT,
|
|
"report_extended": OutputType.REPORT_EXTENDED,
|
|
"blog_extended": OutputType.BLOG_EXTENDED,
|
|
"podcast_extended": OutputType.PODCAST_EXTENDED,
|
|
"informe_extended": OutputType.REPORT_EXTENDED,
|
|
}
|
|
|
|
if output_arg not in type_map:
|
|
await update.message.reply_text(
|
|
"❌ Invalid output type.\n"
|
|
"Use: `/generate podcast|blog|report|thread`",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
return
|
|
|
|
output_type = type_map[output_arg]
|
|
|
|
db_conn = await get_db()
|
|
db = ResearchDB(db_conn)
|
|
|
|
try:
|
|
# Usa la sesión activa si existe, si no la más reciente
|
|
session_id = _active_sessions.get(chat_id)
|
|
if session_id:
|
|
cursor = await db_conn.execute(
|
|
"SELECT * FROM research_sessions WHERE id = ?",
|
|
(session_id,)
|
|
)
|
|
else:
|
|
cursor = await db_conn.execute(
|
|
"""SELECT * FROM research_sessions WHERE telegram_chat_id = ?
|
|
ORDER BY created_at DESC LIMIT 1""",
|
|
(chat_id,)
|
|
)
|
|
row = await cursor.fetchone()
|
|
if not row:
|
|
await update.message.reply_text("No research sessions found. Start with /research <topic>")
|
|
return
|
|
|
|
session = dict(row)
|
|
session_id = session["id"]
|
|
|
|
msg = await update.message.reply_text(
|
|
f"⚙️ Generating *{output_type}* for: `{session['topic']}`\n"
|
|
f"Using Ollama ({settings.ollama_model})...\n"
|
|
f"This may take 2-5 minutes ☕",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
|
|
async def gen_progress(text):
|
|
try:
|
|
await msg.edit_text(text)
|
|
except Exception:
|
|
pass
|
|
|
|
ollama = OllamaClient()
|
|
processor = ContentProcessor(db, ollama)
|
|
generator = OutputGenerator(db, ollama, processor)
|
|
|
|
output = await generator.generate(session_id, output_type, gen_progress)
|
|
|
|
# Send as file if very long
|
|
if len(output) > 8000:
|
|
import tempfile
|
|
import re as _re
|
|
ext_map = {
|
|
OutputType.PODCAST: "script.md",
|
|
OutputType.BLOG: "post.md",
|
|
OutputType.REPORT: "report.md",
|
|
OutputType.THREAD: "thread.txt",
|
|
OutputType.REPORT_EXTENDED: "report_extended.md",
|
|
OutputType.BLOG_EXTENDED: "blog_extended.md",
|
|
OutputType.PODCAST_EXTENDED: "script_extended.md",
|
|
}
|
|
# Use the topic from the output header (written at generation time)
|
|
# instead of the pre-fetched session dict which may be stale.
|
|
_m = _re.search(r'^Topic:\s*(.+)$', output[:500], _re.MULTILINE)
|
|
_topic = _m.group(1).strip() if _m else session["topic"]
|
|
filename = f"researchowl_{_topic[:30].replace(' ', '_')}_{ext_map[output_type]}"
|
|
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as f:
|
|
f.write(output)
|
|
tmp_path = f.name
|
|
|
|
with open(tmp_path, "rb") as f:
|
|
await update.message.reply_document(
|
|
document=f,
|
|
filename=filename,
|
|
caption=f"📄 *{output_type.upper()}* — {session['topic']}\n"
|
|
f"Generated by ResearchOwl 🦉",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
os.unlink(tmp_path)
|
|
else:
|
|
await send_chunked(update.message, output)
|
|
|
|
try:
|
|
stats = await db.get_usage_stats(session_id)
|
|
total_cost = sum(s.get("total_cost", 0) for s in stats)
|
|
if total_cost > settings.cost_alert_threshold:
|
|
await update.message.reply_text(
|
|
f"⚠️ Coste acumulado de esta sesión: `${total_cost:.4f}`"
|
|
f" (umbral: `${settings.cost_alert_threshold:.2f}`)",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
except Exception as e:
|
|
logger.error("Generate failed", error=str(e))
|
|
await update.message.reply_text(f"❌ Generation failed: {str(e)[:200]}")
|
|
finally:
|
|
await db_conn.close()
|
|
|
|
|
|
async def cmd_sources(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
chat_id = update.effective_chat.id
|
|
db_conn = await get_db()
|
|
db = ResearchDB(db_conn)
|
|
|
|
try:
|
|
cursor = await db_conn.execute(
|
|
"SELECT * FROM research_sessions WHERE telegram_chat_id = ? ORDER BY created_at DESC LIMIT 1",
|
|
(chat_id,)
|
|
)
|
|
row = await cursor.fetchone()
|
|
if not row:
|
|
await update.message.reply_text("No sessions found.")
|
|
return
|
|
|
|
session_id = row["id"]
|
|
sources = await db.get_all_sources(session_id)
|
|
|
|
by_type: dict = {}
|
|
for s in sources:
|
|
t = s["source_type"]
|
|
by_type.setdefault(t, []).append(s)
|
|
|
|
lines = [f"📚 *Sources for session #{session_id}*\n"]
|
|
for stype, srcs in by_type.items():
|
|
scraped = sum(1 for s in srcs if s["status"] == "scraped")
|
|
lines.append(f"\n*{stype.upper()}* ({scraped}/{len(srcs)} scraped)")
|
|
for s in srcs[:5]: # show top 5 per type
|
|
quality = s.get("quality_score", 0)
|
|
status_icon = {"scraped": "✅", "failed": "❌", "pending": "⏳", "skipped": "⏭️"}.get(s["status"], "❓")
|
|
title = (s.get("title") or s["url"])[:50]
|
|
lines.append(f"{status_icon} {title} (q:{quality:.1f})")
|
|
if len(srcs) > 5:
|
|
lines.append(f" ... and {len(srcs)-5} more")
|
|
|
|
await send_chunked(update.message, "\n".join(lines), parse_mode=ParseMode.MARKDOWN)
|
|
finally:
|
|
await db_conn.close()
|
|
|
|
|
|
async def cmd_outputs(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
chat_id = update.effective_chat.id
|
|
db_conn = await get_db()
|
|
db = ResearchDB(db_conn)
|
|
|
|
try:
|
|
cursor = await db_conn.execute(
|
|
"SELECT * FROM research_sessions WHERE telegram_chat_id = ? ORDER BY created_at DESC LIMIT 1",
|
|
(chat_id,)
|
|
)
|
|
row = await cursor.fetchone()
|
|
if not row:
|
|
await update.message.reply_text("No sessions found.")
|
|
return
|
|
|
|
outputs = await db.get_outputs(row["id"])
|
|
if not outputs:
|
|
await update.message.reply_text(
|
|
"No outputs generated yet. Use `/generate podcast|blog|report|thread`",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
return
|
|
|
|
lines = [f"📄 *Outputs for: {row['topic']}*\n"]
|
|
for o in outputs:
|
|
from datetime import datetime
|
|
dt = datetime.utcfromtimestamp(o['created_at']).strftime("%Y-%m-%d %H:%M")
|
|
lines.append(f"• `{o['output_type']}` — {dt} ({len(o['content'])} chars)")
|
|
|
|
await update.message.reply_text(
|
|
"\n".join(lines),
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
finally:
|
|
await db_conn.close()
|
|
|
|
|
|
async def cmd_costs(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
chat_id = update.effective_chat.id
|
|
db_conn = await get_db()
|
|
db = ResearchDB(db_conn)
|
|
|
|
try:
|
|
cursor = await db_conn.execute(
|
|
"SELECT * FROM research_sessions WHERE telegram_chat_id = ? ORDER BY created_at DESC LIMIT 1",
|
|
(chat_id,)
|
|
)
|
|
row = await cursor.fetchone()
|
|
if not row:
|
|
await update.message.reply_text("No sessions found.")
|
|
return
|
|
|
|
session_id = row["id"]
|
|
topic = row["topic"]
|
|
|
|
by_type = {r["call_type"]: r for r in await db.get_usage_stats(session_id)}
|
|
totals = await db.get_total_usage_stats()
|
|
|
|
lines = [f"📊 *Costes ResearchOwl*\n"]
|
|
lines.append(f"Última sesión (`{topic}`):")
|
|
|
|
session_total = 0.0
|
|
for call_type, label in [("scoring", "Scoring"), ("generation", "Generación")]:
|
|
row_data = by_type.get(call_type)
|
|
if row_data:
|
|
calls = row_data["calls"]
|
|
tokens = row_data["total_tokens"]
|
|
cost = row_data["total_cost"]
|
|
session_total += cost
|
|
lines.append(f" {label}: {calls} llamadas · {tokens:,} tokens · ${cost:.4f}")
|
|
else:
|
|
lines.append(f" {label}: —")
|
|
|
|
lines.append(f" Total: ${session_total:.4f}")
|
|
lines.append("")
|
|
lines.append("Acumulado total:")
|
|
acc_cost = totals.get("total_cost") or 0.0
|
|
acc_sessions = totals.get("sessions") or 0
|
|
lines.append(f" ${acc_cost:.4f} ({acc_sessions} sesiones)")
|
|
|
|
await update.message.reply_text("\n".join(lines), parse_mode=ParseMode.MARKDOWN)
|
|
finally:
|
|
await db_conn.close()
|
|
|
|
|
|
async def cmd_watch(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
chat_id = update.effective_chat.id
|
|
args = ctx.args or []
|
|
|
|
if not args:
|
|
await update.message.reply_text(
|
|
"❌ Uso: `/watch <tema> [horas]`\nEjemplo: `/watch Incidente Roswell 24`",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
return
|
|
|
|
interval_hours = 24
|
|
if args[-1].isdigit():
|
|
interval_hours = int(args[-1])
|
|
topic = " ".join(args[:-1]).strip()
|
|
else:
|
|
topic = " ".join(args).strip()
|
|
|
|
if not topic:
|
|
await update.message.reply_text("❌ Debes especificar un tema.")
|
|
return
|
|
|
|
if not (1 <= interval_hours <= 168):
|
|
await update.message.reply_text(
|
|
"❌ El intervalo debe estar entre 1 y 168 horas (1 semana)."
|
|
)
|
|
return
|
|
|
|
db_conn = await get_db()
|
|
db = ResearchDB(db_conn)
|
|
try:
|
|
try:
|
|
await db.add_watch(topic, chat_id, interval_hours)
|
|
await update.message.reply_text(
|
|
f"👁 Watching: `{topic}` — cada {interval_hours}h\n"
|
|
f"Primera ejecución en ~{interval_hours}h.\n"
|
|
f"Usa /watches para ver todos tus temas.",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
except Exception as e:
|
|
if "UNIQUE" in str(e):
|
|
await update.message.reply_text(
|
|
f"Ya estás watching `{topic}`", parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
else:
|
|
raise
|
|
finally:
|
|
await db_conn.close()
|
|
|
|
|
|
async def cmd_unwatch(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
chat_id = update.effective_chat.id
|
|
topic = " ".join(ctx.args).strip() if ctx.args else ""
|
|
|
|
if not topic:
|
|
await update.message.reply_text(
|
|
"❌ Uso: `/unwatch <tema>`", parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
return
|
|
|
|
db_conn = await get_db()
|
|
db = ResearchDB(db_conn)
|
|
try:
|
|
removed = await db.remove_watch(topic, chat_id)
|
|
if removed:
|
|
await update.message.reply_text(
|
|
f"✅ Ya no vigilas `{topic}`.", parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
else:
|
|
await update.message.reply_text(f"No estabas watching `{topic}`.")
|
|
finally:
|
|
await db_conn.close()
|
|
|
|
|
|
async def cmd_watches(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
chat_id = update.effective_chat.id
|
|
db_conn = await get_db()
|
|
db = ResearchDB(db_conn)
|
|
try:
|
|
watches = await db.list_watches(chat_id)
|
|
if not watches:
|
|
await update.message.reply_text(
|
|
"No tienes temas vigilados. Usa `/watch <tema>`",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
return
|
|
|
|
now = time.time()
|
|
lines = ["👁 *Tus temas vigilados:*\n"]
|
|
for i, w in enumerate(watches, 1):
|
|
secs_remaining = max(0.0, w["next_run_at"] - now)
|
|
hours_remaining = secs_remaining / 3600
|
|
eta = f"{int(secs_remaining / 60)}min" if hours_remaining < 1 else f"{hours_remaining:.1f}h"
|
|
status = "✅" if w["enabled"] else "⏸"
|
|
lines.append(
|
|
f"{i}. {status} `{w['topic']}` — cada {w['interval_hours']}h · próxima en {eta}"
|
|
)
|
|
|
|
await update.message.reply_text("\n".join(lines), parse_mode=ParseMode.MARKDOWN)
|
|
finally:
|
|
await db_conn.close()
|
|
|
|
|
|
async def cmd_process(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
chat_id = update.effective_chat.id
|
|
db_conn = await get_db()
|
|
db = ResearchDB(db_conn)
|
|
|
|
try:
|
|
cursor = await db_conn.execute(
|
|
"SELECT * FROM research_sessions WHERE telegram_chat_id = ? ORDER BY created_at DESC LIMIT 1",
|
|
(chat_id,)
|
|
)
|
|
row = await cursor.fetchone()
|
|
if not row:
|
|
await update.message.reply_text("No research sessions found. Start with /research <topic>")
|
|
return
|
|
|
|
session = dict(row)
|
|
session_id = session["id"]
|
|
topic = session["topic"]
|
|
|
|
msg = await update.message.reply_text(
|
|
f"🧠 Processing session #{session_id}: `{topic}`\n"
|
|
f"Chunking & scoring with Ollama ({settings.ollama_model})...\n"
|
|
f"This may take a few minutes.",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
|
|
ollama = OllamaClient()
|
|
if not await ollama.is_available():
|
|
await msg.edit_text("❌ Ollama not reachable. Check OLLAMA_URL setting.")
|
|
return
|
|
|
|
processor = ContentProcessor(db, ollama)
|
|
|
|
completion_text = None
|
|
|
|
async def proc_progress(total_chunks, total_words):
|
|
nonlocal completion_text
|
|
completion_text = (
|
|
f"🧠 *Processing complete!*\n"
|
|
f"• Chunks stored: `{total_chunks}`\n"
|
|
f"• Words researched: `{total_words:,}`\n\n"
|
|
f"Ready! Use `/generate podcast|blog|report|thread`"
|
|
)
|
|
try:
|
|
await msg.edit_text(completion_text, parse_mode=ParseMode.MARKDOWN)
|
|
completion_text = None # sent, no need to resend
|
|
except Exception:
|
|
pass
|
|
|
|
await processor.process_session(session_id, topic, proc_progress)
|
|
|
|
# Fallback: if edit_text failed silently, send a new message
|
|
if completion_text:
|
|
await update.message.reply_text(completion_text, parse_mode=ParseMode.MARKDOWN)
|
|
|
|
except Exception as e:
|
|
logger.error("Process command failed", error=str(e))
|
|
await update.message.reply_text(f"❌ Processing failed: {str(e)[:200]}")
|
|
finally:
|
|
await db_conn.close()
|
|
|
|
|
|
async def cmd_cancel(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
chat_id = update.effective_chat.id
|
|
task = _active_tasks.get(chat_id)
|
|
if task and not task.done():
|
|
task.cancel()
|
|
await update.message.reply_text("🛑 Research cancelled.")
|
|
else:
|
|
await update.message.reply_text("No active research to cancel.")
|
|
|
|
|
|
async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|
await cmd_start(update, ctx)
|
|
|
|
|
|
# ─── Bot setup ────────────────────────────────────────────────────────────────
|
|
|
|
async def _purge_on_startup(app: Application) -> None:
|
|
db_conn = await get_db()
|
|
try:
|
|
db = ResearchDB(db_conn)
|
|
result = await db.purge_old_sessions(30)
|
|
if result["sessions"] > 0:
|
|
logger.info("Startup purge done", **result)
|
|
except Exception as e:
|
|
logger.warning("Startup purge failed — bot continues", error=str(e))
|
|
finally:
|
|
await db_conn.close()
|
|
|
|
|
|
async def _scheduler_loop(app: Application):
|
|
while True:
|
|
db_conn = None
|
|
try:
|
|
db_conn = await get_db()
|
|
db = ResearchDB(db_conn)
|
|
due = await db.get_due_watches()
|
|
for watch in due:
|
|
chat_id = watch["chat_id"]
|
|
topic = watch["topic"]
|
|
if chat_id in _active_tasks and not _active_tasks[chat_id].done():
|
|
continue
|
|
session_id = await db.create_session(topic, chat_id)
|
|
_active_sessions[chat_id] = session_id
|
|
await db.update_watch_run(watch["id"])
|
|
|
|
async def _task(c=chat_id, t=topic, s=session_id, w_id=watch["id"]):
|
|
inner_db_conn = await get_db()
|
|
inner_db = ResearchDB(inner_db_conn)
|
|
try:
|
|
await run_scheduled_research(app.bot, c, t, s, inner_db,
|
|
silent_completion=True)
|
|
|
|
prev_session = await inner_db.get_previous_session(c, t, s)
|
|
new_urls = await inner_db.get_session_urls(s)
|
|
old_urls = await inner_db.get_session_urls(prev_session["id"]) \
|
|
if prev_session else set()
|
|
new_chunks = await inner_db.get_top_chunks(s, limit=30)
|
|
|
|
try:
|
|
from src.generator.generator import generate_diff_summary
|
|
summary = await generate_diff_summary(
|
|
t, new_urls, old_urls, new_chunks, s, inner_db
|
|
)
|
|
except Exception as e:
|
|
logger.warning("Diff summary failed", error=str(e))
|
|
summary = (
|
|
f"📊 *Actualización disponible — {t}*\n\n"
|
|
f"Usa /generate report para ver el análisis completo."
|
|
)
|
|
|
|
if summary:
|
|
await app.bot.send_message(
|
|
c, summary, parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
else:
|
|
await app.bot.send_message(
|
|
c,
|
|
f"🔄 *{t}* — sin novedades significativas esta vez.",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
finally:
|
|
await inner_db_conn.close()
|
|
|
|
task = asyncio.create_task(_task())
|
|
_active_tasks[chat_id] = task
|
|
await app.bot.send_message(
|
|
chat_id,
|
|
f"🔄 Investigación automática iniciada: `{topic}`",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
except Exception as e:
|
|
logger.warning("Scheduler loop error", error=str(e))
|
|
finally:
|
|
if db_conn:
|
|
try:
|
|
await db_conn.close()
|
|
except Exception:
|
|
pass
|
|
await asyncio.sleep(60)
|
|
|
|
|
|
async def _start_scheduler(app: Application) -> None:
|
|
asyncio.create_task(_scheduler_loop(app))
|
|
|
|
|
|
async def _on_startup(app: Application) -> None:
|
|
await _purge_on_startup(app)
|
|
await _start_scheduler(app)
|
|
|
|
|
|
async def cmd_export(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
chat_id = update.effective_chat.id
|
|
db_conn = await get_db()
|
|
db = ResearchDB(db_conn)
|
|
|
|
try:
|
|
session = await db.get_latest_session(chat_id)
|
|
if not session:
|
|
await update.message.reply_text("No hay sesiones de investigación.")
|
|
return
|
|
|
|
session_id = session["id"]
|
|
topic = session["topic"]
|
|
|
|
outputs = await db.get_outputs(session_id)
|
|
if not outputs:
|
|
await update.message.reply_text(
|
|
"No hay outputs generados. Usa `/generate <tipo>` primero.",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
return
|
|
|
|
priority = [
|
|
"report_extended", "blog_extended", "podcast_extended",
|
|
"report", "blog", "podcast", "thread",
|
|
]
|
|
chosen = None
|
|
for ptype in priority:
|
|
for o in outputs:
|
|
if o["output_type"] == ptype:
|
|
chosen = o
|
|
break
|
|
if chosen:
|
|
break
|
|
if not chosen:
|
|
chosen = outputs[0]
|
|
|
|
msg = await update.message.reply_text(
|
|
f"📄 Generando PDF para `{topic}`…",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
|
|
try:
|
|
from src.generator.generator import generate_pdf
|
|
pdf_bytes = generate_pdf(chosen["content"], title=topic)
|
|
except ImportError:
|
|
await msg.edit_text("❌ reportlab no está instalado. Ejecuta: `pip install reportlab`")
|
|
return
|
|
except Exception as e:
|
|
await msg.edit_text(f"❌ Error generando PDF: {str(e)[:200]}")
|
|
return
|
|
|
|
safe_topic = topic[:40].replace(" ", "_").replace("/", "-")
|
|
filename = f"researchowl_{safe_topic}_{chosen['output_type']}.pdf"
|
|
|
|
import io
|
|
await update.message.reply_document(
|
|
document=io.BytesIO(pdf_bytes),
|
|
filename=filename,
|
|
caption=f"📄 *{chosen['output_type'].upper()}* — {topic}\nExportado por ResearchOwl 🦉",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
try:
|
|
await msg.delete()
|
|
except Exception:
|
|
pass
|
|
|
|
except Exception as e:
|
|
logger.error("Export failed", error=str(e))
|
|
await update.message.reply_text(f"❌ Export failed: {str(e)[:200]}")
|
|
finally:
|
|
await db_conn.close()
|
|
|
|
|
|
async def cmd_purge(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
args = ctx.args or []
|
|
|
|
if not args:
|
|
days = 30
|
|
else:
|
|
try:
|
|
days = int(args[0])
|
|
except ValueError:
|
|
await update.message.reply_text(
|
|
"❌ Uso: `/purge [días]`", parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
return
|
|
if days < 0:
|
|
await update.message.reply_text("❌ El número de días debe ser ≥ 0.")
|
|
return
|
|
if days == 0 and not (len(args) >= 2 and args[1] == "confirm"):
|
|
await update.message.reply_text(
|
|
"⚠️ Esto borrará *todas* las sesiones completadas.\n"
|
|
"Envía `/purge 0 confirm` para confirmar.",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
return
|
|
|
|
db_conn = await get_db()
|
|
try:
|
|
db = ResearchDB(db_conn)
|
|
result = await db.purge_old_sessions(days)
|
|
await update.message.reply_text(
|
|
f"🗑️ Purged: {result['sessions']} sessions, "
|
|
f"{result['sources']} sources, "
|
|
f"{result['chunks']} chunks, "
|
|
f"{result['outputs']} outputs"
|
|
)
|
|
except Exception as e:
|
|
logger.error("Purge command failed", error=str(e))
|
|
await update.message.reply_text(f"❌ Purge failed: {str(e)[:200]}")
|
|
finally:
|
|
await db_conn.close()
|
|
|
|
|
|
async def cmd_publish(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
chat_id = update.effective_chat.id
|
|
db_conn = await get_db()
|
|
db = ResearchDB(db_conn)
|
|
|
|
try:
|
|
from src.generator.generator import GhostPublisher, _extract_title
|
|
|
|
ghost = GhostPublisher()
|
|
if not ghost.is_configured():
|
|
await update.message.reply_text(
|
|
"❌ Ghost no configurado. Asegúrate de que `GHOST_URL` y `GHOST_API_KEY` están definidos.",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
return
|
|
|
|
cursor = await db_conn.execute(
|
|
"SELECT * FROM research_sessions WHERE telegram_chat_id = ? ORDER BY created_at DESC LIMIT 1",
|
|
(chat_id,)
|
|
)
|
|
row = await cursor.fetchone()
|
|
if not row:
|
|
await update.message.reply_text("No hay sesiones. Usa /research primero.")
|
|
return
|
|
|
|
session = dict(row)
|
|
outputs = await db.get_outputs(session["id"])
|
|
if not outputs:
|
|
await update.message.reply_text(
|
|
"No hay outputs generados. Usa `/generate blog|report` primero.",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
return
|
|
|
|
priority = ["blog_extended", "blog", "report_extended", "report",
|
|
"podcast_extended", "podcast", "thread"]
|
|
chosen = None
|
|
for ptype in priority:
|
|
for o in outputs:
|
|
if o["output_type"] == ptype:
|
|
chosen = o
|
|
break
|
|
if chosen:
|
|
break
|
|
if not chosen:
|
|
chosen = outputs[-1]
|
|
|
|
msg = await update.message.reply_text("📤 Publicando en Ghost como borrador…")
|
|
|
|
title = _extract_title(chosen["content"]) or session["topic"]
|
|
result = await ghost.publish_draft(title, chosen["content"])
|
|
post = result["posts"][0]
|
|
admin_url = f"{ghost.url}/ghost/#/editor/post/{post['id']}"
|
|
|
|
await msg.edit_text(
|
|
f"✅ *Publicado en Ghost como borrador*\n\n"
|
|
f"📝 Título: `{title}`\n"
|
|
f"🔗 Editar: {admin_url}",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error("Publish to Ghost failed", error=str(e))
|
|
await update.message.reply_text(f"❌ Error publicando en Ghost: {str(e)[:200]}")
|
|
finally:
|
|
await db_conn.close()
|
|
|
|
|
|
async def cmd_compare(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
chat_id = update.effective_chat.id
|
|
text = " ".join(ctx.args).strip() if ctx.args else ""
|
|
|
|
import re
|
|
match = re.split(r'\s+vs\.?\s+|\s+versus\s+', text, maxsplit=1, flags=re.IGNORECASE)
|
|
if len(match) != 2 or not match[0].strip() or not match[1].strip():
|
|
await update.message.reply_text(
|
|
"❌ Uso: `/compare <tema1> vs <tema2>`\n"
|
|
"Ejemplo: `/compare energía solar vs energía nuclear`",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
return
|
|
|
|
topic_a = match[0].strip()
|
|
topic_b = match[1].strip()
|
|
|
|
if chat_id in _active_tasks and not _active_tasks[chat_id].done():
|
|
await update.message.reply_text(
|
|
"⚠️ Ya hay una investigación en curso. Usa /cancel primero."
|
|
)
|
|
return
|
|
|
|
msg = await update.message.reply_text(
|
|
f"🔍 Comparando `{topic_a}` vs `{topic_b}`…\n"
|
|
f"Esto lanzará dos investigaciones en paralelo y tardará varios minutos.",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
|
|
async def run_compare():
|
|
db_conn_a = await get_db()
|
|
db_conn_b = await get_db()
|
|
db_a = ResearchDB(db_conn_a)
|
|
db_b = ResearchDB(db_conn_b)
|
|
|
|
try:
|
|
session_id_a = await db_a.create_session(topic_a, chat_id)
|
|
session_id_b = await db_b.create_session(topic_b, chat_id)
|
|
_active_sessions[chat_id] = session_id_a
|
|
|
|
await msg.edit_text(
|
|
f"🔍 Investigando en paralelo:\n"
|
|
f"• `{topic_a}`\n"
|
|
f"• `{topic_b}`\n\n"
|
|
f"Esto puede tardar 10-20 minutos…",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
|
|
async def research_topic(session_id, topic, db):
|
|
scraper = ExhaustiveScraper(db, session_id, topic)
|
|
await scraper.run()
|
|
await db.update_session(session_id, status=ResearchStatus.SATURATED)
|
|
ollama = OllamaClient()
|
|
if await ollama.is_available():
|
|
processor = ContentProcessor(db, ollama)
|
|
await processor.process_session(session_id, topic)
|
|
|
|
await msg.edit_text(
|
|
f"🔍 Scraping en paralelo:\n• `{topic_a}`\n• `{topic_b}`…",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
|
|
await asyncio.gather(
|
|
research_topic(session_id_a, topic_a, db_a),
|
|
research_topic(session_id_b, topic_b, db_b),
|
|
)
|
|
|
|
await msg.edit_text(
|
|
"✍️ Generando análisis comparativo…",
|
|
parse_mode=ParseMode.MARKDOWN
|
|
)
|
|
|
|
ollama = OllamaClient()
|
|
processor_a = ContentProcessor(db_a, ollama)
|
|
processor_b = ContentProcessor(db_b, ollama)
|
|
|
|
context_a = await processor_a.rag_query(session_id_a, topic_a, top_k=40)
|
|
context_b = await processor_b.rag_query(session_id_b, topic_b, top_k=40)
|
|
|
|
if not context_a:
|
|
chunks = await db_a.get_top_chunks(session_id_a, limit=20)
|
|
context_a = "\n\n---\n\n".join(c["content"] for c in chunks)
|
|
if not context_b:
|
|
chunks = await db_b.get_top_chunks(session_id_b, limit=20)
|
|
context_b = "\n\n---\n\n".join(c["content"] for c in chunks)
|
|
|
|
from src.generator.generator import generate_comparison
|
|
comparison = await generate_comparison(
|
|
topic_a, topic_b, context_a, context_b, session_id_a, db_a
|
|
)
|
|
|
|
from datetime import datetime, timezone
|
|
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
|
|
header = (
|
|
f"---\n"
|
|
f"ResearchOwl | COMPARISON\n"
|
|
f"Topic A: {topic_a}\n"
|
|
f"Topic B: {topic_b}\n"
|
|
f"Generated: {now}\n"
|
|
f"---\n\n"
|
|
)
|
|
full_output = header + comparison
|
|
|
|
await db_a.save_output(session_id_a, OutputType.REPORT, full_output)
|
|
|
|
if len(full_output) > 8000:
|
|
import io
|
|
filename = (
|
|
f"compare_{topic_a[:20]}_{topic_b[:20]}.md"
|
|
.replace(" ", "_")
|
|
)
|
|
await update.message.reply_document(
|
|
document=io.BytesIO(full_output.encode()),
|
|
filename=filename,
|
|
caption=f"📊 Comparación: {topic_a} vs {topic_b}"
|
|
)
|
|
try:
|
|
await msg.delete()
|
|
except Exception:
|
|
pass
|
|
else:
|
|
await msg.edit_text(full_output, parse_mode=ParseMode.MARKDOWN)
|
|
|
|
except asyncio.CancelledError:
|
|
await msg.edit_text("🛑 Comparación cancelada.")
|
|
except Exception as e:
|
|
logger.error("Compare task failed", error=str(e))
|
|
try:
|
|
await msg.edit_text(f"❌ Error: {str(e)[:200]}")
|
|
except Exception:
|
|
pass
|
|
finally:
|
|
await db_conn_a.close()
|
|
await db_conn_b.close()
|
|
|
|
task = asyncio.create_task(run_compare())
|
|
_active_tasks[chat_id] = task
|
|
|
|
|
|
def create_bot() -> Application:
|
|
app = (
|
|
Application.builder()
|
|
.token(settings.telegram_bot_token)
|
|
.post_init(_on_startup)
|
|
.build()
|
|
)
|
|
|
|
app.add_handler(CommandHandler("start", cmd_start))
|
|
app.add_handler(CommandHandler("help", cmd_help))
|
|
app.add_handler(CommandHandler("research", cmd_research))
|
|
app.add_handler(CommandHandler("status", cmd_status))
|
|
app.add_handler(CommandHandler("finish", cmd_finish))
|
|
app.add_handler(CommandHandler("generate", cmd_generate))
|
|
app.add_handler(CommandHandler("sources", cmd_sources))
|
|
app.add_handler(CommandHandler("outputs", cmd_outputs))
|
|
app.add_handler(CommandHandler("export", cmd_export))
|
|
app.add_handler(CommandHandler("costs", cmd_costs))
|
|
app.add_handler(CommandHandler("watch", cmd_watch))
|
|
app.add_handler(CommandHandler("unwatch", cmd_unwatch))
|
|
app.add_handler(CommandHandler("watches", cmd_watches))
|
|
app.add_handler(CommandHandler("process", cmd_process))
|
|
app.add_handler(CommandHandler("cancel", cmd_cancel))
|
|
app.add_handler(CommandHandler("purge", cmd_purge))
|
|
app.add_handler(CommandHandler("publish", cmd_publish))
|
|
app.add_handler(CommandHandler("compare", cmd_compare))
|
|
|
|
return app
|
|
|
|
|
|
def run():
|
|
logger.info("Starting ResearchOwl bot")
|
|
app = create_bot()
|
|
app.run_polling(allowed_updates=Update.ALL_TYPES)
|