"""
ResearchOwl Telegram Bot

Main user interface — all commands are handled here.
"""
import asyncio
import os
import time
from datetime import datetime
from typing import Optional

import structlog
from telegram import Update, Message
from telegram.ext import (
    Application, CommandHandler, MessageHandler, filters, ContextTypes
)
from telegram.constants import ParseMode

from src.config import settings
from src.db.database import get_db, ResearchDB, ResearchStatus, OutputType
from src.scraper.exhaustive import ExhaustiveScraper
from src.processor.processor import OllamaClient, ContentProcessor
from src.generator.generator import OutputGenerator

logger = structlog.get_logger()

# Active research tasks per chat
_active_tasks: dict[int, asyncio.Task] = {}
_active_sessions: dict[int, int] = {}  # chat_id -> session_id

# Telegram rejects messages over 4096 characters; stay safely below that.
_TELEGRAM_CHUNK_LEN = 4000


def is_authorized(user_id: int) -> bool:
    """Return True if *user_id* may use the bot.

    An empty ``settings.allowed_user_ids`` means no allow-list is configured,
    so every user is accepted.
    """
    allowed = settings.allowed_user_ids
    return not allowed or user_id in allowed


class ProgressReporter:
    """Keep one edit-in-place Markdown progress message for a long task.

    :meth:`start` creates the message, either as a reply to ``reply_target``
    or, when no reply target is given, as a new message sent through ``bot``
    to ``chat_id``. Later :meth:`update`/:meth:`done` calls edit it. Edits are
    best-effort: a failed edit is logged and never aborts the caller.
    """

    def __init__(self, reply_target: Optional[Message] = None, *, bot=None,
                 chat_id: Optional[int] = None):
        self._reply_target = reply_target
        self._bot = bot
        self._chat_id = chat_id
        self._msg: Optional[Message] = None
        # Text currently shown; used to skip edits Telegram would reject.
        self._last_text: Optional[str] = None

    async def start(self, text: str):
        """Send the initial progress message (no-op if there is nowhere to send it)."""
        if self._reply_target is not None:
            self._msg = await self._reply_target.reply_text(text, parse_mode=ParseMode.MARKDOWN)
        elif self._bot is not None and self._chat_id is not None:
            self._msg = await self._bot.send_message(self._chat_id, text, parse_mode=ParseMode.MARKDOWN)
        if self._msg is not None:
            self._last_text = text

    async def update(self, text: str):
        """Replace the progress message text; silently does nothing before :meth:`start`."""
        if self._msg is None:
            return
        if text == self._last_text:
            # Telegram answers "message is not modified" for identical edits.
            return
        try:
            await self._msg.edit_text(text, parse_mode=ParseMode.MARKDOWN)
        except Exception as exc:
            # Best-effort: rate limits or Markdown parse errors must not stop the task.
            logger.debug("Progress update failed", error=str(exc))
        else:
            self._last_text = text

    async def done(self, text: str):
        """Show the final status text (same mechanics as :meth:`update`)."""
        await self.update(text)


def _split_for_telegram(text: str, max_len: int = _TELEGRAM_CHUNK_LEN) -> list[str]:
    """Split *text* into non-blank pieces of at most *max_len* characters.

    A piece ends at the last newline inside the window when there is one
    (that newline is dropped), so line-oriented Markdown is not cut
    mid-entity; otherwise the text is cut at exactly *max_len*.
    Whitespace-only pieces are discarded because Telegram rejects them.
    """
    if max_len <= 0:
        raise ValueError("max_len must be positive")
    pieces: list[str] = []
    rest = text
    while len(rest) > max_len:
        cut = rest.rfind("\n", 0, max_len + 1)
        if cut > 0:
            piece, rest = rest[:cut], rest[cut + 1:]
        else:
            piece, rest = rest[:max_len], rest[max_len:]
        if piece.strip():
            pieces.append(piece)
    if rest.strip():
        pieces.append(rest)
    return pieces


async def send_chunked(message: Message, text: str, parse_mode=None, *,
                       max_len: int = _TELEGRAM_CHUNK_LEN):
    """Reply to *message* with *text*, split into chunks Telegram accepts.

    Args:
        message: Message to reply to.
        text: Text to send; empty/blank text sends nothing.
        parse_mode: Passed through to ``reply_text`` for every chunk.
        max_len: Maximum characters per chunk (default 4000).
    """
    for index, chunk in enumerate(_split_for_telegram(text, max_len)):
        if index:
            # Short pause between parts to stay under Telegram flood limits.
            await asyncio.sleep(0.5)
        await message.reply_text(chunk, parse_mode=parse_mode)


# ─── Shared research logic ────────────────────────────────────────────────────
async def run_scheduled_research(bot, chat_id: int, topic: str, session_id: int,
                                 db: ResearchDB, progress_message=None,
                                 silent_completion: bool = False) -> bool:
    """Scrape and, when Ollama is reachable, process one research session.

    Used by ``/research`` (``progress_message`` set: progress is posted as a
    reply to the command) and by the watch scheduler (progress is sent as a
    new message through ``bot`` to ``chat_id``).

    Args:
        bot: Bot used for the progress message when ``progress_message`` is None.
        chat_id: Chat that receives progress updates.
        topic: Research topic passed to the scraper and processor.
        session_id: Session row to fill.
        db: Open database wrapper; the caller owns and closes its connection.
        progress_message: Message to reply to with progress, if any.
        silent_completion: When True, the final message only says research
            finished (the scheduler follows up with its own summary).

    Returns:
        True when the run finished; False when it was cancelled or failed.
        Failures are logged and reported in the chat, not re-raised.
    """
    if progress_message is not None:
        reporter = ProgressReporter(progress_message)
    else:
        reporter = ProgressReporter(bot=bot, chat_id=chat_id)

    try:
        await reporter.start(f"🔍 Iniciando scraping de `{topic}`…")

        async def on_progress(iter_num, total_sources):
            await reporter.update(
                f"🔍 Scraping — iteración `{iter_num}` | `{total_sources}` fuentes encontradas"
            )

        scraper = ExhaustiveScraper(db, session_id, topic, on_progress)
        final_stats = await scraper.run()
        await db.update_session(session_id, status=ResearchStatus.SATURATED)

        scraped = final_stats.get("scraped", 0)
        await reporter.update(f"⚡ Procesando `{scraped}` fuentes…")

        ollama = OllamaClient()
        if await ollama.is_available():
            processor = ContentProcessor(db, ollama)

            async def proc_progress(total_chunks, total_words):
                await reporter.update(
                    f"⚡ Scoring chunks… (`{total_chunks}` procesados)"
                )

            await processor.process_session(session_id, topic, proc_progress)
            chunk_count = await db.get_chunks_count(session_id)
            if silent_completion:
                await reporter.done(
                    f"🔍 Investigación completada — analizando novedades…"
                )
            else:
                await reporter.done(
                    f"✅ Listo — `{scraped}` fuentes · `{chunk_count}` chunks · usa /generate "
                )
        else:
            await reporter.done(
                f"⚠️ Ollama no disponible — `{scraped}` fuentes scraped.\n"
                f"Usa /generate para generar contenido."
            )
        return True

    except asyncio.CancelledError:
        # /finish and /cancel cancel the task; record the session as finished.
        await db.update_session(session_id, status=ResearchStatus.FINISHED)
        try:
            await reporter.done("🛑 Investigación cancelada.")
        except Exception:
            pass
        return False

    except Exception as e:
        # NOTE(review): the session status is not changed here, so it may stay
        # "running" after a failure — confirm whether an error status exists.
        logger.exception("Research task failed", error=str(e))
        try:
            await reporter.done(f"❌ Error: {str(e)[:200]}")
        except Exception:
            pass
        return False


async def _latest_session(db_conn, chat_id: int) -> Optional[dict]:
    """Return the most recently created session for *chat_id* as a dict, or None."""
    cursor = await db_conn.execute(
        "SELECT * FROM research_sessions WHERE telegram_chat_id = ? "
        "ORDER BY created_at DESC LIMIT 1",
        (chat_id,),
    )
    row = await cursor.fetchone()
    return dict(row) if row else None


# ─── Commands ─────────────────────────────────────────────────────────────────

async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Show the command overview."""
    if not is_authorized(update.effective_user.id):
        return
    await update.message.reply_text(
        "🦉 *ResearchOwl* — Exhaustive Research Engine\n\n"
        "Commands:\n"
        "`/research ` — Start exhaustive research\n"
        "`/status` — Check current research progress\n"
        "`/finish` — Stop research and proceed to generation\n"
        "`/process` — Manually trigger chunk processing\n"
        "`/generate ` — Generate output\n"
        " Tipos: podcast|blog|report|thread\n"
        " Extended: podcast_extended|blog_extended|report_extended\n"
        "`/sources` — List all sources found\n"
        "`/outputs` — List generated outputs\n"
        "`/export` — Exportar último output como PDF\n"
        "`/costs` — Show API usage costs\n"
        "`/watch [h]` — Schedule periodic research\n"
        "`/unwatch ` — Remove a watch\n"
        "`/watches` — List your watched topics\n"
        "`/purge [días]` — Delete old completed sessions\n"
        "`/cancel` — Cancel current research\n"
        "`/help` — Show this message",
        parse_mode=ParseMode.MARKDOWN
    )


async def cmd_research(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Start a background research task for the topic given as arguments."""
    if not is_authorized(update.effective_user.id):
        return

    chat_id = update.effective_chat.id
    topic = " ".join(ctx.args).strip() if ctx.args else ""

    if not topic:
        await update.message.reply_text(
            "❌ Please provide a topic.\nExample: `/research Roswell incident`",
            parse_mode=ParseMode.MARKDOWN
        )
        return

    if chat_id in _active_tasks and not _active_tasks[chat_id].done():
        await update.message.reply_text(
            "⚠️ Research already in progress. Use /status or /finish first."
        )
        return

    async def run_research():
        # The task owns its own connection for the whole (long) run.
        db_conn = await get_db()
        db = ResearchDB(db_conn)
        try:
            session_id = await db.create_session(topic, chat_id)
            _active_sessions[chat_id] = session_id
            await run_scheduled_research(
                ctx.bot, chat_id, topic, session_id, db,
                progress_message=update.message
            )
        finally:
            await db_conn.close()

    # Stored in _active_tasks, which also keeps a strong reference to the task.
    _active_tasks[chat_id] = asyncio.create_task(run_research())


async def cmd_status(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Report stats for the active session, or the latest one if none is active."""
    if not is_authorized(update.effective_user.id):
        return

    chat_id = update.effective_chat.id
    db_conn = await get_db()
    db = ResearchDB(db_conn)
    try:
        session = await db.get_active_session(chat_id)
        if not session:
            # Nothing running: fall back to the most recent session.
            session = await _latest_session(db_conn, chat_id)

        if not session:
            await update.message.reply_text("No research sessions found. Start with /research ")
            return

        stats = await db.get_session_stats(session["id"])
        is_active = chat_id in _active_tasks and not _active_tasks[chat_id].done()

        status_emoji = {"running": "🔄", "saturated": "✅", "finished": "🏁", "error": "❌"}
        emoji = status_emoji.get(session["status"], "❓")
        # Columns may be NULL; coerce to 0 so the ":," format below cannot fail.
        iterations = session.get("iterations") or 0
        chunks = session.get("total_chunks") or 0
        words = session.get("total_words") or 0

        await update.message.reply_text(
            f"{emoji} *Research Status*\n\n"
            f"📝 Topic: `{session['topic']}`\n"
            f"🔁 Status: `{session['status']}`\n"
            f"🔢 Iterations: `{iterations}`\n"
            f"📚 Total sources: `{stats.get('total') or 0}`\n"
            f"✅ Scraped: `{stats.get('scraped') or 0}`\n"
            f"⏭️ Skipped: `{stats.get('skipped') or 0}`\n"
            f"❌ Failed: `{stats.get('failed') or 0}`\n"
            f"⏳ Pending: `{stats.get('pending') or 0}`\n"
            f"💬 Chunks: `{chunks}`\n"
            f"📖 Words: `{words:,}`\n"
            f"{'🟢 Active — stats update each iteration' if is_active else '⚫ Idle'}",
            parse_mode=ParseMode.MARKDOWN
        )
    finally:
        await db_conn.close()


async def cmd_finish(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Cancel the running research so the user can move on to /generate."""
    if not is_authorized(update.effective_user.id):
        return

    chat_id = update.effective_chat.id
    task = _active_tasks.get(chat_id)

    if task and not task.done():
        task.cancel()
        await update.message.reply_text(
            "🛑 Stopping research...\n"
            "Use `/generate podcast|blog|report|thread` to generate output.",
            parse_mode=ParseMode.MARKDOWN
        )
    else:
        await update.message.reply_text(
            "No active research. Use `/generate` to create output from last session.",
            parse_mode=ParseMode.MARKDOWN
        )


async def cmd_generate(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Generate the requested output type from the chat's latest session.

    Outputs longer than 8000 characters are sent as a document; shorter
    ones are sent inline in chunks.
    """
    if not is_authorized(update.effective_user.id):
        return

    chat_id = update.effective_chat.id
    output_arg = ctx.args[0].lower() if ctx.args else ""

    type_map = {
        "podcast": OutputType.PODCAST,
        "blog": OutputType.BLOG,
        "report": OutputType.REPORT,
        "thread": OutputType.THREAD,
        "hilo": OutputType.THREAD,
        "informe": OutputType.REPORT,
        "report_extended": OutputType.REPORT_EXTENDED,
        "blog_extended": OutputType.BLOG_EXTENDED,
        "podcast_extended": OutputType.PODCAST_EXTENDED,
        "informe_extended": OutputType.REPORT_EXTENDED,
    }

    if output_arg not in type_map:
        await update.message.reply_text(
            "❌ Invalid output type.\n"
            "Use: `/generate podcast|blog|report|thread`",
            parse_mode=ParseMode.MARKDOWN
        )
        return

    output_type = type_map[output_arg]
    db_conn = await get_db()
    db = ResearchDB(db_conn)

    try:
        session = await _latest_session(db_conn, chat_id)
        if not session:
            await update.message.reply_text("No research sessions found. Start with /research ")
            return

        session_id = session["id"]

        msg = await update.message.reply_text(
            f"⚙️ Generating *{output_type}* for: `{session['topic']}`\n"
            f"Using Ollama ({settings.ollama_model})...\n"
            f"This may take 2-5 minutes ☕",
            parse_mode=ParseMode.MARKDOWN
        )

        async def gen_progress(text):
            # Best-effort progress edits; generation continues if they fail.
            try:
                await msg.edit_text(text)
            except Exception:
                pass

        ollama = OllamaClient()
        processor = ContentProcessor(db, ollama)
        generator = OutputGenerator(db, ollama, processor)
        output = await generator.generate(session_id, output_type, gen_progress)

        if len(output) > 8000:
            import io

            ext_map = {
                OutputType.PODCAST: "script.md",
                OutputType.BLOG: "post.md",
                OutputType.REPORT: "report.md",
                OutputType.THREAD: "thread.txt",
                OutputType.REPORT_EXTENDED: "report_extended.md",
                OutputType.BLOG_EXTENDED: "blog_extended.md",
                OutputType.PODCAST_EXTENDED: "script_extended.md",
            }
            safe_topic = session["topic"][:30].replace(" ", "_").replace("/", "-")
            filename = f"researchowl_{safe_topic}_{ext_map[output_type]}"
            # Build the file in memory (as /export does): explicit UTF-8 and no
            # temporary file that could be left behind if the upload fails.
            await update.message.reply_document(
                document=io.BytesIO(output.encode("utf-8")),
                filename=filename,
                caption=f"📄 *{output_type.upper()}* — {session['topic']}\n"
                        f"Generated by ResearchOwl 🦉",
                parse_mode=ParseMode.MARKDOWN
            )
        else:
            await send_chunked(update.message, output)

    except Exception as e:
        logger.error("Generate failed", error=str(e))
        await update.message.reply_text(f"❌ Generation failed: {str(e)[:200]}")
    finally:
        await db_conn.close()


async def cmd_sources(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """List the latest session's sources grouped by type (first 5 per type)."""
    if not is_authorized(update.effective_user.id):
        return

    from telegram.helpers import escape_markdown

    chat_id = update.effective_chat.id
    db_conn = await get_db()
    db = ResearchDB(db_conn)

    try:
        session = await _latest_session(db_conn, chat_id)
        if not session:
            await update.message.reply_text("No sessions found.")
            return

        session_id = session["id"]
        sources = await db.get_all_sources(session_id)

        by_type: dict = {}
        for src in sources:
            by_type.setdefault(src["source_type"], []).append(src)

        status_icons = {"scraped": "✅", "failed": "❌", "pending": "⏳", "skipped": "⏭️"}
        lines = [f"📚 *Sources for session #{session_id}*\n"]
        for stype, srcs in by_type.items():
            scraped = sum(1 for src in srcs if src["status"] == "scraped")
            lines.append(f"\n*{stype.upper()}* ({scraped}/{len(srcs)} scraped)")
            for src in srcs[:5]:  # show top 5 per type
                # quality_score can be NULL (e.g. not yet scored); ":.1f" needs a number.
                quality = src.get("quality_score") or 0
                status_icon = status_icons.get(src["status"], "❓")
                # Scraped titles often contain "_" or "*", which would break the
                # Markdown parse and make Telegram reject the whole message.
                title = escape_markdown((src.get("title") or src["url"])[:50])
                lines.append(f"{status_icon} {title} (q:{quality:.1f})")
            if len(srcs) > 5:
                lines.append(f" ... and {len(srcs)-5} more")

        await send_chunked(update.message, "\n".join(lines), parse_mode=ParseMode.MARKDOWN)
    finally:
        await db_conn.close()


async def cmd_outputs(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """List outputs generated for the latest session (timestamps in UTC)."""
    if not is_authorized(update.effective_user.id):
        return

    from datetime import timezone

    chat_id = update.effective_chat.id
    db_conn = await get_db()
    db = ResearchDB(db_conn)

    try:
        session = await _latest_session(db_conn, chat_id)
        if not session:
            await update.message.reply_text("No sessions found.")
            return

        outputs = await db.get_outputs(session["id"])
        if not outputs:
            await update.message.reply_text(
                "No outputs generated yet. Use `/generate podcast|blog|report|thread`",
                parse_mode=ParseMode.MARKDOWN
            )
            return

        lines = [f"📄 *Outputs for: {session['topic']}*\n"]
        for o in outputs:
            # Aware UTC conversion (utcfromtimestamp is deprecated since 3.12).
            dt = datetime.fromtimestamp(o["created_at"], tz=timezone.utc).strftime("%Y-%m-%d %H:%M")
            lines.append(f"• `{o['output_type']}` — {dt} ({len(o['content'])} chars)")

        await send_chunked(update.message, "\n".join(lines), parse_mode=ParseMode.MARKDOWN)
    finally:
        await db_conn.close()


async def cmd_costs(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Show API usage for the latest session plus the all-time total."""
    if not is_authorized(update.effective_user.id):
        return

    chat_id = update.effective_chat.id
    db_conn = await get_db()
    db = ResearchDB(db_conn)

    try:
        session = await _latest_session(db_conn, chat_id)
        if not session:
            await update.message.reply_text("No sessions found.")
            return

        session_id = session["id"]
        topic = session["topic"]
        by_type = {r["call_type"]: r for r in await db.get_usage_stats(session_id)}
        totals = await db.get_total_usage_stats()

        lines = [f"📊 *Costes ResearchOwl*\n"]
        lines.append(f"Última sesión (`{topic}`):")

        session_total = 0.0
        for call_type, label in [("scoring", "Scoring"), ("generation", "Generación")]:
            row_data = by_type.get(call_type)
            if row_data:
                # Aggregates may come back NULL; treat them as zero.
                calls = row_data["calls"] or 0
                tokens = row_data["total_tokens"] or 0
                cost = row_data["total_cost"] or 0.0
                session_total += cost
                lines.append(f" {label}: {calls} llamadas · {tokens:,} tokens · ${cost:.4f}")
            else:
                lines.append(f" {label}: —")
        lines.append(f" Total: ${session_total:.4f}")

        lines.append("")
        lines.append("Acumulado total:")
        acc_cost = totals.get("total_cost") or 0.0
        acc_sessions = totals.get("sessions") or 0
        lines.append(f" ${acc_cost:.4f} ({acc_sessions} sesiones)")

        await update.message.reply_text("\n".join(lines), parse_mode=ParseMode.MARKDOWN)
    finally:
        await db_conn.close()


async def cmd_watch(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Register a periodic research watch; a trailing integer is the interval in hours."""
    if not is_authorized(update.effective_user.id):
        return

    chat_id = update.effective_chat.id
    args = ctx.args or []

    if not args:
        await update.message.reply_text(
            "❌ Uso: `/watch [horas]`\nEjemplo: `/watch Incidente Roswell 24`",
            parse_mode=ParseMode.MARKDOWN
        )
        return

    interval_hours = 24
    # isdecimal (not isdigit): characters like "²" pass isdigit but break int().
    if args[-1].isdecimal():
        interval_hours = int(args[-1])
        topic = " ".join(args[:-1]).strip()
    else:
        topic = " ".join(args).strip()

    if not topic:
        await update.message.reply_text("❌ Debes especificar un tema.")
        return

    if not (1 <= interval_hours <= 168):
        await update.message.reply_text(
            "❌ El intervalo debe estar entre 1 y 168 horas (1 semana)."
        )
        return

    db_conn = await get_db()
    db = ResearchDB(db_conn)
    try:
        try:
            await db.add_watch(topic, chat_id, interval_hours)
        except Exception as e:
            # A duplicate (topic, chat) watch surfaces as a UNIQUE constraint error.
            if "UNIQUE" not in str(e):
                raise
            await update.message.reply_text(
                f"Ya estás watching `{topic}`",
                parse_mode=ParseMode.MARKDOWN
            )
        else:
            await update.message.reply_text(
                f"👁 Watching: `{topic}` — cada {interval_hours}h\n"
                f"Primera ejecución en ~{interval_hours}h.\n"
                f"Usa /watches para ver todos tus temas.",
                parse_mode=ParseMode.MARKDOWN
            )
    finally:
        await db_conn.close()


async def cmd_unwatch(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Remove the watch for the given topic in this chat."""
    if not is_authorized(update.effective_user.id):
        return

    chat_id = update.effective_chat.id
    topic = " ".join(ctx.args).strip() if ctx.args else ""

    if not topic:
        await update.message.reply_text(
            "❌ Uso: `/unwatch `",
            parse_mode=ParseMode.MARKDOWN
        )
        return

    db_conn = await get_db()
    db = ResearchDB(db_conn)
    try:
        removed = await db.remove_watch(topic, chat_id)
        if removed:
            await update.message.reply_text(
                f"✅ Ya no vigilas `{topic}`.",
                parse_mode=ParseMode.MARKDOWN
            )
        else:
            await update.message.reply_text(f"No estabas watching `{topic}`.")
    finally:
        await db_conn.close()


async def cmd_watches(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """List this chat's watches with the time remaining until each next run."""
    if not is_authorized(update.effective_user.id):
        return

    chat_id = update.effective_chat.id
    db_conn = await get_db()
    db = ResearchDB(db_conn)
    try:
        watches = await db.list_watches(chat_id)
        if not watches:
            await update.message.reply_text(
                "No tienes temas vigilados. Usa `/watch `",
                parse_mode=ParseMode.MARKDOWN
            )
            return

        now = time.time()
        lines = ["👁 *Tus temas vigilados:*\n"]
        for i, w in enumerate(watches, 1):
            secs_remaining = max(0.0, w["next_run_at"] - now)
            hours_remaining = secs_remaining / 3600
            eta = f"{int(secs_remaining / 60)}min" if hours_remaining < 1 else f"{hours_remaining:.1f}h"
            status = "✅" if w["enabled"] else "⏸"
            lines.append(
                f"{i}. {status} `{w['topic']}` — cada {w['interval_hours']}h · próxima en {eta}"
            )

        await update.message.reply_text("\n".join(lines), parse_mode=ParseMode.MARKDOWN)
    finally:
        await db_conn.close()


async def cmd_process(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Chunk and score the latest session with Ollama, then report the totals."""
    if not is_authorized(update.effective_user.id):
        return

    chat_id = update.effective_chat.id
    db_conn = await get_db()
    db = ResearchDB(db_conn)

    try:
        session = await _latest_session(db_conn, chat_id)
        if not session:
            await update.message.reply_text("No research sessions found. Start with /research ")
            return

        session_id = session["id"]
        topic = session["topic"]

        msg = await update.message.reply_text(
            f"🧠 Processing session #{session_id}: `{topic}`\n"
            f"Chunking & scoring with Ollama ({settings.ollama_model})...\n"
            f"This may take a few minutes.",
            parse_mode=ParseMode.MARKDOWN
        )

        ollama = OllamaClient()
        if not await ollama.is_available():
            await msg.edit_text("❌ Ollama not reachable. Check OLLAMA_URL setting.")
            return

        processor = ContentProcessor(db, ollama)
        # Latest totals reported by the processor (None until the first callback).
        latest = {"chunks": None, "words": None}

        async def proc_progress(total_chunks, total_words):
            # Progress only — completion is announced after process_session returns.
            latest["chunks"] = total_chunks
            latest["words"] = total_words
            try:
                await msg.edit_text(
                    f"🧠 Processing session #{session_id}: `{topic}`\n"
                    f"• Chunks so far: `{total_chunks}`",
                    parse_mode=ParseMode.MARKDOWN
                )
            except Exception:
                pass

        await processor.process_session(session_id, topic, proc_progress)

        total_chunks = latest["chunks"]
        if total_chunks is None:
            total_chunks = await db.get_chunks_count(session_id)
        lines = ["🧠 *Processing complete!*", f"• Chunks stored: `{total_chunks}`"]
        if latest["words"] is not None:
            lines.append(f"• Words researched: `{latest['words']:,}`")
        completion_text = "\n".join(lines) + "\n\nReady! Use `/generate podcast|blog|report|thread`"

        try:
            await msg.edit_text(completion_text, parse_mode=ParseMode.MARKDOWN)
        except Exception:
            # Editing can fail (e.g. message deleted); send a fresh message instead.
            await update.message.reply_text(completion_text, parse_mode=ParseMode.MARKDOWN)

    except Exception as e:
        logger.error("Process command failed", error=str(e))
        await update.message.reply_text(f"❌ Processing failed: {str(e)[:200]}")
    finally:
        await db_conn.close()


async def cmd_cancel(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Cancel the chat's running research task, if any."""
    if not is_authorized(update.effective_user.id):
        return

    chat_id = update.effective_chat.id
    task = _active_tasks.get(chat_id)

    if task and not task.done():
        task.cancel()
        await update.message.reply_text("🛑 Research cancelled.")
    else:
        await update.message.reply_text("No active research to cancel.")


async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Alias for /start."""
    await cmd_start(update, ctx)


# ─── Bot setup ────────────────────────────────────────────────────────────────

# Strong references to fire-and-forget tasks: asyncio itself keeps only weak
# references, so an unreferenced task can be garbage-collected mid-run.
_background_tasks: set[asyncio.Task] = set()


async def _purge_on_startup(app: Application) -> None:
    """Purge sessions older than 30 days; failures are logged, never fatal."""
    db_conn = await get_db()
    try:
        db = ResearchDB(db_conn)
        result = await db.purge_old_sessions(30)
        if result["sessions"] > 0:
            logger.info("Startup purge done", **result)
    except Exception as e:
        logger.warning("Startup purge failed — bot continues", error=str(e))
    finally:
        await db_conn.close()


async def _run_watch(app: Application, chat_id: int, topic: str, session_id: int) -> None:
    """Run one scheduled research pass and post what changed since the previous one."""
    db_conn = await get_db()
    db = ResearchDB(db_conn)
    try:
        completed = await run_scheduled_research(
            app.bot, chat_id, topic, session_id, db, silent_completion=True
        )
        if not completed:
            # Cancelled or failed: the chat was already told, and a partial
            # session would produce a misleading "what's new" summary.
            return

        prev_session = await db.get_previous_session(chat_id, topic, session_id)
        new_urls = await db.get_session_urls(session_id)
        old_urls = await db.get_session_urls(prev_session["id"]) if prev_session else set()
        new_chunks = await db.get_top_chunks(session_id, limit=30)

        try:
            from src.generator.generator import generate_diff_summary
            summary = await generate_diff_summary(
                topic, new_urls, old_urls, new_chunks, session_id, db
            )
        except Exception as e:
            logger.warning("Diff summary failed", error=str(e))
            summary = (
                f"📊 *Actualización disponible — {topic}*\n\n"
                f"Usa /generate report para ver el análisis completo."
            )

        if summary:
            await app.bot.send_message(chat_id, summary, parse_mode=ParseMode.MARKDOWN)
        else:
            await app.bot.send_message(
                chat_id,
                f"🔄 *{topic}* — sin novedades significativas esta vez.",
                parse_mode=ParseMode.MARKDOWN
            )
    except Exception as e:
        # Nothing awaits this task, so log here or the error is lost.
        logger.warning("Scheduled research failed", error=str(e), topic=topic)
    finally:
        await db_conn.close()


async def _scheduler_loop(app: Application):
    """Every 60 s, start a research task for each due watch whose chat is idle."""
    while True:
        db_conn = None
        try:
            db_conn = await get_db()
            db = ResearchDB(db_conn)
            for watch in await db.get_due_watches():
                chat_id = watch["chat_id"]
                topic = watch["topic"]
                if chat_id in _active_tasks and not _active_tasks[chat_id].done():
                    # Chat busy: update_watch_run is skipped, so presumably the
                    # watch is picked up again on a later tick.
                    continue

                session_id = await db.create_session(topic, chat_id)
                _active_sessions[chat_id] = session_id
                await db.update_watch_run(watch["id"])

                _active_tasks[chat_id] = asyncio.create_task(
                    _run_watch(app, chat_id, topic, session_id)
                )
                await app.bot.send_message(
                    chat_id,
                    f"🔄 Investigación automática iniciada: `{topic}`",
                    parse_mode=ParseMode.MARKDOWN
                )
        except Exception as e:
            logger.warning("Scheduler loop error", error=str(e))
        finally:
            if db_conn is not None:
                try:
                    await db_conn.close()
                except Exception:
                    pass
        await asyncio.sleep(60)


async def _start_scheduler(app: Application) -> None:
    """Launch the watch scheduler as a background task."""
    task = asyncio.create_task(_scheduler_loop(app))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


async def _on_startup(app: Application) -> None:
    """post_init hook: purge old sessions, then start the scheduler."""
    await _purge_on_startup(app)
    await _start_scheduler(app)


async def cmd_export(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Export the best available output of the latest session as a PDF.

    Extended outputs are preferred over regular ones; see ``priority``.
    """
    if not is_authorized(update.effective_user.id):
        return

    chat_id = update.effective_chat.id
    db_conn = await get_db()
    db = ResearchDB(db_conn)

    try:
        session = await db.get_latest_session(chat_id)
        if not session:
            await update.message.reply_text("No hay sesiones de investigación.")
            return

        session_id = session["id"]
        topic = session["topic"]
        outputs = await db.get_outputs(session_id)

        if not outputs:
            await update.message.reply_text(
                "No hay outputs generados. Usa `/generate ` primero.",
                parse_mode=ParseMode.MARKDOWN
            )
            return

        priority = [
            "report_extended", "blog_extended", "podcast_extended",
            "report", "blog", "podcast", "thread",
        ]
        # First output of the highest-priority type; else the first output overall.
        chosen = next(
            (o for ptype in priority for o in outputs if o["output_type"] == ptype),
            outputs[0],
        )

        msg = await update.message.reply_text(
            f"📄 Generando PDF para `{topic}`…",
            parse_mode=ParseMode.MARKDOWN
        )

        try:
            from src.generator.generator import generate_pdf
            # generate_pdf is synchronous; run it in a worker thread so the
            # event loop keeps serving other chats while the PDF renders.
            pdf_bytes = await asyncio.to_thread(generate_pdf, chosen["content"], title=topic)
        except ImportError:
            await msg.edit_text("❌ reportlab no está instalado. Ejecuta: `pip install reportlab`")
            return
        except Exception as e:
            await msg.edit_text(f"❌ Error generando PDF: {str(e)[:200]}")
            return

        import io

        safe_topic = topic[:40].replace(" ", "_").replace("/", "-")
        filename = f"researchowl_{safe_topic}_{chosen['output_type']}.pdf"

        await update.message.reply_document(
            document=io.BytesIO(pdf_bytes),
            filename=filename,
            caption=f"📄 *{chosen['output_type'].upper()}* — {topic}\nExportado por ResearchOwl 🦉",
            parse_mode=ParseMode.MARKDOWN
        )
        try:
            await msg.delete()
        except Exception:
            pass

    except Exception as e:
        logger.error("Export failed", error=str(e))
        await update.message.reply_text(f"❌ Export failed: {str(e)[:200]}")
    finally:
        await db_conn.close()


async def cmd_purge(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Delete sessions older than N days (default 30); N=0 requires 'confirm'."""
    if not is_authorized(update.effective_user.id):
        return

    args = ctx.args or []
    if not args:
        days = 30
    else:
        try:
            days = int(args[0])
        except ValueError:
            await update.message.reply_text(
                "❌ Uso: `/purge [días]`",
                parse_mode=ParseMode.MARKDOWN
            )
            return

    if days < 0:
        await update.message.reply_text("❌ El número de días debe ser ≥ 0.")
        return

    if days == 0 and not (len(args) >= 2 and args[1] == "confirm"):
        await update.message.reply_text(
            "⚠️ Esto borrará *todas* las sesiones completadas.\n"
            "Envía `/purge 0 confirm` para confirmar.",
            parse_mode=ParseMode.MARKDOWN
        )
        return

    db_conn = await get_db()
    try:
        db = ResearchDB(db_conn)
        result = await db.purge_old_sessions(days)
        await update.message.reply_text(
            f"🗑️ Purged: {result['sessions']} sessions, "
            f"{result['sources']} sources, "
            f"{result['chunks']} chunks, "
            f"{result['outputs']} outputs"
        )
    except Exception as e:
        logger.error("Purge command failed", error=str(e))
        await update.message.reply_text(f"❌ Purge failed: {str(e)[:200]}")
    finally:
        await db_conn.close()


def create_bot() -> Application:
    """Build the Application and register every command handler."""
    app = (
        Application.builder()
        .token(settings.telegram_bot_token)
        .post_init(_on_startup)
        .build()
    )

    app.add_handler(CommandHandler("start", cmd_start))
    app.add_handler(CommandHandler("help", cmd_help))
    app.add_handler(CommandHandler("research", cmd_research))
    app.add_handler(CommandHandler("status", cmd_status))
    app.add_handler(CommandHandler("finish", cmd_finish))
    app.add_handler(CommandHandler("generate", cmd_generate))
    app.add_handler(CommandHandler("sources", cmd_sources))
    app.add_handler(CommandHandler("outputs", cmd_outputs))
    app.add_handler(CommandHandler("export", cmd_export))
    app.add_handler(CommandHandler("costs", cmd_costs))
    app.add_handler(CommandHandler("watch", cmd_watch))
    app.add_handler(CommandHandler("unwatch", cmd_unwatch))
    app.add_handler(CommandHandler("watches", cmd_watches))
    app.add_handler(CommandHandler("process", cmd_process))
    app.add_handler(CommandHandler("cancel", cmd_cancel))
    app.add_handler(CommandHandler("purge", cmd_purge))

    return app


def run():
    """Entry point: build the bot and poll for updates until stopped."""
    logger.info("Starting ResearchOwl bot")
    app = create_bot()
    app.run_polling(allowed_updates=Update.ALL_TYPES)