From 7704f071d6f67d3db8d24a15c8afd17aec029892 Mon Sep 17 00:00:00 2001 From: ChemaVX Date: Sun, 3 May 2026 16:40:37 +0000 Subject: [PATCH] feat: retry+backoff en scraper, ProgressReporter en bot --- src/bot/bot.py | 98 ++++++++++++++++++--------------------- src/scraper/exhaustive.py | 45 +++++++++++++----- 2 files changed, 77 insertions(+), 66 deletions(-) diff --git a/src/bot/bot.py b/src/bot/bot.py index cbc8ef3..d7f74a1 100644 --- a/src/bot/bot.py +++ b/src/bot/bot.py @@ -33,17 +33,24 @@ def is_authorized(user_id: int) -> bool: return not allowed or user_id in allowed -def fmt_progress(iteration: int, total: int, new: int, stats: dict) -> str: - scraped = stats.get("scraped") or 0 - failed = stats.get("failed") or 0 - pending = stats.get("pending") or 0 - skipped = stats.get("skipped") or 0 - return ( - f"🔄 *Iteration {iteration}*\n" - f"📚 Sources found: `{total}`\n" - f"✅ Scraped: `{scraped}` | ⏭️ Skipped: `{skipped}` | ❌ Failed: `{failed}` | ⏳ Pending: `{pending}`\n" - f"🆕 New URLs this round: `{new}`" - ) +class ProgressReporter: + def __init__(self, reply_target: Message): + self._reply_target = reply_target + self._msg: Optional[Message] = None + + async def start(self, text: str): + self._msg = await self._reply_target.reply_text(text, parse_mode=ParseMode.MARKDOWN) + + async def update(self, text: str): + if not self._msg: + return + try: + await self._msg.edit_text(text, parse_mode=ParseMode.MARKDOWN) + except Exception: + pass + + async def done(self, text: str): + await self.update(text) async def send_chunked(message: Message, text: str, parse_mode=None): @@ -91,77 +98,54 @@ async def cmd_research(update: Update, ctx: ContextTypes.DEFAULT_TYPE): ) return - # Check for existing active research if chat_id in _active_tasks and not _active_tasks[chat_id].done(): await update.message.reply_text( "⚠️ Research already in progress. Use /status or /finish first." ) return - msg = await update.message.reply_text( - f"🦉 *ResearchOwl* starting research on:\n`{topic}`\n\n" - f"🌱 Seeding sources from:\n" - f"• DuckDuckGo (8 queries)\n" - f"• Wikipedia + internal links\n" - f"• Reddit top posts\n" - f"• YouTube transcripts\n\n" - f"This will run exhaustively until saturation. Use /finish to stop early.", - parse_mode=ParseMode.MARKDOWN - ) - async def run_research(): db_conn = await get_db() db = ResearchDB(db_conn) + reporter = None try: session_id = await db.create_session(topic, chat_id) _active_sessions[chat_id] = session_id - progress_msg = msg - iteration_count = [0] + reporter = ProgressReporter(update.message) + await reporter.start(f"🔍 Iniciando scraping de `{topic}`…") - async def on_progress(iteration, total, new_this_round, stats): - iteration_count[0] = iteration - text = fmt_progress(iteration, total, new_this_round, stats) - try: - await progress_msg.edit_text(text, parse_mode=ParseMode.MARKDOWN) - except Exception: - pass + async def on_progress(iter_num, total_sources): + await reporter.update( + f"🔍 Scraping — iteración `{iter_num}` | `{total_sources}` fuentes encontradas" + ) scraper = ExhaustiveScraper(db, session_id, topic, on_progress) final_stats = await scraper.run() await db.update_session(session_id, status=ResearchStatus.SATURATED) - scraped = final_stats.get("scraped", 0) - await update.message.reply_text( - f"✅ *Research complete!*\n\n" - f"📊 Results:\n" - f"• Sources found & scraped: `{scraped}`\n" - f"• Iterations: `{iteration_count[0]}`\n\n" - f"Now processing content with Ollama...\n" - f"Use `/generate podcast|blog|report|thread` when ready.", - parse_mode=ParseMode.MARKDOWN - ) - # Auto-process after scraping + await reporter.update(f"⚡ Procesando `{scraped}` fuentes…") + ollama = OllamaClient() if await ollama.is_available(): processor = ContentProcessor(db, ollama) async def proc_progress(total_chunks, total_words): - await update.message.reply_text( - f"🧠 *Processing complete!*\n" - f"• Chunks stored: `{total_chunks}`\n" - f"• Words researched: `{total_words:,}`\n\n" - f"Ready! Use `/generate podcast|blog|report|thread`", - parse_mode=ParseMode.MARKDOWN + await reporter.update( + f"⚡ Scoring chunks… (`{total_chunks}` procesados)" ) await processor.process_session(session_id, topic, proc_progress) + chunk_count = await db.get_chunks_count(session_id) + await reporter.done( + f"✅ Listo — `{scraped}` fuentes · `{chunk_count}` chunks · usa /generate " + ) else: - await update.message.reply_text( - "⚠️ Ollama not reachable — skipping processing.\n" - "You can still use `/generate` (will use raw content)." + await reporter.done( + f"⚠️ Ollama no disponible — `{scraped}` fuentes scraped.\n" + f"Usa /generate para generar contenido." ) except asyncio.CancelledError: @@ -169,10 +153,16 @@ async def cmd_research(update: Update, ctx: ContextTypes.DEFAULT_TYPE): _active_sessions.get(chat_id, 0), status=ResearchStatus.FINISHED ) - await update.message.reply_text("🛑 Research cancelled.") + if reporter: + await reporter.done("🛑 Investigación cancelada.") + else: + await update.message.reply_text("🛑 Research cancelled.") except Exception as e: logger.error("Research task failed", error=str(e)) - await update.message.reply_text(f"❌ Research failed: {str(e)[:200]}") + if reporter: + await reporter.done(f"❌ Error: {str(e)[:200]}") + else: + await update.message.reply_text(f"❌ Research failed: {str(e)[:200]}") finally: await db_conn.close() diff --git a/src/scraper/exhaustive.py b/src/scraper/exhaustive.py index 7995165..4b676d1 100644 --- a/src/scraper/exhaustive.py +++ b/src/scraper/exhaustive.py @@ -3,6 +3,7 @@ ResearchOwl Exhaustive Scraper Core engine: discovers, expands, and evaluates sources recursively """ import asyncio +import random import re import time from typing import Optional @@ -54,6 +55,22 @@ REDDIT_RE = re.compile(r"reddit\.com/(r/\w+/comments/\w+)") WIKIPEDIA_RE = re.compile(r"wikipedia\.org/wiki/(.+)") +async def fetch_with_retry(fetch_fn, source_name: str, max_retries: int = 3): + last_exc = None + for attempt in range(max_retries): + try: + return await fetch_fn() + except Exception as e: + last_exc = e + if attempt < max_retries - 1: + wait = 2 ** attempt + random.random() + logger.debug("fetch_with_retry backoff", source=source_name[:60], + attempt=attempt + 1, wait=round(wait, 1), error=str(e)) + await asyncio.sleep(wait) + logger.warning("fetch_with_retry exhausted", source=source_name[:60], error=str(last_exc)) + raise last_exc + + def detect_source_type(url: str) -> str: if YOUTUBE_RE.search(url): return "youtube" @@ -290,12 +307,7 @@ class ExhaustiveScraper: ) if self.progress_callback: - await self.progress_callback( - iteration=self.iteration, - total=self.total_sources, - new_this_round=new_sources, - stats=stats - ) + await self.progress_callback(self.iteration, self.total_sources) # Saturation check: if we found very few new URLs, we're done if new_sources < 3 and self.iteration > 2: @@ -317,9 +329,13 @@ class ExhaustiveScraper: try: if source_type == "youtube": - content, title = await self._extract_youtube(url) + content, title = await fetch_with_retry( + lambda: self._extract_youtube(url), url + ) elif source_type == "wikipedia": - content, title, new_urls = await self._extract_wikipedia(url) + content, title, new_urls = await fetch_with_retry( + lambda: self._extract_wikipedia(url), url + ) added = 0 for new_url in (new_urls or []): if self._url_is_relevant(new_url): @@ -331,13 +347,18 @@ class ExhaustiveScraper: await self._mark_scraped(source_id, content, title, url) return added elif source_type == "reddit": - content, title = await self._extract_reddit(url) - # Small delay between Reddit requests to avoid rate limiting + content, title = await fetch_with_retry( + lambda: self._extract_reddit(url), url + ) await asyncio.sleep(settings.request_delay) elif source_type == "pdf": - content, title = await self._extract_pdf(url) + content, title = await fetch_with_retry( + lambda: self._extract_pdf(url), url + ) else: - content, title, new_urls = await self._extract_web(url, source["depth"]) + content, title, new_urls = await fetch_with_retry( + lambda: self._extract_web(url, source["depth"]), url + ) added = 0 for new_url in (new_urls or []): if self._url_is_relevant(new_url):