feat: TTL purge — purge_old_sessions + /purge command + startup hook
Build & Deploy ResearchOwl / build-and-push (push) Successful in 5s
Build & Deploy ResearchOwl / build-and-push (push) Successful in 5s
This commit is contained in:
+64
-1
@@ -503,8 +503,70 @@ async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|||||||
|
|
||||||
# ─── Bot setup ────────────────────────────────────────────────────────────────
|
# ─── Bot setup ────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
async def _purge_on_startup(app: Application) -> None:
|
||||||
|
db_conn = await get_db()
|
||||||
|
try:
|
||||||
|
db = ResearchDB(db_conn)
|
||||||
|
result = await db.purge_old_sessions(30)
|
||||||
|
if result["sessions"] > 0:
|
||||||
|
logger.info("Startup purge done", **result)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Startup purge failed — bot continues", error=str(e))
|
||||||
|
finally:
|
||||||
|
await db_conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
async def cmd_purge(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||||
|
if not is_authorized(update.effective_user.id):
|
||||||
|
return
|
||||||
|
|
||||||
|
args = ctx.args or []
|
||||||
|
|
||||||
|
if not args:
|
||||||
|
days = 30
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
days = int(args[0])
|
||||||
|
except ValueError:
|
||||||
|
await update.message.reply_text(
|
||||||
|
"❌ Uso: `/purge [días]`", parse_mode=ParseMode.MARKDOWN
|
||||||
|
)
|
||||||
|
return
|
||||||
|
if days < 0:
|
||||||
|
await update.message.reply_text("❌ El número de días debe ser ≥ 0.")
|
||||||
|
return
|
||||||
|
if days == 0 and not (len(args) >= 2 and args[1] == "confirm"):
|
||||||
|
await update.message.reply_text(
|
||||||
|
"⚠️ Esto borrará *todas* las sesiones completadas.\n"
|
||||||
|
"Envía `/purge 0 confirm` para confirmar.",
|
||||||
|
parse_mode=ParseMode.MARKDOWN
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
db_conn = await get_db()
|
||||||
|
try:
|
||||||
|
db = ResearchDB(db_conn)
|
||||||
|
result = await db.purge_old_sessions(days)
|
||||||
|
await update.message.reply_text(
|
||||||
|
f"🗑️ Purged: {result['sessions']} sessions, "
|
||||||
|
f"{result['sources']} sources, "
|
||||||
|
f"{result['chunks']} chunks, "
|
||||||
|
f"{result['outputs']} outputs"
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Purge command failed", error=str(e))
|
||||||
|
await update.message.reply_text(f"❌ Purge failed: {str(e)[:200]}")
|
||||||
|
finally:
|
||||||
|
await db_conn.close()
|
||||||
|
|
||||||
|
|
||||||
def create_bot() -> Application:
|
def create_bot() -> Application:
|
||||||
app = Application.builder().token(settings.telegram_bot_token).build()
|
app = (
|
||||||
|
Application.builder()
|
||||||
|
.token(settings.telegram_bot_token)
|
||||||
|
.post_init(_purge_on_startup)
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
|
||||||
app.add_handler(CommandHandler("start", cmd_start))
|
app.add_handler(CommandHandler("start", cmd_start))
|
||||||
app.add_handler(CommandHandler("help", cmd_help))
|
app.add_handler(CommandHandler("help", cmd_help))
|
||||||
@@ -516,6 +578,7 @@ def create_bot() -> Application:
|
|||||||
app.add_handler(CommandHandler("outputs", cmd_outputs))
|
app.add_handler(CommandHandler("outputs", cmd_outputs))
|
||||||
app.add_handler(CommandHandler("process", cmd_process))
|
app.add_handler(CommandHandler("process", cmd_process))
|
||||||
app.add_handler(CommandHandler("cancel", cmd_cancel))
|
app.add_handler(CommandHandler("cancel", cmd_cancel))
|
||||||
|
app.add_handler(CommandHandler("purge", cmd_purge))
|
||||||
|
|
||||||
return app
|
return app
|
||||||
|
|
||||||
|
|||||||
@@ -5,8 +5,12 @@ from pathlib import Path
|
|||||||
from typing import Optional
|
from typing import Optional
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
|
|
||||||
|
import structlog
|
||||||
|
|
||||||
from src.config import settings
|
from src.config import settings
|
||||||
|
|
||||||
|
logger = structlog.get_logger()
|
||||||
|
|
||||||
|
|
||||||
class ResearchStatus(str, Enum):
|
class ResearchStatus(str, Enum):
|
||||||
RUNNING = "running"
|
RUNNING = "running"
|
||||||
@@ -266,3 +270,36 @@ class ResearchDB:
|
|||||||
)
|
)
|
||||||
rows = await cursor.fetchall()
|
rows = await cursor.fetchall()
|
||||||
return [dict(r) for r in rows]
|
return [dict(r) for r in rows]
|
||||||
|
|
||||||
|
# --- Maintenance ---
|
||||||
|
|
||||||
|
async def purge_old_sessions(self, max_age_days: int = 30) -> dict:
|
||||||
|
await self.db.execute("PRAGMA foreign_keys = ON")
|
||||||
|
|
||||||
|
threshold = time.time() - max_age_days * 86400
|
||||||
|
cursor = await self.db.execute(
|
||||||
|
"SELECT id FROM research_sessions WHERE created_at < ? AND status != 'running'",
|
||||||
|
(threshold,)
|
||||||
|
)
|
||||||
|
session_ids = [row[0] for row in await cursor.fetchall()]
|
||||||
|
|
||||||
|
counts = {"sessions": 0, "sources": 0, "chunks": 0, "outputs": 0}
|
||||||
|
|
||||||
|
for sid in session_ids:
|
||||||
|
await self.db.execute(
|
||||||
|
"DELETE FROM source_contents WHERE source_id IN (SELECT id FROM sources WHERE session_id = ?)",
|
||||||
|
(sid,)
|
||||||
|
)
|
||||||
|
cur = await self.db.execute("DELETE FROM chunks WHERE session_id = ?", (sid,))
|
||||||
|
counts["chunks"] += cur.rowcount
|
||||||
|
cur = await self.db.execute("DELETE FROM outputs WHERE session_id = ?", (sid,))
|
||||||
|
counts["outputs"] += cur.rowcount
|
||||||
|
cur = await self.db.execute("DELETE FROM sources WHERE session_id = ?", (sid,))
|
||||||
|
counts["sources"] += cur.rowcount
|
||||||
|
cur = await self.db.execute("DELETE FROM research_sessions WHERE id = ?", (sid,))
|
||||||
|
counts["sessions"] += cur.rowcount
|
||||||
|
|
||||||
|
await self.db.commit()
|
||||||
|
logger.info("Purged sessions older than days",
|
||||||
|
sessions=counts["sessions"], days=max_age_days)
|
||||||
|
return counts
|
||||||
|
|||||||
Reference in New Issue
Block a user