""" ResearchOwl Telegram Bot Main user interface — all commands handled here """ import asyncio import os from datetime import datetime from typing import Optional import structlog from telegram import Update, Message from telegram.ext import ( Application, CommandHandler, MessageHandler, filters, ContextTypes ) from telegram.constants import ParseMode from src.config import settings from src.db.database import get_db, ResearchDB, ResearchStatus, OutputType from src.scraper.exhaustive import ExhaustiveScraper from src.processor.processor import OllamaClient, ContentProcessor from src.generator.generator import OutputGenerator logger = structlog.get_logger() # Active research tasks per chat _active_tasks: dict[int, asyncio.Task] = {} _active_sessions: dict[int, int] = {} # chat_id -> session_id def is_authorized(user_id: int) -> bool: allowed = settings.allowed_user_ids return not allowed or user_id in allowed def fmt_progress(iteration: int, total: int, new: int, stats: dict) -> str: scraped = stats.get("scraped", 0) failed = stats.get("failed", 0) pending = stats.get("pending", 0) return ( f"🔄 *Iteration {iteration}*\n" f"📚 Sources found: `{total}`\n" f"✅ Scraped: `{scraped}` | ❌ Failed: `{failed}` | ⏳ Pending: `{pending}`\n" f"🆕 New this round: `{new}`" ) async def send_chunked(message: Message, text: str, parse_mode=None): """Send long text in chunks of 4000 chars (Telegram limit)""" max_len = 4000 for i in range(0, len(text), max_len): chunk = text[i:i + max_len] await message.reply_text(chunk, parse_mode=parse_mode) if len(text) > max_len: await asyncio.sleep(0.5) # ─── Commands ───────────────────────────────────────────────────────────────── async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE): if not is_authorized(update.effective_user.id): return await update.message.reply_text( "🦉 *ResearchOwl* — Exhaustive Research Engine\n\n" "Commands:\n" "`/research ` — Start exhaustive research\n" "`/status` — Check current research progress\n" "`/finish` — Stop research and proceed to generation\n" "`/process` — Manually trigger chunk processing\n" "`/generate ` — Generate output (podcast|blog|report|thread)\n" "`/sources` — List all sources found\n" "`/outputs` — List generated outputs\n" "`/cancel` — Cancel current research\n" "`/help` — Show this message", parse_mode=ParseMode.MARKDOWN ) async def cmd_research(update: Update, ctx: ContextTypes.DEFAULT_TYPE): if not is_authorized(update.effective_user.id): return chat_id = update.effective_chat.id topic = " ".join(ctx.args).strip() if ctx.args else "" if not topic: await update.message.reply_text( "❌ Please provide a topic.\nExample: `/research Roswell incident`", parse_mode=ParseMode.MARKDOWN ) return # Check for existing active research if chat_id in _active_tasks and not _active_tasks[chat_id].done(): await update.message.reply_text( "⚠️ Research already in progress. Use /status or /finish first." ) return msg = await update.message.reply_text( f"🦉 *ResearchOwl* starting research on:\n`{topic}`\n\n" f"🌱 Seeding sources from:\n" f"• DuckDuckGo (8 queries)\n" f"• Wikipedia + internal links\n" f"• Reddit top posts\n" f"• YouTube transcripts\n\n" f"This will run exhaustively until saturation. Use /finish to stop early.", parse_mode=ParseMode.MARKDOWN ) async def run_research(): db_conn = await get_db() db = ResearchDB(db_conn) try: session_id = await db.create_session(topic, chat_id) _active_sessions[chat_id] = session_id progress_msg = msg iteration_count = [0] async def on_progress(iteration, total, new_this_round, stats): iteration_count[0] = iteration text = fmt_progress(iteration, total, new_this_round, stats) try: await progress_msg.edit_text(text, parse_mode=ParseMode.MARKDOWN) except Exception: pass scraper = ExhaustiveScraper(db, session_id, topic, on_progress) final_stats = await scraper.run() await db.update_session(session_id, status=ResearchStatus.SATURATED) scraped = final_stats.get("scraped", 0) await update.message.reply_text( f"✅ *Research complete!*\n\n" f"📊 Results:\n" f"• Sources found & scraped: `{scraped}`\n" f"• Iterations: `{iteration_count[0]}`\n\n" f"Now processing content with Ollama...\n" f"Use `/generate podcast|blog|report|thread` when ready.", parse_mode=ParseMode.MARKDOWN ) # Auto-process after scraping ollama = OllamaClient() if await ollama.is_available(): processor = ContentProcessor(db, ollama) async def proc_progress(total_chunks, total_words): await update.message.reply_text( f"🧠 *Processing complete!*\n" f"• Chunks stored: `{total_chunks}`\n" f"• Words researched: `{total_words:,}`\n\n" f"Ready! Use `/generate podcast|blog|report|thread`", parse_mode=ParseMode.MARKDOWN ) await processor.process_session(session_id, topic, proc_progress) else: await update.message.reply_text( "⚠️ Ollama not reachable — skipping processing.\n" "You can still use `/generate` (will use raw content)." ) except asyncio.CancelledError: await db.update_session( _active_sessions.get(chat_id, 0), status=ResearchStatus.FINISHED ) await update.message.reply_text("🛑 Research cancelled.") except Exception as e: logger.error("Research task failed", error=str(e)) await update.message.reply_text(f"❌ Research failed: {str(e)[:200]}") finally: await db_conn.close() task = asyncio.create_task(run_research()) _active_tasks[chat_id] = task async def cmd_status(update: Update, ctx: ContextTypes.DEFAULT_TYPE): if not is_authorized(update.effective_user.id): return chat_id = update.effective_chat.id db_conn = await get_db() db = ResearchDB(db_conn) try: session = await db.get_active_session(chat_id) if not session: # Try to find last session cursor = await db_conn.execute( "SELECT * FROM research_sessions WHERE telegram_chat_id = ? ORDER BY created_at DESC LIMIT 1", (chat_id,) ) row = await cursor.fetchone() session = dict(row) if row else None if not session: await update.message.reply_text("No research sessions found. Start with /research ") return stats = await db.get_session_stats(session["id"]) is_active = chat_id in _active_tasks and not _active_tasks[chat_id].done() status_emoji = {"running": "🔄", "saturated": "✅", "finished": "🏁", "error": "❌"} emoji = status_emoji.get(session["status"], "❓") await update.message.reply_text( f"{emoji} *Research Status*\n\n" f"📝 Topic: `{session['topic']}`\n" f"🔁 Status: `{session['status']}`\n" f"🔢 Iterations: `{session.get('iterations', 0)}`\n" f"📚 Total sources: `{stats.get('total', 0)}`\n" f"✅ Scraped: `{stats.get('scraped', 0)}`\n" f"❌ Failed: `{stats.get('failed', 0)}`\n" f"⏳ Pending: `{stats.get('pending', 0)}`\n" f"💬 Chunks: `{session.get('total_chunks', 0)}`\n" f"📖 Words: `{session.get('total_words', 0):,}`\n" f"{'🟢 Active' if is_active else '⚫ Idle'}", parse_mode=ParseMode.MARKDOWN ) finally: await db_conn.close() async def cmd_finish(update: Update, ctx: ContextTypes.DEFAULT_TYPE): if not is_authorized(update.effective_user.id): return chat_id = update.effective_chat.id task = _active_tasks.get(chat_id) if task and not task.done(): task.cancel() await update.message.reply_text( "🛑 Stopping research...\n" "Use `/generate podcast|blog|report|thread` to generate output.", parse_mode=ParseMode.MARKDOWN ) else: await update.message.reply_text( "No active research. Use `/generate` to create output from last session.", parse_mode=ParseMode.MARKDOWN ) async def cmd_generate(update: Update, ctx: ContextTypes.DEFAULT_TYPE): if not is_authorized(update.effective_user.id): return chat_id = update.effective_chat.id output_arg = ctx.args[0].lower() if ctx.args else "" type_map = { "podcast": OutputType.PODCAST, "blog": OutputType.BLOG, "report": OutputType.REPORT, "thread": OutputType.THREAD, "hilo": OutputType.THREAD, "informe": OutputType.REPORT, } if output_arg not in type_map: await update.message.reply_text( "❌ Invalid output type.\n" "Use: `/generate podcast|blog|report|thread`", parse_mode=ParseMode.MARKDOWN ) return output_type = type_map[output_arg] db_conn = await get_db() db = ResearchDB(db_conn) try: # Find last session for this chat cursor = await db_conn.execute( """SELECT * FROM research_sessions WHERE telegram_chat_id = ? ORDER BY created_at DESC LIMIT 1""", (chat_id,) ) row = await cursor.fetchone() if not row: await update.message.reply_text("No research sessions found. Start with /research ") return session = dict(row) session_id = session["id"] msg = await update.message.reply_text( f"⚙️ Generating *{output_type}* for: `{session['topic']}`\n" f"Using Ollama ({settings.ollama_model})...\n" f"This may take 2-5 minutes ☕", parse_mode=ParseMode.MARKDOWN ) async def gen_progress(text): try: await msg.edit_text(text) except Exception: pass ollama = OllamaClient() processor = ContentProcessor(db, ollama) generator = OutputGenerator(db, ollama, processor) output = await generator.generate(session_id, output_type, gen_progress) # Send as file if very long if len(output) > 8000: import tempfile ext_map = { OutputType.PODCAST: "script.md", OutputType.BLOG: "post.md", OutputType.REPORT: "report.md", OutputType.THREAD: "thread.txt", } filename = f"researchowl_{session['topic'][:30].replace(' ', '_')}_{ext_map[output_type]}" with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as f: f.write(output) tmp_path = f.name with open(tmp_path, "rb") as f: await update.message.reply_document( document=f, filename=filename, caption=f"📄 *{output_type.upper()}* — {session['topic']}\n" f"Generated by ResearchOwl 🦉", parse_mode=ParseMode.MARKDOWN ) os.unlink(tmp_path) else: await send_chunked(update.message, output) except Exception as e: logger.error("Generate failed", error=str(e)) await update.message.reply_text(f"❌ Generation failed: {str(e)[:200]}") finally: await db_conn.close() async def cmd_sources(update: Update, ctx: ContextTypes.DEFAULT_TYPE): if not is_authorized(update.effective_user.id): return chat_id = update.effective_chat.id db_conn = await get_db() db = ResearchDB(db_conn) try: cursor = await db_conn.execute( "SELECT * FROM research_sessions WHERE telegram_chat_id = ? ORDER BY created_at DESC LIMIT 1", (chat_id,) ) row = await cursor.fetchone() if not row: await update.message.reply_text("No sessions found.") return session_id = row["id"] sources = await db.get_all_sources(session_id) by_type: dict = {} for s in sources: t = s["source_type"] by_type.setdefault(t, []).append(s) lines = [f"📚 *Sources for session #{session_id}*\n"] for stype, srcs in by_type.items(): scraped = sum(1 for s in srcs if s["status"] == "scraped") lines.append(f"\n*{stype.upper()}* ({scraped}/{len(srcs)} scraped)") for s in srcs[:5]: # show top 5 per type quality = s.get("quality_score", 0) status_icon = {"scraped": "✅", "failed": "❌", "pending": "⏳", "skipped": "⏭️"}.get(s["status"], "❓") title = (s.get("title") or s["url"])[:50] lines.append(f"{status_icon} {title} (q:{quality:.1f})") if len(srcs) > 5: lines.append(f" ... and {len(srcs)-5} more") await send_chunked(update.message, "\n".join(lines), parse_mode=ParseMode.MARKDOWN) finally: await db_conn.close() async def cmd_outputs(update: Update, ctx: ContextTypes.DEFAULT_TYPE): if not is_authorized(update.effective_user.id): return chat_id = update.effective_chat.id db_conn = await get_db() db = ResearchDB(db_conn) try: cursor = await db_conn.execute( "SELECT * FROM research_sessions WHERE telegram_chat_id = ? ORDER BY created_at DESC LIMIT 1", (chat_id,) ) row = await cursor.fetchone() if not row: await update.message.reply_text("No sessions found.") return outputs = await db.get_outputs(row["id"]) if not outputs: await update.message.reply_text( "No outputs generated yet. Use `/generate podcast|blog|report|thread`", parse_mode=ParseMode.MARKDOWN ) return lines = [f"📄 *Outputs for: {row['topic']}*\n"] for o in outputs: from datetime import datetime dt = datetime.utcfromtimestamp(o['created_at']).strftime("%Y-%m-%d %H:%M") lines.append(f"• `{o['output_type']}` — {dt} ({len(o['content'])} chars)") await update.message.reply_text( "\n".join(lines), parse_mode=ParseMode.MARKDOWN ) finally: await db_conn.close() async def cmd_process(update: Update, ctx: ContextTypes.DEFAULT_TYPE): if not is_authorized(update.effective_user.id): return chat_id = update.effective_chat.id db_conn = await get_db() db = ResearchDB(db_conn) try: cursor = await db_conn.execute( "SELECT * FROM research_sessions WHERE telegram_chat_id = ? ORDER BY created_at DESC LIMIT 1", (chat_id,) ) row = await cursor.fetchone() if not row: await update.message.reply_text("No research sessions found. Start with /research ") return session = dict(row) session_id = session["id"] topic = session["topic"] msg = await update.message.reply_text( f"🧠 Processing session #{session_id}: `{topic}`\n" f"Chunking & scoring with Ollama ({settings.ollama_model})...\n" f"This may take a few minutes.", parse_mode=ParseMode.MARKDOWN ) ollama = OllamaClient() if not await ollama.is_available(): await msg.edit_text("❌ Ollama not reachable. Check OLLAMA_URL setting.") return processor = ContentProcessor(db, ollama) async def proc_progress(total_chunks, total_words): try: await msg.edit_text( f"🧠 *Processing complete!*\n" f"• Chunks stored: `{total_chunks}`\n" f"• Words researched: `{total_words:,}`\n\n" f"Ready! Use `/generate podcast|blog|report|thread`\n" f"_If 0 chunks: set `QUALITY_THRESHOLD=0.3` or `0` and retry_", parse_mode=ParseMode.MARKDOWN ) except Exception: pass await processor.process_session(session_id, topic, proc_progress) except Exception as e: logger.error("Process command failed", error=str(e)) await update.message.reply_text(f"❌ Processing failed: {str(e)[:200]}") finally: await db_conn.close() async def cmd_cancel(update: Update, ctx: ContextTypes.DEFAULT_TYPE): if not is_authorized(update.effective_user.id): return chat_id = update.effective_chat.id task = _active_tasks.get(chat_id) if task and not task.done(): task.cancel() await update.message.reply_text("🛑 Research cancelled.") else: await update.message.reply_text("No active research to cancel.") async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE): await cmd_start(update, ctx) # ─── Bot setup ──────────────────────────────────────────────────────────────── def create_bot() -> Application: app = Application.builder().token(settings.telegram_bot_token).build() app.add_handler(CommandHandler("start", cmd_start)) app.add_handler(CommandHandler("help", cmd_help)) app.add_handler(CommandHandler("research", cmd_research)) app.add_handler(CommandHandler("status", cmd_status)) app.add_handler(CommandHandler("finish", cmd_finish)) app.add_handler(CommandHandler("generate", cmd_generate)) app.add_handler(CommandHandler("sources", cmd_sources)) app.add_handler(CommandHandler("outputs", cmd_outputs)) app.add_handler(CommandHandler("process", cmd_process)) app.add_handler(CommandHandler("cancel", cmd_cancel)) return app def run(): logger.info("Starting ResearchOwl bot") app = create_bot() app.run_polling(allowed_updates=Update.ALL_TYPES)