feat: scheduler /watch — watched_topics + scheduler loop + /watch /unwatch /watches
Build & Deploy ResearchOwl / build-and-push (push) Successful in 5s

This commit is contained in:
ChemaVX
2026-05-04 07:48:05 +00:00
parent b33ae202b8
commit b5518ac95a
2 changed files with 304 additions and 55 deletions
+241 -55
View File
@@ -4,6 +4,7 @@ Main user interface — all commands handled here
"""
import asyncio
import os
import time
from datetime import datetime
from typing import Optional
@@ -34,12 +35,17 @@ def is_authorized(user_id: int) -> bool:
class ProgressReporter:
def __init__(self, reply_target: Message):
def __init__(self, reply_target: Message = None, *, bot=None, chat_id: int = None):
self._reply_target = reply_target
self._bot = bot
self._chat_id = chat_id
self._msg: Optional[Message] = None
async def start(self, text: str):
self._msg = await self._reply_target.reply_text(text, parse_mode=ParseMode.MARKDOWN)
if self._reply_target is not None:
self._msg = await self._reply_target.reply_text(text, parse_mode=ParseMode.MARKDOWN)
elif self._bot is not None and self._chat_id is not None:
self._msg = await self._bot.send_message(self._chat_id, text, parse_mode=ParseMode.MARKDOWN)
async def update(self, text: str):
if not self._msg:
@@ -63,6 +69,66 @@ async def send_chunked(message: Message, text: str, parse_mode=None):
await asyncio.sleep(0.5)
# ─── Shared research logic ────────────────────────────────────────────────────
async def run_scheduled_research(bot, chat_id: int, topic: str,
session_id: int, db: ResearchDB,
progress_message=None):
if progress_message is not None:
reporter = ProgressReporter(progress_message)
else:
reporter = ProgressReporter(bot=bot, chat_id=chat_id)
try:
await reporter.start(f"🔍 Iniciando scraping de `{topic}`…")
async def on_progress(iter_num, total_sources):
await reporter.update(
f"🔍 Scraping — iteración `{iter_num}` | `{total_sources}` fuentes encontradas"
)
scraper = ExhaustiveScraper(db, session_id, topic, on_progress)
final_stats = await scraper.run()
await db.update_session(session_id, status=ResearchStatus.SATURATED)
scraped = final_stats.get("scraped", 0)
await reporter.update(f"⚡ Procesando `{scraped}` fuentes…")
ollama = OllamaClient()
if await ollama.is_available():
processor = ContentProcessor(db, ollama)
async def proc_progress(total_chunks, total_words):
await reporter.update(
f"⚡ Scoring chunks… (`{total_chunks}` procesados)"
)
await processor.process_session(session_id, topic, proc_progress)
chunk_count = await db.get_chunks_count(session_id)
await reporter.done(
f"✅ Listo — `{scraped}` fuentes · `{chunk_count}` chunks · usa /generate <tipo>"
)
else:
await reporter.done(
f"⚠️ Ollama no disponible — `{scraped}` fuentes scraped.\n"
f"Usa /generate para generar contenido."
)
except asyncio.CancelledError:
await db.update_session(session_id, status=ResearchStatus.FINISHED)
try:
await reporter.done("🛑 Investigación cancelada.")
except Exception:
pass
except Exception as e:
logger.error("Research task failed", error=str(e))
try:
await reporter.done(f"❌ Error: {str(e)[:200]}")
except Exception:
pass
# ─── Commands ─────────────────────────────────────────────────────────────────
async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
@@ -79,6 +145,9 @@ async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
"`/sources` — List all sources found\n"
"`/outputs` — List generated outputs\n"
"`/costs` — Show API usage costs\n"
"`/watch <topic> [h]` — Schedule periodic research\n"
"`/unwatch <topic>` — Remove a watch\n"
"`/watches` — List your watched topics\n"
"`/cancel` — Cancel current research\n"
"`/help` — Show this message",
parse_mode=ParseMode.MARKDOWN
@@ -108,62 +177,13 @@ async def cmd_research(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
async def run_research():
db_conn = await get_db()
db = ResearchDB(db_conn)
reporter = None
try:
session_id = await db.create_session(topic, chat_id)
_active_sessions[chat_id] = session_id
reporter = ProgressReporter(update.message)
await reporter.start(f"🔍 Iniciando scraping de `{topic}`…")
async def on_progress(iter_num, total_sources):
await reporter.update(
f"🔍 Scraping — iteración `{iter_num}` | `{total_sources}` fuentes encontradas"
)
scraper = ExhaustiveScraper(db, session_id, topic, on_progress)
final_stats = await scraper.run()
await db.update_session(session_id, status=ResearchStatus.SATURATED)
scraped = final_stats.get("scraped", 0)
await reporter.update(f"⚡ Procesando `{scraped}` fuentes…")
ollama = OllamaClient()
if await ollama.is_available():
processor = ContentProcessor(db, ollama)
async def proc_progress(total_chunks, total_words):
await reporter.update(
f"⚡ Scoring chunks… (`{total_chunks}` procesados)"
)
await processor.process_session(session_id, topic, proc_progress)
chunk_count = await db.get_chunks_count(session_id)
await reporter.done(
f"✅ Listo — `{scraped}` fuentes · `{chunk_count}` chunks · usa /generate <tipo>"
)
else:
await reporter.done(
f"⚠️ Ollama no disponible — `{scraped}` fuentes scraped.\n"
f"Usa /generate para generar contenido."
)
except asyncio.CancelledError:
await db.update_session(
_active_sessions.get(chat_id, 0),
status=ResearchStatus.FINISHED
await run_scheduled_research(
ctx.bot, chat_id, topic, session_id, db,
progress_message=update.message
)
if reporter:
await reporter.done("🛑 Investigación cancelada.")
else:
await update.message.reply_text("🛑 Research cancelled.")
except Exception as e:
logger.error("Research task failed", error=str(e))
if reporter:
await reporter.done(f"❌ Error: {str(e)[:200]}")
else:
await update.message.reply_text(f"❌ Research failed: {str(e)[:200]}")
finally:
await db_conn.close()
@@ -471,6 +491,118 @@ async def cmd_costs(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
await db_conn.close()
async def cmd_watch(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not is_authorized(update.effective_user.id):
return
chat_id = update.effective_chat.id
args = ctx.args or []
if not args:
await update.message.reply_text(
"❌ Uso: `/watch <tema> [horas]`\nEjemplo: `/watch Incidente Roswell 24`",
parse_mode=ParseMode.MARKDOWN
)
return
interval_hours = 24
if args[-1].isdigit():
interval_hours = int(args[-1])
topic = " ".join(args[:-1]).strip()
else:
topic = " ".join(args).strip()
if not topic:
await update.message.reply_text("❌ Debes especificar un tema.")
return
if not (1 <= interval_hours <= 168):
await update.message.reply_text(
"❌ El intervalo debe estar entre 1 y 168 horas (1 semana)."
)
return
db_conn = await get_db()
db = ResearchDB(db_conn)
try:
try:
await db.add_watch(topic, chat_id, interval_hours)
await update.message.reply_text(
f"👁 Watching: `{topic}` — cada {interval_hours}h\n"
f"Primera ejecución en ~{interval_hours}h.\n"
f"Usa /watches para ver todos tus temas.",
parse_mode=ParseMode.MARKDOWN
)
except Exception as e:
if "UNIQUE" in str(e):
await update.message.reply_text(
f"Ya estás watching `{topic}`", parse_mode=ParseMode.MARKDOWN
)
else:
raise
finally:
await db_conn.close()
async def cmd_unwatch(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not is_authorized(update.effective_user.id):
return
chat_id = update.effective_chat.id
topic = " ".join(ctx.args).strip() if ctx.args else ""
if not topic:
await update.message.reply_text(
"❌ Uso: `/unwatch <tema>`", parse_mode=ParseMode.MARKDOWN
)
return
db_conn = await get_db()
db = ResearchDB(db_conn)
try:
removed = await db.remove_watch(topic, chat_id)
if removed:
await update.message.reply_text(
f"✅ Ya no vigilas `{topic}`.", parse_mode=ParseMode.MARKDOWN
)
else:
await update.message.reply_text(f"No estabas watching `{topic}`.")
finally:
await db_conn.close()
async def cmd_watches(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not is_authorized(update.effective_user.id):
return
chat_id = update.effective_chat.id
db_conn = await get_db()
db = ResearchDB(db_conn)
try:
watches = await db.list_watches(chat_id)
if not watches:
await update.message.reply_text(
"No tienes temas vigilados. Usa `/watch <tema>`",
parse_mode=ParseMode.MARKDOWN
)
return
now = time.time()
lines = ["👁 *Tus temas vigilados:*\n"]
for i, w in enumerate(watches, 1):
secs_remaining = max(0.0, w["next_run_at"] - now)
hours_remaining = secs_remaining / 3600
eta = f"{int(secs_remaining / 60)}min" if hours_remaining < 1 else f"{hours_remaining:.1f}h"
status = "" if w["enabled"] else ""
lines.append(
f"{i}. {status} `{w['topic']}` — cada {w['interval_hours']}h · próxima en {eta}"
)
await update.message.reply_text("\n".join(lines), parse_mode=ParseMode.MARKDOWN)
finally:
await db_conn.close()
async def cmd_process(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not is_authorized(update.effective_user.id):
return
@@ -568,6 +700,57 @@ async def _purge_on_startup(app: Application) -> None:
await db_conn.close()
async def _scheduler_loop(app: Application):
while True:
db_conn = None
try:
db_conn = await get_db()
db = ResearchDB(db_conn)
due = await db.get_due_watches()
for watch in due:
chat_id = watch["chat_id"]
topic = watch["topic"]
if chat_id in _active_tasks and not _active_tasks[chat_id].done():
continue
session_id = await db.create_session(topic, chat_id)
_active_sessions[chat_id] = session_id
await db.update_watch_run(watch["id"])
async def _task(c=chat_id, t=topic, s=session_id):
inner_db_conn = await get_db()
inner_db = ResearchDB(inner_db_conn)
try:
await run_scheduled_research(app.bot, c, t, s, inner_db)
finally:
await inner_db_conn.close()
task = asyncio.create_task(_task())
_active_tasks[chat_id] = task
await app.bot.send_message(
chat_id,
f"🔄 Investigación automática iniciada: `{topic}`",
parse_mode=ParseMode.MARKDOWN
)
except Exception as e:
logger.warning("Scheduler loop error", error=str(e))
finally:
if db_conn:
try:
await db_conn.close()
except Exception:
pass
await asyncio.sleep(60)
async def _start_scheduler(app: Application) -> None:
asyncio.create_task(_scheduler_loop(app))
async def _on_startup(app: Application) -> None:
await _purge_on_startup(app)
await _start_scheduler(app)
async def cmd_purge(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not is_authorized(update.effective_user.id):
return
@@ -616,7 +799,7 @@ def create_bot() -> Application:
app = (
Application.builder()
.token(settings.telegram_bot_token)
.post_init(_purge_on_startup)
.post_init(_on_startup)
.build()
)
@@ -629,6 +812,9 @@ def create_bot() -> Application:
app.add_handler(CommandHandler("sources", cmd_sources))
app.add_handler(CommandHandler("outputs", cmd_outputs))
app.add_handler(CommandHandler("costs", cmd_costs))
app.add_handler(CommandHandler("watch", cmd_watch))
app.add_handler(CommandHandler("unwatch", cmd_unwatch))
app.add_handler(CommandHandler("watches", cmd_watches))
app.add_handler(CommandHandler("process", cmd_process))
app.add_handler(CommandHandler("cancel", cmd_cancel))
app.add_handler(CommandHandler("purge", cmd_purge))
+63
View File
@@ -99,6 +99,18 @@ CREATE TABLE IF NOT EXISTS api_usage (
cost_usd REAL NOT NULL,
created_at REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS watched_topics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
topic TEXT NOT NULL,
chat_id INTEGER NOT NULL,
interval_hours INTEGER NOT NULL DEFAULT 24,
next_run_at REAL NOT NULL,
last_run_at REAL,
enabled INTEGER NOT NULL DEFAULT 1,
created_at REAL NOT NULL,
UNIQUE(topic, chat_id)
);
"""
@@ -319,6 +331,57 @@ class ResearchDB:
row = await cursor.fetchone()
return dict(row) if row else {"sessions": 0, "total_cost": 0}
# --- Watched Topics ---
async def add_watch(self, topic: str, chat_id: int, interval_hours: int) -> int:
now = time.time()
cursor = await self.db.execute(
"""INSERT OR REPLACE INTO watched_topics
(topic, chat_id, interval_hours, next_run_at, created_at)
VALUES (?, ?, ?, ?, ?)""",
(topic, chat_id, interval_hours, now + interval_hours * 3600, now)
)
await self.db.commit()
return cursor.lastrowid
async def remove_watch(self, topic: str, chat_id: int) -> bool:
cursor = await self.db.execute(
"DELETE FROM watched_topics WHERE topic = ? AND chat_id = ?",
(topic, chat_id)
)
await self.db.commit()
return cursor.rowcount > 0
async def list_watches(self, chat_id: int) -> list[dict]:
cursor = await self.db.execute(
"SELECT * FROM watched_topics WHERE chat_id = ? ORDER BY created_at ASC",
(chat_id,)
)
rows = await cursor.fetchall()
return [dict(r) for r in rows]
async def get_due_watches(self) -> list[dict]:
cursor = await self.db.execute(
"SELECT * FROM watched_topics WHERE enabled = 1 AND next_run_at <= ?",
(time.time(),)
)
rows = await cursor.fetchall()
return [dict(r) for r in rows]
async def update_watch_run(self, watch_id: int):
cursor = await self.db.execute(
"SELECT interval_hours FROM watched_topics WHERE id = ?", (watch_id,)
)
row = await cursor.fetchone()
if not row:
return
now = time.time()
await self.db.execute(
"UPDATE watched_topics SET last_run_at = ?, next_run_at = ? WHERE id = ?",
(now, now + row[0] * 3600, watch_id)
)
await self.db.commit()
# --- Maintenance ---
async def purge_old_sessions(self, max_age_days: int = 30) -> dict: