diff --git a/src/bot/bot.py b/src/bot/bot.py
index eb48759..7d31439 100644
--- a/src/bot/bot.py
+++ b/src/bot/bot.py
@@ -4,6 +4,7 @@ Main user interface — all commands handled here
 """
 import asyncio
 import os
+import time
 from datetime import datetime
 from typing import Optional
 
@@ -34,12 +35,17 @@ def is_authorized(user_id: int) -> bool:
 
 
 class ProgressReporter:
-    def __init__(self, reply_target: Message):
+    def __init__(self, reply_target: Optional[Message] = None, *, bot=None, chat_id: Optional[int] = None):
         self._reply_target = reply_target
+        self._bot = bot
+        self._chat_id = chat_id
         self._msg: Optional[Message] = None
 
     async def start(self, text: str):
-        self._msg = await self._reply_target.reply_text(text, parse_mode=ParseMode.MARKDOWN)
+        if self._reply_target is not None:
+            self._msg = await self._reply_target.reply_text(text, parse_mode=ParseMode.MARKDOWN)
+        elif self._bot is not None and self._chat_id is not None:
+            self._msg = await self._bot.send_message(self._chat_id, text, parse_mode=ParseMode.MARKDOWN)
 
     async def update(self, text: str):
         if not self._msg:
@@ -63,6 +69,73 @@ async def send_chunked(message: Message, text: str, parse_mode=None):
             await asyncio.sleep(0.5)
 
 
+# ─── Shared research logic ────────────────────────────────────────────────────
+
+async def run_scheduled_research(bot, chat_id: int, topic: str,
+                                 session_id: int, db: ResearchDB,
+                                 progress_message=None):
+    """Run one scrape-and-process pass for ``topic`` and report progress.
+
+    Shared by ``/research`` (pass ``progress_message`` so updates are sent
+    as a reply to the user's command) and by the watch scheduler (updates
+    are sent to ``chat_id`` through ``bot``). Cancellation and errors are
+    reported to the chat instead of being propagated to the caller.
+    """
+    if progress_message is not None:
+        reporter = ProgressReporter(progress_message)
+    else:
+        reporter = ProgressReporter(bot=bot, chat_id=chat_id)
+
+    try:
+        await reporter.start(f"🔍 Iniciando scraping de `{topic}`…")
+
+        async def on_progress(iter_num, total_sources):
+            await reporter.update(
+                f"🔍 Scraping — iteración `{iter_num}` | `{total_sources}` fuentes encontradas"
+            )
+
+        scraper = ExhaustiveScraper(db, session_id, topic, on_progress)
+        final_stats = await scraper.run()
+
+        await db.update_session(session_id, status=ResearchStatus.SATURATED)
+        scraped = final_stats.get("scraped", 0)
+
+        await reporter.update(f"⚡ Procesando `{scraped}` fuentes…")
+
+        ollama = OllamaClient()
+        if await ollama.is_available():
+            processor = ContentProcessor(db, ollama)
+
+            async def proc_progress(total_chunks, total_words):
+                await reporter.update(
+                    f"⚡ Scoring chunks… (`{total_chunks}` procesados)"
+                )
+
+            await processor.process_session(session_id, topic, proc_progress)
+            chunk_count = await db.get_chunks_count(session_id)
+            await reporter.done(
+                f"✅ Listo — `{scraped}` fuentes · `{chunk_count}` chunks · usa /generate "
+            )
+        else:
+            await reporter.done(
+                f"⚠️ Ollama no disponible — `{scraped}` fuentes scraped.\n"
+                f"Usa /generate para generar contenido."
+            )
+
+    except asyncio.CancelledError:
+        await db.update_session(session_id, status=ResearchStatus.FINISHED)
+        try:
+            await reporter.done("🛑 Investigación cancelada.")
+        except Exception:
+            pass
+    except Exception as e:
+        logger.error("Research task failed", error=str(e))
+        try:
+            await reporter.done(f"❌ Error: {str(e)[:200]}")
+        except Exception:
+            pass
+
+
 # ─── Commands ─────────────────────────────────────────────────────────────────
 
 async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
@@ -79,6 +152,9 @@ async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
         "`/sources` — List all sources found\n"
         "`/outputs` — List generated outputs\n"
         "`/costs` — Show API usage costs\n"
+        "`/watch <topic> [h]` — Schedule periodic research\n"
+        "`/unwatch <topic>` — Remove a watch\n"
+        "`/watches` — List your watched topics\n"
         "`/cancel` — Cancel current research\n"
         "`/help` — Show this message",
         parse_mode=ParseMode.MARKDOWN
@@ -108,62 +184,13 @@ async def cmd_research(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
     async def run_research():
         db_conn = await get_db()
         db = ResearchDB(db_conn)
-        reporter = None
         try:
             session_id = await db.create_session(topic, chat_id)
             _active_sessions[chat_id] = session_id
-
-            reporter = ProgressReporter(update.message)
-            await reporter.start(f"🔍 Iniciando scraping de `{topic}`…")
-
-            async def on_progress(iter_num, total_sources):
-                await reporter.update(
-                    f"🔍 Scraping — iteración `{iter_num}` | `{total_sources}` fuentes encontradas"
-                )
-
-            scraper = ExhaustiveScraper(db, session_id, topic, on_progress)
-            final_stats = await scraper.run()
-
-            await db.update_session(session_id, status=ResearchStatus.SATURATED)
-            scraped = final_stats.get("scraped", 0)
-
-            await reporter.update(f"⚡ Procesando `{scraped}` fuentes…")
-
-            ollama = OllamaClient()
-            if await ollama.is_available():
-                processor = ContentProcessor(db, ollama)
-
-                async def proc_progress(total_chunks, total_words):
-                    await reporter.update(
-                        f"⚡ Scoring chunks… (`{total_chunks}` procesados)"
-                    )
-
-                await processor.process_session(session_id, topic, proc_progress)
-                chunk_count = await db.get_chunks_count(session_id)
-                await reporter.done(
-                    f"✅ Listo — `{scraped}` fuentes · `{chunk_count}` chunks · usa /generate "
-                )
-            else:
-                await reporter.done(
-                    f"⚠️ Ollama no disponible — `{scraped}` fuentes scraped.\n"
-                    f"Usa /generate para generar contenido."
-                )
-
-        except asyncio.CancelledError:
-            await db.update_session(
-                _active_sessions.get(chat_id, 0),
-                status=ResearchStatus.FINISHED
+            await run_scheduled_research(
+                ctx.bot, chat_id, topic, session_id, db,
+                progress_message=update.message
             )
-            if reporter:
-                await reporter.done("🛑 Investigación cancelada.")
-            else:
-                await update.message.reply_text("🛑 Research cancelled.")
-        except Exception as e:
-            logger.error("Research task failed", error=str(e))
-            if reporter:
-                await reporter.done(f"❌ Error: {str(e)[:200]}")
-            else:
-                await update.message.reply_text(f"❌ Research failed: {str(e)[:200]}")
         finally:
             await db_conn.close()
 
@@ -471,6 +498,125 @@ async def cmd_costs(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
         await db_conn.close()
 
 
+async def cmd_watch(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
+    """Handle /watch: schedule periodic research for a topic in this chat."""
+    if not is_authorized(update.effective_user.id):
+        return
+
+    chat_id = update.effective_chat.id
+    args = ctx.args or []
+
+    if not args:
+        await update.message.reply_text(
+            "❌ Uso: `/watch <tema> [horas]`\nEjemplo: `/watch Incidente Roswell 24`",
+            parse_mode=ParseMode.MARKDOWN
+        )
+        return
+
+    interval_hours = 24
+    # isdecimal(), not isdigit(): isdigit() also accepts characters such as
+    # "²" that int() cannot parse.
+    if args[-1].isdecimal():
+        interval_hours = int(args[-1])
+        topic = " ".join(args[:-1]).strip()
+    else:
+        topic = " ".join(args).strip()
+
+    if not topic:
+        await update.message.reply_text("❌ Debes especificar un tema.")
+        return
+
+    if not (1 <= interval_hours <= 168):
+        await update.message.reply_text(
+            "❌ El intervalo debe estar entre 1 y 168 horas (1 semana)."
+        )
+        return
+
+    db_conn = await get_db()
+    db = ResearchDB(db_conn)
+    try:
+        try:
+            await db.add_watch(topic, chat_id, interval_hours)
+            await update.message.reply_text(
+                f"👁 Watching: `{topic}` — cada {interval_hours}h\n"
+                f"Primera ejecución en ~{interval_hours}h.\n"
+                f"Usa /watches para ver todos tus temas.",
+                parse_mode=ParseMode.MARKDOWN
+            )
+        except Exception as e:
+            if "UNIQUE" in str(e):
+                await update.message.reply_text(
+                    f"Ya estás watching `{topic}`", parse_mode=ParseMode.MARKDOWN
+                )
+            else:
+                raise
+    finally:
+        await db_conn.close()
+
+
+async def cmd_unwatch(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
+    """Handle /unwatch: remove a watched topic from this chat."""
+    if not is_authorized(update.effective_user.id):
+        return
+
+    chat_id = update.effective_chat.id
+    topic = " ".join(ctx.args).strip() if ctx.args else ""
+
+    if not topic:
+        await update.message.reply_text(
+            "❌ Uso: `/unwatch <tema>`", parse_mode=ParseMode.MARKDOWN
+        )
+        return
+
+    db_conn = await get_db()
+    db = ResearchDB(db_conn)
+    try:
+        removed = await db.remove_watch(topic, chat_id)
+        if removed:
+            await update.message.reply_text(
+                f"✅ Ya no vigilas `{topic}`.", parse_mode=ParseMode.MARKDOWN
+            )
+        else:
+            await update.message.reply_text(
+                f"No estabas watching `{topic}`.", parse_mode=ParseMode.MARKDOWN
+            )
+    finally:
+        await db_conn.close()
+
+
+async def cmd_watches(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
+    """Handle /watches: list this chat's watched topics and next run times."""
+    if not is_authorized(update.effective_user.id):
+        return
+
+    chat_id = update.effective_chat.id
+    db_conn = await get_db()
+    db = ResearchDB(db_conn)
+    try:
+        watches = await db.list_watches(chat_id)
+        if not watches:
+            await update.message.reply_text(
+                "No tienes temas vigilados. Usa `/watch <tema>`",
+                parse_mode=ParseMode.MARKDOWN
+            )
+            return
+
+        now = time.time()
+        lines = ["👁 *Tus temas vigilados:*\n"]
+        for i, w in enumerate(watches, 1):
+            secs_remaining = max(0.0, w["next_run_at"] - now)
+            hours_remaining = secs_remaining / 3600
+            eta = f"{int(secs_remaining / 60)}min" if hours_remaining < 1 else f"{hours_remaining:.1f}h"
+            status = "✅" if w["enabled"] else "⏸"
+            lines.append(
+                f"{i}. {status} `{w['topic']}` — cada {w['interval_hours']}h · próxima en {eta}"
+            )
+
+        await update.message.reply_text("\n".join(lines), parse_mode=ParseMode.MARKDOWN)
+    finally:
+        await db_conn.close()
+
+
 async def cmd_process(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
     if not is_authorized(update.effective_user.id):
         return
@@ -568,6 +714,71 @@ async def _purge_on_startup(app: Application) -> None:
         await db_conn.close()
 
 
+async def _scheduler_loop(app: Application):
+    """Poll once a minute for due watches and launch a research task for each."""
+    while True:
+        db_conn = None
+        try:
+            db_conn = await get_db()
+            db = ResearchDB(db_conn)
+            due = await db.get_due_watches()
+            for watch in due:
+                chat_id = watch["chat_id"]
+                topic = watch["topic"]
+                if chat_id in _active_tasks and not _active_tasks[chat_id].done():
+                    continue
+                session_id = await db.create_session(topic, chat_id)
+                _active_sessions[chat_id] = session_id
+                await db.update_watch_run(watch["id"])
+
+                async def _task(c=chat_id, t=topic, s=session_id):
+                    inner_db_conn = await get_db()
+                    inner_db = ResearchDB(inner_db_conn)
+                    try:
+                        await run_scheduled_research(app.bot, c, t, s, inner_db)
+                    finally:
+                        await inner_db_conn.close()
+
+                task = asyncio.create_task(_task())
+                _active_tasks[chat_id] = task
+                try:
+                    await app.bot.send_message(
+                        chat_id,
+                        f"🔄 Investigación automática iniciada: `{topic}`",
+                        parse_mode=ParseMode.MARKDOWN
+                    )
+                except Exception as notify_err:
+                    # The notice is best-effort: a chat that cannot be reached
+                    # must not keep the remaining due watches from starting.
+                    logger.warning("Scheduled research notice failed", error=str(notify_err))
+        except Exception as e:
+            logger.warning("Scheduler loop error", error=str(e))
+        finally:
+            if db_conn:
+                try:
+                    await db_conn.close()
+                except Exception:
+                    pass
+        await asyncio.sleep(60)
+
+
+# Strong reference to the scheduler task: the event loop only keeps weak
+# references to tasks, so an unreferenced one may be garbage-collected mid-run.
+_scheduler_task: Optional[asyncio.Task] = None
+
+
+async def _start_scheduler(app: Application) -> None:
+    """Start the watch scheduler as a background task."""
+    global _scheduler_task
+    _scheduler_task = asyncio.create_task(_scheduler_loop(app))
+
+
+async def _on_startup(app: Application) -> None:
+    """Run the startup purge, then start the watch scheduler."""
+    await _purge_on_startup(app)
+    await _start_scheduler(app)
+
+
 async def cmd_purge(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
     if not is_authorized(update.effective_user.id):
         return
@@ -616,7 +827,7 @@ def create_bot() -> Application:
     app = (
         Application.builder()
         .token(settings.telegram_bot_token)
-        .post_init(_purge_on_startup)
+        .post_init(_on_startup)
         .build()
     )
 
@@ -629,6 +840,9 @@ def create_bot() -> Application:
     app.add_handler(CommandHandler("sources", cmd_sources))
     app.add_handler(CommandHandler("outputs", cmd_outputs))
     app.add_handler(CommandHandler("costs", cmd_costs))
+    app.add_handler(CommandHandler("watch", cmd_watch))
+    app.add_handler(CommandHandler("unwatch", cmd_unwatch))
+    app.add_handler(CommandHandler("watches", cmd_watches))
     app.add_handler(CommandHandler("process", cmd_process))
     app.add_handler(CommandHandler("cancel", cmd_cancel))
     app.add_handler(CommandHandler("purge", cmd_purge))
diff --git a/src/db/database.py b/src/db/database.py
index cebdf6a..8716337 100644
--- a/src/db/database.py
+++ b/src/db/database.py
@@ -99,6 +99,18 @@ CREATE TABLE IF NOT EXISTS api_usage (
     cost_usd REAL NOT NULL,
     created_at REAL NOT NULL
 );
+
+CREATE TABLE IF NOT EXISTS watched_topics (
+    id INTEGER PRIMARY KEY AUTOINCREMENT,
+    topic TEXT NOT NULL,
+    chat_id INTEGER NOT NULL,
+    interval_hours INTEGER NOT NULL DEFAULT 24,
+    next_run_at REAL NOT NULL,
+    last_run_at REAL,
+    enabled INTEGER NOT NULL DEFAULT 1,
+    created_at REAL NOT NULL,
+    UNIQUE(topic, chat_id)
+);
 """
 
 
@@ -319,6 +331,68 @@ class ResearchDB:
         row = await cursor.fetchone()
         return dict(row) if row else {"sessions": 0, "total_cost": 0}
 
+    # --- Watched Topics ---
+
+    async def add_watch(self, topic: str, chat_id: int, interval_hours: int) -> int:
+        """Insert a watch for ``topic`` in ``chat_id`` and return its row id.
+
+        The first run is scheduled ``interval_hours`` from now. A plain INSERT
+        is used so that a duplicate (topic, chat_id) pair violates the UNIQUE
+        constraint and raises, instead of silently replacing the existing row
+        and resetting its schedule.
+        """
+        now = time.time()
+        cursor = await self.db.execute(
+            """INSERT INTO watched_topics
+               (topic, chat_id, interval_hours, next_run_at, created_at)
+               VALUES (?, ?, ?, ?, ?)""",
+            (topic, chat_id, interval_hours, now + interval_hours * 3600, now)
+        )
+        await self.db.commit()
+        return cursor.lastrowid
+
+    async def remove_watch(self, topic: str, chat_id: int) -> bool:
+        """Delete the watch for ``topic`` in ``chat_id``; return True if a row was removed."""
+        cursor = await self.db.execute(
+            "DELETE FROM watched_topics WHERE topic = ? AND chat_id = ?",
+            (topic, chat_id)
+        )
+        await self.db.commit()
+        return cursor.rowcount > 0
+
+    async def list_watches(self, chat_id: int) -> list[dict]:
+        """Return every watch of ``chat_id``, oldest first."""
+        cursor = await self.db.execute(
+            "SELECT * FROM watched_topics WHERE chat_id = ? ORDER BY created_at ASC",
+            (chat_id,)
+        )
+        rows = await cursor.fetchall()
+        return [dict(r) for r in rows]
+
+    async def get_due_watches(self) -> list[dict]:
+        """Return enabled watches whose ``next_run_at`` is now or in the past."""
+        cursor = await self.db.execute(
+            "SELECT * FROM watched_topics WHERE enabled = 1 AND next_run_at <= ?",
+            (time.time(),)
+        )
+        rows = await cursor.fetchall()
+        return [dict(r) for r in rows]
+
+    async def update_watch_run(self, watch_id: int):
+        """Record a run now and schedule the next one ``interval_hours`` later."""
+        cursor = await self.db.execute(
+            "SELECT interval_hours FROM watched_topics WHERE id = ?", (watch_id,)
+        )
+        row = await cursor.fetchone()
+        if not row:
+            return
+        now = time.time()
+        await self.db.execute(
+            "UPDATE watched_topics SET last_run_at = ?, next_run_at = ? WHERE id = ?",
+            (now, now + row[0] * 3600, watch_id)
+        )
+        await self.db.commit()
+
     # --- Maintenance ---
 
     async def purge_old_sessions(self, max_age_days: int = 30) -> dict: