feat: TTL purge — purge_old_sessions + /purge command + startup hook
Build & Deploy ResearchOwl / build-and-push (push) Successful in 5s
Build & Deploy ResearchOwl / build-and-push (push) Successful in 5s
This commit is contained in:
+64
-1
@@ -503,8 +503,70 @@ async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
|
||||
# ─── Bot setup ────────────────────────────────────────────────────────────────
|
||||
|
||||
async def _purge_on_startup(app: Application) -> None:
|
||||
db_conn = await get_db()
|
||||
try:
|
||||
db = ResearchDB(db_conn)
|
||||
result = await db.purge_old_sessions(30)
|
||||
if result["sessions"] > 0:
|
||||
logger.info("Startup purge done", **result)
|
||||
except Exception as e:
|
||||
logger.warning("Startup purge failed — bot continues", error=str(e))
|
||||
finally:
|
||||
await db_conn.close()
|
||||
|
||||
|
||||
async def cmd_purge(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
if not is_authorized(update.effective_user.id):
|
||||
return
|
||||
|
||||
args = ctx.args or []
|
||||
|
||||
if not args:
|
||||
days = 30
|
||||
else:
|
||||
try:
|
||||
days = int(args[0])
|
||||
except ValueError:
|
||||
await update.message.reply_text(
|
||||
"❌ Uso: `/purge [días]`", parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
return
|
||||
if days < 0:
|
||||
await update.message.reply_text("❌ El número de días debe ser ≥ 0.")
|
||||
return
|
||||
if days == 0 and not (len(args) >= 2 and args[1] == "confirm"):
|
||||
await update.message.reply_text(
|
||||
"⚠️ Esto borrará *todas* las sesiones completadas.\n"
|
||||
"Envía `/purge 0 confirm` para confirmar.",
|
||||
parse_mode=ParseMode.MARKDOWN
|
||||
)
|
||||
return
|
||||
|
||||
db_conn = await get_db()
|
||||
try:
|
||||
db = ResearchDB(db_conn)
|
||||
result = await db.purge_old_sessions(days)
|
||||
await update.message.reply_text(
|
||||
f"🗑️ Purged: {result['sessions']} sessions, "
|
||||
f"{result['sources']} sources, "
|
||||
f"{result['chunks']} chunks, "
|
||||
f"{result['outputs']} outputs"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Purge command failed", error=str(e))
|
||||
await update.message.reply_text(f"❌ Purge failed: {str(e)[:200]}")
|
||||
finally:
|
||||
await db_conn.close()
|
||||
|
||||
|
||||
def create_bot() -> Application:
|
||||
app = Application.builder().token(settings.telegram_bot_token).build()
|
||||
app = (
|
||||
Application.builder()
|
||||
.token(settings.telegram_bot_token)
|
||||
.post_init(_purge_on_startup)
|
||||
.build()
|
||||
)
|
||||
|
||||
app.add_handler(CommandHandler("start", cmd_start))
|
||||
app.add_handler(CommandHandler("help", cmd_help))
|
||||
@@ -516,6 +578,7 @@ def create_bot() -> Application:
|
||||
app.add_handler(CommandHandler("outputs", cmd_outputs))
|
||||
app.add_handler(CommandHandler("process", cmd_process))
|
||||
app.add_handler(CommandHandler("cancel", cmd_cancel))
|
||||
app.add_handler(CommandHandler("purge", cmd_purge))
|
||||
|
||||
return app
|
||||
|
||||
|
||||
@@ -5,8 +5,12 @@ from pathlib import Path
|
||||
from typing import Optional
|
||||
from enum import Enum
|
||||
|
||||
import structlog
|
||||
|
||||
from src.config import settings
|
||||
|
||||
logger = structlog.get_logger()
|
||||
|
||||
|
||||
class ResearchStatus(str, Enum):
|
||||
RUNNING = "running"
|
||||
@@ -266,3 +270,36 @@ class ResearchDB:
|
||||
)
|
||||
rows = await cursor.fetchall()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
# --- Maintenance ---
|
||||
|
||||
async def purge_old_sessions(self, max_age_days: int = 30) -> dict:
|
||||
await self.db.execute("PRAGMA foreign_keys = ON")
|
||||
|
||||
threshold = time.time() - max_age_days * 86400
|
||||
cursor = await self.db.execute(
|
||||
"SELECT id FROM research_sessions WHERE created_at < ? AND status != 'running'",
|
||||
(threshold,)
|
||||
)
|
||||
session_ids = [row[0] for row in await cursor.fetchall()]
|
||||
|
||||
counts = {"sessions": 0, "sources": 0, "chunks": 0, "outputs": 0}
|
||||
|
||||
for sid in session_ids:
|
||||
await self.db.execute(
|
||||
"DELETE FROM source_contents WHERE source_id IN (SELECT id FROM sources WHERE session_id = ?)",
|
||||
(sid,)
|
||||
)
|
||||
cur = await self.db.execute("DELETE FROM chunks WHERE session_id = ?", (sid,))
|
||||
counts["chunks"] += cur.rowcount
|
||||
cur = await self.db.execute("DELETE FROM outputs WHERE session_id = ?", (sid,))
|
||||
counts["outputs"] += cur.rowcount
|
||||
cur = await self.db.execute("DELETE FROM sources WHERE session_id = ?", (sid,))
|
||||
counts["sources"] += cur.rowcount
|
||||
cur = await self.db.execute("DELETE FROM research_sessions WHERE id = ?", (sid,))
|
||||
counts["sessions"] += cur.rowcount
|
||||
|
||||
await self.db.commit()
|
||||
logger.info("Purged sessions older than days",
|
||||
sessions=counts["sessions"], days=max_age_days)
|
||||
return counts
|
||||
|
||||
Reference in New Issue
Block a user